🦄

Amazon AppFlowのcustom connectorを理解する

2023/10/16に公開

Amazon AppFlowのcustom connectorについて、簡単な実装例を紹介して全体感を理解できるようにします。

ETLツールの選択とAmazon AppFlow

ETLツールの導入判断はデータパイプラインの運用に大きな影響を与えます。

現在自分が思いつく主なETLツールの選択肢として以下が挙げられます。

全てSaaSになりますが、AirbyteはOSSのセルフホスティングも可能です。これらのSaaSは、ある程度データソース・同期先が多岐に渡る場合にコストメリットが出てくるので、ある特定のサービスのAPIからデータベースにデータを同期したいだけで先を考えずにいきなり導入してしまうと利用コストや契約周りの手間の方が負担になってしまいます。

Amazon AppFlowについては既にAWSを利用しているのであればAWSの他のサービスと同じように使い始めることができます。また、Serverlessのためユーザが管理する必要があるのはデータ同期の設定だけです。そのため、とりあえず導入して試しに運用してみるということがやりやすいように思います。

各サービスに対応するためのコンポーネント connector

ETLツールの特徴としてconnectorがあります。多種多様なデータソースやデータ同期先に対応するため、サービスやデータベースごとにプラグイン形式のconnectorという形でデータ処理機能を提供しています。

FivetranやAirbyte、AppFlowなどもこのconnector形式で個々のデータ処理機能が開発されています。そして、FivetranやAirbyteは公式のconnectorだけでなくユーザーが独自にconnectorを開発することもできます。

そしてAppFlowの場合もFivetranなどと同様に足りていないconnectorがあれば、独自に開発することができます。これをcustom connectorといいます。公式でconnectorが提供されていない場合はもちろんのこと、既存のconnectorが最新のAPI仕様に未対応の場合など自前で開発することですぐに対応できます。

custom connectorの開発に関する情報が少ない

現状custom connectorを開発する場合、SDKのリポジトリに含まれるexample connectorを参考にするくらいしか情報がありません。そのexample connectorはSalesforceからデータを抽出するもので、個人で試すには少し敷居が高いです。とりあえず開発の流れや工数感を把握したい場合もっと簡単に試せる例が欲しいところです。そこで、誰でも気軽に開発して動作確認ができるcustom connectorを例示したいと思います。

シンプルなcustom connectorを作って感覚を掴む

最初はやはりhello worldですね。データソースには一切アクセスせず、実行時パラメータに名前を渡して実行したら hello <名前> というレコードを1件生成するsource connectorを作ってみます。本来source connectorはデータソースからデータを抽出するのが役割なので何の意味もないconnectorになりますが、最速でconnector開発の全体像を把握するには良い実装例かと思います。また、destination connectorを開発する際のモックとしても活用できるかと思います。

hello world connectorの説明

コードはこちらにあります。

custom connectorはLambdaの関数として実装する必要があります。また、SDKに含まれているスクリプトを利用してデプロイするのが簡単です。そのスクリプトではCloudFormationを使ってデプロイをしています。

今回作成するcustom connectorで必要なファイル構成は以下になります。これが恐らく最少のファイル構成かと思います。

├── __init__.py
├── handlers
│   ├── __init__.py
│   ├── configuration.py
│   ├── lambda_handler.py
│   ├── metadata.py
│   └── record.py
└── template.yml

handlers ディレクトリ配下にconnectorの実装をLambda関数として定義します。 template.yml はCloudFormationでconnectorをLambdaにデプロイするための設定ファイルです。

Lambda関数のエントリーポイントの実装は lambda_handler.py に実装しています。

lambda_handler.py
class HelloLambdaHandler(BaseLambdaConnectorHandler):
    def __init__(self):
        super().__init__(
            HelloMetadataHandler(),
            HelloRecordHandler(),
            HelloConfigurationHandler(),
        )


def hello_lambda_handler(event, context):
    return HelloLambdaHandler().lambda_handler(event, context)

flow実行時のデータ処理については HelloRecordHandler クラスで実装します。その他、connectorの設定定義は HelloConfigurationHandler クラス、 このconnectorで扱うデータ(AppFlowではentityという)とそのデータスキーマの定義は HelloMetadataHandler クラスで実装します。これらの実装について説明していきます。

record.py
class HelloRecordHandler(RecordHandler):
    def query_data(
        self, request: requests.QueryDataRequest
    ) -> responses.QueryDataResponse:
        name = request.connector_context.connector_runtime_settings["name"]

        return responses.QueryDataResponse(
            is_success=True,
            records=[json.dumps({"message": f"hello {name}"})],
        )

    def retrieve_data(
        self, request: requests.RetrieveDataRequest
    ) -> responses.RetrieveDataResponse:
        return responses.RetrieveDataResponse(is_success=True, records=[])

    def write_data(
        self, request: requests.WriteDataRequest
    ) -> responses.WriteDataResponse:
        return responses.WriteDataResponse(is_success=True, write_record_results=[])

データソースからデータを抽出してdestination connectorに渡す処理は、抽象クラス RecordHandler を継承したクラスの query_data メソッドに定義します。destination connectorに渡したいデータは、1レコードを1つのJSON文字列にしてそれらをPythonのlistにまとめて返します。レコードを表すJSONオブジェクトは、 {"フィールド名1": "データ1", "フィールド名2": "データ2", ...} という形式にします。データ抽出の際に事前に設定した実行時パラメータを参照したい場合は、 request.connector_context.connector_runtime_settings["設定項目"] で設定値を取得可能です。この例では設定値 name を取得しています。

また、このクラスで定義している write_data メソッドはdestination connectorが宛先にデータを書き出すときに呼ばれるものなのでsource専用のconnectorでは実装不要です。

configuration.py
class HelloConfigurationHandler(ConfigurationHandler):
    def validate_connector_runtime_settings(
        self, request: requests.ValidateConnectorRuntimeSettingsRequest
    ) -> responses.ValidateConnectorRuntimeSettingsResponse:
        return responses.ValidateConnectorRuntimeSettingsResponse(
            is_success=True
        )

    def validate_credentials(
        self, request: requests.ValidateCredentialsRequest
    ) -> responses.ValidateCredentialsResponse:
        return responses.ValidateCredentialsResponse(is_success=True)

    def describe_connector_configuration(
        self, request: requests.DescribeConnectorConfigurationRequest
    ) -> responses.DescribeConnectorConfigurationResponse:
        name_settings = settings.ConnectorRuntimeSetting(
            key="name",
            data_type=settings.ConnectorRuntimeSettingDataType.String,
            required=True,
            label="Name",
            description="Your name",
            scope=settings.ConnectorRuntimeSettingScope.SOURCE,
        )
        return responses.DescribeConnectorConfigurationResponse(
            is_success=True,
            connector_owner="koji",
            connector_name="hello",
            connector_version="0.0.1",
            connector_modes=[config.ConnectorModes.SOURCE],
            connector_runtime_setting=[name_settings],
            authentication_config=auth.AuthenticationConfig(
                is_custom_auth_supported=True,
                custom_auth_config=[auth.CustomAuthConfig("dummy", [])],
            ),
            supported_api_versions=["0.1"],
        )

実行時パラメータは抽象クラス ConfigurationHandler を継承したクラスの describe_connector_configuration メソッドで定義します。今回は実行時に name だけ事前に設定してほしい(上記record.py参照)ので ConnectorRuntimeSetting オブジェクトを1つだけ作成しています。なお、外部SaaSなどのアプリケーションからデータを抽出するのに認証情報が必要な場合、ここで認証情報に関する設定として AuthenticationConfig オブジェクトを設定します。今回は不要なので CustomAuthConfig でダミーを設定しています。

metadata.py
class HelloMetadataHandler(MetadataHandler):
    ENTITY = context.Entity(
        entity_identifier="hello",
        has_nested_entities=False,
        is_writable=False,
        label="hello",
        description="Say hello",
    )
    FIELDS = [
        fields.FieldDefinition(
            field_name="message",
            data_type=fields.FieldDataType.String,
            read_properties=fields.ReadOperationProperty(
                is_retrievable=True,
                is_nullable=True,
                is_queryable=True,
            ),
        )
    ]

    def list_entities(
        self, request: requests.ListEntitiesRequest
    ) -> responses.ListEntitiesResponse:
        return responses.ListEntitiesResponse(
            is_success=True,
            entities=[self.ENTITY],
        )

    def describe_entity(
        self, request: requests.DescribeEntityRequest
    ) -> responses.DescribeEntityResponse:
        entity_definition = context.EntityDefinition(
            entity=self.ENTITY,
            fields=self.FIELDS,
        )
        return responses.DescribeEntityResponse(
            is_success=True, entity_definition=entity_definition
        )

抽象クラス MetadataHandler を継承したクラスで、データソースにどのようなデータ(entity)が用意されていてそれぞれどのようなデータスキーマ(フィールド定義)になっているかを定義します。用意されているentityの一覧の定義は list_entities メソッドで実装します。そして、各entityのデータスキーマの定義は describe_entity メソッドで定義します。今回は、 hello というentityを1つだけ定義し、そのデータスキーマとしては message という文字列型のフィールドが1つあるだけとなります。

connectorの実装は以上になります。SDK付属のツールを利用してconnectorをデプロイする場合はCloudFormationの設定ファイルが必要になりますので、 template.yml も作成します。

custom connectorの処理の流れ

custom connectorの実体はLambda関数です。AppFlowがLambda関数として実装された上記処理とどのように連携しているか確認していきます。

このように、connectionを設定する際や実際にflowを実行してデータ処理をする際に、都度AppFlowからconnectorを呼び出す構成になっているのがわかります。

next step

実用的なconnectorには、設定値のバリデーションやデータソースへの接続確認、取得するデータのフィルタリング条件指定、ページネーション対応など実装しなければならない要素が結構あります。それらに対応するための参考実装として冒頭で触れたexample connector(Salesforce)があります。ただし、全体像把握できたこの時点でもまだ少し敷居が高いと感じるかもしれません。そのため、動作確認をしやすい参考実装としてe-Stat(政府統計API)のsource connectorを実装してみました。e-Statは無料で登録するとAPIアクセスに必要なアプリケーションIDを発行できます。月次の労働力調査のデータを取得できるentityだけ実装してみましたので次のステップとして参考になるかもしれません。

Discussion