👏

Pandas-likeなラッパー、Ibis触ってみた

2022/12/25に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#25です。

tl;dr

  • Pandas-likeな記法で、各種データベース(SQLiteやPostgreSQL)や分散処理エンジン(DaskやSpark)にアクセスできるよ
  • 基本的には処理は各種データベースや分散処理エンジン側で実行されるよ
    • =ローカルマシンに載らないデータ量も処理できるはず

Ibisとは

公式ページ曰く、

Ibis is a Python framework to access data and perform analytical computations from different sources, in a standard way.

です。色々なデータソース・処理環境(Backend)へのアクセス・処理を抽象化するPythonライブラリで、PandasのDataframeに近い雰囲気でデータを操作することができます。

Backendとしては、

  • DWH系(Impala、ClickHouse、BigQuery、HeavyAI)
  • RDB系(MySQL、PostgreSQL)
  • 組み込みのDB系(SQLite、DuckDB)
  • 処理エンジン系(Spark、Pandas、Dask、Datafusion、Polars)

などに対応しています(対応状況はこちら。また、TrinoやSnowflakeなど開発中のものもあります)。

「SQLAlchemy + 処理エンジンの対応 + Pandas-likeな構文」のイメージが近いかもしれません。

Pandas・SQLAlchemyじゃダメなの

Ibisが提供する機能に関して、

  • SQLAlchemy
  • PandasのテーブルをPandas DataFrameに読み込むメソッド(read_sql_table・read_sql_query)

でも良いのでは?と疑問に思われるかもしれません。

Ibisの差別化ポイントとしては、

  • SQLAlchemyが対応していない処理エンジン系(e.g. SparkやDask)の利用、統一的な記法
    • 例えば最初はPandasをBackendとして開発して、データ量が増えたら分散できるエンジン(e.g. Spark)に変えたり、高速化(e.g. Polars、DuckDB)に変えたりする
  • Pandasでread_sql_table+PandasのDataFrame操作を行う場合に比べ、データ操作がBackend側で行われる
  • SQLAlchemy + Pandasに比べ、(Pandas-likeな)一つの記法に統一できる

あたりがありそうです。

なお、IbisはSQLALchmeyもPandasも利用しています(SQLAlchemyはRDB系のアクセスを使う場合のみ)。

試してみる(SQLite)

Ubuntu 20.04 (Windows10のWSL2上)、Python3.8で試しました。

準備

IbisのパッケージとBackend固有のパッケージをインストールします。pypiに登録されているibisは無関係なプロジェクトなので、そちらをインストールしないようにしましょう。

pip install ibis-framework 'ibis-framework[sqlite]'

チュートリアル用のSQLiteのファイルを提供してくれているので、ダウンロードします。

curl -LsS -o geography.db 'https://storage.googleapis.com/ibis-tutorial-data/geography.db

中身も見ておきましょう。

sqlite3 geography.db
SQLite version 3.31.1 2020-01-27 19:55:54
Enter ".help" for usage hints.
sqlite> .tables
countries     gdp           independence
sqlite> SELECT * FROM countries LIMIT 1;
AD|AND|20|AN|Andorra|Andorra la Vella|468.0|84000|EU

データアクセス

接続先を指定しconnectionメソッドを実行し、以降の処理に使用するBackend(SQLiteの場合はibis.backends.sqlite.Backend)を取得します。

import ibis
connection = ibis.sqlite.connect('geography.db')

接続出来た事の確認の意味で、テーブルの一覧を取得します。

connection.list_tables()
Out[3]: ['countries', 'gdp', 'independence']

接続出来ていそうですね。データも見てみます。

t = connection.table('countries')
t.limit(3).execute()

Out[7]:
  iso_alpha2 iso_alpha3  iso_numeric fips                  name           capital  area_km2  population continent
0         AD        AND           20   AN               Andorra  Andorra la Vella     468.0       84000        EU
1         AE        ARE          784   AE  United Arab Emirates         Abu Dhabi   82880.0     4975593        AS
2         AF        AFG            4   AF           Afghanistan             Kabul  647500.0    29121286        AS

Pandasに近い表記で、フィルターもできます。

t.filter(t['continent'] == 'AN').execute()
Out[12]:
  iso_alpha2 iso_alpha3  iso_numeric fips                                      name            capital    area_km2  population continent
0         AQ        ATA           10   AY                                Antarctica                     14000000.0           0        AN
1         BV        BVT           74   BV                             Bouvet Island                           49.0           0        AN
2         GS        SGS          239   SX  South Georgia and South Sandwich Islands          Grytviken      3903.0          30        AN
3         HM        HMD          334   HM         Heard Island and McDonald Islands                          412.0           0        AN
4         TF        ATF          260   FS               French Southern Territories  Port-aux-Francais      7829.0         140        AN

ちなみにexeucteのレスポンスはPandasのData Frameです

type(t.filter(t['continent'] == 'AN').execute())
Out[140]: pandas.core.frame.DataFrame

ログ

verboseオプションを有効にすると実行に対応するSQLクエリが出力されるようになります。

ibis.options.verbose = True

t.filter(t['continent'] == 'AN')
Out[32]: SELECT t0.iso_alpha2, t0.iso_alpha3, t0.iso_numeric, t0.fips, t0.name, t0.capital, t0.area_km2, t0.population, t0.continent
FROM main.countries AS t0
WHERE t0.continent = ?
 LIMIT ? OFFSET ?

  iso_alpha2 iso_alpha3  iso_numeric fips                                      name            capital    area_km2  population continent
0         AQ        ATA           10   AY                                Antarctica                     14000000.0           0        AN
1         BV        BVT           74   BV                             Bouvet Island                           49.0           0        AN
2         GS        SGS          239   SX  South Georgia and South Sandwich Islands          Grytviken      3903.0          30        AN
3         HM        HMD          334   HM         Heard Island and McDonald Islands                          412.0           0        AN
4         TF        ATF          260   FS               French Southern Territories  Port-aux-Francais      7829.0         140        AN

interactive mode

デフォルトではlazy mode(executeメソッドまで実行されない)ですが、interative modeにするとexecuteを省略できるようになります。

ibis.options.interactive = True
t.filter(t['continent'] == 'AN')
Out[15]:
  iso_alpha2 iso_alpha3  iso_numeric fips                                      name            capital    area_km2  population continent
0         AQ        ATA           10   AY                                Antarctica                     14000000.0           0        AN
1         BV        BVT           74   BV                             Bouvet Island                           49.0           0        AN
2         GS        SGS          239   SX  South Georgia and South Sandwich Islands          Grytviken      3903.0          30        AN
3         HM        HMD          334   HM         Heard Island and McDonald Islands                          412.0           0        AN
4         TF        ATF          260   FS               French Southern Territories  Port-aux-Francais      7829.0         140        AN

ただし、本番環境やデータ量が多い場合はinteractive modeを使用しない方が良いそうです。

というのも、executeメソッドの実行では、

  • Ibisの式(filterとか)をBackendで実行できる形式にコンパイル
    • BackendによってSQLAlchemyだったりPySparkだったりのObject
  • Backendに送信
  • Backendからのレスポンスを、Pandas DataFrameに変換

の処理を行います。つまり、暗黙的にexecuteが実行されるinteractive modeでは、式を作る毎にリクエスト、ローカルマシンのメモリへの結果の格納が行われます。

例えば、下記のように、interactive modeかつ、ふたつのクエリに分けて実行すると、(filterの結果次第では)大量のデータがローカルマシンのメモリの載る可能性があります。

ibis.options.interactive = True
# ここで一度Backendでのクエリの実行と、**limit前の全データの**ローカルマシンのメモリへの結果の格納が行われます(verboseオプションを有効にすると確認できます)
x = gdp_t.filter(gdp_t["country_code"] == "AND")
# ここで再度、Backendでのクエリの実行と、ローカルマシンのメモリへの結果の格納が行われます
# この時xのクエリは結果は使われないです
y = x.limit(2)

代わりに、一つのクエリとして実行するか、

# 一度だけクエリが実行され、limitの後の結果だけがローカルマシンのメモリに
y = gdp_t.filter(gdp_t["country_code"] == "AND").limit(2)

あるいは、二つのクエリにする場合でも、interactive modeを切って(lazy mode)実行すると、途中の結果をローカルマシンのメモリに載せずにすみます。

ibis.options.interactive = False
# ここではクエリの実行も、結果のメモリへの格納も行われません
x = gdp_t.filter(gdp_t["country_code"] == "AND")
# ここで、Backendでのクエリの実行と、ローカルマシンのメモリへの結果の格納が行われます
y = x.limit(2)

ただし、以下の実行例ではinteractive modeでのコードを記載します。lazy modeで実行する場合は、適当にexecuteを追加してください。

集計・JOIN

  • group_byでグループ化するカラムを指定
  • aggregateで集計する方法・カラムを指定

することで、集計を行うことができます。

t.group_by(t['continent']).aggregate([t['population'].sum().name('population'), t['population'].count().name('countries')])
Out[23]:
  continent  population  countries
0        AF  1021238685         58
1        AN         170          5
2        AS  4130584841         51
3        EU   750724554         54
4        NA   540204371         42
5        OC    36067549         28
6        SA   400143568         14

JOIN先のテーブルと、JOINの条件を指定してJOINも行えます。

t.inner_join(gdp_t, predicates=t['iso_alpha3']==gdp_t['country_code']).filter(gdp_t['year'] == 2017).sort_by(ibis.desc('value')).limit(2)
Out[49]: 
  iso_alpha2 iso_alpha3  iso_numeric fips           name     capital   area_km2  population continent country_code  year         value
0         US        USA          840   US  United States  Washington  9629091.0   310232863        NA          USA  2017  1.948539e+13
1         CN        CHN          156   CH          China     Beijing  9596960.0  1330044000        AS          CHN  2017  1.214349e+13

なお、事前にフィルタリングしておき、その結果とJOINすることも可能です。

latest_gdp_t = gdp_t.filter(gdp_t['year'] == 2017)
t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(2)
Out[57]:
  iso_alpha2 iso_alpha3  iso_numeric fips           name     capital   area_km2  population continent country_code  year         value
0         US        USA          840   US  United States  Washington  9629091.0   310232863        NA          USA  2017  1.948539e+13
1         CN        CHN          156   CH          China     Beijing  9596960.0  1330044000        AS          CHN  2017  1.214349e+13

書き込み

テーブルに保存もできます。

top_2 = t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(2)
connection.create_table('top_2', top_2)
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1

追記・上書きも可能です。

top_3 = t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(3)
 connection.insert('top_2', top_3)
# 追記なのでデータが重複しています
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
JP|JPN|392|JA|Japan|Tokyo|377835.0|127288000|AS|JPN|2017|4859950558538.97
connection.insert('top_2', top_3, overwrite=True)
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
JP|JPN|392|JA|Japan|Tokyo|377835.0|127288000|AS|JPN|2017|4859950558538.97

Backendを変えてみる(PostgreSQL)

SQLite以外のBackendの例として、PostgreSQLを試してみます。

準備

動作確認のPostgreSQLはコンテナで起動します。docker-compose.yamlを下のように記載し、コンテナを起動します。

version: '3.1'

services:
  db:
    image: postgres:15.1
    restart: always
    environment:
      POSTGRES_PASSWORD: admin
    ports:
      - 5433:5432
    volumes:
      - ./postgresql:/var/lib/postgresql/data
    command: ["postgres", "-c", "log_statement=all"]
# コンテナを起動
docker-compose up

適当にデータを入れておきます。

psql -h 127.0.0.1 -p 5433 -U postgres
CREATE TABLE countries(iso_alpha2 varchar, memo varchar);
INSERT INTO countries VALUES ('AD', 'hogehoge');

IbisではBackend毎にパッケージがわかれているので、PostgreSQLの分をインストールします。

pip install 'ibis-framework[postgres]'

接続

接続先の指定以外はSQLiteの場合と同じように接続できます。

pg_connection = ibis.postgres.connect( host='localhost', user='postgres', password='admin', port=5433, database="postgres")
pg_t = pg_connection.table("countries")
pg_t.filter(pg_t.iso_alpha2 == 'AD')
Out[197]:
  iso_alpha2      memo
0         AD  hogehoge

処理がDB側で実行されていることを確認

Ibisの処理がBackend側で本当に処理されている事を、念のため確認してみます。

verboseオプションを有効にして、先ほどのfilterを実行すると以下のようなクエリが表示されます。

SELECT t0.iso_alpha2, t0.memo
FROM countries AS t0
WHERE t0.iso_alpha2 = %(param_1)s
 LIMIT %(param_2)s

上のクエリに対応するクエリが、Backend(PostgreSQL)で実行されている事を、PostgreSQLのログから確認もできます。

docker logs ibis-db-1 1>/dev/null 2> >(tail -n10 >&2)
2022-12-24 10:26:43.255 UTC [1162] LOG:  statement: COMMIT
2022-12-24 10:26:58.466 UTC [1162] LOG:  statement: BEGIN
2022-12-24 10:26:58.466 UTC [1162] LOG:  statement: SHOW TIMEZONE
2022-12-24 10:26:58.467 UTC [1162] LOG:  statement: SET TIMEZONE = UTC
2022-12-24 10:26:58.467 UTC [1162] LOG:  statement: SELECT t0.iso_alpha2, t0.memo
        FROM countries AS t0
        WHERE t0.iso_alpha2 = 'AD'
         LIMIT 10000
2022-12-24 10:26:58.468 UTC [1162] LOG:  statement: SET TIMEZONE = 'Etc/UTC'
2022-12-24 10:26:58.469 UTC [1162] LOG:  statement: COMMIT

UDF

PostgreSQLを含むいくつかのBackendは、Backend側で動くPython処理(UDF)を記載することができます。

なお、UDFとは別に、Backendへの変換処理を追加するadd_operationも別に存在します。
(こちらはすべてのBackendで対応しているはず)

準備

plpythonをインストール・有効化します。

Dockerfile

FROM postgres:15.1

RUN apt-get update
RUN apt-get -y install python3 postgresql-plpython3-15

docker-compose.yaml

version: '3.1'

services:
  db:
    # image: postgres:15.1
    build: .
    restart: always
    environment:
      POSTGRES_PASSWORD: admin
    ports:
      - 5433:5432
    volumes:
      - ./postgresql/data:/var/lib/postgresql/data
      - ./postgresql/init:/docker-entrypoint-initdb.d
    command: ["postgres", "-c", "log_statement=all"]

postgresql/init/init.sql

create extension plpython3u;

PostgreSQLのコンテナを再起動します。

docker-compose down
docker-compose up

UDF登録

適当なメソッドを作成します。

def concat_suffixvalue(value):
    return value + "_with_udf"

登録します。二つ目の引数は入力の型、三つの引数はレスポンスの型です。

import ibis.expr.datatypes as dt
concat_suffixvalue_udf =  pg_connection.udf(concat_suffixvalue, [dt.string()], dt.string(), language="plpython3u", replace=True)

UDF実行

UDFの引数に対応する式を、udfメソッドのレスポンス(concat_suffixvalue_udf)に渡してやるだけです。

concat_suffixvalue_udf(pg_t['memo'])
Out[395]:
0    hogehoge_with_udf

ローカルのPythonではなく、PostgreSQL側で実行されていることは、verboseオプションの結果、またPostgreSQLのログで確認できます。

verboseオプションで表示されるSQL

SELECT concat_suffixvalue(t0.memo) AS tmp
FROM countries AS t0
 LIMIT %(param_1)s

PostgreSQLのログ(抜粋)

ibis-db-1       | 2022-12-24 23:19:39.639 UTC [358] LOG:  statement: SELECT concat_suffixvalue(t0.memo) AS tmp
ibis-db-1       |       FROM countries AS t0
ibis-db-1       |        LIMIT 10000

BackendをまたいだJOIN

異なるBackend(SQLiteとPostgreSQLなど)間のデータの組み合わせは出来ないようです。

pg_t.inner_join(t, predicates=pg_t['iso_alpha2']==t['iso_alpha2'])
# (エラーメッセージ一部省略)
File ~/project/ibis/venv2/lib/python3.8/site-packages/ibis/expr/types/core.py:262, in Expr._find_backend(self, use_default)
    259     return default
    261 if len(backends) > 1:
--> 262     raise ValueError('Multiple backends found')
    264 return backends[0]

ValueError: Multiple backends found

Pandas Backendを経由すればやれないこともないです(ただしデータをメモリに載ってしまうはず)。

pip install 'ibis-framework[pandas]'
pd_connection = ibis.pandas.connect({'sqlite_countries': t.execute(), 'postgre_countries': pg_t.execute()})
pd_t = pd_connection.table('sqlite_countries')
pg_pd_t = pd_connection.table('postgre_countries')

pg_pd_t.inner_join(pd_t, predicates=pg_pd_t['iso_alpha2']==pd_t['iso_alpha2'])
  iso_alpha3_x iso_alpha2_x iso_alpha2_y iso_alpha3_y  iso_numeric fips           name     capital   area_km2  population continent
0          USA           US           US          USA          840   US  United States  Washington  9629091.0   310232863        NA
1          JPN           JP           JP          JPN          392   JA          Japan       Tokyo   377835.0   127288000        AS

Fugue

IbisをさらにラップするFugueというのもあるらしいです。力尽きたので、詳しい人は教えてください。

https://fugue-tutorials.readthedocs.io/index.html

Discussion