kafka elasticsearch sink connector 开发

kafka elasticsearch sink connector 开发

20200729114841.png

写在最初

由于时间紧张,本来没打算花多少时间写这个东西,开始就想着从网上扒拉几个现成的改改就用,但是没想到从 github clone 的项目都存在各种各样的问题,结果就是花的时间甚至比老老实实看文档自己实现来得多。

此后操作此类之前未熟悉的东西之前,还是回到最初看看文档,然后再动手自己写或者改已有的。

connector 分为 source connector 和 sink connector,前者是从第三方系统获取数据写入 kafka,后者是从 kafka 获取数据然后写入第三方系统。我们这里实现的是一个 sink connector,即从 kafka 读取数据然后写入到 ES 中。

connector 本质是一个插件,无法独立运行,只能通过 connect-runtime 来调用。runtime 可以用分布式方式也可以用 standalone 方式运行,我们这里采用的是 standalone 方式。具体采用哪种方式与 connector 编写毫无关系。

版本选择

版本选择非常重要,也是本次开发过程遇到问题最多花时间最多的一个地方,尤其是 ES。由于之前自己并不十分熟悉 ES,不知道 ES 版本之间 API 兼容如此之差。

kafka connect api 和 runtime 版本

最初参考的版本比较老,导致设置多个 task(后面讲)的时候,各个 consumer 协调总是出问题。于是根据线上使用的 kafka 集群版本选择了 kafka connect-api 的版本,即 2.0.1 版本。connect-runtime 版本与此一致。

这还不算完,由于 connect api 采用的 log4j 版本比较低,而我在项目中使用的版本较高,导致前者解析配置文件出错(具体是无法识别配置中的 Delete 节点)无法打印日志。所以,最好像下面这样 exclue 掉 connect-api 中的 log4j 相关配置。connect 具体依赖如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-runtime</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
    <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
</dependency>

我们开发的 connector 其实就是个类似插件的东西,runtime 才是主程序框架,用于运行我们写的 connector。

elasticsearch 版本

首先说我之前从来没做个 ES 相关开发,对它的认识很不足。初始采用的版本很低,而且远低于线上 ES 版本,加上 ES 本身有一些莫名其妙的状况,导致花了很多时间。后来还是采用与线上一致的版本(6.8.2),把版本拉齐,即使出了问题也减少了一个排查变量,可以省很多事。ES 相关依赖如下:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

上面之所以出现 transport 依赖,是因为我在 connector 中采用的是 transport client,即把 connector 作为其中一个 node 去连接 ES 集群然后发送数据。

实现

开发一个 sink connector,只须扩展以下两个类即可:

  • SinkConnector
  • SinkTask

扩展 SinkConnector

runtime 相关配置

connector 只有个别需要开发者必须指定的配置,如:

# 要连接的 kafka 集群,多个实例地址用英文逗号分隔
bootstrap.servers=localhost:9092
# 消费 kafka 消息时所用的 key convertor
key.converter=org.apache.kafka.connect.json.JsonConverter
# 消费 kafka 消息时所用的 value convertor
value.converter=org.apache.kafka.connect.json.JsonConverter

其它配置可以随意选择,值得说明的一点是,虽然官方文档明确指出,如果以 standalone 方式运行 runtime,需要指定 offset.storage.file.filename 用于保存 connector 从 kafka 消费的消息偏移量以在 connector 重启后从该点继续,但是实践中发现,虽然指定了该配置,但是 kafka 并未使用,而是将 offset 保存到了 kafka 集群中。

上述配置放到配置文件中,配置文件地址作为第一个参数传给 runtime。

若要定制更多 runtime 行为,具体配置见官方文档

connector 相关配置

# connector 名字,确保唯一
name=local-elasticsearch-sink
# 我们自己开发的 connector 类名(full-qualified)
connector.class=com.my.connect.es.ElasticsearchSinkConnector
# 要在 connector 中启动的 SinkTask 数目,由于我为目标 topic 分配了 5 个分区,
# 所以希望能启动 5 个 consumer 去消费同时将消费的数据存入 ES,该参数具体含义键官方文档
tasks.max=5
# 要消费的 topic 列表,若有多个则用英文逗号分隔
topics=fluent-bit
# 下面几个参数是与 connector 的自定义行为相关的
# 要连接的 ES 集群的名称,因为我才用的 transport client,需要知晓集群的名称
es.cluster.name=gnome-adx
# ES 集群其中一个 Node 的 ip
es.host=172.21.53.18
# 必须上述 host 上 Node 监听的 transport.tcp.port
es.port=9300
# 由于我设计的 ES 的 index 为一个前缀加 topic 名称,这里指定前缀
index.prefix=kafka-
# 写入 ES 时对应的 type
type=kafka

有了上述 connector 配置,我们要做一个重要的工作,就是实现 config 方法。

config 方法

该方法实现很简单,就是返回我们自定义个一个 ConfigDef 实例:

@Override
public ConfigDef config() {
  logger.info("connector[{}] get config-def", this.getClass());
  return CONFIG_DEF;
}

CONFIG_DEF 就是我们自己定义个一个 ConfigDef 类型的成员,它涵盖了前一节 connector 全部配置,具体如下:

static final ConfigDef CONFIG_DEF = new ConfigDef()
  .define(NAME, 
    ConfigDef.Type.STRING, 
    ConfigDef.Importance.HIGH, 
    "Unique name for the connector. 
      Attempting to register again with the same name will fail.")
  .define(CONNECTOR_CLASS, 
    ConfigDef.Type.STRING,   
    ConfigDef.Importance.HIGH, 
    "The Java class for the connector.")
  .define(TASKS_MAX, 
    ConfigDef.Type.INT, 
    ConfigDef.Importance.HIGH,   
    "The maximum number of tasks that should be created for this  connector. 
      The connector may create fewer tasks if it cannot achieve this level of parallelism.")
  .define(TOPICS, 
    ConfigDef.Type.LIST, 
    ConfigDef.Importance.HIGH,   
    "A comma-separated list of topics to use as input for this connector.")
  .define(ES_CLUSTER_NAME, 
    ConfigDef.Type.STRING,   
    ConfigDef.Importance.HIGH, 
    "The name of es cluster to be   connected.")
  .define(ES_HOST, 
    ConfigDef.Type.STRING, 
    ConfigDef.Importance.HIGH,   
    "The host of an node of the es cluster.")
  .define(ES_PORT, 
    ConfigDef.Type.INT, 
    ConfigDef.Importance.HIGH,   
    "The transport tcp port of an node of the es cluster.")
  .define(INDEX_PREFIX, 
    ConfigDef.Type.STRING,   
    ConfigDef.Importance.HIGH, 
    "Index prefix to be written to the es cluster.")
  .define(TYPE_NAME, 
    ConfigDef.Type.STRING,   
    ConfigDef.Importance.HIGH, 
    "Type to be written to the es cluster.");

后面在扩展 SinkTask 的时候我们会用这里的 CONFIG_DEF 来解析 runtime
传入的配置项。而且,runtime 也会通过调用 config 方法拿到这里的配置后去进行校验。

version 方法

此方法返回自定义 connector 版本,直接返回字符串或者采用比较复杂的配置均可。不赘述。参考实现如下:

@Override
public String version() {
  return "1.0";
}

start 方法

该方法在该 connector 第一次实例化或者被中止(connector 启动后会自动监听 8083 端口,通过发送 put 请求到 PUT /connectors/{name}/pause 即可,具体见官方文档)后调用。该方法就是做一些初始化工作,其参数为 runtime 框架传入的 connector 相关配置,你可以定义一个本地变量将该这些配置存储到本地 configProperties 中,参考如下:

@Override
public void start(Map<String, String> props) {
  configProperties = props;
  logger.info("connector[{}] started, configuration is {}", 
    this.getClass(), configProperties);
}

taskClass 方法

该类很关键,它决定了 runtime 如何去起 sink 任务。实现起来很简单,只需返回我们扩展的 SinkTask 类即可:

@Override
public Class<? extends Task> taskClass() {
  return ElasticsearchSinkTask.class;
}

taskConfigs 方法

该方法用于在自定义 connector 中为每个 SinkTask 指定配置,具体 SinkTask 数目以你指定的 taskMax 参数有关,我们这里实现是将上面保存的 start 方法的参数拷贝到返回列表中:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
  ArrayList<Map<String, String>> configs = new ArrayList<>();
  Map<String, String> config = new HashMap<>(configProperties);
  for (int i = 0; i < maxTasks; i++) {
    configs.add(config);
  }
  return configs;
}

runtime 调用该方法拿到配置列表后去启动 SinkTask 时会作为参数传入其 start 方法,然后 SinkTask 解析这些参数并使用。

stop 方法

停止自定义 connector 时如果要做一些清理工作可以放在这里做,因为我们这次并不需要做什么清理,所以仅在里面打印了一条日志:

@Override
public void stop() {
  logger.info("connector[{}] stopped", this.getClass());
}

扩展 SinkTask

version 方法

此处直接返回上面自定义 connector 的版本号:

@Override
public String version() {
  return new ElasticsearchSinkConnector().version();
}

start 方法

参数即为上面自定义 connector 的 taskConfigs 中指定的配置。我们会根据 connector 中定义的 CONFIG_DEF 来解析这些配置。

然后重头戏就是根据指定的配置创建 ES tranport client。注意,由于 runtime 不会因为某个 connector 挂了而退出,而是一直运行,由于我们这里就只有一个 connector,所以如果连不上 ES 集群我们会强制 runtime 退出。

@Override
public void start(Map<String, String> props) {
  AbstractConfig parsedConfig = 
    new AbstractConfig(ElasticsearchSinkConnector.CONFIG_DEF, props);
  typeName = parsedConfig.getString(ElasticsearchSinkConnector.TYPE_NAME);
  final String clusterName = parsedConfig.getString(
    ElasticsearchSinkConnector.ES_CLUSTER_NAME);
  final String esHost = parsedConfig.getString(ElasticsearchSinkConnector.ES_HOST);
  final int esPort = parsedConfig.getInt(ElasticsearchSinkConnector.ES_PORT);
  indexPrefix = parsedConfig.getString(ElasticsearchSinkConnector.INDEX_PREFIX);

  try {
    System.setProperty("es.set.netty.runtime.available.processors", "false");
    Settings settings = Settings.builder().put("cluster.name", clusterName).build();
    client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(esHost), esPort));

    client
      .admin()
      .indices()
      .preparePutTemplate("kafka_template")
      .setTemplate(indexPrefix + "*")
      .addMapping(typeName, new HashMap<String, Object>() {{
        put("date_detection", false);
        put("numeric_detection", false);
      }})
      .get();

  } catch (Exception e) {
    log.error("Couldn't connect to es: ", e);
    System.exit(-1);
  }
}

stop 方法

如果 task 关闭时要做一些清理工作,则在 stop 中做,我们会在这里关闭 start 中创建的 client:

@Override
public void stop() {
  if (client != null) {
    client.close();
  }
}

put 方法

这个方法是 SinkTask 真正做 sink 任务的地方,参数为从 kafka 拉取到的一批消息,为了提升效率,我们也会一批一批向 ES 写入,于是我们采用 BulkRequest 来向 ES 发送数据:

@Override
public void put(Collection<SinkRecord> records) {
  if (records.isEmpty()) {
    log.debug("no records to be sent.");
    return;
  }
  BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
  for (SinkRecord record : records) {
    log.debug("Processing record type = {}, record content = {}",
      record.value().getClass(),
      record);
    IndexRequest indexRequest = client.prepareIndex(indexPrefix + record.topic(), typeName)
      .setSource(gson.toJson(record), XContentType.JSON)
      .request();
    bulkRequestBuilder.add(indexRequest);
  }
  BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
  if (bulkResponse.hasFailures()) {
    for (BulkItemResponse item : bulkResponse.getItems()) {
      log.warn("send to es failed: failure = {}", item.getFailure());
    }
  }
}

如果发送失败,我们这里会打印相关错误,并不会进行重试。你可以根据自己需要编写重试逻辑。

flush 方法

该方法是为 put 兜底的,我们没有要做的事情,留空。

使用

编写完毕后,运行 mvn clean package 打包,然后将前面提到的配置放到两个配置文件中。然后运行下述命令运行之:

$ CLASSPATH=./kafka-connect-elasticsearch-1.0.0-SNAPSHOT-jar-with-dependencies.jar
$ java -server -Xms6g -Xmx6g  -XX:NewRatio=3 -Dlog4j.configurationFile=./log4j2.xml \
-cp $CLASSPATH org.apache.kafka.connect.cli.ConnectStandalone \
./connect-standalone.properties ./connect-elasticsearch-sink-connector.properties &

添加新评论

我们会加密处理您的邮箱保证您的隐私. 标有星号的为必填信息 *