SpringBoot中集成RabbitMQ
1.Maven 依赖配置
<!-- RabbitMQ 依赖 -->
<!--注:以下依赖配置中的version不要省略,否则报错 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.8.RELEASE</version>
</dependency>
2.配置连接属性
在application.properties文件中,添加如下配置:
spring.rabbitmq.host = 192.168.16.98
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
spring.rabbitmq.virtualHost=/
3.@SpringBootApplication文件开启基于注解的rabbitmq
@EnableScheduling
@EnableRabbit //开启基于注解的rabbitmq
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class })
@ComponentScan({ "org.remonde.mysql2stream" })
@PropertySource(value = { "classpath:mysql2stream-application.properties" })
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
4.发送消息和接收消息的测试类
import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TestSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
//往名称为hello的队列发送消息
this.amqpTemplate.convertAndSend("hello",context);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello") //监听 名称为hello的队列
public class TestReceiver {
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("RabbitMq Received:"+message);
}
}
5.消息服务间隔发送消息
package org.remonde.mysql2stream.server; import java.io.IOException; import java.util.concurrent.TimeoutException; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * RabbitMq通讯服务 */ @Component @RabbitListener(queues = "${spring.rabbitmq.queueName}") // 监听queue @ConditionalOnExpression("'${collectType}'.equals('rabbitmq') || '${collectType}'.equals('all')") public class RabbitMqServer { private static final Logger logger = LoggerFactory.getLogger(RabbitMqServer.class); private @Value(value = "${spring.rabbitmq.host}") String host; private @Value(value = "${spring.rabbitmq.port}") int port; private @Value(value = "${spring.rabbitmq.username}") String username; private @Value(value = "${spring.rabbitmq.password}") String password; private @Value(value = "${spring.rabbitmq.virtualHost}") String virtualHost; private @Value(value = "${spring.rabbitmq.queueName}") String queueName; // 消息处理器,接收消息时自动回调 @RabbitHandler public void process(Object message) { try { if(message instanceof String) { System.out.println("RabbitMq Received:" + (String)message); }else if(message instanceof Message){ String msg = new String(((Message)message).getBody(), "UTF-8"); System.out.println("RabbitMq Received:" + msg); } } catch (Exception e) { e.printStackTrace(); } } @PostConstruct public void init() { RabbitMqThread thread = new RabbitMqThread(); thread.start(); } // 初始化消息业务的线程 class RabbitMqThread extends Thread { @Override public void run() { DirectProducer directProducer = new DirectProducer(); DirectConsumer consumer = new DirectConsumer(); String routingKey = "kl"; String testMessage = "hello rabbitmq"; consumer.consumerMsg(queueName, routingKey); while (true) { try { directProducer.publishMsg(routingKey, testMessage); Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); } } } } /** * 消息生产者 **/ public class DirectProducer { private static final String EXCHANGE_NAME = "direct.exchange"; /** * 发布消息 **/ public void publishMsg(String routingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitMqUtils.getConnectionFactory(host,port,username,password,virtualHost); // 创建连接 Connection connection = factory.newConnection(); // 创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息为可持久化,不自动删除 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); // 发布消息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); channel.close(); connection.close(); } } /** * 消息消费者 **/ public class DirectConsumer { private static final String exchangeName = "direct.exchange"; /** * 接收处理消息 **/ public void consumerMsg(String queueName, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory = RabbitMqUtils.getConnectionFactory(host,port,username,password,virtualHost); // 创建连接 Connection connection = factory.newConnection(); // 创建消息信道 final Channel channel = connection.createChannel(); // 消息队列 channel.queueDeclare(queueName, true, false, false, null); // 绑定队列到交换机 channel.queueBind(queueName, exchange, routingKey); System.out.println("[*] Waiting for message. To exist press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { // 注册消息处理事件 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 取消自动ack channel.basicConsume(queueName, false, consumer); } } // 连接工具 public static class RabbitMqUtils{ public static ConnectionFactory getConnectionFactory(String host,int port,String username,String password,String virtualHost) { // 创建连接工程,下面给出的是默认的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); return factory; } } }
6.报错处理
1.internalRabbitListenerEndpointRegistry
监听服务启动失败,报错如下:
2.AmqpAuthenticationException
发送者服务发送消息失败,报错如下:
---
spring:
rabbitmq:
host: x.x.x.x
port: 5672
username: user
password: pass
virtual-host: /