oil-station/fuintBackend/fuint-application/src/test/java/UserSeriviceTest.java
2024-01-02 11:21:12 +08:00

178 lines
7.2 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import com.fuint.business.store.service.StoreService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.message.Message;
import com.fuint.fuintApplication;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
//@RunWith(SpringRunner.class)
@SpringBootTest(classes = fuintApplication.class)
@Transactional
public class UserSeriviceTest {
@Test
public void test() throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
// //1.创建一个发送消息的对象Producer指定分组生产者分组 等会讲
// DefaultMQProducer producer = new DefaultMQProducer("group1");
//
// //2.设定发送的命名服务器地址连接上ns之后才能拿到broker地址发送消息
// producer.setNamesrvAddr("47.95.206.185:9876");
//
// //3.1启动发送的服务
// producer.start();
//
// //4.创建要发送的消息对象,指定topic指定内容body
// Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
// //3.2发送消息。这里是同步请求如果broker没有给出响应就拿不到返回值并且卡死在当前行代码
// SendResult result = producer.send(msg);
// System.out.println("返回结果:"+result);
// //5.关闭连接
// producer.shutdown();
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("47.95.206.185:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
@Test
public void consumer() throws MQClientException {
// //1.创建一个接收消息的对象Consumer
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// //2.设定接收的命名服务器地址
// consumer.setNamesrvAddr("47.95.206.185:9876");
// //3.设置接收消息对应的topic,对应的sub标签为任意*
// // 如果想接收之前topic1的生产者发送的消息这里的就要订阅相同的topic才可以
// try {
// consumer.subscribe("topic1", "*");
// } catch (MQClientException e) {
// e.printStackTrace();
// }
// //4.开启监听,用于接收消息
// consumer.registerMessageListener(new MessageListenerConcurrently() {
// /**
// * 设置好监听之后,只要有消息出现,就会调用 consumeMessage方法
// * @param list 所有的消息都会存入该集合,供消费者消费
// * @param consumeConcurrentlyContext 同时并行消费(多线程)的上下文
// * @return
// */
// @Override
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// //遍历消息
// for (MessageExt msg : list) {
//// System.out.println("收到消息:"+msg);
// System.out.println("消息:" + new String(msg.getBody()));
// }
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
// });
// //5.启动接收消息的服务
// consumer.start();
// System.out.println("接收消息服务已开启运行");
// 不能关闭消费者端服务器因为对broker中topic设置了监听
// 该topic中只要有了新消息就要通知消费者消费
// consumer.shutdown();
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("47.95.206.185:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
byte[] body = msg.getBody();
String messageBody = null;
try {
messageBody = new String(body, RemotingHelper.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.printf("%s 转成之后的信息: %s%n", Thread.currentThread().getName(), messageBody);
// 在这里添加你的业务逻辑,处理消息
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
@Resource
StoreService storeService;
@Test
public void test2() {
// 将流水上限和时间上限的部门全部设置为不可用
storeService.flowflowConfiguration();
// 将过期油站设置为不可用
storeService.petrolStationsAreExpired();
}
}