Flowpipeを試す
FlowpipeはDevOpsチームのために作られたオープンソースツールだ。パイプライン、ステップ、トリガーを、おなじみのDevOps言語であるHCLで表現し、AWS(およびあなたが普段使っている他のサービス)への問い合わせタスクをオーケストレーションすることができる。modsを使用してパイプラインを構成し、必要に応じてSQL、AWS Lambda互換関数、コンテナを混ぜる。開発、テスト、そして実行はすべてローカルマシンで行うことができる。パイプラインをスケジュールしたり、リアルタイムでイベントに対応する。次のような用途に利用できる:
- クラウドをオーケストレーションする。シンプルなステップを複雑なワークフローに組み込む。ローカルで実行し、テストする。オープンソースのModsを使用して、クラウド間でソリューションを構成する。
- 人とツールをつなぐ。メール、チャット、APIを使って、クラウドのデータを人やシステムに接続。ワークフローのステップでは、コンテナやカスタム関数などを実行することもできる。
- イベントに対応する。ワークフローを手動またはスケジュールで実行する。Webhookやデータの変更からパイプラインをトリガーする。
- クリックではなくコードを使用する。インフラストラクチャのようにDevOpsワークフローを構築し、デプロイする。HCLでコーディングし、バージョン管理からデプロイする。
HCLなのでめちゃTerraform的に書ける。
AWS謹製かと思ったけど、こちらが公式。OSSなのね。
ワークフロー的なやつは最近めちゃ気になってるので、まずは軽くやってみる。
全体的にTerraformライクなところがあるので、Terraformの感覚で推測しながらやってみる。
インストール
MacはHomebrewでインストールできる。ドキュメントにはtap使うように書いてあるけど、普通にできた。
$ brew install flowpipe
$ flowpipe -v
Flowpipe v0.4.4
Learn Flowpipe
Flowpipeの概念的なものは以下あたりをまず抑えておけば良さそう
- Triggers - パイプラインを初期化する方法。例えばcronとかwebhookとか。
- Pipelines - 処理を行う一連のステップ
- Mods - TriggersとPipelinesをパッケージにしたもの。Flowpipe Hubで公開されているものを使うこともできるし、自分でカスタムに作成することもできる
まだピンとこないけど、とりあえず進めていけばわかるだろうということで。
Creating your first pipeline
まず作業ディレクトリを作成
$ mkdir learn_flowpipe
$ cd learn_flowpipe
modを初期化。Terraform initっぽい。
$ flowpipe mod init
mod.fp
というファイルが作成される
$ tree -a
.
└── mod.fp
1 directory, 1 file
中身はこんな感じ。ここでmodの定義を行うことができるらしい。
mod "local" {
title = "learn_flowpipe"
}
ではパイプラインを作成する。flowpipeでは.fp
という拡張子を使う。実行時は作業ディレクトリの配下にある.fp
ファイルを全て四で実行するらしい。以下の内容でlearn.fp
というファイルを作成。
pipeline "learn_flowpipe" {
step "http" "get_ipv4" {
url = "https://api.ipify.org?format=json"
}
output "ip_address" {
value = step.http.get_ipv4.response_body.ip
}
}
pipeline
は処理の一連のステップとなる。上記の設定は、
-
learn_flowpipe
という名前のパイプライン(pipeline
)を定義 - 上記パイプラインの内容として以下を定義
- HTTPリクエストを送るステップ、
http step
を定義 - 結果を出力する
output
を定義
- HTTPリクエストを送るステップ、
というものになっている。
ではこのパイプラインを実行する。パイプラインの実行はflowpipe pipeline run
でパイプライン名を指定する。
$ flowpipe pipeline run learn_flowpipe
結果
[flowpipe] Execution ID: exec_copiblqrqiq08rc8p9vg
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 223ms
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Complete 247ms exec_copiblqrqiq08rc8p9vg
なるほど。
Using mods
別のパイプラインを使ってパイプラインを作ることができる。IPアドレスから位置情報を返してくれる無料サービス、reallyfreegeoip.orgのmodを使う。
modのインストールはflowpipe mod install
で行う。
$ flowpipe mod install github.com/turbot/flowpipe-mod-reallyfreegeoip
Installed 1 mod:
local
└── github.com/turbot/flowpipe-mod-reallyfreegeoip@v0.1.0
インストールしたmodは作業ディレクトリの.flowpipe/mods
に配置される。
$ tree .flowpipe/
.flowpipe/
├── flowpipe.db
└── mods
└── github.com
└── turbot
└── flowpipe-mod-reallyfreegeoip@v0.1.0
├── LICENSE
├── README.md
├── docs
│ └── images
│ └── flowpipe_pipeline_run.png
├── mod.fp
└── pipelines
└── get_ip_geolocation.fp
8 directories, 6 files
またflowpipe mod list
でも確認ができる。
$ flowpipe mod list
local
└── github.com/turbot/flowpipe-mod-reallyfreegeoip@v0.1.0
なるほど、細書に定義したlocal
というmodが依存するmodという感じになってるのかな?
またflowpipe pipeline list
でパイプラインの確認ができる。
$ flowpipe pipeline list
MOD NAME DESCRIPTION
local learn_flowpipe
reallyfreegeoip reallyfreegeoip.pipeline.get_ip_geolocation Get geolocation data for an IPv4 or IPv6 address.
インストールしたmodのパイプラインを直接実行することができる。その前にパイプラインの使い方を見てみる。flowpipe pipeline show
を使う。
$ flowpipe pipeline show reallyfreegeoip.pipeline.get_ip_geolocation
Name: reallyfreegeoip.pipeline.get_ip_geolocation
Title: Get IP Geolocation
Description: Get geolocation data for an IPv4 or IPv6 address.
Tags:
type: featured
Params:
format:
Type: string
Description: The format of the output. Accepted values are json, csv and xml. Defaults to json.
Default: json
ip_address [required]:
Type: string
Description: The IPv4 or IPv6 address.
Outputs:
geolocation:
Description: IP geolocation details.
Type: any
Usage:
flowpipe pipeline run reallyfreegeoip.pipeline.get_ip_geolocation --arg ip_address=<value>
Usageにも書いてあるけど、ip_address
を引数で渡してあげれば良さそう。
ということで実行。
$ flowpipe pipeline run reallyfreegeoip.pipeline.get_ip_geolocation --arg ip_address=8.8.8.8
[flowpipe] Execution ID: exec_copirq2rqiq0cmrie6pg
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/8.8.8.8
[get_ip_geolocation.get_ip_geolocation] Complete: 200 660ms
[get_ip_geolocation] Output geolocation = {
"city": "",
"country_code": "US",
"country_name": "United States",
"ip": "8.8.8.8",
"latitude": 37.751,
"longitude": -97.822,
"metro_code": 0,
"region_code": "",
"region_name": "",
"time_zone": "America/Chicago",
"zip_code": ""
}
[get_ip_geolocation] Complete 691ms exec_copirq2rqiq0cmrie6pg
Composing with pipelines
ということで、これを自分のパイプラインと組み合わせる。
pipeline "learn_flowpipe" {
step "http" "get_ipv4" {
url = "https://api.ipify.org?format=json"
}
step "pipeline" "get_geo" {
pipeline = reallyfreegeoip.pipeline.get_ip_geolocation
args = {
ip_address = step.http.get_ipv4.response_body.ip
}
}
output "ip_address" {
value = step.http.get_ipv4.response_body.ip
}
output "latitude" {
value = step.pipeline.get_geo.output.geolocation.latitude
}
output "longitude" {
value = step.pipeline.get_geo.output.geolocation.longitude
}
}
流れとしては以下の様な感じ。
- "get_ipv4"という名前の
http step
で自分のIPv4アドレスを取得 - 上記の結果を "ip_address"という名前の
output
で取得 - "get_geo"という名前の
pipeline step
で、入力に上記のIPアドレスを指定して位置情報を取得 - 上記の結果を"latitude"と"longitude"という名前の
output
で出力
では実行してみる。
$ flowpipe pipeline run learn_flowpipe
[flowpipe] Execution ID: exec_copiv3arqiq0d2ublkug
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 626ms
[learn_flowpipe.get_geo] Starting pipeline
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/XXX.XXX.XXX.35
[get_ip_geolocation.get_ip_geolocation] Complete: 200 87ms
[get_ip_geolocation] Output geolocation = {
"city": "",
"country_code": "JP",
"country_name": "Japan",
"ip": "XXX.XXX.XXX.35",
"latitude": 35.69,
"longitude": 139.69,
"metro_code": 0,
"region_code": "",
"region_name": "",
"time_zone": "Asia/Tokyo",
"zip_code": ""
}
[get_ip_geolocation] Complete 110ms
[learn_flowpipe.get_geo] Complete 122ms
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Output latitude = 35.69
[learn_flowpipe] Output longitude = 139.69
[learn_flowpipe] Complete 796ms exec_copiv3arqiq0d2ublkug
IPアドレス→位置情報→気象情報とつなげてみる。
pipeline "learn_flowpipe" {
step "http" "get_ipv4" {
url = "https://api.ipify.org?format=json"
}
step "pipeline" "get_geo" {
pipeline = reallyfreegeoip.pipeline.get_ip_geolocation
args = {
ip_address = step.http.get_ipv4.response_body.ip
}
}
step "http" "get_weather" {
url = join("", [
"https://api.open-meteo.com/v1/forecast",
"?latitude=${step.pipeline.get_geo.output.geolocation.latitude}",
"&longitude=${step.pipeline.get_geo.output.geolocation.longitude}",
"¤t=temperature",
"&forecast_days=1",
"&daily=temperature_2m_min,temperature_2m_max,precipitation_probability_mean",
"&temperature_unit=${step.pipeline.get_geo.output.geolocation.country_code == "US" ? "fahrenheit" : "celsius"}"
])
}
step "transform" "friendly_forecast" {
value = join("", [
"It is currently ",
step.http.get_weather.response_body.current.temperature,
step.http.get_weather.response_body.current_units.temperature,
", with a high of ",
step.http.get_weather.response_body.daily.temperature_2m_max[0],
step.http.get_weather.response_body.daily_units.temperature_2m_max,
" and a low of ",
step.http.get_weather.response_body.daily.temperature_2m_min[0],
step.http.get_weather.response_body.daily_units.temperature_2m_min,
". There is a ",
step.http.get_weather.response_body.daily.precipitation_probability_mean[0],
step.http.get_weather.response_body.daily_units.precipitation_probability_mean,
" chance of precipitation."
])
}
output "ip_address" {
value = step.http.get_ipv4.response_body.ip
}
output "latitude" {
value = step.pipeline.get_geo.output.geolocation.latitude
}
output "longitude" {
value = step.pipeline.get_geo.output.geolocation.longitude
}
output "forecast" {
value = step.transform.friendly_forecast.value
}
}
- "get_ipv4"という名前の
http step
で自分のIPv4アドレスを取得 - 上記の結果を "ip_address"という名前の
output
で取得 - "get_geo"という名前の
pipeline step
で、入力に上記のIPアドレスを指定して位置情報を取得 - 上記の結果から緯度経度を"latitude"と"longitude"という名前の
output
で出力 - "get_weather"という名前の
http step
で、入力に上記の緯度経度を指定して気象情報を取得 - 上記の結果を"friendly_forecast"という
transform step
で整形 - 上記の結果を"forecast"という名前の
output
で取得
[flowpipe] Execution ID: exec_copj0lqrqiq0dch8s870
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 243ms
[learn_flowpipe.get_geo] Starting pipeline
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/XXX.XXX.XXX.35
[get_ip_geolocation.get_ip_geolocation] Complete: 200 62ms
[get_ip_geolocation] Output geolocation = {
"city": "",
"country_code": "JP",
"country_name": "Japan",
"ip": "XXX.XXX.XXX.35",
"latitude": 35.69,
"longitude": 139.69,
"metro_code": 0,
"region_code": "",
"region_name": "",
"time_zone": "Asia/Tokyo",
"zip_code": ""
}
[get_ip_geolocation] Complete 84ms
[learn_flowpipe.get_geo] Complete 97ms
[learn_flowpipe.get_weather] Starting http: GET https://api.open-meteo.com/v1/forecast?latitude=35.69&longitude=139.69¤t=temperature&forecast_days=1&daily=temperature_2m_min,temperature_2m_max,precipitation_probability_mean&temperature_unit=celsius
[learn_flowpipe.get_weather] Complete: 200 853ms
[learn_flowpipe.friendly_forecast] Starting transform
[learn_flowpipe.friendly_forecast] Complete 2ms
[learn_flowpipe] Output forecast = It is currently 19°C, with a high of 19.6°C and a low of 9.7°C. There is a 0% chance of precipitation.
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Output latitude = 35.69
[learn_flowpipe] Output longitude = 139.69
[learn_flowpipe] Complete 1s exec_copj0lqrqiq0dch8s870
forecast
に気象情報が追加されているのがわかる。
Send a message
message step
でこの結果を通知することができる。
pipeline "learn_flowpipe" {
step "http" "get_ipv4" {
(snip)
}
step "pipeline" "get_geo" {
(snip)
}
step "http" "get_weather" {
(snip)
}
step "transform" "friendly_forecast" {
(snip)
}
# ここに追加
step "message" "send_forecast" {
notifier = notifier.default
subject = "Todays Forecast"
text = step.transform.friendly_forecast.value
}
output "ip_address" {
(snip)
今回のケースでは、実行すると以下のようにコンソールに表示されるだけ・・・
(snip)
[learn_flowpipe.send_forecast] Starting message: It is currently 18.6°C, with a high of 19.7°C an…
[learn_flowpipe.send_forecast] Arg text = It is currently 18.6°C, with a high of 19.7°C and a low of 10.1°C. There is a 0% chance of precipitation.
[learn_flowpipe.send_forecast] Complete 1ms
(snip)
なんだけども、flowpipeの設定でdefault.notifier
に通知先の設定を行っておけば、例えばSlackやMS Teamsなどに通知ができる。このための設定はintegrations
という設定で行うらしい。
これらの設定は~/.flowpipe/config
以下に.fpc
ファイルを置いて行う様子。デフォルトでいくつかサンプルが用意されている。
$ tree -a ~/.flowpipe
/Users/kun432/.flowpipe
├── config
│ ├── flowpipe.fpc.sample
│ └── workspaces.fpc.sample
└── internal
├── salt
└── update_check.json
3 directories, 4 files
#
# For detailed descriptions, see the reference documentation
# at https://flowpipe.io/docs
#
# integration "http" "default" {}
# notifier "default" {
# notify {
# integration = integration.http.default
# }
# }
flowpipeのパブリックなmodは https://hub.flowpipe.io で公開されている。
AWSやGCP等のクラウドはもちろん、ちょっと面白いところではOpenAIってのがあった。
AWSのやつはAWSのAPIを実行するような感じっぽい。aws-cliを使ってシェルスクリプト何かを書いている場合はflowpipeに置き換えれるかもしれない。
OpenAIのやつはプロンプトを指定したらレスポンスが返ってくるというものだった。
あとはサンプルもいろいろ用意されている。
あとは気になるところだけサラッと。
- パイプラインのステップはデフォルトで並列で実行される。
- ただし、入力が別のステップに依存する場合はそれを踏まえて自動で実行順を判断する
-
depends_on
みたいに明示的に実行順というか依存関係を指定することもできる。
- ifを使って条件分岐で実行させることができる
- ただしelseやelseifはないので、条件の分だけstepをわけるような形になる
- 三項演算子的な条件表現もある
- foreachやloopでイテレーションできる
- foreachにはif、loopにはuntilをつけてループから抜けるような処理も可能
- flowpipeの実行には、クライアントモードとサーバモードがある
- クライアントモードは、CLIでアドホックに実行するイメージ
- サーバモードは、Webhookを受けたりcronでスケジュール実行したり、という場合に必要になる
- Terraformと同じように
variable
やlocals
で変数定義できる
書き味がとてもTerraformだし、いろいろ似ているところも多いので、慣れた人だとサラッと使えるのではないだろうか。
ただ、TerraformのDSLか、CDKでコードか、みたいな話は当然flowpipeにもありそうで、DevOps向けワークフローなのでまあDSLのほうがいいっていうことなんだろうな。もっとコード的に書きたい、とかになってくると、MLOps的なワークフローエンジンとかのほうが良いのかもしれないね。
ユースケースに合わせて採用すればいいと思うけど、自分がもし使うなら定期的に何か実行して結果を通知するようなスケジューラー的使い方はすぐにでもできそうと思った。