SpringBoot实现多rabbitmq连接

一、配置

1. 配置文件

SpringBoot实现多rabbitmq连接

  rabbitmq:
    first:
      host: 
      port: 
      username: 
      password: 
      #虚拟host 可以不设置,使用server默认host
      virtual-host: /
    second:
      host: 
      port: 
      username: 
      password: 
      virtual-host: /

2. 配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
/**
 * RabbitMq多源配置
 *
 * @author lq
 */
@Configuration
public class RabbitConfig {
 
    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password,
            @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost
    ) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }
 
    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password,
            @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost
    ) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }
 
    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
        return firstRabbitTemplate;
    }
 
    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);
        return secondRabbitTemplate;
    }
 
 
    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
 
    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

 3.信道构建器

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
 
/**
 * 信道构建器
 *
 * @author liuqi
 */
@Configuration
public class CreateQueue {
 
    @Bean
    public String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        try {
            connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);
        }catch (IOException e){
            e.printStackTrace();
        }
        return Constants.RABBITMQ_QUEUE_NAME;
    }

    @Bean
    public String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        try {
            connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);
        }catch (IOException e){
            e.printStackTrace();
        }
        return Constants.RABBITMQ_QUEUE_NAME2;
    }
}

二、发送

1. 创建发送类

import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.zlhy.websocket.util.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.bcel.Const;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Map;

/**
 * @author liuqi
 * @version 1.0
 * @description 向关联方的队列发送消息
 */
@Slf4j
@Service
public class SendMessage {

    @Resource(name = "firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    @Resource(name = "secondRabbitTemplate")
    private RabbitTemplate secondRabbitTemplate;

    public void sendToOneMessage(JSONObject jsonObject) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
        firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, info);
    }

    public void sendToTwoMessage(JSONObject jsonObject) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(jsonObject.toString().getBytes(), messageProperties);
        secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, info);
    }
}

2. 调用方法发送数据

sendMessage.sendToOneMessage(jsonResult);

sendMessage.sendToTwoMessage(jsonResult);

三、消费测试

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;


@Slf4j
@Service
public class OneReceive {

    @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")
    public void listenOne(Message message, Channel channel) throws IOException {
        //获取MQ返回的数据
       // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ1返回的数据:{}", data);
        //下面进行业务逻辑处理
    }
}
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;


@Slf4j
@Service
public class TwoReceive {
    @RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")
    public void listenTwo(Message message, Channel channel) throws IOException {
        //获取MQ返回的数据
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ2返回的数据:{}", data);
        //下面进行业务逻辑处理
    }

}

也可放到一个类中测试消费

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/6f8c81ae9e.html