文章标题:
vivo Pulsar万亿消息处理中KoP指标异常的修复过程(其三)
文章内容:
作者:vivo 互联网大数据技术团队- Chen Jianbo
本文是《vivo Pulsar万亿级消息处理实践》系列文章的第3篇。
Pulsar属于Apache基金会的开源分布式流处理平台与消息中间件,它实现了Kafka协议,使得使用Kafka API的应用能够直接迁移至Pulsar,这让Pulsar在Kafka生态系统中更易被接纳和运用。KoP提供了从Kafka到Pulsar的无缝转换,用户可借助Kafka API操作Pulsar集群,保留了Kafka的广泛用户基础与丰富生态系统。它助力Pulsar更好地与Kafka整合,带来更优的消息传输性能、更强的兼容性及可扩展性。vivo在使用Pulsar KoP时碰到过一些问题,本篇主要分享一个分区消费指标缺失的问题。
系列文章:
文章太长?1分钟看图抓住核心观点👇
一、问题背景
在一次版本的灰度升级过程中,我们发现某个使用KoP的业务topic的消费速率出现了明显下降,具体情况如下图示:
是什么原因致使正常的升级重启服务器会引发这个问题呢?直接查看上报采集的数据报文:
kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188
我们看到,KoP消费指标kop_server_MESSAGE_OUT、kop_server_BYTES_OUT有上报,但指标数据里的group标签为空串(缺少消费组名称),分区的消费指标无法展示。那是什么导致消费组名称缺失呢?
二、问题分析
1、找到问题代码
我们去查找这个消费组名称是在何处获取的,看逻辑是否存在问题。依据druid中kop_subscription对应的消费指标kop_server_MESSAGE_OUT、kop_server_BYTES_OUT,找到相关代码如下:
private void handleEntries(final List<Entry> entries,
final TopicPartition topicPartition,
final FetchRequest.PartitionData partitionData,
final KafkaTopicConsumerManager tcm,
final ManagedCursor cursor,
final AtomicLong cursorOffset,
final boolean readCommitted) {
....
// 处理消费数据时,获取消费组名称
CompletableFuture<String> groupNameFuture = requestHandler
.getCurrentConnectedGroup()
.computeIfAbsent(clientHost, clientHost -> {
CompletableFuture<String> future = new CompletableFuture<>();
String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId());
requestHandler.getMetadataStore()
.get(requestHandler.getGroupIdStoredPath() + groupIdPath)
.thenAccept(getResultOpt -> {
if (getResultOpt.isPresent()) {
GetResult getResult = getResultOpt.get();
future.complete(new String(getResult.getValue() == null
? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));
} else {
// 从zk节点 /client_group_id/xxx 获取不到消费组,消费组就是空的
future.complete("");
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
returnfuture;
});
// this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor thread
groupNameFuture.whenCompleteAsync((groupName, ex) -> {
if (ex != null) {
log.error("Get groupId failed.", ex);
groupName = "";
}
.....
// 获得消费组名称后,记录消费组对应的消费指标
decodeResult.updateConsumerStats(topicPartition,
entries.size(),
groupName,
statsLogger);
代码的逻辑是,从requestHandler的currentConnectedGroup(map)中通过host获取groupName,不存在则通过MetadataStore(带缓存的zk存储对象)获取,如果zk缓存也没有,再发起zk读请求(路径为/client_group_id/host-clientId)。读取到消费组名称后,用它来更新消费组指标。从复现的集群确定走的是这个分支,即是从metadataStore(带缓存的zk客户端)获取不到对应zk节点/client_group_id/xxx。
2、查找可能导致zk节点/client_group_id/xxx节点获取不到的原因
有两种可能性:一是没写进去,二是写进去但是被删除了。
@Override
protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
CompletableFuture<AbstractResponse> resultFuture) {
...
// Store group name to metadata store for current client, use to collect consumer metrics.
storeGroupId(groupId, groupIdPath)
.whenComplete((stat, ex) -> {
if (ex != null) {
// /client_group_id/xxx节点写入失败
log.warn("Store groupId failed, the groupId might already stored.", ex);
}
findBroker(TopicName.get(pulsarTopicName))
.whenComplete((node, throwable) -> {
....
});
});
...
从代码看到,clientId与groupId的关联关系是通过handleFindCoordinatorRequest(FindCoordinator)写进去的,而且只有这个方法入口。由于没有找到warn日志,排除了第一种没写进去的可能性。看看删除的逻辑:
protected void close(){
if (isActive.getAndSet(false)) {
...
currentConnectedClientId.forEach(clientId -> {
String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
// 删除zk上的 /client_group_id/xxx 节点
metadataStore.delete(path, Optional.empty())
.whenComplete((__, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
if (log.isDebugEnabled()) {
log.debug("The groupId store path doesn't exist. Path: [{}]", path);
}
return;
}
log.error("Delete groupId failed. Path: [{}]", path, ex);
return;
}
if (log.isDebugEnabled()) {
log.debug("Delete groupId success. Path: [{}]", path);
}
});
});
}
}
删除是在requsetHandler.close方法中执行,也就是说连接断开就会触发zk节点删除。
但有几个疑问:
-
/client_group_id/xxx 到底是干嘛用的?消费指标为什么要依赖它
-
为什么要在handleFindCoordinatorRequest写入?
-
节点/client_group_id/xxx为什么要删除,而且是在连接断开时删除,删除时机是否有问题?
首先回答第1个问题,通过阅读代码可以知道,/client_group_id/xxx 这个zk节点是用于在不同broker实例间交换数据用的(相当redis cache),用于临时存放IP+clientId与groupId的映射关系。由于fetch接口(拉取数据)的request没有groupId的,只能依赖加入Group过程中的元数据,在fetch消费时才能知道当前拉数据的consumer是哪个消费组的。
3、复现
若要解决问题,最好能够稳定地复现出问题,这样才能确定问题的根本原因,并且确认修复是否完成。
因为节点是在requsetHandle.close方法中执行删除,broker节点关闭会触发连接关闭,进而触发删除。假设:客户端通过brokerA发起FindCoordinator请求,写入zk节点/client_group_id/xxx,同时请求返回brokerB作为Coordinator,后续与brokerB进行joinGroup、syncGroup等交互确定消费关系,客户端在brokerA、brokerB、brokerC都有分区消费。这时重启brokerA,分区均衡到BrokerC上,但此时/client_group_id/xxx因关闭broker而断开连接被删除,consumer消费刚转移到topic1-partition-1的分区就无法获取到groupId。
按照假设,有3个broker,开启生产和消费,通过在FindCoordinator返回前获取node.leader()的返回节点BrokerB,关闭brokerA后,brokerC出现断点复现,再关闭brokerC,brokerA也会复现(假设分区在brokerA与brokerC之间转移)。
复现要几个条件:
-
broker数量要足够多(不小于3个)
-
broker内部有zk缓存metadataCache默认为5分钟,可以把时间调小为1毫秒,相当于没有cache
-
findCoordinator返回的必须是其他broker的IP
-
重启的必须是接收到findCoordinator请求那台broker,而不是真正的coordinator,这时会从zk删除节点
-
分区转移到其他broker,这时新的broker会重新读取zk节点数据
到此,我们基本上清楚了问题原因:连接关闭导致zk节点被删除了,别的broker节点需要时就读取不到了。那怎么解决?
三、问题解决
方案一
既然知道把消费者与FindCoordinator的连接进行绑定不合适的,那么是否应该把FindCoordinator写入zk节点换成由JoinGroup写入,断连即删除。
consumer统一由Coordinator管理,由于FindCoordinator接口不一定是Coordinator处理的,如果换成由Coordinator处理的JoinGroup接口是否就可以了,这样consumer断开与Coordinator的连接就应该删除数据。但实现验证时却发现,客户端在断连后也不会再重连,所以没法重新写入zk,不符合预期。
方案二
还是由FindCoordinator写入zk节点,但删除改为GroupCoordinator监听consumer断开触发。
因为consumer统一由Coordinator管理,它能监听到consumer加入或者离开。GroupCoordinator的removeMemberAndUpdateGroup方法是coordinator对consumer成员管理。
private void removeMemberAndUpdateGroup(GroupMetadata group,
MemberMetadata member) {
group.remove(member.memberId());
switch (group.currentState()) {
case Dead:
case Empty:
return;
case Stable:
case CompletingRebalance:
maybePrepareRebalance(group);
break;
case PreparingRebalance:
joinPurgatory.checkAndComplete(new GroupKey(group.groupId()));
break;
default:
break;
}
// 删除 /client_group_id/xxx 节点
deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}
调用入口有两个,其中handleLeaveGroup是主动离开,onExpireHeartbeat是超时被动离开,客户端正常退出或者宕机都可以调用removeMemberAndUpdateGroup方法触发删除。
```
public CompletableFuture
String groupId,
String memberId
) {
return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error ->
CompletableFuture.completedFuture(error)
).orElseGet(() -> {
return groupManager.getGroup(groupId).map(group -> {
return group.inLock(() -> {
if (group.is(Dead) || !group.has(memberId)) {
return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
} else {
...
// 触发删除消费者consumer
removeMemberAndUpdateGroup(group, member);
return CompletableFuture.completedFuture(Errors.NONE);
}
});
})
....
});
}
void onExpireHeartbeat(GroupMetadata group,
MemberMetadata member,
long heartbeatDeadline) {
group.inLock(() -> {
if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
log.info("Member {} in group
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12993.html