import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.message.Message; import com.fuint.fuintApplication; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.transaction.annotation.Transactional; import java.io.UnsupportedEncodingException; import java.util.List; @RunWith(SpringJUnit4ClassRunner.class) //@RunWith(SpringRunner.class) @SpringBootTest(classes = fuintApplication.class) @Transactional public class UserSeriviceTest { @Test public void test() throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException { // //1.创建一个发送消息的对象Producer,指定分组(生产者分组) 等会讲 // DefaultMQProducer producer = new DefaultMQProducer("group1"); // // //2.设定发送的命名服务器地址,连接上ns之后,才能拿到broker地址,发送消息 // producer.setNamesrvAddr("47.95.206.185:9876"); // // //3.1启动发送的服务 // producer.start(); // // //4.创建要发送的消息对象,指定topic,指定内容body // Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8")); // //3.2发送消息。这里是同步请求,如果broker没有给出响应,就拿不到返回值并且卡死在当前行代码 // SendResult result = producer.send(msg); // System.out.println("返回结果:"+result); // //5.关闭连接 // producer.shutdown(); //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("47.95.206.185:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } @Test public void consumer() throws MQClientException { // //1.创建一个接收消息的对象Consumer // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // //2.设定接收的命名服务器地址 // consumer.setNamesrvAddr("47.95.206.185:9876"); // //3.设置接收消息对应的topic,对应的sub标签为任意* // // 如果想接收之前topic1的生产者发送的消息,这里的就要订阅相同的topic才可以 // try { // consumer.subscribe("topic1", "*"); // } catch (MQClientException e) { // e.printStackTrace(); // } // //4.开启监听,用于接收消息 // consumer.registerMessageListener(new MessageListenerConcurrently() { // /** // * 设置好监听之后,只要有消息出现,就会调用 consumeMessage方法 // * @param list 所有的消息都会存入该集合,供消费者消费 // * @param consumeConcurrentlyContext 同时并行消费(多线程)的上下文 // * @return // */ // @Override // public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // //遍历消息 // for (MessageExt msg : list) { //// System.out.println("收到消息:"+msg); // System.out.println("消息:" + new String(msg.getBody())); // } // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } // }); // //5.启动接收消息的服务 // consumer.start(); // System.out.println("接收消息服务已开启运行"); // 不能关闭消费者端服务器,因为对broker中topic设置了监听; // 该topic中只要有了新消息,就要通知消费者消费 // consumer.shutdown(); // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("47.95.206.185:9876"); // Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); for (MessageExt msg : msgs) { byte[] body = msg.getBody(); String messageBody = null; try { messageBody = new String(body, RemotingHelper.DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.printf("%s 转成之后的信息: %s%n", Thread.currentThread().getName(), messageBody); // 在这里添加你的业务逻辑,处理消息 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } }