聊聊KafkaProducer源码解析
一、前言
前面几篇我们讲了关于 Kafka 的基础架构以及搭建,从这篇开始我们就来源码分析一波。我们这用的 Kafka 版本是 2.7.0,其 Client 端是由 Java 实现,Server 端是由 Scala 来实现的,在使用 Kafka 时,Client 是用户最先接触到的部分,因此,我们从 Client 端开始,会先从 Producer 端开始,今天我们就来对 Producer 源码解析一番。 二、Producer 使用
首先我们先通过一段代码来展示 KafkaProducer 的使用方法。在下面的示例中,我们使用 KafkaProducer 实现向 Kafka 发送消息的功能。在示例程序中,首先将 KafkaProduce 使用的配置写入
到 Properties 中,每项配置的具体含义在注释中进行解释。之后以此 Properties 对象为参数构造 KafkaProducer 对象,最后通过 send 方法完成发送,代码中包含同步发送、异步发送两种情况。
从上面的代码可以看出 Kafka 为用户提供了非常简洁方便的 API,在使用时,只需要如下两步: 初始化 KafkaProducer 实例 调用 send 接口发送数据
本文主要是围绕着初始化 KafkaProducer 实例与如何实现 send 接口发送数据而展开的。 三、KafkaProducer 实例化
了解了 KafkaProducer 的基本使用,然后我们来深入了解下方法核心逻辑: public KafkaProducer(Properties properties) { this(Utils.propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM); }
四、消息发送过程
用户是直接使用 producer.send() 发送的数据,先看一下 send() 接口的实现 // 异步向一个 topic 发送数据 public Future send(ProducerRecord record) { return this.send(record, (Callback)null); } // 向 topic 异步地发送数据,当发送确认后唤起回调函数 public Future send(ProducerRecord record, Callback callback) { ProducerRecord interceptedRecord = this.interceptors.onSend(record); return this.doSend(interceptedRecord, callback); }
数据发送的最终实现还是调用了 Producer 的 doSend() 接口。
4.1 拦截器
首先方法会先进入拦截器集合 ProducerInterceptors , onSend 方法是遍历拦截器 onSend 方 法,拦截器的目的是将数据处理加工, Kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口。
4.1.1 拦截器代码
4.1.2 拦截器核心逻辑
ProducerInterceptor 接口包括三个方法: onSend(ProducerRecord var1) :该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中的。 确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadata var1, Exception var2) :该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。close() :关闭 interceptor,主要用于执行一些资源清理工作。
拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。
4.2 Producer 的 doSend 实现
下面是 doSend() 的具体实现:
在 doSend() 方法的实现上,一条 Record 数据的发送,主要分为以下五步: 确认数据要发送到的 topic 的 metadata 是可用的(如果该 partition 的 leader 存在则是可用的,如果开启权限时,client 有相应的权限),如果没有 topic 的 metadata 信息,就需要获取相应的 metadata; 序列化 record 的 key 和 value; 获取该 record 要发送到的 partition(可以指定,也可以根据算法计算); 向 accumulator 中追加 record 数据,数据会先进行缓存; 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者 batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程发送数据。
数据的发送过程,可以简单总结为以上五点,下面会这几部分的具体实现进行详细分析。 五、消息发送过程
5.1 获取 topic 的 metadata 信息
Producer 通过 waitOnMetadata() 方法来获取对应 topic 的 metadata 信息,这块内容我下一篇再来讲。
5.2 key 和 value 的序列化
Producer 端对 record 的 key 和 value 值进行序列化操作,在 Consumer 端再进行相应的反序列化,Kafka 内部提供的序列化和反序列化算法如下图所示:
当然我们也是可以自定义序列化的具体实现,不过一般情况下,Kafka 内部提供的这些方法已经足够使用。
5.3 获取该 record 要发送到的 partition
获取 partition 值,具体分为下面三种情况: 指明 partition 的情况下,直接将指明的值直接作为 partiton 值; 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值; 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
具体实现如下: // 当 record 中有 partition 值时,直接返回,没有的情况下调用 partitioner 的类的 partition 方法去计算(KafkaProducer.class) private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
Producer 默认使用的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,用户也可以自定义 partition 的策略,下面是默认分区策略具体实现: public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size()); } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }
上面这个默认算法核心就是粘着分区缓存
5.4 向 RecordAccmulator 中追加 record 数据
我们讲 RecordAccumulator 之前先看这张图,这样的话会对整个发送流程有个大局观。
RecordAccmulator 承担了缓冲区的角色。默认是 32 MB。
在 Kafka Producer 中,消息不是一条一条发给 broker 的,而是多条消息组成一个 ProducerBatch,然后由 Sender 一次性发出去,这里的 batch.size 并不是消息的条数(凑满多少条即发送),而是一个大小。默认是 16 KB,可以根据具体情况来进行优化。
在 RecordAccumulator 中,最核心的参数就是: private final ConcurrentMap> batches;
它是一个 ConcurrentMap,key 是 TopicPartition 类,代表一个 topic 的一个 partition。value 是一个包含 ProducerBatch 的双端队列。等待 Sender 线程发送给 broker。画张图来看下:
上面的代码不知道大家有没有疑问?分配内存的代码为啥不在 synchronized 同步块中分配?导致下面的 synchronized 同步块中还要 tryAppend 一下。
因为这时候可能其他线程已经创建好 ProducerBatch 了,造成多余的内存申请。
如果把分配内存放在 synchronized 同步块会有什么问题?
内存申请不到线程会一直等待,如果放在同步块中会造成一直不释放 Deque 队列的锁,那其他线程将无法对 Deque 队列进行线程安全的同步操作。
再跟下 tryAppend() 方法,这就比较简单了。
以上代码见图解:
5.5 唤醒 sender 线程发送 ProducerBatch
当 record 写入成功后,如果发现 ProducerBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒 sender 线程,发送 ProducerBatch。
sender 线程对 ProducerBatch 的处理是在 run() 方法中进行的,该方法具体实现如下:
其中比较核心的方法是 run() 方法中的 org.apache.kafka.clients.producer.internals.Sender#sendProducerData
其中 pollTimeout 意思是最长阻塞到至少有一个通道在你注册的事件就绪了。返回 0 则表示走起发车了。
我们继续跟下:org.apache.kafka.clients.producer.internals.RecordAccumulator#ready
最后再来看下里面这个方法 org.apache.kafka.clients.producer.internals.RecordAccumulator#drain,从accumulator 缓冲区获取要发送的数据,最大一次性发 max.request.size 大小的数据。
六、总结
最后为了让你对 Kafka Producer 有个宏观的架构理解,请看下图:
简要说明: new KafkaProducer() 后创建一个后台线程 KafkaThread (实际运行线程是 Sender,KafkaThread 是对 Sender 的封装) 扫描 RecordAccumulator 中是否有消息。 调用 KafkaProducer.send() 发送消息,实际是将消息保存到 RecordAccumulator 中,实际上就是保存到一个 Map 中 (ConcurrentMap>),这条消息会被记录到同一个记录批次 (相同主题相同分区算同一个批次) 里面,这个批次的所有消息会被发送到相同的主题和分区上。 后台的独立线程扫描到 RecordAccumulator 中有消息后,会将消息发送到 Kafka 集群中 (不是一有消息就发送,而是要看消息是否 ready) 如果发送成功 (消息成功写入 Kafka), 就返回一个 RecordMetaData 对象,它包括了主题和分区信息,以及记录在分区里的偏移量。 如果写入失败,就会返回一个错误,生产者在收到错误之后会尝试重新发送消息 (如果允许的话,此时会将消息在保存到 RecordAccumulator 中),几次之后如果还是失败就返回错误消息。
好了,本文对 Kafka Producer 源码进行了解析,下一篇文章将会详细介绍 metadata 的内容以及在 Producer 端 metadata 的更新机制。敬请期待~
进口韭菜相信大家都有看到特斯拉多次因为失控事故登上热搜。每次特斯拉出现疑似刹车失灵的事件发生,都有各种各样的原因猜测,作为旁观者,我们也无法确定是谁的问题。但讽刺的是,当事故发生后,车企与
即将登场的旗舰手机,一个比一个能打手机圈最火的新品,它说第二没人敢说第一。就是它骁龙8Gen2!按照惯例,每次骁龙旗舰芯片发布,各大手机厂商都要来凑一凑热闹,今年也是一样。我们一个一个来看。vivoX90系列按时间
iPhone14Plus面板12月采购量接近0!这一代iPhone为何不好卖了?文名动科技近日,一则消息引起了手机圈的讨论,即iPhone14Plus的面板出货量在12月接近于零。该消息一出,便引起了许多用户的讨论,而之所以会出现这般情况,无非是两个原因,一是
工信部我国5G工业互联网进入规模发展新阶段2022中国5G工业互联网大会20日在湖北武汉开幕。工业和信息化部负责人表示,我国5G工业互联网将进入由起步探索向规模发展的新阶段。据了解,我国5G工业互联网创新发展进入快车道,已
腾讯市值跌破200亿,段永平6次抄底都被套了?这是怎么回事?简介在目前的互联网行业中,腾讯在社交软件和游戏方面都做得非常好,这进一步促进了腾讯的发展,巩固了其在行业中的地位。但最近,很多网友说,腾讯市值跌破2000亿,段永平前后暴跌6次,却
家居业一周丨索菲亚收购司米厨柜剩余49股权敏华控股发中报过去的一周,家居企业动态不断。索菲亚以3。43亿元收购司米厨柜剩余49股权火星人厨具称,双十一订单额达4。86亿元,私域流量渠道快速发展敏华控股发布2023财年中报公告,实现总收益
京东方这一轮液晶周期见底了吗?头条创作挑战赛11月财经新势力中国产业名片关注京东方股票的朋友,一定都知道京东方股价和液晶周期的周期性波动强相关。在某种意义上,京东方可以算是市场少有的高科技周期股。液晶周期(Cr
原神原胚别乱用!23把锻造武器使用指南能锻造的武器已经多达23把了,其中哪些值得锻造,又分别适合什么角色?本期就为大家全面分析一下,原胚别乱用,全锻造武器使用指南。阅前提醒本文更适用于武器储备少的萌新,能为你节省资源,
苹果手机iOS16景深效果壁纸使用指南iOS16景深效果使壁纸主体的一小部分出现在锁屏时钟前面,营造出漂亮的视觉风格。如果你的iPhone并没有景深效果,那么本文将为你指点迷津。什么是景深效果?运行iOS16的iPho
从入门到吃灰,威联通NAS使用指南0引子感觉近年来越来越多的小伙伴打算或者已经入了NAS的坑,讲道理来说个人觉得NAS对于家用用户来说最大的意义就是解放各类设备的存储空间,尤其是手机PC等,毕竟iPhone13Pr
超实用新手化妆刷使用指南,在座各位都能学会Hi美少女们大家好呀最近有宝宝给我们留言说感觉自己的化妆步骤流程都对了,但是化完妆总是看起来显脏不够精致,也不清楚问题出在哪里。其实,影响妆容精致度的很大一个原因就是上妆工具和手法
七星关区海棠花开正艳来源人民网贵州频道七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区海棠花开正艳。七星关区
在社交平台上走红,西海岸新区上沟村花开醉人半岛全媒体记者李晓哲眼下,又到了樱桃花杏花漫山遍野肆意开放的季节。从3月中旬开始,位于青岛西海岸新区杨家山里上沟村,被漫山遍野的樱桃花杏花装扮一新,惹得游人醉。3月19日,记者驱车
向风而行番外40顾南亭,现在是合法夫妻专属的查岗时间!本文由猫猫知道你的心事原创首发,未经允许不得搬运,如果发现必追究。原创不易,谢谢理解。顾南亭程霄故意用撒娇的语气给顾南亭发了一条语音,我好想你呀她觉得不过瘾,又追加了一句。她也不想
華韻和風ampampquot时间与空间ampampquot合作展25日开幕!日中通信讯華韻和風是时间与空间的合作展,25日上午10点将在中友好会馆开幕!这个展览的宗旨是希望通过插花汉俳与和歌,让更多的人有机会体验中国和日本的文化。各美其美,美人之美,美美与
人在花中游3月21日,游人在江苏省兴化市千垛菜花景区乘船游览(无人机照片)。最近,我国多地春暖花开,人们纷纷走到户外,赏花踏青,享受春光。新华社发(孟德龙摄)3月21日,游人在江苏省兴化市千
幻狮王著作外星人的高科技碗外星人的高科技碗旺秋札巴(幻狮王)著恍如隔世,感念今昔生死相续,再结前缘。雪域圣地遇圣缘(五)外星人的碗,饮食自生小小说幻狮王著蓝衫客在江贝喇嘛的安排下,乘坐西藏当时的普通交通工具
北京有一趟列车,发往莫斯科了,满载了许多东西你知道吗?最近,北京有一趟列车,发往莫斯科了,而且,满载了许多东西!真的假的?当然是真的啦!有图有真相!不信,请看上图,上图你看到的,即为最近从北京发出的一趟列车。这趟列车的首发地
到2025年南昌将打造国内一流水平的精品民宿集聚群江南都市报讯全媒体记者段萍报道3月15日,南昌市人民政府办公室印发关于加快南昌市民宿健康发展的实施意见,记者了解到,到2024年,南昌市将培育建设3至5个国家等级旅游民宿到2025
以ChatGPT为代表的AI服务会是未来新型消费的增长点吗?随着科技的不断发展,人工智能技术在各行各业得到了应用,其中以ChatGPT为代表的人工智能技术成为了新的研究热点。ChatGPT是OpenAI公司的一项开放式人工智能技术,它可以为
聊聊今年的互联网裁员属于IT的行业很多,为什么互联网裁员这么凶,而且喜欢裁35以上员工。1,互联网裁员凶,是因为互联网薪资明显高于其他IT行业,吸引了大量人员进入互联网,而一个行业的发展通常都是波动的
虚拟与现实,数字与科技,在杭州这里碰撞出精彩火花3月21日22日,中国虚拟现实与元宇宙产业峰会暨XRMA第一次全体会议在杭州西湖区文三数字生活街区0101Park举行,本届峰会是第一届虚拟现实与元宇宙产业分会。全国200多家虚拟