第二章kafka
引入依赖 org.apache.kafka kafka_2.13 3.1.0 发送消息异步发送/** * 异步发送消息 * @param msg */ @RequestMapping("/send") public void send(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord); //关闭资源 producer.close(); }回调发送/** * 发送消息 并且回调 * @param msg */ @RequestMapping("/sendAndCall") public void sendAndCall(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord,new sendMsgCallback()); //关闭资源 producer.close(); } /** * 回调类 */ @Slf4j class sendMsgCallback implements Callback{ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null){ log.info("topic[{}],partition[{}]", metadata.topic(), metadata.partition()); }else{ log.error("发送消息出现错误[{}]", exception); } } }同步发送 /** * 同步发送消息 * @param msg */ @RequestMapping("/sendSync") public void sendSync(String msg) throws ExecutionException, InterruptedException { log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis()); //发送数据----调用get() 获取结果 RecordMetadata o = (RecordMetadata)producer.send(producerRecord).get(); log.info("sendSync result topic[{}]", o.topic()); //关闭资源 producer.close(); }分区分区策略/** *DefaultPartitioner * If a partition is specified in the record, use it * If no partition is specified but a key is present choose a partition based on a hash of the key * If no partition or key is present choose the sticky partition that changes when the batch is full. * kafka 默认分区相关策略 * 1、指定分区,使用指定分区 * 2、不指定分区,指定key ,按照key 的hashcode 对分区数 取模 * 例: key的hashcode 为11 ,集群的分区设定为2 , 11%2=1 ,测数据会发送到1号分区 * 3、既不指定分区,有不指定key,kafka 采用 Sticky Partition 会随机选择一个分区, * 并尽可能的使用该分区,直到改分区的 batch 满或者以完成(时间到), * kafka 再随机选择一个与上次不同的分区 * */ /** * 指定分区 异步发送消息 * @param msg */ @RequestMapping("/has") public void has(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 --指定分区 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",0,"",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord); //关闭资源 producer.close(); }自定义分区 /** * 自定义分区 异步发送消息 * @param msg */ @RequestMapping("/myPartitioner") public void myPartitioner(String msg){ log.info("msg[{}]", msg); //设置生产者属性 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master11.bbx.com:9092"); //设置自定义分区 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, new MyPartitioner().getClass().getName()); //设置序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建生产者 KafkaProducer producer = new KafkaProducer(properties); //创建发送的内容 ProducerRecord producerRecord = new ProducerRecord("bbx-first-topic",msg+"--"+System.currentTimeMillis()); //发送数据 producer.send(producerRecord); //关闭资源 producer.close(); } package com.bbxdemo.kafka.config; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 自定义分区策略 */ public class MyPartitioner implements Partitioner { /** * * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String valueStr = value.toString(); if(valueStr.contains("bbx")){ return 0; }else{ return 1; } } @Override public void close() { } @Override public void configure(Map configs) { } }