oil-station/fuintBackend/fuint-application/src/test/java/UserSeriviceTest.java

90 lines
4.1 KiB
Java
Raw Normal View History

2023-12-05 15:16:15 +08:00
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.message.Message;
import com.fuint.fuintApplication;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import java.io.UnsupportedEncodingException;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
//@RunWith(SpringRunner.class)
@SpringBootTest(classes = fuintApplication.class)
@Transactional
public class UserSeriviceTest {
@Test
public void test() throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
//1.创建一个发送消息的对象Producer指定分组生产者分组 等会讲
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址连接上ns之后才能拿到broker地址发送消息
producer.setNamesrvAddr("39.104.58.101:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic指定内容body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//3.2发送消息。这里是同步请求如果broker没有给出响应就拿不到返回值并且卡死在当前行代码
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
//5.关闭连接
// producer.shutdown();
}
@Test
public void consumer() throws MQClientException {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("39.104.58.101:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意*
// 如果想接收之前topic1的生产者发送的消息这里的就要订阅相同的topic才可以
try {
consumer.subscribe("topic1", "*");
} catch (MQClientException e) {
e.printStackTrace();
}
//4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 设置好监听之后只要有消息出现就会调用 consumeMessage方法
* @param list 所有的消息都会存入该集合供消费者消费
* @param consumeConcurrentlyContext 同时并行消费(多线程)的上下文
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
System.out.println("接收消息服务已开启运行");
// 不能关闭消费者端服务器因为对broker中topic设置了监听
// 该topic中只要有了新消息就要通知消费者消费
// consumer.shutdown();
}
}