📘

Bytewax触ってみた

2022/12/03に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#3です。

tl;dr

  • Pythonで動くストリーミングエンジンだよ
  • Kubernetesの上でも動くよ
  • 正直、エコシステム・機能的にはSpark・Flinkよりもまだ少なそうだよ

Bytewaxとは

Pythonで動く分散・ストリーミングエンジンです。公式ページ曰く、

Bytewax is an open source Python framework for building real-time applications with streaming data.

で、

  • Open Source
  • No Vendor Lock-In
  • No JVM
  • Cloud Native
  • Simple Scaling

を謳っています。2021年に起業した、(プロダクト名と同じ)bytewaxという企業が主に開発を行っているようです。

ストリーミングエンジンとして有名なApache FlinkやApache SparkなどにもPythonのバインディングもあります(e.g PyFlink、PySpark)。ただし、これらのエンジン・バインディングでは、

  • 各ワーカ(Flinkで言うところのTaskManager)ではJVMが動く
  • Pythonで書いた集計(Flinkで言うところのkeyBy)などのDAGは対応するコードに変更して動く
  • UDFの部分ではJVMとPythonインタプリタ間で、データをやりとり(PyFlinkの説明

といった風に動きますが、Bytewaxの場合は完全にJVM無しで動きます。
RustのTimely Dataflowをベースとしているらしいです)

JVMをを使わないメリットが正直ピンと来なかったのですが、Bytewaxの開発者が出演したPodcastでは

Yeah, so I mean, it comes back to those trade-offs. Again, like Flink is a more mature product. And you have SQL you can use like Flink. And there are many different like bindings for languages. And they’ve built out a bunch of features that bytewax doesn’t have. But the trade-off that we are sort of investing in there is it’s a lot more involved to get up and running and started with sling for like a big group of users that maybe don’t have the experience with that whole Java ecosystem, you know, tuning the JVM, et cetera, et cetera. And so our thought is like, we’ll make a trade-off of maybe not having the exact same robustness that Flink has today. But we’ll give an experience where it’s a lot easier to get started and quicker to get started. For this subset of users that are, you know, maybe a newer group of users are there like machine learning engineers, data scientists or newer data? to engineers. So that’s the trade-off that we’ve been playing in.

  • (既存のJVMベースのストリーミングエンジンの例として)Flinkの方が成熟しているし機能もある
  • ただし、Java・JVMエコシステムになれていないユーザの集団もいる(機械学習エンジニア、データサイエンティストなど)
  • そういったユーザのために、入門の用意さを重視した

という流れなようです(作者の人も元々GitHubのデータサイエンティストです)。

機能

入出力

入力の方はKafkaとカスタムのコード、出力の方はKafkaとカスタムのコード、標準出力に対応しています。
https://bytewax.io/docs/getting-started/ins_and_outs

操作

ストリーミングエンジンによくある、map、flat_map、filterあたりは組み込み機能として提供されています。

State

https://bytewax.io/docs/getting-started/recovery

他のストリーミングエンジンにあってBytewaxに無いor劣ってそうな機能

  • WebUI
  • タイマー
  • 組み込みで提供されているのは実質的にKakfaだけなので、FlinkCloud Dataflow(Apache Beam)と比べて、少ないのは少ないです。
    • (Pythonのエコシステムで頑張ってねという感じでしょうか)

ローカルで試してみる

Getting Statedの通り、

  1. bytewaxをpipインストール
  2. サンプルファイル(wordcount.txt)、サンプルプログラム(wordcount.py)を作成
  3. 作成したサンプルプログラムを実行

します。特に詰まった点は無かったです。

 python3 wordcount.py
('end', 1)
("'tis", 1)
('them', 1)
('suffer', 1)
('arrows', 1)
('take', 1)
('arms', 1)
('not', 1)
('outrageous', 1)
('in', 1)
('slings', 1)
('a', 1)
('question', 1)
('the', 3)
('nobler', 1)
('to', 4)
('of', 2)
('by', 1)
('sea', 1)
('and', 2)
('mind', 1)
('is', 1)
('that', 1)
('be', 2)
('or', 2)
('whether', 1)
('troubles', 1)
('against', 1)
('opposing', 1)
('fortune', 1)

Kubernetesで試してみる

  1. Helmチャートをインストール
  2. デプロイなどを行うCLIツール(waxctl)をインストール
  3. waxctlでPythonコードをデプロイ

の手順で行います。

まずは、Helmチャートの追加を行います。

helm repo add bytewax https://bytewax.github.io/helm-charts
helm repo update
helm install my-release bytewax/bytewax

デフォルトでは(多分)bytewax内部のリポジトリからコンテナイメージを取得しにいこうとし、image pullに失敗するので変更しておきます。

helm show values bytewax/bytewax > values.yaml
emacs  values.yaml
# valuesのimage.repositoryを変更(`bytewax.docker.scarf.sh/bytewax/bytewax`から`bytewax/bytewax`)
helm upgrade my-release bytewax/bytewax   -f values.yaml

Kubernetesの上にデプロイを行くために、Bytewaxではwaxctlというツールが用意されていますので、ダウンロード・展開します。

wget https://downloads.bytewax.io/waxctl/0.6.2/waxctl_0.6.2_linux_amd64.tar.gz
tar -xzf waxctl_0.6.2_linux_amd64.tar.gz

Getting StartedのWordCountを少し変えて、Kubernetesの上で分散して動くようにします。
具体的には、

  • Kubernetesクラスタの上で動かす場合の対応として
    • cluster_mainで起動
    • 引数に**parse.proc_env()を渡す
  • 入力をローカルファイルではなく、StringIOから受け取るように
    • コンテナイメージに入力ファイルを入れたり、ConfigMapに入れてマウントしても対応できると思いますが横着しました

の変更を行います。

import re
from bytewax import parse
import textwrap
from io import StringIO

from datetime import timedelta

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import cluster_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery

    s = StringIO(textwrap.dedent("""
To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
And by opposing end them.
"""))

    for i, line in enumerate(s):
        if i % worker_count == worker_index:
            yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())


cluster_main(
    flow,
    **parse.proc_env()
)

デプロイします

 ./waxctl df deploy wordcount_k8s.py --name my-dataflow   --processes=3  --workers=1

Podを見てみます。三つPodが起動しています

kubectl get pod
NAME            READY   STATUS    RESTARTS      AGE
my-dataflow-0   1/1     Running   0             2m45s
my-dataflow-1   1/1     Running   0             2m45s
my-dataflow-2   1/1     Running   0             2m43s
my-release-0    1/1     Running   1 (16h ago)   28h

それっぽいログが出ています。

 kubectl get pod --selector='app.kubernetes.io/instance=my-dataflow'  -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | xargs -I{} kubectl logs {} | grep "
(" | sort
Defaulted container "process" out of: process, init-hostfile (init)
Defaulted container "process" out of: process, init-hostfile (init)
Defaulted container "process" out of: process, init-hostfile (init)
("'tis", 1)
('a', 1)
('against', 1)
('and', 2)
('arms', 1)
('arrows', 1)
('be', 2)
('by', 1)
('end', 1)
('fortune', 1)
('in', 1)
('is', 1)
('mind', 1)
('nobler', 1)
('not', 1)
('of', 2)
('opposing', 1)
('or', 2)
('outrageous', 1)
('question', 1)
('sea', 1)
('slings', 1)
('suffer', 1)
('take', 1)
('that', 1)
('the', 3)
('them', 1)
('to', 4)
('troubles', 1)
('whether', 1)

Discussion