Putting a data lake into practice: reading data from Kafka into Iceberg with SQL in the Flink SQL Client
Preface
An earlier attempt with Flink 1.11.6 and Iceberg 0.11 failed to write successfully, so Flink was upgraded to 1.12.7.
Versions after the upgrade:
flink-1.12.7-bin-scala_2.12
flink-sql-connector-hive-2.3.6_2.12-1.12.7.jar
kafka_2.12-2.4.1

1. Start the Flink SQL Client

[root@hadoop101 bin]# sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.12.1.jar -j /opt/software/flink-sql-connector-hive-2.3.6_2.12-1.12.7.jar -j /opt/software/flink-sql-connector-kafka_2.12-1.12.7.jar

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for "/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml"...found.
Reading default environment from: file:/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history

(Flink SQL CLI startup banner omitted)

Welcome! Enter "HELP;" to list all available commands. "QUIT;" to exit.

Flink SQL>
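As a side note, instead of passing -j on every start, the connector jars could also be copied once into Flink's lib/ directory, which is loaded when the cluster starts (a sketch under that assumption, using the same paths as above; this was not done in this test):

[root@hadoop101 ~]# cp /opt/software/iceberg-flink-runtime-0.12.1.jar \
    /opt/software/flink-sql-connector-hive-2.3.6_2.12-1.12.7.jar \
    /opt/software/flink-sql-connector-kafka_2.12-1.12.7.jar \
    /opt/module/flink-1.12.7/lib/

The Flink cluster then needs a restart for the new jars to be picked up.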
2. Create the Kafka tables

The raw format is only supported from Flink 1.12 onwards.

create table kafka_test_log (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup5',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

create table kafka_test_log_csv (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup6',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

create table kafka_test_log2 (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log2',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup5',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

3. Read the Kafka data and write it back to Kafka

Flink SQL> insert into kafka_test_log2 select * from kafka_test_log;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 777618b911d015a9b80cab316edf3fe8
Checking the Flink web UI:

the number of records received and records sent are both shown as 0.

Querying directly with SQL, however, shows that the data was copied in full from kafka_test_log to kafka_test_log2.
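This can also be confirmed outside of Flink by reading the target topic with the console consumer (same brokers as in this cluster):

[root@hadoop101 ~]# kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic test_log2 --from-beginning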
Conclusion: the metrics shown for Flink's INSERT INTO job are misleading; the displayed record counts are wrong (likely because the whole pipeline is chained into a single task, so no records cross a task boundary and the received/sent counters stay at 0).

Flink SQL> select * from kafka_test_log2;

4. Write into Iceberg

4.1 Create a Hive catalog, Kafka -> Iceberg

Create the hive catalog:

CREATE CATALOG hive_catalog4 WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs:///user/hive/warehouse/hive_catalog4'
);

Switch to the catalog and create the table (the default database is used here):

use catalog hive_catalog4;

create table `hive_catalog4`.`default`.`ib_hive_test_log` (
  data String
);

Write from Kafka into the Iceberg table under the hive catalog:

insert into `hive_catalog4`.`default`.`ib_hive_test_log`
select * from default_catalog.default_database.kafka_test_log_csv;
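To sanity-check whether anything lands in the Iceberg table, it can be queried back from the same catalog (assuming snapshot reads work in this Flink/Iceberg combination as described in the Iceberg docs):

Flink SQL> select * from `hive_catalog4`.`default`.`ib_hive_test_log`;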
4.2 Create a Hadoop catalog, Kafka -> Iceberg

CREATE CATALOG hadoop_catalog4 WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog4',
  'property-version'='1'
);

use catalog hadoop_catalog4;

create database iceberg_db;

create table `hadoop_catalog4`.`iceberg_db`.`ib_hadoop_test_log` (
  data String
);

insert into hadoop_catalog4.iceberg_db.ib_hadoop_test_log
select data from default_catalog.default_database.kafka_test_log;
Checking on HDFS:
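The same check from the command line (the table path below is an assumption derived from the warehouse location configured above):

[root@hadoop101 ~]# hdfs dfs -ls -R /user/hive/warehouse/iceberg_hadoop_catalog4/iceberg_db/ib_hadoop_test_log

An Iceberg table directory normally contains metadata/ and data/ subdirectories; the question is whether any data files ever show up under data/.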
Produce some messages with the console producer and check again: the Iceberg data directory still shows 0 files; nothing is written out to Iceberg.

[root@hadoop101 ~]# kafka-console-producer.sh --topic test_log --broker-list hadoop101:9092,hadoop102:9092

Summary
Testing shows that reading from and writing to Kafka both work fine. I wondered whether it was a consumer-group problem, but switching to a different consumer group still produced no output… Both the hive catalog and the hadoop catalog were tried; neither made a difference.
Is the problem on the Iceberg side?
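One setting worth ruling out before blaming Iceberg (not verified in this setup): the Iceberg Flink sink only commits data files when a checkpoint completes, and Flink does not enable checkpointing by default, so a streaming INSERT INTO an Iceberg table can run "successfully" while never making any data visible. A minimal sketch of enabling periodic checkpoints cluster-wide, assuming this Flink version picks the option up from flink-conf.yaml (the interval is just an example value):

# /opt/module/flink-1.12.7/conf/flink-conf.yaml
# Enable periodic checkpoints so the Iceberg committer can publish data files.
execution.checkpointing.interval: 60s

After restarting the cluster and resubmitting the insert job, new snapshots should start appearing under the table's metadata/ directory if the rest of the pipeline is healthy.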