范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

揭秘数据湖长文详解Hudi从内核到实战(一)

  Hudi入门与构建
  Hudi介绍
  Hudi将带来流式处理大数据,提供新数据集,同时比传统批处理效率高一个数据量级。
  Hudi主要特性:  快速upsert,可插入索引;  以原子方式操作数据并具有回滚功能;  写入器之间的快照隔离;  savepoint用户数据恢复的保存点;  管理文件大小,使用统计数据布局;  数据行的异步压缩和柱状数据;  时间轴数据跟踪血统。
  Hudi快速构建  安装环境准备  Hadoop集群  Hive  Spark2.4.5(2.x)  Maven安装
  把apache-maven-3.6.1-bin.tar.gz上传到linux的/opt/software目录下。
  解压apache-maven-3.6.1-bin.tar.gz到/opt/module/目录下面。  [atguigu@hadoop102 software]$ tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/
  修改apache-maven-3.6.1的名称为maven。  [atguigu@hadoop102 module]$ mv apache-maven-3.6.1/ maven
  添加环境变量到/etc/profile中。  [atguigu@hadoop102 module]$ sudo vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin
  测试安装结果。  [atguigu@hadoop102 module]$ source /etc/profile [atguigu@hadoop102 module]$ mvn -v
  修改setting.xml,指定为阿里云。  [atguigu@hadoop102 maven]$ cd conf [atguigu@hadoop102 maven]$ vim settings.xml           nexus-aliyun        central         Nexusaliyun        http://maven.aliyun.com/nexus/content/groups/public 
  Git安装  [atguigu@hadoop102 software]$ sudo yum install git [atguigu@hadoop102 software]$ git --version构建Hudi  [atguigu@hadoop102 software]$ cd /opt/module/ [atguigu@hadoop102 module]$ git clone https://github.com/apache/hudi.git && cd hudi [atguigu@hadoop102 hudi]$ vim pom.xml                 nexus-aliyun         nexus-aliyun        http://maven.aliyun.com/nexus/content/groups/public/                      true                              false               [atguigu@hadoop102hudi]$ mvn clean package -DskipTests -DskipITs
  通过Spark-shell快速开始  Spark-shell启动
  spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。  [root@hadoop102hudi]# spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5  --conf"spark.serializer=org.apache.spark.serializer.KryoSerializer"  --jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar设置表名
  设置表名,基本路径和数据生成器。  scala>import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.QuickstartUtils._   scala>import scala.collection.JavaConversions._ import scala.collection.JavaConversions._   scala>import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.SaveMode._   scala>import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceReadOptions._   scala>import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions._   scala>import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.config.HoodieWriteConfig._   scala>val tableName = "hudi_trips_cow" tableName:String = hudi_trips_cow   scala>val basePath = "file:///tmp/hudi_trips_cow" basePath:String = file:///tmp/hudi_trips_cow   scala>val dataGen = new DataGenerator dataGen:org.apache.hudi.QuickstartUtils.DataGenerator =org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9插入数据
  新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。  scala>val inserts = convertToStringList(dataGen.generateInserts(10)) scala>val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) scala>df.write.format("hudi").         options(getQuickstartWriteConfigs).         option(PRECOMBINE_FIELD_OPT_KEY,"ts").         option(RECORDKEY_FIELD_OPT_KEY,"uuid").         option(PARTITIONPATH_FIELD_OPT_KEY,"partitionpath").         option(TABLE_NAME, tableName).         mode(Overwrite).         save(basePath)
  Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow路径下是否有数据生成。  [root@hadoop102~]# cd /tmp/hudi_trips_cow/ [root@hadoop102hudi_trips_cow]# ls americas  asia查询数据  scala>val tripsSnapshotDF = spark.      |  read.     |   format("hudi").      |  load(basePath + "/*/*/*/*") scala>tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") scala>spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare >20.0").show() +------------------+-------------------+-------------------+---+ |              fare|          begin_lon|          begin_lat| ts| +------------------+-------------------+-------------------+---+ |64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| |33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| |27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| |93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| |  43.4923811219014| 0.8779402295427752|0.6100070562136587|0.0| |66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| |34.158284716382845|0.46157858450465483|0.4726905879569653|0.0| |41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +------------------+-------------------+-------------------+---+ scala>spark.sql("select _hoodie_commit_time, _hoodie_record_key,_hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show() +-------------------+--------------------+----------------------+---------+----------+------------------+ |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path|    rider|   driver|              fare| +-------------------+--------------------+----------------------+---------+----------+------------------+ |     20200701105144|6007a624-d942-4e0...|  americas/united_s...|rider-213|driver-213|64.27696295884016| |     20200701105144|db7c6361-3f05-48d...|  americas/united_s...|rider-213|driver-213|33.92216483948643| |     20200701105144|dfd0e7d9-f10c-468...| americas/united_s...|rider-213|driver-213|19.179139106643607| |     20200701105144|e36365c8-5b3a-415...|  americas/united_s...|rider-213|driver-213|27.79478688582596| |     20200701105144|fb92c00e-dea2-48e...|  americas/united_s...|rider-213|driver-213|93.56018115236618| |     20200701105144|98be3080-a058-47d...| americas/brazil/s...|rider-213|driver-213|  43.4923811219014| |     20200701105144|3dd6ef72-4196-469...|  americas/brazil/s...|rider-213|driver-213|66.62084366450246| |     20200701105144|20f9463f-1c14-4e6...| americas/brazil/s...|rider-213|driver-213|34.158284716382845| |     20200701105144|1585ad3a-11c9-43c...|    asia/india/chennai|rider-213|driver-213|17.851135255091155| |     20200701105144|d40daa90-cf1a-4d1...|    asia/india/chennai|rider-213|driver-213|41.06290929046368| +-------------------+--------------------+----------------------+---------+----------+------------------+
  由于测试数据分区是 区域/国家/城市,所以load(basePath "/*/*/*/*")。  修改数据
  类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。  scala>val updates = convertToStringList(dataGen.generateUpdates(10)) scala>val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) scala>df.write.format("hudi").      |  options(getQuickstartWriteConfigs).      |  option(PRECOMBINE_FIELD_OPT_KEY, "ts").      |  option(RECORDKEY_FIELD_OPT_KEY, "uuid").      |  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").      |  option(TABLE_NAME, tableName).      |  mode(Append).      |  save(basePath)增量查询
  Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。  scala>spark.      |  read.      |  format("hudi").      |  load(basePath + "/*/*/*/*").      |  createOrReplaceTempView("hudi_trips_snapshot") scala>val commits = spark.sql("select distinct(_hoodie_commit_time) ascommitTime from  hudi_trips_snapshotorder by commitTime").map(k => k.getString(0)).take(50) scala>val beginTime = commits(commits.length - 2) beginTime:String = 20200701105144 scala>val tripsIncrementalDF = spark.read.format("hudi").      |  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).      |  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).      |  load(basePath) scala>tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") scala>spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, tsfrom  hudi_trips_incremental where fare> 20.0").show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time|              fare|           begin_lon|          begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ |     20200701110546|49.527694252432056|  0.5142184937933181| 0.7340133901254792|0.0| |     20200701110546|  90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0| |     20200701110546|  98.3428192817987|  0.3349917833248327| 0.4777395067707303|0.0| |     20200701110546| 90.25710109008239|  0.4006983139989222|0.08528650347654165|0.0| |     20200701110546| 63.72504913279929|   0.888493603696927| 0.6570857443423376|0.0| |     20200701110546|29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0| +-------------------+------------------+--------------------+-------------------+---+
  这将提供在beginTime提交后的数据,并且fare>20的数据。  时间点查询
  根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)。  scala>val beginTime = "000" beginTime:String = 000   scala>val endTime = commits(commits.length - 2) endTime:String = 20200701105144 scala>val tripsPointInTimeDF = spark.read.format("hudi").      |  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).      |  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).      |  option(END_INSTANTTIME_OPT_KEY, endTime).      |  load(basePath) scala>tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") scala>spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, tsfrom hudi_trips_point_in_time where fare > 20.0").show() +-------------------+------------------+-------------------+-------------------+---+ |_hoodie_commit_time|              fare|          begin_lon|          begin_lat| ts| +-------------------+------------------+-------------------+-------------------+---+ |     20200701105144| 64.27696295884016|0.4923479652912024| 0.5731835407930634|0.0| |     20200701105144| 33.92216483948643|0.9694586417848392| 0.1856488085068272|0.0| |     20200701105144| 27.79478688582596|0.6273212202489661|0.11488393157088261|0.0| |     20200701105144|93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| |     20200701105144|  43.4923811219014| 0.8779402295427752|0.6100070562136587|0.0| |     20200701105144|66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| |    20200701105144|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| |     20200701105144| 41.06290929046368|0.8192868687714224| 0.651058505660742|0.0| +-------------------+------------------+-------------------+-------------------+---+删除数据  scala>spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").count() res12:Long = 10 scala>val ds = spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").limit(2) scala>val deletes = dataGen.generateDeletes(ds.collectAsList()) scala>val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2)); scala>df.write.format("hudi").      |  options(getQuickstartWriteConfigs).      |  option(OPERATION_OPT_KEY,"delete").      |  option(PRECOMBINE_FIELD_OPT_KEY, "ts").      |  option(RECORDKEY_FIELD_OPT_KEY, "uuid").      |  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").      |  option(TABLE_NAME, tableName).      |  mode(Append).      |  save(basePath) scala>val roAfterDeleteViewDF = spark.      |  read.      |  format("hudi").      |  load(basePath + "/*/*/*/*") scala>roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") scala>spark.sql("select uuid, partitionPath fromhudi_trips_snapshot").count() res15:Long = 8
  只有append模式,才支持删除功能。
  大数据技术生态体系
  大数据的切片机制有哪些
  大数据之Kafka集群部署
  大数据JUC面试题
  大数据学习之部署Hadoop

领航数字时代网安风向标,ISC2021专家大咖齐聚一堂7月27日,由中国互联网协会中国网络空间安全协会全国工商联大数据运维(网络安全)委员会和360互联网安全中心主办,中国保密协会中国计算机学会协办,中国计算机学会计算机安全专业委员会小米华为在盒子界配置强?2篇文章了解电视盒子性能天梯(下)前文介绍了amlogic的产品情况,没看的贴上传送门小米华为在盒子界配置强?2篇文章了解电视盒子性能天梯(上)下面我们就来说说全志,因为除了晶晨还一个劲的推出新的soc,全志也没闲集时尚ID萌趣表情于一身智能家居控制一键畅连华为AI音箱2e智能音箱产品相信大家并不陌生,最近接触到华为的一个新品,它就是华为AI音箱2e,集时尚ID萌趣表情智慧体验于一身可AI语音实现智能家居控制一键畅连,功能十分丰富与强大,而且也有着不千元以下听个响?华为sound加量不加价帝瓦雷音质你值得拥有如果说之前的华为SoundX是大杯那么无疑与华为Mate40同台亮相的华为Sound算的上是中杯,之前的华为SoundX看过我的视频的或者亲身体验过的肯定会有比较深的印象,就目前来华为移动路由差旅出行的好助手插电即享高速网络连接相信有不少人,都有这样一种体验,在差旅的途中入住酒店后,从便捷酒店到五星级酒店网络情况都不太如意,快捷酒店就不用说了,基本上能连上不断就算不错了更不用想有多快的网速,而五星级酒店连华为首款头戴耳机FreeBudsStudio发布2020年10月30日,2020华为年度旗舰新品发布盛典召开,除了备受瞩目的华为Mate40系列手机之外,华为旗下首款头戴耳机FreeBudsStudio也正式与国内用户见面。作为这部短片,说出了华为Mate40系列如期而至背后的真相2020年10月30日,华为面向国内市场正式发布Mate40系列手机,发布会临近尾声阶段,现场播放了一部名为在一起就可以的短片。从一开始被推到最前线的困惑,到最后在一起,就可以带来一款3W毫安正经的移动电源ORICA3WmAH移动电源一般人使用手机的频率都不是太低,有调查显示重度使用手机的用户占绝大多数,以目前最新发布的手机华为Mate40Pro为例4400mAh的手机,虽然手机很给力然而总不可能一直满电状态,跃见非凡!华为Mate40系列国内发布,售价4999元起2020年10月30日,上海,备受瞩目的2020华为年度旗舰新品发布盛典在东方体育中心召开,新一代高端旗舰手机华为Mate40系列在中国正式发布,华为Mate40华为Mate40P痛失布洛芬,集采断供的华北制药深陷泥潭老牌制药企业华北制药(600812。SH),正陷入集采和研发疲弱的双重漩涡。雪上加霜的是公司不仅失去了山东省去年到手的集采份额,还失去了未来一年集采的申报资格。2021年8月,经与华为Mate40系列看见黑夜中的京剧暗光三岔口惊艳上演12月25日,华为联合天猫平台携华为Mate40系列及智慧屏S系列等全场景产品开启天猫华为盛典,面向消费者全面打造全场景智慧生活。为了让消费者更直观深入的体验全场景,华为Mate4
有人说,买手机可以不买苹果,可是买平板必须买ipad,这句话对吗?为什么?安卓平板的体验确实远远不如苹果iPad,就我的经历来说,苹果iPad主要有以下优点1经久耐用,合理使用的话用上10年问题不大。比如我家里的iPad2还在服役当中,这款平板是在201手机号用20年以上,移动联通电信都有哪些特殊待遇?三公司杀熟你还不知道吗?越老客户越对你苛刻,优惠活动都不包括老客户生日那天祝我生日快乐恭喜您成为老用户,本公司为了激励老用户发光发热,特别制定以下针对手机号用20年以上的用户的优惠为什么现在小偷少了,骗子却多了?信息社会变化大,新生事物频涌现。手机银行已普遍,家里很少存现钱。城乡布满监控眼,偷盗容易被破案。犯罪环境已改变,小偷减少是必然。骗子多了小偷少,这是犯罪新特点。手机绑定银行卡,支付公司股东信息好查么?哪里可以查?你想查公司股东的什么信息?如果仅是股东的姓名或名称,或者股东的出资比例出资额,在国家企业信用信息公示系统可以查询,网址httpwww。gsxt。gov。cnindex。html。当有实用的软件推荐吗?一些好的软件可以很好的帮助我们提升工作效率,节约时间。今天给大家推荐八款Windows软件,每一个都是精品。一uTools一个可以帮你提高生产率的工具uTools是一个极简插件化跨经常戴耳机会造成耳聋吗?有可能的!我们在地铁上公交上随处可见带着耳机的人,很多人甚至通宵戴着耳机听音乐睡觉,这些都是在作死的节奏。之前央视有一项调查,3000个学生里有1000人听力都受损了,再这样发现下个人家庭的固定电话还需要保留吗?现在是网络时代,固定电话的作用确实不大了,但我自己家里的固定电话还一直保留着,也有想过去注销,后来一想到有时候孩子不在家,我是不留手机在家里的,那万一有急事就没办法联系了还有一点,三星和国产手机的差距在哪里?文小伊评科技三星在国内的溃败总体可以归为三个方面原因一由于成本以及内部管理等诸多方面的原因,三星的中低端产品线在国产手机面前没有任何竞争力。原因二在系统功能优化和人机交互方面一直是高高在上的华为机皇,8256G卖17499,多少人买得起?这话说的好像你能买得起小米一样,小米你都买不起你非要看这一万多的机器那没办法!我3000多的机器有啥买不起的?法拉利买得起的人也少,为什么它不跟五菱卖一样的价?那不就更多人跑去买么戴上助听器后声音大了,像戴了听诊器,请问什么原因?您好,戴上助听器后声音大了,像戴了听诊器,请问什么原因?这种情况很有可能是因为堵耳所造成的,如果您低频听力损失程度不严重的话,是可能会有这样的困扰,尤其当您配戴的是耳内式定制助听器做自媒体,没有电脑,一直用手机上传是不是没有收益?做自媒体两个月时间涨粉52,000,变现收入超过3万元,全部都是用手机上转的,今天我就把怎么在自媒体赚钱的方法免费教给你。后台不少粉丝咨询我,为什么刘三点你可以涨粉这么快?我发布的