SpringBoot实现多rabbitmq连接
•
大数据
一、配置
1. 配置文件

rabbitmq:
first:
host:
port:
username:
password:
#虚拟host 可以不设置,使用server默认host
virtual-host: /
second:
host:
port:
username:
password:
virtual-host: /
2. 配置类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* RabbitMq多源配置
*
* @author lq
*/
@Configuration
public class RabbitConfig {
@Bean(name = "firstConnectionFactory")
@Primary
public ConnectionFactory firstConnectionFactory(
@Value("${spring.rabbitmq.first.host}") String host,
@Value("${spring.rabbitmq.first.port}") int port,
@Value("${spring.rabbitmq.first.username}") String username,
@Value("${spring.rabbitmq.first.password}") String password,
@Value("${spring.rabbitmq.first.virtual-host}") String virtualHost
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean(name = "secondConnectionFactory")
public ConnectionFactory secondConnectionFactory(
@Value("${spring.rabbitmq.second.host}") String host,
@Value("${spring.rabbitmq.second.port}") int port,
@Value("${spring.rabbitmq.second.username}") String username,
@Value("${spring.rabbitmq.second.password}") String password,
@Value("${spring.rabbitmq.second.virtual-host}") String virtualHost
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean(name = "firstRabbitTemplate")
@Primary
public RabbitTemplate firstRabbitTemplate(
@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
) {
RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
return firstRabbitTemplate;
}
@Bean(name = "secondRabbitTemplate")
public RabbitTemplate secondRabbitTemplate(
@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
) {
RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
return secondRabbitTemplate;
}
@Bean(name = "firstFactory")
public SimpleRabbitListenerContainerFactory firstFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "secondFactory")
public SimpleRabbitListenerContainerFactory secondFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
3.信道构建器
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* 信道构建器
*
* @author liuqi
*/
@Configuration
public class CreateQueue {
@Bean
public String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
try {
connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);
}catch (IOException e){
e.printStackTrace();
}
return Constants.RABBITMQ_QUEUE_NAME;
}
@Bean
public String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
try {
connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);
}catch (IOException e){
e.printStackTrace();
}
return Constants.RABBITMQ_QUEUE_NAME2;
}
}
二、发送
1. 创建发送类
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.zlhy.websocket.util.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.bcel.Const;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
/**
* @author liuqi
* @version 1.0
* @description 向关联方的队列发送消息
*/
@Slf4j
@Service
public class SendMessage {
@Resource(name = "firstRabbitTemplate")
private RabbitTemplate firstRabbitTemplate;
@Resource(name = "secondRabbitTemplate")
private RabbitTemplate secondRabbitTemplate;
public void sendToOneMessage(JSONObject jsonObject) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, info);
}
public void sendToTwoMessage(JSONObject jsonObject) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, info);
}
}
2. 调用方法发送数据
sendMessage.sendToOneMessage(jsonResult);
sendMessage.sendToTwoMessage(jsonResult);
三、消费测试
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Service
public class OneReceive {
@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")
public void listenOne(Message message, Channel channel) throws IOException {
//获取MQ返回的数据
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
String data = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("MQ1返回的数据:{}", data);
//下面进行业务逻辑处理
}
}
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Service
public class TwoReceive {
@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")
public void listenTwo(Message message, Channel channel) throws IOException {
//获取MQ返回的数据
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
String data = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("MQ2返回的数据:{}", data);
//下面进行业务逻辑处理
}
}
也可放到一个类中测试消费
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/6f8c81ae9e.html
