🎄

KafkaとDeveziumによるMySQLとElasticsearch間のデータ連携

2022/12/25に公開

はじめに

この記事は株式会社LabBase テックカレンダー Advent Calendar 2022 24日目の記事です。

株式会社LabBaseでエンジニアをしている渡辺創です。

前回に引き続き、Elasticsearchを触っているのですが、RDBとのデータ連携の部分で試したいことがあったのでやってみました。

背景

現在、弊社ではAlgoliaという全文検索エンジンをつかっています。導入の詳細についてはインフラエンジニアのゾネスさんのブログを見てみてください。
今回、別サービスで全文検索エンジンとして、Elasticsearchの利用を検討しています。

Algoliaのデータ連携の方法としては、AWS SNSとAWS SQSを用いたメッセージキューを作成し、サービスのデータ更新系APIが呼び出されたら更新対象のデータをメッセージキューにキューイングして、AWS Lambdaがメッセージキューから更新対象データを読み取り、Algoliaへ非同期にデータ更新するようになっています。

Elasticsearchでもこのやり方を検討してはいるのですが、10secくらいの遅延は起きるかもという会話をゾネスさんがしていました。
ゾネスさんとの会話
Algoliaの方では遅延があっても大丈夫なように処理を工夫して対応しているのですが、その工夫によって検索結果のレスポンスが遅くなったりしているので、遅延を100msecくらいにしたいなと思い、他の方法をゆるりと調べています。

今回、DeveziumというApache Kafkaを使ったデータ連携の方法があったので、良さそうだなと思い試してみました。

Devezium とは

DeveziumとはRedHat社が開発しているオープンソースソフトウェアで既存データベース上の変更を即座に検知しイベントとしてKafkaに記録することができます。Kafkaには活用しやすいAPIとして「Kafka Conncet」が用意されており、Debezium自体は「Kafka Connect」として実装されています。MySQL、Elasticsearchの他にも多くのデータベースエンジンにも簡単に接続できるようになっています。

システムのイメージ図としては下記のような感じです。

実際にやってみた

サンプルがあったので、動かしてみました。

# Start the application
export DEBEZIUM_VERSION=2.0
docker compose -f docker-compose-jdbc.yaml up --build

docker ps で確認してみると、こんな感じでした。

MySQL、Elasticserch、Kafka、ZooKeeper、connect-jdbc-es が動いていることがわかります。

続いて、Connectorを作成していきます。ElasticsearchとMySQL、それぞれのConnectorを起動しますが、どちらもport8083で動いているコンテナーに設定情報をPOSTします。

ElasticsearchのConnector。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json

MySQLのConnector。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

続いて、MySQLのデータの確認をします。

docker compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'

customersテーブルにはデータが4つ入っています。

Elasticsearchのデータの確認

curl 'http://localhost:9200/customers/_search?pretty'

{
  "took" : 200,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      }
    ]
  }
}

MySQLと同じデータが入っています。

データが同期されるかどうかを確かめるためにinsertをなげます。

docker compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');

Elasticsearchに追加されていることを確認

curl 'http://localhost:9200/customers/_search?pretty'

{
  "took" : 256,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "John",
          "last_name" : "Doe",
          "email" : "john.doe@example.com"
        }
      }
    ]
  }
}

この環境下だと当たり前なのですが、感覚としてはとても早く、ほぼ同時くらいです。

今回、ローカルで試しただけなので、今後サーバ/クライアント間で更新したり、データ量を増やしたりしての遅延をちゃんと測ろうと思います。

おわりに

データの同期方法には様々あって、とりあえず具体的なものから入ってみたのですが、これとかにあるように色々比較して、自分たちのやりたいことに合いそうなものやっていこうと思います。

似たようなことをやっているかたいらっしゃいましたら、苦しみやPros/Consなど聞かせてもらいたいなと思います。!

Discussion