范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

(3)sparkstreaming从kafka接入实时数据流最实现数据可视化

  (1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:
  (2)方案说明:
  1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到kafka;
  2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;
  3)将结果数据写入到mysql;
  4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;
  5)在平台上通过拖拽式构建各种数据应用,数据展示;
  (3)代码演示:
  定义一个kafka生产者,模拟数据源 package com.producers;  import com.alibaba.fastjson.JSONObject; import com.pojo.WaterSensor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;  import java.util.Properties; import java.util.Random;  /**  * Created by lj on 2022-07-18.  */ public class Kafaka_Producer {     public final static String bootstrapServers = "127.0.0.1:9092";      public static void main(String[] args) {         Properties props = new Properties();         //设置Kafka服务器地址         props.put("bootstrap.servers", bootstrapServers);         //设置数据key的序列化处理类         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");         //设置数据value的序列化处理类         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");         KafkaProducer producer = new KafkaProducer<>(props);          try {             int i = 0;             Random r=new Random();               String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};              while(true) {                 Thread.sleep(2000);                 WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)]+"_kafka",i,i);                 i++;                  String msg = JSONObject.toJSONString(waterSensor);                 System.out.println(msg);                 RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null,  msg)).get(); //                System.out.println("recordMetadata: {"+ recordMetadata +"}");             }          } catch (Exception e) {             System.out.println(e.getMessage());         }     } }
  根据业务需要,定义各种消息对象 package com.pojo;  import java.io.Serializable; import java.util.Date;  /**  * Created by lj on 2022-07-13.  */ public class WaterSensor implements Serializable {     public String id;     public long ts;     public int vc;      public WaterSensor(){      }      public WaterSensor(String id,long ts,int vc){         this.id = id;         this.ts = ts;         this.vc = vc;     }      public int getVc() {         return vc;     }      public void setVc(int vc) {         this.vc = vc;     }      public String getId() {         return id;     }      public void setId(String id) {         this.id = id;     }      public long getTs() {         return ts;     }      public void setTs(long ts) {         this.ts = ts;     } }
  sparkstreaming数据流计算 package com.examples;  import com.alibaba.fastjson.JSONObject; import com.pojo.WaterSensor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies;  import java.util.*;  /**  * Created by lj on 2022-07-18.  */ public class SparkSql_Kafka {     private static String appName = "spark.streaming.demo";     private static String master = "local[*]";     private static String topics = "kafka_data_waterSensor";     private static String brokers = "127.0.0.1:9092";      public static void main(String[] args) {         //初始化sparkConf         SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);          //获得JavaStreamingContext         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));          /**          * 设置日志的级别: 避免日志重复          */         ssc.sparkContext().setLogLevel("ERROR");          Collection topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));         //kafka相关参数,必要!缺了会报错         Map kafkaParams = new HashMap<>();         kafkaParams.put("metadata.broker.list", brokers) ;         kafkaParams.put("bootstrap.servers", brokers);         kafkaParams.put("group.id", "group1");         kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");         kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");                  //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定         JavaInputDStream> lines = KafkaUtils.createDirectStream(                 ssc,                 LocationStrategies.PreferConsistent(),                 ConsumerStrategies.Subscribe(topicsSet, kafkaParams)         );          JavaDStream mapDStream = lines.map(new Function, WaterSensor>() {             @Override             public WaterSensor call(ConsumerRecord s) throws Exception {                 WaterSensor waterSensor = JSONObject.parseObject(s.value().toString(),WaterSensor.class);                 return waterSensor;             }         }).window(Durations.minutes(9), Durations.minutes(6));      //指定窗口大小 和 滑动频率 必须是批处理时间的整数倍;          mapDStream.foreachRDD(new VoidFunction2, Time>() {             @Override             public void call(JavaRDD waterSensorJavaRDD, Time time) throws Exception {                 SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());                  Dataset dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);                 // 创建临时表                 dataFrame.createOrReplaceTempView("log");                 Dataset result = spark.sql("select * from log");                 System.out.println("========= " + time + "=========");                 //输出前20条数据                 result.show();                                  //数据写入mysql                 writeDataToMysql(result);             }         });          //开始作业         ssc.start();         try {             ssc.awaitTermination();         } catch (Exception e) {             e.printStackTrace();         } finally {             ssc.close();         }     } }
  NBI大数据可视化构建平台对接mysql,构建数据应用:
  NBI可视化

抗糖就能抵抗衰老吗?这才是真相要想皮肤好,抗糖少不了?这是近年来美容界流行的一种说法。糖被视为导致皮肤衰老的杀手,主食水果等含糖量稍高的食物,都被爱美人士拉入饮食黑名单。为了迎合这种爱美的需求,市面上出现了很多图腾物语(长诗)海男一窸窣声穿过的花园面积一个女人早起,穿上了有洗衣液香味和太阳晾晒过的,带有草本植物痕迹的衣裙痕迹,犹如花纹,细看从柔软的花片中曲线弯如波浪,是的,波浪就在门口近在一个女人早起的你要学会扛住烦恼扛住烦恼常言道,世界上没有不烦恼的人。可是世间事却是千奇百怪,每个人都会有烦恼。人到中年,你有了工作和家庭,你的烦恼也越来越多。你的心被焦虑和烦恼占据满了,开始了你所逃避和抗拒的人唯有南山与君眼,相逢不改旧时青1人生在世,最好的状态就是就是冷眼暖心醉态三者的调和。2传媒是社会的良知,人类的道义。3一生中总会让你碰到几个怦然心动,让你觉得即使用尽一生都无法遗忘。很多年后,琐碎生活与时间疲倦无效的护肤以下这6条你可能每天都在做为什么勤快护肤,用各种大牌护肤品,皮肤状态还是很差呢?一年四季中最干燥的季节来啦,不涂防晒,无效护肤,都会让皮肤出现各种问题给大家总结了容易伤害皮肤的6大护肤误区,快看看你是不是中腕上机械的极致浪漫国潮腕表艾戈勒上手体验前言谈起机械手表,大家可能会立马联想到劳力士欧米茄浪琴,更奢侈一点的是百达翡丽江诗丹顿,诚然,这些品牌的机械手表无论是技术品质还是文化品牌都无可挑剔。但与之相对的价格也自然高高在上西安中央空调厂家剖析中央空调和传统空调的不同之处西安中央空调厂家剖析中央空调和传统空调的不同之处中央空调主要是指多联机,俗称的一个外机带动几个内机的空调,这种空调才是真正的重要空调。中央空调有以下的优缺点。中央空调的占用空间小(厉害的中年人,话不多,也不太合群很多事情,说起来头头是道,做起来却不是那么回事,甚至会把你之前的认知,全部推翻。常规的看法,中年人应该很合群,符合朋友多了路好走的套路。当自己混出名堂之后,就会有炫耀的资本,也很出愿来生再次相遇(二)郝有花(图片来自网络)掌心握着谁的万水千山从头到尾写满了离殇从尾到头挂满了牵念一撇一捺,镌刻在眉间流年里的暖已随寂寞凝成一个支撑点汇聚在心头,暖了一个秋秋风扫落叶,一地红每片叶子,福永州传统村落丨宁远琵琶岗村却似梦里回家山编者按作为国家历史文化名城,永州拥有数目繁多各具特色的古村落,2012年以来,永州市共有85个古村落被国家住建部命名为中国传统村落。它们有群山环抱青山绿水的颜值,也有古色典雅淳朴善突发!英国首相特拉斯宣布辞职,继任者中这两位谁的胜算大?上任45天之后,英国首相特拉斯宣布辞职。当地时间20日下午130分,特拉斯走出唐宁街10号的大门,发表了她的辞职演说。而上个月初,她刚在同一个位置发表了就职演讲。特拉斯在演讲中表示
美媒评选的NBA小前总统山,詹姆斯伯德杜兰特?这题你怎么答?今天大家带来的是美媒评选的NBA小前总统山,分别有詹姆斯伯德杜兰特,还有一位留给大家五选一,分别是J博士皮蓬伦纳德哈夫利切克贝勒。下面为大家带来这几位球员NBA生涯数据情况NBA小21,终于赢了!西甲豪门爆发创13年纪录登榜首,全队疯狂庆祝欧冠首轮比赛,马竞坐镇主场迎战波尔图,西甲豪门以21的比分击败对手,全取三分。此役,马竞占据场面优势,但创造出来的机会不如对手。在常规时间,两队互交白卷,补时阶段,连续打进3球,格穆里尼奥真的江郎才尽了嘛罗马又输了,输给了保加利亚的球队。做为穆里尼奥的铁杆球迷,我想说几句。罗马的定位罗马的实力在意甲只能算中游偏上,也就欧联杯与欧冠区之间。后卫库姆布拉斯莫林曼奇尼希利克斯皮纳佐拉卡斯近两年詹眉同时出战胜率高达74!加上威少三巨头胜率堪堪五成?耐克为勒布朗倾情打造的俄州最大建筑,面积达到65000平方米的个人中心,再次展现了詹姆斯无与伦比的商业价值。新赛季即将来到38岁的勒布朗,但是他依旧展现出了极为巅峰的竞技状态。上赛2笔签约官宣,科尔放下豪言,奥尼尔声援纳什,NBA宣布新决定NBA正处于休赛期,2K23球队能力值正式发布,具体排名如下绿军以总评分83高居联盟第一位,篮网勇士雄鹿76人太阳和骑士6支球队以82并列第二位,接下来分别是快船灰熊热火老鹰公牛森魔力鸟还有魔力吗?欧联杯卢多戈雷VS罗马来自保加利亚超级联赛的卢多戈雷虽说在本土联赛中算是一方霸主,但与欧洲的其他球队相遇时,却难有抗衡之力,球队在之前的欧联杯赛场中也已经连续18场未能赢球。客队罗马在意甲也算是名声赫赫断崖式下滑!曼联欧战开门黑C罗4射0正他为何无法像梅西一样转型9月9日凌晨,202223赛季欧联杯小组赛首战拉开战幕,红魔曼联主场01不敌皇家社会遭遇欧洲开门黑。前曼城中场大师大卫席尔瓦禁区内射门造成利桑德罗马丁内斯的手球犯规,主裁判判罚了点04,又输了!伪豪门倒下,58控球摆设,穆帅无能狂怒,难争四罗马是意甲拥有95年历史的老牌强队,曾获得9次意大利杯冠军,不过由于将近20年来从未染指过联赛冠军,使得红狼一直被认为是伪豪门。穆里尼奥到来后,他们非常渴望拿到重量级的冠军,从而来生命是一个圆,我们是半个圈头条创作挑战赛我们总是留恋回忆里的那个自己,于是止步不前,或许太害怕一无所获,所以宁愿在现实里沉沦,我们只是习惯性忽略回忆里的不悦,而千万倍的放大美好,试着反问一下自己,你念念不忘中子星或黑洞的自转速度具有减弱的趋势中子星或黑洞的自转速度具有减弱的趋势白矮星的自转角速度要比恒星高两个数量级,其在两极形成的磁场强度也会高两个数量级。恒星在两极形成的磁场是比较弱的,提高两个数量级的白矮星在两极形成为什么老MC玩家不喜欢网易我的世界?如果一个游戏花了很大精力去更新,却大不如前,你还会继续支持它吗?如果一个公司花了很大财力去购买版权,却做的不如盗版,你还会继续支持它吗?新玩家可能会说网易这不做的挺好的吗?但是请你