干货BitSailConnector开发详解系列一Source
更多技术交流、求职机会,欢迎关注 字节跳动数据平台微信公众号,回复【1】进入官方交流群
BitSail 是字节跳动自研的数据集成产品,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案。本系列聚焦 BitSail Connector 开发模块,为大家带来详细全面的开发方法与场景示例,本篇将主要介绍 Source 接口部分。
持续关注,BitSail Connector 开发详解将分为四篇呈现。BitSail Connector 开发详解系列一:SourceBitSail Connector 开发详解系列二:SourceSplitCoordinatorBitSail Connector 开发详解系列三:SourceReaderBitSail Connector 开发详解系列四:Sink、WriterSource Connector
本文将主要介绍 Source 接口部分:Source: 参与数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。SourceSplit: 数据读取分片,大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split 并行处理。State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。Source
数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
以 RocketMQSource 为例:Source 方法需要实现 Source 和 ParallelismComputable 接口。
Source 接口public interface Source extends Serializable, TypeInfoConverterFactory { /** * Run in client side for source initialize; */ void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException; /** * Indicate the Source type. */ Boundedness getSourceBoundedness(); /** * Create Source Reader. */ SourceReader createReader(SourceReader.Context readerContext); /** * Create split coordinator. */ SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext); /** * Get Split serializer for the framework,{@link SplitT}should implement from {@link Serializable} */ default BinarySerializer getSplitSerializer() { return new SimpleBinarySerializer<>(); } /** * Get State serializer for the framework, {@link StateT}should implement from {@link Serializable} */ default BinarySerializer getSplitCoordinatorCheckpointSerializer() { return new SimpleBinarySerializer<>(); } /** * Create type info converter for the source, default value {@link BitSailTypeInfoConverter} */ default TypeInfoConverter createTypeInfoConverter() { return new BitSailTypeInfoConverter(); } /** * Get Source" name. */ String getReaderName(); }
configure 方法
主要去做一些客户端的配置的分发和提取,可以操作运行时环境 ExecutionEnviron 的配置和 readerConfiguration 的配置。示例@Override public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) { this.readerConfiguration = readerConfiguration; this.commonConfiguration = execution.getCommonConfiguration(); }
getSourceBoundedness 方法
设置作业的处理方式,是采用流式处理方法、批式处理方法,或者是流批一体的处理方式,在流批一体的场景中,我们需要根据作业的不同类型设置不同的处理方式。
具体对应关系如下:
流批一体场景示例@Override public Boundedness getSourceBoundedness() { return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ? Boundedness.BOUNDEDNESS : Boundedness.UNBOUNDEDNESS; }
流批一体场景示例@Override public Boundedness getSourceBoundedness() { return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ? Boundedness.BOUNDEDNESS : Boundedness.UNBOUNDEDNESS; }
createTypeInfoConverter 方法
用于指定 Source 连接器的类型转换器;我们知道大多数的外部数据系统都存在着自己的类型定义,它们的定义与 BitSail 的类型定义不会完全一致;为了简化类型定义的转换,我们支持了通过配置文件来映射两者之间的关系,进而来简化配置文件的开发。
在行为上表现为对任务描述 Json 文件中reader 部分的columns 的解析,对于columns 中不同字段的 type 会根据上面描述文件从ClickhouseReaderOptions. COLUMNS 字段中解析到readerContext.getTypeInfos() 中。实现BitSailTypeInfoConverter 默认的TypeInfoConverter ,直接对ReaderOptions. COLUMNS 字段进行字符串的直接解析,COLUMNS 字段中是什么类型,TypeInfoConverter 中就是什么类型。FileMappingTypeInfoConverter 会在 BitSail 类型系统转换时去绑定{readername}-type-converter.yaml 文件,做数据库字段类型和 BitSail 类型的映射。ReaderOptions. COLUMNS 字段在通过这个映射文件转换后才会映射到TypeInfoConverter 中。示例
FileMappingTypeInfoConverter
通过 JDBC 方式连接的数据库,包括 MySql、Oracle、SqlServer、Kudu、ClickHouse 等。这里数据源的特点是以java.sql.ResultSet 的接口形式返回获取的数据,对于这类数据库,我们往往将TypeInfoConverter 对象设计为FileMappingTypeInfoConverter ,这个对象会在 BitSail 类型系统转换时去绑定{readername}-type-converter.yaml 文件,做数据库字段类型和 BitSail 类型的映射。
@Override public TypeInfoConverter createTypeInfoConverter() { return new FileMappingTypeInfoConverter(getReaderName()); }
对于{readername}-type-converter.yaml 文件的解析,以clickhouse-type-converter.yaml 为例。# Clickhouse Type to BitSail Type engine.type.to.bitsail.type.converter: - source.type: int32 target.type: int - source.type: float64 target.type: double - source.type: string target.type: string - source.type: date target.type: date.date - source.type: null target.type: void # BitSail Type to Clickhouse Type bitsail.type.to.engine.type.converter: - source.type: int target.type: int32 - source.type: double target.type: float64 - source.type: date.date target.type: date - source.type: string target.type: string
这个文件起到的作用是进行 job 描述 json 文件中reader 部分的columns 的解析,对于columns 中不同字段的 type 会根据上面描述文件从ClickhouseReaderOptions. COLUMNS 字段中解析到readerContext.getTypeInfos() 中。
"reader": { "class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource", "jdbc_url": "jdbc:clickhouse://localhost:8123", "db_name": "default", "table_name": "test_ch_table", "split_field": "id", "split_config": "{"name": "id", "lower_bound": 0, "upper_bound": "10000", "split_num": 3}", "sql_filter": "( id % 2 == 0 )", "columns": [ { "name": "id", "type": "int64" }, { "name": "int_type", "type": "int32" }, { "name": "double_type", "type": "float64" }, { "name": "string_type", "type": "string" }, { "name": "p_date", "type": "date" } ] },
这种方式不仅仅适用于数据库,也适用于所有需要在类型转换中需要引擎侧和 BitSail 侧进行类型映射的场景。
BitSailTypeInfoConverter
通常采用默认的方式进行类型转换,直接对ReaderOptions. COLUMNS 字段进行字符串的直接解析。
@Override public TypeInfoConverter createTypeInfoConverter() { return new BitSailTypeInfoConverter(); }
以 Hadoop 为例:"reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", "path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json", "content_type":"json", "reader_parallelism_num": 1, "columns": [ { "name":"id", "type": "int" }, { "name": "string_type", "type": "string" }, { "name": "map_string_string", "type": "map" }, { "name": "array_string", "type": "list" } ] }
createSourceReader 方法
书写具体的数据读取逻辑,负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。
具体传入构造 SourceReader 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。示例public SourceReader createReader(SourceReader.Context readerContext) { return new RocketMQSourceReader( readerConfiguration, readerContext, getSourceBoundedness()); }
createSplitCoordinator 方法
书写具体的数据分片、分片分配逻辑,SplitCoordinator 承担了去创建、管理 Split 的角色。
具体传入构造 SplitCoordinator 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。示例public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator .Context coordinatorContext) { return new RocketMQSourceSplitCoordinator( coordinatorContext, readerConfiguration, getSourceBoundedness()); }
ParallelismComputable 接口public interface ParallelismComputable extends Serializable { /** * give a parallelism advice for reader/writer based on configurations and upstream parallelism advice * * @param commonConf common configuration * @param selfConf reader/writer configuration * @param upstreamAdvice parallelism advice from upstream (when an operator has no upstream in DAG, its upstream is * global parallelism) * @return parallelism advice for the reader/writer */ ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception; }
getParallelismAdvice 方法
用于指定下游 reader 的并行数目。一般有以下的方式:
可以选择selfConf.get(ClickhouseReaderOptions. READER_PARALLELISM_NUM ) 来指定并行度。
也可以自定义自己的并行度划分逻辑。示例
比如在 RocketMQ 中,我们可以定义每 1 个 reader 可以处理至多 4 个队列DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD = 4
通过这种自定义的方式获取对应的并行度。public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConfiguration, BitSailConfiguration rocketmqConfiguration, ParallelismAdvice upstreamAdvice) throws Exception { String cluster = rocketmqConfiguration.get(RocketMQSourceOptions.CLUSTER); String topic = rocketmqConfiguration.get(RocketMQSourceOptions.TOPIC); String consumerGroup = rocketmqConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP); DefaultLitePullConsumer consumer = RocketMQUtils.prepareRocketMQConsumer(rocketmqConfiguration, String.format(SOURCE_INSTANCE_NAME_TEMPLATE, cluster, topic, consumerGroup, UUID.randomUUID() )); try { consumer.start(); Collection messageQueues = consumer.fetchMessageQueues(topic); int adviceParallelism = Math.max(CollectionUtils.size(messageQueues) / DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD, 1); return ParallelismAdvice.builder() .adviceParallelism(adviceParallelism) .enforceDownStreamChain(true) .build(); } finally { consumer.shutdown(); } } }
SourceSplit
数据源的数据分片格式,需要我们实现 SourceSplit 接口。
SourceSplit 接口
要求我们实现一个实现一个获取 splitId 的方法。public interface SourceSplit extends Serializable { String uniqSplitId(); }
对于具体切片的格式,开发者可以按照自己的需求进行自定义。示例JDBC 类存储
一般会通过主键,来对数据进行最大、最小值的划分;对于无主键类则通常会将其认定为一个 split,不再进行拆分,所以 split 中的参数包括主键的最大最小值,以及一个布尔类型的readTable ,如果无主键类或是不进行主键的切分则整张表会视为一个 split,此时readTable 为true ,如果按主键最大最小值进行切分,则设置为false 。
以 ClickhouseSourceSplit 为例:@Setter public class ClickhouseSourceSplit implements SourceSplit { public static final String SOURCE_SPLIT_PREFIX = "clickhouse_source_split_"; private static final String BETWEEN_CLAUSE = "( `%s` BETWEEN ? AND ? )"; private final String splitId; /** * Read whole table or range [lower, upper] */ private boolean readTable; private Long lower; private Long upper; public ClickhouseSourceSplit(int splitId) { this.splitId = SOURCE_SPLIT_PREFIX + splitId; } @Override public String uniqSplitId() { return splitId; } public void decorateStatement(PreparedStatement statement) { try { if (readTable) { lower = Long.MIN_VALUE; upper = Long.MAX_VALUE; } statement.setObject(1, lower); statement.setObject(2, upper); } catch (SQLException e) { throw BitSailException.asBitSailException(CommonErrorCode.RUNTIME_ERROR, "Failed to decorate statement with split " + this, e.getCause()); } } public static String getRangeClause(String splitField) { return StringUtils.isEmpty(splitField) ? null : String.format(BETWEEN_CLAUSE, splitField); } @Override public String toString() { return String.format( "{"split_id":"%s", "lower":%s, "upper":%s, "readTable":%s}", splitId, lower, upper, readTable); } }
消息队列
一般按照消息队列中 topic 注册的 partitions 的数量进行 split 的划分,切片中主要应包含消费的起点和终点以及消费的队列。
以 RocketMQSplit 为例:@Builder @Getter public class RocketMQSplit implements SourceSplit { private MessageQueue messageQueue; @Setter private long startOffset; private long endOffset; private String splitId; @Override public String uniqSplitId() { return splitId; } @Override public String toString() { return "RocketMQSplit{" + "messageQueue=" + messageQueue + ", startOffset=" + startOffset + ", endOffset=" + endOffset + "}"; } }
文件系统
一般会按照文件作为最小粒度进行划分,同时有些格式也支持将单个文件拆分为多个子 Splits。文件系统 split 中需要包装所需的文件切片。
以 FtpSourceSplit 为例:public class FtpSourceSplit implements SourceSplit { public static final String FTP_SOURCE_SPLIT_PREFIX = "ftp_source_split_"; private final String splitId; @Setter private String path; @Setter private long fileSize; public FtpSourceSplit(int splitId) { this.splitId = FTP_SOURCE_SPLIT_PREFIX + splitId; } @Override public String uniqSplitId() { return splitId; } @Override public boolean equals(Object obj) { return (obj instanceof FtpSourceSplit) && (splitId.equals(((FtpSourceSplit) obj).splitId)); } }
特别的,在 Hadoop 文件系统中,我们也可以利用对org.apache.hadoop.mapred.InputSplit 类的包装来自定义我们的 Split。public class HadoopSourceSplit implements SourceSplit { private static final long serialVersionUID = 1L; private final Class<? extends InputSplit> splitType; private transient InputSplit hadoopInputSplit; private byte[] hadoopInputSplitByteArray; public HadoopSourceSplit(InputSplit inputSplit) { if (inputSplit == null) { throw new NullPointerException("Hadoop input split must not be null"); } this.splitType = inputSplit.getClass(); this.hadoopInputSplit = inputSplit; } public InputSplit getHadoopInputSplit() { return this.hadoopInputSplit; } public void initInputSplit(JobConf jobConf) { if (this.hadoopInputSplit != null) { return; } checkNotNull(hadoopInputSplitByteArray); try { this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType); if (this.hadoopInputSplit instanceof Configurable) { ((Configurable) this.hadoopInputSplit).setConf(jobConf); } else if (this.hadoopInputSplit instanceof JobConfigurable) { ((JobConfigurable) this.hadoopInputSplit).configure(jobConf); } if (hadoopInputSplitByteArray != null) { try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) { this.hadoopInputSplit.readFields(objectInputStream); } this.hadoopInputSplitByteArray = null; } } catch (Exception e) { throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e); } } private void writeObject(ObjectOutputStream out) throws IOException { if (hadoopInputSplit != null) { try ( ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream) ) { this.hadoopInputSplit.write(objectOutputStream); objectOutputStream.flush(); this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray(); } } out.defaultWriteObject(); } @Override public String uniqSplitId() { return hadoopInputSplit.toString(); } }
State
在需要做 checkpoint 的场景下,通常我们会通过 Map 来保留当前的执行状态流批一体场景
在流批一体场景中,我们需要保存状态以便从异常中断的流式作业恢复
以 RocketMQState 为例:public class RocketMQState implements Serializable { private final Map assignedWithSplitIds; public RocketMQState(Map assignedWithSplitIds) { this.assignedWithSplitIds = assignedWithSplitIds; } public Map getAssignedWithSplits() { return assignedWithSplitIds; } }
复制代码 批式场景
对于批式场景,我们可以使用EmptyState 不存储状态,如果需要状态存储,和流批一体场景采用相似的设计方案。public class EmptyState implements Serializable { public static EmptyState fromBytes() { return new EmptyState(); } }
1899买下了半年前3499的4KMiniLED显示器,联合创新27M2U杀疯了大家发现没有,今年组装电脑成本很高,只有显示器疯狂内卷。主流MiniLED显示器降价更猛,去年两三千只够买入门级的,款型也少现在各家都在猛推新品,2K分辨率的已经跌到近千元,4K分
C编程初探(5)你好,世界!上节我们创建了第一个项目,这节我们继续,让程序能跑起来。打开VS,起始界面是这样,我们可以在箭头位置找到最近的解决方案。界面布局点击打开,呈现出如下界面默认情况下,分成上图所示12
詹姆斯很多纪录也许会被后辈们打破,但这5个只能被仰视,破不了詹姆斯创造纪录的能力,放眼历史都是独一档,詹姆斯从奥尼尔巅峰尾,经历四大前锋马刺零号特工奇才苦主活塞五虎凯尔三巨头四星老鹰双雄(格兰杰,乔治)步行者雷霆三少四星半勇士,可以说从四大
Qt多个信号关联同一个槽函数背景多个信号需要执行同一个函数或者一类函数的时候,可以选择每个信号创建一个槽函数去实现功能,如果直接关联到一个函数中,该函数只能执行一份功能,有时候并不能满足业务需求在多个信号绑定
荣耀80Pro三体限定版曝光后置三体LogoIT之家12月15日消息,三体动画已于近期开播,该动画改编自作家刘慈欣的系列同名长篇科幻小说,由B站主要出品制作,三体宇宙联合出品,艺画开天联合出品承制。三体动画版也出现了指定产品
导致排毒能力衰退的13个误区你的健康常识其实漏洞百出,这13个健康误区一定要纠正!误区1只要吃点对身体好的食物就够了健康全在饮食上。所以只要关注饮食,营养均衡,定时定量吃饭就够了。只是这样做是没法保证健康的。
java之反射(2)属性field头条创作挑战赛通过反射可以得到字节码文件中的信息,比如字节码中的属性方法构造器等,是与类对应的。今天学习属性field的一些api。设置一个User类,内容如下publicclas
有能力却有污点的人,该不该重用?用人的格局,决定了发展的上限我们常说,道不同,不相为谋。这话就是告诉我们,只要路的方向是一致的,我们是可以合作的,合作共赢嘛!但是,有的人,却有道德洁癖,对那些道德上有点不太完美的人,看不顺眼,不愿意跟这样的
目前CBA本土球员真正称得上攻防一体的球员就这三位吧所谓攻防一体就是进攻端和防守端都是顶级的水平,目前国内球员具备这个能力的真是少之又少啊!联盟中进攻端出色的球员很多,防守端出色的球员也很多,但是大部分都偏科,比如,赵继伟和沈梓捷,
国家华中区域应急救援中心开工活动16日举办16日上午,华中区域应急救援中心项目开工活动在湖北武汉举办。华中区域应急救援中心是国家区域应急救援中心建设工程的重要组成,辐射湖北安徽江西湖南江苏河南等地区,主要担负华中地区洪涝灾
在江苏的绿林仙海中,寻找侠气与柔情前言旅途对于我来说,总是会有各种各样的意义,每一次出行,都能找到新的体验。一直在路上的我,很庆幸自己能见到那么的风景感受那么多人文,结识了这么多最可爱的人。带着对旅途的憧憬与满心欢