范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Kafka入门教程(基于C)

  一、搭建 Kafka 环境
  本章的内容比较简单,我们将使用 Docker 快速部署一个单节点的 Kafka 或 Kafka 集群,在后面的章节中,将会使用已经部署好的 Kafka 实例做实验,然后我们通过不断地实验,逐渐了解 Kafka 的知识点以及掌握客户端的使用。这里笔者给出了单机和集群两种部署方式,但是为了便于学习后面的章节,请以集群的方式部署 Kafka。安装 docker-compose 使用 docker-compose 部署 Kafka 可以减少很多没必要的麻烦,一个脚本即可完成部署,省下折腾时间。安装 docker-compose 也是挺简单的,直接下载二进制可执行文件即可。INSTALLPATH=/usr/local/bin sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o ${INSTALLPATH}/docker-compose  sudo chmod +x ${INSTALLPATH}/docker-compose  docker-compose --version
  如果系统没有映射  /usr/local/bin/   路径,执行命令完成后,如果发现找不到 docker-compose   命令,请将文件下载到 /usr/bin  ,即替换 INSTALLPATH=/usr/local/bin   为 INSTALLPATH=/usr/bin  。单节点 Kafka 的部署
  创建一个 docker-compose.yml 文件,文件内容如下: --- version: "3" services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000  broker: image: confluentinc/cp-kafka:7.3.0 container_name: broker ports: # To learn about configuring Kafka for access across networks see # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ - "9092:9092" depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.156:9092,PLAINTEXT_INTERNAL://broker:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 volumes: - /data/kafka/broker/logs:/opt/kafka/logs - /var/run/docker.sock:/var/run/docker.sock
  请替换  PLAINTEXT://192.168.3.156   中的 IP 。
  然后执行命令开始部署应用: docker-compose up -d
  接着,安装 kafdrop,这是一个 Kafka 管理界面,可以很方便地查看一些信息。 docker run -d --rm -p 9000:9000  -e JVM_OPTS="-Xms32M -Xmx64M"  -e KAFKA_BROKERCONNECT=192.168.3.156:9092  -e SERVER_SERVLET_CONTEXTPATH="/"  obsidiandynamics/kafdrop
  Kafka 集群的部署
  Kafka 集群的部署方法有很多,方法不尽相同,其中使用的配置参数(环境变量)也很多,这里笔者只给出自己在使用的快速部署参数,读者可以参阅官方文档,以便定制配置。
  笔者的部署脚本中其中一些重要的环境变量说明如下: KAFKA_BROKER_ID  : 当前 Broker 实例的 id,Broker id 不能重复;KAFKA_NUM_PARTITIONS  :默认 Topic 的分区数量,默认为 1,如果设置了这个配置,自动创建的 Topic 会根据这个大小设置分区数量。
  KAFKA_DEFAULT_REPLICATION_FACTOR  :默认 Topic 分区的副本数;KAFKA_ZOOKEEPER_CONNECT  :Zookeeper 地址;KAFKA_LISTENERS  :Kafka Broker 实例监听的 ip;
  KAFKA_ADVERTISED_LISTENERS  :外部如何访问当前实例,用于 Zookeeper 监控;
  创建一个 docker-compose.yml 文件,文件内容如下: --- version: "3" services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000  kafka1: image: confluentinc/cp-kafka:7.3.0 container_name: broker1 ports: - 19092:9092 depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:19092 volumes: - /data/kafka/broker1/logs:/opt/kafka/logs - /var/run/docker.sock:/var/run/docker.sock  kafka2: image: confluentinc/cp-kafka:7.3.0 container_name: broker2 ports: - 29092:9092 depends_on: - zookeeper environment: KAFKA_BROKER_ID: 2 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:29092 volumes: - /data/kafka/broker2/logs:/opt/kafka/logs - /var/run/docker.sock:/var/run/docker.sock  kafka3: image: confluentinc/cp-kafka:7.3.0 container_name: broker3 ports: - 39092:9092 depends_on: - zookeeper environment: KAFKA_BROKER_ID: 3 KAFKA_NUM_PARTITIONS: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 2 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:39092 volumes: - /data/kafka/broker3/logs:/opt/kafka/logs - /var/run/docker.sock:/var/run/docker.sock
  由于三个 Broker 实例都在同一个虚拟机上面,因此这里通过暴露不同的端口,避免 Broker 冲突。
  然后执行命令开始部署应用: docker-compose up -d
  接着部署 kafdrop: docker run -d --rm -p 9000:9000  -e JVM_OPTS="-Xms32M -Xmx64M"  -e KAFKA_BROKERCONNECT=192.168.3.158:19092,192.168.3.158:29092,192.168.3.158:39092  -e SERVER_SERVLET_CONTEXTPATH="/"  obsidiandynamics/kafdrop
  现在,已经部署好了 Kafka 环境以及管理面板。
  二、 Kafka 概念在本章中,笔者会介绍 Kafka 的一些基本概念,文中的内容是笔者个人理解总结,可能会有错误或其它问题,如有疑问,欢迎指出。 基本概念 一个简单的 生产消息   -> 保存到 Broker   -> 消费消息   的结构图示例如下:在这里,出现了四个对象:生产者 Producer:产生 Message 的客户端;消费者 Consumer :消费 Message 的客户端;主题 Topic:逻辑上的东西;消息 Message:数据实体;当然图中每一个对象本身都是很复杂的,这里为了便于学习,画了个简单的图,现在我们先从最简单的结构图开始了解这些东西。这里的图比较简单,大概是这样的, Kafka 中有多个 Topic,Producer 可以向指定的 Topic 生产一条消息,而 Consumer 可以消费指定 Topic 的消息。Producer 和 Consumer 都是客户端应用,只是在执行的功能上有所区分,理论上 Kafka 的客户端库都是将两者的代码写在同一个模块,例如 C# 的 confluent-kafka-dotnet,同时具有生产者和消费者的 API。然后就是这个 Message 了,Message 主要结构是:Key Value  其它元数据
  其中 Value 是我们自定义消息内容的地方。
  关于 Message,我们这里简单了解即可,在后面的章节中会继续深入介绍。
  在 Kafka 中,每个 Kafka 实例称为 Broker,每个 Broker 中可以保存多个 Topic。每个 Topic 可以划分为多个分区, 每个分区保存的数据是不一样的, 这些分区可以在同一个 Broker 中,也可以在散布在不同的 Broker 中。
  一个 Broker 可以存储不同 Topic 的不同分区,也可以存储同一个 Topic 的不同分区。
  如果一个 Topic 有多个分区,一般来说其并发量会有所提高,通过增加分区数实现集群的负载均衡,一般情况下,分区均衡需要散布在不同的 Broker 才能合理地负载均衡,不然分区都在同一个 Broker 时,瓶颈在单个机器上。
  如果 Broker 的实例比较少,但是 Topic 划分了多个分区,那么这些分区会被部署到同一个 Broker 上。
  主题分区可以有效提高生产者或消费者的并发量,因为将消息分别存储到不同的分区中,可以同时往多个分区推送消息,会比只向一个分区推送消息的速度快。
  前面提到,每个 Message 都有 Key 和 Value,Topic 可以根据 Message 的 Key 将一个 Message 存储到不同的分区。当然,我们也可以在生产消息的时候,指定向一个分区推送消息。
  分区可以提高并发,但是如果一个 Broker 挂了,数据便会丢失,怎么办?
  在 Kafka 中,分区可以设置多个分区副本,这些副本跟分区并不在同一个 Broker 上,这个当 Broker 挂了后,这些分区可以利用副本在其它 Broker 上复活。 [info] 提示
  在 《Kafka权威指南(第2版)》 的 21 页中,指导了如何合理设置分区数量,以及分区的优势和缺点。 关于 Kafka 脚本工具
  前面介绍了 Kafka 的一些简单概念,为了更加好地了解 Kafka,我们可以利用 Kafka 的脚本做一些实验。
  打开其中一个 Kafka 容器( docker exec   命令进入容器),然后执行命令查看自带的二进制脚本: ls -lah /usr/bin/ | grep kafka
  可以看到,里面有很多 CLI 工具,每种 CLI 工具说明文档可以到这里查看:
  https://docs.cloudera.com/runtime/7.2.10/kafka-managing/topics/kafka-manage-basics.html
  下面笔者介绍部分 CLI 工具的使用方法。 主题管理
  kafka-topics 是用于主题管理的 CLI 工具,kafka-topics 提供基本操作如下所示: 操作: --create  :创建主题;--alter  :变更这个主题,修改分区数等;--config  :修改主题相关的配置;--delete  :删除该主题;
  在管理主题时,我们可以设置主题配置,主题配置存储时,其格式示例为  default.replication.factor   ,如果用 CLI 工具操作,那么传递的参数示例为 --replication-factor  ,因此我们通过不同工具操作主题时,参数名称可能不同一样。主题的所有配置参数可以查看官方文档:
  https://kafka.apache.org/090/documentation.html
  kafka-topics 一些常用参数: --partitions   :分区数量,该主题划分成多少个分区;--replication-factor  :副本数量,表示每个分区一共有多少个副本;副本数量需要小于或等于 Broker 的数量;--replica-assignment  :指定副本分配方案,不能与 --partitions   或 --replication-factor   同时使用;--list  :列出有效的主题;--describe  :查询该主题的信息信息。
  下面是使用 CLI 手工创建主题的命令,创建主题时设置分区、分区副本。 kafka-topics --create --bootstrap-server 192.168.3.158:19092  --replication-factor 3  --partitions 3  --topic hello-topic
  使用 CLI 时,可以通过  --bootstrap-server   配置连接到一个 Kafka 实例,或者通过 --zookeeper   连接到 Zookeeper,然后 CLI 自动找到 Kafka 实例执行命令。
  查看主题的详细信息: kafka-topics --describe --bootstrap-server 192.168.3.158:19092 --topic hello-topic Topic: hello-topicTopicId: r3IlKv8BSMaaoaT4MYG8WAPartitionCount: 3ReplicationFactor: 3Configs:  Topic: hello-topicPartition: 0Leader: 3Replicas: 3,1,2Isr: 3,1,2 Topic: hello-topicPartition: 1Leader: 1Replicas: 1,2,3Isr: 1,2,3 Topic: hello-topicPartition: 2Leader: 2Replicas: 2,3,1Isr: 2,3,1
  可以看到,创建的分区会被均衡分布到不同的 Broker 实例中;对于 Replicas 这些东西,我们后面的章节再讨论。
  也可以打开 kafdrop 查看主题的信息。
  如果一个 Topic 的分区数量大于 Broker 数量呢?前面笔者已经提到,如果分区数量比较大时,部分 Broker 中会存在同一个主题的多个分区。
  下面我们来实验验证一下: kafka-topics --create --bootstrap-server 192.168.3.158:19092  --replication-factor 2  --partitions 4  --topic hello-topic1
  可以看到,Broker 2,分到了  hello-topic1   的两个分区。使用 C# 创建分区
  客户端库中可以利用接口管理主题,如 C# 的 confluent-kafka-dotnet,使用 C# 代码创建 Topic 的示例如下:  static async Task Main()  { var config = new AdminClientConfig  {  BootstrapServers = "192.168.3.158:19092"  };  using (var adminClient = new AdminClientBuilder(config).Build())  { try  { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = "hello-topic2", ReplicationFactor = 3, NumPartitions = 2 } });  } catch (CreateTopicsException e)  {  Console.WriteLine(#34;An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");  }  }  }
  在 AdminClient 中还有很多方法可以探索。 分区与复制
  在前面,我们创建了一个名为  hello-topic   的主题,并且为其设置三个分区,三个副本。
  接着,使用  kafka-topics --describe   命令查看一个 Topic 的信息,可以看到:Topic: hello-topicTopicId: r3IlKv8BSMaaoaT4MYG8WAPartitionCount: 3ReplicationFactor: 3Configs:  Topic: hello-topicPartition: 0Leader: 3Replicas: 3,1,2Isr: 3,1,2 Topic: hello-topicPartition: 1Leader: 1Replicas: 1,2,3Isr: 1,2,3 Topic: hello-topicPartition: 2Leader: 2Replicas: 2,3,1Isr: 2,3,1
  Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2   这些数字都是指 Broker ID,Broker ID 可以是数字也可以是有英文。
  主题的每个分区都有至少一个副本,也就是  --replication-factor   参数必须设置大于大于 1。副本分为 leader 和 follwer 两种,每个副本都需要消耗一个存储空间,leader 对外提供读写消息,而 follwer 提供冗余备份,leader 会及时将消息增量同步到所有 follwer 中。
  Partition: 0 Leader: 3 Replicas: 3,1,2   表示分区 0 的副本分布在 ID 为 3  、1  、2   的 Kafka broker 中。
  在  hello-topic   主题中,当分区只有一个副本时,或只关注 leader 副本时,leader 副本对应的 Broker 节点位置如下:
  Kafka 分配分区到不同的节点有一定的规律,感兴趣的读者可参考 《Kafka 权威指南》第二版或官方文档。
  如果设置了多个副本(  --replication-factor=3   ) 时,leader 副本和 follwer 副本的位置如下所示:
  分区的副本数量不能大于 Broker 数量,每个 Broker 只能有此分区的一个副本,副本数量范围必须在 [1,{Broker数量}]   中。也就是说,如果集群只有三个 Broker,那么创建的分区,其副本数量必须在 [1,3]   范围内。
  在不同的副本中,只有 leader 副本能够进行读写,follwer 接收从 leader 推送过来的数据,做好冗余备份。
  一个分区的所有副本统称为 AR(Assigned Repllicas),当 leader 接收到消息时,需要推送到 follwer 中,理想情况下,分区的所有副本的数据都是一致的。
  但是 leader 同步到 follwer 的过程中可能会因为网络拥堵、故障等,导致 follwer 在一定时间内未能与 leader 中的数据一致(同步滞后),那么这些副本称为 OSR( Out-Sync Relipcas)。
  如果副本中的数据为最新的数据,在给定的时间内同步没有出现滞后,那么这些副本称为 ISR。 AR = ISR + OSR
  如果 leader 故障,那么剩下的 follwer 会重新选举 一个 leader;但是如果 leader 接收到生产者的消息后还没有同步到 follwer 就故障了,那么这些消息就会丢失。为了避免这种情况,需要生产者设置合理的 ACK,在第四章中会讨论这个问题。 生产者消费者
  kafka-console-producer 可以给指定的主题发送消息: kafka-console-producer --bootstrap-server 192.168.3.158:19092 --topic hello-topic
  kafka-console-consumer 则可以从指定主题接收消息: kafka-console-consumer --bootstrap-server 192.168.3.158:19092 --topic hello-topic  --group hello-group  --from-beginning
  订阅主题时,消费者需要指定消费者组。可以通过  --group   指定;如果不指定,脚本会自动为我们创建一个消费者组。
  kafka-consumer-groups 则可以为我们管理消费者组,例如查看所有的消费者组: kafka-consumer-groups --bootstrap-server 192.168.3.158:19092 --list
  查看消费者组详细信息: kafka-consumer-groups --bootstrap-server 192.168.3.158:19092 --describe --group hello-group
  当然,也可以从 Kafdrop 界面中查看消费者组的信息。
  这些参数我们现在可以先跳过。
  C# 部分并没有重要的内容要说,代码可以参考:  static async Task Main()  { var config = new AdminClientConfig  {  BootstrapServers = "192.168.3.158:19092"  };  using (var adminClient = new AdminClientBuilder(config).Build())  { var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10)); foreach (var item in groups)  {  Console.WriteLine(item.Group);  }  }  }
  对于消费者组来说,我们需要关注以下参数: state  :消费者组的状态;members  :消费者组成员;offsets  :ACK 偏移量;修改配置
  可以使用 kafka-configs 工具设置、描述或删除主题属性。
  查看主题属性描述: kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --describe kafka-configs --bootstrap-server 192.168.3.158:19092 --entity-type topics --entity-name hello-topic --describe
  使用  --alter   参数后,可以添加、修改或删除主题属性,命令格式:kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --alter --add-config [PROPERTY NAME]=[VALUE] kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --alter --delete-config [PROPERTY_NAME]
  例如 Kafka 默认限制发送的消息最大为 1MB,为了修改这个限制,可以使用以下命令: kafka-configs --bootstrap-server 192.168.3.158:19092 --entity-type topics --entity-name hello-topic --alter --add-config "max.message.bytes=1048576" BASH复制全屏
  其中还有很多参数,请参考:
  https://kafka.apache.org/10/documentation.html#topicconfigs
  此外,我们还可以通过 kafka-configs 查看 Broker 的配置: kafka-configs --bootstrap-server 192.168.3.158:19092 --describe --broker 1
  三、 Kafka .NET 基础
  在第一章中,笔者介绍了如何部署 Kafka;在第二章中,笔者介绍了 Kafka 的一些基础知识;在本章中,笔者将介绍如何使用 C# 编写程序连接 kafka,完成生产和消费过程。
  在第二章的时候,我们已经使用到了 confluent-kafka-dotnet   ,通过 confluent-kafka-dotnet   编写代码调用 Kafka 的接口,去管理主题。
  confluent-kafka-dotnet   其底层使用了一个 C 语言编写的库 librdkafka,其它语言编写的 Kafka 客户端库也是基于 librdkafka 的,基于 librdkafka 开发客户端库,官方可以统一维护底层库,不同的编程语言可以复用代码,还可以利用 C 语言编写的库提升性能。
  此外,因为不同的语言都使用了相同的底层库,也使用了相同的接口,因此其编写的客户端库接口看起来也会十分接近。大多数情况下,Java 和 C# 使用 Kafka 的代码是比较相近的。
  接着说一下 confluent-kafka-dotnet  ,Github 仓库中对这个库的其中一个特点介绍是:High performance : confluent-kafka-dotnet 是一个轻量级的程序包装器,它包含了一个精心调优的 C 语言写的 librdkafka 库。
  Library dkafka 是 Apache Kafka 协议的 C 库实现,提供了 Producer、 Consumer 和 Admin 客户端。它的设计考虑到信息传递的可靠性和高性能,目前的性能超过 100万条消息/秒 的生产和 300万条消息/秒 的消费能力(原话是:current figures exceed 1 million msgs/second for the producer and 3 million msgs/second for the consumer)。
  现在,这么牛逼的东西,到 nuget 直接搜索 Confluent.Kafka 即可使用。
  回归正题,下面笔者将会介绍如果使用 C# 编写生产者、消费者程序。在本章中,我们只需要学会怎么用就行,大概了解过程,而不必深究参数配置,也不必细究代码的功能或作用,在后面的章节中,笔者会详细介绍的。生产者
  编写生产者程序大概可以分为两步,第一步是定义 ProducerConfig 配置,里面是关于生产者的各种配置,例如 Broker 地址、发布消息重试次数、缓冲区大小等;第二步是定义发布消息的过程。例如要发布什么内容、如何记录错误消息、如何拦截异常、自定义消息分区等。
  下面是生产者代码的示例:using Confluent.Kafka; using System.Net;  public class Program { static void Main()  { var config = new ProducerConfig  {  BootstrapServers = "host1:9092",  ...  };  using (var producer = new ProducerBuilder<, string>(config).Build())  {  ...  }  } }
  如果要将消息推送到 Kafka,那么代码是这样写的:var result = await producer.ProduceAsync("weblog", new Message<, string> { Value="a log message" });
  Value   就是消息的内容。其实一条消息的结构比较复杂的,除了 Value ,还有 Key 和各种元数据,这个在后面的章节中我们再讨论。
  下面是发布一条消息的实际代码示例:using Confluent.Kafka; using System.Net;  public class Program { static async Task Main()  { var config = new ProducerConfig  {  BootstrapServers = "192.168.3.156:9092"  };  using (var producer = new ProducerBuilder<, string>(config).Build())  { var result = await producer.ProduceAsync("weblog", new Message<, string> { Value = "a log message" });  }  } }
  运行这段代码后,可以打开 kafdrop 面板查看主题信息。
  如果我们断点调试 ProduceAsync   后的内容,可以看到有比较多的信息,例如:
  这些信息记录了当前消息是否被 Broker 接收并确认(ACK),该条消息被推送到哪个 Broker 的哪个分区中,消息偏移量数值又是什么。
  当然,这里暂时不需要关注这个。批量生产
  这一节中,我们来了解如何通过代码批量推送消息到 Broker。
  下面是代码示例:using Confluent.Kafka; using System.Net;  public class Program { static async Task Main()  { var config = new ProducerConfig  {  BootstrapServers = "192.168.3.156:9092"  };  using (var producer = new ProducerBuilder<, string>(config).Build())  { for (int i = 0; i < 10; ++i)  {  producer.Produce("my-topic", new Message<, string> { Value = i.ToString() }, handler);  }  } // 帮忙程序自动退出  Console.ReadKey();  }  public static void handler(DeliveryReport<, string> r)  {  Console.WriteLine(!r.Error.IsError  ? #34;Delivered message to {r.TopicPartitionOffset}"  : #34;Delivery Error: {r.Error.Reason}");  } }
  可以看到,这里批量推送消息使用了 Produce  ,而之前我们使用的异步代码用了 ProduceAsync  。
  其实两者都是异步的,但是 Product   方法更直接地映射到底层的 librdkafka API,能够利用 librdkafka 中高性能的接口批量推送消息。而 ProduceAsync   则是 C# 实现的异步,相对来说Product   的开销小一些,但是 ProduceAsync 仍然非常高性能——在典型的硬件上每秒能够产生数十万条消息
  如果说最最直观的差异,那么就是两者的返回结果。
  从定义来看:Task> ProduceAsync(string topic, Message message, ...);  void Produce(string topic, Message message, Action> deliveryHandler = );
  ProduceAsync   可以直接获得 Task,然后通过等待 Task 获取响应结果。
  而 Produce   并不能直接获得结果,而是通过回调方式获取推送结果,由 librdkafka 执行回调。
  由于 Produce   是框架底层异步的,但是没有 Task,所以不能 await   ,为了避免在批量消息处理完成之前,producer   生命周期结束了,所以需要使用 producer.Flush(TimeSpan.FromSeconds(10))   这样的代码等待批量消息完成推送。
  调用 Flush   方法可使所有缓冲记录立即可用于发送,并在与这些记录关联的请求完成时发生阻塞。
  Flush   有两个重载:int Flush(TimeSpan timeout); void Flush(CancellationToken cancellationToken = default(CancellationToken));
  int Flush()   会等待指定的时间,如果时间到了,队列中的消息只发送一部分,那么会返回没成功发送的消息数量。
  示例代码如下:using Confluent.Kafka; using System.Net;  public class Program { static async Task Main()  { var config = new ProducerConfig  {  BootstrapServers = "192.168.3.156:9092"  };  using (var producer = new ProducerBuilder<, string>(config).Build())  { for (int i = 0; i < 10; ++i)  {  producer.Produce("my-topic", new Message<, string> { Value = i.ToString() }, handler);  } // 只等待 10s var count = producer.Flush(TimeSpan.FromSeconds(10)); // 或者使用 // void Flush(CancellationToken cancellationToken = default(CancellationToken));  } // 不让程序自动退出  Console.ReadKey();  }  public static void handler(DeliveryReport<, string> r)  {  Console.WriteLine(!r.Error.IsError  ? #34;Delivered message to {r.TopicPartitionOffset}"  : #34;Delivery Error: {r.Error.Reason}");  } }
  如果将 Kafka 服务停止,客户端肯定是不能推送消息的,那么我们在使用批量推送代码时会有什么现象呢?
  这里可以停止所有 Broker 或者给 BootstrapServers   参数设置一个错误的地址,然后启动程序,会发现 producer.Flush(TimeSpan.FromSeconds(10));   会等待 10s,但是此时 handler 不会起效。
  可以看到,如果使用批量消息,需要注意使用 Flush  ,即使连接不上 Broker,程序也不会报错。
  所以我们使用批量消息时,一定要注意与 Broker 的连接状态,以及处理 Flush   返回的失败数量。 var result = producer.Flush(TimeSpan.FromSeconds(10));  Console.WriteLine(result);
  使用 Tasks.WhenAll
  前面提到了使用 Produce   方法来批量推送消息,除了框架本身的批量提交,我们也可以利用 Tasks.WhenAll   来实现批量提交获取返回结果,不过性能并没有 produce - Flush   好。
  示例代码如下: using (var producer = new ProducerBuilder<, string>(config).Build())  {  List tasks = new(); for (int i = 0; i < 10; ++i)  { var task = producer.ProduceAsync("my-topic", new Message<, string> { Value = i.ToString() });  tasks.Add(task);  } await Task.WhenAll(tasks.ToArray());  } 如何进行性能测试
  produce - Flush   的性能到底有多好呢?
  我们可以使用 BenchmarkDotNet 做性能测试,来评估推送不同消息数量时,消耗的时间和内存。由于不同服务器的 CPU、内存、磁盘速度,以及客户端与服务器之间的网络带宽、时延都是影响消息吞吐量的重要因素,因此有必要编写代码来进行性能测试,来评估客户端以及服务器需要多高的性能来运行程序。
  示例代码如下:using Confluent.Kafka; using System.Net; using System.Security.Cryptography; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Running; using BenchmarkDotNet.Jobs;  public class Program { static void Main()  { var summary = BenchmarkRunner.Run();  } }  [SimpleJob(RuntimeMoniker.Net70)] [SimpleJob(RuntimeMoniker.NativeAot70)] [RPlotExporter] public class KafkaProduce { // 每批消息数量  [Params(1000, 10000,100000)] public int N;  private ProducerConfig _config;    [GlobalSetup] public void Setup()  {  _config = new ProducerConfig  {  BootstrapServers = "192.168.3.156:9092"  };  }   [Benchmark] public async Task UseAsync()  { using (var producer = new ProducerBuilder<, string>(_config).Build())  {  List tasks = new(); for (int i = 0; i < N; ++i)  { var task = producer.ProduceAsync("ben1-topic", new Message<, string> { Value = i.ToString() });  tasks.Add(task);  } await Task.WhenAll(tasks);  }  }   [Benchmark] public void UseLibrd()  { using (var producer = new ProducerBuilder<, string>(_config).Build())  { for (int i = 0; i < N; ++i)  {  producer.Produce("ben2-topic", new Message<, string> { Value = i.ToString() }, );  }  producer.Flush(TimeSpan.FromSeconds(60));  }  } }
  在示例代码中,笔者除了记录时间速度外,也开启了 GC 记录。
  Ping 服务器的结果以及 BenchmarkDotNet 性能测试结果如下:正在 Ping 192.168.3.156 具有 32 字节的数据: 来自 192.168.3.156 的回复: 字节=32 时间=1ms TTL=64 来自 192.168.3.156 的回复: 字节=32 时间=2ms TTL=64 来自 192.168.3.156 的回复: 字节=32 时间=2ms TTL=64 来自 192.168.3.156 的回复: 字节=32 时间=1ms TTL=64
  Method
  Job
  Runtime
  N
  Mean
  Error
  StdDev
  Gen0
  Gen1
  Gen2
  Allocated
  UseAsync
  .NET 7.0
  .NET 7.0
  1000
  125.1 ms   2.21 ms   2.17 ms   -   -   -   1055.43 KB   UseLibrd   .NET 7.0   .NET 7.0   1000
  124.7 ms   2.26 ms   2.12 ms   -   -   -   359.18 KB   UseAsync   NativeAOT 7.0   NativeAOT 7.0   1000
  124.8 ms   1.83 ms   1.62 ms   -   -   -   1055.43 KB   UseLibrd   NativeAOT 7.0   NativeAOT 7.0   1000
  125.1 ms   1.76 ms   1.64 ms   -   -   -   359.18 KB   UseAsync   .NET 7.0   .NET 7.0   10000
  143.9 ms   3.70 ms   10.86 ms   1250.0000   750.0000   250.0000   10577.22 KB   UseLibrd   .NET 7.0   .NET 7.0   10000
  140.6 ms   2.74 ms   4.80 ms   250.0000   -   -   3523.29 KB   UseAsync   NativeAOT 7.0   NativeAOT 7.0   10000
  145.7 ms   3.25 ms   9.59 ms   1250.0000   750.0000   250.0000   10577.22 KB   UseLibrd   NativeAOT 7.0   NativeAOT 7.0   10000
  140.6 ms   2.78 ms   5.56 ms   250.0000   -   -   3523.29 KB   UseAsync   .NET 7.0   .NET 7.0   100000
  407.3 ms   7.17 ms   9.58 ms   13000.0000   7000.0000   2000.0000   105185.91 KB   UseLibrd   .NET 7.0   .NET 7.0   100000
  259.7 ms   5.72 ms   16.78 ms   4000.0000   -   -   35164.82 KB   UseAsync   NativeAOT 7.0   NativeAOT 7.0   100000
  419.8 ms   8.31 ms   13.19 ms   14000.0000   8000.0000   2000.0000   105194.3 KB   UseLibrd   NativeAOT 7.0   NativeAOT 7.0   100000
  255.3 ms   6.31 ms   18.62 ms   4000.0000   -   -   35164.72 KB   可以看到使用了 librdkafka 批量推送,比使用 Task.WhenAll 性能要好一些,特别是消息数量比较大的情况下。   不过这个性能测试的结果意义也不大,主要是让读者了解如何使用 BenchmarkDotNet 进行性能测试,客户端推送消息到 Broker,能够实现每秒多大的负载,以此评估在当前环境下可以承载多大的流量。消费   生产消息后,接着编写消费者程序处理消息,消费的代码分为 ConsumerConfig 配置和消费两步,其示例代码如下:using System.Collections.Generic; using Confluent.Kafka; ... var config = new ConsumerConfig { // 这些配置后面的章节中笔者会介绍,这里跳过。 BootstrapServers = "host1:9092,host2:9092", GroupId = "foo", AutoOffsetReset = AutoOffsetReset.Earliest }; using (var consumer = new ConsumerBuilder(config).Build()) { ... }   消费者配置默认会自动提交确认(ACK),所以消费后不需要编写代码确认消息,所以笔者编写的消费者示例代码如下:using Confluent.Kafka; using System.Net; public class Program { static void Main() { var config = new ConsumerConfig { BootstrapServers = "192.168.3.156:9092", GroupId = "test1", AutoOffsetReset = AutoOffsetReset.Earliest }; CancellationTokenSource source = new CancellationTokenSource(); using (var consumer = new ConsumerBuilder(config).Build()) { // 订阅主题 consumer.Subscribe("my-topic"); // 循环消费 while (!source.IsCancellationRequested) { var consumeResult = consumer.Consume(source.Token); Console.WriteLine(consumeResult.Message.Value); } consumer.Close(); } } }

宁波通商银行违规被罚未对交易单证真实性合理审查等来源中国经济网国家外汇管理局宁波市分局近日公布的行政处罚决定书(甬外管罚202244号)显示,宁波通商银行股份有限公司办理经常项目资金收付,未对交易单证的真实性及其与外汇收支的一致杜锋用心良苦!放弃功勋队长,20岁小将砍21分回报,广东人才济济CBA常规赛广东大胜北控,比分12396,广东男篮取得近16场比赛的第15场胜利。由于早早大比分领先,本场比赛杜锋也给予年轻球员足够多的上场时间,报名12人全部出场,上场时间最少的吉林银行王立生调任吉林省联社理事长,担任董事长不到一年文记者黄宇昆1月9日,吉林省人民政府网站发布的关于王立生高壮任免职的通知显示,吉林省人民政府2022年12月30日决定,建议王立生为吉林省农村信用社联合社(下称吉林省联社)理事长人上海王思聪4人当街打人,致受害者轻微伤,曝光209万赔偿为假?文江湖独白专栏说起王思聪的名字,国内很多人都知道他,多重身份叠加在一起,让他的一举一动都受到广泛的关注。在此次上海警方通报王某某等殴打路人的话题下,有权威人士指出,涉事的人员为王思再现炫薪事件?有人自称投行大佬,晒出1。4亿银行卡余额多金的证券业从来就不缺高薪传闻。近日,一位自称是投行MD(董事总经理)晒出了一张银行卡总资产高达9位数的截图,瞬时刷爆网络。不少网友认为真实性存疑。不过,事件真实性如何虽未有定论,一箭数雕!童磊若加盟,多个问题迎刃而解,泰山如虎添翼久古1月11日,知名足球记者陈永曝料,大连人边后卫童磊无限接近签约泰山。这一消息若属实,个人认为此乃一雕数雕之举,新赛季的泰山队必将如虎添翼。今年刚刚踏入26岁的童磊,场上司职右后詹姆斯我能砍下60分20助攻,但是无法像东契奇一样砍下602010本赛季的NBA出现了越来越多的爆炸性数据,比如独行侠的当家球星,东契奇就是其中的代表人物,他在之前的一场比赛当中疯狂砍下了60分21篮板10助攻的狂暴数据,并且这也是NBA历史上绝妻子谈阿尔维斯性侵指控他不是那种人,跳舞听歌有什么问题?直播吧1月12日讯阿尔维斯最近被指控在12月31日于加泰罗尼亚一家夜店性侵一名女性,这名巴西老将的妻子模特儿乔安娜桑斯随后接受媒体采访,并捍卫了自己的丈夫。乔安娜告诉Antena3王天一炮轰郑惟桐告诉你为什么你是九年第二引发棋迷关注的王郑十番棋原计划在本月13日开赛,临近赛期突生变故,王天一连发两微博阐述观点,指出郑惟桐以各种冠冕堂皇的理由要求比赛改期。首篇微博在11日晚发出致敬爱的郑特大十番棋大体育要闻曼城英联杯出局英联杯曼城02完败南安普顿爆冷遭淘汰今晨英联杯14决赛中,曼城客战目前英超积分榜垫底的南安普顿。上班时,塞库马拉接莱扬科助攻推射破门,仅5分钟后,杰内波打入世界波吊射为圣徒扩大领先八战七胜!CBA老牌劲旅低调崛起,有望跻身争冠球队行列老牌劲旅新疆男篮低调崛起新疆男篮作为CBA的老牌劲旅,曾在1617赛季夺得队史首座CBA总冠军,在周琦的带领下,新疆也一直都是总冠军的有力竞争者。但随着新疆男篮完成阵容大洗牌之后,
兰州出发!高铁可达!如果城市有颜色,你心中的庆阳是滴滴滴!甘肃庆阳有高铁了,现在出行再也不用担心旅途漫长了!兰州出发4小时40分即可到达,这确实是个令人开心的好消息。从此得空逛庆阳也将不是什么难事,那么庆阳有什么值得打卡的景点,有越南游客为河口消费市场添动力自河口口岸恢复通关以来,来往于口岸的越南游客日渐增多,持续不断的人流带旺了边城河口的消费市场,流动在街头巷尾的异国游客也自成一道风景。近日,记者走访了城区部分商铺,看到许多越南边民重庆有个地方,被称为活棺材,曾是国民党军统局的秘密监狱!我的宝藏作者在中国的西南部,有一座美丽的城市重庆。重庆又名山城,因为重庆城里有许多山。还由于它的特殊地理位置,冬天的早上,大雾笼罩了整个城市,所以它又叫雾都。城市在雾中若隐若现,整雍和宫烧头香,灵不灵有这样一群人,他们不看春晚不吃年夜饭,宁可跋山涉水数百公里排队等待2076个小时也要在每年大年初一去做一件事情,那就是去北京雍和宫烧头香。我们不禁会去思考,雍和宫烧头香许愿真的靠谱富尔茨准三双班凯罗22分,魔术送公牛4连败魔术客场10091送公牛四连败。魔术上半场4944领先公牛,第三节文德尔卡特加里哈里斯弗朗茨瓦格纳命中4记三分,魔术6552领先13分。科比怀特连得5分,公牛追到6876进入末节。疯狂一夜!4笔签约达成,76人捡漏补强,杜兰特太阳首次训练北京时间2月14日,NBA常规赛继续展开,场外传来多个消息,又迎来了疯狂一夜。在交易截止日结束之后,多支球队都在关注着买断市场,一夜又有4笔签约达成,步行者独行侠和火箭都完成了一笔奥沙利文大爆发,40横扫对手,欲夺66万奖金说到丁俊晖和奥沙利文相信很多球迷并不陌生,他们都是非常出色的斯诺克运动员,奥沙利文是七五三杰之首职业生涯夺得了7个世锦赛冠军,丁俊晖曾经是八零五虎之首,可惜的是现如今是八零五虎中唯优惠三百,一加Ace2从主角变成陪衬,RedmiK60同价位依旧没有对手本以为这个月手机市场会冷清一段时间,然而没有想到,各大手机厂商为我们带来了一款又一款出色的产品,就有网友将一加Ace2以及RedmiK60做了一个对比,这两款手机的表现都很不错,哪联盟第1联盟第1,交易双赢!里弗斯悬崖勒马,哈登终于如愿以偿当欧文和杜兰特纷纷选择西游,西部本赛季的竞争激烈程度,比过去十年形势更加严峻,简单来说,像如今西部前三,包括欧文东契奇的组合,小卡保罗乔治的组合,杜兰特布克保罗艾顿的组合,勇士库里美媒列出杜兰特搭档过的最强队友,17人按实力排名,谁被高估了杜兰特最近被交易到菲尼克斯太阳后,与克里斯保罗德文布克和艾顿组成了四巨头。这样的组合,也是让太阳成为了202223赛季的夺冠大热门。我们也可以看到杜兰特在职业生涯里,再次获得了一些与大海为伴泰国丽贝岛印象之三到丽贝岛的第二天,我搬去了日出沙滩,住在一个名为Village的小村庄里。穿过它破旧的大门,可以看到周围一排排的竹屋和巨大的椰子树,地上则是白色的细沙和草地。不远处,在路的尽头,尽