Heterogeneous Data Synchronization → Why Should DataX Support Kafka?
A Light Moment
Yesterday I posted on WeChat Moments: what's so great about bars? Last month I worked part-time as a bar waiter, and a lady took a liking to me and offered me 100,000 a month to go with her to Shanghai. I said no.
A friend commented: if you said no, why are you in Shanghai?
I replied: I said no last month.
Recap
The official site documents DataX in great detail, and for what it's worth I've written a few posts about it myself; if you're not familiar with it, go look those up as needed, so I won't introduce DataX at length here. The project ships a large number of plugins covering the vast majority of data sources, which is usually enough for day-to-day work. But there are far too many kinds of data sources for the plugins to cover them all. Kafka, for example, has no official reader or writer plugin. Do you know why? If you've done much data synchronization work, the first thing Kafka brings to mind is real-time synchronization, whereas DataX is built for offline (batch) synchronization. Seen that way, it's understandable that DataX doesn't ship a Kafka plugin: it's simply not a good fit!
But what if the customer insists that offline synchronization must support Kafka as well?
What are you going to do, fire back with "it can't be done"?
So there's no choice: we develop a set of Kafka plugins for DataX ourselves. Following the official DataX插件开发宝典 (plugin development guide), writing a plugin is actually quite straightforward.
kafkawriter
Programming interface
Define a KafkaWriter that extends DataX's Writer and implements the corresponding Job and Task interfaces:

```java
/**
 * @author 青石路
 */
public class KafkaWriter extends Writer {

    public static class Job extends Writer.Job {

        private Configuration conf = null;

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(this.conf.clone());
            }
            return configurations;
        }

        private void validateParameter() {
            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
        }

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            this.validateParameter();
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {

        private static final Logger logger = LoggerFactory.getLogger(Task.class);
        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;
        private Configuration conf;
        private Properties props;
        private String fieldDelimiter;
        private List<String> columns;
        private String writeType;

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            columns = conf.getList(Key.COLUMN, String.class);
            writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
            if (CollUtil.isEmpty(columns)) {
                throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                        String.format("您提供配置文件有误,[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN));
            }

            props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
            // acks: 0 = no acknowledgement, 1 = leader only, all = wait for every in-sync replica (strongest guarantee);
            // the default here is 0, configurable per job
            props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
            props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
            Configuration saslConf = conf.getConfiguration(Key.SASL);
            if (ObjUtil.isNotNull(saslConf)) {
                logger.info("配置启用了SASL认证");
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                        saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                props.put(SaslConfigs.SASL_MECHANISM,
                        saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                props.put(SaslConfigs.SASL_JAAS_CONFIG,
                        String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
            }
            producer = new KafkaProducer<String, String>(props);
        }

        @Override
        public void prepare() {
            // if the broker does not auto-create topics (noTopicCreate = true), check the topic and create it when missing
            if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
                String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
                try (AdminClient adminClient = AdminClient.create(props)) {
                    ListTopicsResult topicsResult = adminClient.listTopics();
                    if (!topicsResult.names().get().contains(topic)) {
                        NewTopic newTopic = new NewTopic(
                                topic,
                                Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                        );
                        // the original snippet forgot to add the NewTopic to the list and submitted an empty list
                        List<NewTopic> newTopics = new ArrayList<NewTopic>();
                        newTopics.add(newTopic);
                        adminClient.createTopics(newTopics);
                    }
                } catch (Exception e) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.CREATE_TOPIC,
                            KafkaWriterErrorCode.CREATE_TOPIC.getDescription(), e);
                }
            }
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            logger.info("start to write kafka");
            Record record = null;
            // a non-null record means the reader is still producing data, or buffered data has not been drained yet
            while ((record = lineReceiver.getFromReader()) != null) {
                // join the columns of one record with the configured delimiter (or serialize to JSON) and send it
                if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            recordToString(record), recordToString(record)));
                } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            recordToString(record), recordToKafkaJson(record)));
                }
                producer.flush();
            }
        }

        @Override
        public void destroy() {
            logger.info("producer close");
            if (producer != null) {
                producer.close();
            }
        }

        /**
         * join all columns of a record with the field delimiter, ending with a line separator
         */
        private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);
            return sb.toString();
        }

        private String recordToKafkaJson(Record record) {
            int recordLength = record.getColumnNumber();
            if (recordLength != columns.size()) {
                throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                        String.format("您提供配置文件有误,列数不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
            }
            List<KafkaColumn> kafkaColumns = new ArrayList<>();
            for (int i = 0; i < recordLength; i++) {
                KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                kafkaColumns.add(column);
            }
            return JSONUtil.toJsonStr(kafkaColumns);
        }
    }
}
```
The DataX framework invokes the Job and Task interface methods in a fixed order; the part to focus on is the Task implementation:
- init: read the configuration items and create the Producer instance
- prepare: check whether the topic exists and create it if it does not
- startWrite: receive Records from the Channel through the RecordReceiver and write them to the topic; two write formats are supported, text and json (see kafkawriter.md below for details, and the sample messages just below this list)
- destroy: close the Producer instance
None of it is hard; the code should be easy to follow.
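To make the two formats concrete, here is a rough sketch of what a record with columns id = 1 and name = qsl would look like on the topic. With writeType = text and fieldDelimiter = ",", the message value is simply the delimited line `1,qsl` followed by a line separator (see recordToString). With writeType = json, recordToKafkaJson serializes a list of KafkaColumn objects; the field names below (columnName / columnValue) are assumptions for illustration, since the KafkaColumn class itself is not shown here.

```json
[
    { "columnName": "id",   "columnValue": "1"   },
    { "columnName": "name", "columnValue": "qsl" }
]
```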
Plugin definition
Add plugin.json under resources:
{ "name": "kafkawriter", "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter", "description": "write data to kafka", "developer": "qsl" }
Note the class field: it must be the fully qualified class name of KafkaWriter, so if you didn't copy my code verbatim, change it to match your own package and class.
Configuration file
Add plugin_job_template.json under resources:
{ "name": "kafkawriter", "parameter": { "bootstrapServers": "", "topic": "", "ack": "all", "batchSize": 1000, "retries": 0, "fieldDelimiter": ",", "writeType": "json", "column": [ "const_id", "const_field", "const_field_value" ], "sasl": { "securityProtocol": "SASL_PLAINTEXT", "mechanism": "PLAIN", "username": "", "password": "" } } }
Configuration reference: kafkawriter.md
Packaging and release
You can follow the official plugins' assembly configuration and package the module with the Maven assembly plugin.
With that, kafkawriter is done.
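For reference, a complete DataX job using kafkawriter might look roughly like the sketch below. The reader half is purely illustrative (a standard mysqlreader block with placeholder table and connection values); the kafkawriter parameters follow the template above, so adjust everything to your own environment.

```json
{
    "job": {
        "setting": {
            "speed": { "channel": 1 }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "******",
                        "column": ["id", "field", "field_value"],
                        "connection": [
                            {
                                "table": ["t_example"],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/example_db"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "kafkawriter",
                    "parameter": {
                        "bootstrapServers": "127.0.0.1:9092",
                        "topic": "test-kafka",
                        "ack": "all",
                        "writeType": "json",
                        "column": ["id", "field", "field_value"]
                    }
                }
            }
        ]
    }
}
```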
kafkareader
Programming interface
Define a KafkaReader that extends DataX's Reader and implements the corresponding Job and Task interfaces:

```java
/**
 * @author 青石路
 */
public class KafkaReader extends Reader {

    public static class Job extends Reader.Job {

        private Configuration originalConfig = null;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.validateParameter();
        }

        @Override
        public void destroy() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> configurations = new ArrayList<>(adviceNumber);
            for (int i = 0; i < adviceNumber; i++) {
                configurations.add(this.originalConfig.clone());
            }
            return configurations;
        }

        private void validateParameter() {
            this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
            this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
        }
    }

    public static class Task extends Reader.Task {

        private static final Logger logger = LoggerFactory.getLogger(Task.class);

        private Consumer<String, String> consumer;
        private String topic;
        private Configuration conf;
        private int maxPollRecords;
        private String fieldDelimiter;
        private String readType;
        private List<Column.Type> columnTypes;

        @Override
        public void destroy() {
            logger.info("consumer close");
            if (Objects.nonNull(consumer)) {
                consumer.close();
            }
        }

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            this.topic = conf.getString(Key.TOPIC);
            this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
            if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                    && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                        String.format("您提供配置文件有误,不支持的readType[%s]", readType));
            }
            if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                if (CollUtil.isEmpty(columnTypeList)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));
                }
                convertColumnType(columnTypeList);
            }

            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            Configuration saslConf = conf.getConfiguration(Key.SASL);
            if (ObjUtil.isNotNull(saslConf)) {
                logger.info("配置启用了SASL认证");
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                        saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(SaslConfigs.SASL_MECHANISM,
                        saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                props.put(SaslConfigs.SASL_JAAS_CONFIG,
                        String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
            }
            consumer = new KafkaConsumer<>(props);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            consumer.subscribe(CollUtil.newArrayList(topic));
            int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
            int retries = conf.getInt(Key.RETRIES, 5);
            if (retries < 0) {
                logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
                retries = 5;
            }
            /*
             * The consumer is created fresh for every run, so the first poll triggers joining the consumer
             * group, which causes a rebalance; during a rebalance no consumer in the group can fetch, so the
             * first polls may return nothing even though the topic has data. kafka-clients offers no API or
             * event to tell exactly when the consumer has joined the group, hence the retries.
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
            if (CollUtil.isEmpty(records)) {
                for (int i = 0; i < retries; i++) {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());
                    if (!CollUtil.isEmpty(records)) {
                        break;
                    }
                }
            }
            // check the records themselves: the original (i >= retries) test misfired when the very first poll already returned data
            if (CollUtil.isEmpty(records)) {
                logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);
                return;
            }
            transferRecord(recordSender, records);
            // keep polling until a batch comes back smaller than maxPollRecords, i.e. the topic has been drained
            do {
                records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                transferRecord(recordSender, records);
            } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
        }

        private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
            if (CollUtil.isEmpty(records)) {
                return;
            }
            for (ConsumerRecord<String, String> record : records) {
                Record sendRecord = recordSender.createRecord();
                String msgValue = record.value();
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    transportJsonToRecord(sendRecord, msgValue);
                } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    // readType = text: treat every field as a string
                    String[] columnValues = msgValue.split(fieldDelimiter);
                    for (String columnValue : columnValues) {
                        sendRecord.addColumn(new StringColumn(columnValue));
                    }
                }
                recordSender.sendToWriter(sendRecord);
            }
            consumer.commitAsync();
        }

        private void convertColumnType(List<String> columnTypeList) {
            columnTypes = new ArrayList<>();
            for (String columnType : columnTypeList) {
                switch (columnType.toUpperCase()) {
                    case "STRING":
                        columnTypes.add(Column.Type.STRING);
                        break;
                    case "LONG":
                        columnTypes.add(Column.Type.LONG);
                        break;
                    case "DOUBLE":
                        columnTypes.add(Column.Type.DOUBLE);
                        // the original snippet was missing this break and fell through into DATE
                        break;
                    case "DATE":
                        columnTypes.add(Column.Type.DATE);
                        break;
                    case "BOOLEAN":
                        columnTypes.add(Column.Type.BOOL);
                        break;
                    case "BYTES":
                        columnTypes.add(Column.Type.BYTES);
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));
                }
            }
        }

        private void transportJsonToRecord(Record sendRecord, String msgValue) {
            List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
            if (columnTypes.size() != kafkaColumns.size()) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                        String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配",
                                Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
            }
            for (int i = 0; i < columnTypes.size(); i++) {
                KafkaColumn kafkaColumn = kafkaColumns.get(i);
                switch (columnTypes.get(i)) {
                    case STRING:
                        sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
                        break;
                    case LONG:
                        sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
                        break;
                    case DOUBLE:
                        sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
                        break;
                    case DATE:
                        // only epoch-millis timestamps are supported for now
                        sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
                        break;
                    case BOOL:
                        sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
                        break;
                    case BYTES:
                        sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));
                }
            }
        }
    }
}
```
Again, focus on the Task implementation:
- init: read the configuration items and create the Consumer instance
- startRead: pull messages from the topic and send them to the Channel through the RecordSender
A few details here deserve attention:
- The Consumer is created fresh for every run. When it polls, if it has not yet joined the configured consumer group it first has to join, which triggers a rebalance, and during a rebalance no consumer in the group can fetch anything. So even if the topic has messages, the first polls may come back empty, which is why a retry mechanism was added to give the consumer a good chance of fetching normally during that one sync run.
- Once the Consumer has fetched messages, it keeps polling in a loop; as soon as one poll returns fewer records than the maximum poll size (maxPollRecords), the topic is considered drained and the loop stops. This differs from the usual Kafka usage, where a consumer keeps fetching (or receiving) indefinitely.
- Two read formats are supported, text and json; see the configuration reference below for details
- To put complete, correctly typed data into the Channel, each column's data type (a DataX column type) must be configured; a small sketch of that pairing follows this list
- destroy: close the Consumer instance
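As a rough illustration of that last requirement: with readType = json, the i-th entry of columnType is applied to the i-th element of the message's JSON array, so a message produced by kafkawriter with columns id, field and field_value pairs up with the columnType list index by index. The snippet below only illustrates that pairing, it is not a real config file, and the columnName / columnValue field names are assumptions, as before.

```json
{
    "columnType": ["LONG", "STRING", "STRING"],
    "messageValue": [
        { "columnName": "id",          "columnValue": "1"    },
        { "columnName": "field",       "columnValue": "name" },
        { "columnName": "field_value", "columnValue": "qsl"  }
    ]
}
```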
Plugin definition
Add plugin.json under resources:
{ "name": "kafkareader", "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader", "description": "read data from kafka", "developer": "qsl" }
The class field is the fully qualified class name of KafkaReader.
Configuration file
Add plugin_job_template.json under resources:
{ "name": "kafkareader", "parameter": { "bootstrapServers": "", "topic": "test-kafka", "groupId": "test1", "writeType": "json", "pollTimeoutMs": 2000, "columnType": [ "LONG", "STRING", "STRING" ], "sasl": { "securityProtocol": "SASL_PLAINTEXT", "mechanism": "PLAIN", "username": "", "password": "2" } } }
Configuration reference: kafkareader.md
Packaging and release
As with the writer, you can follow the official plugins' assembly configuration and package the module with the Maven assembly plugin.
With that, kafkareader is done as well.
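A complete job on the reading side might look roughly like the sketch below. streamwriter (DataX's console writer) is used only so the result is easy to see; the kafkareader parameters follow the template above, and all addresses and names are placeholders.

```json
{
    "job": {
        "setting": {
            "speed": { "channel": 1 }
        },
        "content": [
            {
                "reader": {
                    "name": "kafkareader",
                    "parameter": {
                        "bootstrapServers": "127.0.0.1:9092",
                        "topic": "test-kafka",
                        "groupId": "test1",
                        "readType": "json",
                        "pollTimeoutMs": 2000,
                        "columnType": ["LONG", "STRING", "STRING"]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```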
Summary
- Complete code: qsl-datax
- The kafkareader retry mechanism only lowers the probability of the first polls coming back empty; it cannot eliminate it. Also, if an upstream producer keeps writing to the topic and every poll returns the maximum number of records (maxPollRecords), the sync job will keep running and never stop. Is that still offline synchronization?
- For offline synchronization I don't recommend going through Kafka; Kafka is a far better fit for real-time synchronization.