聊聊KafkaConsumer源码解析之ConsumerNetworkClient
一、Consumer 的使用
Consumer 的源码解析主要来看 KafkaConsumer,KafkaConsumer 是 Consumer 接口的实现类。KafkaConsumer 提供了一套封装良好的 API,开发人员可以基于这套 API 轻松实现从 Kafka 服务端拉取消息的功能,这样开发人员根本不用关心与 Kafka 服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作,也不必关心订阅 Topic 的分区数量、分区副本的网络拓扑以及 Consumer Group 的 Rebalance 等 Kafka 具体细节,KafkaConsumer 中还提供了自动提交 offset 的功能,使的开发人员更加关注业务逻辑,提高了开发效率。
下面我们来看一个 KafkaConsumer 的示例程序: /** * @author: 微信公众号【老周聊架构】 */ public class KafkaConsumerTest { public static void main(String[] args) { Properties props = new Properties(); // kafka地址,列表格式为host1:port1,host2:port2,...,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(建议多提供几个,以防提供的服务器关闭) 必须设置 props.put("bootstrap.servers", "localhost:9092"); // key序列化方式 必须设置 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化方式 必须设置 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("group.id", "consumer_riemann_test"); KafkaConsumer consumer = new KafkaConsumer<>(props); // 可消费多个topic,组成一个list String topic = "riemann_kafka_test"; consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s ", record.offset(), record.key(), record.value()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
从示例中可以看出 KafkaConsumer 的核心方法是 poll(),它负责从 Kafka 服务端拉取消息。核心方法的具体细节我想放在下一篇再细讲,关乎消费侧的客户端与 Kafka 服务端的通信模型。这一篇我们主要从宏观的角度来剖析下 Consumer 消费端的源码。 二、KafkaConsumer 分析
我们先来看下 Consumer 接口,该接口定义了 KafkaConsumer 对外的 API,其核心方法可以分为以下六类: subscribe () 方法:订阅指定的 Topic,并为消费者自动分配分区。 assign () 方法:用户手动订阅指定的 Topic,并且指定消费的分区,此方法 subscribe() 方法互斥。 poll () 方法:负责从服务端获取消息。 commit *() 方法:提交消费者已经消费完成的 offset。 seek *() 方法:指定消费者起始消费的位置。 pause ()、 resume () 方法:暂停、继续 Consumer,暂停后 poll() 方法会返回空。
我们先来看下 KafkaConsumer 的重要属性以及 UML 结构图。
clientId :Consumer 的唯一标识。groupId :消费者组的唯一标识。coordinator :控制着 Consumer 与服务端 GroupCoordinator 之间的通信逻辑,读者可以理解为 Consumer 与服务端 GroupCoordinator 通信的门面。keyDeserializer、valueDeserializer :key 和 value 的反序列化器。fetcher :负责从服务端获取消息。interceptors :ConsumerInterceptors 集合,ConsumerInterceptors.onConsumer() 方法可以在消息通过 poll() 方法返回给用户之前对其进行拦截或修改;ConsumerInterceptors.onCommit() 方法也可以在服务端返回提交 offset 成功的响应进行拦截或修改。client :ConsumerNetworkClient 负责消费者与 Kafka 服务端的网络通信。subscriptions :SubscriptionState 维护了消费者的消费状态。metadata :ConsumerMetadata 记录了整个 Kafka 集群的元信息。currentThread、refcount :分别记录的 KafkaConsumer 的线程 id 和重入次数三、ConsumerNetworkClient
ConsumerNetworkClient 在 NetworkClient 之上进行了封装,提供了更高级的功能和更易用的 API。
我们先来看下 ConsumerNetworkClient 的重要属性以及 UML 结构图。
client :NetworkClient 对象。unsent :缓冲队列。UnsentRequests 对象,该对象内部维护了一个 unsent 属性,该属性是 ConcurrentMap> ,key 是 Node 节点,value 是 ConcurrentLinkedQueue 。metadata :用于管理 Kafka 集群元数据。retryBackoffMs :在尝试重试对给定主题分区的失败请求之前等待的时间量,这避免了在某些故障情况下在紧密循环中重复发送请求。对应 retry.backoff.ms 配置,默认 100 ms。maxPollTimeoutMs :使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常不应设置为高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。对应 heartbeat.interval.ms 配置,默认 3000 ms。构造函数中,maxPollTimeoutMs 取的是 maxPollTimeoutMs 与 MAX_POLL_TIMEOUT_MS 的最小值,MAX_POLL_TIMEOUT_MS 默认为 5000 ms。requestTimeoutMs :配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败。对应 request.timeout.ms 配置,默认 305000 ms。wakeupDisabled :由调用 KafkaConsumer 对象的消费者线程之外的其它线程设置,表示要中断 KafkaConsumer 线程。lock :我们不需要高吞吐量,所以使用公平锁来尽量避免饥饿。pendingCompletion :当请求完成时,它们在调用之前被转移到这个队列。目的是避免在持有此对象的监视器时调用它们,这可能会为死锁打开门。pendingDisconnects :断开与协调器连接节点的队列。wakeup :这个标志允许客户端被安全唤醒而无需等待上面的锁。为了同时启用它,避免需要获取上面的锁是原子的。
ConsumerNetworkClient 的核心方法是 poll() 方法,poll() 方法有很多重载方法,最终会调用 poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) 方法,这三个参数含义是:timer 表示定时器限制此方法可以阻塞多长时间;pollCondition 表示可空阻塞条件;disableWakeup 表示如果 true 禁用触发唤醒。
我们来简单回顾下 ConsumerNetworkClient 的功能: 3.1 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#trySend
循环处理 unsent 中缓存的请求,对每个 Node 节点,循环遍历其 ClientRequest 链表,每次循环都调用 NetworkClient.ready() 方法检测消费者与此节点之间的连接,以及发送请求的条件。若符合条件,则调用 NetworkClient.send() 方法将请求放入 InFlightRequest 中等待响应,也放入 KafkaChannel 中的 send 字段等待发送,并将消息从列表中删除。代码如下:long trySend(long now) { long pollDelayMs = maxPollTimeoutMs; // send any requests that can be sent now // 遍历 unsent 集合 for (Node node : unsent.nodes()) { Iterator iterator = unsent.requestIterator(node); if (iterator.hasNext()) pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now)); while (iterator.hasNext()) { ClientRequest request = iterator.next(); // 调用 NetworkClient.ready()检查是否可以发送请求 if (client.ready(node, now)) { // 调用 NetworkClient.send()方法,等待发送请求。 client.send(request, now); // 从 unsent 集合中删除此请求 iterator.remove(); } else { // try next node when current node is not ready break; } } } return pollDelayMs; } 3.2 计算超时时间
如果没有请求在进行中,则阻塞时间不要超过重试退避时间。3.3 org.apache.kafka.clients.NetworkClient#poll判断是否需要更新 metadata 元数据调用 Selector.poll() 进行 socket 相关的 IO 操作处理完成后的操作(处理一系列 handle*() 方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数)3.4 调用 checkDisconnects() 方法检测连接状态
调用 checkDisconnects() 方法检测连接状态。检测消费者与每个 Node 之间的连接状态,当检测到连接断开的 Node 时,会将其在 unsent 集合中对应的全部 ClientRequest 对象清除掉,之后调用这些ClientRequest 的回调函数。private void checkDisconnects(long now) { // any disconnects affecting requests that have already been transmitted will be handled // by NetworkClient, so we just need to check whether connections for any of the unsent // requests have been disconnected; if they have, then we complete the corresponding future // and set the disconnect flag in the ClientResponse for (Node node : unsent.nodes()) { // 检测消费者与每个 Node 之间的连接状态 if (client.connectionFailed(node)) { // Remove entry before invoking request callback to avoid callbacks handling // coordinator failures traversing the unsent list again. // 在调用请求回调之前删除条目以避免回调处理再次遍历未发送列表的协调器故障。 Collection requests = unsent.remove(node); for (ClientRequest request : requests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); AuthenticationException authenticationException = client.authenticationException(node); // 调用 ClientRequest 的回调函数 handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), request.callback(), request.destination(), request.createdTimeMs(), now, true, null, authenticationException, null)); } } } } 3.5 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#maybeTriggerWakeup
检查 wakeupDisabled 和 wakeup,查看是否有其它线程中断。如果有中断请求,则抛出 WakeupException 异常,中断当前 ConsumerNetworkClient.poll() 方法。public void maybeTriggerWakeup() { // 通过 wakeupDisabled 检测是否在执行不可中断的方法,通过 wakeup 检测是否有中断请求。 if (!wakeupDisabled.get() && wakeup.get()) { log.debug("Raising WakeupException in response to user wakeup"); // 重置中断标志 wakeup.set(false); throw new WakeupException(); } } 3.6 再次调用 trySend() 方法
再次调用 trySend() 方法。在步骤 2.1.3 中调用了 NetworkClient.poll() 方法,在其中可能已经将 KafkaChannel.send 字段上的请求发送出去了,也可能已经新建了与某些 Node 的网络连接,所以这里再次尝试调用 trySend() 方法。3.7 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#failExpiredRequests
处理 unsent 中超时请求。它会循环遍历整个 unsent 集合,检测每个 ClientRequest 是否超时,将过期请求加入到 expiredRequests 集合,并将其从 unsent 集合中删除。调用超时 ClientRequest 的回调函数 onFailure()。private void failExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures // 清除所有过期的未发送请求并使其相应的 futures 失败 Collection expiredRequests = unsent.removeExpiredRequests(now); for (ClientRequest request : expiredRequests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); // 调用回调函数 handler.onFailure(new TimeoutException("Failed to send request after " + request.requestTimeoutMs() + " ms.")); } } private Collection removeExpiredRequests(long now) { List expiredRequests = new ArrayList<>(); for (ConcurrentLinkedQueue requests : unsent.values()) { Iterator requestIterator = requests.iterator(); while (requestIterator.hasNext()) { ClientRequest request = requestIterator.next(); // 检查是否超时 long elapsedMs = Math.max(0, now - request.createdTimeMs()); if (elapsedMs > request.requestTimeoutMs()) { // 将过期请求加入到 expiredRequests 集合 expiredRequests.add(request); requestIterator.remove(); } else break; } } return expiredRequests; } 四、RequestFutureCompletionHandler
说 RequestFutureCompletionHandler 之前,我们先来看下 ConsumerNetworkClient.send() 方法。里面的逻辑会将待发送的请求封装成 ClientRequest,然后保存到 unsent 集合中等待发送,代码如下:public RequestFuture send(Node node, AbstractRequest.Builder<?> requestBuilder, int requestTimeoutMs) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, requestTimeoutMs, completionHandler); // 创建 clientRequest 对象,并保存到 unsent 集合中。 unsent.put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request // 唤醒客户端以防它在轮询中阻塞,以便我们可以发送排队的请求。 client.wakeup(); return completionHandler.future; }
我们重点来关注一下 ConsumerNetworkClient 中使用的回调对象——RequestFutureCompletionHandler。其继承关系如下:
从 RequestFutureCompletionHandler 继承关系图我们可以知道,它不仅实现了 RequestCompletionHandler 接口,还组合了 RequestFuture 类,RequestFuture 是一个泛型类,其核心字段与方法如下:listeners :RequestFutureListener 队列,用来监听请求完成的情况。RequestFutureListener 接口有 onSuccess() 和 onFailure () 两个方法,对应于请求正常完成和出现异常两种情况。isDone() :表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为 true。value() :记录请求正常完成时收到的响应,与 exception() 方法互斥。此字段非空表示正常完成,反之表示出现异常。exception() :记录导致请求异常完成的异常类,与 value() 互斥。此字段非空则表示出现异常,反之则表示正常完成。
我们之所以要分析源码,是因为源码中有很多设计模式可以借鉴,应用到你自己的工作中。RequestFuture 中有两处典型的设计模式的使用,我们来看一下:compose() 方法:使用了适配器模式。chain() 方法:使用了责任链模式。4.1 RequestFuture.compose()/** * 适配器 * Adapt from a request future of one type to another. * * @param Type to adapt from * @param Type to adapt to */ public abstract class RequestFutureAdapter { public abstract void onSuccess(F value, RequestFuture future); public void onFailure(RuntimeException e, RequestFuture future) { future.raise(e); } } /** * RequestFuture 适配成 RequestFuture * Convert from a request future of one type to another type * @param adapter The adapter which does the conversion * @param The type of the future adapted to * @return The new future */ public RequestFuture compose(final RequestFutureAdapter adapter) { // 适配之后的结果 final RequestFuture adapted = new RequestFuture<>(); // 在当前 RequestFuture 上添加监听器 addListener(new RequestFutureListener() { @Override public void onSuccess(T value) { adapter.onSuccess(value, adapted); } @Override public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); } }); return adapted; }
使用 compose() 方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。当调用 RequestFuture 对象的 complete() 或 raise() 方法时,会调用 RequestFutureListener 的 onSuccess() 或 onFailure() 方法,然后调用 RequestFutureAdapter 的对应方法,最终调用RequestFuture 对象的对应方法。
4.2 RequestFuture.chain()
chain() 方法与 compose() 方法类似,也是通过 RequestFutureListener 在多个 RequestFuture 之间传递事件。代码如下:public void chain(final RequestFuture future) { // 添加监听器 addListener(new RequestFutureListener() { @Override public void onSuccess(T value) { // 通过监听器将 value 传递给下一个 RequestFuture 对象 future.complete(value); } @Override public void onFailure(RuntimeException e) { // 通过监听器将异常传递给下一个 RequestFuture 对象 future.raise(e); } }); }
好了,ConsumerNetworkClient 的源码分析告一段落了,希望文章对你有帮助,我们下期再见。
追梦一线职工风采录追梦一线职工风采录原标题专攻卡脖子难题从一名职校生成长为全国五一劳动奖章获得者安徽省劳模工匠创新工作室带头人,27岁的邱军强用技能擦亮人生实现逆袭。赵春青绘2014年,18岁的邱军
上海富豪张旭豪33岁将公司出售给马云,套现665亿,创造新奇迹随着网络时代的到来,一台手机就可以解决一切问题,甚至可以在家里吃到各种各样的美味佳肴,这也是因为网络外卖市场的蓬勃发展,美团和饿了么成为了其中的佼佼者。饿了么的创始人张旭豪,在33
肥胖与大脑发育,有关系吗?在美国,大约有2500万儿童超重或肥胖。一项新的研究探讨了超重或肥胖如何影响儿童的大脑发育。有关这项研究的报告本月发表在美国医学会(AmericanMedicalAssociati
坚持带孩子做视力训练,裸眼视力越来越高!自从发现孩子近视后,我就各种查资料看经验问大夫,带着孩子积极防控,除了基本的户外和减少用眼,每天还会坚持室內练眼,没想到坚持那么久真的管用,姑娘的裸眼视力从之前的4。3到现在的5。
呕吐腹泻近期流行的诺如病毒,怎么预防?2岁的田田(化名)最近几天频繁呕吐和腹泻,还吐出了黄色胆汁。随后,田田的姐姐妈妈和爸爸也出现恶心呕吐等不适症状,这是怎么回事?难道是全家人都吃坏肚子了?正值冬春交际,医院儿科门急诊
10年游戏从业者良心建议梦手尽量选择正式服作为一个游戏行业的从业者,小编长期浸泡在各大游戏论坛,在密切关注着圈内热点时事的同时,试图提出一些值得讨论的想法与观点,从而更好地与玩家们进行对话与碰撞。那么今天,小编想和各位聊聊
原神探索收集须弥千壑沙地观景点收集原神探索收集须弥千壑沙地观景点收集注以下内容均为作者个人观点,不代表官方态度,如有其他想法,欢迎评论区留言本篇为大家带来的是须弥千壑沙地的17个观景点收集,解锁图鉴的同时也可以欣赏
冠军野辅火力全开!UP21战胜OMG,宝蓝拿下回归后的首个MVPLPL春季常规赛第六周第2个比赛日,33的OMG对阵25的UP,这两支队伍目前的联赛排名差距并不大,且最近都有点起伏不定。双方首局就上演了一场翻中翻,UP在前中期掌控了绝对的优势,
有这3个坏习惯,肾病反而不容易恶化!如果有,恭喜你在一些肾友的眼里,我今天要跟大家分享的,是3个坏习惯,可能还是特别坏的习惯!但实际上,我要说的这些坏习惯,不但不坏,还在无形中帮助了许多肾友获得更好的康复效果。如果有,那么,我要恭
百度发布2022年第四季度财报营收330。8亿元,净利润49。53亿元Tech星球2月22日消息,今日下午,百度集团公布2022年第四季度及全年财报。百度第四季度营收330。8亿元,去年同期331亿元,市场预期320。12亿元净利润49。53亿元,去
第一只克隆羊多莉,后来如何了?变成了怪物?物种繁衍的方式多种多样,有的超越人类的认知,成为科学家重要的研究对象。克隆羊多莉便是科学家针对于无性繁殖的研究成果,是世界上第一只克隆羊,它的结局如何?多莉与它的创造者有的人说它变
糖尿病能吃桃子么?感谢邀请。桃子这么香甜,含糖量一定很多吧?但糖友可以吃桃子的,是不是很开心)桃子中的主要糖分是果糖,当然也有葡萄糖和蔗糖。果糖含量更丰富代表了它更香甜,因为果糖的甜度比蔗糖葡萄糖还
中国银行与中国人民银行有什么区别?中国银行,与中国人民银行,看起来就差了两个字,而实际上有着天壤之别。我就在银行上班,给大家直接上干货。中国银行我国有4000家银行,我们平时里了解最多的就是六大国有银行。中国银行,
你身边有退休的人吗?退休的拿两万多的工资高吗?有我老父亲军队离休干部我老公军队退休干部。他们的离(退)休金都是两万多,应该算收入高的了。老父亲1943的八路军司务长,抗战解放战爭共立三等功四次,四等功二次。84年正团(享付师)
阎锡山权力欲望狂潮中如何守住江山?阎锡山是中国近代史上的一位传奇人物。他出身于一个贵族家庭,从小就接受了传统文化教育和军事训练。他不仅具备出色的领导才能和战斗力,还拥有着顽强的意志和雄厚的经济实力。这些优势成为他最
唐代丧葬奢靡至极,死要葬北邙,厚葬风气到底为何?引言中国四大文明古国之一,文明未曾中断。对于中国人而言,生死乃是人生大事。出生时的满月酒,离去时的身后事,都格外的重视。尤其是死后的丧葬场面,上至帝王贵族,下到地主平民,都很是讲究
清代茶叶土引考论摘要土引是清代独有的国家茶叶分配制度,只发行于四川西部天全一州,在打箭炉定销售。它出现于清康熙年间,清朝正式将打箭炉地区编入一统志之后,卫藏营官控制打箭炉茶叶贸易结束。土引行土司,
致敬英雄任长霞剑胆琴心,保境安民在阅读此文前,诚邀您右上角点击一下关注任长霞,图源自网络2004年4月17日,河南登封,万人空巷。长4公里宽60米的少林大道鲜花如潮挽幛如云。20万名群众迸发出一片哭喊声长霞,一路
1945年他救下一日本女俘,娶做老婆,42年后儿子在日本获百亿家产问题如何才能每天读到这样的故事?回答点击右上方关注小编头条号!1931年9月18日,日本悍然发动九一八事变,侵占我东三省,自此中国人民开始14年抗战历程。1937年7月7日,卢沟桥
推荐2500左右的手机?2500元价位想要买到旗舰机是不现实了,入门级的高端机都还要使劲打折才能买到,机会渺茫。所以想要简单粗暴一点的话,那就直接选择一些高性价比的手机,目前双十一也快到了,各家给出优惠幅
荣耀再发5G大屏超长续航新机!你满意吗?荣耀X30Max终于发布了,性能部分,采用了联发科天玑900处理器,屏幕方面,7。09英寸LCD水滴屏,RGBW显示方案,不支持高刷新率,拍照方面,前置800万像素摄像头,后置双摄
妻管严ampquot是种什么体验?有一个妻管严的老婆很好的,可以叮嘱男人少抽烟少喝酒,在外面不要粘花惹草,让男人有责任感,女人才会有安全感。男人愿意被老婆管是好事,说明两个人感情好,都是为了家庭的幸福,有智慧的男人