文章标题:
对C++中第三方RabbitMQ库的深度剖析
文章内容:
目录
- 1.引言
- 2.安装
- 1.RabbitMq
- 2.客户端库
- 3.AMQP-CPP基础应用
- 1.概述
- 2.运用
- 4.类与接口
- 1.Channel
- 2.ev
- 5.应用
- 1.publish.cc
- 2.consume.cc
- 3.makefile
1.引言
RabbitMQ
:是一种消息队列组件,能够实现两个客户端主机之间消息的传输功能,包括发布和订阅操作。- 核心概念:涵盖交换机、队列、绑定以及消息等要素。
- 交换机类型:
- 广播交换:当交换机接收到消息后,会把消息发布到所有与之绑定的队列中。
- 直接交换:依据消息里的
bkey
和绑定的rkey
进行对比,若一致则将消息放入对应的队列。 - 主题交换:利用
bkey
和绑定的rkey
进行规则匹配,匹配成功就把消息放入队列。
2.安装
1.RabbitMq
- 安装方法:通过执行
sudo apt install rabbitmq-server
命令来进行安装。 - 简单使用示例:
# 安装完成时默认有guest用户,但权限不足,需要创建一个administrator用户以便远程登录和进行消息的发布订阅操作 #添加用户 sudo rabbitmqctl add_user root <PASSWORD> #设置用户tag sudo rabbitmqctl set_user_tags root administrator #设置用户权限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ自带web管理界面,执行以下命令开启,默认端口15672 sudo rabbitmq-plugins enable rabbitmq_management
2.客户端库
- C语言库
-
sudo apt install libev-dev #libev网络库组件 git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git cd AMQP-CPP/ make make install
-
若安装时出现如下报错,表明是
ssl
版本存在问题/usr/include/openssl/macros.h:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" 147 | # error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" | ^~~~~ In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12: /usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
-
解决办法:卸载当前的
ssl
库,然后重新进行修复安装dpkg -l | grep ssl sudo dpkg -P --force-all libevent-openssl-2.1-7 sudo dpkg -P --force-all openssl sudo dpkg -P --force-all libssl-dev sudo apt --fix-broken install
3.AMQP-CPP基础应用
1.概述
AMQP-CPP
是用于和RabbitMq
消息中间件进行通信的C++库- 它能够解析从
RabbitMq
服务发送过来的数据,也可以生成发往RabbitMq
的数据包。 AMQP-CPP
库自身不会向RabbitMq
建立网络连接,所有的网络IO都由用户来完成。
- 它能够解析从
AMQP-CPP
提供了可选的网络层接口,它预先定义了TCP
模块,用户就不用自己去实现网络IO,- 也能够选择
libevent、libev、libuv、asio
等异步通信组件,不过需要手动安装对应的组件。
- 也能够选择
AMQP-CPP
完全是异步的,没有阻塞式的系统调用,不使用线程就可以应用在高性能应用场景中。- 注意事项:它需要C++17的支持。
2.运用
AMQP-CPP
的使用存在两种模式:- 采用默认的
TCP
模块来进行网络通信。 - 运用扩展的
libevent、libev、libuv、asio
异步通信组件来进行通信。
- 采用默认的
- 这里以
libev
为例,不需要自己实现monitor
函数,能够直接使用AMQP::LibEvHandler
。
4.类与接口
1.Channel
-
channel
是一个虚拟连接,一个连接之上可以创建多个通道- 并且所有的
RabbitMq
指令都是通过channel
来传输的 - 所以在连接建立之后的第一步,就是创建
channel
。 - 由于所有操作都是异步的,所以在
channel
上执行指令的返回值并不能当作操作执行结果 - 实际上它返回的是
Deferred
类,能够使用它来安装处理函数。
namespace AMQP
{
/*
* 通用的回调函数,被很多延迟对象所使用
/
using SuccessCallback = std::function;
using ErrorCallback = std::function;
using FinalizeCallback = std::function; /** * 声明和删除队列 */ using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>; // 当使用发布者确认时,当服务器确认消息已被接收和处理时,会调用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调 using PublishAckCallback = std::function<void()>; using PublishNackCallback = std::function<void()>; using PublishLostCallback = std::function<void()>; // 信道类 class Channel { Channel(Connection *connection); bool connected(); /** *声明交换机 *如果传入空名称,服务器会分配一个名称。 *以下flags可用于交换机: * *-durable 持久化,重启后交换机依然有效 *-autodelete 删除所有连接的队列后,自动删除交换 *-passive 仅被动检查交换机是否存在 *-internal 创建内部交换 * *@param name 交换机的名称 *@param-type 交换类型 enum ExchangeType { fanout, 广播交换,绑定的队列都能拿到消息 direct, 直接交换,只将消息交给routingkey一致的队列 topic, 主题交换,将消息交给符合bindingkey规则的队列 headers, consistent_hash, message_deduplication }; *@param flags 交换机标志 *@param arguments其他参数 * *此函数返回一个延迟处理程序。可以安装回调 using onSuccess(), onError() and onFinalize() methods. */ Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments); /** *声明队列 *如果不提供名称,服务器将分配一个名称。 *flags可以是以下值的组合: * *-durable 持久队列在代理重新启动后仍然有效 *-autodelete 当所有连接的使用者都离开时,自动删除队列 *-passive 仅被动检查队列是否存在 *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除 * *@param name 队列的名称 *@param flags 标志组合 *@param arguments 可选参数 * *此函数返回一个延迟处理程序。可以安装回调 *使用onSuccess()、onError()和onFinalize()方法。 * Deferred &onError(const char *message) * *可以安装的onSuccess()回调应该具有以下签名: void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); 例如: channel.declareQueue("myqueue").onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout << "Queue '" << name << "' "; std::cout << "has been declared with "; std::cout << messageCount; std::cout << " messages and "; std::cout << consumerCount; std::cout << " consumers" << std::endl; * }); */ DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments); /** *将队列绑定到交换机 * *@param exchange 源交换机 *@param queue 目标队列 *@param routingkey 路由密钥 *@param arguments 其他绑定参数 * *此函数返回一个延迟处理程序。可以安装回调 *使用onSuccess()、onError()和onFinalize()方法。 */ Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments); /** *将消息发布到exchange *您必须提供交换机的名称和路由密钥。 然后,RabbitMQ将尝试将消息发送到一个或多个队列。 使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。 默认情况下,不可更改的消息将被静默地丢弃。 * *如果设置了'mandatory'或'immediate'标志, 则无法处理的消息将返回到应用程序。 在开始发布之前,请确保您已经调用了recall()-方法, 并设置了所有适当的处理程序来处理这些返回的消息。 * *可以提供以下flags: * *-mandatory 如果设置,服务器将返回未发送到队列的消息 *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。 *@param exchange要发布到的交易所 *@param routingkey路由密钥 *@param envelope要发送的完整信封 *@param message要发送的消息 *@param size消息的大小 *@param flags可选标志 */ bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0); /** *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息 * *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。 consumer tag是一个字符串标识符, 如果您以后想通过channel::cancel()调用停止它, 可以使用它来标识使用者。 *如果您没有指定使用者tag,服务器将为您分配一个。 * *支持以下flags: * *-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息 *-noack 如果设置了,则不必对已消费的消息进行确认 *-exclusive 请求独占访问,只有此使用者可以访问队列 * *@param queue 您要使用的队列 *@param tag 将与此消费操作关联的消费者标记 *@param flags 其他标记 *@param arguments其他参数 * *此函数返回一个延迟处理程序。 可以使用onSuccess()、onError()和onFinalize()方法安装回调 可以安装的onSuccess()回调应该具有以下格式: void myCallback(const std::string_view&tag); 样例: channel.consume("myqueue").onSuccess( [](const std::string_view& tag) { std::cout << "Started consuming under tag "; std::cout << tag << std::endl; }); */ DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments); /** *确认接收到的消息 * *消费者客户端对收到的消息进行确认应答 * *当在DeferredConsumer::onReceived()方法中接收到消息时, 必须确认该消息, 以便RabbitMQ将其从队列中删除(除非使用noack选项消费) * *支持以下标志: * *-多条确认多条消息:之前传递的所有未确认消息也会得到确认 * *@param deliveryTag 消息的唯一delivery标签 *@param flags 可选标志 *@return bool */ bool ack(uint64_t deliveryTag, int flags=0); }; class DeferredConsumer { /* 注册一个回调函数,该函数在消费者启动时被调用 void onSuccess(const std::string &consumertag) */ DeferredConsumer &onSuccess(const ConsumeCallback& callback); /* 注册回调函数,用于接收到一个完整消息的时候被调用 void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) */ DeferredConsumer &onReceived(const MessageCallback& callback); /* Alias for onReceived() */ DeferredConsumer &onMessage(const MessageCallback& callback); /* 注册要在服务器取消消费者时调用的函数 void CancelCallback(const std::string &tag) */ DeferredConsumer &onCancelled(const CancelCallback& callback); }; class Message : public Envelope { const std::string &exchange(); const std::string &routingkey(); }; class Envelope : public MetaData { const char *body(); // 获取消息正文 uint64_t bodySize(); // 获取消息正文大小 };
}
- 并且所有的
2.ev
typedef struct ev_async
{
EV_WATCHER (ev_async);
EV_ATOMIC_T sent; /* private */
}ev_async;
//break type
enum
{
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
// 实例化并获取IO事件监控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
# define EV_DEFAULT ev_default_loop (0)
// 开始运行IO事件监控, 这是一个阻塞接口
int ev_run (struct ev_loop *loop);
/* break out of the loop */
// 结束IO监控
// 如果在主线程进行ev_run(), 则可以直接调用,
// 如果在其他线程中进行ev_run(), 需要通过异步通知进行
void ev_break (struct ev_loop *loop, int32_t break_type) ;
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化异步事件结构, 并设置回调函数
void ev_async_init(ev_async *w, callback cb);
// 启动事件监控循环中的异步任务处理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送当前异步事件到异步线程中执行
void ev_async_send(struct ev_loop *loop, ev_async *w);
5.应用
1.publish.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
// 1.实例化底层网络通信框架的IO事件监控句柄
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13115.html