Closed6

Flowpipeを試す

kun432kun432

https://twitter.com/AWSOpen/status/1785746018308386821

https://aws.amazon.com/jp/blogs/opensource/flowpipe-a-cloud-scripting-engine-for-devops-workflows/

FlowpipeはDevOpsチームのために作られたオープンソースツールだ。パイプライン、ステップ、トリガーを、おなじみのDevOps言語であるHCLで表現し、AWS(およびあなたが普段使っている他のサービス)への問い合わせタスクをオーケストレーションすることができる。modsを使用してパイプラインを構成し、必要に応じてSQL、AWS Lambda互換関数、コンテナを混ぜる。開発、テスト、そして実行はすべてローカルマシンで行うことができる。パイプラインをスケジュールしたり、リアルタイムでイベントに対応する。次のような用途に利用できる:

  • クラウドをオーケストレーションする。シンプルなステップを複雑なワークフローに組み込む。ローカルで実行し、テストする。オープンソースのModsを使用して、クラウド間でソリューションを構成する。
  • 人とツールをつなぐ。メール、チャット、APIを使って、クラウドのデータを人やシステムに接続。ワークフローのステップでは、コンテナやカスタム関数などを実行することもできる。
  • イベントに対応する。ワークフローを手動またはスケジュールで実行する。Webhookやデータの変更からパイプラインをトリガーする。
  • クリックではなくコードを使用する。インフラストラクチャのようにDevOpsワークフローを構築し、デプロイする。HCLでコーディングし、バージョン管理からデプロイする。

HCLなのでめちゃTerraform的に書ける。

kun432kun432

ワークフロー的なやつは最近めちゃ気になってるので、まずは軽くやってみる。
全体的にTerraformライクなところがあるので、Terraformの感覚で推測しながらやってみる。

インストール

https://flowpipe.io/downloads

MacはHomebrewでインストールできる。ドキュメントにはtap使うように書いてあるけど、普通にできた。

$ brew install flowpipe

$ flowpipe -v
Flowpipe v0.4.4

Learn Flowpipe

https://flowpipe.io/docs

Flowpipeの概念的なものは以下あたりをまず抑えておけば良さそう

https://flowpipe.io/docs/build

  • Triggers - パイプラインを初期化する方法。例えばcronとかwebhookとか。
  • Pipelines - 処理を行う一連のステップ
  • Mods - TriggersとPipelinesをパッケージにしたもの。Flowpipe Hubで公開されているものを使うこともできるし、自分でカスタムに作成することもできる

まだピンとこないけど、とりあえず進めていけばわかるだろうということで。

Creating your first pipeline

まず作業ディレクトリを作成

$ mkdir learn_flowpipe
$ cd learn_flowpipe

modを初期化。Terraform initっぽい。

$ flowpipe mod init

mod.fpというファイルが作成される

$ tree -a
.
└── mod.fp

1 directory, 1 file

中身はこんな感じ。ここでmodの定義を行うことができるらしい。

mod.fp
mod "local" {
  title = "learn_flowpipe"
}

ではパイプラインを作成する。flowpipeでは.fpという拡張子を使う。実行時は作業ディレクトリの配下にある.fpファイルを全て四で実行するらしい。以下の内容でlearn.fpというファイルを作成。

learn.fp
pipeline "learn_flowpipe" {
  step "http" "get_ipv4" {
    url = "https://api.ipify.org?format=json"
  }

  output "ip_address" {
    value = step.http.get_ipv4.response_body.ip
  }
}

pipelineは処理の一連のステップとなる。上記の設定は、

  • learn_flowpipeという名前のパイプライン(pipeline)を定義
  • 上記パイプラインの内容として以下を定義
    • HTTPリクエストを送るステップ、http stepを定義
    • 結果を出力するoutputを定義

というものになっている。

ではこのパイプラインを実行する。パイプラインの実行はflowpipe pipeline runでパイプライン名を指定する。

$ flowpipe pipeline run learn_flowpipe

結果

[flowpipe] Execution ID: exec_copiblqrqiq08rc8p9vg
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 223ms
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Complete 247ms exec_copiblqrqiq08rc8p9vg

なるほど。

Using mods

別のパイプラインを使ってパイプラインを作ることができる。IPアドレスから位置情報を返してくれる無料サービス、reallyfreegeoip.orgのmodを使う。

modのインストールはflowpipe mod installで行う。

$ flowpipe mod install github.com/turbot/flowpipe-mod-reallyfreegeoip
Installed 1 mod:

local
└── github.com/turbot/flowpipe-mod-reallyfreegeoip@v0.1.0

インストールしたmodは作業ディレクトリの.flowpipe/modsに配置される。

$ tree .flowpipe/
.flowpipe/
├── flowpipe.db
└── mods
    └── github.com
        └── turbot
            └── flowpipe-mod-reallyfreegeoip@v0.1.0
                ├── LICENSE
                ├── README.md
                ├── docs
                │   └── images
                │       └── flowpipe_pipeline_run.png
                ├── mod.fp
                └── pipelines
                    └── get_ip_geolocation.fp

8 directories, 6 files

またflowpipe mod listでも確認ができる。

$ flowpipe mod list

local
└── github.com/turbot/flowpipe-mod-reallyfreegeoip@v0.1.0

なるほど、細書に定義したlocalというmodが依存するmodという感じになってるのかな?

またflowpipe pipeline listでパイプラインの確認ができる。

$ flowpipe pipeline list
MOD                NAME                                           DESCRIPTION
local              learn_flowpipe
reallyfreegeoip    reallyfreegeoip.pipeline.get_ip_geolocation    Get geolocation data for an IPv4 or IPv6 address.

インストールしたmodのパイプラインを直接実行することができる。その前にパイプラインの使い方を見てみる。flowpipe pipeline showを使う。

$ flowpipe pipeline show reallyfreegeoip.pipeline.get_ip_geolocation
Name:        reallyfreegeoip.pipeline.get_ip_geolocation
Title:       Get IP Geolocation
Description: Get geolocation data for an IPv4 or IPv6 address.
Tags:
  type: featured
Params:
  format:
    Type:        string
    Description: The format of the output. Accepted values are json, csv and xml. Defaults to json.
    Default:     json
  ip_address [required]:
    Type:        string
    Description: The IPv4 or IPv6 address.
Outputs:
  geolocation:
    Description: IP geolocation details.
    Type:        any
Usage:
  flowpipe pipeline run reallyfreegeoip.pipeline.get_ip_geolocation --arg ip_address=<value>

Usageにも書いてあるけど、ip_addressを引数で渡してあげれば良さそう。

ということで実行。

$ flowpipe pipeline run reallyfreegeoip.pipeline.get_ip_geolocation --arg ip_address=8.8.8.8
[flowpipe] Execution ID: exec_copirq2rqiq0cmrie6pg
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/8.8.8.8
[get_ip_geolocation.get_ip_geolocation] Complete: 200 660ms
[get_ip_geolocation] Output geolocation = {
  "city": "",
  "country_code": "US",
  "country_name": "United States",
  "ip": "8.8.8.8",
  "latitude": 37.751,
  "longitude": -97.822,
  "metro_code": 0,
  "region_code": "",
  "region_name": "",
  "time_zone": "America/Chicago",
  "zip_code": ""
}
[get_ip_geolocation] Complete 691ms exec_copirq2rqiq0cmrie6pg

Composing with pipelines

ということで、これを自分のパイプラインと組み合わせる。

learn.fp
pipeline "learn_flowpipe" {
  step "http" "get_ipv4" {
    url = "https://api.ipify.org?format=json"
  }

  step "pipeline" "get_geo" {
    pipeline = reallyfreegeoip.pipeline.get_ip_geolocation
    args = {
      ip_address = step.http.get_ipv4.response_body.ip
    }
  }

  output "ip_address" {
    value = step.http.get_ipv4.response_body.ip
  }

  output "latitude" {
    value = step.pipeline.get_geo.output.geolocation.latitude
  }

  output "longitude" {
    value = step.pipeline.get_geo.output.geolocation.longitude
  }
}

流れとしては以下の様な感じ。

  • "get_ipv4"という名前のhttp step で自分のIPv4アドレスを取得
  • 上記の結果を "ip_address"という名前のoutputで取得
  • "get_geo"という名前のpipeline stepで、入力に上記のIPアドレスを指定して位置情報を取得
  • 上記の結果を"latitude"と"longitude"という名前のoutputで出力

では実行してみる。

$ flowpipe pipeline run learn_flowpipe
[flowpipe] Execution ID: exec_copiv3arqiq0d2ublkug
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 626ms
[learn_flowpipe.get_geo] Starting pipeline
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/XXX.XXX.XXX.35
[get_ip_geolocation.get_ip_geolocation] Complete: 200 87ms
[get_ip_geolocation] Output geolocation = {
  "city": "",
  "country_code": "JP",
  "country_name": "Japan",
  "ip": "XXX.XXX.XXX.35",
  "latitude": 35.69,
  "longitude": 139.69,
  "metro_code": 0,
  "region_code": "",
  "region_name": "",
  "time_zone": "Asia/Tokyo",
  "zip_code": ""
}
[get_ip_geolocation] Complete 110ms
[learn_flowpipe.get_geo] Complete 122ms
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Output latitude = 35.69
[learn_flowpipe] Output longitude = 139.69
[learn_flowpipe] Complete 796ms exec_copiv3arqiq0d2ublkug

IPアドレス→位置情報→気象情報とつなげてみる。

learn.fp
pipeline "learn_flowpipe" {
  step "http" "get_ipv4" {
    url = "https://api.ipify.org?format=json"
  }

  step "pipeline" "get_geo" {
    pipeline = reallyfreegeoip.pipeline.get_ip_geolocation

    args = {
      ip_address = step.http.get_ipv4.response_body.ip
    }
  }

  step "http" "get_weather" {
    url = join("", [
      "https://api.open-meteo.com/v1/forecast",
      "?latitude=${step.pipeline.get_geo.output.geolocation.latitude}",
      "&longitude=${step.pipeline.get_geo.output.geolocation.longitude}",
      "&current=temperature",
      "&forecast_days=1",
      "&daily=temperature_2m_min,temperature_2m_max,precipitation_probability_mean",
      "&temperature_unit=${step.pipeline.get_geo.output.geolocation.country_code == "US" ? "fahrenheit" : "celsius"}"
    ])
  }

  step "transform" "friendly_forecast" {
    value = join("", [
      "It is currently ",
      step.http.get_weather.response_body.current.temperature,
      step.http.get_weather.response_body.current_units.temperature,
      ", with a high of ",
      step.http.get_weather.response_body.daily.temperature_2m_max[0],
      step.http.get_weather.response_body.daily_units.temperature_2m_max,
      " and a low of ",
      step.http.get_weather.response_body.daily.temperature_2m_min[0],
      step.http.get_weather.response_body.daily_units.temperature_2m_min,
      ".  There is a ",
      step.http.get_weather.response_body.daily.precipitation_probability_mean[0],
      step.http.get_weather.response_body.daily_units.precipitation_probability_mean,
      " chance of precipitation."
    ])
  }

  output "ip_address" {
    value = step.http.get_ipv4.response_body.ip
  }

  output "latitude" {
    value = step.pipeline.get_geo.output.geolocation.latitude
  }

  output "longitude" {
    value = step.pipeline.get_geo.output.geolocation.longitude
  }

  output "forecast" {
    value = step.transform.friendly_forecast.value
  }
}
  • "get_ipv4"という名前のhttp step で自分のIPv4アドレスを取得
  • 上記の結果を "ip_address"という名前のoutputで取得
  • "get_geo"という名前のpipeline stepで、入力に上記のIPアドレスを指定して位置情報を取得
  • 上記の結果から緯度経度を"latitude"と"longitude"という名前のoutputで出力
  • "get_weather"という名前のhttp step で、入力に上記の緯度経度を指定して気象情報を取得
  • 上記の結果を"friendly_forecast"というtransform stepで整形
  • 上記の結果を"forecast"という名前のoutputで取得
[flowpipe] Execution ID: exec_copj0lqrqiq0dch8s870
[learn_flowpipe] Starting pipeline
[learn_flowpipe.get_ipv4] Starting http: GET https://api.ipify.org?format=json
[learn_flowpipe.get_ipv4] Complete: 200 243ms
[learn_flowpipe.get_geo] Starting pipeline
[get_ip_geolocation] Starting pipeline
[get_ip_geolocation.get_ip_geolocation] Starting http: GET https://reallyfreegeoip.org/json/XXX.XXX.XXX.35
[get_ip_geolocation.get_ip_geolocation] Complete: 200 62ms
[get_ip_geolocation] Output geolocation = {
  "city": "",
  "country_code": "JP",
  "country_name": "Japan",
  "ip": "XXX.XXX.XXX.35",
  "latitude": 35.69,
  "longitude": 139.69,
  "metro_code": 0,
  "region_code": "",
  "region_name": "",
  "time_zone": "Asia/Tokyo",
  "zip_code": ""
}
[get_ip_geolocation] Complete 84ms
[learn_flowpipe.get_geo] Complete 97ms
[learn_flowpipe.get_weather] Starting http: GET https://api.open-meteo.com/v1/forecast?latitude=35.69&longitude=139.69&current=temperature&forecast_days=1&daily=temperature_2m_min,temperature_2m_max,precipitation_probability_mean&temperature_unit=celsius
[learn_flowpipe.get_weather] Complete: 200 853ms
[learn_flowpipe.friendly_forecast] Starting transform
[learn_flowpipe.friendly_forecast] Complete 2ms
[learn_flowpipe] Output forecast = It is currently 19°C, with a high of 19.6°C and a low of 9.7°C.  There is a 0% chance of precipitation.
[learn_flowpipe] Output ip_address = XXX.XXX.XXX.35
[learn_flowpipe] Output latitude = 35.69
[learn_flowpipe] Output longitude = 139.69
[learn_flowpipe] Complete 1s exec_copj0lqrqiq0dch8s870

forecastに気象情報が追加されているのがわかる。

Send a message

message stepでこの結果を通知することができる。

learn.fp
pipeline "learn_flowpipe" {
  step "http" "get_ipv4" {
    (snip)
  }

  step "pipeline" "get_geo" {
    (snip)
  }

  step "http" "get_weather" {
    (snip)
  }

  step "transform" "friendly_forecast" {
    (snip)
  }

  # ここに追加
  step "message" "send_forecast" {
    notifier = notifier.default
    subject  = "Todays Forecast"
    text     = step.transform.friendly_forecast.value
  }

  output "ip_address" {
    (snip)

今回のケースでは、実行すると以下のようにコンソールに表示されるだけ・・・

(snip)
[learn_flowpipe.send_forecast] Starting message: It is currently 18.6°C, with a high of 19.7°C an…
[learn_flowpipe.send_forecast] Arg text = It is currently 18.6°C, with a high of 19.7°C and a low of 10.1°C.  There is a 0% chance of precipitation.
[learn_flowpipe.send_forecast] Complete 1ms
(snip)

なんだけども、flowpipeの設定でdefault.notifierに通知先の設定を行っておけば、例えばSlackやMS Teamsなどに通知ができる。このための設定はintegrationsという設定で行うらしい。

これらの設定は~/.flowpipe/config以下に.fpcファイルを置いて行う様子。デフォルトでいくつかサンプルが用意されている。

$ tree -a ~/.flowpipe
/Users/kun432/.flowpipe
├── config
│   ├── flowpipe.fpc.sample
│   └── workspaces.fpc.sample
└── internal
    ├── salt
    └── update_check.json

3 directories, 4 files
~/.flowpipe/config/flowpipe.fpc.sample

#
# For detailed descriptions, see the reference documentation
# at https://flowpipe.io/docs
#

# integration "http" "default" {}

# notifier "default" {
#   notify {
#     integration = integration.http.default
#   }
# }
kun432kun432

flowpipeのパブリックなmodは https://hub.flowpipe.io で公開されている。

https://hub.flowpipe.io/

AWSやGCP等のクラウドはもちろん、ちょっと面白いところではOpenAIってのがあった。

AWSのやつはAWSのAPIを実行するような感じっぽい。aws-cliを使ってシェルスクリプト何かを書いている場合はflowpipeに置き換えれるかもしれない。

https://hub.flowpipe.io/mods/turbot/aws

OpenAIのやつはプロンプトを指定したらレスポンスが返ってくるというものだった。

https://hub.flowpipe.io/mods/turbot/openai

あとはサンプルもいろいろ用意されている。

kun432kun432

あとは気になるところだけサラッと。

https://flowpipe.io/docs/build/write-pipelines/control-flow#control-flow

  • パイプラインのステップはデフォルトで並列で実行される。
  • ただし、入力が別のステップに依存する場合はそれを踏まえて自動で実行順を判断する
  • depends_onみたいに明示的に実行順というか依存関係を指定することもできる。

https://flowpipe.io/docs/build/write-pipelines/conditionals

  • ifを使って条件分岐で実行させることができる
  • ただしelseやelseifはないので、条件の分だけstepをわけるような形になる
  • 三項演算子的な条件表現もある

https://flowpipe.io/docs/build/write-pipelines/iteration

  • foreachやloopでイテレーションできる
  • foreachにはif、loopにはuntilをつけてループから抜けるような処理も可能

https://flowpipe.io/docs/run/server

  • flowpipeの実行には、クライアントモードとサーバモードがある
    • クライアントモードは、CLIでアドホックに実行するイメージ
    • サーバモードは、Webhookを受けたりcronでスケジュール実行したり、という場合に必要になる

https://flowpipe.io/docs/build/mod-variables

  • Terraformと同じようにvariablelocalsで変数定義できる
kun432kun432

書き味がとてもTerraformだし、いろいろ似ているところも多いので、慣れた人だとサラッと使えるのではないだろうか。

ただ、TerraformのDSLか、CDKでコードか、みたいな話は当然flowpipeにもありそうで、DevOps向けワークフローなのでまあDSLのほうがいいっていうことなんだろうな。もっとコード的に書きたい、とかになってくると、MLOps的なワークフローエンジンとかのほうが良いのかもしれないね。

ユースケースに合わせて採用すればいいと思うけど、自分がもし使うなら定期的に何か実行して結果を通知するようなスケジューラー的使い方はすぐにでもできそうと思った。

このスクラップは2024/05/02にクローズされました