范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Kafka生产者消息发送流程,同步异步发送API

  生产者消息发送流程
  发送原理
  Kafka的Producer发送消息采用的是异步发送的方式。
  在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
  ①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
  ②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
  batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
  linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。
  0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1 ( al1) :生产者发送过来的数据,Leader和和ISR队列里面的所有节点收齐数据后应答。-1和al1等价。
  生产者重要参数列表
  bootstrap.servers: 生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
  key.serializer、 value.serializer: 指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
  buffer.memory: RecordAccumulator缓冲区总大小,默认32m。
  batch.size: 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
  linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
  acks:
  0:生产者发送过来的数据,不需要等数据落盘应答。
  1:生产者发送过来的数据,Leader数据落盘后应答。
  -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
  max.in.flight.requests.per.connection: 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
  Retries(重试): 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
  retry.backoff.ms: 两次重试之间的时间间隔,默认是100ms。
  enable.idempotence: 是否开启幂等性,默认true,开启幂等性。
  compression.type 生产者发送的所有数据的压缩方式。默认是none,不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。
  异步发送API
  普通异步发送
  需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
  异步发送流程如下:
  batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
  linger.ms: 如果数据迟迟未达到batch.size,sender等待lingerms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  代码编写
  1)创建工程kafka-demo
  2)导入依赖
  org.apache.kafka
  kafka-clients
  3.0.0
  
  
  3)创建包名:com.taohua.kafka.producer
  4)编写代码:不带回调函数的API
  package com.taohua.kafka.producer;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;
  public class CustomProducer {
  public static void main(String[] args) throws InterruptedException {
  // 1. 创建kafka生产者的配置对象
  Properties properties = new Properties();
  // 2. 给kafka配置对象添加配置信息
  properties.put("bootstrap.servers","hadoop102:9092");
  // key,value序列化
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  // 3. 创建kafka生产者对象
  KafkaProducer kafkaProducer = new KafkaProducer(properties);
  // 4. 调用send方法,发送消息
  for (int i = 0; i < 10; i++) {
  kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
  }
  // 5. 关闭资源
  kafkaProducer.close();
  }
  }
  5)测试:
  在hadoop102上开启kafka消费者
  [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
  1
  在IDEA中执行上述代码,观察hadoop102消费者输出   [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first   kafka0   kafka1   kafka2   kafka3   ……   带回调函数的异步发送   回调函数callback()会在producer收到ack时调用,为异步调用。   该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。   ·如果Exception为null,说明消息发送成功,   ·如果Exception不为null,说明消息发送失败。   带回调函数的异步调用发送流程   batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k   linger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。   编写代码:带回调函数的生产者   package com.taohua.kafka.producer;   import org.apache.kafka.clients.producer.*;   import java.util.Properties;   public class CustomProducerCallback {   public static void main(String[] args) throws InterruptedException {   // 1. 创建kafka生产者的配置对象   Properties properties = new Properties();   // 2. 给kafka配置对象添加配置信息   properties.put("bootstrap.servers", "hadoop102:9092");   properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");   // key,value序列化(必须)   properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // 3. 创建kafka生产者对象   KafkaProducer kafkaProducer = new KafkaProducer(properties);   // 4. 调用send方法,发送消息   for (int i = 0; i < 10; i++) {   // 添加回调   kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {   // 该方法在Producer收到ack时调用,为异步调用   @Override   public void onCompletion(RecordMetadata metadata, Exception exception) {   if (exception == null)   // 没有异常,输出信息到控制台   System.out.println("主题"+recordMetadata.topic() +", 分区:"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());   }   });   }   // 5. 关闭资源   kafkaProducer.close();   }   }   测试   1)在hadoop102上开启kafka消费者   [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first   1
  2)在IDEA中执行代码,观察hadoop102消费者输出   [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first   kafka0   kafka1   kafka2   ……   3)在IDEA控制台观察回调信息   主题first, 分区:0, 偏移量:10   主题first, 分区:0, 偏移量:11   主题first, 分区:0, 偏移量:12   主题first, 分区:0, 偏移量:13   主题first, 分区:0, 偏移量:14   主题first, 分区:0, 偏移量:15   主题first, 分区:0, 偏移量:16   主题first, 分区:0, 偏移量:17   主题first, 分区:0, 偏移量:18   主题first, 分区:0, 偏移量:19   ……   同步发送API   同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。   由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。   同步发送流程示意图如下:   batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k   linger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。   编写代码:同步发送消息的生产者   package com.atguigu.kafka.producer;   import org.apache.kafka.clients.producer.KafkaProducer;   import org.apache.kafka.clients.producer.ProducerConfig;   import org.apache.kafka.clients.producer.ProducerRecord;   import java.util.Properties;   import java.util.concurrent.ExecutionException;   public class ConsumerProducerSync {   public static void main(String[] args) throws InterruptedException, ExecutionException {   // 1. 创建kafka生产者的配置对象   Properties properties = new Properties();   // 2. 给kafka配置对象添加配置信息   //properties.put("bootstrap.servers","hadoop102:9092");   properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");   // key,value序列化(必须)   properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // 3. 创建kafka生产者对象   KafkaProducer kafkaProducer = new KafkaProducer(properties);   // 4. 调用send方法,发送消息   for (int i = 0; i < 10; i++) {   // 同步发送   kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();   }   // 5. 关闭资源   kafkaProducer.close();   }   }   测试   1)在hadoop102上开启kafka消费者   [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first   1
  2)在IDEA中执行代码,观察hadoop102消费者的消费情况   [atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first   kafka0   kafka1   kafka2   ……

敦煌石窟的前世今生(7)青甘大环线七终现梦中敦煌,多少往事重来敦者,大也煌者,盛也。可以想象敦煌曾经是一个怎样的繁华的都市。从东汉到大唐,敦煌都是通往西域的重要关口,特别是盛唐时期,这里是西安西出祁连的要塞,也是西济南万竹园的前世今生!济南趵突泉公园里有一个园中之园,她就是万竹园。如今万竹园的泉水穿庭廊檐雕花亭台石刻竹林幽静,各所门前两侧石狮门当雕刻精绝保护完美,以不同植物命名的庭院各具特色。济南特有的姜牯石修建水滴公司入选北京市市级企业技术中心近日,北京市经济和信息化局正式公布2022年度第二批新创建的北京市市级企业技术中心名单。经过对申报材料的严格审查评审及公示,凭借长期打造的技术领先优势和对保险产业的赋能带动作用,水拥有一个这样的情人,是前世修来的福分文安冬悦红尘纷扰,世事繁杂,越简单纯粹的东西,越难以获得。比如一个知心爱人,一份真挚情感。为什么现在的人,总觉得孤独寂寞心灵空虚?原因很简单,因为现在的人身在一个最好的时代,也身在走在这条街上,任何一个路人,都可能是我的前世丨周末读诗不知这条街被命名为什么,不知别人叫它什么,我可以在卫星地图上搜索,或在公交站台读到,但即使知道了,我也记不住,即使记住了,对于我也不意味什么。当我第一次来到,这条街便说出了它的名字繁星互娱温奕心一路生花唱响春节舞台,励志旋律点亮新春华彩兔年来到,举国共庆欢腾,在中国人情感浓度最为丰沛的新春佳节里,各大晚会舞台上的欢歌衬托着喜悦心情,熟悉又温柔的旋律鼓舞着人们矢志向前。过去两年在全网爆火的一路生花,今年也在各大卫视亮相央视春晚的碇步桥,就在这个小镇在前不久央视2023年春节联欢晚会上,舞蹈碇步桥惊艳亮相。溪水淙淙,江南女子踏石而行从桥上缓缓走来,让人仿佛置身烟雨江南。(图片来源诗画浙江文旅资讯)这场舞蹈灵感源于浙江省温州市泰文字新疆深山河床戈壁沙漠用心寻找新疆遍地是宝石(新疆的和田青白玉)1。hr没事的时候,你可以找个时间,趁休假期亲自到新疆来溜哒一下。不论北疆南疆还是东疆,如果条件允许就一定要下车走路,顺便到山石里河滩上田地里和戈壁滩上看一看踩蓝山科技扭亏为盈靠价涨,竞争能力待考量2007年12月31日,国务院办公厅发布关于限制生产销售使用塑料购物袋的通知。这份被群众称为限塑令的通知明确规定自2008年6月1日起,在所有超市商场集贸市场等商品零售场所实行塑料1000亿基金群!西安意欲何为?文游石大招来了。最近,西安出台方案,提出建立总规模不低于1000亿元的重点产业链基金集群。这是2023年八个方面重点任务分工推进方案(下简称方案)里,第一方面着力壮大支柱产业和新兴各省份GDP传递了啥信号1月份,浙江省宁波市舟山港金塘港区大浦口码头集装箱吞吐量19。28万标箱,同比增长59。04,实现了新年首月开门红。图为1月31日,宁波舟山港金塘港区大浦口码头灯火辉煌集卡穿梭奔忙
多国宣布减产石油背后调整产量对冲美元影响,撬动定价权?原油市场地震。4月2日,多个产油国宣布,从5月起至2023年年底自愿削减原油产量。受上述消息影响,国际原油期货价格随之上扬。为何此时集体宣布减产?将带来哪些影响?为何引来美国政府的一季度持续引领行业!三大密码助长沙造大块头热销长沙晚报掌上长沙4月3日讯(通讯员李家宇全媒体记者周辉霞)紧紧围绕制造业高端化智能化绿色化发展,加强制造向创造的转变,长沙在大国重器的创新研发上从来是不畏艰难锐意进取。记者今天从中立方早知道突发!欧佩克成员国集体宣布减产河南出台区块链产业方案沙钢收购南京钢联遭截胡第121期(2023年4月3日)深夜突发!欧佩克成员国集体宣布减产当地时间周日(4月2日),石油输出国组织(欧佩克)的多个成员国宣布,各国将从5月开始削减石油供应量,总减产幅度超过股价长期破净多家大行高管为低估值抱不平近日,六大国有银行均已披露2022年业绩报告。在业绩发布会上,多家大行的高管表示自己银行的估值明显偏低,呼吁投资者关注国有大行股的投资价值。3月30日,交通银行举行2022年度业绩猪肉别再直接炒着吃了,教你一招,出锅麻辣干香,真过瘾天南地北大拜年大家好,我是不二,猪肉买回来,别再直接炒着吃了,学会我这个做法,保你一周吃三次都不会烦,口感麻辣干香,简直太下饭了,点个赞方法全告诉你。麻辣肉粒原料猪肉葱姜辣椒段麻椒我国首次对美半导体公司美光公司(Micron)出手中国网信办在3月最后一天,宣布对美国半导体公司美光公司的产品进行安全审查。美光公司大家可能听起来比较陌生,但是他家的产品你肯定用过,这是一家做存储的美国龙头半导体公司,主要做的产品回族不吃猪肉,藏族不吃鱼肉,那汉族不吃什么肉?很多人已经遗忘出于宗教信仰与历史传统,许多民族都有着自己的饮食禁忌。比如,我们众所周知的大部分回族和维吾尔族因为信奉伊斯兰教,所以是基本不吃猪肉的。而除了这个常见的知识点以外,你还知道吗藏族人不中国驻斐济使馆发布旅行安全提示中新网北京4月3日电据外交部领事司微信公众号领事直通车消息,中国驻斐济使馆提醒赴斐旅行中国公民注意以下事项一斐济规定随身携带超过5000美元(约1万斐币)现金或等值其他货币入境,须行走河南读懂中国丨安阳汤阴康洼村千亩油菜开出致富花来源河南省文化和旅游厅媒体互连4月1日,安阳市汤阴县韩庄镇康洼村千余亩油菜花烂漫盛开,吸引游客畅游花海。站在康洼村外田野放眼望去,金黄的油菜花与村庄道路树木交相辉映,构成一幅美丽的独家利物浦一年前真的想要这名23岁的球员,但现在看起来很复杂利物浦主帅尤尔根克洛普一年前就非常想要楚阿梅尼,但法布里奇奥罗马诺现在希望这位23岁的球员今年夏天留在皇家马德里。红军如果能在他们的中场增加像楚梅尼这样的天才球员,会做得很好,尤其中国女篮主教练郑薇哪怕只有1希望,也要尽100努力中国女篮集训队日前在广东清远集结开启新一期集训,备战今年的女篮亚洲杯与亚运会。谈及本期集训遇到的困难,中国女篮主教练郑薇表示,哪怕只有1的希望,也要尽100的努力。据了解,3月31