2022-01-20 11:48

RabbitMQ示例代码

wanmatea

JavaEE

(844)

(0)

收藏

首先添加依赖:

<dependency>  
<groupId>com.rabbitmq</groupId>  
<artifactId>amqp-client</artifactId> 
<version>5.14.1</version>
</dependency>

RabbitMQ示例图示:

image.png 

生产者代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接工厂
        ConnectionFactory connFac = new ConnectionFactory() ;
        //默认链接的主机名,RabbitMQ-Server安装在本机,所以可以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建链接
        Connection conn = connFac.newConnection() ;
        //创建信息管道
        Channel channel = conn.createChannel() ;
        // 创建一个名为queue01的队列,防止队列不存在
        String queueName = "queue01" ;
        //进行信息声明        1.队列名2.是否持久化,3是否局限与链接,4不再使用是否删除,5其他的属性
        channel.queueDeclare(queueName, false, false, false, null) ;
        String msg = "Hello World!";
        //发送消息
        // 在RabbitMQ中,消息是不能直接发送到队列,它需要发送到交换器(exchange)中。
        // 第一参数空表示使用默认exchange,第二参数表示发送到的queue,第三参数是发送的消息是(字节数组)
        channel.basicPublish("", queueName , null , msg.getBytes());
        System.out.println("发送  message[" + msg + "] to "+ queueName +" success!");
        //关闭管道
        channel.close();
        //关闭连接
        conn.close();
    }
}

运行上面的代码,控制台显示:

image.png 

队列中显示:

image.png 

消费者代码:

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建链接工厂
        ConnectionFactory connFac = new ConnectionFactory() ;
        //默认链接的主机名,RabbitMQ-Server安装在本机,所以可以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //创建链接
        Connection conn = connFac.newConnection() ;
        //创建信息管道
        Channel channel = conn.createChannel() ;
        //定义Queue名称
        String queueName = "queue01";
        //1.队列名2.是否持久化,3是否局限与链接,4不再使用是否删除,5其他的属性
        channel.queueDeclare(queueName, false, false, false, null) ;
        //上面的部分,与Producer是一样的
        //声明一个消费者,配置好获取消息的方式
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

实现自己的Consumer:

public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel){
        super(channel);
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

运行上面的代码,控制台显示:

image.png 

队列中显示:

image.png 

0条评论

点击登录参与评论