vivo Pulsar万亿消息处理中KoP指标异常的修复历程(其三)

文章标题:

vivo Pulsar万亿消息处理中KoP指标异常的修复过程(其三)

文章内容:

作者:vivo 互联网大数据技术团队- Chen Jianbo

本文是《vivo Pulsar万亿级消息处理实践》系列文章的第3篇。

Pulsar属于Apache基金会的开源分布式流处理平台与消息中间件,它实现了Kafka协议,使得使用Kafka API的应用能够直接迁移至Pulsar,这让Pulsar在Kafka生态系统中更易被接纳和运用。KoP提供了从Kafka到Pulsar的无缝转换,用户可借助Kafka API操作Pulsar集群,保留了Kafka的广泛用户基础与丰富生态系统。它助力Pulsar更好地与Kafka整合,带来更优的消息传输性能、更强的兼容性及可扩展性。vivo在使用Pulsar KoP时碰到过一些问题,本篇主要分享一个分区消费指标缺失的问题。

系列文章:

  1. vivo Pulsar万亿级消息处理实践(1)-数据发送原理解析和性能调优

  2. vivo Pulsar万亿级消息处理实践(2)-从0到1建设Pulsar指标监控链路

文章太长?1分钟看图抓住核心观点👇

一、问题背景

在一次版本的灰度升级过程中,我们发现某个使用KoP的业务topic的消费速率出现了明显下降,具体情况如下图示:

是什么原因致使正常的升级重启服务器会引发这个问题呢?直接查看上报采集的数据报文:

kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188

我们看到,KoP消费指标kop_server_MESSAGE_OUT、kop_server_BYTES_OUT有上报,但指标数据里的group标签为空串(缺少消费组名称),分区的消费指标无法展示。那是什么导致消费组名称缺失呢?

二、问题分析

1、找到问题代码

我们去查找这个消费组名称是在何处获取的,看逻辑是否存在问题。依据druid中kop_subscription对应的消费指标kop_server_MESSAGE_OUT、kop_server_BYTES_OUT,找到相关代码如下:

private void handleEntries(final List<Entry> entries,
                               final TopicPartition topicPartition,
                               final FetchRequest.PartitionData partitionData,
                               final KafkaTopicConsumerManager tcm,
                               final ManagedCursor cursor,
                               final AtomicLong cursorOffset,
                               final boolean readCommitted) {
....
        // 处理消费数据时,获取消费组名称
        CompletableFuture<String> groupNameFuture = requestHandler
                .getCurrentConnectedGroup()
                .computeIfAbsent(clientHost, clientHost -> {
                    CompletableFuture<String> future = new CompletableFuture<>();
                    String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId());
                    requestHandler.getMetadataStore()
                            .get(requestHandler.getGroupIdStoredPath() + groupIdPath)
                            .thenAccept(getResultOpt -> {
                                if (getResultOpt.isPresent()) {
                                    GetResult getResult = getResultOpt.get();
                                    future.complete(new String(getResult.getValue() == null
                                            ? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));
                                } else {
                                    // 从zk节点 /client_group_id/xxx 获取不到消费组,消费组就是空的
                                    future.complete("");
                                }
                            }).exceptionally(ex -> {
                                future.completeExceptionally(ex);
                                return null;
                            });
                    returnfuture;
                });

        // this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor thread
        groupNameFuture.whenCompleteAsync((groupName, ex) -> {
            if (ex != null) {
                log.error("Get groupId failed.", ex);
                groupName = "";
            }
.....
            // 获得消费组名称后,记录消费组对应的消费指标
            decodeResult.updateConsumerStats(topicPartition,
                    entries.size(),
                    groupName,
                    statsLogger);

代码的逻辑是,从requestHandler的currentConnectedGroup(map)中通过host获取groupName,不存在则通过MetadataStore(带缓存的zk存储对象)获取,如果zk缓存也没有,再发起zk读请求(路径为/client_group_id/host-clientId)。读取到消费组名称后,用它来更新消费组指标。从复现的集群确定走的是这个分支,即是从metadataStore(带缓存的zk客户端)获取不到对应zk节点/client_group_id/xxx。

2、查找可能导致zk节点/client_group_id/xxx节点获取不到的原因

有两种可能性:一是没写进去,二是写进去但是被删除了。

    @Override
    protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
                                                CompletableFuture<AbstractResponse> resultFuture) {
...
        // Store group name to metadata store for current client, use to collect consumer metrics.
        storeGroupId(groupId, groupIdPath)
                .whenComplete((stat, ex) -> {
                    if (ex != null) {
                        // /client_group_id/xxx节点写入失败
                        log.warn("Store groupId failed, the groupId might already stored.", ex);
                    }
                    findBroker(TopicName.get(pulsarTopicName))
                            .whenComplete((node, throwable) -> {
                                ....
                            });
                });
...

从代码看到,clientId与groupId的关联关系是通过handleFindCoordinatorRequest(FindCoordinator)写进去的,而且只有这个方法入口。由于没有找到warn日志,排除了第一种没写进去的可能性。看看删除的逻辑:

protected void close(){
    if (isActive.getAndSet(false)) {
        ...
        currentConnectedClientId.forEach(clientId -> {
            String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
            // 删除zk上的 /client_group_id/xxx 节点
            metadataStore.delete(path, Optional.empty())
                    .whenComplete((__, ex) -> {
                        if (ex != null) {
                            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                                if (log.isDebugEnabled()) {
                                    log.debug("The groupId store path doesn't exist. Path: [{}]", path);
                                }
                                return;
                            }
                            log.error("Delete groupId failed. Path: [{}]", path, ex);
                            return;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Delete groupId success. Path: [{}]", path);
                        }
                    });
        });
    }
}

删除是在requsetHandler.close方法中执行,也就是说连接断开就会触发zk节点删除。

但有几个疑问:

  • /client_group_id/xxx 到底是干嘛用的?消费指标为什么要依赖它

  • 为什么要在handleFindCoordinatorRequest写入?

  • 节点/client_group_id/xxx为什么要删除,而且是在连接断开时删除,删除时机是否有问题?

首先回答第1个问题,通过阅读代码可以知道,/client_group_id/xxx 这个zk节点是用于在不同broker实例间交换数据用的(相当redis cache),用于临时存放IP+clientId与groupId的映射关系。由于fetch接口(拉取数据)的request没有groupId的,只能依赖加入Group过程中的元数据,在fetch消费时才能知道当前拉数据的consumer是哪个消费组的。

3、复现

若要解决问题,最好能够稳定地复现出问题,这样才能确定问题的根本原因,并且确认修复是否完成。

因为节点是在requsetHandle.close方法中执行删除,broker节点关闭会触发连接关闭,进而触发删除。假设:客户端通过brokerA发起FindCoordinator请求,写入zk节点/client_group_id/xxx,同时请求返回brokerB作为Coordinator,后续与brokerB进行joinGroup、syncGroup等交互确定消费关系,客户端在brokerA、brokerB、brokerC都有分区消费。这时重启brokerA,分区均衡到BrokerC上,但此时/client_group_id/xxx因关闭broker而断开连接被删除,consumer消费刚转移到topic1-partition-1的分区就无法获取到groupId。

按照假设,有3个broker,开启生产和消费,通过在FindCoordinator返回前获取node.leader()的返回节点BrokerB,关闭brokerA后,brokerC出现断点复现,再关闭brokerC,brokerA也会复现(假设分区在brokerA与brokerC之间转移)。

复现要几个条件:

  1. broker数量要足够多(不小于3个)

  2. broker内部有zk缓存metadataCache默认为5分钟,可以把时间调小为1毫秒,相当于没有cache

  3. findCoordinator返回的必须是其他broker的IP

  4. 重启的必须是接收到findCoordinator请求那台broker,而不是真正的coordinator,这时会从zk删除节点

  5. 分区转移到其他broker,这时新的broker会重新读取zk节点数据

到此,我们基本上清楚了问题原因:连接关闭导致zk节点被删除了,别的broker节点需要时就读取不到了。那怎么解决?

三、问题解决

方案一

既然知道把消费者与FindCoordinator的连接进行绑定不合适的,那么是否应该把FindCoordinator写入zk节点换成由JoinGroup写入,断连即删除。

consumer统一由Coordinator管理,由于FindCoordinator接口不一定是Coordinator处理的,如果换成由Coordinator处理的JoinGroup接口是否就可以了,这样consumer断开与Coordinator的连接就应该删除数据。但实现验证时却发现,客户端在断连后也不会再重连,所以没法重新写入zk,不符合预期。

方案二

还是由FindCoordinator写入zk节点,但删除改为GroupCoordinator监听consumer断开触发。

因为consumer统一由Coordinator管理,它能监听到consumer加入或者离开。GroupCoordinator的removeMemberAndUpdateGroup方法是coordinator对consumer成员管理。

private void removeMemberAndUpdateGroup(GroupMetadata group,
                                        MemberMetadata member) {
    group.remove(member.memberId());
    switch (group.currentState()) {
        case Dead:
        case Empty:
            return;
        case Stable:
        case CompletingRebalance:
            maybePrepareRebalance(group);
            break;
        case PreparingRebalance:
            joinPurgatory.checkAndComplete(new GroupKey(group.groupId()));
            break;
        default:
            break;
    }
    // 删除 /client_group_id/xxx 节点
    deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}

调用入口有两个,其中handleLeaveGroup是主动离开,onExpireHeartbeat是超时被动离开,客户端正常退出或者宕机都可以调用removeMemberAndUpdateGroup方法触发删除。

```
public CompletableFuture handleLeaveGroup(
    String groupId,
    String memberId
) {
    return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error ->
        CompletableFuture.completedFuture(error)
    ).orElseGet(() -> {
        return groupManager.getGroup(groupId).map(group -> {
            return group.inLock(() -> {
                if (group.is(Dead) || !group.has(memberId)) {
                    return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
                } else {
                    ...
                
                    // 触发删除消费者consumer
                    removeMemberAndUpdateGroup(group, member);
                    return CompletableFuture.completedFuture(Errors.NONE);
                }
            });
        })
        ....
    });
}

void onExpireHeartbeat(GroupMetadata group,
                       MemberMetadata member,
                       long heartbeatDeadline) {
    group.inLock(() -> {
        if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
            log.info("Member {} in group

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12993.html

(0)
LomuLomu
上一篇 20小时前
下一篇 11小时前

相关推荐

  • 2025年最新DataGrip永久破解教程(附激活码/注册码)🔥

    🚀 本教程适用于Jetbrains全家桶,包括IDEA、PyCharm、DataGrip、Golang等所有产品! 先给大家看看最新版本的破解成果,直接续命到2099年,简直不要太爽!💯 下面我就用详细的图文教程,手把手教你如何永久激活DataGrip。这个方法同样适用于旧版本哦! ✨ 无论你用的是Windows、Mac还是Linux系统,都能完美破解! 第…

    DataGrip激活码 2025 年 6 月 17 日
    18400
  • 2025年最新IDEA激活码及永久破解教程:支持JetBrains全家桶

    前言 本教程适用于JetBrains系列所有开发工具,包括但不限于IDEA、PyCharm、DataGrip和Goland等。下面先展示最新IDEA版本成功破解的截图,可以看到授权有效期已延长至2099年! 接下来,我将通过详细的图文步骤,手把手教你如何将IDEA激活至2099年。这个方法同样适用于旧版本,无论你使用什么操作系统或哪个版本,都能找到对应的解决…

    IDEA破解教程 2025 年 7 月 3 日
    5200
  • 2024 PyCharm最新激活码,PyCharm永久免费激活码2025-01-15 更新

    PyCharm 2024最新激活码 以下是最新的PyCharm激活码,更新时间:2025-01-15 🔑 激活码使用说明 1️⃣ 复制下方激活码 2️⃣ 打开 PyCharm 软件 3️⃣ 在菜单栏中选择 Help -> Register 4️⃣ 选择 Activation Code 5️⃣ 粘贴激活码,点击 Activate ⚠️ 必看!必看! 🔥 获取最…

    2025 年 1 月 15 日
    47100
  • 2025年最新PyCharm激活码分享及永久破解教程(支持2099年)

    本教程适用于JetBrains全家桶,包括IDEA、PyCharm、DataGrip、Golang等所有产品! 先给大家看看最新PyCharm版本成功破解的截图,可以看到已经完美激活到2099年! 下面我将通过详细的图文步骤,手把手教你如何永久激活PyCharm至2099年。 这个方法适用于:- 所有操作系统(Windows/Mac/Linux)- 任何版本…

    2025 年 5 月 10 日
    15400
  • 2025年最新PyCharm永久破解教程:亲测可用激活码分享

    前言 本教程适用于JetBrains全家桶所有产品,包括但不限于:IDEA、PyCharm、DataGrip、Goland等开发工具。下面先展示最新版PyCharm成功破解至2099年的截图: 准备工作 下载PyCharm安装包 若已安装可跳过此步骤 访问PyCharm官网:https://www.jetbrains.com/pycharm/download…

    2025 年 5 月 13 日
    21400

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信