揭秘数据湖长文详解Hudi从内核到实战(一)
Hudi入门与构建
Hudi介绍
Hudi将带来流式处理大数据,提供新数据集,同时比传统批处理效率高一个数据量级。
Hudi主要特性: 快速upsert,可插入索引; 以原子方式操作数据并具有回滚功能; 写入器之间的快照隔离; savepoint用户数据恢复的保存点; 管理文件大小,使用统计数据布局; 数据行的异步压缩和柱状数据; 时间轴数据跟踪血统。
Hudi快速构建 安装环境准备 Hadoop集群 Hive Spark2.4.5(2.x) Maven安装
把apache-maven-3.6.1-bin.tar.gz上传到linux的/opt/software目录下。
解压apache-maven-3.6.1-bin.tar.gz到/opt/module/目录下面。 [atguigu@hadoop102 software]$ tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/
修改apache-maven-3.6.1的名称为maven。 [atguigu@hadoop102 module]$ mv apache-maven-3.6.1/ maven
添加环境变量到/etc/profile中。 [atguigu@hadoop102 module]$ sudo vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin
测试安装结果。 [atguigu@hadoop102 module]$ source /etc/profile [atguigu@hadoop102 module]$ mvn -v
修改setting.xml,指定为阿里云。 [atguigu@hadoop102 maven]$ cd conf [atguigu@hadoop102 maven]$ vim settings.xml nexus-aliyun central Nexusaliyun http://maven.aliyun.com/nexus/content/groups/public
Git安装 [atguigu@hadoop102 software]$ sudo yum install git [atguigu@hadoop102 software]$ git --version构建Hudi [atguigu@hadoop102 software]$ cd /opt/module/ [atguigu@hadoop102 module]$ git clone https://github.com/apache/hudi.git && cd hudi [atguigu@hadoop102 hudi]$ vim pom.xml nexus-aliyun nexus-aliyun http://maven.aliyun.com/nexus/content/groups/public/ true false [atguigu@hadoop102hudi]$ mvn clean package -DskipTests -DskipITs
通过Spark-shell快速开始 Spark-shell启动
spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。 [root@hadoop102hudi]# spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5 --conf"spark.serializer=org.apache.spark.serializer.KryoSerializer" --jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar设置表名
设置表名,基本路径和数据生成器。 scala>import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.QuickstartUtils._ scala>import scala.collection.JavaConversions._ import scala.collection.JavaConversions._ scala>import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.SaveMode._ scala>import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceReadOptions._ scala>import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions._ scala>import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.config.HoodieWriteConfig._ scala>val tableName = "hudi_trips_cow" tableName:String = hudi_trips_cow scala>val basePath = "file:///tmp/hudi_trips_cow" basePath:String = file:///tmp/hudi_trips_cow scala>val dataGen = new DataGenerator dataGen:org.apache.hudi.QuickstartUtils.DataGenerator =org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9插入数据
新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。 scala>val inserts = convertToStringList(dataGen.generateInserts(10)) scala>val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) scala>df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY,"ts"). option(RECORDKEY_FIELD_OPT_KEY,"uuid"). option(PARTITIONPATH_FIELD_OPT_KEY,"partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow路径下是否有数据生成。 [root@hadoop102~]# cd /tmp/hudi_trips_cow/ [root@hadoop102hudi_trips_cow]# ls americas asia查询数据 scala>val tripsSnapshotDF = spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*") scala>tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") scala>spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare >20.0").show() +------------------+-------------------+-------------------+---+ | fare| begin_lon| begin_lat| ts| +------------------+-------------------+-------------------+---+ |64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| |33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| |27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| |93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 43.4923811219014| 0.8779402295427752|0.6100070562136587|0.0| |66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| |34.158284716382845|0.46157858450465483|0.4726905879569653|0.0| |41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +------------------+-------------------+-------------------+---+ scala>spark.sql("select _hoodie_commit_time, _hoodie_record_key,_hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show() +-------------------+--------------------+----------------------+---------+----------+------------------+ |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare| +-------------------+--------------------+----------------------+---------+----------+------------------+ | 20200701105144|6007a624-d942-4e0...| americas/united_s...|rider-213|driver-213|64.27696295884016| | 20200701105144|db7c6361-3f05-48d...| americas/united_s...|rider-213|driver-213|33.92216483948643| | 20200701105144|dfd0e7d9-f10c-468...| americas/united_s...|rider-213|driver-213|19.179139106643607| | 20200701105144|e36365c8-5b3a-415...| americas/united_s...|rider-213|driver-213|27.79478688582596| | 20200701105144|fb92c00e-dea2-48e...| americas/united_s...|rider-213|driver-213|93.56018115236618| | 20200701105144|98be3080-a058-47d...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014| | 20200701105144|3dd6ef72-4196-469...| americas/brazil/s...|rider-213|driver-213|66.62084366450246| | 20200701105144|20f9463f-1c14-4e6...| americas/brazil/s...|rider-213|driver-213|34.158284716382845| | 20200701105144|1585ad3a-11c9-43c...| asia/india/chennai|rider-213|driver-213|17.851135255091155| | 20200701105144|d40daa90-cf1a-4d1...| asia/india/chennai|rider-213|driver-213|41.06290929046368| +-------------------+--------------------+----------------------+---------+----------+------------------+
由于测试数据分区是 区域/国家/城市,所以load(basePath "/*/*/*/*")。 修改数据
类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。 scala>val updates = convertToStringList(dataGen.generateUpdates(10)) scala>val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) scala>df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath)增量查询
Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。 scala>spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*"). | createOrReplaceTempView("hudi_trips_snapshot") scala>val commits = spark.sql("select distinct(_hoodie_commit_time) ascommitTime from hudi_trips_snapshotorder by commitTime").map(k => k.getString(0)).take(50) scala>val beginTime = commits(commits.length - 2) beginTime:String = 20200701105144 scala>val tripsIncrementalDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | load(basePath) scala>tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") scala>spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, tsfrom hudi_trips_incremental where fare> 20.0").show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ | 20200701110546|49.527694252432056| 0.5142184937933181| 0.7340133901254792|0.0| | 20200701110546| 90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0| | 20200701110546| 98.3428192817987| 0.3349917833248327| 0.4777395067707303|0.0| | 20200701110546| 90.25710109008239| 0.4006983139989222|0.08528650347654165|0.0| | 20200701110546| 63.72504913279929| 0.888493603696927| 0.6570857443423376|0.0| | 20200701110546|29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0| +-------------------+------------------+--------------------+-------------------+---+
这将提供在beginTime提交后的数据,并且fare>20的数据。 时间点查询
根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)。 scala>val beginTime = "000" beginTime:String = 000 scala>val endTime = commits(commits.length - 2) endTime:String = 20200701105144 scala>val tripsPointInTimeDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | option(END_INSTANTTIME_OPT_KEY, endTime). | load(basePath) scala>tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") scala>spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, tsfrom hudi_trips_point_in_time where fare > 20.0").show() +-------------------+------------------+-------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+-------------------+-------------------+---+ | 20200701105144| 64.27696295884016|0.4923479652912024| 0.5731835407930634|0.0| | 20200701105144| 33.92216483948643|0.9694586417848392| 0.1856488085068272|0.0| | 20200701105144| 27.79478688582596|0.6273212202489661|0.11488393157088261|0.0| | 20200701105144|93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 20200701105144| 43.4923811219014| 0.8779402295427752|0.6100070562136587|0.0| | 20200701105144|66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| | 20200701105144|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| | 20200701105144| 41.06290929046368|0.8192868687714224| 0.651058505660742|0.0| +-------------------+------------------+-------------------+-------------------+---+删除数据 scala>spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").count() res12:Long = 10 scala>val ds = spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").limit(2) scala>val deletes = dataGen.generateDeletes(ds.collectAsList()) scala>val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2)); scala>df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(OPERATION_OPT_KEY,"delete"). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath) scala>val roAfterDeleteViewDF = spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*") scala>roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") scala>spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").count() res15:Long = 8
只有append模式,才支持删除功能。
大数据技术生态体系
大数据的切片机制有哪些
大数据之Kafka集群部署
大数据JUC面试题
大数据学习之部署Hadoop
有人说,买手机可以不买苹果,可是买平板必须买ipad,这句话对吗?为什么?安卓平板的体验确实远远不如苹果iPad,就我的经历来说,苹果iPad主要有以下优点1经久耐用,合理使用的话用上10年问题不大。比如我家里的iPad2还在服役当中,这款平板是在201
手机号用20年以上,移动联通电信都有哪些特殊待遇?三公司杀熟你还不知道吗?越老客户越对你苛刻,优惠活动都不包括老客户生日那天祝我生日快乐恭喜您成为老用户,本公司为了激励老用户发光发热,特别制定以下针对手机号用20年以上的用户的优惠
为什么现在小偷少了,骗子却多了?信息社会变化大,新生事物频涌现。手机银行已普遍,家里很少存现钱。城乡布满监控眼,偷盗容易被破案。犯罪环境已改变,小偷减少是必然。骗子多了小偷少,这是犯罪新特点。手机绑定银行卡,支付
公司股东信息好查么?哪里可以查?你想查公司股东的什么信息?如果仅是股东的姓名或名称,或者股东的出资比例出资额,在国家企业信用信息公示系统可以查询,网址httpwww。gsxt。gov。cnindex。html。当
有实用的软件推荐吗?一些好的软件可以很好的帮助我们提升工作效率,节约时间。今天给大家推荐八款Windows软件,每一个都是精品。一uTools一个可以帮你提高生产率的工具uTools是一个极简插件化跨
经常戴耳机会造成耳聋吗?有可能的!我们在地铁上公交上随处可见带着耳机的人,很多人甚至通宵戴着耳机听音乐睡觉,这些都是在作死的节奏。之前央视有一项调查,3000个学生里有1000人听力都受损了,再这样发现下
个人家庭的固定电话还需要保留吗?现在是网络时代,固定电话的作用确实不大了,但我自己家里的固定电话还一直保留着,也有想过去注销,后来一想到有时候孩子不在家,我是不留手机在家里的,那万一有急事就没办法联系了还有一点,
三星和国产手机的差距在哪里?文小伊评科技三星在国内的溃败总体可以归为三个方面原因一由于成本以及内部管理等诸多方面的原因,三星的中低端产品线在国产手机面前没有任何竞争力。原因二在系统功能优化和人机交互方面一直是
高高在上的华为机皇,8256G卖17499,多少人买得起?这话说的好像你能买得起小米一样,小米你都买不起你非要看这一万多的机器那没办法!我3000多的机器有啥买不起的?法拉利买得起的人也少,为什么它不跟五菱卖一样的价?那不就更多人跑去买么
戴上助听器后声音大了,像戴了听诊器,请问什么原因?您好,戴上助听器后声音大了,像戴了听诊器,请问什么原因?这种情况很有可能是因为堵耳所造成的,如果您低频听力损失程度不严重的话,是可能会有这样的困扰,尤其当您配戴的是耳内式定制助听器
做自媒体,没有电脑,一直用手机上传是不是没有收益?做自媒体两个月时间涨粉52,000,变现收入超过3万元,全部都是用手机上转的,今天我就把怎么在自媒体赚钱的方法免费教给你。后台不少粉丝咨询我,为什么刘三点你可以涨粉这么快?我发布的