Pandas-likeなラッパー、Ibis触ってみた
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#25です。
tl;dr
- Pandas-likeな記法で、各種データベース(SQLiteやPostgreSQL)や分散処理エンジン(DaskやSpark)にアクセスできるよ
- 基本的には処理は各種データベースや分散処理エンジン側で実行されるよ
- =ローカルマシンに載らないデータ量も処理できるはず
Ibisとは
公式ページ曰く、
Ibis is a Python framework to access data and perform analytical computations from different sources, in a standard way.
です。色々なデータソース・処理環境(Backend)へのアクセス・処理を抽象化するPythonライブラリで、PandasのDataframeに近い雰囲気でデータを操作することができます。
Backendとしては、
- DWH系(Impala、ClickHouse、BigQuery、HeavyAI)
- RDB系(MySQL、PostgreSQL)
- 組み込みのDB系(SQLite、DuckDB)
- 処理エンジン系(Spark、Pandas、Dask、Datafusion、Polars)
などに対応しています(対応状況はこちら。また、TrinoやSnowflakeなど開発中のものもあります)。
「SQLAlchemy + 処理エンジンの対応 + Pandas-likeな構文」のイメージが近いかもしれません。
Pandas・SQLAlchemyじゃダメなの
Ibisが提供する機能に関して、
- SQLAlchemy
- PandasのテーブルをPandas DataFrameに読み込むメソッド(read_sql_table・read_sql_query)
でも良いのでは?と疑問に思われるかもしれません。
Ibisの差別化ポイントとしては、
- SQLAlchemyが対応していない処理エンジン系(e.g. SparkやDask)の利用、統一的な記法
- 例えば最初はPandasをBackendとして開発して、データ量が増えたら分散できるエンジン(e.g. Spark)に変えたり、高速化(e.g. Polars、DuckDB)に変えたりする
- Pandasでread_sql_table+PandasのDataFrame操作を行う場合に比べ、データ操作がBackend側で行われる
- SQLAlchemy + Pandasに比べ、(Pandas-likeな)一つの記法に統一できる
あたりがありそうです。
なお、IbisはSQLALchmeyもPandasも利用しています(SQLAlchemyはRDB系のアクセスを使う場合のみ)。
試してみる(SQLite)
Ubuntu 20.04 (Windows10のWSL2上)、Python3.8で試しました。
準備
IbisのパッケージとBackend固有のパッケージをインストールします。pypiに登録されているibisは無関係なプロジェクトなので、そちらをインストールしないようにしましょう。
pip install ibis-framework 'ibis-framework[sqlite]'
チュートリアル用のSQLiteのファイルを提供してくれているので、ダウンロードします。
curl -LsS -o geography.db 'https://storage.googleapis.com/ibis-tutorial-data/geography.db
中身も見ておきましょう。
sqlite3 geography.db
SQLite version 3.31.1 2020-01-27 19:55:54
Enter ".help" for usage hints.
sqlite> .tables
countries gdp independence
sqlite> SELECT * FROM countries LIMIT 1;
AD|AND|20|AN|Andorra|Andorra la Vella|468.0|84000|EU
データアクセス
接続先を指定しconnectionメソッドを実行し、以降の処理に使用するBackend(SQLiteの場合はibis.backends.sqlite.Backend)を取得します。
import ibis
connection = ibis.sqlite.connect('geography.db')
接続出来た事の確認の意味で、テーブルの一覧を取得します。
connection.list_tables()
Out[3]: ['countries', 'gdp', 'independence']
接続出来ていそうですね。データも見てみます。
t = connection.table('countries')
t.limit(3).execute()
Out[7]:
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent
0 AD AND 20 AN Andorra Andorra la Vella 468.0 84000 EU
1 AE ARE 784 AE United Arab Emirates Abu Dhabi 82880.0 4975593 AS
2 AF AFG 4 AF Afghanistan Kabul 647500.0 29121286 AS
Pandasに近い表記で、フィルターもできます。
t.filter(t['continent'] == 'AN').execute()
Out[12]:
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent
0 AQ ATA 10 AY Antarctica 14000000.0 0 AN
1 BV BVT 74 BV Bouvet Island 49.0 0 AN
2 GS SGS 239 SX South Georgia and South Sandwich Islands Grytviken 3903.0 30 AN
3 HM HMD 334 HM Heard Island and McDonald Islands 412.0 0 AN
4 TF ATF 260 FS French Southern Territories Port-aux-Francais 7829.0 140 AN
ちなみにexeucteのレスポンスはPandasのData Frameです
type(t.filter(t['continent'] == 'AN').execute())
Out[140]: pandas.core.frame.DataFrame
ログ
verboseオプションを有効にすると実行に対応するSQLクエリが出力されるようになります。
ibis.options.verbose = True
t.filter(t['continent'] == 'AN')
Out[32]: SELECT t0.iso_alpha2, t0.iso_alpha3, t0.iso_numeric, t0.fips, t0.name, t0.capital, t0.area_km2, t0.population, t0.continent
FROM main.countries AS t0
WHERE t0.continent = ?
LIMIT ? OFFSET ?
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent
0 AQ ATA 10 AY Antarctica 14000000.0 0 AN
1 BV BVT 74 BV Bouvet Island 49.0 0 AN
2 GS SGS 239 SX South Georgia and South Sandwich Islands Grytviken 3903.0 30 AN
3 HM HMD 334 HM Heard Island and McDonald Islands 412.0 0 AN
4 TF ATF 260 FS French Southern Territories Port-aux-Francais 7829.0 140 AN
interactive mode
デフォルトではlazy mode(executeメソッドまで実行されない)ですが、interative modeにするとexecuteを省略できるようになります。
ibis.options.interactive = True
t.filter(t['continent'] == 'AN')
Out[15]:
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent
0 AQ ATA 10 AY Antarctica 14000000.0 0 AN
1 BV BVT 74 BV Bouvet Island 49.0 0 AN
2 GS SGS 239 SX South Georgia and South Sandwich Islands Grytviken 3903.0 30 AN
3 HM HMD 334 HM Heard Island and McDonald Islands 412.0 0 AN
4 TF ATF 260 FS French Southern Territories Port-aux-Francais 7829.0 140 AN
ただし、本番環境やデータ量が多い場合はinteractive modeを使用しない方が良いそうです。
というのも、executeメソッドの実行では、
- Ibisの式(filterとか)をBackendで実行できる形式にコンパイル
- BackendによってSQLAlchemyだったりPySparkだったりのObject
- Backendに送信
- Backendからのレスポンスを、Pandas DataFrameに変換
の処理を行います。つまり、暗黙的にexecuteが実行されるinteractive modeでは、式を作る毎にリクエスト、ローカルマシンのメモリへの結果の格納が行われます。
例えば、下記のように、interactive modeかつ、ふたつのクエリに分けて実行すると、(filterの結果次第では)大量のデータがローカルマシンのメモリの載る可能性があります。
ibis.options.interactive = True
# ここで一度Backendでのクエリの実行と、**limit前の全データの**ローカルマシンのメモリへの結果の格納が行われます(verboseオプションを有効にすると確認できます)
x = gdp_t.filter(gdp_t["country_code"] == "AND")
# ここで再度、Backendでのクエリの実行と、ローカルマシンのメモリへの結果の格納が行われます
# この時xのクエリは結果は使われないです
y = x.limit(2)
代わりに、一つのクエリとして実行するか、
# 一度だけクエリが実行され、limitの後の結果だけがローカルマシンのメモリに
y = gdp_t.filter(gdp_t["country_code"] == "AND").limit(2)
あるいは、二つのクエリにする場合でも、interactive modeを切って(lazy mode)実行すると、途中の結果をローカルマシンのメモリに載せずにすみます。
ibis.options.interactive = False
# ここではクエリの実行も、結果のメモリへの格納も行われません
x = gdp_t.filter(gdp_t["country_code"] == "AND")
# ここで、Backendでのクエリの実行と、ローカルマシンのメモリへの結果の格納が行われます
y = x.limit(2)
ただし、以下の実行例ではinteractive modeでのコードを記載します。lazy modeで実行する場合は、適当にexecuteを追加してください。
集計・JOIN
- group_byでグループ化するカラムを指定
- aggregateで集計する方法・カラムを指定
することで、集計を行うことができます。
t.group_by(t['continent']).aggregate([t['population'].sum().name('population'), t['population'].count().name('countries')])
Out[23]:
continent population countries
0 AF 1021238685 58
1 AN 170 5
2 AS 4130584841 51
3 EU 750724554 54
4 NA 540204371 42
5 OC 36067549 28
6 SA 400143568 14
JOIN先のテーブルと、JOINの条件を指定してJOINも行えます。
t.inner_join(gdp_t, predicates=t['iso_alpha3']==gdp_t['country_code']).filter(gdp_t['year'] == 2017).sort_by(ibis.desc('value')).limit(2)
Out[49]:
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent country_code year value
0 US USA 840 US United States Washington 9629091.0 310232863 NA USA 2017 1.948539e+13
1 CN CHN 156 CH China Beijing 9596960.0 1330044000 AS CHN 2017 1.214349e+13
なお、事前にフィルタリングしておき、その結果とJOINすることも可能です。
latest_gdp_t = gdp_t.filter(gdp_t['year'] == 2017)
t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(2)
Out[57]:
iso_alpha2 iso_alpha3 iso_numeric fips name capital area_km2 population continent country_code year value
0 US USA 840 US United States Washington 9629091.0 310232863 NA USA 2017 1.948539e+13
1 CN CHN 156 CH China Beijing 9596960.0 1330044000 AS CHN 2017 1.214349e+13
書き込み
テーブルに保存もできます。
top_2 = t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(2)
connection.create_table('top_2', top_2)
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
追記・上書きも可能です。
top_3 = t.inner_join(latest_gdp_t, predicates=t['iso_alpha3']==latest_gdp_t['country_code']).sort_by(ibis.desc('value')).limit(3)
connection.insert('top_2', top_3)
# 追記なのでデータが重複しています
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
JP|JPN|392|JA|Japan|Tokyo|377835.0|127288000|AS|JPN|2017|4859950558538.97
connection.insert('top_2', top_3, overwrite=True)
sqlite3 geography.db 'SELECT * FROM top_2'
US|USA|840|US|United States|Washington|9629091.0|310232863|NA|USA|2017|19485393853000.0
CN|CHN|156|CH|China|Beijing|9596960.0|1330044000|AS|CHN|2017|12143491448186.1
JP|JPN|392|JA|Japan|Tokyo|377835.0|127288000|AS|JPN|2017|4859950558538.97
Backendを変えてみる(PostgreSQL)
SQLite以外のBackendの例として、PostgreSQLを試してみます。
準備
動作確認のPostgreSQLはコンテナで起動します。docker-compose.yamlを下のように記載し、コンテナを起動します。
version: '3.1'
services:
db:
image: postgres:15.1
restart: always
environment:
POSTGRES_PASSWORD: admin
ports:
- 5433:5432
volumes:
- ./postgresql:/var/lib/postgresql/data
command: ["postgres", "-c", "log_statement=all"]
# コンテナを起動
docker-compose up
適当にデータを入れておきます。
psql -h 127.0.0.1 -p 5433 -U postgres
CREATE TABLE countries(iso_alpha2 varchar, memo varchar);
INSERT INTO countries VALUES ('AD', 'hogehoge');
IbisではBackend毎にパッケージがわかれているので、PostgreSQLの分をインストールします。
pip install 'ibis-framework[postgres]'
接続
接続先の指定以外はSQLiteの場合と同じように接続できます。
pg_connection = ibis.postgres.connect( host='localhost', user='postgres', password='admin', port=5433, database="postgres")
pg_t = pg_connection.table("countries")
pg_t.filter(pg_t.iso_alpha2 == 'AD')
Out[197]:
iso_alpha2 memo
0 AD hogehoge
処理がDB側で実行されていることを確認
Ibisの処理がBackend側で本当に処理されている事を、念のため確認してみます。
verboseオプションを有効にして、先ほどのfilterを実行すると以下のようなクエリが表示されます。
SELECT t0.iso_alpha2, t0.memo
FROM countries AS t0
WHERE t0.iso_alpha2 = %(param_1)s
LIMIT %(param_2)s
上のクエリに対応するクエリが、Backend(PostgreSQL)で実行されている事を、PostgreSQLのログから確認もできます。
docker logs ibis-db-1 1>/dev/null 2> >(tail -n10 >&2)
2022-12-24 10:26:43.255 UTC [1162] LOG: statement: COMMIT
2022-12-24 10:26:58.466 UTC [1162] LOG: statement: BEGIN
2022-12-24 10:26:58.466 UTC [1162] LOG: statement: SHOW TIMEZONE
2022-12-24 10:26:58.467 UTC [1162] LOG: statement: SET TIMEZONE = UTC
2022-12-24 10:26:58.467 UTC [1162] LOG: statement: SELECT t0.iso_alpha2, t0.memo
FROM countries AS t0
WHERE t0.iso_alpha2 = 'AD'
LIMIT 10000
2022-12-24 10:26:58.468 UTC [1162] LOG: statement: SET TIMEZONE = 'Etc/UTC'
2022-12-24 10:26:58.469 UTC [1162] LOG: statement: COMMIT
UDF
PostgreSQLを含むいくつかのBackendは、Backend側で動くPython処理(UDF)を記載することができます。
なお、UDFとは別に、Backendへの変換処理を追加するadd_operationも別に存在します。
(こちらはすべてのBackendで対応しているはず)
準備
plpythonをインストール・有効化します。
Dockerfile
FROM postgres:15.1
RUN apt-get update
RUN apt-get -y install python3 postgresql-plpython3-15
docker-compose.yaml
version: '3.1'
services:
db:
# image: postgres:15.1
build: .
restart: always
environment:
POSTGRES_PASSWORD: admin
ports:
- 5433:5432
volumes:
- ./postgresql/data:/var/lib/postgresql/data
- ./postgresql/init:/docker-entrypoint-initdb.d
command: ["postgres", "-c", "log_statement=all"]
postgresql/init/init.sql
create extension plpython3u;
PostgreSQLのコンテナを再起動します。
docker-compose down
docker-compose up
UDF登録
適当なメソッドを作成します。
def concat_suffixvalue(value):
return value + "_with_udf"
登録します。二つ目の引数は入力の型、三つの引数はレスポンスの型です。
import ibis.expr.datatypes as dt
concat_suffixvalue_udf = pg_connection.udf(concat_suffixvalue, [dt.string()], dt.string(), language="plpython3u", replace=True)
UDF実行
UDFの引数に対応する式を、udfメソッドのレスポンス(concat_suffixvalue_udf)に渡してやるだけです。
concat_suffixvalue_udf(pg_t['memo'])
Out[395]:
0 hogehoge_with_udf
ローカルのPythonではなく、PostgreSQL側で実行されていることは、verboseオプションの結果、またPostgreSQLのログで確認できます。
verboseオプションで表示されるSQL
SELECT concat_suffixvalue(t0.memo) AS tmp
FROM countries AS t0
LIMIT %(param_1)s
PostgreSQLのログ(抜粋)
ibis-db-1 | 2022-12-24 23:19:39.639 UTC [358] LOG: statement: SELECT concat_suffixvalue(t0.memo) AS tmp
ibis-db-1 | FROM countries AS t0
ibis-db-1 | LIMIT 10000
BackendをまたいだJOIN
異なるBackend(SQLiteとPostgreSQLなど)間のデータの組み合わせは出来ないようです。
pg_t.inner_join(t, predicates=pg_t['iso_alpha2']==t['iso_alpha2'])
# (エラーメッセージ一部省略)
File ~/project/ibis/venv2/lib/python3.8/site-packages/ibis/expr/types/core.py:262, in Expr._find_backend(self, use_default)
259 return default
261 if len(backends) > 1:
--> 262 raise ValueError('Multiple backends found')
264 return backends[0]
ValueError: Multiple backends found
Pandas Backendを経由すればやれないこともないです(ただしデータをメモリに載ってしまうはず)。
pip install 'ibis-framework[pandas]'
pd_connection = ibis.pandas.connect({'sqlite_countries': t.execute(), 'postgre_countries': pg_t.execute()})
pd_t = pd_connection.table('sqlite_countries')
pg_pd_t = pd_connection.table('postgre_countries')
pg_pd_t.inner_join(pd_t, predicates=pg_pd_t['iso_alpha2']==pd_t['iso_alpha2'])
iso_alpha3_x iso_alpha2_x iso_alpha2_y iso_alpha3_y iso_numeric fips name capital area_km2 population continent
0 USA US US USA 840 US United States Washington 9629091.0 310232863 NA
1 JPN JP JP JPN 392 JA Japan Tokyo 377835.0 127288000 AS
Fugue
IbisをさらにラップするFugueというのもあるらしいです。力尽きたので、詳しい人は教えてください。
Discussion