🐻‍❄️

Polarsのpushdown深堀り(S3上のParquetファイルを例に)

2024/12/02に公開

ログラスの龍島(@hryushm)です。最近は本当に寒いですね。寒いと言えば北極。北極と言えばホッキョクグマ。ホッキョクグマと言えばPolarsです。

Polarsの特徴として、外部リソース(例:S3上のParquetファイル)からのデータ取得時に必要なデータのみに絞って取得する述語pushdown機能があります。
https://pola.rs/posts/predicate-pushdown-query-optimizer/
これに関して上記のPolarsのブログにて下記のような記述があります。

A common pattern when applying predicate pushdown in parquet files is to read the file in two steps. First you get the metadata section. Then you use your predicate to determine if a certain row group needs to be read or not. For the example column_a > 2 the parquet reader would read the column metadata of column_a and look at its min and max values for each row group. If the predicate evaluates to false then there is no need to read in the row group.
(日本語訳)Parquetファイルにおける述語プッシュダウンを適用する際の一般的なパターンは、ファイルを2つのステップで読み込むことです。最初にメタデータセクションを取得します。その後、述語を使用して特定の行グループを読み込む必要があるかどうかを判断します。例えば「column_a > 2」という条件では、Parquetリーダーはcolumn_aのメタデータ(各行グループの最小値と最大値)を読み込みます。この条件が偽と評価される場合、その行グループを読み込む必要はありません。

まずメタデータのみ読み取り、その後対象となるデータを含むRow groupのみを取得することで実現されているようです。

実際にS3上にあるParquetファイルを取得する際、実際どのようなリクエストを送り上記が実現されているのか、HTTPリクエストをキャプチャしつつ理解していこうと思います。

事前知識

そもそもParquetファイルはどのような内部構成をしているのでしょうか?Parquetファイルの概観は下記ブログが詳しく、参考になったので参照いただくと良いと思います。ここから先は大まかなParquetのファイルレイアウトについて理解している前提で進めます。
https://mrasu.hatenablog.jp/entry/2024/09/22/190000

事前準備

Polarsのスクリプトを実行した際のS3へのリクエストをキャプチャするため、mitmproxyを利用します。mitmproxyはHTTP proxyとして通信をキャプチャすることができるツールです。
https://mitmproxy.org/
macであれば下記でインストール可能です。

brew install mitmproxy

また簡単のため、AWS S3ではなくS3互換のオブジェクトストレージであるMINIOをローカルでホストして利用します。
https://min.io/

最後にParquetファイルのメタデータなどを簡単に見るため、parquet-cliを利用します
https://github.com/apache/parquet-java/tree/master/parquet-cli

brew install parquet-cli  

テストデータ生成

テストデータを下記のスクリプトで生成します。id列に連番を振り、value, scoreの列に適当な文字列や数値を入れておきます。
またParquetファイルの書き込み時にrow_group_sizeを指定して1000レコード/100 = 10のRow groupに分かれるように設定しておきます。

import polars as pl
import s3fs

# テストデータの作成
df = pl.DataFrame({
    "id": range(1, 1001),  # 1から1000までの連番
    "value": [chr(65 + i % 26) for i in range(1000)],  # A-Zを繰り返す文字列データ
    "score": [i % 100 for i in range(1000)]  # 0-99の数値データを繰り返す
})

# テストデータをS3に保存
fs = s3fs.S3FileSystem(
    endpoint_url="http://localhost:9000",
    key="<your-minio-user>", 
    secret="<your-minio-password>"
)
destination = "s3://polars-pushdown/data.parquet"

# write parquet
with fs.open(destination, mode='wb') as f:
    df.write_parquet(
        f,
        row_group_size=100 # ブロックサイズを100に設定
    )

Polarsでのクエリ

Polarsでテストデータに対する簡単なクエリを書きます。今回はid=1のレコードのvalueを出力するものです。idは連番を振っているので結果は1レコードのみしかありません。

import polars as pl

# S3の設定
storage_options = {
    "endpoint_url": "http://localhost:9000",
    "aws_access_key_id": "<your-minio-user>",
    "aws_secret_access_key": "<your-minio-password>",
}

# S3からparquetファイルを読み込む
lf = pl.scan_parquet(
    "s3://polars-pushdown/data.parquet",
    storage_options=storage_options
).filter(
    pl.col("id") == 1
).select(["id", "value"])

print(lf.explain()) # クエリプランの出力
print(lf.collect()) # クエリ結果の出力

実行すると下記のような出力となります。

Parquet SCAN [s3://polars-pushdown/data.parquet]
PROJECT 2/3 COLUMNS
SELECTION: [(col("id")) == (1)]
shape: (1, 2)
┌─────┬───────┐
│ id  ┆ value │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 1   ┆ A     │
└─────┴───────┘

PROJECT 2/3 COLUMNS から3カラムの内2カラム(id, value)のみ利用され、SELECTION: [(col("id")) == (1)] からid=1のデータのみに絞られていることがわかります。

クエリのキャプチャ

上記のテストデータ生成スクリプトとクエリスクリプトを実行すると、mitmproxyでは下記のようなHTTPリクエストをキャプチャできました。
※先頭の(1)などは説明のため追記

(1) http://localhost:9000/polars-pushdown/data.parquet	PUT	200	13.3kb	64ms
(2) http://localhost:9000/polars-pushdown/data.parquet	HEAD	200	0	22ms	
(3) http://localhost:9000/polars-pushdown/data.parquet	GET	206	8b	28ms	
(4) http://localhost:9000/polars-pushdown/data.parquet	GET	206	2.5kb	28ms	
(5) http://localhost:9000/polars-pushdown/data.parquet	GET	206	574b	25ms	

データを配置するPUTのリクエストの後にHEAD、そして3つGETのリクエストが来ていることがわかります。
PUTのリクエストからファイルは13.3kbありますが、GETのリクエストは8b, 2.5kb, 574bと全て合わせてもファイルの全体量よりは大きく削減できているようです。pushdownが効いていることがわかります。

HEAD, GETのそれぞれのリクエストについて、見るべきポイントを抜粋してもう少し詳しく見ていきます。

(2) http://localhost:9000/polars-pushdown/data.parquet	HEAD	200	0	22ms	

レスポンスヘッダー

HTTP/1.1 200 OK
Accept-Ranges: bytes
Content-Length: 13640
...(略)

Accept-Rangesヘッダーが付与されています。これはHTTP Range Requestの仕様に定められているもので、Range Requestにサーバ側が対応していることを示しています。またファイル自体のサイズも取得できています。
https://developer.mozilla.org/ja/docs/Web/HTTP/Range_requests

続いて3のGETリクエストを見ていきます。

(3) http://localhost:9000/polars-pushdown/data.parquet	GET	206	8b	28ms	

リクエストヘッダー

GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=13632-13639
host: localhost:9000
...(略)

先程のRange Requestの仕様に基づいてrangeヘッダーを付与しており、Content-Length(13640)から逆算した最後の8byteを取得しようとしているようです。

レスポンスボディ(hexdump)

0000000000 21 0a 00 00 50 41 52 31                           !...PAR1

Parquetファイルの仕様で、最後の8byteはfooterの長さ(4byte、リトルエンディアン)+マジックナンバー(4byte)なので、21 0a 00 00 = 2593byte がfooterの長さと取得できています。
※詳細は事前知識の項のブログを参照

続いて4のGETリクエストを見ていきます

(4) http://localhost:9000/polars-pushdown/data.parquet	GET	206	2.5kb	28ms	

リクエストヘッダー

GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=11039-13639
...(略)

レスポンスヘッダー

HTTP/1.1 206 Partial Content
Accept-Ranges: bytes
Content-Length: 2601
Content-Range: bytes 11039-13639/13640
...(略)

3で取得できたfooterの長さから逆算したfooterの領域をRange Requestで取得しています。
この取得できるfooterの情報はparquet-cliで確認することができます。
parquet footer コマンドでfooterに含まれる全量は確認できますが、長すぎるため parquet meta コマンドの結果を載せておきます。

$ parquet meta data.parquet

File path:  data.parquet
Created by: Polars
Properties:
  ARROW:schema: /////wMBAAAEAAAA8v///xQAAAAEAAEAAAAKAAsACAAKAAQA+P///wwAAAAIAAgAAAAEAAMAAACQAAAAUAAAAAQAAADs////OAAAACAAAAAYAAAAAQIAABAAEgAEABAAEQAIAAAADAAAAAAA9P///0AAAAABAAAACAAJAAQACAAFAAAAc2NvcmUAAADs////LAAAACAAAAAYAAAAARQAABAAEgAEABAAEQAIAAAADAAAAAAA/P///wQABAAFAAAAdmFsdWUAAADs////OAAAACAAAAAYAAAAAQIAABAAEgAEABAAEQAIAAAADAAAAAAA9P///0AAAAABAAAACAAJAAQACAACAAAAaWQA
Schema:
message root {
  optional int64 id;
  optional binary value (STRING);
  optional int64 score;
}


Row group 0:  count: 100  8.40 B records  start: 4  total(compressed): 840 B total(uncompressed):2.118 kB 
--------------------------------------------------------------------------------
       type      encodings count     avg size   nulls   min / max
id     INT64     Z RB_     100       3.16 B     0       "1" / "100"
value  BINARY    Z RB_     100       2.08 B     0       "A" / "Z"
score  INT64     Z RB_     100       3.16 B     0       "0" / "99"

Row group 1:  count: 100  8.65 B records  start: 988  total(compressed): 865 B total(uncompressed):2.118 kB 
--------------------------------------------------------------------------------
       type      encodings count     avg size   nulls   min / max
id     INT64     Z RB_     100       3.41 B     0       "101" / "200"
value  BINARY    Z RB_     100       2.08 B     0       "A" / "Z"
score  INT64     Z RB_     100       3.16 B     0       "0" / "99"

...(以下Row group 9まで)

ポイントは各Row groupにおいて各カラムのmin/maxの値があるため、絞りたい値の存在するRow groupをfooterの情報から特定できることです。今回はid=1を指定しているのでRow group 0に対象となるデータが含まれていると特定できます。

最後に5のGETリクエストです。

(5) http://localhost:9000/polars-pushdown/data.parquet	GET	206	574b	25ms	

リクエストヘッダー

GET http://localhost:9000/polars-pushdown/data.parquet HTTP/1.1
range: bytes=4-577
...(略)

4で取得したRow groupのデータのみに絞ってRange Requestをしています。このリクエストによってid=1のデータが含まれるRow groupのデータのみを取得できたことになります。

以上が全体のリクエストの流れです。ブログの通りParquetのメタデータから必要なRow groupのファイル内での位置を特定し、Range Requestによって部分的にデータを取得していることがわかりました。

まとめ

PolarsがS3からParquetファイルのデータを取得する際、HTTPのRange Requestを駆使してメタデータのみを取得、更にメタデータからわかるRow groupのみを取得することでファイル全体を取得せずに必要な箇所のみ取得していることがわかりました。id=1のように大きく絞り込みが行われるクエリにおいては、Parquetファイルのrow_group_sizeを小さく設定したほうが大きくRow groupを絞り込むことができそうですが、メタデータが肥大化することや絞り込みがあまり行われないクエリのオーバーヘッド増加などトレードオフがありそうです。時間がある時にまた検証してみたいです。

株式会社ログラス テックブログ

Discussion