👋

Airbyteの構成情報管理ツールOctavia CLIを使ってみた

2023/07/12に公開

株式会社ウェイブでSREグループに所属しています、渡邉です!

社内データ基盤のELTツールとしてAirbyteをセルフホストで利用しています。
最近新しいバージョンに上げたら構成情報を管理するツールOctavia CLIがリリースされていたので使ってみました。

Airbyteとは

https://github.com/airbytehq/airbyte
https://airbyte.com/

AirbyteはOSSで公開されているELTツールで、環境構築が簡単で使いやすいです。
今はセルフホストで運用していますが、運用が大変になってきたらクラウド版に移行できるというのも魅力的なポイントです。

Octavia CLIとは

https://github.com/airbytehq/airbyte/tree/master/octavia-cli
https://airbyte.com/tutorials/version-control-airbyte-configurations

Octavia CLIは、YAMLでAirbyteの構成情報を管理できるツールになります。
以前はWeb上から構成情報を入出力できる機能があったのですが、2023/07/12現在のAirbyte 0.50.6ではなくなっておりOctavia CLIで管理する方式に移行したようです。

実行環境

  • macOS Ventura 13.4.1
  • Docker 24.0.2
  • Airbyte 0.50.6
  • Octavia CLI 0.40.32

手順

保存先ディレクトリ作成

最初は構成情報ファイルを保存するディレクトリを作ります。
Dockerイメージが提供されているので、Dockerを使った方法でやっていきます。
簡単に利用できるように、octavia-cli操作用のスクリプトを作成します。

octavia.sh
#!/bin/sh

docker run --name octavia-cli -i --rm \
  -v ./airbyte-configuration:/home/octavia-project \
  --network host \
  --env-file .octavia \
  airbyte/octavia-cli:0.40.32  $@

次に、CLIに渡すパラメータを保存するための環境変数ファイルを作成します。

.octavia
AIRBYTE_URL=https://airbyte.example.com
AIRBYTE_USERNAME=airbyte
AIRBYTE_PASSWORD=xxxxxxxxx

次に構成情報ファイルを保存するディレクトリを作成し、初期化します。

$ mkdir airbyte-configuration
$ ./octavia.sh init

🐙 - Octavia is targetting your Airbyte instance running at https://airbyte.example.com on workspace 226299ee-e2f3-46f6-a6ba-9f8a6830288e.
🐙 - Project is not yet initialized.
🔨 - Initializing the project.
✅ - Created the following directories: sources, destinations, connections.
✅ - Created API HTTP headers file in api_http_headers.yaml

$ tree airbyte-configuration 
airbyte-configuration
├── api_http_headers.yaml
├── connections
├── destinations
└── sources

これで、保存先のディレクトリが完成しました。

インポート

次は、構成情報を保存先のディレクトリにインポートしていきます。

$ ./octavia.sh import all

 🐙 - Octavia is targetting your Airbyte instance running at https://airbyte.example.com on workspace 226299ee-e2f3-46f6-a6ba-9f8a6830288e.
✅ - Imported source source_ga4 in sources/source_ga4/configuration.yaml. State stored in sources/source_ga4/state_226299ee-e2f3-46f6-a6ba-9f8a6830288e.yaml
⚠️  - Please update any secrets stored in sources/source_ga4/configuration.yaml
✅ - Imported source source_gitlab in sources/source_gitlab/configuration.yaml. State stored in sources/source_gitlab/state_226299ee-e2f3-46f6-a6ba-9f8a6830288e.yaml
⚠️  - Please update any secrets stored in sources/source_gitlab/configuration.yaml
...

コマンドを実行して、既存の構成情報をすべてコードとしてインポートします。
すると、以下のようなファイルが出力されます。

$ cat airbyte-configuration/sources/source_ga4/configuration.yaml

resource_name: source_ga4
definition_type: source
definition_id: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
definition_image: airbyte/source-google-analytics-data-api
definition_version: 1.1.2
configuration:
  credentials:
    auth_type: Service
    credentials_json: '**********'
  property_id: '123456789'
  window_in_days: 1
  date_ranges_start_date: '2023-06-01'

機密情報がマスクされた状態で構成情報をインポートすることができました。

反映

インポートした構成情報をもとに再度Airbyteサーバーに反映してみます。
機密情報がマスクされているので、反映時に入力されるように変更します。
環境変数を渡せるので、構成情報ファイルと環境変数ファイルにそれぞれ設定していきます。

airbyte-configuration/sources/source_ga4/configuration.yaml
resource_name: source_ga4
definition_type: source
definition_id: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
definition_image: airbyte/source-google-analytics-data-api
definition_version: 1.1.2
configuration:
  credentials:
    auth_type: Service
    credentials_json: ${GA4_CREDENTIALS_JSON}
  property_id: '123456789'
  window_in_days: 1
  date_ranges_start_date: '2023-06-01'

.octavia
# 追記
GA4_CREDENTIALS_JSON={"type": "service_account","project_id":...}

反映するコマンドを実行します

$  ./octavia.sh apply -f sources/source_ga4/configuration.yaml
🐙 - Octavia is targetting your Airbyte instance running at https://airbyte.example.com on workspace 226299ee-e2f3-46f6-a6ba-9f8a6830288e.
🐙 - source_ga4 exists on your Airbyte instance according to your state file, let's check if we need to update it!
🟡 - Running update because a local file change was detected and a secret field might have been edited.
🎉 - Successfully updated source_ga4 on your Airbyte instance!
💾 - New state for source_ga4 stored at sources/source_ga4/state_226299ee-e2f3-46f6-a6ba-9f8a6830288e.yaml.

これで反映することができました。

注意点

構成情報のインポート時に、うまく取り込めない接続設定がありました。

$ ./octavia.sh import connection 6b185d84-a2d9-4125-84ba-fa848cef38c8
🐙 - Octavia is targetting your Airbyte instance running at https://airbyte.example.com on workspace 226299ee-e2f3-46f6-a6ba-9f8a6830288e.
Traceback (most recent call last):
  File "/usr/local/bin/octavia", line 8, in <module>
    sys.exit(octavia())
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/base_commands.py", line 54, in invoke
    raise e
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/base_commands.py", line 51, in invoke
    result = super().invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/check_context.py", line 91, in wrapper
    f(ctx, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/_import/commands.py", line 156, in connection
    click.echo(import_connection(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], resource))
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/_import/commands.py", line 121, in import_connection
    managed_resource, state = resources.factory(api_client, workspace_id, new_configuration_path).manage(connection_id)
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/apply/resources.py", line 868, in factory
    return Connection(api_client, workspace_id, raw_configuration, configuration_path)
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/apply/resources.py", line 279, in __init__
    self.configuration = self._deserialize_raw_configuration()
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/apply/resources.py", line 631, in _deserialize_raw_configuration
    configuration["sync_catalog"] = self._create_configured_catalog(configuration["sync_catalog"])
  File "/usr/local/lib/python3.9/site-packages/octavia_cli/apply/resources.py", line 748, in _create_configured_catalog
    stream=AirbyteStream(**stream["stream"]), config=AirbyteStreamConfiguration(**stream["config"])
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 46, in wrapped_init
    return fn(_self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model/airbyte_stream_configuration.py", line 296, in __init__
    setattr(self, var_name, var_value)
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 185, in __setattr__
    self[attr] = value
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 510, in __setitem__
    self.set_attribute(name, value)
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 157, in set_attribute
    value = validate_and_convert_types(
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 1620, in validate_and_convert_types
    input_value[index] = validate_and_convert_types(
  File "/usr/local/lib/python3.9/site-packages/airbyte_api_client/model_utils.py", line 1582, in validate_and_convert_types
    raise get_type_error(input_value, path_to_item, valid_classes,
airbyte_api_client.exceptions.ApiTypeError: Invalid type for variable '0'. Required value type is SelectedFieldInfo and passed type was dict at ['selected_fields'][0]

解決できなかったので、対象の構成情報はスキップしてインポートは断念いたしました...
多分このあたりが関連してるのかなと思います。

https://github.com/airbytehq/airbyte/issues/19768#issuecomment-1477669273
https://github.com/airbytehq/airbyte/issues/24896

最後に

Octavia CLIの導入は簡単でしたし、構成情報をコード管理できるようになって、検証環境の構築などが容易になってよりAirbyteが扱いやすくなりました。
今後もOctavia CLIを使っていきたいと思います!

wwwave's Techblog

Discussion