2023-12-28 09:09

Spring Boot 整合RabbitMQ

wanmatea

JavaEE

(530)

(0)

收藏

1、在user-service添加依赖

<!-- Spring Boot AMQP starter; no <version> is given here, so it is presumably managed by the parent/BOM (confirm in the pom) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


2、在配置文件中添加RabbitMQ连接配置

# RabbitMQ broker connection settings used by the application.
spring:
  rabbitmq:
    # Broker address: a broker running on the local machine.
    host: 127.0.0.1
    # Broker listener port.
    port: 5672
    # NOTE(review): guest/guest looks like the broker's out-of-the-box account;
    # replace with a dedicated user outside local development - confirm.
    username: guest
    password: guest


3、增加RabbitMQ配置类

package com.xxxx.user.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    /******************direct**********************/
    /**
     * 创建direct队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }
    /**
     * 创建direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    /**
     * 把队列和交换机绑定在一起
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("routingKey");
    }
    /******************topic**********************/
    @Bean
    public Queue topicQuerue1(){
        return new Queue("topicQuerue1");
    }
    @Bean
    public Queue topicQuerue2(){
        return new Queue("topicQuerue2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");
    }
    /**
     * 通配符:* 表示一个词,# 表示零个或多个词
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
    }
    /******************fanout**********************/
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanoutQueue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanoutQueue2");
    }
    @Bean
    public Queue fanoutQueue3(){
        return new Queue("fanoutQueue3");
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}


4、新增消息消费监听类

package com.xxxx.user.consumer;
import com.xxxx.drp.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到directQueue队列信息:" + data);
    }
    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到directQueue队列信息:" + data);
    }
}
package com.xxxx.user.consumer;
import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }
    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}
package com.xxxx.user.consumer;
import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }
    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}


5、消息生产端

package com.xxxx.user;
import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DataSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void sendDirect(){
        UserInfo userInfo = new UserInfo();
        userInfo.setUserAccount("tiger");
        userInfo.setPassword("12345");
        this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);
    }
    @Test
    public void sendTopic(){
        this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");
    }
    @Test
    public void sendFanout(){
        this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");
    }
}


0条评论

点击登录参与评论