范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Kafka基于Windows的Kafka有关环境搭建以及使用。NET环境开发的案例代码与演示

  前言:基于Windows系统下的Kafka环境搭建;以及使用.NET 6环境进行开发简单的生产者与消费者的演示。
  一、环境部署
  Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。
  Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客:
  https://www.cnblogs.com/weskynet/p/14852471.html
  https://www.scala-lang.org/download/scala2.html
  要选择Binaries版本的环境,否则需要自己编译:
  2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址:
  https://zookeeper.apache.org/releases.html#download
  3、同样,Zookeeper也需要下载带bin 的链接,没有带bin的链接,可能是源码,需要自己编译:
  https://kafka.apache.org/downloads.html
  5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。
  6、下载的三个安装包,如图所示:
  7、先安装Scala语言包环境:
  8、验证Scala语言包是否安装成功:
  控制台窗口,输入:scala -version
  如果提示类似如下有关版本信息,则代表安装成功。
  9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。
  安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹
  10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件
  11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠/,如果要使用  反斜杆,需要写双反斜杠
  12、也要更改cfg格式的文件名称为 zoo.cfg 否则zookeeper无法识别配置文件。Zoo.cfg文件是zookeeper启动时候自动关联的默认配置文件名称。
  13、然后新建环境变量 ZOOKEEPER_HOME:
  14、环境变量path新增:%ZOOKEEPER_HOME%bin
  15、启动zookeeper,直接任意打开控制台,输入 zkServer
  16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。
  17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹
  18、然后在Config文件夹下,修改 server.properties 文件,修改 log.dirs 的值为 新增的logs文件夹的绝对路径
  19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:
  20、输入命令:
  .binwindowskafka-server-start.bat .configserver.properties
  进行启动Kafka服务:
  21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3.0降级为2.8:
  22、此处我下载的版本为 2.13-2.8.1,各位大佬们可以按照自己意愿选择版本。可能2.x版本和3.x版本跨度比较大,所以3.0版本没法玩。
  23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。
  24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下:
  25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。
  https://www.kafkatool.com/download.html
  27、安装可视化工具,默认可以一直下一步:
  28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。
  29、一些配置,包括名称、kafka版本、端口号、服务地址等
  30、连接以后的效果图,如下。Topic是空的,接下来写点代码。
  二、代码开发与测试
  31、新建类库项目,当作kafka服务类库
  32、此处选择标准库2.1,用于可以给多种.net core版本使用,方便兼容。
  33、引用 Confluent.Kafka 包。
  34、此处新增发布服务类和订阅服务类:
  35、新增的生产者发布服务方法代码如下:
  代码:
  ///
  ///  Description: Kafka生产者发布服务
  ///  CreateTime: 2022/1/21 19:35:27
  ///  Author: Wesky
  ///  
  public  class  PublishService: IPublishService
  {
  public  async  Task PublishAsync(string  broker, string  topicName, TMessage message) where  TMessage : class
  {
  var  config = new  ProducerConfig
  {
  BootstrapServers  = broker, //  kafka服务集群,例如 "192.168.0.1:9092,192.168.0.2:9092" 或者单机 "192.168.0.1:9092"
  Acks = Acks.All,
  MessageSendMaxRetries  = 3 , //  发送失败重试的次数
  };
  using  (var  producer = new  ProducerBuilder<string, string >(config).Build())
  {
  try
  {
  string  data = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  var  sendData = new  Message<string, string > { Key = Guid.NewGuid().ToString(" N " ), Value = data};
  var  report = await  producer.ProduceAsync(topicName, sendData);
  Console.WriteLine($ " 消息 >>>>>: {data} r 发送到:{report.TopicPartitionOffset} " );
  }
  catch  (ProduceException<string, string > ex)
  {
  Console.WriteLine($ " 消息发送失败>>>>>:r  Code= {ex.Error.Code} >>> r Error= {ex.Message} " );
  }
  }
  }
  }
  36、新增的消费者接收服务方法代码如下:
  代码:
  ///
  ///  Description: kafka 消费者订阅服务
  ///  CreateTime: 2022/1/21 19:36:25
  ///  Author: Wesky
  ///  
  public  class  SubscribeService: ISubscribeService
  {
  ///
  ///  消费者服务核心代码
  ///  
  ///
  ///   消费者配置信息 
  ///   主题集合 
  ///  
  ///  
  ///
  public  async  Task SubscribeAsync(ConsumerConfig config, IEnumerable<string> topics, Action func, CancellationToken cancellationToken) where  TMessage : class
  {
  const  int  commitPeriod = 1 ;
  using  (var  consumer = new  ConsumerBuilder(config)
  .SetErrorHandler((_, e)  =>
  {
  Console.WriteLine($ " 消费错误 >>>>>: {e.Reason} " );
  })
  .SetStatisticsHandler((_, json)  =>
  {
  Console.WriteLine($ " ************************************************ " );
  })
  .SetPartitionsAssignedHandler((c, partitionList)  =>
  {
  string  partitions = string .Join(" ,  " , partitionList);
  Console.WriteLine($ " 分配的分区 >>>>> : {partitions} " );
  })
  .SetPartitionsRevokedHandler((c, partitionList)  =>
  {
  string  partitions = string .Join(" ,  " , partitionList);
  Console.WriteLine($ " 回收的分区 >>>>> : {partitions} " );
  })
  .Build())
  {
  consumer.Subscribe(topics);
  try
  {
  while  (true )
  {
  try
  {
  var  consumeResult = consumer.Consume(cancellationToken);
  if  (consumeResult.IsPartitionEOF)
  {
  continue ;
  }
  if (consumeResult?.Offset % commitPeriod == 0 ){
  try
  {
  var  result = JsonConvert.DeserializeObject(consumeResult.Message?.Value);
  func(result);  //  消费消息
  }
  catch  (Exception ex)
  {
  Console.WriteLine($ " 消费业务处理失败: {ex.Message} " );
  }
  try
  {
  consumer.Commit(consumeResult);  //  手动提交
  Console.WriteLine($ " 消费者消费完成,已提交  " );
  }
  catch  (KafkaException e)
  {
  Console.WriteLine($ " 提交错误 >>>>> : {e.Error.Reason} " );
  }
  }
  }
  catch  (ConsumeException e)
  {
  Console.WriteLine($ " 消费错误>>>>> : {e.Error.Reason} " );
  }
  }
  }
  catch  (Exception e)
  {
  Console.WriteLine($ " 其他错误 >>>>> :{e.Message} " );
  consumer.Close();
  }
  }
  await  Task.CompletedTask;
  }
  } 
  37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用:   38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为.NET 6   39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如 ip1:port,ip2:port…… 服务之间以半角逗号隔开。   40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用.NET 6环境,带有控制器的模式。   41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topic-wesky,通过主题绑定,否则消费者不认识就没办法消费到了。   控制器代码:   [Route(" api/[controller]/[action] " )]   [ApiController]   public class ProducerController : ControllerBase   {   IPublishService _service = ;   public ProducerController(IPublishService publishService)   {   _service = publishService;   }   [HttpPost]   public IActionResult SendMessage(string broker,string topicName,string message)   {   _service.PublishAsync(broker, topicName, message);   return Ok();   }   }   42、接下来是一些测试,如图所示:   43、最后,使用可视化管理工具Offset进行查看,可以看到对应的主题。选中主题,可以设置数据类型,这里我设置为字符串,就可以查看到对应的消息内容了。如果没有设置,默认是16进制的数据。   44、查看刚刚测试时候收发的消息队列里面的数据,如下所示:   45、一些额外补充:   Kafka也是消息队列的一种,用于在高吞吐量场景下使用比较适合。如果是轻量级的,只需要用于削峰,可以使用RabbitMQ。   以上只是简单的操作演示,至于要用得溜,观众朋友们可以自行补充所需的相关理论知识。   可视化工具还有一款yahoo提供的开源的工具,叫kafka-manager,有兴趣的大佬们可以自行玩玩,开源地址:   https://github.com/yahoo/CMAK   还有一款滴滴平台做的开源的kafka运维管理平台,有兴趣的大佬们也可以自行了解,地址:   https://github.com/didi/LogiKM   以上就是该博客的全部内容,感谢各位大佬们的观看~
腊月二十三小年,36口人撑场面硬菜,5热2凉1汤,家人爱吃夸不停小年,又称为交年节灶神节祭灶节,这天通常视为忙年的开始,意味着人们要开始准备年货扫尘祭灶等,好准备干干净净的迎接新年的到来,这天忙过之后,最重要的是要一家人坐在一起吃顿团圆饭。腊月为啥白头发越长越多呢?或不是因为老了?可能是缺少3种营养导语有些人年纪轻轻就已经有了少白头,其实这一点都不罕见,仔细观察我们身边的人。一般在初高中这个阶段,就有极少部分的同学头上出现了白头发,大家的科学素养不足以解释这种现象,一般会归咎清军入关扬州后曾十日不封刀,并屠城80万人,百年后人们才知真相故人西辞黄鹤楼,烟花三月下扬州。扬州,在历史上极负盛名,甚至在很多时候成为江南的代名词。乾隆皇帝下扬州的故事就被众多影视作品广而告之,也突出了清王朝对扬州的重视。但你是否知道,在清湖南的尹峰美貌更胜扬州戴璐一筹,两人感觉完全不一样漂亮女人特别多,可是如果将自己美貌用错了地方,将会让自己走进深渊,戴璐就是这样误入歧途,但是尹峰完全不一样,尹峰的美是独一无二的,她从来没有将自己美貌当成工具,所以尹峰赢的很彻底。散文内心淡然,心有期待,则万事欣,岁平安作者子墨北风渐暖,吹开了腊梅,流年,曾经的悲欢,也都要在春天来到的时候画上句号。习惯了一个人的安静,静静享受自由,在静寂里让身心闲居,品着一盏清茶,在诗意温婉的时光里,清欢与温暖结帕雷德斯阿根廷队内充满良性竞争战胜墨西哥是转折点直播吧1月19日讯在阿根廷夺得卡塔尔世界杯冠军的一个月后,尤文中场帕雷德斯接受了TyCSports的专访,期间他谈到了在国家队角色的转变以及队内竞争等话题。在这个过程中,当轮到我出邓超11岁儿子好帅!高颜值大长腿五官酷似孙俪,妹妹性格软糯可爱邓超孙俪娱乐圈少有的模范夫妻,当年二人携手步入婚姻后,邓超转型当起了导演,自导自演了几部扑街电影,票房惨淡,并没有激起多大水花。孙俪结婚后,一心想着相夫教子,很少出来营业。说不好听萧山草莓采摘地图上线,就在家门口!快来get这份冬日限定莓好!一年一度的草莓季来啦鲜嫩多汁香甜可口想吃到新鲜的草莓又想体验采摘的快乐?快跟着下面的草莓采摘攻略迎接冬日莓好时光实现草莓自由!草莓季来了进化镇七郎平家庭农场基地采用国产高架无土栽培春晚名场面赵丽颖欧豪同框梦回风吹半夏,张若昀吃烧鸡超喜感盼望着盼望着,2023年春节联欢晚会终于来了!从第一次联排到主持人阵容揭晓,再到节目单首曝光,所有和春晚相关的报道都备受关注。春晚承载了一代又一代人的记忆,也是每年除夕夜必不可少的张翰竟成为油腻的代名词?网友赞他达到了人生巅峰滑铁卢演员张翰去年因主演自编自导的电视剧东八区的先生们胸袭女主王晓晨,剧情低俗离谱,剧集播出之后差评不断,而在豆瓣评分也创造历史国产剧评分最低的剧集。随即遭遇下架,还被官媒痛批低俗溥仪第一次工作拿到60元钱,都买了啥?网友不愧是当过皇帝的人在辛亥革命之前,我们国家经历了2000多年的封建时期,当时的最高统治者就是皇帝。秦始皇嬴政灭六国建立了秦朝,加强了中央集权,皇帝的称号也由此而来。在这之后,每个朝代具有最高权利的人
艾尔登法环需要什么硬件配置艾尔登法环的硬件配置需要满足以下要求1操作系统艾尔登法环支持Windows系统,因此使用该系统的机器是最理想的,但是也支持在Linux系统中部署。2CPU艾尔登法环需要至少配置一个王鹏用佳能EOSR5EOSR6定格滑雪的精彩瞬间无忌器材丨王鹏每年冬天滑雪场开板的时候,广大滑雪爱好者都呼朋唤友齐聚滑雪场。只为在洁白的旷野中体会那种风驰电掣的感觉。当然了,也有很多摄影师喜欢在这个时候带着长枪短炮去滑雪场拍摄一搭载Windows系统的Switch?别闹,这是国产PC游戏掌机OneXPlayer2提到游戏掌机,大家可能首先会想到老任的Switch,但若是聊到PC游戏掌机呢?是不是有些触碰到你的知识空白区了?其实一些资深玩家也多少了解到近两年PC游戏掌机的发展前景,包括V社推折叠卷轴?三星概念机展示目前各家手机大厂已经先后发布过自家的折叠屏手机了,目前左右横折上下竖折这两种形式已经是比较成熟的商用方案了,接下来蓝厂和小米也会推出竖向折叠屏新机。在近日的CES2023展会上,三从二手手表到新品牌,是什么决定了手表的价值?在2022年初,一辆零售价为3。5万美元的百达翡丽5711A可能会以24万美元的价格被抢购一空。到8月,这一数字暴跌至约19万美元左右收藏家的加密货币崩溃,需要感谢或指责,取决于他安徽人游安徽新年新旅途,带上背包去邂逅新的惊喜和美景年前的周末,在置办年货的同时也别忘记放松一下心情跨省出行相对而言比较麻烦不如来西湖湿地散散心开启一个不一样的2023新年旅行西湖湿地的冬天是不能错过的处处都透露着静谧的氛围路过的一碎碎念的迎接新年吧!2022就要结束了,失去也好,顺利也罢都将化为句号,累也好,苦也罢,只要我们平安就好。这一年,遇见失去获得成长释怀完结。愿与旧事归于尽,来年依旧迎花开。人生本身就是一场减法,来日不爱喝老酒不懂怎么存?牢记这3个小技巧,别手欠,小心美酒变差酒俗话说人是旧的好,酒是陈的香。现在的白酒圈子里,不知道什么时候刮起了一阵老酒风,喝酒不再一味讲究大牌,反倒是谁尝上了一口陈年酒香,才叫人羡慕不得。可惜大部分老酒都是有价无市,甚至更不要烤箱,不要微波炉,自制香脆红薯片,一口喀吱脆关注大胖友图图,每天都能看到新奇,简单,美味又实用的食谱哦有小胖友想要我出香脆红薯片的做法。考虑到要让多有人都能做,这个食谱将不会用到烤箱和微波炉。首先去皮以后的红薯要切成薄片,越推荐5道美味佳肴,做法简单易学,超级好吃推荐5道美味佳肴,做法简单易学,超级好吃,感兴趣的朋友快来试试吧!可乐鸡翅食材鸡翅可乐姜丝葱花芝麻料酒生抽老抽盐1鸡翅洗净,正反两面划三刀以便入味鸡翅冷水下锅,升降火锅煮开后捞去浮人越简单,越能富养自己大道至简,大富亦如此。黑格尔曾说最伟大的真理最简单,同样,最简单的人也最难得。一个人,什么都不舍得扔掉,外表富得流油,人生却苦不堪言。富养自己,是追求自由生活的过程,而不是拥有金山