RabbitMQ简介及Java API

作者:陆金龙    发表时间:2024-02-24 22:56   

关键词:  

1.RabbitMQ简介

RabbitMQ是一个多租户系统,由虚拟主机提供了资源的逻辑分组和分隔:连接、交换器、队列、绑定、用户权限、策略和其他的东西都属于虚拟主机(virtual hosts,v_host)。虚拟主机能够使得不需要部署多个RabbitMQ来负责不同的业务。

虚拟机

客户端连接到RabbitMQ时,需要指定虚拟主机名称,同时还需要提供用户名和密码,只有用户具有相关的权限才能建立连接。

RabbitMQ包含一个默认的虚拟主机:“/”。默认操作的都是这个虚拟主机,其用户名和密码默认都是guest。RabbitMQ禁止guest用户远程访问,只可以访问本地的mq服务。

虚拟机相关操作如下:

创建虚拟机命令:rabbitmqctl add_vhost vhost_name

添加用户: rabbitmqctl add_user <username> <password>

用户和授权:rabbitmqctl set_permissions -p <vhost> <username> <permissions>

限制虚拟主机的最大连接数量: rabbitmqctl set_vhost_limits -p <vhost_name>  '{"max-connections": 256}'

限制虚拟主机的最大队列数量:rabbitmqctl set_vhost_limits -p <vhost_name>  '{"max-queues": 256}'

交换器

从3.7版本开始,RabbitMQ开始支持为topic交换器设置权限,在设置权限时,要结合发布到主题交换器的消息的routing key信息。设置了topic授权后,向topic路由器发布消息,或者从topic路由器接收消息,都会依据消息的routing key进行权限检查,如果routing key与权限设置匹配,则成功,否则抛出异常。

权限机制

RabbitMQ有一套独立的权限机制:配置、写、读,通过三个正则表达式来分别匹配所访问资源的这三项权限。

相关术语

代理(即RabbitMQ服务端):Broker、RabbitMQ Server

客户端:Client

连接:Connection

连接工厂:Connection Factory

通道:Channel

声明:Declare

交换机:Exchange

队列:Queue

发布者:Publisher

消费者:Consumer

路由:Route、routable、unroutable

持久化:durable、non-durable

自动删除:autodelete、non-autodelete

排斥:exclusive

路由关键字:routing key

 

2.Java API

RabbitMQ Java客户端使用“com.rabbitmq.client”作为顶级包。主要的类和接口:

● Channel

● Connection

● ConnectionFactory

● Consumer

通过“Channel”接口可以进行协议操作。

连接到代理(虚拟机):

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost(vhostName);

factory.setUsername(user);

factory.setPassword(pass);

Connection connection = factory.newConnection();

建立消息通道:

然后,“Connection”接口可用用来打开一个通道,用来发送和接收消息:

Channel channel = conn.createChannel();

通常每个线程使用一个“Channel”。

与代理断开连接:

channel.close();

conn.close();

绑定队列到交换机:

声明交换机和队列,通过给定的路由关键字(routing key)将队列绑定到交换机:

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

发布消息:

使用“Channel.basicPublish”:

byte[] messageBodyBytes = "Hello!".getBytes();

channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

接收消息:

使用“Consumer”接口发起订阅,传空字符串调用“Channel.basicConsume”方法,并使用返回值即可得到唯一的tag,消费者标签用于取消消费者的订阅。不同的“Consumer”实例必须有不同的消费者标签。

channel.basicConsume(queueName, false, "consumerTag",

    new DefaultConsumer(channel) {

        @Override

        public void handleDelivery(String consumerTag,

                                                    Envelope envelope,

                                                    AMQP.BasicProperties properties,

                                                    byte[] body)

            throws IOException

        {

            String routingKey =envelope.getRoutingKey();

            String contentType = properties.getContentType();

            long deliveryTag =envelope.getDeliveryTag();

            // todo 处理消息

            channel.basicAck(deliveryTag, false);

        }

    });

取消消费者的订阅:

channel.basicCancel(consumerTag);