C++中第三方RabbitMQ库的深入探究

文章标题:

对C++中第三方RabbitMQ库的深度剖析

文章内容:

目录

  • 1.引言
  • 2.安装
    • 1.RabbitMq
    • 2.客户端库
  • 3.AMQP-CPP基础应用
    • 1.概述
    • 2.运用
  • 4.类与接口
    • 1.Channel
    • 2.ev
  • 5.应用
    • 1.publish.cc
    • 2.consume.cc
    • 3.makefile

1.引言

  • RabbitMQ:是一种消息队列组件,能够实现两个客户端主机之间消息的传输功能,包括发布和订阅操作。
  • 核心概念:涵盖交换机、队列、绑定以及消息等要素。
  • 交换机类型
    • 广播交换:当交换机接收到消息后,会把消息发布到所有与之绑定的队列中。
    • 直接交换:依据消息里的bkey和绑定的rkey进行对比,若一致则将消息放入对应的队列。
    • 主题交换:利用bkey和绑定的rkey进行规则匹配,匹配成功就把消息放入队列。

2.安装

1.RabbitMq

  • 安装方法:通过执行sudo apt install rabbitmq-server命令来进行安装。
  • 简单使用示例
    # 安装完成时默认有guest用户,但权限不足,需要创建一个administrator用户以便远程登录和进行消息的发布订阅操作
    
    #添加用户 
    sudo rabbitmqctl add_user root <PASSWORD>
    
    #设置用户tag 
    sudo rabbitmqctl set_user_tags root administrator
    
    #设置用户权限 
    sudo rabbitmqctl set_permissions -p / root "." "." ".*"
    
    # RabbitMQ自带web管理界面,执行以下命令开启,默认端口15672
    sudo rabbitmq-plugins enable rabbitmq_management
    

2.客户端库

  • C语言库
  • C++库

    sudo apt install libev-dev #libev网络库组件
    git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
    cd AMQP-CPP/
    make
    make install
    
  • 若安装时出现如下报错,表明是ssl版本存在问题

    /usr/include/openssl/macros.h:147:4: error: #error 
    "OPENSSL_API_COMPAT expresses an impossible API compatibility 
    level" 
      147 | #  error "OPENSSL_API_COMPAT expresses an impossible API 
    compatibility level" 
          |    ^~~~~ 
    In file included from /usr/include/openssl/ssl.h:18, 
                     from linux_tcp/openssl.h:20, 
                     from linux_tcp/openssl.cpp:12: 
    /usr/include/openssl/bio.h:687:1: error: expected constructor, 
    destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 
      687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
    unsigned short *port_ptr))
    
  • 解决办法:卸载当前的ssl库,然后重新进行修复安装

    dpkg -l | grep ssl
    sudo dpkg -P --force-all libevent-openssl-2.1-7
    sudo dpkg -P --force-all openssl
    sudo dpkg -P --force-all libssl-dev
    sudo apt --fix-broken install
    

3.AMQP-CPP基础应用

1.概述

  • AMQP-CPP是用于和RabbitMq消息中间件进行通信的C++库
    • 它能够解析从RabbitMq服务发送过来的数据,也可以生成发往RabbitMq的数据包。
    • AMQP-CPP库自身不会向RabbitMq建立网络连接,所有的网络IO都由用户来完成。
  • AMQP-CPP提供了可选的网络层接口,它预先定义了TCP模块,用户就不用自己去实现网络IO,
    • 也能够选择libevent、libev、libuv、asio等异步通信组件,不过需要手动安装对应的组件。
  • AMQP-CPP完全是异步的,没有阻塞式的系统调用,不使用线程就可以应用在高性能应用场景中。
  • 注意事项:它需要C++17的支持。

2.运用

  • AMQP-CPP的使用存在两种模式:
    • 采用默认的TCP模块来进行网络通信。
    • 运用扩展的libevent、libev、libuv、asio异步通信组件来进行通信。
  • 这里以libev为例,不需要自己实现monitor函数,能够直接使用AMQP::LibEvHandler

4.类与接口

1.Channel

  • channel是一个虚拟连接,一个连接之上可以创建多个通道

    • 并且所有的RabbitMq指令都是通过channel来传输的
    • 所以在连接建立之后的第一步,就是创建channel
    • 由于所有操作都是异步的,所以在channel上执行指令的返回值并不能当作操作执行结果
    • 实际上它返回的是Deferred类,能够使用它来安装处理函数。

    namespace AMQP
    {
    /*
    * 通用的回调函数,被很多延迟对象所使用
    /
    using SuccessCallback = std::function;
    using ErrorCallback = std::function;
    using FinalizeCallback = std::function;

    /** 
     * 声明和删除队列 
     */ 
    using QueueCallback = std::function<void(const std::string &name, 
                                                   uint32_t messagecount, 
                                                   uint32_t consumercount)>;
    using DeleteCallback = std::function<void(uint32_t deletedmessages)>; 
    using MessageCallback = std::function<void(const Message &message, 
                                                   uint64_t deliveryTag, 
                                                   bool redelivered)>;
    
    // 当使用发布者确认时,当服务器确认消息已被接收和处理时,会调用AckCallback 
    using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
    
    // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调 
    using PublishAckCallback = std::function<void()>; 
    using PublishNackCallback = std::function<void()>; 
    using PublishLostCallback = std::function<void()>;
    
    // 信道类
    class Channel 
    { 
        Channel(Connection *connection); 
        bool connected();
        /** 
            *声明交换机 
            *如果传入空名称,服务器会分配一个名称。 
            *以下flags可用于交换机: 
            * 
            *-durable     持久化,重启后交换机依然有效 
            *-autodelete  删除所有连接的队列后,自动删除交换 
            *-passive     仅被动检查交换机是否存在 
            *-internal    创建内部交换 
            * 
            *@param name    交换机的名称 
            *@param-type    交换类型 
                enum ExchangeType 
                { 
                    fanout,  广播交换,绑定的队列都能拿到消息 
                    direct,  直接交换,只将消息交给routingkey一致的队列 
                    topic,   主题交换,将消息交给符合bindingkey规则的队列 
                    headers, 
                    consistent_hash, 
                    message_deduplication 
                }; 
            *@param flags    交换机标志 
            *@param arguments其他参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            using onSuccess(), onError() and onFinalize() methods. 
        */ 
        Deferred &declareExchange(const std::string_view &name,
                                  ExchangeType type,
                                  int flags,
                                  const Table &arguments);
        /** 
            *声明队列 
            *如果不提供名称,服务器将分配一个名称。 
            *flags可以是以下值的组合: 
            * 
            *-durable 持久队列在代理重新启动后仍然有效 
            *-autodelete 当所有连接的使用者都离开时,自动删除队列 
            *-passive 仅被动检查队列是否存在 
            *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除 
            * 
            *@param name        队列的名称 
            *@param flags       标志组合 
            *@param arguments  可选参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            *使用onSuccess()、onError()和onFinalize()方法。 
            * 
            Deferred &onError(const char *message) 
            * 
            *可以安装的onSuccess()回调应该具有以下签名: 
            void myCallback(const std::string &name,  
                uint32_t messageCount,  
                uint32_t consumerCount); 
            例如: 
            channel.declareQueue("myqueue").onSuccess( 
                [](const std::string &name,  
                    uint32_t messageCount, 
                    uint32_t consumerCount) { 
                       std::cout << "Queue '" << name << "' "; 
                       std::cout << "has been declared with "; 
                       std::cout << messageCount; 
                       std::cout << " messages and "; 
                       std::cout << consumerCount; 
                       std::cout << " consumers" << std::endl; 
         *  }); 
        */ 
        DeferredQueue &declareQueue(const std::string_view &name,
                                    int flags,
                                    const Table &arguments);
        /** 
            *将队列绑定到交换机 
            * 
            *@param exchange     源交换机 
            *@param queue        目标队列 
            *@param routingkey   路由密钥 
            *@param arguments    其他绑定参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            *使用onSuccess()、onError()和onFinalize()方法。 
        */ 
        Deferred &bindQueue(const std::string_view &exchange,
                            const std::string_view &queue,
                            const std::string_view &routingkey,
                            const Table &arguments);
        /** 
            *将消息发布到exchange
            *您必须提供交换机的名称和路由密钥。 
            然后,RabbitMQ将尝试将消息发送到一个或多个队列。 
            使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。
            默认情况下,不可更改的消息将被静默地丢弃。 
            *  
            *如果设置了'mandatory'或'immediate'标志, 
            则无法处理的消息将返回到应用程序。 
            在开始发布之前,请确保您已经调用了recall()-方法, 
            并设置了所有适当的处理程序来处理这些返回的消息。 
            *  
            *可以提供以下flags: 
            *  
            *-mandatory 如果设置,服务器将返回未发送到队列的消息 
            *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
    
            *@param exchange要发布到的交易所 
            *@param routingkey路由密钥 
            *@param envelope要发送的完整信封 
            *@param message要发送的消息 
            *@param size消息的大小 
            *@param flags可选标志 
        */ 
        bool publish(const std::string_view &exchange,
                     const std::string_view &routingKey,
                     const std::string &message,
                     int flags = 0);
    
        /** 
            *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息 
            * 
            *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。 
            consumer tag是一个字符串标识符, 
            如果您以后想通过channel::cancel()调用停止它, 
            可以使用它来标识使用者。 
            *如果您没有指定使用者tag,服务器将为您分配一个。 
            * 
            *支持以下flags: 
            * 
            *-nolocal    如果设置了,则不会同时消耗在此通道上发布的消息 
            *-noack      如果设置了,则不必对已消费的消息进行确认 
            *-exclusive  请求独占访问,只有此使用者可以访问队列 
            * 
            *@param queue    您要使用的队列 
            *@param tag      将与此消费操作关联的消费者标记 
            *@param flags    其他标记 
            *@param arguments其他参数 
            * 
            *此函数返回一个延迟处理程序。 
            可以使用onSuccess()、onError()和onFinalize()方法安装回调
    
            可以安装的onSuccess()回调应该具有以下格式: 
                void myCallback(const std::string_view&tag); 
            样例: 
            channel.consume("myqueue").onSuccess( 
                [](const std::string_view& tag) { 
                    std::cout << "Started consuming under tag "; 
                    std::cout << tag << std::endl; 
            }); 
        */ 
        DeferredConsumer &consume(const std::string_view &queue,
                                  const std::string_view &tag,
                                  int flags,
                                  const Table &arguments);
        /** 
            *确认接收到的消息 
            *
            *消费者客户端对收到的消息进行确认应答
            *
            *当在DeferredConsumer::onReceived()方法中接收到消息时, 
            必须确认该消息, 
            以便RabbitMQ将其从队列中删除(除非使用noack选项消费)
            * 
            *支持以下标志: 
            * 
            *-多条确认多条消息:之前传递的所有未确认消息也会得到确认 
            * 
            *@param deliveryTag    消息的唯一delivery标签 
            *@param flags          可选标志 
            *@return bool 
        */ 
        bool ack(uint64_t deliveryTag, int flags=0);
    };
    
    class DeferredConsumer 
    { 
        /* 
            注册一个回调函数,该函数在消费者启动时被调用
            void onSuccess(const std::string &consumertag) 
        */ 
        DeferredConsumer &onSuccess(const ConsumeCallback& callback);
        /* 
            注册回调函数,用于接收到一个完整消息的时候被调用 
            void MessageCallback(const AMQP::Message &message,  
                uint64_t deliveryTag, bool redelivered) 
        */ 
        DeferredConsumer &onReceived(const MessageCallback& callback);
        /* Alias for onReceived() */ 
        DeferredConsumer &onMessage(const MessageCallback& callback);
    
        /* 
            注册要在服务器取消消费者时调用的函数 
            void CancelCallback(const std::string &tag) 
        */ 
        DeferredConsumer &onCancelled(const CancelCallback& callback);
    };
    
    class Message : public Envelope
    { 
        const std::string &exchange();
        const std::string &routingkey();
    };
    
    class Envelope : public MetaData
    { 
        const char *body();  // 获取消息正文
        uint64_t bodySize(); // 获取消息正文大小
    };
    

    }


2.ev

typedef struct ev_async 
{ 
    EV_WATCHER (ev_async);
    EV_ATOMIC_T sent; /* private */ 
}ev_async;

//break type 
enum 
{ 
    EVBREAK_CANCEL = 0, /* undo unloop */ 
    EVBREAK_ONE    = 1, /* unloop once */ 
    EVBREAK_ALL    = 2  /* unloop all loops */ 
};

// 实例化并获取IO事件监控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
# define EV_DEFAULT  ev_default_loop (0)

// 开始运行IO事件监控, 这是一个阻塞接口
int  ev_run (struct ev_loop *loop);

/* break out of the loop */
// 结束IO监控
// 如果在主线程进行ev_run(), 则可以直接调用,
// 如果在其他线程中进行ev_run(), 需要通过异步通知进行
void ev_break (struct ev_loop *loop, int32_t break_type) ;

void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);

// 初始化异步事件结构, 并设置回调函数
void ev_async_init(ev_async *w, callback cb);
// 启动事件监控循环中的异步任务处理
void ev_async_start(struct ev_loop *loop, ev_async *w); 
// 发送当前异步事件到异步线程中执行
void ev_async_send(struct ev_loop *loop, ev_async *w);

5.应用

1.publish.cc

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>

int main()
{
    // 1.实例化底层网络通信框架的IO事件监控句柄

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13115.html

(0)
LomuLomu
上一篇 1天前
下一篇 1天前

相关推荐

  • Redis Stream:消息队列的策略抉择

    文章标题:Redis Stream:消息队列的策略选择 文章内容:在人员规模较大的企业中,通常会有专门的部门来维护成熟的消息队列,以此方便多个业务方实现解耦。而在小型公司里,则需考量成本因素,多依靠个人来处理相关事宜。 有的简易场景下,可能仅仅是借助 Redis list 或者 MySQL 来存储一些状态,若出现问题时自行进行手工补偿,也并非不可行。 当下呈…

    2025 年 6 月 19 日
    10000
  • 2025最新PyCharm永久激活码及破解教程(亲测有效,支持2099年)🔥

    本教程适用于Jetbrains全家桶,包括IDEA、PyCharm、DataGrip、Goland等开发工具!💯 先给大家看看最新版PyCharm成功破解后的效果✨,有效期直接延长到2099年,简直不要太爽! 下面我就手把手教大家如何激活PyCharm,这个方法同样适用于旧版本哦~ 无论你是Windows、Mac还是Linux系统 无论你使用哪个版本 统统都…

    2025 年 6 月 1 日
    1.1K00
  • 2025年最新PyCharm激活码及永久破解教程(支持2099年)

    本方法适用于Jetbrains全家桶,包括PyCharm、IDEA、DataGrip、Goland等开发工具! 先给大家看看最新PyCharm版本成功破解的截图,可以看到已经完美激活到2099年,非常稳定可靠! 下面我将用详细的图文教程,手把手教你如何将PyCharm永久激活至2099年。 这个方法不仅适用于最新版本,之前的旧版本也同样有效! 支持Windo…

    PyCharm激活码 2025 年 7 月 6 日
    8500
  • DataGrip 2025最新破解指南:零成本获取注册码激活至2099年(详细教程)

    JetBrains的DataGrip是一款功能强大的数据库IDE,提供了智能SQL编辑、高级搜索和替换、数据导出、版本控制等众多实用功能。它支持几乎所有主流数据库系统,包括MySQL、PostgreSQL、Microsoft SQL Server、Oracle等,是数据库开发人员和DBA的得力助手。然而,这款优秀的工具需要付费使用,一年订阅费用对于许多个人开…

    2025 年 4 月 28 日
    49700
  • 2025年最新IDEA激活码永久破解教程 – IDEA注册码及破解方法全解析

    JetBrains全家桶通用破解指南(IDEA/PyCharm/DataGrip等) 先给大家展示最新IDEA版本成功破解的截图,可以看到有效期已经延长至2099年! 下面将用详细的图文教程,手把手教你如何永久激活IDEA。这个方法同样适用于旧版本,无论你使用什么操作系统或版本,都能找到对应的解决方案。 第一步:获取IDEA安装包 若已安装可跳过此步骤 前往…

    IDEA破解教程 2025 年 8 月 3 日
    5800

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信