SpringBoot中集成RabbitMQ

作者:陆金龙    发表时间:2024-02-24 22:35   

关键词:  

1.Maven 依赖配置

<!-- RabbitMQ 依赖 -->

<!--注:以下依赖配置中的version不要省略,否则报错 -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

<version>1.5.8.RELEASE</version>

 

</dependency>

2.配置连接属性

在application.properties文件中,添加如下配置:

 spring.rabbitmq.host = 192.168.16.98

 spring.rabbitmq.port = 5672

 spring.rabbitmq.username = guest

 spring.rabbitmq.password = guest

 spring.rabbitmq.virtualHost=/

3.@SpringBootApplication文件开启基于注解的rabbitmq

@EnableScheduling

@EnableRabbit //开启基于注解的rabbitmq

@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class,

DataSourceTransactionManagerAutoConfiguration.class })

@ComponentScan({ "org.remonde.mysql2stream" })

@PropertySource(value = { "classpath:mysql2stream-application.properties" })

public class Main {

    public static void main(String[] args) {

        SpringApplication.run(Main.class, args);

    }

 

}

4.发送消息和接收消息的测试类

import java.time.LocalDateTime;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

 

@Component

public class TestSender {

    @Autowired

    private AmqpTemplate amqpTemplate;

    public void send(){

        String context = "hello----"+LocalDateTime.now();

        System.out.println("send:"+context);

        //往名称为hello的队列发送消息

        this.amqpTemplate.convertAndSend("hello",context);

    }

}

 

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

@RabbitListener(queues = "hello") //监听 名称为hello的队列

public class TestReceiver {

 //消息处理器

    @RabbitHandler

    public void process(String message){

        System.out.println("RabbitMq Received:"+message);

    }

}

5.消息服务间隔发送消息

package org.remonde.mysql2stream.server;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


/**
 * RabbitMq通讯服务
 */
@Component
@RabbitListener(queues = "${spring.rabbitmq.queueName}") // 监听queue
@ConditionalOnExpression("'${collectType}'.equals('rabbitmq') || '${collectType}'.equals('all')")
public class RabbitMqServer {
	private static final Logger logger = LoggerFactory.getLogger(RabbitMqServer.class);

	private @Value(value = "${spring.rabbitmq.host}") String host;
	private @Value(value = "${spring.rabbitmq.port}") int port;
	private @Value(value = "${spring.rabbitmq.username}") String username;
	private @Value(value = "${spring.rabbitmq.password}") String password;
	private @Value(value = "${spring.rabbitmq.virtualHost}") String virtualHost;
	private @Value(value = "${spring.rabbitmq.queueName}") String queueName;

	// 消息处理器,接收消息时自动回调
	@RabbitHandler
	public void process(Object message) {
		try {
			if(message instanceof String) {
				System.out.println("RabbitMq Received:" + (String)message);
			}else if(message instanceof Message){
				String msg = new String(((Message)message).getBody(), "UTF-8");
				System.out.println("RabbitMq Received:" + msg);
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@PostConstruct
	public void init() {
		RabbitMqThread thread = new RabbitMqThread();
		thread.start();
	}

    // 初始化消息业务的线程
	class RabbitMqThread extends Thread {
		@Override
		public void run() {
			DirectProducer directProducer = new DirectProducer();
			DirectConsumer consumer = new DirectConsumer();
			String routingKey =  "kl";
			String testMessage = "hello rabbitmq";
			consumer.consumerMsg(queueName, routingKey);
			while (true) {
				try {
					directProducer.publishMsg(routingKey, testMessage);
					Thread.sleep(30000);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}

    /**
    * 消息生产者
    **/
	public class DirectProducer {
		private static final String EXCHANGE_NAME = "direct.exchange";

        /**
        * 发布消息
        **/
		public void publishMsg(String routingKey, String message) throws IOException, TimeoutException {
			ConnectionFactory factory = RabbitMqUtils.getConnectionFactory(host,port,username,password,virtualHost);
			// 创建连接
			Connection connection = factory.newConnection();
			// 创建消息通道
			Channel channel = connection.createChannel();
			// 声明exchange中的消息为可持久化,不自动删除
			channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
			// 发布消息
			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
			channel.close();
			connection.close();
		}
	}

    /**
    * 消息消费者
    **/
	public class DirectConsumer {
		private static final String exchangeName = "direct.exchange";

        /**
         * 接收处理消息
        **/
		public void consumerMsg(String queueName, String routingKey) throws IOException, TimeoutException {
			ConnectionFactory factory = RabbitMqUtils.getConnectionFactory(host,port,username,password,virtualHost);
			// 创建连接
			Connection connection = factory.newConnection();
			// 创建消息信道
			final Channel channel = connection.createChannel();
			// 消息队列
			channel.queueDeclare(queueName, true, false, false, null);
			// 绑定队列到交换机
			channel.queueBind(queueName, exchange, routingKey);
			System.out.println("[*] Waiting for message. To exist press CTRL+C");
			Consumer consumer = new DefaultConsumer(channel) {
			    // 注册消息处理事件
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					try {
						System.out.println(" [x] Received '" + message);
					} finally {
						System.out.println(" [x] Done");
						channel.basicAck(envelope.getDeliveryTag(), false);
					}
				}
			};
			// 取消自动ack
			channel.basicConsume(queueName, false, consumer);
		}
	}


	// 连接工具
	public static class RabbitMqUtils{
		public static ConnectionFactory getConnectionFactory(String host,int port,String username,String password,String virtualHost) {
			// 创建连接工程,下面给出的是默认的case
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setPort(port);
			factory.setUsername(username);
			factory.setPassword(password);
			factory.setVirtualHost(virtualHost);
			return factory;
		}

	}
}

6.报错处理

1.internalRabbitListenerEndpointRegistry

监听服务启动失败,报错如下:

Failed to start bean 'org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
at org.springframework.context.support.DefaultLifecycleProcessor.doStart
 
用户名密码错误

2.AmqpAuthenticationException

发送者服务发送消息失败,报错如下:

.ExceptionHandlerExceptionResolver : Resolved [org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. 

 

可能的原因:账号密码错误
1.账号密码错误。
2.账号密码权限问题。
3.查看是否允许非localhost登陆
 
排查结果:账号不存在。
因为部署了5672和5673两套RabbitMQ,测试时用的是5673,而使用的账号是5672的账号。
把端口改为5672之后解决。测试时尽量使用简单环境。
---

spring:

  rabbitmq:
    host: x.x.x.x
    port: 5672
    username: user
    password: pass
    virtual-host: /