Zenn
👺

Kafka-Connectのサンプル実装

2025/03/28に公開

はじめに

三菱UFJインフォメーションテクノロジー株式会社の辻恵三と申します。
Zennでのテックブログは初投稿となります。
よろしくお願いいたします。

最近、Kafkaを使ったアプリケーション(主に、Consumerアプリ)に携わる機会があり、主に以下3つのライブラリを使って構築しようと考えておりました。
今回は、KafkaのTopicにあるデータをDBに登録するアプリを作りたいと考えていたため、Kafka-Connectの一種である、SinkConnectorを使ってサンプル実装してみたいと思います。

▼ライブラリ
① Kafka-Client
② Kafka-Connect(SinkConnector) ← 今回の記事で扱う内容
③ Apache-Camel

本記事の前提

  • 動作環境:macOS Sequoia 15.3.1
  • Kafkaの環境構築ができていること。
  • Kafkaの基本操作(トピックの作成、データ投入、削除等)ができること。
  • PostgreSQLの環境構築ができていること。
  • PostgreSQLの基本操作(テーブル作成、参照、更新、削除等)ができること。
  • Javaのインストールができていること。
  • Mavenのインストールができていること。

注意事項

  • 本環境は、互換性・動作確認のため、Java1.8の環境で実施しております。
     他のJavaバージョンでは動作が異なる可能性があるため、ご自身の環境に応じた調整をお願いします。
  • 掲載したソースコードはサンプルになります。
     本ソースコードを使用することで発生するいかなる損害や不利益について、当社は一切の責任を負いませんので自己の責任においてご利用ください。

利用したバージョン等

  • Apache Kafka 3.7.2(localhost:9092で起動)
  • PostgreSQL 14.15(localhost:5432で起動)
  • Java 1.8.0.292(互換性、動作確認のため、このバージョンを使っております。)
  • Maven 3.9.9

サンプル実装のシナリオ

今回、実装するシナリオは以下になります。

  • Kafka-Cliを使って、KafkaのTopicにデータを登録。(1. Producer→ 2. Topic の箇所)
  • Topicに登録したデータをKafka-Connectを使って取得。(2. Topic→ 3. Consumer の箇所)
  • 取得したデータをDBに登録。(3. Consumer→ 4. Table の箇所)

※図は当社にて作成

利用するTopic、Table情報

Topic情報

Kafka-Cliを使ってtest-topicを生成します。

command
$ kafka-topics --bootstrap-server localhost:9092 --create --topic myTopic

生成したtest-topicを確認します。

command
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic myTopic

Table情報

今回はユーザ、データベースは既に作成済とし、テーブル作成の箇所から記載します。
(ユーザ:Postgres、データベース:test)

データベースへの接続

command
$ psql -U postgres test

テーブルの作成
(テーブル:messages、カラム:id(PK)、message )

command
CREATE TABLE messages (
  id TEXT PRIMARY KEY,
  message TEXT
);

Kafka-Connectアプリの実装

アプリの概要

org.apache.kafka.connect.sink.SinkConnectorを使って、KafkaのTopicからデータを取得し、DBにデータを登録します。
DBにデータ登録する処理は、org.apache.kafka.connect.sink.SinkTaskを継承した、SampleSinkTaskにて実施します。

プロジェクトの作成

Mavenを使って、プロジェクトを生成します。
オプションの説明については下表を参照してください。

command
$ mvn archetype:generate -DgroupId=com.example -DartifactId=my-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
オプション 説明
-DgroupId=com.example 作成するプロジェクトのgroupId(グループID)
-DartifactId=my-app 作成するプロジェクトのartifactId(アーティファクトID)
-DarchetypeArtifactId=maven-archetype-quickstart 使用するアーキタイプ(テンプレート)のID
-DinteractiveMode=false インタラクティブモードを無効にし、対話的な入力なしで実行

pom.xmlの修正

生成したプロジェクト(my-app)の直下にpom.xmlがありますので、以下定義を追加してください。
versionについては、自分の環境に合ったものを使ってください。

  • 依存関係の追加

pom.xml

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.3.8</version>
    </dependency>
  • ビルドセクションの追加
    この定義を追加する事でパッケージングする際、依存関係のライブラリも含まれるようになります。

pom.xml

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

Javaクラスの作成

com.example直下に以下2つのクラスを作成してください。

  • SampleSinkConnector.java
  • SampleSinkTask.java

クラス構成

※図は当社にて作成

実装内容

com.sample.SampleSinkConnector.java
package com.example;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
 
public class SampleSinkConnector extends SinkConnector {
 
    private String dbUrl;
    private String user;
    private String password;
 
    public String version() {
        return "1.0.0";
    }
 
    @Override
    public void start(Map<String, String> props) {
        dbUrl = props.get("db.url");
        user = props.get("db.user");
        password = props.get("db.password");
        if (dbUrl == null || user == null || password == null) {
            throw new ConnectException("Missing database configuration parameters.");
        }
 
    }
 
    @Override
    public Class<? extends Task> taskClass() {
        return SampleSinkTask.class;
    }
 
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<String, String>();
            config.put("db.url", dbUrl);
            config.put("db.user", user);
            config.put("db.password", password);
            configs.add(config);
        }
        return configs;
    }
 
    @Override
    public void stop() { 
    }
 
    @Override
    public ConfigDef config() {
        return new ConfigDef().define("db.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "JDBC Database URL")
                .define("db.user", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database Username")
                .define("db.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "Database Password");
    }
 
}
com.sample.SampleSinkTask.java
package com.example;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Collection;
import java.util.Map;
 
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
 
public class SampleSinkTask extends SinkTask {
 
    private Connection connection;
    private String dbUrl;
    private String user;
    private String password;
 
    public String version() {
        return "1.0.0";
    }
 
    @Override
    public void start(Map<String, String> props) {
        dbUrl = props.get("db.url");
        user = props.get("db.user");
        password = props.get("db.password");
 
        try {
            connection = DriverManager.getConnection(dbUrl, user, password);
        } catch (Exception e) {
            throw new RuntimeException("Failed to establish JDBC connection", e);
        }
    }
 
    @Override
    public void put(Collection<SinkRecord> records) {
        String sql = "INSERT INTO messages (id, message) VALUES (?, ?) ";
 
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            for (SinkRecord record : records) {
 
                System.out.println("Processing record: " + record.value());
 
                // Kafka メッセージを JSON 形式で想定
                Map<String, Object> value = (Map<String, Object>) record.value();
                String id = (String) value.get("id");
                String message = (String) value.get("message");
 
                stmt.setString(1, id);
                stmt.setString(2, message);
                stmt.executeUpdate();
            }
        } catch (Exception e) {
 
        }
    }
 
    @Override
    public void stop() {
        try {
            if (connection != null)
                connection.close();
        } catch (Exception e) {
            System.out.println("Fail Task Stopped");
        }
    }
 
}

パッケージング

my-appフォルダ直下で、以下コマンドを実行するとtarget配下にmy-app-1.0-SNAPSHOT.jar が作成され、このJarファイルを使ってKafka-Connectを起動します。

command
$ mvn clean package

Kafka-Connectの設定

Kafka-Connectは、Kafkaをダウンロードした資源の中に格納されているため、それを使って起動します。
尚、以降のコマンドでは、ダウンロードしたKafkaのディレクトリに移動している前提で記載します。

connect-distributed.propertiesの設定値変更

config/connect-distributed.properties
# Topicから取得したデータのスキーマチェックをオフに設定
key.converter.schemas.enable=false
value.converter.schemas.enable=false
 
# Jarファイルの配置パスの設定
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=`Mavenプロジェクトで生成作成したJarファイルの格納フォルダを指定`

Kafka-Connectの起動

Kafka-Connectは、8003がデフォルトポートになります。

command
$ bin/connect-distributed.sh config/connect-distributed.properties

Kafka-Connectorの登録

任意のディレクトリで、以下のconnector.jsonを作成します。

connector.json

{
  "name": "sample-sink-connector",
  "config": {
    "connector.class": "com.example.SampleSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "db.url": "jdbc:postgresql://localhost:5432/test",
    "db.user": "postgres",
    "db.password": "*****"
  }
}

Connectorの登録

connector.jsonを作成したディレクトリ直下で、以下コマンドを実行します。
これで、Connectorの登録も終わりましたので、後はTopicにデータを投入すれば、Topicからデータを取得しDBに登録されます。

command
$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector.json

KafkaのTopicにデータを投入

kafka-console-producerコマンドを使って、Topicにデータを格納します。
尚、このコマンドは対話形式になりますので、対話形式になりましたら、以下のJSONデータを入力してください。
入力が完了しましたら、ctrl + c で停止してください。

command
$ kafka-console-producer --broker-list localhost:9092 --topic test-topic
command
{"id": "1", "message": "Hello, World!"}
{"id": "2", "message": "Kafka to DB!"}

投入したデータの確認

command
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
{"id": "1", "message": "Hello, World!"}
{"id": "2", "message": "Kafka to DB!"}

DBの確認

データベースへの接続

command
$ psql -U postgres test

messagesテーブルの中身の確認

command
select * from messages;

以下の結果が出力されたら、成功になります。

id |    message    
----+---------------
1  | Hello, World!
2  | Kafka to DB!
(2 rows)

まとめ

  • Kafka-Connect(SinkConnector)を使う事で、Topicからのデータ取得が簡単にできるようになります。
  • 今回は、CustomConnectorを作りましたが、TopicからDBにデータを登録するConnectorライブラリは他にもありますので、個別に作成しなくても使えるライブラリを探してみたいと思います。

弊社でのKafkaの活用は、発展途上になりますので、引き続き活用できるライブラリを調べ、効率良く開発できるように努めていきます。

Discussion

ログインするとコメントできます