SpringBoot整合rabbitMQ

1.消息队列概述

1.1.消息队列和MQ

MQ全称Message Queue,消息队列是应用程序之间的通信方法。

  • 为什么使用MQ?

    在项目中,可将一些无需及时返回且耗时的操作提取出来,进行异步处理,这种处理方式大大节省了服务器的响应时间,从而提高系统的吞吐量

  • 应用场景

    • 任务异步处理

    • 应用程序解耦

      相当于中介,生产方通过MQ与消费者交互。

1.2.AMQP和JMS

MQ的两种实现方式—>AMQP,JMS。

1.2.1 AMQP

是一种协议(链接协议),和JMS的本质区别是不从API层进行限定,直接定义网络交换的数据格式。

1.2.2 JMS

java消息服务,是一个java平台中关于面向消息中间件的API,应用于两个程序之间,或分布式系统中发送消息,进行异步通信。

1.2.3 AMQP和IMS区别

  • JMS定义统一的接口,对消息进行统一操作;A是通过协议来统一数据交换的格式。
  • J必须是Java语言,A是协议,跨语言
  • J规定两种消息模式;A更丰富。

1.3.消息队列产品

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C开发
  • RabbitMQ:AMQP协议,erlang语言,性能好
  • RocketMQ:基于JMS,alibaba
  • kafka:类似于MQ的产品,分布式消息系统,高吞吐量

1.4.RabbitMQ

提供6种模式:简单模式,work模式,publish/subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用。

2.安装及配置MQ

  • 安装erlang

  • 安装MQ

  • 安装MQ的图形管理插件

  • 创建管理用户

  • 创建虚拟主机Virtual Hosts

    1
    2
    3
    4
    5
    graph LR;
    1[安装erlang]-->2[安装RabbitMQ]
    2-->3[安装管理插件]
    3-->4[创建用户]
    4-->5[创建虚拟主机]

    安装过程中以管理员身份运行

3.RabbitMQ入门

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>

3.1 生产者

生产者发送消息到RabbitMQ的队列(simple_queue);消费者可以从队列中获取信息,可以使用简单模式(simple)。

  • 创建连接工厂(设置MQ的连接参数)

  • 创建连接

  • 创建频道

  • 声明队列

  • 发送消息

  • 关闭资源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.hhzhu.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *简单模式:发送消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    //1创建连接工厂,ctrl+ait+v快速生成对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //主机默认localhost
    connectionFactory.setHost("localhost");
    //端口:默认5672
    connectionFactory.setPort(5672);
    //虚拟主机:默认/
    connectionFactory.setVirtualHost("/hhzhu");
    //用户名,默认guest
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("shei");
    //2创建连接
    Connection connection = connectionFactory.newConnection();
    //3创建频道
    Channel channel = connection.createChannel();
    //4声明队列
    /**
    * 参数1:队列名称
    * 参数2:是否定义持久化
    * 参数3:是否独占本连接
    * 参数4:是否不用时自动删除
    * 参数5:其他参数
    */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    //5发送消息
    String message = "你好,小兔纸。";
    /**
    * 参数1:交换机名称,空串使用迷人交换机
    * 参数2:路由key,简单模式中,可以使用队列名称
    * 参数3:消息其他属性
    * 参数4:消息内容
    */
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    System.out.println("已发送消息:"+message);
    //6关闭资源
    channel.close();
    connection.close();

    }
    }

    在设置连接工场的时候,不设置会有默认;

3.2 消费者

从RabbitMQ中队列(与生产者发送消息的队列一致)接收消息。

  • 创建连接工厂的连接参数

  • 创建连接(抽取一个获取连接的工具类)

  • 创建频道

  • 声明队列

  • 创建消费者(接收消息并处理接收到的消息)

  • 监听队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package com.hhzhu.simple;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *使用简单模式,消费者接收消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //创建频道
    Channel channel = connection.createChannel();
    //声明队列
    channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
    //创建消费者
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //路由key
    System.out.println("路由key为:"+envelope.getRoutingKey());
    //交换机
    System.out.println("交换机为:"+envelope.getExchange());
    //消息ID
    System.out.println("消息ID为:"+envelope.getDeliveryTag());
    //接收到的消息
    System.out.println("接收到的消息为:"+new String(body,"utf-8"));
    }
    };
    //监听队列
    /**
    * 参数1:队列名
    * 参数2:是否自动确认,设置true表示消息接收到自动向MQ回复接收到了,MQ将消息从队列删除,false则需要手动确认
    * 参数3:消息的消费者
    */
    channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);

    }
    }

    消费者持续监听队列消息,不需要关闭资源

    如果消费者在听一个队列中有多个,消息如何分配?

4.RabbitMQ工作模式

4.1 Work queue工作模式

与simple相比,多了一个或多个消费者,多个消费者共同消费同一个队列中的消息。(消费者之间对消息的接收是竞争关系)。一个消息只能被一个消费者接收。

应用场景:可以在消费者端处理任务比较耗时的时候,添加对同一个队列的消费者来提高任务处理能力。

生产者:发送30个消息

消费者:创建2个消费者监听同一个队列,查看两个消费者接收消息是否存在重复。

  • 抽取连接工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package com.hhzhu.util;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *创建连接工具类
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {
    //1创建连接工厂,ctrl+ait+v快速生成对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //主机默认localhost
    connectionFactory.setHost("localhost");
    //端口:默认5672
    connectionFactory.setPort(5672);
    //虚拟主机:默认/
    connectionFactory.setVirtualHost("/hhzhu");
    //用户名,默认guest
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("shei");
    //2创建连接
    return connectionFactory.newConnection();
    }
    }

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    package com.hhzhu.work;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *work工作队列模式:发送消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Producer {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

    Connection connection = ConnectionUtil.getConnection();
    //3创建频道
    Channel channel = connection.createChannel();
    //4声明队列
    /**
    * 参数1:队列名称
    * 参数2:是否定义持久化
    * 参数3:是否独占本连接
    * 参数4:是否不用时自动删除
    * 参数5:其他参数
    */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    for(int i = 0;i<30; i++) {
    //5发送消息
    String message = "你好,小兔纸。work模式"+i ;
    /**
    * 参数1:交换机名称,空串使用迷人交换机
    * 参数2:路由key,简单模式中,可以使用队列名称
    * 参数3:消息其他属性
    * 参数4:消息内容
    */
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println("已发送消息:" + message);
    }
    //6关闭资源
    channel.close();
    connection.close();

    }
    }

  • 消费者1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.hhzhu.work;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *使用简单模式,消费者接收消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //创建频道
    Channel channel = connection.createChannel();
    //声明队列
    channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
    //每次可以获取多少个信息
    channel.basicQos(1);
    //创建消费者
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
    //路由key
    System.out.println("路由key为:"+envelope.getRoutingKey());
    //交换机
    System.out.println("交换机为:"+envelope.getExchange());
    //消息ID
    System.out.println("消息ID为:"+envelope.getDeliveryTag());
    //接收到的消息
    System.out.println("消费者1---接收到的消息为:"+new String(body,"utf-8"));

    Thread.sleep(1000);
    //确认消息
    /**
    * 参数1:消息ID
    * 参数2:是否确认,false表示只有当前这个消息被处理。
    */
    channel.basicAck(envelope.getDeliveryTag(),false);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    };
    //监听队列
    /**
    * 参数1:队列名
    * 参数2:是否自动确认,设置true表示消息接收到自动向MQ回复接收到了,MQ将消息从队列删除,false则需要手动确认
    * 参数3:消息的消费者
    */
    channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);

    }
    }

  • 消费者2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.hhzhu.work;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *使用简单模式,消费者接收消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //创建频道
    Channel channel = connection.createChannel();
    //声明队列
    channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
    //每次可以获取多少个信息
    channel.basicQos(1);
    //创建消费者
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
    //路由key
    System.out.println("路由key为:"+envelope.getRoutingKey());
    //交换机
    System.out.println("交换机为:"+envelope.getExchange());
    //消息ID
    System.out.println("消息ID为:"+envelope.getDeliveryTag());
    //接收到的消息
    System.out.println("消费者2---接收到的消息为:"+new String(body,"utf-8"));

    Thread.sleep(1000);
    //确认消息
    /**
    * 参数1:消息ID
    * 参数2:是否确认,false表示只有当前这个消息被处理。
    */
    channel.basicAck(envelope.getDeliveryTag(),false);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    };
    //监听队列
    /**
    * 参数1:队列名
    * 参数2:是否自动确认,设置true表示消息接收到自动向MQ回复接收到了,MQ将消息从队列删除,false则需要手动确认
    * 参数3:消息的消费者
    */
    channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);

    }
    }

4.2 订阅模式

说出订阅模式中的Exchange交换机的作用和交换机的三种类型

前面两个案例,只有3个角色:生产者,消费者,消息队列

在订阅者模型,多了一个Exchange角色,过程略有变化:

  • P:生产者,发送给exchange
  • C:消费者,一直等待消息的到来
  • Queue:接收消息,缓存消息
  • Exchange:交换机,接收P消息,知道如何处理消息,三种
    • Fanout:广播,将消息交给所有的交换机队列(竞争)
    • Direct:定向,符合指定routing key 的队列
    • Topic:通配符,把消息交给routing patern的队列

Exchange只负责转发消息,不具备存储能力,如果没有任何队列与exchange绑定,那么消息会丢失。

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    package com.hhzhu.ps;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *发布与订阅,生产者
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue1";
    static final String FANOUT_QUEUE_2 = "fanout_queue2";


    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //2创建频道
    Channel channel = connection.createChannel();
    //声明交换机
    /**
    * 参数1:名称
    * 参数2:交换机类型(fanout,direct,topic)
    */
    channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
    //4声明队列
    /**
    * 参数1:队列名称
    * 参数2:是否定义持久化
    * 参数3:是否独占本连接
    * 参数4:是否不用时自动删除
    * 参数5:其他参数
    */
    channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
    channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
    //5队列绑定到交换机
    /**
    * 参数1:队列名称
    * 参数2:交换机名称
    * 参数3:路由key,这里没用到
    */
    channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
    channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
    //5发送消息
    for(int i=1;i<=10;i++) {
    String message = "你好,小兔纸。发布与订阅---"+i;
    /**
    * 参数1:交换机名称,空串使用迷人交换机
    * 参数2:路由key,简单模式中,可以使用队列名称
    * 参数3:消息其他属性
    * 参数4:消息内容
    */
    channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
    System.out.println("已发送消息:" + message);
    }
    //6关闭资源
    channel.close();
    connection.close();

    }
    }

  • 消费者1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.hhzhu.ps;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *使用发布与订阅模式,消费者接收消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //2创建频道
    Channel channel = connection.createChannel();
    //3声明交换机
    channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
    //4声明队列
    channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
    //5队列绑定到交换机上
    channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
    //6创建消费者
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //路由key
    System.out.println("路由key为:"+envelope.getRoutingKey());
    //交换机
    System.out.println("交换机为:"+envelope.getExchange());
    //消息ID
    System.out.println("消息ID为:"+envelope.getDeliveryTag());
    //接收到的消息
    System.out.println("消费者1接收到的消息为:"+new String(body,"utf-8"));
    }
    };
    //监听队列
    /**
    * 参数1:队列名
    * 参数2:是否自动确认,设置true表示消息接收到自动向MQ回复接收到了,MQ将消息从队列删除,false则需要手动确认
    * 参数3:消息的消费者
    */
    channel.basicConsume(Producer.FANOUT_QUEUE_1,true,defaultConsumer);

    }
    }

  • 消费者2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.hhzhu.ps;

    import com.hhzhu.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * Description:
    *使用发布与订阅模式,消费者接收消息
    * @author ZhuHh
    * @data Create on 2020/3/16
    */
    public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtil.getConnection();
    //2创建频道
    Channel channel = connection.createChannel();
    //3声明交换机
    channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
    //4声明队列
    channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
    //5队列绑定到交换机上
    channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");
    //6创建消费者
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //路由key
    System.out.println("路由key为:"+envelope.getRoutingKey());
    //交换机
    System.out.println("交换机为:"+envelope.getExchange());
    //消息ID
    System.out.println("消息ID为:"+envelope.getDeliveryTag());
    //接收到的消息
    System.out.println("消费者2接收到的消息为:"+new String(body,"utf-8"));
    }
    };
    //监听队列
    /**
    * 参数1:队列名
    * 参数2:是否自动确认,设置true表示消息接收到自动向MQ回复接收到了,MQ将消息从队列删除,false则需要手动确认
    * 参数3:消息的消费者
    */
    channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer);

    }
    }

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

请我喝杯咖啡吧~

支付宝
微信