💻

Apache Airflow 経由で Azure Cosmos DB のデータを操作してみる

2021/02/14に公開

この記事について

この記事は、第 6 回 Azure Cosmos DB 勉強会のフィードバック記事です。
今回は、Docker コンテナーで PythonAzure CLIApache Airflow それぞれの環境を構築し、Azure Cosmos DB に接続し操作を行うところまでを解説します。
Azure Cosmos DB への接続は、Azure Cosmos DB Python SDK を使用し、Azure Cosmos DB の接続に必要なキーについては Azure Key Vault のシークレットで管理するようにします。

Azure Cosmos DB Python SDK のインストール

Python で Azure Cosmos DB に 接続するために、Azure Cosmos DB Python SDK をインストールします。

今回は、Docker コンテナー上にある Python 環境にライブラリをインストールします。
インストールは、requirements.txt ファイルを使用して行います。

docker-compose run --rm app pip install -r requirements.txt --user

インストール後、以下のコマンドを実行し、Azure Cosmos DB Python SDK がインストールされたことを確認してください。

docker-compose run --rm app pip list

Azure Key Vault Secrets クライアントライブラリのインストール

Azure Key Vault に Python で接続するために、Azure Key Vault Secrets client library for Python をインストールします。
Azure Cosmos DB に Python SDK 経由で接続する際、プライマリ/セカンダリキーの情報を使用する必要があります。しかし、この時に使用するキー情報を Python のコード上で公開するわけにはいきません。(不正アクセスなどに繋がるため)
そのため、接続時に必要となるキーの情報を Azure Key Vault に登録しておき、処理実行の際に Azure Key Vault からキーを取得するようにします。

インストールは、Azure Cosmos DB Python SDK 同様、requirements.txt を使用して行います。

Azure リソースの作成

実際に使用する Azure Cosmos DB や Azure Key Vault を作成します。
GitHub の /scripts ディレクトリ以下にある create-resources.sh ファイルを Azure CLI コンテナー内で実行し、Azure リソースを作成します。

docker-compose run --rm azurecli /bin/bash create-resources.sh

create-resources.sh を使用することで、以下の環境を新規に構築するようにしています。

  • Resource Group
  • Azure Cosmos DB
  • Azure Key Vault (シークレット作成含む)

スクリプトを実行すると、以下のような出力が行われ、Azure へのログインが求められます。

To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code SDCFWTQAQ to authenticate.

これは、Azure CLI による Azure へのリモートログインが求められている状況です。
ホスト OS 側にある Web ブラウザーなどを使用して、指定された URL にアクセスし、ログインを行ってください。
Azure CLI に関して気になる方は、以下の記事も参考にしてください。

このスクリプトでは、Azure Cosmos DB は Free Tier 環境で作成します。
もし、既に Free Tier 環境がすでに存在しているサブスクリプションで新たに環境を作成する場合は、create-resources.sh 内の以下の部分を書き換えてください。

create-resources.sh(12行目)
enableFreeTier=false # falseに変更

スクリプトの実行途中で、Azure Key Vault のシークレットを作成するために、シークレットの名前と値を入力することになります。
名前については好きなものを設定してください。
ただし、値は Azure Cosmos DB のプライマリキーもしくはセカンダリキーを入力するようにしてください。

シークレットの入力後、Azure Client ID を入力するように求められます。
ここでは、スクリプト内で作成したサービスプリンシパルで設定した内容の出力結果にある、appId の値を入れてください。

Azure Cosmos DB に Python で接続できることを確認

Apache Airflow の DAG で Azure Cosmos DB に接続する前に、Azure Cosmos DB に Python で接続します。
先ほど作成した Python コンテナーを使用します。開発は好きなエディターを使用すればいいと思いますが、個人的には Visual Studio Code および Visual Studio Code の拡張機能である Python/Pylance/Remote Developmentを使った開発がおすすめです。

/src ディレクトリ以下を Python コンテナーと紐づけているので、新しく Python ファイル (.py) を作成する場合はこちらに作成します。

Azure Key Vault からシークレットを取得

まずは Python で Azure Key Vault に接続し、先ほどスクリプトで作成したシークレットの値を確認してみます。

cosmos_get_started.py
import os
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Default Variables
os.environ['KEY_VAULT_NAME'] = '<your azure key vault name>'
os.environ['SECRET_NAME_COSMOS'] = '<youe secret name>'
os.environ['AZURE_CLIENT_ID'] = '<application id of the registered app>' # appId
os.environ['AZURE_CLIENT_SECRET'] = '<password of your app registered>' # password
os.environ['AZURE_TENANT_ID'] = '<your tenant id>' # tenant

# Define environment variables
kv_name = os.environ['KEY_VAULT_NAME']
kv_endpoint = f"https://{kv_name}.vault.azure.net"
secret_name = os.environ['SECRET_NAME_COSMOS']

cred = DefaultAzureCredential()
secret_client = SecretClient(vault_url=kv_endpoint, credential=cred)

retrieved_secret = secret_client.get_secret(secret_name)
print(f"Your secret is '{retrieved_secret.value}'.")

上記のコードの中で、環境変数の部分は自分のものに置き換えてください。
ソースを作成した後、コマンドを実行すると、シークレットに保存した Azure Cosmos DB のキー値が確認できると思います。

docker-compose run --rm app python3 cosmos_get_started.py
実行結果(例)
Your secret is 'xxxxxxxx...(以下略)'.

Azure Cosmos DB に接続

Azure Cosmos DB のキー情報を取得できることを確認できたので、取得したキーを使用して Azure Cosmos DB に接続していきます。
先ほどのコードに以下の内容を追加し、create-resources.sh で作成した Azure Cosmos DB アカウントに接続していきます。

Python で Azure Cosmos DB アカウントに接続する際は、CosmosClient を使用します。
CosmosClient にエンドポイントの URLプライマリまたはセカンダリのキー情報を渡してあげることで、接続クライアントを生成し、コネクションを生成することができます。

from azure.cosmos import exceptions, CosmosClient, PartitionKey

os.environ['COSMOS_ACCOUNT_NAME'] = '<your cosmos account name>'

# Connect to Azure Cosmos DB
cosmos_account = os.environ['COSMOS_ACCOUNT_NAME']
url = f"https://{cosmos_account}.documents.azure.com:443/"
client = CosmosClient(url, retrieved_secret.value)
print(f"[{cosmos_account}] Connected successfully to Azure Cosmos DB account.")

<your cosmos account name> の部分は自分のものに置き換えてください。
コードを作成し実行すると、正常に Azure Cosmos DB に接続できることを確認できると思います。

データベースとコンテナーを作成

Azure Cosmos DB アカウントに接続したので、この後使用する データベースコンテナー を Azure Cosmos DB Python SDK を使って作成していきます。

データベースについては、CosmosClient クラスにある以下のメソッドを使用していきます。

どちらも、戻り値として DatabaseProxy を返すようになっています。

コンテナーについては、DatabaseProxy クラスにある以下のメソッドを使用していきます。

どちらも、戻り値として ContainerProxy を返すようになっています。

先ほどのコードに以下の内容を追加し、create-resources.sh で作成した Azure Cosmos DB アカウントに接続していきます。

database_name = 'Sample'
container_name = 'jcdug20210214'
throuput_val = 400
part_key = "/category"

try:
    database = client.create_database(id=database_name, offer_throughput=throuput_val)
    print(f"[{cosmos_account}] Created successfully a database. name[{database_name}]")
except exceptions.CosmosResourceExistsError:
    database = client.get_database_client(database=database_name)
    print(f"[{cosmos_account}] Get successfully a database. name[{database_name}]")

try:
    container = database.create_container(
        id=container_name, partition_key=PartitionKey(path=part_key)
    )
    print(f"[{cosmos_account}] Created successfully a container. name[{container_name}]")
except exceptions.CosmosResourceExistsError:
    container = database.get_container_client(container_name)
    print(f"[{cosmos_account}] Get successfully a container. name[{container_name}]")

データベース名やコンテナー名の部分は好きなものに置き換えても問題ありません。
throuput_val でデータベースの RU (要求ユニット) の値を指定しています。この値の最小値は 400 で 100 RU 刻みでスケールアップできます。ただし、400 を超えると Free Tier (無償枠) の対象外分が発生し、課金が始まってしまうため、その点は注意してください。
コードを実行することで、Azure Cosmos DB アカウント上にデータベース/コンテナーが存在しない場合は新規に作成し、既に存在している場合は作成済みのコンテナーの情報をそのまま取得することができます。

データの取得・更新・削除

データの操作に必要なデータベースコンテナーを作成したので、実際にデータ操作を行っていきます。
データの操作は、基本としては ContainerProxy クラスにある以下のメソッドを知っていれば良いかな思います。

今回は、事前に準備している JSON ファイルを使って、先ほど作成したコンテナーにデータを Upsert していきます。

今回は、/src ディレクトリ以下に、files/demonslayer というサブディレクトリを作成し、demonslayer ディレクトリ以下に character.json という JSON ファイルを配置します。
character.json ファイルの内容は以下のようになっています。

character.json
[
  {
    "id": "1",
    "category": "main-character",
    "name": "竈門 炭治郎",
    "age": 15,
    "height": 165,
    "description": "妹を救い、家族の仇討ちを目指す、心優しい少年。鬼や相手の急所などの“匂い”を嗅ぎ分けることができる。",
    "isAlive": true
  },
  {
    "id": "2",
    "category": "main-character",
    "name": "竈門 禰󠄀豆子",
    "age": 14,
    "description": "炭治郎の妹。鬼に襲われ、鬼になってしまうが、他の鬼とは違い、人である炭治郎を守るよう動く。",
    "isAlive": true
  },
  {
    "id": "3",
    "category": "enemy",
    "name": "鬼舞辻 無惨",
    "description": "禰󠄀豆子を鬼に変えた者で炭治郎の宿敵。普段は人間のふりをして暮らしている。",
    "isAlive": false
  },
  {
    "id": "4",
    "category": "friend",
    "name": "煉獄 杏寿郎",
    "age": 20,
    "description": "鬼殺隊の“柱”の一人。“炎の呼吸”で鬼を殲滅する。",
    "isAlive": false
  }
]

この JSON ファイルの内容を Python で取得した後、ContainerProxy.upsert_item のメソッドを使用してデータを 1 件ずつ Upsert していきます。

import json

print(f"[{cosmos_account}] Loading sample data...")
data_demonslayer = json.load(open('./files/demonslayer/character.json', 'r'))

print(f"[{cosmos_account}] Upsert sample data to collection...")
for item in data_demonslayer:
    container.upsert_item(
        dict(item)
    )
    
print(f"[{cosmos_account}] Completed to upsert data.")

実行した後は、read_all_itemsquery_items のメソッドを使用することで、データが正常に投入されたことを確認できるはずです。

# Check data : read
print(f"[{cosmos_account}] Read all items in the container...")
# read_items = container.read_all_items() # Retuen all items in a container
read_items = container.read_all_items(max_item_count=4) # Max number of items to be returned

print(f"[{cosmos_account}] Read result:")
for item in read_items:
    print(json.dumps(item, indent=True, ensure_ascii=False))

# Check data : Query
print(f"[{cosmos_account}] Querying to the container...")
query_items = container.query_items(
    query='SELECT * FROM jcdug20210214 c WHERE c.category = @category',
    parameters=[dict(name="@category", value="main-character")]
)

print(f"[{cosmos_account}] Query result:")
for item in query_items:
    print(json.dumps(item, indent=True, ensure_ascii=False))

Apache Airflow で Azure Cosmos DB を操作

Apache Airflow については、以下の記事を参照してください。

Airflow はデータ処理のワークフローを自動化できるソフトウェアです。
今回は、実際に DAG を作成し、Airflow で Azure Cosmos DB のコンテナーに接続し、処理をいくつか順番に実行してみます。

Azure Cosmos DB Python SDK の最新化

Azure Cosmos DB Python SDK は、Apache Airflow のコンテナーを構築した際に一緒にインストールされるようになっています。
ただし、こちらでインストールされる SDK のバージョンは、3.2.0 と古いものになっています。

残念ながら、Apache Airflow 2.0.0 では、Azure Cosmos DB Python SDK を含め、いくつかの Azure SDK for Python は古いバージョンを使用しなければならないようになっています。

DAG の作成

Airflow で処理を行うには、DAG を定義する必要があります。
/dags ディレクトリ以下に DAF ファイルを作成することで、Airflow のスケジューラによって処理が実行されるようになります。

今回は sample_cosmos_dag.py を作成し処理を定義していきます。

/dags/sample_cosmos_dag.py
import os
import json
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

from azure.cosmos.cosmos_client import CosmosClient
from azure.cosmos.errors import HTTPFailure

# タスクで呼び出す関数 (1)
# 本来は read_cosmos_items と upsert_cosmos で共通利用する項目は別で定義しますが今回は割愛
def read_cosmos_items():
    # Define additional variables
    database_name = 'Sample'
    container_name = 'jcdug20210214'
    os.environ['COSMOS_ACCOUNT_NAME'] = ''
    os.environ['COSMOS_Secret'] = ''

    # Connect to Azure Cosmos DB
    cosmos_account = os.environ['COSMOS_ACCOUNT_NAME']
    cosmos_key = os.environ['COSMOS_Secret']
    url = f"https://{cosmos_account}.documents.azure.com:443/"
    client = CosmosClient(url, {'masterKey': cosmos_key})
    print(f"[{cosmos_account}] Connected successfully to Azure Cosmos DB account.")

    # Query container
    try:
        result_iterable = client.QueryItems(
            f"dbs/{database_name}/colls/{container_name}",
            {
                "query": "SELECT * FROM c WHERE c.category='enemy'"
            },
        )
        return list(result_iterable)

    except HTTPFailure:
        return None

# タスクで呼び出す関数 (2)
def upsert_cosmos():
    # Define additional variables
    database_name = 'Sample'
    container_name = 'jcdug20210214'
    os.environ['COSMOS_ACCOUNT_NAME'] = ''
    os.environ['COSMOS_Secret'] = ''

    # Connect to Azure Cosmos DB
    cosmos_account = os.environ['COSMOS_ACCOUNT_NAME']
    cosmos_key = os.environ['COSMOS_Secret']
    url = f"https://{cosmos_account}.documents.azure.com:443/"
    client = CosmosClient(url, {'masterKey': cosmos_key})
    print(f"[{cosmos_account}] Connected successfully to Azure Cosmos DB account.")

    try:
        created_document = client.CreateItem(
            f"dbs/{database_name}/colls/{container_name}",
            {'id': '5', 'category': 'demon-slayer-corps', 'name': '冨岡 義勇', 'description': '鬼殺隊の隊士で、炭治郎を鬼殺隊へと導く。', 'isAlive': True},
        )
        return created_document

    except HTTPFailure:
        return None

# DAG ファイルの既定値
args = {
    'owner': 'Pramod', # DAG の所有者  
    'start_date': airflow.utils.dates.days_ago(3), # タスクの開始日時
    'depends_on_past': False,
    'email': ['airflow@example.com'], # 障害発生時などにメール送信を行う宛先
    'email_on_failure': False, # タスク失敗時にメールを送信するか否か
    'email_on_retry': False, # タスクのリトライが発生した際にメールを送信するか否か
    'retries': 1, # タスク失敗時のリトライ回数
    'retry_delay': timedelta(minutes=5), # タスクが失敗してからリトライが行われるまでの待ち時間
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2021, 12, 31), # タスクの終了日時
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# DAG 情報作成
dag = DAG(
    'sample_cosmos_dag', # DAG の名前
    default_args=args, # DAG のデフォルト引数
    description='Azure Cosmos DB 操作テスト', # DAG の説明
    schedule_interval=timedelta(days=1), # タスクの実行間隔
    # start_date=airflow.utils.dates.days_ago(3), # ここでも指定可能
    tags=['example','cosmosdb']
)

# タスク定義
t1 = PythonOperator(
    task_id='read_cosmos_items',
    python_callable=read_cosmos_items,
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t4 = PythonOperator(
    task_id='upsert_cosmos',
    python_callable=upsert_cosmos,
    dag=dag,
)

t1 >> [t2, t3] >> t4

DAG 内で独自の Python コードを使用するには、PythonOperator を使用し、def で定義した関数を呼び出すようにします。
今回は触れていませんが、実際にはこのタスクで

  • 取得したデータを Pandas DataFrame に変換して分析
  • 変更フィード (Change Feed) で値を取得して後続処理を実施

といったようなワークフローを組むことが基本になってくるかと思います。

参考情報

Microsoft Docs

Azure CLI

Docker Hub

Discussion