awswrangler(Athena)でのデータ抽出をPolarsへ置き換えて高速化する

はじめに

こんにちは。D2Cデータサイエンティストの名越です。

Pythonでs3にある大規模なテーブルデータを多少の条件をつけて抽出する際どのように取得していますでしょうか?
Pandasを使ってデータ処理をする場合は大規模データだと重くなってしまうためできるだけ事前に処理をしたものを取り出したいですよね。

私の所属している部署では awswrangler を用いてAthenaでデータ取得することができる環境が整っているため、私はそこまで深く考えることもなくシステム内ではこのやり方でデータを抽出することが多いです。

本記事では既にある程度十分に速いawswranglerを用いた大規模データの条件付きのデータ抽出を、そのパフォーマンスの良さで注目を浴びている表計算ライブラリ Polars へ置き換えてさらに高速化できるのかを検証してみようと思います。

開発環境

検証については以下の環境で行っています

  • AWS EC2 (インスタンスタイプ: r5.2xlarge)
  • awswrangler 2.20.0
  • pandas 1.5.1
  • polars 0.19.12

Polarsについて

Pandasが支配的であるPythonの表計算ライブラリにおいて、Pandasよりも高速なデータフレーム処理ができるとして注目を集めているRust製のライブラリです。こちらのライブラリはRustとPythonで利用することができます。(今回はPythonで行います)

大まかに以下のような特徴があります。

  • RustやApach Arrowを用いているため高速
  • 並列処理を行うことができる
  • 「Polars Expressions」と呼ばれるメソッドチェーンを繋いで記述するようなデータ処理の仕方
  • インデックスがなく、遅延評価を行うことができる

Polarsでのデータ処理の記述の仕方

polarsでの簡単なデータ処理の仕方の一例を代表的な表計算ライブラリであるPandasと比較しながら下記に示します。

import pandas as pd
import polars as pl
#pandas
df = pd.read_csv('path/file.csv')
df = (df[(df['A']==1) & (df['B']>30)]
      .groupby('B')
      .count()
      .rename(columns={'A':'count'})
     )
df = df[df['count'] <= 10000]

#polars
df = pl.read_csv('path/file.csv')
df = (df.filter((pl.col('A')==1) & (pl.col('B')>30))
             .group_by('B')
             .count()
	     .filter(pl.col('count')<=10000)
            )

このようにPolarsはPandasに近い書き方の要素もありつつ、Pandasと違いデータフレームに処理を付け足していく様なイメージで書きます。
ちなみに200万行のデータで上記の処理を試した際、速度はPolarsの方が約3倍早かったです。

より詳細な各処理の書き方については以下の記事が参考になります。(私も大変参考にさせていただきました)
https://zenn.dev/bee2/articles/e8623a603752ff
https://qiita.com/nkay/items/9cfb2776156dc7e054c8

またPolarsは更新が盛んであるため公式ドキュメントも併せてご確認ください。
Github
公式APIリファレンス
公式ガイド

遅延評価

大まかに言うとこちらから処理の指示をするまで記述した処理を走らせず、こちらから指示をしたタイミングでPolasがクエリの並列処理や最適化をおこなって処理してくれるというものです。
これはpandasにはない機能で、Polarsはこの機能によって更なる高速なデータ処理を行うことができます。

具体的にどの様に最適化処理が入っているかについてはこちらの記事で実際に確認してくださっています。
https://zenn.dev/hiro_torii/articles/06d7e845e146ee

さてPolarsでは遅延評価を

  1. lazy()を差し込み遅延評価とすることを指示(これ以降DataFrame→LazyFrameと呼ばれる様になります)
  2. collect()をLazyFrameに付けて処理の実行
    というシンプルな方法で行うことができます。
    先ほどのPolarsでの記述を遅延評価を行うように記述すると以下の様になります。
df = pl.read_csv('path/file.csv').lazy()
df = (df.filter((pl.col('A')==1) & (pl.col('B')>30))
             .group_by('B')
             .count()
	     .filter(pl.col('count')<=10000)
	     .collect()
            )

なおPolarsではscan_csvというものもあり、これを用いると始めからLazyFrameとしてcsvファイルを読み込むこともできます。
実際に上記の遅延評価ありのコードを試したところ、遅延評価を入れる前と比べさらに1/3程に処理時間が短縮されました。

awswranglerとPolarsの比較

ここからは大規模データをawswranglerで多少処理してから取り出すのとPolarsで取り出してから多少の処理をするのとでどちらが早いかを検証しようと思います。

今回検証用に以下の様なデータを用意し検証しました。

  • データ量: 処理前:約3500万行、処理後:約2500万行
  • カラム数: 処理前:4つ、処理後:1つ
  • パーティション数: 一つ

この様に今回は大規模なデータを多少削減して抽出するような処理を想定します。

まずawswranglerでの抽出時間を見てみます。

import awswrangler as wr
start = time.time()
query = f"""
    SELECT DISTINCT
        A
    FROM
        DB.Table
    WHERE
        partition = 'YYYY-mm-dd'
        AND (
            B >= 1000
            OR C IS NOT NULL
            OR D IS NOT NULL
    )
"""
df = wr.athena.read_sql_query(query, database='DB', ctas_approach=False)
end = time.time()
time_diff = end - start
print(time_diff)

結果:
45.02007699012756秒

次にPolarsを用いた抽出をしてみます。
方法としてはs3にあるDB.Tableの元データを参照して、partition配下で複数に分割されているファイルをfor文で回して取得する流れになります。

import polars as pl
start = time.time()
df_pl = pl.DataFrame().lazy()
for path in path_list:
    df_pl_apart = (pl.read_parquet(f's3://bucket/{path}', columns=['A', 'B', 'C', 'D'])
                   .lazy()
                   .filter((pl.col('B') >= 100) 
                           | (pl.col('C').is_null().not_()) 
                           | (pl.col('D').is_null().not_())
                          )
                  )
    df_pl = pl.concat([df_pl, df_pl_apart])
df_pl_A = df_pl.select(['A'])
end = time.time()
time_diff = end - start
print(time_diff)

結果:
8.76078462600708秒

またwr.athena.read_sql_queryは最終的にPandas.DataFrameの出力となりAthenaのみでの処理速度は分からないため、先ほどのクエリをコンソールで実行したところ以下の結果となりました。

これらの結果から以下のような事がわかります

  • 今回のような条件ではPolarsの方がAthenaより処理が早い
  • 出力のデータ数が大きい場面においてwr.athena.read_sql_query は出力がPandasとなる都合上重い

これらの要因により、今回Polarsのみで抽出を行うことでawswranglerより早くデータの抽出を行う事ができました。

(参考)
参考として今回polarsのみ抽出をPandasで行った場合の速度も計測してみました

start = time.time()
df = pd.DataFrame()
for path in path_list:
    df_apart = pd.read_parquet(f's3://Bucket/{path}')
    df_apart = df_apart[(df_apart['B']>=100)|(~df_apart['C'].isnull())|(~df_apart['D'].isnull())]                  
    df = pd.concat([df, df_apart])
df_A = df[['A']]
end = time.time()
time_diff = end - start
print(time_diff)

結果:
74.63417530059814秒
やはりPandasだと大きなデータの読み込み・処理は比較的重くなってしまうことがわかります。

まとめ

本記事ではPolarsについて簡単に紹介し、大規模データに少し処理を加えて大きなデータを抽出する際の速度についてawswrangler(Athena)とPolarsで比較する検証を行いました。その結果として今回想定した様なケースについてはPolarsのみの方が抽出速度が速くなることがわかりました。

少なくとも処理後のデータも大きい場合においてwr.athena.read_sql_query よりPolarsの方が早いことは確実かと思いますが、どのような場面においてAthenaよりPolarsが早くなるのか、もしくは全ての場面でPolarsの方が早いのかは今後更に使っていく中で確認していけたらと思います。

この記事がPolarsに興味を持ち、大規模データを取り扱う場合についてPolarsが選択肢に加わるきっかけの一つになりましたら幸いです。

参照

https://zenn.dev/bee2/articles/e8623a603752ff
https://qiita.com/nkay/items/9cfb2776156dc7e054c8
https://zenn.dev/hiro_torii/articles/06d7e845e146ee
https://qiita.com/_jinta/items/fac13f09e8e8a5769b79

D2C m-tech

Discussion