更多技术交流、求职机会,欢迎关注
字节跳动数据平台微信公众号,回复【1】进入官方交流群

BitSail 是字节跳动自研的数据集成产品,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案。本系列聚焦 BitSail Connector 开发模块,为大家带来详细全面的开发方法与场景示例,本篇将主要介绍 Source 接口部分。

持续关注,BitSail Connector 开发详解将分为四篇呈现。

  • BitSail Connector 开发详解系列一:Source

  • BitSail Connector 开发详解系列二:SourceSplitCoordinator

  • BitSail Connector 开发详解系列三:SourceReader

  • BitSail Connector 开发详解系列四:Sink、Writer

Source Connector

本文将主要介绍 Source 接口部分:

  • Source: 参与数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。

  • SourceSplit: 数据读取分片,大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split 并行处理。

  • State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。

Source

数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。

以 RocketMQSource 为例:Source 方法需要实现 Source 和 ParallelismComputable 接口。

Source 接口

public interface Source<T, SplitT extends SourceSplit, StateT extends Serializable>
    extendsSerializable, TypeInfoConverterFactory {/*** Run in client side for source initialize;*/
  void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throwsIOException;/*** Indicate the Source type.*/Boundedness getSourceBoundedness();/*** Create Source Reader.*/SourceReader<T, SplitT>createReader(SourceReader.Context readerContext);/*** Create split coordinator.*/SourceSplitCoordinator<SplitT, StateT> createSplitCoordinator(SourceSplitCoordinator.Context<SplitT, StateT>coordinatorContext);/*** Get Split serializer for the framework,{@linkSplitT}should implement from {@linkSerializable}*/
  default BinarySerializer<SplitT>getSplitSerializer() {return new SimpleBinarySerializer<>();
}
/*** Get State serializer for the framework, {@linkStateT}should implement from {@linkSerializable}*/ default BinarySerializer<StateT>getSplitCoordinatorCheckpointSerializer() {return new SimpleBinarySerializer<>();
}
/*** Create type info converter for the source, default value {@linkBitSailTypeInfoConverter}*/ defaultTypeInfoConverter createTypeInfoConverter() {return newBitSailTypeInfoConverter();
}
/*** Get Source' name.*/String getReaderName();
}

configure 方法

主要去做一些客户端的配置的分发和提取,可以操作运行时环境 ExecutionEnviron 的配置和 readerConfiguration 的配置。

示例
@Overridepublic voidconfigure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {this.readerConfiguration =readerConfiguration;this.commonConfiguration =execution.getCommonConfiguration();
}

getSourceBoundedness 方法

设置作业的处理方式,是采用流式处理方法、批式处理方法,或者是流批一体的处理方式,在流批一体的场景中,我们需要根据作业的不同类型设置不同的处理方式。

具体对应关系如下:

流批一体场景示例
@OverridepublicBoundedness getSourceBoundedness() {return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?Boundedness.BOUNDEDNESS :
Boundedness.UNBOUNDEDNESS;
}
流批一体场景示例
@OverridepublicBoundedness getSourceBoundedness() {return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?Boundedness.BOUNDEDNESS :
Boundedness.UNBOUNDEDNESS;
}

createTypeInfoConverter 方法

用于指定 Source 连接器的类型转换器;我们知道大多数的外部数据系统都存在着自己的类型定义,它们的定义与 BitSail 的类型定义不会完全一致;为了简化类型定义的转换,我们支持了通过配置文件来映射两者之间的关系,进而来简化配置文件的开发。

在行为上表现为对任务描述 Json 文件中
reader
部分的
columns
的解析,对于
columns
中不同字段的 type 会根据上面描述文件从
ClickhouseReaderOptions.
COLUMNS
字段中解析到
readerContext.getTypeInfos()
中。

实现
  • BitSailTypeInfoConverter

    默认的
    TypeInfoConverter
    ,直接对
    ReaderOptions.
    COLUMNS
    字段进行字符串的直接解析,
    COLUMNS
    字段中是什么类型,
    TypeInfoConverter
    中就是什么类型。

  • FileMappingTypeInfoConverter

    会在 BitSail 类型系统转换时去绑定
    {readername}-type-converter.yaml
    文件,做数据库字段类型和 BitSail 类型的映射。
    ReaderOptions.
    COLUMNS
    字段在通过这个映射文件转换后才会映射到
    TypeInfoConverter
    中。

示例

FileMappingTypeInfoConverter

通过 JDBC 方式连接的数据库,包括 MySql、Oracle、SqlServer、Kudu、ClickHouse 等。这里数据源的特点是以
java.sql.ResultSet
的接口形式返回获取的数据,对于这类数据库,我们往往将
TypeInfoConverter
对象设计为
FileMappingTypeInfoConverter
,这个对象会在 BitSail 类型系统转换时去绑定
{readername}-type-converter.yaml
文件,做数据库字段类型和 BitSail 类型的映射。

@OverridepublicTypeInfoConverter createTypeInfoConverter() {return newFileMappingTypeInfoConverter(getReaderName());
}

对于
{readername}-type-converter.yaml
文件的解析,以
clickhouse-type-converter.yaml
为例。

# Clickhouse Type to BitSail Type
engine.type.to.bitsail.type.converter:
-source.type: int32
target.type:
int -source.type: float64
target.type:
double -source.type: string
target.type: string
-source.type: date
target.type: date.date
- source.type: nulltarget.type:void# BitSail Type to Clickhouse Type
bitsail.type.to.engine.type.converter:
- source.type: inttarget.type: int32- source.type: doubletarget.type: float64-source.type: date.date
target.type: date
-source.type: string
target.type: string

这个文件起到的作用是进行 job 描述 json 文件中
reader
部分的
columns
的解析,对于
columns
中不同字段的 type 会根据上面描述文件从
ClickhouseReaderOptions.
COLUMNS
字段中解析到
readerContext.getTypeInfos()
中。

"reader": {"class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource","jdbc_url": "jdbc:clickhouse://localhost:8123","db_name": "default","table_name": "test_ch_table","split_field": "id","split_config": "{\"name\": \"id\", \"lower_bound\": 0, \"upper_bound\": \"10000\", \"split_num\": 3}","sql_filter": "( id % 2 == 0 )","columns": [
{
"name": "id","type": "int64"},
{
"name": "int_type","type": "int32"},
{
"name": "double_type","type": "float64"},
{
"name": "string_type","type": "string"},
{
"name": "p_date","type": "date"}
]
},

这种方式不仅仅适用于数据库,也适用于所有需要在类型转换中需要引擎侧和 BitSail 侧进行类型映射的场景。

BitSailTypeInfoConverter

通常采用默认的方式进行类型转换,直接对
ReaderOptions.
COLUMNS
字段进行字符串的直接解析。

@OverridepublicTypeInfoConverter createTypeInfoConverter() {return newBitSailTypeInfoConverter();
}

以 Hadoop 为例:

"reader": {"class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource","path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json","content_type":"json","reader_parallelism_num": 1,"columns": [
{
"name":"id","type": "int"},
{
"name": "string_type","type": "string"},
{
"name": "map_string_string","type": "map<string,string>"},
{
"name": "array_string","type": "list<string>"}
]
}

createSourceReader 方法

书写具体的数据读取逻辑,负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。

具体传入构造 SourceReader 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。

示例
public SourceReader<Row, RocketMQSplit>createReader(SourceReader.Context readerContext) {return newRocketMQSourceReader(
readerConfiguration,
readerContext,
getSourceBoundedness());
}

createSplitCoordinator 方法

书写具体的数据分片、分片分配逻辑,SplitCoordinator 承担了去创建、管理 Split 的角色。

具体传入构造 SplitCoordinator 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。

示例
public SourceSplitCoordinator<RocketMQSplit, RocketMQState>createSplitCoordinator(SourceSplitCoordinator
.Context
<RocketMQSplit, RocketMQState>coordinatorContext) {return newRocketMQSourceSplitCoordinator(
coordinatorContext,
readerConfiguration,
getSourceBoundedness());
}

ParallelismComputable 接口

public interface ParallelismComputable extendsSerializable {/*** give a parallelism advice for reader/writer based on configurations and upstream parallelism advice
*
*
@paramcommonConf common configuration
*
@paramselfConf reader/writer configuration
*
@paramupstreamAdvice parallelism advice from upstream (when an operator has no upstream in DAG, its upstream is
* global parallelism)
*
@returnparallelism advice for the reader/writer*/ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf,
BitSailConfiguration selfConf,
ParallelismAdvice upstreamAdvice)
throwsException;
}

getParallelismAdvice 方法

用于指定下游 reader 的并行数目。一般有以下的方式:

可以选择
selfConf.get(ClickhouseReaderOptions.
READER_PARALLELISM_NUM
)
来指定并行度。

也可以自定义自己的并行度划分逻辑。

示例

比如在 RocketMQ 中,我们可以定义每 1 个 reader 可以处理至多 4 个队列
DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD
= 4

通过这种自定义的方式获取对应的并行度。

publicParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConfiguration,
BitSailConfiguration rocketmqConfiguration,
ParallelismAdvice upstreamAdvice)
throwsException {
String cluster
=rocketmqConfiguration.get(RocketMQSourceOptions.CLUSTER);
String topic
=rocketmqConfiguration.get(RocketMQSourceOptions.TOPIC);
String consumerGroup
=rocketmqConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
DefaultLitePullConsumer consumer
=RocketMQUtils.prepareRocketMQConsumer(rocketmqConfiguration, String.format(SOURCE_INSTANCE_NAME_TEMPLATE,
cluster,
topic,
consumerGroup,
UUID.randomUUID()
));
try{
consumer.start();
Collection
<MessageQueue> messageQueues =consumer.fetchMessageQueues(topic);int adviceParallelism = Math.max(CollectionUtils.size(messageQueues) / DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD, 1);returnParallelismAdvice.builder()
.adviceParallelism(adviceParallelism)
.enforceDownStreamChain(
true)
.build();
}
finally{
consumer.shutdown();
}
}
}

SourceSplit

数据源的数据分片格式,需要我们实现 SourceSplit 接口。

SourceSplit 接口

要求我们实现一个实现一个获取 splitId 的方法。

public interface SourceSplit extendsSerializable {
String uniqSplitId();
}

对于具体切片的格式,开发者可以按照自己的需求进行自定义。

示例

JDBC 类存储

一般会通过主键,来对数据进行最大、最小值的划分;对于无主键类则通常会将其认定为一个 split,不再进行拆分,所以 split 中的参数包括主键的最大最小值,以及一个布尔类型的
readTable
,如果无主键类或是不进行主键的切分则整张表会视为一个 split,此时
readTable

true
,如果按主键最大最小值进行切分,则设置为
false

以 ClickhouseSourceSplit 为例:

@Setterpublic class ClickhouseSourceSplit implementsSourceSplit {public static final String SOURCE_SPLIT_PREFIX = "clickhouse_source_split_";private static final String BETWEEN_CLAUSE = "( `%s` BETWEEN ? AND ? )";private finalString splitId;/*** Read whole table or range [lower, upper]*/
  private booleanreadTable;privateLong lower;privateLong upper;public ClickhouseSourceSplit(intsplitId) {this.splitId = SOURCE_SPLIT_PREFIX +splitId;
}

@Override
publicString uniqSplitId() {returnsplitId;
}
public voiddecorateStatement(PreparedStatement statement) {try{if(readTable) {
lower
=Long.MIN_VALUE;
upper
=Long.MAX_VALUE;
}
statement.setObject(
1, lower);
statement.setObject(
2, upper);
}
catch(SQLException e) {throw BitSailException.asBitSailException(CommonErrorCode.RUNTIME_ERROR, "Failed to decorate statement with split " + this, e.getCause());
}
}
public staticString getRangeClause(String splitField) {return StringUtils.isEmpty(splitField) ? null: String.format(BETWEEN_CLAUSE, splitField);
}

@Override
publicString toString() {returnString.format("{\"split_id\":\"%s\", \"lower\":%s, \"upper\":%s, \"readTable\":%s}",
splitId, lower, upper, readTable);
}
}

消息队列

一般按照消息队列中 topic 注册的 partitions 的数量进行 split 的划分,切片中主要应包含消费的起点和终点以及消费的队列。

以 RocketMQSplit 为例:

@Builder
@Getter
public class RocketMQSplit implementsSourceSplit {privateMessageQueue messageQueue;

@Setter
private longstartOffset;private longendOffset;privateString splitId;

@Override
publicString uniqSplitId() {returnsplitId;
}

@Override
publicString toString() {return "RocketMQSplit{" + "messageQueue=" + messageQueue + ", startOffset=" + startOffset + ", endOffset=" + endOffset + '}';
}
}

文件系统

一般会按照文件作为最小粒度进行划分,同时有些格式也支持将单个文件拆分为多个子 Splits。文件系统 split 中需要包装所需的文件切片。

以 FtpSourceSplit 为例:

public class FtpSourceSplit implementsSourceSplit {public static final String FTP_SOURCE_SPLIT_PREFIX = "ftp_source_split_";private finalString splitId;

@Setter
privateString path;
@Setter
private longfileSize;public FtpSourceSplit(intsplitId) {this.splitId = FTP_SOURCE_SPLIT_PREFIX +splitId;
}

@Override
publicString uniqSplitId() {returnsplitId;
}

@Override
public booleanequals(Object obj) {return (obj instanceof FtpSourceSplit) &&(splitId.equals(((FtpSourceSplit) obj).splitId));
}

}

特别的,在 Hadoop 文件系统中,我们也可以利用对
org.apache.hadoop.mapred.InputSplit
类的包装来自定义我们的 Split。

public class HadoopSourceSplit implementsSourceSplit {private static final long serialVersionUID = 1L;private final Class<? extends InputSplit>splitType;private transientInputSplit hadoopInputSplit;private byte[] hadoopInputSplitByteArray;publicHadoopSourceSplit(InputSplit inputSplit) {if (inputSplit == null) {throw new NullPointerException("Hadoop input split must not be null");
}
this.splitType =inputSplit.getClass();this.hadoopInputSplit =inputSplit;
}
publicInputSplit getHadoopInputSplit() {return this.hadoopInputSplit;
}
public voidinitInputSplit(JobConf jobConf) {if (this.hadoopInputSplit != null) {return;
}

checkNotNull(hadoopInputSplitByteArray);
try{this.hadoopInputSplit =(InputSplit) WritableFactories.newInstance(splitType);if (this.hadoopInputSplit instanceofConfigurable) {
((Configurable)
this.hadoopInputSplit).setConf(jobConf);
}
else if (this.hadoopInputSplit instanceofJobConfigurable) {
((JobConfigurable)
this.hadoopInputSplit).configure(jobConf);
}
if (hadoopInputSplitByteArray != null) {try (ObjectInputStream objectInputStream = new ObjectInputStream(newByteArrayInputStream(hadoopInputSplitByteArray))) {this.hadoopInputSplit.readFields(objectInputStream);
}
this.hadoopInputSplitByteArray = null;
}
}
catch(Exception e) {throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
}
}
private void writeObject(ObjectOutputStream out) throwsIOException {if (hadoopInputSplit != null) {try(
ByteArrayOutputStream byteArrayOutputStream
= newByteArrayOutputStream();
ObjectOutputStream objectOutputStream
= newObjectOutputStream(byteArrayOutputStream)
) {
this.hadoopInputSplit.write(objectOutputStream);
objectOutputStream.flush();
this.hadoopInputSplitByteArray =byteArrayOutputStream.toByteArray();
}
}
out.defaultWriteObject();
}

@Override
publicString uniqSplitId() {returnhadoopInputSplit.toString();
}
}

State

在需要做 checkpoint 的场景下,通常我们会通过 Map 来保留当前的执行状态

流批一体场景

在流批一体场景中,我们需要保存状态以便从异常中断的流式作业恢复

以 RocketMQState 为例:

public class RocketMQState implementsSerializable {private final Map<MessageQueue, String>assignedWithSplitIds;public RocketMQState(Map<MessageQueue, String>assignedWithSplitIds) {this.assignedWithSplitIds =assignedWithSplitIds;
}
public Map<MessageQueue, String>getAssignedWithSplits() {returnassignedWithSplitIds;
}
}

批式场景

对于批式场景,我们可以使用
EmptyState
不存储状态,如果需要状态存储,和流批一体场景采用相似的设计方案。

public class EmptyState implementsSerializable {public staticEmptyState fromBytes() {return newEmptyState();
}
}

标签: none

添加新评论