Polarsのpushdown深堀り(S3上のParquetファイルを例に)
ログラスの龍島(@hryushm)です。最近は本当に寒いですね。寒いと言えば北極。北極と言えばホッキョクグマ。ホッキョクグマと言えばPolarsです。
Polarsの特徴として、外部リソース(例:S3上のParquetファイル)からのデータ取得時に必要なデータのみに絞って取得する述語pushdown機能があります。
これに関して上記のPolarsのブログにて下記のような記述があります。A common pattern when applying predicate pushdown in parquet files is to read the file in two steps. First you get the metadata section. Then you use your predicate to determine if a certain row group needs to be read or not. For the example column_a > 2 the parquet reader would read the column metadata of column_a and look at its min and max values for each row group. If the predicate evaluates to false then there is no need to read in the row group.
(日本語訳)Parquetファイルにおける述語プッシュダウンを適用する際の一般的なパターンは、ファイルを2つのステップで読み込むことです。最初にメタデータセクションを取得します。その後、述語を使用して特定の行グループを読み込む必要があるかどうかを判断します。例えば「column_a > 2」という条件では、Parquetリーダーはcolumn_aのメタデータ(各行グループの最小値と最大値)を読み込みます。この条件が偽と評価される場合、その行グループを読み込む必要はありません。
まずメタデータのみ読み取り、その後対象となるデータを含むRow groupのみを取得することで実現されているようです。
実際にS3上にあるParquetファイルを取得する際、実際どのようなリクエストを送り上記が実現されているのか、HTTPリクエストをキャプチャしつつ理解していこうと思います。
事前知識
そもそもParquetファイルはどのような内部構成をしているのでしょうか?Parquetファイルの概観は下記ブログが詳しく、参考になったので参照いただくと良いと思います。ここから先は大まかなParquetのファイルレイアウトについて理解している前提で進めます。
事前準備
Polarsのスクリプトを実行した際のS3へのリクエストをキャプチャするため、mitmproxyを利用します。mitmproxyはHTTP proxyとして通信をキャプチャすることができるツールです。
macであれば下記でインストール可能です。brew install mitmproxy
また簡単のため、AWS S3ではなくS3互換のオブジェクトストレージであるMINIOをローカルでホストして利用します。
最後にParquetファイルのメタデータなどを簡単に見るため、parquet-cliを利用します
brew install parquet-cli
テストデータ生成
テストデータを下記のスクリプトで生成します。id列に連番を振り、value, scoreの列に適当な文字列や数値を入れておきます。
またParquetファイルの書き込み時にrow_group_sizeを指定して1000レコード/100 = 10のRow groupに分かれるように設定しておきます。
import polars as pl
import s3fs
# テストデータの作成
df = pl.DataFrame({
"id": range(1, 1001), # 1から1000までの連番
"value": [chr(65 + i % 26) for i in range(1000)], # A-Zを繰り返す文字列データ
"score": [i % 100 for i in range(1000)] # 0-99の数値データを繰り返す
})
# テストデータをS3に保存
fs = s3fs.S3FileSystem(
endpoint_url="http://localhost:9000",
key="<your-minio-user>",
secret="<your-minio-password>"
)
destination = "s3://polars-pushdown/data.parquet"
# write parquet
with fs.open(destination, mode='wb') as f:
df.write_parquet(
f,
row_group_size=100 # ブロックサイズを100に設定
)
Polarsでのクエリ
Polarsでテストデータに対する簡単なクエリを書きます。今回はid=1のレコードのvalueを出力するものです。idは連番を振っているので結果は1レコードのみしかありません。
import polars as pl
# S3の設定
storage_options = {
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "<your-minio-user>",
"aws_secret_access_key": "<your-minio-password>",
}
# S3からparquetファイルを読み込む
lf = pl.scan_parquet(
"s3://polars-pushdown/data.parquet",
storage_options=storage_options
).filter(
pl.col("id") == 1
).select(["id", "value"])
print(lf.explain()) # クエリプランの出力
print(lf.collect()) # クエリ結果の出力
実行すると下記のような出力となります。
Parquet SCAN [s3://polars-pushdown/data.parquet]
PROJECT 2/3 COLUMNS
SELECTION: [(col("id")) == (1)]
shape: (1, 2)
┌─────┬───────┐
│ id ┆ value │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═══════╡
│ 1 ┆ A │
└─────┴───────┘
PROJECT 2/3 COLUMNS
から3カラムの内2カラム(id, value)のみ利用され、SELECTION: [(col("id")) == (1)]
からid=1のデータのみに絞られていることがわかります。
クエリのキャプチャ
上記のテストデータ生成スクリプトとクエリスクリプトを実行すると、mitmproxyでは下記のようなHTTPリクエストをキャプチャできました。
※先頭の(1)などは説明のため追記
(1) http://localhost:9000/polars-pushdown/data.parquet PUT 200 13.3kb 64ms
(2) http://localhost:9000/polars-pushdown/data.parquet HEAD 200 0 22ms
(3) http://localhost:9000/polars-pushdown/data.parquet GET 206 8b 28ms
(4) http://localhost:9000/polars-pushdown/data.parquet GET 206 2.5kb 28ms
(5) http://localhost:9000/polars-pushdown/data.parquet GET 206 574b 25ms
データを配置するPUTのリクエストの後にHEAD、そして3つGETのリクエストが来ていることがわかります。
PUTのリクエストからファイルは13.3kbありますが、GETのリクエストは8b, 2.5kb, 574bと全て合わせてもファイルの全体量よりは大きく削減できているようです。pushdownが効いていることがわかります。
HEAD, GETのそれぞれのリクエストについて、見るべきポイントを抜粋してもう少し詳しく見ていきます。
(2) http://localhost:9000/polars-pushdown/data.parquet HEAD 200 0 22ms
レスポンスヘッダー
HTTP/1.1 200 OK
Accept-Ranges: bytes
Content-Length: 13640
...(略)
Accept-Rangesヘッダーが付与されています。これはHTTP Range Requestの仕様に定められているもので、Range Requestにサーバ側が対応していることを示しています。またファイル自体のサイズも取得できています。
続いて3のGETリクエストを見ていきます。
(3) http://localhost:9000/polars-pushdown/data.parquet GET 206 8b 28ms
リクエストヘッダー
GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=13632-13639
host: localhost:9000
...(略)
先程のRange Requestの仕様に基づいてrangeヘッダーを付与しており、Content-Length(13640)から逆算した最後の8byteを取得しようとしているようです。
レスポンスボディ(hexdump)
0000000000 21 0a 00 00 50 41 52 31 !...PAR1
Parquetファイルの仕様で、最後の8byteはfooterの長さ(4byte、リトルエンディアン)+マジックナンバー(4byte)なので、21 0a 00 00
= 2593byte がfooterの長さと取得できています。
※詳細は事前知識の項のブログを参照
続いて4のGETリクエストを見ていきます
(4) http://localhost:9000/polars-pushdown/data.parquet GET 206 2.5kb 28ms
リクエストヘッダー
GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=11039-13639
...(略)
レスポンスヘッダー
HTTP/1.1 206 Partial Content
Accept-Ranges: bytes
Content-Length: 2601
Content-Range: bytes 11039-13639/13640
...(略)
3で取得できたfooterの長さから逆算したfooterの領域をRange Requestで取得しています。
この取得できるfooterの情報はparquet-cliで確認することができます。
parquet footer
コマンドでfooterに含まれる全量は確認できますが、長すぎるため parquet meta
コマンドの結果を載せておきます。
$ parquet meta data.parquet
File path: data.parquet
Created by: Polars
Properties:
ARROW:schema: /////wMBAAAEAAAA8v///xQAAAAEAAEAAAAKAAsACAAKAAQA+P///wwAAAAIAAgAAAAEAAMAAACQAAAAUAAAAAQAAADs////OAAAACAAAAAYAAAAAQIAABAAEgAEABAAEQAIAAAADAAAAAAA9P///0AAAAABAAAACAAJAAQACAAFAAAAc2NvcmUAAADs////LAAAACAAAAAYAAAAARQAABAAEgAEABAAEQAIAAAADAAAAAAA/P///wQABAAFAAAAdmFsdWUAAADs////OAAAACAAAAAYAAAAAQIAABAAEgAEABAAEQAIAAAADAAAAAAA9P///0AAAAABAAAACAAJAAQACAACAAAAaWQA
Schema:
message root {
optional int64 id;
optional binary value (STRING);
optional int64 score;
}
Row group 0: count: 100 8.40 B records start: 4 total(compressed): 840 B total(uncompressed):2.118 kB
--------------------------------------------------------------------------------
type encodings count avg size nulls min / max
id INT64 Z RB_ 100 3.16 B 0 "1" / "100"
value BINARY Z RB_ 100 2.08 B 0 "A" / "Z"
score INT64 Z RB_ 100 3.16 B 0 "0" / "99"
Row group 1: count: 100 8.65 B records start: 988 total(compressed): 865 B total(uncompressed):2.118 kB
--------------------------------------------------------------------------------
type encodings count avg size nulls min / max
id INT64 Z RB_ 100 3.41 B 0 "101" / "200"
value BINARY Z RB_ 100 2.08 B 0 "A" / "Z"
score INT64 Z RB_ 100 3.16 B 0 "0" / "99"
...(以下Row group 9まで)
ポイントは各Row groupにおいて各カラムのmin/maxの値があるため、絞りたい値の存在するRow groupをfooterの情報から特定できることです。今回はid=1を指定しているのでRow group 0に対象となるデータが含まれていると特定できます。
最後に5のGETリクエストです。
(5) http://localhost:9000/polars-pushdown/data.parquet GET 206 574b 25ms
リクエストヘッダー
GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=4-577
...(略)
4で取得したRow groupのデータのみに絞ってRange Requestをしています。このリクエストによってid=1のデータが含まれるRow groupのデータのみを取得できたことになります。
以上が全体のリクエストの流れです。ブログの通りParquetのメタデータから必要なRow groupのファイル内での位置を特定し、Range Requestによって部分的にデータを取得していることがわかりました。
まとめ
PolarsがS3からParquetファイルのデータを取得する際、HTTPのRange Requestを駆使してメタデータのみを取得、更にメタデータからわかるRow groupのみを取得することでファイル全体を取得せずに必要な箇所のみ取得していることがわかりました。id=1のように大きく絞り込みが行われるクエリにおいては、Parquetファイルのrow_group_sizeを小さく設定したほうが大きくRow groupを絞り込むことができそうですが、メタデータが肥大化することや絞り込みがあまり行われないクエリのオーバーヘッド増加などトレードオフがありそうです。時間がある時にまた検証してみたいです。
Discussion