RabbitMQ c++
•
大数据
Windows RabbitMQ-c 消息处理流程
RabbitMQ-c Demo
- 项目需求:
多个app同时发送消息被多个app同时接收。生产者-消费者模式。
- 开发环境:
Windows 11
visual studio 2017
cmake 3.27.0
Erlang 24.0
rabbitmq-server-3.9.7
- Rabbit MQ的安装
RabbitMQ的安装依赖Erlang,安装RabbitMQ之前需要先安装Erlang。Erlang与RabbitMQ并不是所有版本都互相兼容,所以安装时要注意两者的版本匹配(可在RabbitMQ官网查看版本对应关系)。
Erlang下载地址
RabbitMQ下载地址

- 配置设置
RabbitMQ安装完成后开启插件节点,运行下图的软件,设置节点。

输入开启节点的指令:rabbitmqctl start_app

开启RabbitMQ管理模块的插件,并配置到RabbitMQ节点上:rabbitmq-plugins enable rabbitmq_management

通过浏览器访问RabbitMQ web管理软件:localhost:15672
用户名:guest,密码:guest


- RabbitMQ-c 库的编译
从github下载 源码,用cmake生成.sln,再编译lib。



备注:将生成的库引入demo
- 编写Client demo
分为 publish demo 和 consumer demo,两者共用封装好的访问类 CRabbitmqClient。
#pragma once#include #include #include #include #include using RecvCallBack = std::function;class CRabbitmqClient{public: CRabbitmqClient(); ~CRabbitmqClient(); /* * 连接RabbitMQ Server * @param [in] strExchange:交换器名称 * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅) * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建 * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失 * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除 * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式 * @returns 设置结果 */ int Connect(const std::string &strHostname, int iPort, const std::string &strUser, const std::string &strPasswd); int Disconnect(); /* * 声明交换器 * @param [in] strExchange:交换器名称 * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅) * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建 * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失 * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除 * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式 * @returns 声明结果 */ int ExchangeDeclare(const std::string &strExchange,const std::string& strType,bool isPassive =false, bool isDurable =false,bool isAutoDelete = false,int internal =0); /* * 声明队列 * @param [in] strQueueName:队列名称 * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建 * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失 * @param [in] isExclusive:只有自己的用户对该队列可见 * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除 * @returns 声明结果 */ int QueueDelare(const std::string& strQueueName, bool isPassive = false, bool isDurable = false, bool isExclusive =false, bool isAutoDelete = false ); /* * 将队列绑定到交换器 * @param [in] strQueueName:队列名称 * @param [in] strExchange:交换器名称 * @param [in] strBindKey:路由键 * @returns 绑定结果 */ int QueueBind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey); int QueueUnbind(const std::string &strQueueName, 
const std::string &strExchange, const std::string &strBindKey); int QueueDelete(const std::string &strQueueName, int iIfUnused); /* * 发布消息 * @param [in] strMessage:需要发送的消息 * @param [in] strExchange:交换器名称 * @param [in] strBindKey:路由键 * @returns 发布结果 */ int Publish(const std::string &strMessage, const std::string &strExchange, const std::string &strRoutekey); int Consumer(const std::string &strQueueName, RecvCallBack func);private: int ErrorMsg(amqp_rpc_reply_t x, char const *context); void StartRecvThread(); void run();private: std::string m_strHostname{"127.0.0.1"}; // amqp主机 int m_iPort{5672}; // amqp端口 std::string m_strUser{"guest"}; std::string m_strPasswd{"guest"}; int m_iChannel=2; amqp_socket_t *m_pSock{ nullptr }; amqp_connection_state_t m_pConn{ nullptr }; std::unique_ptr m_pRecvThread{ nullptr }; bool m_isRun{ false }; RecvCallBack m_recvFunc{ nullptr };};#include "CRabbitmqClient.h"CRabbitmqClient::CRabbitmqClient(){}CRabbitmqClient::~CRabbitmqClient(){}int CRabbitmqClient::Connect(const std::string &strHostname, int iPort, const std::string &strUser, const std::string &strPasswd){ m_strHostname = strHostname; m_iPort = iPort; m_strUser = strUser; m_strPasswd = strPasswd; m_pConn = amqp_new_connection(); if (m_pConn == nullptr) { return -1; } m_pSock = amqp_tcp_socket_new(m_pConn); if (m_pSock == nullptr) { return -2; } int status = amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort); if(status<0) { return -3; } if (0 != ErrorMsg(amqp_login(m_pConn, "guest", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()), "Logging in")) { return -4; } return 0;}int CRabbitmqClient::Disconnect(){ if (m_pConn != nullptr) { if (0 != ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS), "Closeing connectiong")) { return -1; } if (amqp_destroy_connection(m_pConn) < 0) { return -2; } m_pConn = nullptr; } return 0;}int CRabbitmqClient::ExchangeDeclare(const std::string &strExchange, const std::string& strType, bool isPassive, bool 
isDurable , bool isAutoDelete, int internal ){ amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -1; } amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t type = amqp_cstring_bytes(strType.c_str()); amqp_exchange_declare(m_pConn,m_iChannel,exchange,type, isPassive, isDurable, isAutoDelete, internal,amqp_empty_table); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "exchange_declare")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0;}int CRabbitmqClient::QueueDelare(const std::string& strQueueName, bool isPassive ,bool isDurable , bool isExclusive, bool isAutoDelete ){ if (m_pConn == nullptr) { return -1; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_bytes_t queue = amqp_cstring_bytes(strQueueName.c_str()); amqp_queue_declare(m_pConn, m_iChannel,queue, isPassive, isDurable, isExclusive, isAutoDelete, amqp_empty_table); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_declare")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0;}int CRabbitmqClient::QueueBind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey){ if (m_pConn == nullptr) { return -1; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_bytes_t queue = amqp_cstring_bytes(strQueueName.c_str()); amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t routkey = amqp_cstring_bytes(strBindKey.c_str()); amqp_queue_bind(m_pConn, 
m_iChannel, queue, exchange, routkey, amqp_empty_table); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_bind")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0;}int CRabbitmqClient::QueueUnbind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey){ if (m_pConn == nullptr) { return -1; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_bytes_t queue = amqp_cstring_bytes(strQueueName.c_str()); amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t routkey = amqp_cstring_bytes(strBindKey.c_str()); amqp_queue_unbind(m_pConn, m_iChannel, queue, exchange, routkey, amqp_empty_table); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_unbind")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0;}int CRabbitmqClient::QueueDelete(const std::string &strQueueName, int iIfUnused){ if (m_pConn == nullptr) { return -1; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } if(0!= ErrorMsg(amqp_get_rpc_reply(m_pConn), "delete queue")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0;}int CRabbitmqClient::Publish(const std::string &strMessage, const std::string &strExchange, const std::string &strRoutekey){ if (m_pConn == nullptr) { return -1; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_bytes_t message_bytes; 
message_bytes.len = strMessage.length(); message_bytes.bytes = (void *)(strMessage.c_str()); amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t routekey = amqp_cstring_bytes(strRoutekey.c_str()); //if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, &props, message_bytes)) { if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, NULL, message_bytes)) { fprintf(stderr, "publish amqp_basic_publish failed\n"); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "amqp_basic_publish")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);}int CRabbitmqClient::Consumer(const std::string &strQueueName, RecvCallBack func){ if (m_pConn == nullptr) { return -1; } if (m_recvFunc == nullptr) { m_recvFunc = func; } amqp_channel_open(m_pConn, m_iChannel); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_basic_qos(m_pConn, m_iChannel, 0, 1, 0); int ack = 1; // no_ack 是否需要确认消息后再从队列中删除消息 amqp_bytes_t queuename = amqp_cstring_bytes(strQueueName.c_str()); amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table); if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } StartRecvThread(); return 0;}int CRabbitmqClient::ErrorMsg(amqp_rpc_reply_t x, char const *context) { switch (x.reply_type) { case AMQP_RESPONSE_NORMAL: return 0; case AMQP_RESPONSE_NONE: fprintf(stderr, "%s: missing RPC reply type!\n", context); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (x.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *)x.reply.decoded; fprintf(stderr, "%s: 
server connection error %uh, message: %.*s\n", context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded; fprintf(stderr, "%s: server channel error %uh, message: %.*s\n", context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); break; } default: fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; } break; } return -1;}void CRabbitmqClient::StartRecvThread(){ if (m_pRecvThread == nullptr) { m_pRecvThread = std::make_unique(&CRabbitmqClient::run, this); m_isRun = true; }}void CRabbitmqClient::run(){ std::chrono::milliseconds interval{ 1000 }; amqp_rpc_reply_t res; amqp_envelope_t envelope; while (true) { amqp_maybe_release_buffers(m_pConn); res = amqp_consume_message(m_pConn, &envelope, nullptr, 0); if (AMQP_RESPONSE_NORMAL != res.reply_type) { fprintf(stderr, "Consumer amqp_channel_close failed\n"); amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); break; } std::string str((char *)envelope.message.body.bytes, (char *)envelope.message.body.bytes + envelope.message.body.len); if (m_recvFunc) { m_recvFunc(str); } amqp_destroy_envelope(&envelope); std::this_thread::sleep_for(interval); } amqp_channel_close(m_pConn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(m_pConn);}publish
// RabbitMQPublish.cpp : demo publisher. Contains "main"; program execution
// begins and ends there.
//
#include <cstdio>
#include <iostream>
#include <string>

#include "CRabbitmqClient.h"

// Declares a fanout exchange with two queues bound to it, then publishes every
// whitespace-separated token read from stdin until end of input.
// Returns 0 on a clean shutdown and 1 when connecting or topology setup fails.
int main()
{
    CRabbitmqClient objRabbitmq;

    const std::string strIP = "127.0.0.1";
    const int iPort = 5672;
    const std::string strUser = "guest";
    const std::string strPasswd = "guest";

    const std::string strExchange = "ExchangeTest1";
    const std::string strRoutekey = "routekeyTest1";
    const std::string strQueuename1 = "queueTest1";
    const std::string strQueuename2 = "queueTest2";

    // Prints the step result in the demo's usual format and reports success.
    const auto succeeded = [](const char *step, int ret) {
        printf("Rabbitmq %s Ret: %d\n", step, ret);
        return ret == 0;
    };

    if (!succeeded("Connect", objRabbitmq.Connect(strIP, iPort, strUser, strPasswd)))
    {
        return 1;
    }

    // Stop at the first failing setup step: publishing into a topology that
    // was not created would only produce further errors.
    const bool topologyReady =
        succeeded("ExchangeDeclare", objRabbitmq.ExchangeDeclare(strExchange, "fanout")) &&
        succeeded("QueueDeclare1", objRabbitmq.QueueDelare(strQueuename1)) &&
        succeeded("QueueDeclare2", objRabbitmq.QueueDelare(strQueuename2)) &&
        succeeded("QueueBind1", objRabbitmq.QueueBind(strQueuename1, strExchange, strRoutekey)) &&
        succeeded("QueueBind2", objRabbitmq.QueueBind(strQueuename2, strExchange, strRoutekey));
    if (!topologyReady)
    {
        objRabbitmq.Disconnect();
        return 1;
    }

    // Send Msg: one message per token. Testing the stream state ends the loop
    // on EOF or a read error instead of spinning forever on a failed read.
    std::string strSendMsg;
    while (std::cin >> strSendMsg)
    {
        const int iRet = objRabbitmq.Publish(strSendMsg, strExchange, strRoutekey);
        printf("Rabbitmq Publish 1 Ret: %d\n", iRet);
    }

    objRabbitmq.Disconnect();
    return 0;
}
consumer
// RabbitMQConsumer.cpp : demo consumer. Contains "main"; program execution
// begins and ends there.
//
#include <chrono>
#include <cstdio>
#include <iostream>
#include <string>
#include <thread>

#include "CRabbitmqClient.h"

// Receive callback: prints each message body on its own line. It is invoked by
// CRabbitmqClient from its receive thread, not from main().
void RecvFunc(const std::string& message)
{
    // std::endl flushes, so output shows up immediately in an interactive demo.
    std::cout << message << std::endl;
}

// Connects to the local broker, consumes "queueTest1" and prints every message
// until the process is killed. Returns 1 when connecting, declaring the queue
// or subscribing fails.
int main()
{
    CRabbitmqClient objRabbitmq;

    const std::string strIP = "127.0.0.1";
    const int iPort = 5672;
    const std::string strUser = "guest";
    const std::string strPasswd = "guest";
    const std::string strQueuename = "queueTest1";

    int iRet = objRabbitmq.Connect(strIP, iPort, strUser, strPasswd);
    printf("Rabbitmq Connect Ret: %d\n", iRet);
    if (iRet != 0)
    {
        return 1;
    }

    // Declare the queue here as well (same default attributes the publisher
    // demo uses) so the consumer can be started before the publisher.
    iRet = objRabbitmq.QueueDelare(strQueuename);
    printf("Rabbitmq QueueDeclare Ret: %d\n", iRet);
    if (iRet != 0)
    {
        return 1;
    }

    iRet = objRabbitmq.Consumer(strQueuename, RecvFunc);
    printf("Rabbitmq Consumer Ret: %d\n", iRet);
    if (iRet != 0)
    {
        // Without a subscription there is nothing to wait for.
        return 1;
    }

    // Messages are handled on the client's receive thread; just keep the
    // process (and objRabbitmq) alive.
    const std::chrono::milliseconds interval{ 1000 };
    while (true)
    {
        std::this_thread::sleep_for(interval);
    }
    return 0;
}
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/876ffb8d4e.html
