Embulkのcoreのソースコードから紐解くデータ転送のしくみ
この記事は trocco Advent Calendar 2023 の6日目の記事となります。
はじめに
今回はtroccoの内部でも利用されているETLのためのOSSであるEmbulkについて、core部分のソースコードリーディングを通して、そのしくみを紐解いていきたいと思います。
おことわり
- Embulkの基本的な使い方などについては解説しません。
- 筆者はembulk-coreにコントリビュートしているわけではないので、間違いなどがあればお気軽にご指摘ください。
- 今回見ていくcoreの実装自体は、比較的変更が少ないとされる各種プラグインが従うべきインターフェース部分(embulk-spi)から隠蔽されているため、今後この記事の内容が正しくなくなる可能性は容易にあります。
- Embulkにはguessやpreviewやresumeといった機能も含まれていますが、今回は単純な
embulk run
で実行される処理に絞ってお話しします。
前提
参照バージョン
embulk-coreの参照するバージョンは、2023/12/06の時点での最新であるv0.11.2とします。
後述するembulk-spiは以下のv0.11を参照します。
embulk-spi
コードを見ていく前に、いくつかの前提にも触れておきます。
各プラグインは、embulk-spiで定められたインターフェースに従ってembulk-coreと連携しています。
v0.9以前ではembulk-coreのorg/embulk/spi
パッケージ以下に配置されていましたが、インターフェース以外の各種ユーティリティクラスもこの中には存在しています。
v0.11以降ではそういった依存性を減らすために、インターフェース定義を別リポジトリに切り出しています。
コードを追っていく中で、embulk-coreのorg/embulk/spi
の中には見つからないインターフェースがあったらこちらのリポジトリを参照するとよいです。
また、この辺りの詳しい話はcoreのメンテナーをしているdmikurubeさんの記事に詳しくまとまっています。
プラグイン種別
こちらは、Embulkを利用していれば一般的な内容かもしれませんが、プラグインには以下のいくつかの種類が存在します。
- InputPlugin: 非ファイル系のデータ元(MySQLなど)
- FileInputPlugin: ファイル系のデータ元(S3など)
- OutputPlugin: 非ファイル系のデータ先(BigQueryなど)
- FileOutputPlugin: ファイル系のデータ先(S3など)
- FilterPlugin: データ元からデータ先への過程での変換処理。ETLのTの部分
- ParserPlugin: データ元ファイルのパース(csvなど)
- DecoderPlugin: データ元ファイルの解凍(gzipなど)
- FormatterPlugin: データ先ファイルのフォーマット(csvなど)
- EncoderPlugin: データ先ファイルの圧縮(gzipなど)
- (ExecutorPlugin): 今回は1つのインスタンス上での実行を想定したLocalExecutorを前提とする
- (GuessPlugin): guess機能で利用。今回は割愛
同名のインターフェースもembulk-spiに定義されており、プラグイン側はそれを満たすように実装する必要があります。
各プラグインの対応関係は、大まかに以下となります。
※ 厳密な実行時の流れというよりはイメージ図です。
FileInputPluginはデータ元の括りですが、embulk-core側のFileInputRunner
というInputPlugin
を実装したクラスから利用されます。
FileInputPluginが実装を要求するメソッドはInputPluginとは異なる点も注目です。
(FileOutputPluginも同様)
また、FilterPlugin ,DecoderPlugin ,EncoderPluginは複数指定することが可能です。
コードリーディング
前置きが長くなりましたが、それではembulk-coreのソースコードを読んでいきます。
エントリーポイント
embulkコマンドを実行した際のエントリーポイントは以下のクラスのmainメソッドです。
さまざまな初期化処理が行われますが、長いのでだいぶ飛ばします。
embulk run
が実行されると、最終的に以下のBulkLoader
クラスのdoRun
メソッドに到達します。
ここから読み進めていきます。
transaction
まずは前述のdoRun
ですが、ここでは各プラグイン側で実装されたtransaction
メソッドが順番に呼ばれていきます。
かなりネストしていて、core側とプラグイン側の処理がいり混じっている部分なので初見だと流れをイメージしづらいかもしれません。
整理すると、以下のシーケンス図のようになっています。細かくなってしまったので拡大して見てみてください。
core側に実装された~.Control
が各プラグインの橋渡しをしているイメージです。
一番右のExecutorPlugin.Executor.execute
からは実際の転送処理の実行に移ります。
そのため、処理がreturnしてくるのは転送が終了したタイミングになります。
ファイル系のプラグインを利用しているともう少し複雑です。
例えばinput部分であれば、前述のFileInputRunner
のtransaction
が呼ばれ、その後その中で定義されているRunnerControl.run
がDecoderPlugin,ParserPluginのtransaction
が呼びだしていく構造になっていたりします。
また、FilterPluginの様に複数プラグインを設定できる場合は、core側のFilterInternal.transaction
が呼ばれ、各FilterPluginのtransaction
を順に呼び出す構造になっています。(DecoderPluginやEncoderPluginも同様)
では、そもそもtransaction
では何が行われているのでしょうか。
プラグイン側の実装次第ではありますが、一言で言うとメインの転送処理に対する前処理や後処理を行なっています。
特に前述のシーケンス図の右矢印の過程(前処理)においては、各プラグインで事前に算出しておくべき値などが~Control
のインターフェース定義を見ると分かります。
transaction
の中では前処理を終えると~Control
のメソッドを呼び出すからです。
InputPluginでは、Schema
とint taskCount
を渡して~Control.run
を呼び出しています。
データ元の設定によって、最初に入ってくるデータのスキーマが決定されるので、これを事前に算出して後続へ渡しています。
また、ファイル系のプラグインでは、ファイル数に応じた並列処理がこの後に行われていくため、事前にファイル数(int taskCount
)を求めておきます。
FilterPluginでもSchema
を算出しています。
これはInputPluginから渡されたスキーマがETLのTであるFilterPluginを通すことで変化していく可能性があるからです。
ExecutorPluginでは新たにint outputTaskCount
という概念が出てきました。
詳しくは後述しますが、後続の並列処理ではInputPluginで求めたint taskCount
に加えて、ホストマシンのCPUコア数や明示的な設定値に応じてその並列処理方法が変化します。
そのために必要なint outputTaskCount
という値をExecutorPluginでは事前に算出しています。
また、transaction
では~Control.run
の呼び出し後に処理を書くと全ての転送処理が終了した後の後処理を書くことができます。
例えば、embulk-output-jdbc
においては、tempテーブルに転送したデータを最後にまとめて対象のテーブルにInsertするような処理をここで行なっています。
transaction
メソッドが呼び出されていく段階では、処理はまだ直列です。
この後は、シーケンス図の一番右のExecutorPlugin.Executor.execute
の処理内容を追っていきます。
並列転送処理
前述のExecutorPlugin.Executor.execute
以降の処理ですが、
Executor
は、DirectExecutor
,ScatterExecutor
のどちらかということがtransaction
の段階で決定されています。
DirectExecutor
,ScatterExecutor
に関しては、以下のsonotsさんの記事で詳しく解説されています。
対象バージョンはv0.8.6ですが、大まかには変わっていないと思いますので、詳細はこちらの記事を参照してください。
まずは、シンプルな方のDirectExecutor
で読み進めると分かりやすいと思います。
以降は、JavaのFutureによってスレッドごとの処理に分岐してきますが、それぞれのThreadは以下の処理に到達します。
ここが転送処理の始まりの部分です。
PageOutput
最初にOutputPluginに実装されたopen
というメソッドが呼ばれ、その返り値であるPageOutput
というインターフェースを実装したインスタンスを取得します。
次にFilterPluginのopen
が前段のPageOutput
を受け取りまた新たなPageOutput
を返します。
面白いことに、ここはデータの処理と流れが逆な点に注目です。
PageOutput
を実装したクラスはそれぞれadd
というメソッドがあり、この処理はそれぞれで異なります。
データの処理と逆の流れでリレーしていったことで、最終的にはデータの処理の順にPageOutput
がネストされたオブジェクトが得られます。
そして、最終的なPageOutput
は、InputPluginのrun
メソッドに渡されます。
run
メソッドでは各InputPluginが思いのままにデータ元からデータを取得します。
その取得したデータはPage
というクラスのインスタンスに格納されます。
生成したPage
は後続のプラグインへPageOutput
のadd
メソッドを介して受け渡されていきます。
PageOutput
を先に生成していたのはここに意味があるわけです。
各PageOutput
は後続のPageOutput
を内包しているため、各プラグインにおける処理を終えると、Page
を生成して後続のPageOutput
にadd
していきます。
最終的にはOutputPluginに到達して、そのadd
メソッドはデータロードの処理を実行します。
流れは以下のシーケンス図のようになります。(FilterPluginが一つの場合)
つまり、まずデータ元のデータを全件取得して後続に渡して処理していくような逐次的処理ではなく、データの取得中(ETLのE)にも順次後続の処理(ETLのTL)を行っていくような準ストリーミングな方式の処理となっています。
そのため大規模なデータにおいてもメモリ消費を一定に抑えることができ、更には並列処理による速度向上も見込めるアーキテクチャとなっているわけです。
Page
それではPage
とはなんでしょうか。
Page
はそのフィールドとして、buffer
とstringReferences
, jsonValueReferences
を持っています。
buffer
は32KBのバイト列で以下のようにレイアウトとなっています。
Page
という名称で行ごとのデータをシリアライズしたり、NULLのbitmapを行ヘッダーに持っているような構造は、どこかRDBMSにおけるページの概念の思い起こします。
とはいえ、string, json型のデータは可長幅のデータ型が故に、実データはstringReferences
, jsonValueReferences
に保持した上でその配列のindex値がbuffer
に格納されているようでした。
この辺のWhy?は歴史的経緯も絡んでくると思いますが深くは調べきれていません。
そして、上記のようなPage
の内部構造はプラグイン側に対しては隠蔽されているため、PageBuilder
, PageReader
というクラスを利用して読み書きをしていることがほとんどです。
PageBuilder
PageBuilder
を利用したPage
の書き込みは、プラグイン側では例として以下のように実装できます。
特に厳密な制約はないですが、Visitorパターンを使ってEmbulkにおける各型(long, double, boolean, string, timestamp, json)に対する処理を書いているプラグインが多いと思います。
while ( /* 全体ループ */ ) {
while ( /* 行ループ */ ) {
schema.visitColumns(new ColumnVisitor() {
public void longColumn(Column column) {
boolean v = /* 値を取得 */
pageBuilder.setBoolean(column, v);
}
public void stringColumn(Column column) {
String v = /* 値を取得 */
pageBuilder.setString(column, v);
}
/* その他の型、double, boolean, timestamp, jsonについても実装 */
})
}
pageBuilder.addRecord();
}
pageBuilder.finish();
カラムごとに値をset~
していき、addRecord
を呼び出すと1レコードとして記録されます。
bufferに収まらなくなったタイミングで、PageBuilder.flush
が呼び出され後続のPageOutput
へadd
されます。
PageReader
一方で、読み取り側ではPageReader
を利用します。
こちらも概ね同じようにVisitorパターンで実装しているプラグインが多いと思います。
while (pageReader.nextRecord()) {
pageReader.getSchema().visitColumns(new ColumnVisitor() {
public void booleanColumn(Column column) {
/* FilterPluginであればPageBuilderのset~を呼び出す */
/* OutputPluginであればLoadのための各種処理 */
}
/* その他の型についても実装 */
})
}
ここで一点着目したいのは、FilterPluginにおいてはPageReader
によってPage
が読み出されると然るべく処理を行った後にPageBuilder
を利用してPage
を生成して後続に渡している点です。
(embulk-standards/embulk-filter-remove_columnsの例)
InputPlugin -> OutputPluginの流れの中で同一の一行であれば同一のPage
と言うように一対一に紐づいているわけではなく、
以下のようにPage
は各プラグイン間においてのデータ受け渡しのたびに生成されます。
おわりに
いかがだったでしょうか。
embulk run
の処理の流れの詳細は、core側とプラグイン側の両方を参照しないとといけなかったり、PageOutput
の部分などコードを上から読んでいくだけではイメージが付きづらい部分があったりするかもしれません。
今回はその辺をできるだけ図やソースコードのリンクを使って解説してみました。
普段ユーザーとして利用している分にはここまで理解する必要はないと思いますが、何かの際のトラブルシューティング時で少しでもお役に立てれば幸いです。
余談ですが、私も業務で特定の状況で発生するメモリリークの問題を調査する必要があり、その際coreのコードを読み込んだことが今回の記事を執筆した始まりでした。
(少量でもファイル数が多い場合に、プラグイン側の実装次第ではPage
で割り当てたbuffer
が解放されないことによって発生する問題)
Discussion