A Deep Dive into the Flink CDC Source Code, Part 4 (Worth Bookmarking)
// ------------------------- SnapshotSplitReader.submitSplit ------------------------------------------
public void submitSplit(MySqlSplit mySqlSplit) {
    this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
    statefulTaskContext.configure(currentSnapshotSplit);
    // grab the queue from the context; pollSplitRecords will drain it later
    this.queue = statefulTaskContext.getQueue();
    this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
    this.hasNextElement.set(true);
    this.reachEnd.set(false);
    // the actual reading logic lives in the read task
    this.splitSnapshotReadTask =
            new MySqlSnapshotSplitReadTask(
                    statefulTaskContext.getConnectorConfig(),
                    statefulTaskContext.getOffsetContext(),
                    statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                    statefulTaskContext.getDatabaseSchema(),
                    statefulTaskContext.getConnection(),
                    statefulTaskContext.getDispatcher(),
                    statefulTaskContext.getTopicSelector(),
                    StatefulTaskContext.getClock(),
                    currentSnapshotSplit);
    // submit a runnable to the executor; it mainly runs the read task's execute method
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // our own context impl, used mainly to record the low and high watermarks
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    // run the snapshot read task
                    SnapshotResult snapshotResult =
                            splitSnapshotReadTask.execute(sourceContext);

                    final MySqlBinlogSplit backfillBinlogSplit =
                            createBackfillBinlogSplit(sourceContext);
                    // optimization that skips the binlog read when the low watermark
                    // equals the high watermark
                    // splits are snapshotted in parallel; if the low and high watermarks
                    // for this split are equal, no other writes happened while its data
                    // was being read, so the split's range is unchanged and the binlog
                    // read after the snapshot can be skipped
                    final boolean binlogBackfillRequired =
                            backfillBinlogSplit
                                    .getEndingOffset()
                                    .isAfter(backfillBinlogSplit.getStartingOffset());
                    if (!binlogBackfillRequired) {
                        dispatchHighWatermark(backfillBinlogSplit);
                        currentTaskRunning = false;
                        return;
                    }
                    // once the snapshot completes, start the binlog read task
                    if (snapshotResult.isCompletedOrSkipped()) {
                        // the snapshot read task recorded the low and high watermarks;
                        // they are passed in to build the binlog read task
                        final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                createBackfillBinlogReadTask(backfillBinlogSplit);
                        // run the binlog read task; its internals are too involved to read
                        // line by line, so in short: the snapshot's high watermark becomes
                        // the endOffset, the task uses it as the stop condition, and every
                        // event before endOffset is read and sent downstream
                        backfillBinlogReadTask.execute(
                                new SnapshotBinlogSplitChangeEventSourceContextImpl());
                    } else {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                    }
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}

// ------------------------- MySqlSnapshotSplitReadTask.execute(sourceContext) ------------------------------------------
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
    SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset); // just news one up
    final SnapshotContext ctx;
    try {
        ctx = prepare(context); // news up yet another context object; not of much use
    } catch (Exception e) {
        LOG.error("Failed to initialize snapshot context.", e);
        throw new RuntimeException(e);
    }
    try {
        // everything above is boilerplate; the main logic sits in doExecute,
        // so we head straight into that method
        return doExecute(context, ctx, snapshottingTask);
    } catch (InterruptedException e) {
        LOG.warn("Snapshot was interrupted before completion");
        throw e;
    } catch (Exception t) {
        throw new DebeziumException(t);
    }
}
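A quick aside before doExecute: the skip-backfill optimization above boils down to comparing two binlog offsets. Below is a minimal sketch of that decision, with hypothetical Offset and BackfillDecision stand-ins rather than the real BinlogOffset/MySqlBinlogSplit classes:

// Minimal sketch of the skip-backfill decision. Offset is a hypothetical
// stand-in for BinlogOffset (binlog file name + position within the file).
final class Offset implements Comparable<Offset> {
    final String file;
    final long position;

    Offset(String file, long position) {
        this.file = file;
        this.position = position;
    }

    boolean isAfter(Offset other) {
        return compareTo(other) > 0;
    }

    @Override
    public int compareTo(Offset o) {
        int byFile = file.compareTo(o.file);
        return byFile != 0 ? byFile : Long.compare(position, o.position);
    }
}

final class BackfillDecision {
    // If the binlog did not advance between the low and high watermark, nothing
    // wrote to the database while this split was being scanned, so the backfill
    // binlog read can be skipped entirely.
    static boolean backfillRequired(Offset lowWatermark, Offset highWatermark) {
        return highWatermark.isAfter(lowWatermark);
    }
}

This is exactly what submitSplit checks via backfillBinlogSplit.getEndingOffset().isAfter(getStartingOffset()): when it returns false, dispatching the high watermark is all that is left to do.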
// ------------------------- MySqlSnapshotSplitReadTask.doExecute(sourceContext) ------------------------------------------
@Override
protected SnapshotResult doExecute(
        ChangeEventSourceContext context,
        SnapshotContext snapshotContext,
        SnapshottingTask snapshottingTask)
        throws Exception {
    final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
            (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
    ctx.offset = offsetContext;
    // a dispatcher for watermark events; data is later emitted through it
    // (via an emitter, naturally)
    final SignalEventDispatcher signalEventDispatcher =
            new SignalEventDispatcher(
                    offsetContext.getPartition(),
                    topicSelector.topicNameFor(snapshotSplit.getTableId()),
                    dispatcher.getQueue());

    // the log messages below already make the steps quite clear
    // record the low watermark
    final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 1 - Determining low watermark {} for split {}",
            lowWatermark,
            snapshotSplit);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setLowWatermark(lowWatermark);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

    LOG.info("Snapshot step 2 - Snapshotting data");
    // read the data; this is the method we focus on next
    createDataEvents(ctx, snapshotSplit.getTableId());

    // record the high watermark
    final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 3 - Determining high watermark {} for split {}",
            highWatermark,
            snapshotSplit);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setHighWatermark(highWatermark);
    return SnapshotResult.completed(ctx.offset);
}

// let's look at the createDataEvents call chain
private void createDataEvents(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        TableId tableId)
        throws Exception {
    EventDispatcher.SnapshotReceiver snapshotReceiver =
            dispatcher.getSnapshotChangeEventReceiver();
    LOG.debug("Snapshotting table {}", tableId);
    createDataEventsForTable(
            snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
    // we won't walk through the receiver's code; a short summary will do:
    // each time the receiver gets a record via changeRecord, it checks a member field
    // (bufferedEvent): if it is non-null, the buffered record is enqueued, and then a
    // new SourceRecord is built and buffered, until all data has been read. After the
    // last record has been built, no new data arrives and changeRecord is never called
    // again, which means that field is still holding the final record.
    // completeSnapshot below checks bufferedEvent: if it is non-null, it does some
    // completion-related work and finally enqueues the record. Skip this call and the
    // snapshot phase of the current split quietly loses its last record, hehe
    snapshotReceiver.completeSnapshot();
}
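Why bracket the scan with watermarks at all? Because the select in step 2 runs without any table lock, rows read between step 1 and step 3 may already be stale by the time they are emitted. The backfill binlog read replays every change between the two watermarks and upserts it over the snapshot rows, keyed by primary key, so the split ends up in exactly the state it had at the high watermark. Here is a minimal sketch of that merge idea with simplified, hypothetical record types; it illustrates the principle, not the actual flink-cdc normalization code:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified change record: key = primary key, value = row
// payload, where a null value means DELETE. Stand-ins for the real SourceRecord.
record Change(String key, String value) {}

final class SplitMerger {
    // Upsert the backfill changes (everything between the low and high watermark)
    // over the snapshot rows. The result is the state of the split at the high
    // watermark, as if the scan had been atomic.
    static Map<String, String> merge(List<Change> snapshotRows, List<Change> backfill) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : snapshotRows) {
            state.put(c.key(), c.value());
        }
        for (Change c : backfill) {
            if (c.value() == null) {
                state.remove(c.key()); // DELETE wins over the stale snapshot row
            } else {
                state.put(c.key(), c.value()); // INSERT/UPDATE overwrites it
            }
        }
        return state;
    }
}

The net effect is a consistent per-split view without ever locking the table, which is what makes the parallel, lock-free snapshot phase possible.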
// createDataEvents calls this class's createDataEventsForTable, where the concrete
// reading logic begins
private void createDataEventsForTable(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        EventDispatcher.SnapshotReceiver snapshotReceiver,
        Table table)
        throws InterruptedException {
    long exportStart = clock.currentTimeInMillis();
    LOG.info(
            "Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
    // build the split-scan SQL
    final String selectSql =
            StatementUtils.buildSplitScanQuery(
                    snapshotSplit.getTableId(),
                    snapshotSplit.getSplitKeyType(),
                    snapshotSplit.getSplitStart() == null,
                    snapshotSplit.getSplitEnd() == null);
    LOG.info(
            "For split '{}' of table {} using select statement: '{}'",
            snapshotSplit.splitId(),
            table.id(),
            selectSql);
    try (PreparedStatement selectStatement =
                    // create the statement, then run the query
                    StatementUtils.readTableSplitDataStatement(
                            jdbcConnection,
                            selectSql,
                            snapshotSplit.getSplitStart() == null,
                            snapshotSplit.getSplitEnd() == null,
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            snapshotSplit.getSplitKeyType().getFieldCount(),
                            connectorConfig.getQueryFetchSize());
            // the rows that come back are wrapped into SourceRecords and sent downstream
            ResultSet rs = selectStatement.executeQuery()) {
        ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
        long rows = 0;
        Threads.Timer logTimer = getTableScanLogTimer();

        while (rs.next()) {
            rows++;
            final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
            for (int i = 0; i < columnArray.getColumns().length; i++) {
                Column actualColumn = table.columns().get(i);
                row[columnArray.getColumns()[i].position() - 1] =
                        readField(rs, i + 1, actualColumn, table);
            }
            if (logTimer.expired()) {
                long stop = clock.currentTimeInMillis();
                LOG.info(
                        "Exported {} records for split '{}' after {}",
                        rows,
                        snapshotSplit.splitId(),
                        Strings.duration(stop - exportStart));
                snapshotProgressListener.rowsScanned(table.id(), rows);
                logTimer = getTableScanLogTimer();
            }
            // this hands the row to the receiver, which in turn puts it on the queue;
            // no need to dig deep here, it is simply wrapped so thoroughly that it is
            // hard to follow at first glance
            dispatcher.dispatchSnapshotEvent(
                    table.id(),
                    getChangeRecordEmitter(snapshotContext, table.id(), row), // just news one up
                    snapshotReceiver);
        }
        LOG.info(
                "Finished exporting {} records for split '{}', total duration '{}'",
                rows,
                snapshotSplit.splitId(),
                Strings.duration(clock.currentTimeInMillis() - exportStart));
    } catch (SQLException e) {
        throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
    }
}

// ------------------------- the flow after dispatcher.dispatchSnapshotEvent ----------------------------------
// step into EventDispatcher.dispatchSnapshotEvent
public void dispatchSnapshotEvent(
        T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver)
        throws InterruptedException {
    DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
    if (dataCollectionSchema == null) {
        errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
    }
    changeRecordEmitter.emitChangeRecords(
            dataCollectionSchema,
            new Receiver() {
                @Override
                public void changeRecord(
                        DataCollectionSchema schema,
                        Operation operation,
                        Object key,
                        Struct value,
                        OffsetContext offset,
                        ConnectHeaders headers)
                        throws InterruptedException {
                    eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
                    // the real enqueue logic is invoked here; the receiver is the one we
                    // passed in, i.e. the BufferingSnapshotChangeRecordReceiver class
                    receiver.changeRecord(
                            dataCollectionSchema, operation, key, value, offset, headers);
                }
            });
}

// BufferingSnapshotChangeRecordReceiver.changeRecord
// its processing logic was summarized earlier, so no further introduction is needed
@Override
public void changeRecord(
        DataCollectionSchema dataCollectionSchema,
        Operation operation,
        Object key,
        Struct value,
        OffsetContext offsetContext,
        ConnectHeaders headers)
        throws InterruptedException {
    Objects.requireNonNull(value, "value must not be null");
    LOGGER.trace("Received change record for {} operation on key {}", operation, key);
    // delay-by-one: the previously buffered record is enqueued first
    if (bufferedEvent != null) {
        queue.enqueue(bufferedEvent.get());
    }
    Schema keySchema = dataCollectionSchema.keySchema();
    String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
    // the record is produced lazily, so to have the correct offset as per the
    // pre/post completion callbacks
    bufferedEvent =
            () -> {
                SourceRecord record =
                        new SourceRecord(
                                offsetContext.getPartition(),
                                offsetContext.getOffset(),
                                topicName,
                                null,
                                keySchema,
                                key,
                                dataCollectionSchema.getEnvelopeSchema().schema(),
                                value,
                                null,
                                headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
}
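To wrap up, the delay-by-one buffering deserves a standalone illustration, because the earlier comment already hinted at the trap: the last record only leaves the buffer when completeSnapshot() flushes it. Here is a minimal self-contained sketch of the same pattern, with a plain String record and an in-memory queue as hypothetical stand-ins for Debezium's types:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

// Minimal sketch of the delay-by-one buffer used by the snapshot receiver;
// the String record and ArrayDeque are stand-ins for Debezium's classes.
final class BufferingReceiver {
    private final Queue<String> queue = new ArrayDeque<>();
    private Supplier<String> bufferedEvent; // most recent record, not yet emitted

    // Called once per scanned row: emits the PREVIOUS row, buffers the current one.
    void changeRecord(String record) {
        if (bufferedEvent != null) {
            queue.offer(bufferedEvent.get());
        }
        // produced lazily, mirroring the real receiver: the Supplier defers
        // building the final event until it is actually enqueued
        bufferedEvent = () -> record;
    }

    // Must be called after the scan, or the last row silently disappears.
    void completeSnapshot() {
        if (bufferedEvent != null) {
            queue.offer(bufferedEvent.get()); // flush the trailing record
            bufferedEvent = null;
        }
    }
}

The Supplier mirrors the lazy construction in the real receiver: the event is only materialized at the moment it is enqueued, so it sees the offsets as they stand after the pre/post completion callbacks have run.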