import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.message.Message; import com.fuint.fuintApplication; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.transaction.annotation.Transactional; import java.io.UnsupportedEncodingException; import java.util.List; @RunWith(SpringJUnit4ClassRunner.class) //@RunWith(SpringRunner.class) @SpringBootTest(classes = fuintApplication.class) @Transactional public class UserSeriviceTest { @Test public void test() throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException { //1.创建一个发送消息的对象Producer,指定分组(生产者分组) 等会讲 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址,连接上ns之后,才能拿到broker地址,发送消息 producer.setNamesrvAddr("39.104.58.101:9876"); //3.1启动发送的服务 producer.start(); //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8")); //3.2发送消息。这里是同步请求,如果broker没有给出响应,就拿不到返回值并且卡死在当前行代码 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); //5.关闭连接 // producer.shutdown(); } @Test public void consumer() throws MQClientException { //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("39.104.58.101:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* // 如果想接收之前topic1的生产者发送的消息,这里的就要订阅相同的topic才可以 try { consumer.subscribe("topic1", "*"); } catch (MQClientException e) { e.printStackTrace(); } //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 设置好监听之后,只要有消息出现,就会调用 consumeMessage方法 * @param list 所有的消息都会存入该集合,供消费者消费 * @param consumeConcurrentlyContext 同时并行消费(多线程)的上下文 * @return */ @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { // System.out.println("收到消息:"+msg); System.out.println("消息:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动接收消息的服务 consumer.start(); System.out.println("接收消息服务已开启运行"); // 不能关闭消费者端服务器,因为对broker中topic设置了监听; // 该topic中只要有了新消息,就要通知消费者消费 // consumer.shutdown(); } }