Spark (16): Spark Streaming Practice Exercises
1. Environment Setup

1.1 pom file

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

1.2 Bean

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Data format: 1597148289569,华北,北京,102,4,2020-08-11,11:12
case class AdsInfo(ts: Long,
                   area: String,
                   city: String,
                   userId: String,
                   adsId: String,
                   var dayString: String = null, // yyyy-MM-dd
                   var hmString: String = null) { // HH:mm
  val date = new Date(ts)
  dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)
  hmString = new SimpleDateFormat("HH:mm").format(date)
}
```

1.3 JDBCUtil utility class

```scala
import java.sql.Connection
import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory

object JDBCUtil {
  // Connection pool object
  var dataSource: DataSource = init()

  // Initialize the connection pool
  def init(): DataSource = {
    val paramMap = new java.util.HashMap[String, String]()
    paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))
    paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))
    paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))
    paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))
    paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))
    // Build a Druid connection pool from the parameter map
    DruidDataSourceFactory.createDataSource(paramMap)
  }

  // Get a connection from the pool
  def getConnection(): Connection = {
    dataSource.getConnection
  }

  def main(args: Array[String]): Unit = {
    println(getConnection())
  }
}
```

1.4 PropertiesUtil utility class

project.properties:

```properties
# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/steamingproject?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=root
jdbc.driver.name=com.mysql.jdbc.Driver

# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topic=mytest
kafka.group.id=cg1
```

```scala
import java.util.ResourceBundle

/**
 * Utility class for reading the properties file
 */
object PropertiesUtil {
  // Bind the configuration file.
  // ResourceBundle is made for reading config files, so no file extension is needed.
  // Internationalization = I18N => Properties
  val summer: ResourceBundle = ResourceBundle.getBundle("project")

  def getValue(key: String): String = {
    summer.getString(key)
  }

  def main(args: Array[String]): Unit = {
    println(getValue("jdbc.user"))
  }
}
```

1.5 BaseApp

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @description: base class shared by the exercises
 * @author: HaoWu
 * @create: 2020-08-11
 */
abstract class BaseApp {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("myAPP")
  val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

  // Kafka consumer parameters; see org.apache.kafka.clients.consumer.ConsumerConfig for the full list
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", // Kafka broker list
    "group.id" -> "g3",                 // consumer group
    "enable.auto.commit" -> "true",     // commit offsets automatically
    "auto.commit.interval.ms" -> "500", // auto-commit offsets every 500 ms
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset" -> "earliest"   // on the first run, consume from the earliest offset
  )

  // Consume the Kafka topic "mytest" as a DStream
  val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    // subscribe to the topic
    ConsumerStrategies.Subscribe[String, String](List("mytest"), kafkaParams))

  /**
   * Convert the input stream InputDStream[ConsumerRecord[String, String]] into DStream[AdsInfo]
   *
   * @param ds the Kafka input stream
   * @return a stream of AdsInfo beans
   */
  def getAllBeans(ds: InputDStream[ConsumerRecord[String, String]]): DStream[AdsInfo] = {
    val result: DStream[AdsInfo] = ds.map(
      record => {
        val arr: Array[String] = record.value().split(",")
        AdsInfo(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
      }
    )
    result
  }

  /**
   * Run the per-exercise processing logic, then start the streaming context
   *
   * @param opt the processing logic
   */
  def runApp(opt: => Unit): Unit = {
    try {
      // processing logic
      opt
      // start the streaming job
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case e: Exception => e.printStackTrace() // surface the failure instead of swallowing it
    }
  }
}
```
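All three exercises read from a Kafka topic named mytest whose records follow the comma-separated format shown in the AdsInfo comment (timestamp,area,city,userId,adsId). The original setup does not include a data generator; the following is a minimal sketch of a hypothetical test producer (the object name, value ranges, and send interval are invented for illustration), using only the kafka-clients producer API that the spark-streaming-kafka-0-10 dependency already pulls in:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical generator: pushes records such as "1597148289569,华北,北京,102,4"
// into the mytest topic until the process is killed.
object MockAdsProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val regions = Array(("华北", "北京"), ("华东", "上海"), ("华南", "深圳"))
    val random = new scala.util.Random()
    while (true) {
      val (area, city) = regions(random.nextInt(regions.length))
      val userId = 100 + random.nextInt(5) // 100..104
      val adsId = 1 + random.nextInt(5)    // 1..5
      // timestamp,area,city,userId,adsId
      val value = s"${System.currentTimeMillis()},$area,$city,$userId,$adsId"
      producer.send(new ProducerRecord[String, String]("mytest", value))
      Thread.sleep(10)
    }
  }
}
```

Run it in a separate process while any of the three exercises is running.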
Requirement 1: Dynamically Add Users to a Blacklist

Implement a real-time, dynamic blacklist: any user who clicks a given ad more than 100 times in a single day is blacklisted.
Note: the blacklist is stored in MySQL.
Approach

1) After reading the data from Kafka, check each record against the blacklist stored in MySQL;
2) For records that pass the check, add the user's click count for that ad to the running total in MySQL with an upsert (see the standalone sketch after this list);
3) After the update in MySQL, check the totals again: any user whose single-day count exceeds 100 is added to the blacklist.
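Step 2 relies on MySQL's INSERT ... ON DUPLICATE KEY UPDATE so that a single statement either creates the row or adds to its existing count. Below is a standalone JDBC sketch of just that upsert (illustration only; it borrows the connection settings from project.properties and the user_ad_count table created in the preparation step):

```scala
import java.sql.DriverManager

// One-off demo of the "insert or accumulate" statement used in ProjectDemo_1
object UpsertSketch {
  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection(
      "jdbc:mysql://hadoop102:3306/steamingproject?useUnicode=true&characterEncoding=utf8",
      "root", "root")
    val ps = connection.prepareStatement(
      """
        |insert into user_ad_count values(?,?,?,?)
        |ON DUPLICATE KEY UPDATE count = count + ?
        |""".stripMargin)
    // Note: user ids like "102" are 3 characters, so the CHAR(2) column from the
    // preparation step may need to be widened (e.g. CHAR(4)) for this to insert cleanly.
    ps.setString(1, "2020-08-11") // dt
    ps.setString(2, "102")        // userid
    ps.setString(3, "4")          // adid
    ps.setLong(4, 3L)             // count stored when the key (dt, userid, adid) is new
    ps.setLong(5, 3L)             // increment applied when the key already exists
    ps.executeUpdate()
    ps.close()
    connection.close()
  }
}
```

Running it twice leaves count = 6 for that key, which is exactly the accumulation behaviour the streaming job depends on.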
Preparation

1) Table that stores blacklisted users:

```sql
CREATE TABLE black_list (userid CHAR(2) PRIMARY KEY);
```

2) Table that stores each user's daily click count per ad:

```sql
CREATE TABLE user_ad_count (
    dt     date,
    userid CHAR(2),
    adid   CHAR(2),
    count  BIGINT,
    PRIMARY KEY (dt, userid, adid)
);
```

Code:

```scala
import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.collection.mutable
import com.spark.streaming_need.bean.AdsInfo
import com.spark.streaming_need.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @description: Requirement 1: dynamically add users to a blacklist
 *               Blacklist any user who clicks a given ad more than 100 times in a single day.
 *               (user, ad id, time, count)
 *               Note: the blacklist is stored in MySQL.
 * @author: HaoWu
 * @create: 2020-08-12
 */
object ProjectDemo_1 extends BaseApp {
  def main(args: Array[String]): Unit = {
    runApp {
      val adsInfoDS: DStream[AdsInfo] = getAllBeans(ds)

      /**
       * Check the blacklist: returns true when the user is NOT blacklisted,
       * i.e. when the record should be kept.
       */
      def isBlackList(userid: String, connection: Connection): Boolean = {
        var flag: Boolean = true
        val sql =
          """
            |select * from black_list where userid = ?
            |""".stripMargin
        val ps: PreparedStatement = connection.prepareStatement(sql)
        ps.setString(1, userid)
        val result: ResultSet = ps.executeQuery()
        // executeQuery never returns null; the user is blacklisted only if a row comes back
        if (result.next()) {
          flag = false
        }
        result.close()
        ps.close()
        flag
      }

      // 1. Aggregate the current batch: ((day, userid, adsid), count)
      val countDS: DStream[((String, String, String), Long)] = adsInfoDS.map {
        // ((2020-08-11,102,1),1)
        case adsInfo: AdsInfo => ((adsInfo.dayString, adsInfo.userId, adsInfo.adsId), 1L)
      }.reduceByKey(_ + _)

      countDS.foreachRDD(
        rdd => rdd.foreachPartition {
          iter => {
            // 2. Prepare the MySQL connection and the upsert statement
            val connection: Connection = JDBCUtil.getConnection()
            val sql =
              """
                |insert into user_ad_count values(?,?,?,?)
                |ON DUPLICATE KEY UPDATE count = count + ?
                |""".stripMargin
            val ps: PreparedStatement = connection.prepareStatement(sql)
            // Keep only the users that are not yet blacklisted
            iter.filter {
              case ((_, userid, _), _) => isBlackList(userid, connection)
            }
              // Upsert the counts into MySQL
              .foreach {
                case ((date, userid, adsid), count) => {
                  ps.setString(1, date)
                  ps.setString(2, userid)
                  ps.setString(3, adsid)
                  ps.setLong(4, count)
                  ps.setLong(5, count)
                  ps.executeUpdate()
                }
              }
            // Release the statement
            ps.close()

            // 3. After the upsert, find the userids whose daily count exceeds the threshold
            //    (the requirement says 100; the query uses 20, presumably to make local testing easier)
            val sql2 =
              """
                |select userid from user_ad_count where count > 20
                |""".stripMargin
            val ps2: PreparedStatement = connection.prepareStatement(sql2)
            val resultSet: ResultSet = ps2.executeQuery()
            // Collect the userids that crossed the threshold
            val blackList = new mutable.HashSet[String]()
            while (resultSet.next()) {
              val userid: String = resultSet.getString("userid")
              blackList += userid
            }
            // Close the ResultSet and PreparedStatement
            resultSet.close()
            ps2.close()

            // 4. Upsert every collected userid into the black_list table
            val sql3: String =
              """
                |INSERT INTO black_list VALUES (?)
                |ON DUPLICATE KEY UPDATE userid = ?
                |""".stripMargin
            val ps3: PreparedStatement = connection.prepareStatement(sql3)
            for (userid <- blackList) {
              ps3.setString(1, userid)
              ps3.setString(2, userid)
              ps3.executeUpdate()
            }
            ps3.close()
            connection.close()
          }
        }
      )
    }
  }
}
```

Requirement 2: Real-Time Ad Click Statistics
Description: count, in real time, the total daily ad clicks per area, per city, and per ad, and store the results in MySQL.
Steps: 1) use updateStateByKey for stateful accumulation (a minimal sketch of the update function follows below); 2) upsert the results into MySQL.
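updateStateByKey calls an update function of type (Seq[V], Option[S]) => Option[S] for every key in every batch: the Seq carries the key's new values from the current batch and the Option carries its previously accumulated state. Below is a minimal pure-Scala sketch (no Spark involved, object name invented for illustration) of how that accumulation plays out across batches:

```scala
object UpdateStateSketch {
  // Same shape as the update function passed to updateStateByKey in ProjectDemo_2
  def update(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(newValues.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // Simulate three micro-batches of click counts for a single key,
    // e.g. (2020-08-11, 华北, 北京, 4)
    val batches = Seq(Seq(2, 1), Seq(3), Seq())
    var state: Option[Int] = None
    for (batch <- batches) {
      state = update(batch, state)
      println(s"batch=$batch  accumulated=$state")
    }
    // accumulated state: Some(3), then Some(6), then Some(6)
  }
}
```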
MySQL table:

```sql
CREATE TABLE area_city_ad_count (
    dt    date,
    area  CHAR(4),
    city  CHAR(4),
    adid  CHAR(2),
    count BIGINT,
    PRIMARY KEY (dt, area, city, adid)  -- composite primary key
);
```
Code:

```scala
import java.sql.{Connection, PreparedStatement}
import com.spark.streaming_need.bean.AdsInfo
import com.spark.streaming_need.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @description: Requirement 2: real-time ad click statistics
 *               Count, in real time, the total daily clicks per area, city, and ad,
 *               and store the results in MySQL.
 * @author: HaoWu
 * @create: 2020-08-11
 */
object ProjectDemo_2 extends BaseApp {
  def main(args: Array[String]): Unit = {
    runApp {
      // updateStateByKey is stateful, so a checkpoint directory is required
      ssc.checkpoint("function2")

      // 1. Within each batch, aggregate by day, area, city and ad
      //    Data format: 1597148289569,华北,北京,102,4
      val DsAds: DStream[AdsInfo] = getAllBeans(ds)
      val kvDS: DStream[((String, String, String, String), Int)] = DsAds.map {
        adsInfo => ((adsInfo.dayString, adsInfo.area, adsInfo.city, adsInfo.adsId), 1)
      }

      // 2. Merge the current batch with the state accumulated from earlier batches
      val result: DStream[((String, String, String, String), Int)] = kvDS.updateStateByKey(
        (seq: Seq[Int], opt: Option[Int]) => {
          var sum: Int = seq.sum
          val value = opt.getOrElse(0)
          sum += value
          Some(sum)
        }
      )

      // 3. Write the result to MySQL
      result.foreachRDD(
        rdd => {
          rdd.foreachPartition {
            iter => {
              // One connection per partition
              val connection: Connection = JDBCUtil.getConnection()
              // Upsert statement for MySQL
              val sql =
                """
                  |insert into area_city_ad_count values (?,?,?,?,?)
                  |on duplicate key update count = ?
                  |""".stripMargin
              val ps: PreparedStatement = connection.prepareStatement(sql)
              // Write every record in the partition
              iter.foreach {
                case ((dayString, area, city, adsId), count) => {
                  // Fill the placeholders
                  ps.setString(1, dayString)
                  ps.setString(2, area)
                  ps.setString(3, city)
                  ps.setString(4, adsId)
                  ps.setInt(5, count)
                  ps.setInt(6, count)
                  // Execute the upsert
                  ps.executeUpdate()
                }
              }
              // Release resources
              ps.close()
              connection.close()
            }
          }
        }
      )
    }
  }
}
```

Requirement 3: Ad Clicks in the Last Hour
Requirement description: compute the click count of each ad over the last hour and display it in the following format:

```
1: List [15:50->10, 15:51->25, 15:52->30]
2: List [15:50->10, 15:51->25, 15:52->30]
3: List [15:50->10, 15:51->25, 15:52->30]
```
Approach

1) Use a window to bound the time range (the last hour);
2) Within the window, reshape the data into ((adid, hm), count);
3) Group by ad id and, within each group, sort by hour and minute (see the sketch after this list).
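Steps 2 and 3 are an ordinary re-key, group, and sort. Below is a small pure-Scala sketch of that reshaping on hard-coded sample data (illustration only; the streaming version with reduceByKeyAndWindow follows in the code below):

```scala
object GroupSortSketch {
  def main(args: Array[String]): Unit = {
    // ((adid, hm), count) pairs, as they would come out of the windowed reduce
    val windowed = Seq(
      (("1", "15:51"), 25), (("1", "15:50"), 10), (("1", "15:52"), 30),
      (("2", "15:50"), 10), (("2", "15:52"), 30), (("2", "15:51"), 25))

    val perAd = windowed
      .map { case ((adsId, hm), count) => (adsId, (hm, count)) } // re-key by ad id
      .groupBy(_._1)                                             // group by ad id
      .mapValues(_.map(_._2).toList.sortBy(_._1))                // sort each group by hh:mm

    perAd.toList.sortBy(_._1).foreach { case (adsId, list) => println(s"$adsId: $list") }
    // 1: List((15:50,10), (15:51,25), (15:52,30))
    // 2: List((15:50,10), (15:51,25), (15:52,30))
  }
}
```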
Code:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

/**
 * @description: Requirement 3: ad clicks in the last hour, refreshed every 3 seconds
 *               Expected output:
 *               1: List [15:50->10, 15:51->25, 15:52->30]
 *               2: List [15:50->10, 15:51->25, 15:52->30]
 *               3: List [15:50->10, 15:51->25, 15:52->30]
 * @author:
 * @create: 2020-08-12
 */
object ProjectDemo_3 extends BaseApp {
  def main(args: Array[String]): Unit = {
    // run the app
    runApp {
      val AdsDStream: DStream[((String, String), Int)] = getAllBeans(ds).map {
        adsInfo => ((adsInfo.adsId, adsInfo.hmString), 1)
      }
      val result: DStream[(String, List[(String, Int)])] = AdsDStream
        // aggregate within a 60-minute window that slides every 3 seconds
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(60), Seconds(3))
        .map {
          case ((adsId, hmString), count) => (adsId, (hmString, count))
        }
        // group by ad id
        .groupByKey()
        // within each group, sort by time in ascending order
        .mapValues {
          iter => iter.toList.sortBy(_._1)
        }
      result.print(10)
    }
  }
}
```
Output:

```
-------------------------------------------
Time: 1597234032000 ms
-------------------------------------------
(1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,13)))
(2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,6)))
(3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,22)))
(4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,22)))
(5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,10)))

-------------------------------------------
Time: 1597234035000 ms
-------------------------------------------
(1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,20)))
(2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,13)))
(3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,26)))
(4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,26)))
(5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,15)))

-------------------------------------------
Time: 1597234038000 ms
-------------------------------------------
(1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,23)))
(2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,16)))
(3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,34)))
(4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,30)))
(5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,20)))
```