Kafka-Connectのサンプル実装
はじめに
三菱UFJインフォメーションテクノロジー株式会社の辻恵三と申します。
Zennでのテックブログは初投稿となります。
よろしくお願いいたします。
最近、Kafkaを使ったアプリケーション(主に、Consumerアプリ)に携わる機会があり、主に以下3つのライブラリを使って構築しようと考えておりました。
今回は、KafkaのTopicにあるデータをDBに登録するアプリを作りたいと考えていたため、Kafka-Connectの一種である、SinkConnector
を使ってサンプル実装してみたいと思います。
▼ライブラリ
① Kafka-Client
② Kafka-Connect(SinkConnector) ← 今回の記事で扱う内容
③ Apache-Camel
本記事の前提
- 動作環境:macOS Sequoia 15.3.1
- Kafkaの環境構築ができていること。
- Kafkaの基本操作(トピックの作成、データ投入、削除等)ができること。
- PostgreSQLの環境構築ができていること。
- PostgreSQLの基本操作(テーブル作成、参照、更新、削除等)ができること。
- Javaのインストールができていること。
- Mavenのインストールができていること。
注意事項
- 本環境は、互換性・動作確認のため、Java1.8の環境で実施しております。
他のJavaバージョンでは動作が異なる可能性があるため、ご自身の環境に応じた調整をお願いします。 - 掲載したソースコードはサンプルになります。
本ソースコードを使用することで発生するいかなる損害や不利益について、当社は一切の責任を負いませんので自己の責任においてご利用ください。
利用したバージョン等
- Apache Kafka 3.7.2(localhost:9092で起動)
- PostgreSQL 14.15(localhost:5432で起動)
- Java 1.8.0.292(互換性、動作確認のため、このバージョンを使っております。)
- Maven 3.9.9
サンプル実装のシナリオ
今回、実装するシナリオは以下になります。
- Kafka-Cliを使って、KafkaのTopicにデータを登録。(1. Producer→ 2. Topic の箇所)
- Topicに登録したデータをKafka-Connectを使って取得。(2. Topic→ 3. Consumer の箇所)
- 取得したデータをDBに登録。(3. Consumer→ 4. Table の箇所)
※図は当社にて作成
利用するTopic、Table情報
Topic情報
Kafka-Cliを使ってtest-topicを生成します。
$ kafka-topics --bootstrap-server localhost:9092 --create --topic myTopic
生成したtest-topicを確認します。
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic myTopic
Table情報
今回はユーザ、データベースは既に作成済とし、テーブル作成の箇所から記載します。
(ユーザ:Postgres、データベース:test)
データベースへの接続
$ psql -U postgres test
テーブルの作成
(テーブル:messages、カラム:id(PK)、message )
CREATE TABLE messages (
id TEXT PRIMARY KEY,
message TEXT
);
Kafka-Connectアプリの実装
アプリの概要
org.apache.kafka.connect.sink.SinkConnectorを使って、KafkaのTopicからデータを取得し、DBにデータを登録します。
DBにデータ登録する処理は、org.apache.kafka.connect.sink.SinkTaskを継承した、SampleSinkTaskにて実施します。
プロジェクトの作成
Mavenを使って、プロジェクトを生成します。
オプションの説明については下表を参照してください。
$ mvn archetype:generate -DgroupId=com.example -DartifactId=my-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
オプション | 説明 |
---|---|
-DgroupId=com.example |
作成するプロジェクトのgroupId (グループID) |
-DartifactId=my-app |
作成するプロジェクトのartifactId (アーティファクトID) |
-DarchetypeArtifactId=maven-archetype-quickstart |
使用するアーキタイプ(テンプレート)のID |
-DinteractiveMode=false |
インタラクティブモードを無効にし、対話的な入力なしで実行 |
pom.xmlの修正
生成したプロジェクト(my-app)の直下にpom.xmlがありますので、以下定義を追加してください。
versionについては、自分の環境に合ったものを使ってください。
- 依存関係の追加
pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.8</version>
</dependency>
- ビルドセクションの追加
この定義を追加する事でパッケージングする際、依存関係のライブラリも含まれるようになります。
pom.xml
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Javaクラスの作成
com.example直下に以下2つのクラスを作成してください。
- SampleSinkConnector.java
- SampleSinkTask.java
クラス構成
※図は当社にて作成
実装内容
com.sample.SampleSinkConnector.java
package com.example;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
public class SampleSinkConnector extends SinkConnector {
private String dbUrl;
private String user;
private String password;
public String version() {
return "1.0.0";
}
@Override
public void start(Map<String, String> props) {
dbUrl = props.get("db.url");
user = props.get("db.user");
password = props.get("db.password");
if (dbUrl == null || user == null || password == null) {
throw new ConnectException("Missing database configuration parameters.");
}
}
@Override
public Class<? extends Task> taskClass() {
return SampleSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put("db.url", dbUrl);
config.put("db.user", user);
config.put("db.password", password);
configs.add(config);
}
return configs;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return new ConfigDef().define("db.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "JDBC Database URL")
.define("db.user", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database Username")
.define("db.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "Database Password");
}
}
com.sample.SampleSinkTask.java
package com.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
public class SampleSinkTask extends SinkTask {
private Connection connection;
private String dbUrl;
private String user;
private String password;
public String version() {
return "1.0.0";
}
@Override
public void start(Map<String, String> props) {
dbUrl = props.get("db.url");
user = props.get("db.user");
password = props.get("db.password");
try {
connection = DriverManager.getConnection(dbUrl, user, password);
} catch (Exception e) {
throw new RuntimeException("Failed to establish JDBC connection", e);
}
}
@Override
public void put(Collection<SinkRecord> records) {
String sql = "INSERT INTO messages (id, message) VALUES (?, ?) ";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
for (SinkRecord record : records) {
System.out.println("Processing record: " + record.value());
// Kafka メッセージを JSON 形式で想定
Map<String, Object> value = (Map<String, Object>) record.value();
String id = (String) value.get("id");
String message = (String) value.get("message");
stmt.setString(1, id);
stmt.setString(2, message);
stmt.executeUpdate();
}
} catch (Exception e) {
}
}
@Override
public void stop() {
try {
if (connection != null)
connection.close();
} catch (Exception e) {
System.out.println("Fail Task Stopped");
}
}
}
パッケージング
my-appフォルダ直下で、以下コマンドを実行するとtarget配下にmy-app-1.0-SNAPSHOT.jar
が作成され、このJarファイルを使ってKafka-Connectを起動します。
$ mvn clean package
Kafka-Connectの設定
Kafka-Connectは、Kafkaをダウンロードした資源の中に格納されているため、それを使って起動します。
尚、以降のコマンドでは、ダウンロードしたKafkaのディレクトリ
に移動している前提で記載します。
connect-distributed.propertiesの設定値変更
config/connect-distributed.properties
# Topicから取得したデータのスキーマチェックをオフに設定
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Jarファイルの配置パスの設定
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=`Mavenプロジェクトで生成作成したJarファイルの格納フォルダを指定`
Kafka-Connectの起動
Kafka-Connectは、8003がデフォルトポートになります。
$ bin/connect-distributed.sh config/connect-distributed.properties
Kafka-Connectorの登録
任意のディレクトリで、以下のconnector.json
を作成します。
connector.json
{
"name": "sample-sink-connector",
"config": {
"connector.class": "com.example.SampleSinkConnector",
"tasks.max": "1",
"topics": "test-topic",
"db.url": "jdbc:postgresql://localhost:5432/test",
"db.user": "postgres",
"db.password": "*****"
}
}
Connectorの登録
connector.json
を作成したディレクトリ直下で、以下コマンドを実行します。
これで、Connectorの登録も終わりましたので、後はTopicにデータを投入すれば、Topicからデータを取得しDBに登録されます。
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector.json
KafkaのTopicにデータを投入
kafka-console-producerコマンドを使って、Topicにデータを格納します。
尚、このコマンドは対話形式になりますので、対話形式になりましたら、以下のJSONデータを入力してください。
入力が完了しましたら、ctrl + c で停止してください。
$ kafka-console-producer --broker-list localhost:9092 --topic test-topic
{"id": "1", "message": "Hello, World!"}
{"id": "2", "message": "Kafka to DB!"}
投入したデータの確認
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
{"id": "1", "message": "Hello, World!"}
{"id": "2", "message": "Kafka to DB!"}
DBの確認
データベースへの接続
$ psql -U postgres test
messagesテーブルの中身の確認
select * from messages;
以下の結果が出力されたら、成功になります。
id | message
----+---------------
1 | Hello, World!
2 | Kafka to DB!
(2 rows)
まとめ
- Kafka-Connect(SinkConnector)を使う事で、Topicからのデータ取得が簡単にできるようになります。
- 今回は、CustomConnectorを作りましたが、TopicからDBにデータを登録するConnectorライブラリは他にもありますので、個別に作成しなくても使えるライブラリを探してみたいと思います。
弊社でのKafkaの活用は、発展途上になりますので、引き続き活用できるライブラリを調べ、効率良く開発できるように努めていきます。
Discussion