范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

第二章kafka

  引入依赖 	org.apache.kafka     kafka_2.13     3.1.0 发送消息异步发送/**      * 异步发送消息      * @param msg      */     @RequestMapping("/send")     public void send(String msg){         log.info("msg[{}]", msg);         //设置生产者属性         Properties properties = new Properties();         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092");         //设置序列化         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         //创建生产者         KafkaProducer producer = new KafkaProducer(properties);         //创建发送的内容         ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis());         //发送数据         producer.send(producerRecord);         //关闭资源         producer.close();     }回调发送/**      * 发送消息  并且回调      * @param msg      */     @RequestMapping("/sendAndCall")     public void sendAndCall(String msg){         log.info("msg[{}]", msg);         //设置生产者属性         Properties properties = new Properties();         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092");         //设置序列化         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         //创建生产者         KafkaProducer producer = new KafkaProducer(properties);         //创建发送的内容         ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis());         //发送数据         producer.send(producerRecord,new sendMsgCallback());         //关闭资源         producer.close();     }  /**  * 回调类  */ @Slf4j class sendMsgCallback implements Callback{      @Override     public void onCompletion(RecordMetadata metadata, Exception exception) {         if(exception == null){             log.info("topic[{}],partition[{}]", metadata.topic(), metadata.partition());         }else{             log.error("发送消息出现错误[{}]", exception);         }     } }同步发送    /**      * 同步发送消息      * @param msg      */     @RequestMapping("/sendSync")     public void sendSync(String msg) throws ExecutionException, InterruptedException {         log.info("msg[{}]", msg);         //设置生产者属性         Properties properties = new Properties();         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092");         //设置序列化         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         //创建生产者         KafkaProducer producer = new KafkaProducer(properties);         //创建发送的内容         ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis());           //发送数据----调用get() 获取结果         RecordMetadata o = (RecordMetadata)producer.send(producerRecord).get();         log.info("sendSync result topic[{}]", o.topic());         //关闭资源         producer.close();      }分区分区策略/**  *DefaultPartitioner  * 
  • If a partition is specified in the record, use it *
  • If no partition is specified but a key is present choose a partition based on a hash of the key *
  • If no partition or key is present choose the sticky partition that changes when the batch is full. * kafka 默认分区相关策略 * 1、指定分区,使用指定分区 * 2、不指定分区,指定key ,按照key 的hashcode 对分区数 取模 * 例: key的hashcode 为11 ,集群的分区设定为2 , 11%2=1 ,测数据会发送到1号分区 * 3、既不指定分区,有不指定key,kafka 采用 Sticky Partition 会随机选择一个分区, * 并尽可能的使用该分区,直到改分区的 batch 满或者以完成(时间到), * kafka 再随机选择一个与上次不同的分区 * */ /** * 指定分区 异步发送消息 * @param msg */ @RequestMapping("/has") public void has(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 --指定分区 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",0,"",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord); //关闭资源 producer.close(); }自定义分区 /** * 自定义分区 异步发送消息 * @param msg */ @RequestMapping("/myPartitioner") public void myPartitioner(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置自定义分区 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, new MyPartitioner().getClass().getName()); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord); //关闭资源 producer.close(); } package com.bbxdemo.kafka.config; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 自定义分区策略 */ public class MyPartitioner implements Partitioner { /** * * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String valueStr = value.toString(); if(valueStr.contains("bbx")){ return 0; }else{ return 1; } } @Override public void close() { } @Override public void configure(Map configs) { } }

  • 女车主起诉要求精神赔偿!特斯拉回应相信法律,全力推动车辆检测上海车展车顶维权女车主和特斯拉之间的纠纷终于走到了对簿公堂这一步。5月6日深夜,特斯拉河南安阳维权女车主张女士方面宣布决定寻求司法途径,向安阳市北关区人民法院起诉特斯拉公司和副总裁超充出牌,广汽吉利围堵特斯拉随着上海车展这一波新能源汽车的热潮,似乎又推动整个行业朝着转向方面迈进了一大步。续航永远是电动车使用的最底层体验基础,要想解决这一问题就两个路径,要么单次续航足够长,让痛苦的充电频清理手机内存现在用的华为手机是2017年底年会抽奖得到的奖品,刚才开始用的那几个月,由于放在床头柜摔下来坏了,大概有半年没用,2018年下半年才开始真正使用。现已使用3年多时间,最喜欢这个手机根本卖不动!中国人均换机周期25。3个月,小米最频繁,苹果最淡定伴随着大数据化时代的到来,手机系统和软件的内存越做越大。虽说智能手机每一轮的系统软件更新,带给用户的体验越来越丰富。但同时也给智能手机的性能和运行内存提出了更高的要求。在旧手机性能前有华为P50,后有三星S22,一英寸大底时代要来了?最大缺点是丑大家好,解读数码圈新鲜事,我是探长。根据海外博主特米的爆料,即将在5月或者6月发布的华为P50系列手机,将会全球首发1英寸的图像传感器IMX800。虽然网上目前并没有关于IMX80华为鸿蒙下月大规模推送体验用户兼具iOS的丝滑和安卓的开放性本文来源时代周报作者杨玲玲5月7日上午,华为鸿蒙有望下月规模化推送的话题登上新浪微博热搜榜,鸿蒙系统再次被置于聚光灯下。此前一日,华为终端有限公司刚开通华为HarmonyOS官方微华为鸿蒙正式上线,这些功能比安卓强华为鸿蒙系统(HarmonyOS)大家应该不陌生吧。在2019年8月9日,华为在东莞举行的开发者大会上,公布了全新操作系统鸿蒙。鸿蒙系统并不只限于手机,它的定位是全场景的分布式系统安卓不再是未来,华为鸿蒙系统即将大规模推送,万物互联才是趋势没有人能熄灭满天星光,与千千万万个开发者一起,打造属于未来的万物互联生态。华为鸿蒙OS即将全面推送,安卓时代依然辉煌,但它并不是未来。目前智能手机行业分为两大阵营,一是以iOS为主华为P50Pro现身,拍照是最大亮点对于华为P50系列来讲可以用江湖只存在它的传说,但没人见过它的真身。看似业内有很多消息传出来,不过没有一个是官方确认的。5月7日继华为P50华为P50Pro高清渲染图现身之后,华为狗狗太疯狂!印度最大加密货币交易所崩溃用户怨声载道近日,网红数字货币狗狗币交易量激增,并使印度最大的数字货币交易所WazirX宕机,随后,印度数字货币投资者们纷纷在社交媒体上抱怨交易和付款出现了延迟现象。据Coinmarketca不惧制裁,华为鸿蒙系统或将下个月正式推送2020年5月份,美国商务部工业与安全局(BIS)宣布,严格限制华为使用美国的技术软件设计和制造半导体芯片。美国对华为新禁令正式生效的日子为9月15日,9月15日及以后,台积电高通
    4。企业如何利用价格策略开发客户?这是一个很好的问题,因为我们都知道,市场营销有4P策略,其中就包括价格策略。企业在发展的不同历史时期,以及不同类型的企业,价格策略是不一样的,下面分别叙述。对于传统制造企业来说,质企业一体化管理云平台对酒水饮料行业有什么作用?传统行业也在努力向上游推出一系列新产品,矿泉水品牌雨后春笋般涌现,满足了更多消费者的需求。随着中国互联网用户的快速增长和年轻化,互联网的使用对矿泉水行业越来越重要。矿泉水产业作为传三星大手笔调整高层人事,为甩开中国企业猛追韩国三星电子12月7日发布消息称,领导3个主要部门的3名首席执行官(CEO)将全部撤换。这是该公司4年来首次更换CEO。今后将把智能手机和家电部门合并为产品部门进行一体运营,采用产docker搭建yapi接口管理系统1。启动MongoDBdockerrundnamemongoyapimongo2。获取yapi镜像dockerpullregistry。cnhangzhou。aliyuncs。co华为智能眼镜12月23日发布,搭载HarmonyOS系统支持万物互联?大家应该都知道本月的23日是华为的新品发布会吧,目前华为已经确定了在发布会上发布哪些产品,之前华为就透露出了华为P50Pocket华为WatchD华为MateBook笔记本等,近日怎么才能买到好手机,行内人总结3条建议,网友真的很实用到了年底想要更换手机的用户越来越多,那么在选购手机的时候,我们该怎么样找到自己满意的手机?也就是说如何才能买到好手机?这个问题一直困扰着广大的网友,现在的手机品牌越来越多,机型也越iOS15。3升级后表现如何?8部iPhone实测,这几部不建议升级iOS的每次升级,果粉们都十分关心,因为iOS升级之后对iPhone的性能会有怎么样的影响暂时还是未知。目前iOS已经可以升级到iOS15。3,和iOS15。2。1相比,iPhonMIUI13新功能刚更新了MIUI13不到一个星期,下面来和大家分享一下。首先这次更新是跨版本的更新,更新时有可能出现有些不适配的情况。接下来说的是更新的主要内容。1。妙享中心(最大的更新方面)可拖苹果6s6sp77sp配置,功能,价格,性价比综合比较,那款更值得入手?首先总体看,iPhone6s6sPlus与iPhone77Plus相比最大的改进在于双摄像头,支持人像拍照。当然还有处理器提升支持防水等特性。一拍照iPhone77Plus的拍照能消息称FacebookInstagram正研发NFT制作交易功能DoNews1月22日消息(刘文轩)据英国金融时报援引知情人士消息,Meta旗下的Facebook与Instagram接下来可能会让用户展示NFT内容,甚至未来可能允许用户通过其平别再用错误的关机方式,伤害你心爱的电脑了大家好!我是小硕看到标题,你也许会说关机还会伤害电脑,不就是点开始电源关机就完事了,其实远没有你想象中的简单,里面有不少的学问。在Windows系统中的电源选项里关机睡眠重启休眠在