Bytewax触ってみた
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#3です。
tl;dr
- Pythonで動くストリーミングエンジンだよ
- Kubernetesの上でも動くよ
- 正直、エコシステム・機能的にはSpark・Flinkよりもまだ少なそうだよ
Bytewaxとは
Pythonで動く分散・ストリーミングエンジンです。公式ページ曰く、
Bytewax is an open source Python framework for building real-time applications with streaming data.
で、
- Open Source
- No Vendor Lock-In
- No JVM
- Cloud Native
- Simple Scaling
を謳っています。2021年に起業した、(プロダクト名と同じ)bytewaxという企業が主に開発を行っているようです。
ストリーミングエンジンとして有名なApache FlinkやApache SparkなどにもPythonのバインディングもあります(e.g PyFlink、PySpark)。ただし、これらのエンジン・バインディングでは、
- 各ワーカ(Flinkで言うところのTaskManager)ではJVMが動く
- Pythonで書いた集計(Flinkで言うところのkeyBy)などのDAGは対応するコードに変更して動く
- UDFの部分ではJVMとPythonインタプリタ間で、データをやりとり(PyFlinkの説明)
といった風に動きますが、Bytewaxの場合は完全にJVM無しで動きます。
(RustのTimely Dataflowをベースとしているらしいです)
JVMをを使わないメリットが正直ピンと来なかったのですが、Bytewaxの開発者が出演したPodcastでは
Yeah, so I mean, it comes back to those trade-offs. Again, like Flink is a more mature product. And you have SQL you can use like Flink. And there are many different like bindings for languages. And they’ve built out a bunch of features that bytewax doesn’t have. But the trade-off that we are sort of investing in there is it’s a lot more involved to get up and running and started with sling for like a big group of users that maybe don’t have the experience with that whole Java ecosystem, you know, tuning the JVM, et cetera, et cetera. And so our thought is like, we’ll make a trade-off of maybe not having the exact same robustness that Flink has today. But we’ll give an experience where it’s a lot easier to get started and quicker to get started. For this subset of users that are, you know, maybe a newer group of users are there like machine learning engineers, data scientists or newer data? to engineers. So that’s the trade-off that we’ve been playing in.
- (既存のJVMベースのストリーミングエンジンの例として)Flinkの方が成熟しているし機能もある
- ただし、Java・JVMエコシステムになれていないユーザの集団もいる(機械学習エンジニア、データサイエンティストなど)
- そういったユーザのために、入門の用意さを重視した
という流れなようです(作者の人も元々GitHubのデータサイエンティストです)。
機能
入出力
入力の方はKafkaとカスタムのコード、出力の方はKafkaとカスタムのコード、標準出力に対応しています。
操作
ストリーミングエンジンによくある、map、flat_map、filterあたりは組み込み機能として提供されています。
State
他のストリーミングエンジンにあってBytewaxに無いor劣ってそうな機能
- WebUI
- タイマー
- 一定時間経過で処理を起こす機能。Stateの削除などに使います
- 組み込みで提供されているのは実質的にKakfaだけなので、FlinkやCloud Dataflow(Apache Beam)と比べて、少ないのは少ないです。
- (Pythonのエコシステムで頑張ってねという感じでしょうか)
ローカルで試してみる
Getting Statedの通り、
- bytewaxをpipインストール
- サンプルファイル(wordcount.txt)、サンプルプログラム(wordcount.py)を作成
- 作成したサンプルプログラムを実行
します。特に詰まった点は無かったです。
python3 wordcount.py
('end', 1)
("'tis", 1)
('them', 1)
('suffer', 1)
('arrows', 1)
('take', 1)
('arms', 1)
('not', 1)
('outrageous', 1)
('in', 1)
('slings', 1)
('a', 1)
('question', 1)
('the', 3)
('nobler', 1)
('to', 4)
('of', 2)
('by', 1)
('sea', 1)
('and', 2)
('mind', 1)
('is', 1)
('that', 1)
('be', 2)
('or', 2)
('whether', 1)
('troubles', 1)
('against', 1)
('opposing', 1)
('fortune', 1)
Kubernetesで試してみる
- Helmチャートをインストール
- デプロイなどを行うCLIツール(waxctl)をインストール
- waxctlでPythonコードをデプロイ
の手順で行います。
まずは、Helmチャートの追加を行います。
helm repo add bytewax https://bytewax.github.io/helm-charts
helm repo update
helm install my-release bytewax/bytewax
デフォルトでは(多分)bytewax内部のリポジトリからコンテナイメージを取得しにいこうとし、image pullに失敗するので変更しておきます。
helm show values bytewax/bytewax > values.yaml
emacs values.yaml
# valuesのimage.repositoryを変更(`bytewax.docker.scarf.sh/bytewax/bytewax`から`bytewax/bytewax`)
helm upgrade my-release bytewax/bytewax -f values.yaml
Kubernetesの上にデプロイを行くために、Bytewaxではwaxctlというツールが用意されていますので、ダウンロード・展開します。
wget https://downloads.bytewax.io/waxctl/0.6.2/waxctl_0.6.2_linux_amd64.tar.gz
tar -xzf waxctl_0.6.2_linux_amd64.tar.gz
Getting StartedのWordCountを少し変えて、Kubernetesの上で分散して動くようにします。
具体的には、
-
Kubernetesクラスタの上で動かす場合の対応として
- cluster_mainで起動
- 引数に
**parse.proc_env()
を渡す
- 入力をローカルファイルではなく、StringIOから受け取るように
- コンテナイメージに入力ファイルを入れたり、ConfigMapに入れてマウントしても対応できると思いますが横着しました
の変更を行います。
import re
from bytewax import parse
import textwrap
from io import StringIO
from datetime import timedelta
from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import cluster_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig
def input_builder(worker_index, worker_count, resume_state):
state = None # ignore recovery
s = StringIO(textwrap.dedent("""
To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them.
"""))
for i, line in enumerate(s):
if i % worker_count == worker_index:
yield state, line
def lower(line):
return line.lower()
def tokenize(line):
return re.findall(r'[^\s!,.?":;0-9]+', line)
def initial_count(word):
return word, 1
def add(count1, count2):
return count1 + count2
clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))
flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())
cluster_main(
flow,
**parse.proc_env()
)
デプロイします
./waxctl df deploy wordcount_k8s.py --name my-dataflow --processes=3 --workers=1
Podを見てみます。三つPodが起動しています
kubectl get pod
NAME READY STATUS RESTARTS AGE
my-dataflow-0 1/1 Running 0 2m45s
my-dataflow-1 1/1 Running 0 2m45s
my-dataflow-2 1/1 Running 0 2m43s
my-release-0 1/1 Running 1 (16h ago) 28h
それっぽいログが出ています。
kubectl get pod --selector='app.kubernetes.io/instance=my-dataflow' -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | xargs -I{} kubectl logs {} | grep "
(" | sort
Defaulted container "process" out of: process, init-hostfile (init)
Defaulted container "process" out of: process, init-hostfile (init)
Defaulted container "process" out of: process, init-hostfile (init)
("'tis", 1)
('a', 1)
('against', 1)
('and', 2)
('arms', 1)
('arrows', 1)
('be', 2)
('by', 1)
('end', 1)
('fortune', 1)
('in', 1)
('is', 1)
('mind', 1)
('nobler', 1)
('not', 1)
('of', 2)
('opposing', 1)
('or', 2)
('outrageous', 1)
('question', 1)
('sea', 1)
('slings', 1)
('suffer', 1)
('take', 1)
('that', 1)
('the', 3)
('them', 1)
('to', 4)
('troubles', 1)
('whether', 1)
Discussion