范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

干货BitSailConnector开发详解系列一Source

  更多技术交流、求职机会,欢迎关注 字节跳动数据平台微信公众号,回复【1】进入官方交流群
  BitSail 是字节跳动自研的数据集成产品,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案。本系列聚焦 BitSail Connector 开发模块,为大家带来详细全面的开发方法与场景示例,本篇将主要介绍 Source 接口部分。
  持续关注,BitSail Connector 开发详解将分为四篇呈现。BitSail Connector 开发详解系列一:SourceBitSail Connector 开发详解系列二:SourceSplitCoordinatorBitSail Connector 开发详解系列三:SourceReaderBitSail Connector 开发详解系列四:Sink、WriterSource Connector
  本文将主要介绍 Source 接口部分:Source: 参与数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。SourceSplit: 数据读取分片,大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split 并行处理。State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。Source
  数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
  以 RocketMQSource 为例:Source 方法需要实现 Source 和 ParallelismComputable 接口。
  Source 接口public interface Source     extends Serializable, TypeInfoConverterFactory {     /**    * Run in client side for source initialize;    */   void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException;     /**    * Indicate the Source type.    */   Boundedness getSourceBoundedness();     /**    * Create Source Reader.    */   SourceReader createReader(SourceReader.Context readerContext);     /**    * Create split coordinator.    */   SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext);     /**    * Get Split serializer for the framework,{@link SplitT}should implement from {@link  Serializable}    */   default BinarySerializer getSplitSerializer() {     return new SimpleBinarySerializer<>();   }     /**    * Get State serializer for the framework, {@link StateT}should implement from {@link  Serializable}    */   default BinarySerializer getSplitCoordinatorCheckpointSerializer() {     return new SimpleBinarySerializer<>();   }     /**    * Create type info converter for the source, default value {@link BitSailTypeInfoConverter}    */   default TypeInfoConverter createTypeInfoConverter() {     return new BitSailTypeInfoConverter();   }     /**    * Get Source" name.    */   String getReaderName(); }
  configure 方法
  主要去做一些客户端的配置的分发和提取,可以操作运行时环境 ExecutionEnviron 的配置和 readerConfiguration 的配置。示例@Override public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {   this.readerConfiguration = readerConfiguration;   this.commonConfiguration = execution.getCommonConfiguration(); }
  getSourceBoundedness 方法
  设置作业的处理方式,是采用流式处理方法、批式处理方法,或者是流批一体的处理方式,在流批一体的场景中,我们需要根据作业的不同类型设置不同的处理方式。
  具体对应关系如下:
  流批一体场景示例@Override public Boundedness getSourceBoundedness() {   return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?       Boundedness.BOUNDEDNESS :       Boundedness.UNBOUNDEDNESS; }
  流批一体场景示例@Override public Boundedness getSourceBoundedness() {   return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?       Boundedness.BOUNDEDNESS :       Boundedness.UNBOUNDEDNESS; }
  createTypeInfoConverter 方法
  用于指定 Source 连接器的类型转换器;我们知道大多数的外部数据系统都存在着自己的类型定义,它们的定义与 BitSail 的类型定义不会完全一致;为了简化类型定义的转换,我们支持了通过配置文件来映射两者之间的关系,进而来简化配置文件的开发。
  在行为上表现为对任务描述 Json 文件中reader 部分的columns 的解析,对于columns 中不同字段的 type 会根据上面描述文件从ClickhouseReaderOptions. COLUMNS 字段中解析到readerContext.getTypeInfos() 中。实现BitSailTypeInfoConverter 默认的TypeInfoConverter ,直接对ReaderOptions. COLUMNS 字段进行字符串的直接解析,COLUMNS 字段中是什么类型,TypeInfoConverter 中就是什么类型。FileMappingTypeInfoConverter 会在 BitSail 类型系统转换时去绑定{readername}-type-converter.yaml 文件,做数据库字段类型和 BitSail 类型的映射。ReaderOptions. COLUMNS 字段在通过这个映射文件转换后才会映射到TypeInfoConverter 中。示例
  FileMappingTypeInfoConverter
  通过 JDBC 方式连接的数据库,包括 MySql、Oracle、SqlServer、Kudu、ClickHouse 等。这里数据源的特点是以java.sql.ResultSet 的接口形式返回获取的数据,对于这类数据库,我们往往将TypeInfoConverter 对象设计为FileMappingTypeInfoConverter ,这个对象会在 BitSail 类型系统转换时去绑定{readername}-type-converter.yaml 文件,做数据库字段类型和 BitSail 类型的映射。
  @Override public TypeInfoConverter createTypeInfoConverter() {   return new FileMappingTypeInfoConverter(getReaderName()); }
  对于{readername}-type-converter.yaml 文件的解析,以clickhouse-type-converter.yaml 为例。# Clickhouse Type to BitSail Type engine.type.to.bitsail.type.converter:     - source.type: int32     target.type: int     - source.type: float64     target.type: double     - source.type: string     target.type: string     - source.type: date     target.type: date.date     - source.type: null     target.type: void   # BitSail Type to Clickhouse Type bitsail.type.to.engine.type.converter:     - source.type: int     target.type: int32     - source.type: double     target.type: float64     - source.type: date.date     target.type: date     - source.type: string     target.type: string
  这个文件起到的作用是进行 job 描述 json 文件中reader 部分的columns 的解析,对于columns 中不同字段的 type 会根据上面描述文件从ClickhouseReaderOptions. COLUMNS 字段中解析到readerContext.getTypeInfos() 中。
  "reader": {   "class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource",   "jdbc_url": "jdbc:clickhouse://localhost:8123",   "db_name": "default",   "table_name": "test_ch_table",   "split_field": "id",   "split_config": "{"name": "id", "lower_bound": 0, "upper_bound": "10000", "split_num": 3}",   "sql_filter": "( id % 2 == 0 )",   "columns": [     {       "name": "id",       "type": "int64"     },     {       "name": "int_type",       "type": "int32"     },     {       "name": "double_type",       "type": "float64"     },     {       "name": "string_type",       "type": "string"     },     {       "name": "p_date",       "type": "date"     }   ] },
  这种方式不仅仅适用于数据库,也适用于所有需要在类型转换中需要引擎侧和 BitSail 侧进行类型映射的场景。
  BitSailTypeInfoConverter
  通常采用默认的方式进行类型转换,直接对ReaderOptions. COLUMNS 字段进行字符串的直接解析。
  @Override public TypeInfoConverter createTypeInfoConverter() {   return new BitSailTypeInfoConverter(); }
  以 Hadoop 为例:"reader": {   "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",   "path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json",   "content_type":"json",   "reader_parallelism_num": 1,   "columns": [     {       "name":"id",       "type": "int"     },     {       "name": "string_type",       "type": "string"     },     {       "name": "map_string_string",       "type": "map"     },     {       "name": "array_string",       "type": "list"     }   ] }
  createSourceReader 方法
  书写具体的数据读取逻辑,负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。
  具体传入构造 SourceReader 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。示例public SourceReader createReader(SourceReader.Context readerContext) {   return new RocketMQSourceReader(       readerConfiguration,       readerContext,       getSourceBoundedness()); }
  createSplitCoordinator 方法
  书写具体的数据分片、分片分配逻辑,SplitCoordinator 承担了去创建、管理 Split 的角色。
  具体传入构造 SplitCoordinator 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。示例public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator                                                                                        .Context coordinatorContext) {   return new RocketMQSourceSplitCoordinator(       coordinatorContext,       readerConfiguration,       getSourceBoundedness()); }
  ParallelismComputable 接口public interface ParallelismComputable extends Serializable {     /**    * give a parallelism advice for reader/writer based on configurations and upstream parallelism advice    *    * @param commonConf     common configuration    * @param selfConf       reader/writer configuration    * @param upstreamAdvice parallelism advice from upstream (when an operator has no upstream in DAG, its upstream is    *                       global parallelism)    * @return parallelism advice for the reader/writer    */   ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf,                                          BitSailConfiguration selfConf,                                          ParallelismAdvice upstreamAdvice) throws Exception; }
  getParallelismAdvice 方法
  用于指定下游 reader 的并行数目。一般有以下的方式:
  可以选择selfConf.get(ClickhouseReaderOptions. READER_PARALLELISM_NUM ) 来指定并行度。
  也可以自定义自己的并行度划分逻辑。示例
  比如在 RocketMQ 中,我们可以定义每 1 个 reader 可以处理至多 4 个队列DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD = 4
  通过这种自定义的方式获取对应的并行度。public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConfiguration,                                                 BitSailConfiguration rocketmqConfiguration,                                                 ParallelismAdvice upstreamAdvice) throws Exception {     String cluster = rocketmqConfiguration.get(RocketMQSourceOptions.CLUSTER);     String topic = rocketmqConfiguration.get(RocketMQSourceOptions.TOPIC);     String consumerGroup = rocketmqConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);     DefaultLitePullConsumer consumer = RocketMQUtils.prepareRocketMQConsumer(rocketmqConfiguration, String.format(SOURCE_INSTANCE_NAME_TEMPLATE,         cluster,         topic,         consumerGroup,         UUID.randomUUID()     ));     try {       consumer.start();       Collection messageQueues = consumer.fetchMessageQueues(topic);       int adviceParallelism = Math.max(CollectionUtils.size(messageQueues) / DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD, 1);         return ParallelismAdvice.builder()           .adviceParallelism(adviceParallelism)           .enforceDownStreamChain(true)           .build();     } finally {       consumer.shutdown();     }   } }
  SourceSplit
  数据源的数据分片格式,需要我们实现 SourceSplit 接口。
  SourceSplit 接口
  要求我们实现一个实现一个获取 splitId 的方法。public interface SourceSplit extends Serializable {   String uniqSplitId(); }
  对于具体切片的格式,开发者可以按照自己的需求进行自定义。示例JDBC 类存储
  一般会通过主键,来对数据进行最大、最小值的划分;对于无主键类则通常会将其认定为一个 split,不再进行拆分,所以 split 中的参数包括主键的最大最小值,以及一个布尔类型的readTable ,如果无主键类或是不进行主键的切分则整张表会视为一个 split,此时readTable 为true ,如果按主键最大最小值进行切分,则设置为false 。
  以 ClickhouseSourceSplit 为例:@Setter public class ClickhouseSourceSplit implements SourceSplit {   public static final String SOURCE_SPLIT_PREFIX = "clickhouse_source_split_";   private static final String BETWEEN_CLAUSE = "( `%s` BETWEEN ? AND ? )";     private final String splitId;     /**    * Read whole table or range [lower, upper]    */   private boolean readTable;   private Long lower;   private Long upper;     public ClickhouseSourceSplit(int splitId) {     this.splitId = SOURCE_SPLIT_PREFIX + splitId;   }     @Override   public String uniqSplitId() {     return splitId;   }     public void decorateStatement(PreparedStatement statement) {     try {       if (readTable) {         lower = Long.MIN_VALUE;         upper = Long.MAX_VALUE;       }       statement.setObject(1, lower);       statement.setObject(2, upper);     } catch (SQLException e) {       throw BitSailException.asBitSailException(CommonErrorCode.RUNTIME_ERROR, "Failed to decorate statement with split " + this, e.getCause());     }   }     public static String getRangeClause(String splitField) {     return StringUtils.isEmpty(splitField) ? null : String.format(BETWEEN_CLAUSE, splitField);   }     @Override   public String toString() {     return String.format(         "{"split_id":"%s", "lower":%s, "upper":%s, "readTable":%s}",         splitId, lower, upper, readTable);   } }
  消息队列
  一般按照消息队列中 topic 注册的 partitions 的数量进行 split 的划分,切片中主要应包含消费的起点和终点以及消费的队列。
  以 RocketMQSplit 为例:@Builder @Getter public class RocketMQSplit implements SourceSplit {     private MessageQueue messageQueue;     @Setter   private long startOffset;     private long endOffset;     private String splitId;     @Override   public String uniqSplitId() {     return splitId;   }     @Override   public String toString() {     return "RocketMQSplit{" +         "messageQueue=" + messageQueue +         ", startOffset=" + startOffset +         ", endOffset=" + endOffset +         "}";   } }
  文件系统
  一般会按照文件作为最小粒度进行划分,同时有些格式也支持将单个文件拆分为多个子 Splits。文件系统 split 中需要包装所需的文件切片。
  以 FtpSourceSplit 为例:public class FtpSourceSplit implements SourceSplit {     public static final String FTP_SOURCE_SPLIT_PREFIX = "ftp_source_split_";     private final String splitId;     @Setter   private String path;   @Setter   private long fileSize;     public FtpSourceSplit(int splitId) {     this.splitId = FTP_SOURCE_SPLIT_PREFIX + splitId;   }     @Override   public String uniqSplitId() {     return splitId;   }     @Override   public boolean equals(Object obj) {     return (obj instanceof FtpSourceSplit) && (splitId.equals(((FtpSourceSplit) obj).splitId));   }   }
  特别的,在 Hadoop 文件系统中,我们也可以利用对org.apache.hadoop.mapred.InputSplit 类的包装来自定义我们的 Split。public class HadoopSourceSplit implements SourceSplit {   private static final long serialVersionUID = 1L;   private final Class<? extends InputSplit> splitType;   private transient InputSplit hadoopInputSplit;     private byte[] hadoopInputSplitByteArray;     public HadoopSourceSplit(InputSplit inputSplit) {     if (inputSplit == null) {       throw new NullPointerException("Hadoop input split must not be null");     }       this.splitType = inputSplit.getClass();     this.hadoopInputSplit = inputSplit;   }     public InputSplit getHadoopInputSplit() {     return this.hadoopInputSplit;   }     public void initInputSplit(JobConf jobConf) {     if (this.hadoopInputSplit != null) {       return;     }       checkNotNull(hadoopInputSplitByteArray);       try {       this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType);         if (this.hadoopInputSplit instanceof Configurable) {         ((Configurable) this.hadoopInputSplit).setConf(jobConf);       } else if (this.hadoopInputSplit instanceof JobConfigurable) {         ((JobConfigurable) this.hadoopInputSplit).configure(jobConf);       }         if (hadoopInputSplitByteArray != null) {         try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) {           this.hadoopInputSplit.readFields(objectInputStream);         }           this.hadoopInputSplitByteArray = null;       }     } catch (Exception e) {       throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);     }   }     private void writeObject(ObjectOutputStream out) throws IOException {       if (hadoopInputSplit != null) {       try (           ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();           ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)       ) {         this.hadoopInputSplit.write(objectOutputStream);         objectOutputStream.flush();         this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray();       }     }     out.defaultWriteObject();   }     @Override   public String uniqSplitId() {     return hadoopInputSplit.toString();   } }
  State
  在需要做 checkpoint 的场景下,通常我们会通过 Map 来保留当前的执行状态流批一体场景
  在流批一体场景中,我们需要保存状态以便从异常中断的流式作业恢复
  以 RocketMQState 为例:public class RocketMQState implements Serializable {     private final Map assignedWithSplitIds;     public RocketMQState(Map assignedWithSplitIds) {     this.assignedWithSplitIds = assignedWithSplitIds;   }     public Map getAssignedWithSplits() {     return assignedWithSplitIds;   } }
  复制代码  批式场景
  对于批式场景,我们可以使用EmptyState 不存储状态,如果需要状态存储,和流批一体场景采用相似的设计方案。public class EmptyState implements Serializable {     public static EmptyState fromBytes() {     return new EmptyState();   } }

如果拿把铲子朝地球往下挖,最终能挖穿地球吗?挖到一米,地鼠就居住于这个深度。地鼠居住深度地下两米,这是每个人百年以后长眠的地方。人类长眠深度地下六米,金属探测器只能探测到这里了。金属探测器深度地下20米,这是巴黎地下墓穴博物赵明确认荣耀下一代折叠屏手机Q4发布预装MagicOS7。0惊艳全球正当网间不但曝光出关于荣耀手机将于年内发布新一代折叠屏手机MagiceV2,明年三月发布影像旗舰Magic5系列的消息后,日前,荣耀终端有限公司CEO赵明George在接受媒体采访六场比赛伤六人!年近花甲的穆里尼奥越来越像安西教练,过于慈祥当一个人喝水都塞牙时,有些事情便不再是科学能够解释清楚。就好比葡萄牙老头儿本赛季的开局。按罗马官方的说法,罗马波兰小将扎莱夫斯基在最近的训练之中疑似出现伤情,初步诊断为内收肌拉伤。男排世锦赛波兰队意大利队将争夺金牌新华社波兰卡托维兹9月10日电(记者张章张琨)在10日进行的2022世界男排锦标赛半决赛中,波兰队以32淘汰巴西队,意大利队30战胜斯洛文尼亚队。当天在波兰南部城市卡托维兹进行的第赏月观星品茗插花中秋消费新时尚你最中意哪一个?来源央视新闻客户端昨天(9月10日)是中秋节,今年的中秋是十五的月亮十五圆,昨天17时59分迎来中秋满月的最圆时刻。中秋逢圆月赏月地图观美景中秋节,赏月必不可少。在不同地方不同场景再强调一次秋天穿风衣别老配裤子,学时尚博主搭这种裙子秋季穿风衣是许多气质女孩或者大龄女性的选择,穿上走路带风的风衣比较显气质。尤其是长款的风衣,可以将整个人套上,这样里面的搭配就显得没那么重要了,随便套一条长裤就OK了。可是,真的是一场高奢珠宝红毯秀,成了女明星们争夺的名利场早前宝格丽的珠宝红毯秀真是刷爆全网了,很多知名的女星都去参加了这场盛宴。这场盛宴不仅仅是宝格丽的品牌秀,更是女明星们真正要争夺的名利场。在这场秀里,女明星们佩戴的珠宝,以及身穿的礼iPhone14调查数据中国预购比例85买Pro只有5买基础版天风国际证券知名台籍分析师郭明錤周五(9日)在Twitter发表最新iPhone14实体渠道预购调查,显示中国地区iPhone14Pro系列占订单配比约为85,而苹果新推出的大尺寸中纪委评华为mate50逆境下的砥砺前行面对封锁打压中国企业抗压前行寻求突破华为突围图为位于广东省东莞市的华为生产基地内,工作人员正在操作先进的生产设施。(图片来源视觉中国)时隔两年,华为高端机型的代表Mate系列终于迎2022年9月11日国内油价最新消息国内油价本轮调价周期本周中秋节内油价预期简报汇总本轮国内最高零售限价调价周期第三个工作日的油价预期数据肯定会覆盖今年中秋节的非工作日报道。所以目前原油品种变化率暂时驻停在负值8。4我炒股8年,从10万入市,到炒股养家!总结出4条铁律趋势是交易的本质。了解这一趋势将获得在市场上赚钱的金钥匙。顺应潮流可以避免亏损和盈利。大多数投资者,无论他们的分析水平和技术方法有多高,都不可避免地以失败告终。根本原因是什么?这是
1899买下了半年前3499的4KMiniLED显示器,联合创新27M2U杀疯了大家发现没有,今年组装电脑成本很高,只有显示器疯狂内卷。主流MiniLED显示器降价更猛,去年两三千只够买入门级的,款型也少现在各家都在猛推新品,2K分辨率的已经跌到近千元,4K分C编程初探(5)你好,世界!上节我们创建了第一个项目,这节我们继续,让程序能跑起来。打开VS,起始界面是这样,我们可以在箭头位置找到最近的解决方案。界面布局点击打开,呈现出如下界面默认情况下,分成上图所示12詹姆斯很多纪录也许会被后辈们打破,但这5个只能被仰视,破不了詹姆斯创造纪录的能力,放眼历史都是独一档,詹姆斯从奥尼尔巅峰尾,经历四大前锋马刺零号特工奇才苦主活塞五虎凯尔三巨头四星老鹰双雄(格兰杰,乔治)步行者雷霆三少四星半勇士,可以说从四大Qt多个信号关联同一个槽函数背景多个信号需要执行同一个函数或者一类函数的时候,可以选择每个信号创建一个槽函数去实现功能,如果直接关联到一个函数中,该函数只能执行一份功能,有时候并不能满足业务需求在多个信号绑定荣耀80Pro三体限定版曝光后置三体LogoIT之家12月15日消息,三体动画已于近期开播,该动画改编自作家刘慈欣的系列同名长篇科幻小说,由B站主要出品制作,三体宇宙联合出品,艺画开天联合出品承制。三体动画版也出现了指定产品导致排毒能力衰退的13个误区你的健康常识其实漏洞百出,这13个健康误区一定要纠正!误区1只要吃点对身体好的食物就够了健康全在饮食上。所以只要关注饮食,营养均衡,定时定量吃饭就够了。只是这样做是没法保证健康的。java之反射(2)属性field头条创作挑战赛通过反射可以得到字节码文件中的信息,比如字节码中的属性方法构造器等,是与类对应的。今天学习属性field的一些api。设置一个User类,内容如下publicclas有能力却有污点的人,该不该重用?用人的格局,决定了发展的上限我们常说,道不同,不相为谋。这话就是告诉我们,只要路的方向是一致的,我们是可以合作的,合作共赢嘛!但是,有的人,却有道德洁癖,对那些道德上有点不太完美的人,看不顺眼,不愿意跟这样的目前CBA本土球员真正称得上攻防一体的球员就这三位吧所谓攻防一体就是进攻端和防守端都是顶级的水平,目前国内球员具备这个能力的真是少之又少啊!联盟中进攻端出色的球员很多,防守端出色的球员也很多,但是大部分都偏科,比如,赵继伟和沈梓捷,国家华中区域应急救援中心开工活动16日举办16日上午,华中区域应急救援中心项目开工活动在湖北武汉举办。华中区域应急救援中心是国家区域应急救援中心建设工程的重要组成,辐射湖北安徽江西湖南江苏河南等地区,主要担负华中地区洪涝灾在江苏的绿林仙海中,寻找侠气与柔情前言旅途对于我来说,总是会有各种各样的意义,每一次出行,都能找到新的体验。一直在路上的我,很庆幸自己能见到那么的风景感受那么多人文,结识了这么多最可爱的人。带着对旅途的憧憬与满心欢