范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

万字长文Flinkcdc源码精讲(推荐收藏)(四)

  // -------------------------   SnapshotSplitReader.submitSplit方法  ------------------------------------------ public void submitSplit(MySqlSplit mySqlSplit) {         this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();         statefulTaskContext.configure(currentSnapshotSplit);      // 拿到context的queue,在pollSplitSrecords的时候需要         this.queue = statefulTaskContext.getQueue();         this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();         this.hasNextElement.set(true);         this.reachEnd.set(false);      // 主要读取逻辑在readTask中         this.splitSnapshotReadTask =                 new MySqlSnapshotSplitReadTask(                         statefulTaskContext.getConnectorConfig(),                         statefulTaskContext.getOffsetContext(),                         statefulTaskContext.getSnapshotChangeEventSourceMetrics(),                         statefulTaskContext.getDatabaseSchema(),                         statefulTaskContext.getConnection(),                         statefulTaskContext.getDispatcher(),                         statefulTaskContext.getTopicSelector(),                         StatefulTaskContext.getClock(),                         currentSnapshotSplit);      // 提交一个runnable到线程中,主要是执行readTask的execute方法         executor.submit(                 () -> {                     try {                         currentTaskRunning = true;                        // 自己实现的contextImpl 主要记录高水位和低水位用                         final SnapshotSplitChangeEventSourceContextImpl sourceContext =                                 new SnapshotSplitChangeEventSourceContextImpl();                        // 执行readTask                         SnapshotResult snapshotResult =                                 splitSnapshotReadTask.execute(sourceContext);                         final MySqlBinlogSplit backfillBinlogSplit =                                 createBackfillBinlogSplit(sourceContext);                         // optimization that skip the binlog read when the low watermark equals high                         // watermark                        // 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read                         final boolean binlogBackfillRequired =                                 backfillBinlogSplit                                         .getEndingOffset()                                         .isAfter(backfillBinlogSplit.getStartingOffset());                         if (!binlogBackfillRequired) {                             dispatchHighWatermark(backfillBinlogSplit);                             currentTaskRunning = false;                             return;                         }                         // snapshot执行完成后,开始binlogReadTask的读取操作                         if (snapshotResult.isCompletedOrSkipped()) {                            // 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task                             final MySqlBinlogSplitReadTask backfillBinlogReadTask =                                     createBackfillBinlogReadTask(backfillBinlogSplit);                            // 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了                            // 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会                            // 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游                             backfillBinlogReadTask.execute(                                     new SnapshotBinlogSplitChangeEventSourceContextImpl());                         } else {                             readException =                                     new IllegalStateException(                                             String.format(                                                     "Read snapshot for mysql split %s fail",                                                     currentSnapshotSplit));                         }                     } catch (Exception e) {                         currentTaskRunning = false;                         LOG.error(                                 String.format(                                         "Execute snapshot read task for mysql split %s fail",                                         currentSnapshotSplit),                                 e);                         readException = e;                     }                 });     } // -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)方法  ------------------------------------------    @Override     public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {         SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个         final SnapshotContext ctx;         try {             ctx = prepare(context); //重新new了一个 context对象,比较无用         } catch (Exception e) {             LOG.error("Failed to initialize snapshot context.", e);             throw new RuntimeException(e);         }         try {            // 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可             return doExecute(context, ctx, snapshottingTask);         } catch (InterruptedException e) {             LOG.warn("Snapshot was interrupted before completion");             throw e;         } catch (Exception t) {             throw new DebeziumException(t);         }     } // -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法  ------------------------------------------  @Override     protected SnapshotResult doExecute(             ChangeEventSourceContext context,             SnapshotContext snapshotContext,             SnapshottingTask snapshottingTask)             throws Exception {         final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =                 (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;         ctx.offset = offsetContext;        // 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了         final SignalEventDispatcher signalEventDispatcher =                 new SignalEventDispatcher(                         offsetContext.getPartition(),                         topicSelector.topicNameFor(snapshotSplit.getTableId()),                         dispatcher.getQueue());     // 其实log输出的日志就已经很清晰了        // 记录低水位         final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);         LOG.info(                 "Snapshot step 1 - Determining low watermark {} for split {}",                 lowWatermark,                 snapshotSplit);         ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))                 .setLowWatermark(lowWatermark);         signalEventDispatcher.dispatchWatermarkEvent(                 snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);         LOG.info("Snapshot step 2 - Snapshotting data");        // 读取数据  主要方法重点介绍的地方         createDataEvents(ctx, snapshotSplit.getTableId());     // 记录高水位         final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);         LOG.info(                 "Snapshot step 3 - Determining high watermark {} for split {}",                 highWatermark,                 snapshotSplit);         signalEventDispatcher.dispatchWatermarkEvent(                 snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);         ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))                 .setHighWatermark(highWatermark);         return SnapshotResult.completed(ctx.offset);     }   // 我们看看createDataEvents 调用过程 private void createDataEvents(             RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,             TableId tableId)             throws Exception {         EventDispatcher.SnapshotReceiver snapshotReceiver =                 dispatcher.getSnapshotChangeEventReceiver();         LOG.debug("Snapshotting table {}", tableId);         createDataEventsForTable(                 snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));      // receiver的逻辑我们就不看了,我这里介绍一下就好      // receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record      // 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻         snapshotReceiver.completeSnapshot();     } // createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑     private void createDataEventsForTable(             RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,             EventDispatcher.SnapshotReceiver snapshotReceiver,             Table table)             throws InterruptedException {         long exportStart = clock.currentTimeInMillis();         LOG.info("Exporting data from split "{}" of table {}", snapshotSplit.splitId(), table.id());             // 构建sql         final String selectSql =                 StatementUtils.buildSplitScanQuery(                         snapshotSplit.getTableId(),                         snapshotSplit.getSplitKeyType(),                         snapshotSplit.getSplitStart() == null,                         snapshotSplit.getSplitEnd() == null);         LOG.info(                 "For split "{}" of table {} using select statement: "{}"",                 snapshotSplit.splitId(),                 table.id(),                 selectSql);              try (PreparedStatement selectStatement =                         StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql                                 jdbcConnection,                                 selectSql,                                 snapshotSplit.getSplitStart() == null,                                 snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),                                 snapshotSplit.getSplitEnd(),                                 snapshotSplit.getSplitKeyType().getFieldCount(),                                 connectorConfig.getQueryFetchSize());              // 然后对查询出来的数据进行封装成sourceRecord发送下游                 ResultSet rs = selectStatement.executeQuery()) {             ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);             long rows = 0;             Threads.Timer logTimer = getTableScanLogTimer();             while (rs.next()) {                 rows++;                 final Object[] row = new Object[columnArray.getGreatestColumnPosition()];                 for (int i = 0; i < columnArray.getColumns().length; i++) {                     Column actualColumn = table.columns().get(i);                     row[columnArray.getColumns()[i].position() - 1] =                             readField(rs, i + 1, actualColumn, table);                 }                 if (logTimer.expired()) {                     long stop = clock.currentTimeInMillis();                     LOG.info(                             "Exported {} records for split "{}" after {}",                             rows,                             snapshotSplit.splitId(),                             Strings.duration(stop - exportStart));                     snapshotProgressListener.rowsScanned(table.id(), rows);                     logTimer = getTableScanLogTimer();                 }                 // 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解                 dispatcher.dispatchSnapshotEvent(                         table.id(),                         getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个                         snapshotReceiver);             }             LOG.info(                     "Finished exporting {} records for split "{}", total duration "{}"",                     rows,                     snapshotSplit.splitId(),                     Strings.duration(clock.currentTimeInMillis() - exportStart));         } catch (SQLException e) {             throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);         }     } // -------------------------   dispatcher.dispatchSnapshotEvent方法之后的流程  ----------------------------------  // 进入evnentDisptcher.dispatchSnapshotEvent方法    public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {         DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);         if (dataCollectionSchema == null) {             errorOnMissingSchema(dataCollectionId, changeRecordEmitter);         }         changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {             @Override             public void changeRecord(DataCollectionSchema schema,                                      Operation operation,                                      Object key, Struct value,                                      OffsetContext offset,                                      ConnectHeaders headers)                     throws InterruptedException {                 eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);                // 真正的放入队列的逻辑在这里调用                // receiver使我们传入的  对应BufferingSnapshotChangeRecordReceiver类                 receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);             }         });     }   // BufferingSnapshotChangeRecordReceiver的changeRecord方法  // 前面简单介绍过他的处理逻辑了,就不必多做介绍了   @Override         public void changeRecord(DataCollectionSchema dataCollectionSchema,                                  Operation operation,                                  Object key, Struct value,                                  OffsetContext offsetContext,                                  ConnectHeaders headers)                 throws InterruptedException {             Objects.requireNonNull(value, "value must not be null");             LOGGER.trace("Received change record for {} operation on key {}", operation, key);             if (bufferedEvent != null) {                 queue.enqueue(bufferedEvent.get());             }             Schema keySchema = dataCollectionSchema.keySchema();             String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());             // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks             bufferedEvent = () -> {                 SourceRecord record = new SourceRecord(                         offsetContext.getPartition(),                         offsetContext.getOffset(),                         topicName, null,                         keySchema, key,                         dataCollectionSchema.getEnvelopeSchema().schema(), value,                         null, headers);                 return changeEventCreator.createDataChangeEvent(record);             };         }

Apollo出行拟收购一间智能电动车公司Tech星球12月6日消息,Apollo出行发布公告称,目前拟收购一间从事智能电动车的公司。目标公司的业务涵盖一系列配备先进技术的智能电动车,目标客户为中国年轻且精通技术的用户(特科技与生活背后那些默默付出的人十年前智能手机才刚刚开始普及使用,安卓手机开始在市场出现,各种品牌山寨机不断涌现导致市场乱象丛生,从而使诺基亚塞班系统手机市场被不断蚕食,苹果手机也是那个时候开始展露锋芒,以颠覆性雷军没有半分犹豫,小米12Pro降价1800说再见相约12月8日见,小米13系列可以说是未发先火了,目前在单个电商平台的预约人数就已经超过了30万,足以见得该新机的关注度到底有多高。其实这次小米13系列最大的亮点便是改用直屏方案,华大基因是一家很有争议的公司华大基因确实是一家很有争议的公司,其董事长汪健也是一个很有争议的人。任正非在内部会议上提到过汪健,原话如下。汪健是个有争议的新人,他说的话会有下一次突破吗?我们不妨宽容一点。任总说华为从4488跌至3399,鸿蒙系统IP68防水,售价更亲民华为手机的价格比友商高一些,现在的定位比三星还更高,不过体验也是不差的,特别是自研鸿蒙系统后,即便搭载高通芯片,国产化也比其它品牌更高,因此获得了不少国内消费者的认可和支持,如果国iPhone15Ultra诚意满满,全新双屏旗舰,不再挤牙膏了iPhone的地位很独特,始终保持着自己的风格,因为出色的闭源IOS系统,综合体验一直处于领先地位,并且拥有极高的寿命,市场份额全面领先。不过进入刘海屏时代后,iPhone更新节奏101比97,我预测北京控股明天上午四分惊险逆转取胜辽宁沈阳北京控股在第一阶段的成绩还是不错的,而辽宁队的成绩更是排在前列,不过很遗憾的是辽宁队现在在表面的平静下面暗潮汹涌,队里有很多的不和谐的地方,限制了辽宁队的实力发挥。面对北京控股,辽逝去的经典Walkman和Cybershot系列手机上篇讲索尼爱立信历史的文章中,有朋友评论建议再出一期W系列和C系列经典手机的文章。由于我也是索爱粉,所以就花大力气整理出了这篇文章。索尼的特长是设计,例如索尼爱立信那个logo史上OPPO想开了,天玑9000旗舰大方让利2250元,不再摆架子随着市场竞争不断白热化,一向高姿态的OPPO友商也发生了潜移默化的变化,彻底摆脱高价低配的说辞,开始极力向性价比靠拢。就拿这款OPPOFindX5Pro天玑版来说,售价一降再降,到小米13发布会MIUI14将会为小米10系列手机进行适配近年来,小米在系统方面获得了许多负面影响,尤其是在雷军!金凡!的事件之后,MIUI系统的不稳定问题开始成为网络热点。最近,小米宣布将举行小米13系列MIUI14新品发布会。届时,小100W快充双3200万,跌价1450元,华为四摄鸿蒙手机已濒临下架喜欢苹果新机但不愿意花过高的价格购买怎么办?很多果粉都会选择等一等,因为用惯了一个系统之后,再换其它系统手机会很不适应,虽然苹果新机售价不菲,但是手机这种产品更新换代特别快,在自己
全面放开数月后,日本的经济怎么样了?疫情放开后,人们自然会关心经济会恢复到什么程度。其实放眼全球,有很多现成的例子可以供我们借鉴。考虑到人文地理生活习惯城市格局与经济发展过程等诸多方面的相似之处,我们不妨以同处亚洲的维克多马丁内茨打破类固醇神话没有一张没有污点的纸钢铁侠维克多马丁内茨是2000年代和2010年代早期最成功的健美运动员之一,他是历史上第二个拿到IFBB职业卡的波多黎哥人,健硕的身材和完美的天赋,他被誉为是罗尼。库尔曼的接班人,NBA东部最新排名,凯尔特人稳居榜首,尼克斯第七,公牛第十2223赛季NBA常规赛今天继续进行,凯尔特人主场迎战公牛,末节比赛,拉文连续得分一度帮助公牛队将分差缩小到只有2分,关键时刻霍福德底角3分命中,拉文三分打铁,塔图姆反击暴扣打进并细思极恐!张兰的爆料,和汪小菲的话对上了头条创作挑战赛万物复苏,舆论战中的众人休整了一个年头,先后又慢慢冒出了头。尤其是年前撕得不可开交的徐汪两家,又上演了新的爱恨情仇。汪小菲高调认爱新女友,就算张颖颖已经全网被传怀有身猪价倒V走势续亏2。5亿4。5亿,中粮家佳康何时迎来向上拐点?回顾2022年猪价走势,在经历过探底反弹冲高回落这几大阶段之后,猪价整体呈现一个倒V的走势。具体而言,2022年年初生猪交易价格延续2021年末的下滑趋势,但跌幅逐步收窄,3月份触如何免费获取行业报告?行业报告通常提供特定地理区域的行业部门或市场的概述。它通常包括概念主要趋势和展望领先公司竞争环境行业数据(规模成本销售比例)核心经济指标和其他相关信息。CPDA数据分析师经常会通过入局做酱酒?碧桂园回应了地产巨头碧桂园要入局做酱酒?1月8日,有酒行业媒体称,地产龙头企业碧桂园近期已于仁怀市注册成立贵州顺朋酒业有限公司正式入局酱酒,并于2022年底拜访多家仁怀当地酒厂洽谈股权合作事宜7000万嫁女的煤老板邢利斌去世,他让山西煤老板想开了来源反做空信息中心综合今日头条史小小及自有信息来源消息1月6日,反做空信息中心获悉,7000万嫁女的山西煤老板,山西联盛能源集团原董事长邢利斌,确认已经去世。消息称,邢利斌在上海火再添4家全国500强!白云农企龙头数量位居全市第一近日,2022中国农业企业500强榜单发布,白云区共有4家农业企业入围,分别为海露控股集团有限公司(以下简称海露集团)乐禾食品集团股份有限公司粤旺农业集团有限公司广东优果农业投资有三星SK海力士等半导体厂商或削减半导体产能供应三星SK海力士等半导体厂商或削减半导体产能供应科创板日报11日讯,据业内人士透露,自2022年Q4开始,三星电子SK海力士等半导体厂商就削减半导体产能供应进行讨论,主要原因在于,22023年去西沙旅游一趟多少钱?长乐公主游轮价格表来了!西沙群岛是中国最南端的一片蔚蓝海域,这片土地没有污染,人迹罕至,纯净的让人心醉!今天小萱给大家说说2023年去西沙旅游一趟多少钱?长乐公主游轮价格表和最新行程分享给大家!长乐公主游