RabbitMQ中五种队列的学习

HarryZhang 2019年07月15日 160次浏览

RabbitMQ 特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

Helloword

本文demo基于java语言

  1. maven工程的pom文件中添加依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
  1. RabbitMQUtil工具类
package util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ Util
 *
 * @author HarryZhang
 */
public class RabbitMQUtil {
    private static Connection connection;

    /**
     * getConnection
     *
     * @return Connection
     */
    public static Connection getConnection() {
        if (connection != null) {
            return connection;
        }
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("127.0.0.1");//MQ ip address

        factory.setPort(5672);//port

        factory.setUsername("HarryZhang");//username

        factory.setPassword("123456");//password

        try {
            connection = factory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

  1. 消息生产者
import com.rabbitmq.client.Channel;
import util.RabbitMQUtil;

import java.io.IOException;

public class Producer {
    //队列名
    private static final String QUEUE_NAME="debugers_test";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message="Hello HarryZhang";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("推送成功");
    }
}

  1. 消息消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

import java.io.IOException;

public class Consumer {
    private static final String QUEUE_NAME="debugers_test";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        DeliverCallback deliverCallback=(consumerTag, delivery) -> {
            String message=new String(delivery.getBody(),"utf-8");
            System.out.println("接收消息:"+message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

上面这里使用的新版api来操作的,原来的while(true)那种方式已经被抛弃了

  1. 运行 Consumer 先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
  2. 运行 Producer 接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:

helloword

Work Queues 模式

python-two

一个生产者,多个消费者。一个消息只能被一个消费者所获取。

轮询分发(Round Robin)

**概念:**使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。

  1. 消费者(具体几个自己决定,代码都一样,修改一下睡眠时间方便看出不同就ok)
package worker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv1 {
    private static final String QUEUE_NAME="debugers_test";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        DeliverCallback deliverCallback=(consumerTag, delivery) -> {
            String message=new String(delivery.getBody(),"utf-8");
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

  1. 生产者

    向队列中发送100条数据。

package workerfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Producer {
    private static final String QUEUE_NAME="debugers_test";
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

运行结果:

有3个消费者(Recv1,Recv2,Recv3)他们的休息时间分别是1000,500,100 ms。

  1. 每个消费者获取到的消息内容是不同的,每个消费者只能获取到一个消息
  2. 三个消费者获取到的消息数量基本是相同的。(注意我每个消费者的sleep()的时间是不一样的)

为什么会这样呢?不是应该劳者多得吗?有点不合理呀!我Recv3这么辛苦跟其他的收获却是一样的。RabbitMQ默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

公平分发(Fair Dispatch)

消息分发者是不了解消费者的能力的。

因为在系统中,程序处理快的,就要多干点,处理慢的就想干也干不了的,就是要充分利用消费者,别闲着。

通过basicQos(perfetch)autoAck配合来实现。

  • basicQos:设置同一时刻服务器只会发perfetch**(此处为1)**条消息给消费者
  • autoAck:将自动应答改为手动。就处理完一条消息后手动提交。
  1. 消费者(Recv
package workerfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv1 {
    private static final String QUEUE_NAME="debugers_test";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DeliverCallback deliverCallback=(consumerTag, delivery) -> {
            String message=new String(delivery.getBody(),"utf-8");
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("[x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        //修改为手动应答,true为自动应答,false相反
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

**注意:**使用公平分发,必须关闭自动应答ack,然后改成手动应答方式。

运行结果:

有3个消费者(Recv1,Recv2,Recv3)他们的休息时间分别是1000,500,100 ms。

  • Recv3获得消息最多,Recv2次之,Recv1最后。

发布订阅模式(Publish/Subscribe)

img

比如: 公众号只有你订阅后才可以收到这个公众号推送的消息.

特点:

  • 一个生产者,多个消费者
  • 每个消费者都有自己的队列
  • 生产者没有直接将消息发送到队列,而是发送到交换机(Exchange
  • 每个队列都要绑定交换机
  • 生产者发送到消息经过交换机 --> 到达队列--> 可以实现一个消息被多个消费者消
  1. 生产者
package ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQUtil;

public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道连接
        channel.close();
        connection.close();
    }
}

注意: 此时的交换机是没有绑定队列的,发送消息肯定会丢失。交换机本身是不能存储数据的,队列才是存储数据的

  1. 消费者
  • package ps;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import util.RabbitMQUtil;
    
    public class Recv1 {
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        private final static String QUEUE_NAME="debugers_test_sms";
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "utf-8");
                System.out.println(" [x] Received '" + message + "'");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            //修改为手动应答,true为自动应答,false相反
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    
    
  • package ps;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import util.RabbitMQUtil;
    
    public class Recv2 {
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        private final static String QUEUE_NAME="debugers_test_email";
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "utf-8");
                System.out.println(" [x] Received '" + message + "'");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            //修改为手动应答,true为自动应答,false相反
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    

结果: 同一个消息被多个消费者获取。

在管理工具中查看队列和交换机的绑定关系:

1563099102615

路由模式(Routing)

python-four

现在又有一个场景,假如我交换机与队列绑定了,但是我不想要整个队列的消息,我可能只是要其中的一部分。

比如:就想要报错记录或者操作日志做一个后台的日志。那么就需要routing模式

绑定

绑定(binding)是队列和交换机之间的关系。你可以简单的理解为:队列对来自此交换机的消息感兴趣。

绑定可以采用额外的routingKey参数.

//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

注意:routingKey取决于交换机的类型,之前的fanout就体现不了他的价值。

千万别和basicPublish混淆了

//参数2为routingKey
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  1. 生产者(向delete队列推消息)
package routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQUtil;

public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // 消息内容
        String message = "Hello World!";
        //参数2为routingKey
        channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道连接
        channel.close();
        connection.close();
    }
}

2.消费者1(接收deleteupdateinsert队列的消息)

package routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv1 {
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME="debugers_test_sms";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //修改为手动应答,true为自动应答,false相反
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

  1. 消费者2(只接收delete队列消息)
package routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv2 {
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME="debugers_test_email";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //修改为手动应答,true为自动应答,false相反
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

结果:

消费者1(Recv1)和消费者2(Recv2)都接收到了生产者(Send)发送的消息。

那么我们改一改,将消费者1(Recv1)与交换机的routingKey(delete)给注释

再看看结果就不一样了:只有消费者2(Recv2)接收到了消息。因为并没有与交换机的delete进行绑定。

同样的道理消费者2(Recv2)是接收不到其他生产者发送的insertupdate消息的。

主题模式(topic)

发送到topic交换的消息不能具有任意的 routing_key- 它必须是由点(.)分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。

一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由密钥中可以包含任意数量的字符,最多可达255个字节。

绑定键有两个重要的特殊特性:(是用.分割的单词,而不是字符

  • *:可以替代一个单词。
  • #:可以替换零个或多个单词。

python-five

  1. 生产者
package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitMQUtil;

public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        // 消息内容
        String message = "Hello World!";
        //参数2为routingKey
        channel.basicPublish(EXCHANGE_NAME, "routeKey.a", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道连接
        channel.close();
        connection.close();
    }
}

  1. 消费者1
package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv1 {
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME="debugers_test_sms";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //修改为手动应答,true为自动应答,false相反
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

  1. 消费者2
package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitMQUtil;

public class Recv2 {
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME="debugers_test_email";
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routeKey.*");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //修改为手动应答,true为自动应答,false相反
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}

结果:

两个消费者都收到消息