🚿

PolarsのStreaming APIについて

2023/12/19に公開

本記事について

本記事はPolars Advent Calendar 2023の17日目の記事です。@Johannyjmさん、素敵な企画ありがとうございます(投稿遅れちゃってすみません...)。

本記事ではPolarsのStreaming機能について記載します。

PolarsのStreaming APIとは?

クエリを実行する際、データを一度に処理するのではなくいくつかのバッチに分割して処理をする仕組みのこと。
https://pola-rs.github.io/polars/user-guide/concepts/streaming/

なぜ使うのか?

  • マシンのメモリを上回るような巨大なデータを処理したい場合、ストリーミング形式でいくつかのバッチに分けて処理を実行することで、限られたメモリ内で処理をすることができる
  • またこちらの記事に書かれているように、将来的に扱うデータが大きくなる可能性があるパイプラインを実装する場合、データ量の変化に対してスケーラブルな処理にするためにストリーミング形式を採用するメリットがある

どう使うのか?

polars.LazyFrame.collect()を用いて遅延評価を実行する際に、パラメータとしてstreaming=Trueを指定する。以下はサンプルコード。

import polars as pl

q = (
    pl.scan_csv("./transactions_train.csv")
    .filter(pl.col("sales_channel_id") == 2)
    .group_by("customer_id")
    .agg(pl.col("price").mean())
)
df = q.collect(streaming=True)

実際にこのクエリを実行したときのメモリをメモリプロファイラのmemrayを使って確認してみる。

%%memray_flamegraph
# 可視化時にわかりやすいように前後に3秒ずつ待機
time.sleep(3)
q.collect(streaming=True)
time.sleep(3)

メモリの使用状況

比較のためにstreaming=Falseの場合も確認してみる。

%%memray_flamegraph
# 可視化時にわかりやすいように前後に3秒ずつ待機
time.sleep(3)
q.collect(streaming=False)
time.sleep(3)

メモリの使用状況

メモリの使用状況の可視化の青色の線(Resident size)が物理メモリ上に確保されたメモリサイズであるが、streaming=Trueにすることでメモリサイズのピーク値が7.9 GB程度から4.7 GB程度と小さくなっていることがわかる。

ストリーミング形式で実行されるかどうかを、どう確認するか

polars.LazyFrame.explain()というquery planを確認できるメソッドがある。これでstreaming=Trueにしたクエリとstreaming=Falseにしたクエリを比較してみる。

streaming=Trueの場合

print(q.explain(streaming=False))

出力:

AGGREGATE
[col("price").mean()] BY [col("customer_id")] FROM
FILTER [(col("sales_channel_id")) == (2)] FROM
Csv SCAN ./transactions_train.csv
PROJECT 3/5 COLUMNS

streaming=Falseの場合

print(q.explain(streaming=False))

出力:

--- PIPELINE
AGGREGATE
[col("price").mean()] BY [col("customer_id")] FROM
FILTER [(col("sales_channel_id")) == (2)] FROM
Csv SCAN ./transactions_train.csv
PROJECT 3/5 COLUMNS --- END PIPELINE

DF []; PROJECT */0 COLUMNS; SELECTION: "None"

explainの結果で ---PIPELINE XXX ---END PIPELINE と表示されている部分が、ストリーミング形式で実行されることを表しているようである。

Streaming処理が可能な処理

現時点での公式ドキュメントによると、すべての処理がストリーミング処理が可能なわけではないが、少なくとも以下の処理は可能とのこと。個人的には欲しいところは概ねカバーされている印象がある。

  • filter,slice,head,tail
  • with_columns,select
  • group_by
  • join
  • sort
  • explode,melt
  • scan_csv,scan_parquet,scan_ipc

まとめ

PolarsのStreaming APIは限られたメモリでサイズの大きなデータセットを扱う必要がある場合に有効で、クエリ実行時にパラメータ指定するだけで簡単に実行できる。ただし現時点で実行可能なメソッドは限られており、まだ開発中の機能のため、今後の変更に注目していきたい。

Discussion