1、在user-service添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、配置文件添加
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
3、增加RabbitMQ配置类
package com.xxxx.user.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { /******************direct**********************/ /** * 创建direct队列 * @return */ @Bean public Queue directQueue(){ return new Queue("directQueue"); } /** * 创建direct交换机 * @return */ @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } /** * 把队列和交换机绑定在一起 * @param queue * @param directExchange * @return */ @Bean public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("routingKey"); } /******************topic**********************/ @Bean public Queue topicQuerue1(){ return new Queue("topicQuerue1"); } @Bean public Queue topicQuerue2(){ return new Queue("topicQuerue2"); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1"); } /** * 通配符:* 表示一个词,# 表示零个或多个词 * @param queue * @param topicExchange * @return */ @Bean public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(queue).to(topicExchange).with("topic.#"); } /******************fanout**********************/ @Bean public Queue fanoutQueue1(){ return new Queue("fanoutQueue1"); } @Bean public Queue fanoutQueue2(){ return new Queue("fanoutQueue2"); } @Bean public Queue fanoutQueue3(){ return new Queue("fanoutQueue3"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } @Bean public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
4、新增消费监听类
package com.xxxx.user.consumer; import com.xxxx.drp.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = "directQueue") public class DataDirectReceiver { @RabbitHandler public void process(String data){ log.info("收到directQueue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到directQueue队列信息:" + data); } } package com.xxxx.user.consumer; import com.xxxx.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = {"topicQuerue1","topicQuerue2"}) public class DataFanoutReceiver { @RabbitHandler public void process(String data){ log.info("收到topicQuerue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到topicQuerue队列信息:" + data); } } package com.xxxx.user.consumer; import com.xxxx.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"}) public class DataTopicReceiver { @RabbitHandler public void process(String data){ log.info("收到topicQuerue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到topicQuerue队列信息:" + data); } }
5、消息生产端
package com.xxxx.user; import com.xxxx.common.entity.UserInfo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class DataSender { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendDirect(){ UserInfo userInfo = new UserInfo(); userInfo.setUserAccount("tiger"); userInfo.setPassword("12345"); this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo); } @Test public void sendTopic(){ this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic"); } @Test public void sendFanout(){ this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic"); } }
0条评论
点击登录参与评论