范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Spark(十六)SparkStreaming需求练习

  一.环境准备1.pom文件              org.apache.spark         spark-core_2.12         3.0.0                    org.apache.spark         spark-streaming_2.12         3.0.0                    org.apache.spark         spark-streaming-kafka-0-10_2.12         3.0.0                         com.alibaba         druid         1.1.10                    mysql         mysql-connector-java         5.1.27       com.fasterxml.jackson.core     jackson-core     2.10.1                                                            net.alchim31.maven                 scala-maven-plugin                 3.2.2                                                                                                                     compile                                                                                                          org.apache.maven.plugins                 maven-assembly-plugin                 3.0.0                                                               jar-with-dependencies                                                                                                     make-assembly                         package                                                      single                                                                                           2.beanimport java.text.SimpleDateFormat import java.util.Date //数据格式:1597148289569,华北,北京,102,4,2020-08-11,11:12 case class AdsInfo(ts: Long,         area: String,         city: String,         userId: String,         adsId: String,         var dayString: String = null, // yyyy-MM-dd         var hmString: String = null) { // hh:mm          val date = new Date(ts)         dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)         hmString = new SimpleDateFormat("HH:mm").format(date) }  3.工具类JDBCUtilsobject JDBCUtil {      // 创建连接池对象     var dataSource:DataSource = init()      // 连接池的初始化     def init():DataSource = {          val paramMap = new java.util.HashMap[String, String]()         paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))         paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))         paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))         paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))         paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))          // 使用Druid连接池对象         DruidDataSourceFactory.createDataSource(paramMap)     }      // 从连接池中获取连接对象     def getConnection(): Connection = {         dataSource.getConnection     }      def main(args: Array[String]): Unit = {          println(getConnection())      } }  Properties工具类/**  * project.properties文件  */ #jdbc配置 jdbc.datasource.size=10 jdbc.url=jdbc:mysql://hadoop102:3306/steamingproject?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true jdbc.user=root jdbc.password=root jdbc.driver.name=com.mysql.jdbc.Driver  # Kafka配置 kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092 kafka.topic=mytest kafka.group.id=cg1  import java.util.ResourceBundle /**  * Properties文件工具类  */ object PropertiesUtil {      // 绑定配置文件     // ResourceBundle专门用于读取配置文件,所以读取时,不需要增加扩展名     // 国际化 = I18N => Properties     val summer: ResourceBundle = ResourceBundle.getBundle("project")      def getValue( key : String ): String = {         summer.getString(key)     }      def main(args: Array[String]): Unit = {          println(getValue("jdbc.user"))      } }  3.创建BaseApp/**  * @description: 基础类  * @author: HaoWu  * @create: 2020年08月11日  */ abstract class BaseApp {   val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("myAPP")   val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))   //设置消费kafka的参数,可以参考kafka.consumer.ConsumerConfig类中配置说明   val kafkaParams: Map[String, Object] = Map[String, Object](     "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //zookeeper的host,port     "group.id" -> "g3", //消费者组     "enable.auto.commit" -> "true", //是否自动提交     "auto.commit.interval.ms" -> "500", //500ms自动提交offset     "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",     "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",     "auto.offset.reset" -> "earliest" //第一次运行,从最初始偏移量开始消费数据   )    //消费kafka的mytest主题生成DStream   val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](     ssc,     LocationStrategies.PreferConsistent,     //订阅主题     ConsumerStrategies.Subscribe[String, String](List("mytest"),       kafkaParams))     /**    *  将输入流InputDStream[ConsumerRecord[String, String]]=>stream[对象]    * @param ds    * @return    */   def getAllBeans(ds: InputDStream[ConsumerRecord[String, String]]): DStream[AdsInfo] = {     val result: DStream[AdsInfo] = ds.map(       record => {         val arr: Array[String] = record.value().split(",")         AdsInfo(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))       }     )     result   }    /**    * 处理逻辑    * @param opt    */   def runApp(opt: => Unit): Unit = {     try {       //处理逻辑       opt       //执行程序       ssc.start()       ssc.awaitTermination()     } catch {       case e: Exception => e.getMessage     }   }  }  需求一:动态添加黑名单
  实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
  注:黑名单保存到MySQL中。
  思路分析
  1)读取Kafka数据之后,并对MySQL中存储的黑名单数据做校验;
  2)校验通过则对给用户点击广告次数累加一并存入MySQL;
  3)在存入MySQL之后对数据做校验,如果单日超过100次则将该用户加入黑名单。
  准备工作 1)存放黑名单用户的表 CREATE TABLE black_list (userid CHAR(2) PRIMARY KEY); 2)存放单日各用户点击每个广告的次数 CREATE TABLE user_ad_count ( 	dt date, 	userid CHAR (2), 	adid CHAR (2), 	count BIGINT, 	PRIMARY KEY (dt, userid, adid) ); /**  * @description: 需求一:动态添加黑名单  *               说明:实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑  *               (用户,广告id,时间,次数)  *               注:黑名单保存到MySQL中  * @author: HaoWu  * @create: 2020年08月12日  */ object ProjectDemo_1 extends BaseApp {   def main(args: Array[String]): Unit = {     runApp {       val asdInfo: DStream[AdsInfo] = getAllBeans(ds)        /**        * 校验数据是否在黑名单中        */       def isBlackList(userid: String, connection: Connection): Boolean = {         var flag: Boolean = true         val sql =           """             |select * from black_list where userid = ?             |""".stripMargin         val ps: PreparedStatement = connection.prepareStatement(sql)         ps.setString(1, userid)         val result: ResultSet = ps.executeQuery()         if (result != null) {           flag = false         }         flag       }        //1.聚合当前批次数据((timestamp,userid,adsid),count)       val countDS: DStream[((String, String, String), Long)] = asdInfo.map {         //((2020-08-11,102,1),1)         case adsInfo: AdsInfo => ((adsInfo.dayString, adsInfo.userId, adsInfo.adsId), 1L)       }.reduceByKey(_ + _)         countDS.foreachRDD(         rdd => rdd.foreachPartition {           iter => {             //2.向mysql插入数据,准备插入sql和连接             val connection: Connection = JDBCUtil.getConnection()             val sql =               """                 |insert into user_ad_count values(?,?,?,?)                 |ON DUPLICATE KEY UPDATE COUNT= count + ?                 |""".stripMargin             val ps: PreparedStatement = connection.prepareStatement(sql)             //2.过滤出在名单中的数据             iter.filter {               case ((_, userid, _), _) => val falg = isBlackList(userid, connection); falg             }               //往mysql重插入更新数据               .foreach {                 case ((date, userid, adsid), count) => {                   ps.setString(1, date)                   ps.setString(2, userid)                   ps.setString(3, adsid)                   ps.setLong(4, count)                   ps.setLong(5, count)                   ps.executeUpdate()                 }               }             //关闭             ps.close()              //3.插入成功之后,查询对应得userid点击广告此时是否 > 100?             val sql2 =               """                 |select userid from user_ad_count where count > 20                 |""".stripMargin             val ps2: PreparedStatement = connection.prepareStatement(sql2)             val resultSet: ResultSet = ps2.executeQuery()             //封装查询出的黑名单列表             val block_list = new mutable.HashSet[String]()             while (resultSet.next()) {               val userid: String = resultSet.getString("userid")               block_list + userid             }             //关闭resulteSet,PreparedStatement             resultSet.close()             ps2.close()              //4.将block_list数据依次插入黑名单表,没有就插入,有就更新             val sql3: String =               """                 |INSERT INTO black_list VALUES (?)                 |ON DUPLICATE KEY UPDATE userid=?                 |""".stripMargin             val ps3: PreparedStatement = connection.prepareStatement(sql3)             for (userid <- block_list) {               ps3.setString(1, userid)               ps3.setString(2, userid)               ps3.executeUpdate()             }             ps3.close()             connection.close()           }         }       )     }    } } 需求二:广告点击量实时统计
  描述 :实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL
  步骤 :①updateStateByKey有状态累加计算 ②向mysql执行插入更新操作
  Mysql表  CREATE TABLE area_city_ad_count ( 	dt date, 	area CHAR(4), 	city CHAR(4), 	adid CHAR(2),   count BIGINT, 	PRIMARY KEY (dt,area,city,adid)  --联合主键 );
  代码实现 import java.sql.{Connection, PreparedStatement} import com.spark.streaming_need.bean.AdsInfo import com.spark.streaming_need.utils.JDBCUtil import org.apache.spark.streaming.dstream.DStream  /**  * @description: 需求二:广告点击量实时统计  *               描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL  * @author: HaoWu  * @create: 2020年08月11日  */ object ProjectDemo_2 extends BaseApp {   def main(args: Array[String]): Unit = {     runApp {       //updateStateByKey算子有状态,需要checkpoint       ssc.checkpoint("function2")        //1.单个批次内对数据进行按照天维度的聚合统计       //数据格式:1597148289569,华北,北京,102,4       val DsAds: DStream[AdsInfo] = getAllBeans(ds)       val kvDS: DStream[((String, String, String, String), Int)] = DsAds.map {         case (adsInfo) => {           ((adsInfo.dayString, adsInfo.area, adsInfo.city, adsInfo.adsId), 1)         }       }        //2.结合MySQL数据跟当前批次数据更新原有的数据       //计算当前批次和之前的数据累加结果       val result: DStream[((String, String, String, String), Int)] = kvDS.updateStateByKey {         case (seq, opt) => {           var sum: Int = seq.sum           val value = opt.getOrElse(0)           sum += value           Some(sum)         }       }       //3.将结果写入Mysql       result.foreachRDD(         rdd => {           rdd.foreachPartition {             iter => {               //每个分区创建一个Connection连接               val connection: Connection = JDBCUtil.getConnection()               //准备sql,实现mysql的upsert操作               val sql =                 """                   |insert into area_city_ad_count values (?,?,?,?,?)                   |on duplicate key update count=?                   |""".stripMargin               //PreparedStatement               val ps: PreparedStatement = connection.prepareStatement(sql)               //RDD分区中的每个数据都执行写出               iter.foreach {                 case ((dayString, area, city, adsId), count) => {                   //填充占位符                   ps.setString(1, dayString)                   ps.setString(2, area)                   ps.setString(3, city)                   ps.setString(4, adsId)                   ps.setInt(5, count)                   ps.setInt(6, count)                   //执行写入                   ps.executeUpdate()                 }               }               //关闭资源               ps.close()               connection.close()             }           }         }       )     }   } } 需求三:最近一小时广告点击量需求说明
  求最近1h的广告点击量,要求按照以下结果显示 结果展示: 1:List [15:50->10,15:51->25,15:52->30] 2:List [15:50->10,15:51->25,15:52->30] 3:List [15:50->10,15:51->25,15:52->30]
  思路分析
  1)开窗确定时间范围;
  2)在窗口内将数据转换数据结构为((adid,hm),count);
  3)按照广告id进行分组处理,组内按照时分排序。
  代码实现 import org.apache.spark.streaming.{Minutes, Seconds} import org.apache.spark.streaming.dstream.DStream  /**  * @description: 需求三:最近一小时广告点击量,3秒更新一次  * @author:  * 结果展示:  * 1:List [15:50->10,15:51->25,15:52->30]  * 2:List [15:50->10,15:51->25,15:52->30]  * 3:List [15:50->10,15:51->25,15:52->30]  * @create: 2020年08月12日  */ object ProjectDemo_3 extends BaseApp {   def main(args: Array[String]): Unit = {     //运行app     runApp {       val AdsDStream: DStream[((String, String), Int)] = getAllBeans(ds).map {         case adsInfo => ((adsInfo.adsId, adsInfo.hmString), 1)       }       val result: DStream[(String, List[(String, Int)])] = AdsDStream         //窗口内聚合         .reduceByKeyAndWindow((a: Int, b: Int) => {           a + b         }, Minutes(60), Seconds(3))         .map { case ((adsId, ahmString), count) => (adsId, (ahmString, count)) }         //按照广告id分组         .groupByKey()         //组内按时间升序         .mapValues {           case iter => iter.toList.sortBy(_._1)         }       result.print(10)     }   } }
  结果 ------------------------------------------- Time: 1597234032000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,13))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,6))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,22))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,22))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,10)))  ------------------------------------------- Time: 1597234035000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,20))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,13))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,26))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,26))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,15)))  ------------------------------------------- Time: 1597234038000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,23))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,16))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,34))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,30))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,20)))

烟雨江湖雷峰塔支线任务攻略雷峰塔支线任务通关流程烟雨江湖雷峰塔支线任务已经开启,要完成雷峰塔支线任务挑战可以来小编这里了解任务的详情,小编会把雷峰塔支线任务通关方法分享在下面,还没有完成任务的都可以来小编这里了解过关的技巧,希望大侠传奇打造专属你的大侠世界传奇这一款游戏陪伴了很多80后90后,还记得在十几岁的时候,当时我第一次接触传奇,就深深被里面的内容震撼了,看着他们组队砍怪,35个行会攻沙,我感觉我体内的热血都沸腾了,后来各种游短篇小说公公与儿媳妇作者威子刘总刚在公司办公楼二楼的公共卫生间蹲下,就听到门外传来了自己的助理大周,和销售部经理老金两个人嘻嘻哈哈的笑声。随着那笑声越来越近,老金低声地说这件事儿,可不能让任何人知道,小小说战友1950年,时年22岁的他参加了抗美援朝战争。临走时,他刚新婚三个月。三年间,他在炮火纷飞的战场,心里深藏对家乡的思念,义无反顾和战友并肩作战。三年后,那场战争结束了,他回到了家乡小说精灵传说,第二章,精灵村我们在沙漠中行走,沿途看到一些章鱼树,树叶和刺,呈正方形交叉排列,还有百年草仙人掌,含丰富的鐡钙可以食用,看到沙漠绿州,到湖裡储存一些水,太阳下山后非常冷,又饿又冷,在沙漠发现磅蟹抚顺ampampquot黑枭ampampquot巨头ampampquot蚁力神教父ampampquot王奉友的江湖路文南柯解史编辑南柯解史空手套白狼,他骗取120万百姓的辛苦钱,只为保证自己的资金链不断,200多亿被他挥霍一空,养情妇搞投资,最终结局大快人心。2002年,赵本山在央视黄金档做了这真武侠江湖万里挑一!燕云十六声震撼爆料,玩家快上线随着网游兴起的潮流,武侠游戏也迎来了井喷式的发展,市面上也有众多武侠大作。但是随着千禧年的过去,武侠题材游戏也是完美印证了盛极必衰的自然规律。在这个节点,燕云十六声的到来,以出色的恒烁股份最新公告拟开展跨境双向人民币资金池业务恒烁股份公告,公司于2022年9月16日召开第一届董事会第十三次会议,审议通过了关于开展跨境双向人民币资金池业务的议案,拟以全资子公司香港恒烁半导体有限公司作为主办企业,在杭州银行游卡公司才是套娃高手,一个三国杀游戏,成功衍生出5个相关游戏三国杀移动版大家好,我是戎马定半生。众所周知,三国杀移动版最初是由卡牌游戏三国杀演变的线上版。作为游戏的设计者和厂商,游卡公司一直在积极推广三国杀,并且在不断升级三国杀。可是这升级增加运动套件,换装新轮毂,一汽大众新款迈腾下线,或10月上市日前,我们从相关渠道得到了一组一汽大众新款迈腾下线的实拍图片,后续应该会运往全国各地经销商处。和现款车型相比,新车针对外观进行了一定的调整,加入有运动外观套件。据悉,新款迈腾或将于新车将于9月27日亮相,全新福特SuperDuty预告图发布文懂车帝原创许博全新福特SuperDuty预告图懂车帝原创产品近日,福特汽车发布了旗下皮卡车型全新SuperDuty的预告图,同时宣布新车将于当地时间9月27日正式亮相。作为该系列
华尔街最大多头之一Kolanovic称股市最近的上涨是熊市陷阱MarkoKolanovic摩根大通策略师MarkoKolanovic周一重申,股票投资者应当趁上周由美联储引发的股市涨势而卖出,并称美国经济的去通货膨胀进程可能只是暂时的。Kol木头姐姐活过来了!丨全球金融观沪深两市在兔年的表现不错,第一天大涨之后又调整了一天,随后就按照自己的节奏在运作。我还是作这一观点今年中国股票的领先市场依然在我国的香港市场。所以当香港市场出现大涨,A股也必定会出十年间,中国规模以上外商投资工业企业研发投入增长91。5中国出实招欢迎外资研发中心1月31日,回归家乡投身自贸港春节返乡人才对接会在海南省海口市海南迎宾馆举行,教育管理财务医疗医药等行业的不少外资企业参加招聘。苏弼坤摄(人民视觉)各地对外资企业精准施策,保产业链低碳节能供热筑绿色中国梦ampampamp一步电脉冲技术水处理领域首创低碳节能供热创新技术筑绿色中国梦本期三创直播室的嘉宾侯建军和他的团队,20多年来所实施的新能源供热项目,每年可减少碳排放100多万吨。低碳节能供热的系统集成创新技术,此项获得专利1中国女篮队员及教练荣获体育运动奖章!男篮需要知耻后勇了?2月2日,据中国篮协消息,国家体育总局印发文件,表彰2022年度取得优异成绩的运动员和教练员,共有12名五人篮球运动员4名三人篮球运动员1名五人篮球教练员1名三人篮球教练员获2023大暴力后场最强双能卫!男篮后卫线无解,乔帅的牌面太强了202223赛季CBA联赛第三阶段的比赛还没有正式开打,关于中国男篮接下来的世预赛成为了外界最大的看点。而本次世预赛亚洲赛区第六窗口期的比赛,新帅乔尔杰维奇更多可能是为接下来今夏的波兰30多个国家反对国际奥委会的计划除了乌克兰对2024年巴黎奥运会发出了抵制威胁外,好像其他国家也正在形成反对国际奥委会计划让俄罗斯和白俄罗斯的运动员重返国际赛场的风气。波兰体育部长卡米尔博尔特尼丘克(KamilB人比切尔西还多?诺丁汉森林本赛季已签30名球员,共花费1。66亿镑直播吧2月4日讯在签下自由球员安德烈阿尤后,诺丁汉森林本赛季签约的新球员达到了30名,共花费1。6585亿英镑。夏窗吉布斯怀特2650万阿沃尼伊1850万内科威廉姆斯1800万丹尼大连万达9位功勋球员小聚,正副队长赵琳斯基到位!看看还有谁?大连万达9位功勋球员小聚,正副队长赵琳斯基到位!看看还有谁?1994年,中国足球正式开始职业化,大连万达足球队以全连班阵容(晚池注当时队内所有球员都出生于足球城大连)力压广州太阳神徐正源谈中国球员的习惯强调一定要吃早饭,不要卧草拖时间直播吧2月4日讯在接受韩国媒体sedaily采访时,蓉城主帅徐正源介绍了他对于中国足球的看法以及在成都蓉城执教的感受。所有人都说中国足球已经没落,但最近两年亲眼目睹中国足球变化的成全球首位INS破千万粉丝女足球员!她是维拉中场的前女友谁是瑞士最受关注的运动员?过去,这个头衔属于网球巨星费德勒,但现在,24岁的阿斯顿维拉女足球员艾丽莎莱曼成功取而代之。阳光沙滩比基尼,与莱曼的金发和腰身堪称绝配,这位瑞士女足国脚,