Queueを介してBlob更新をFunctionにプライベート接続でイベント通知する方法
はじめに
こちらの記事で紹介していますが、BlobStorageのイベントをEventGridを使い、プライベート通信でFunctionsに通知することはできません。
上記の問題に対処するために、以下の回避策を設定・実装します。
BlobStorageにドキュメントを格納したのち、Queueに登録したドキュメント名を含むメッセージを登録します。その後、QueueTriggerでFunctionを実行し、ドキュメント名からドキュメントを取得します。
必要なリソースと実装箇所
構築のためには以下のリソースが必要です。
※左側の「ドキュメントを格納するアプリケーション」は作成はせずに、手動で仮定して稼働確認します。
- StorageAccount
- Blob
- Queue
- FunctionApp
- PrivateEndpoint
- python実装したソースコード
- QueueTriggerで実行
- Queueのメッセージに含まれたBlobのドキュメント名を取得
- ドキュメント名からドキュメントを取得する。
リソースの作成
StorageAccountの作成
-
StorageAccountを作成します。
その他の設定はデフォルトのままでOKです。
※セキュリティを考慮したエンタープライズ向けの記事ですので、サービスエンドポイントは使わずプライベートエンドポイントを設定しました。
-
作成したStorageAccountのネットワーク画面から、接続文字列を控えます。この接続文字列は、後ほどFunctionの環境変数に設定します。
Blobコンテナの作成
- 作成したStorageAccountのBlobコンテナ画面から作成
- コンテナ名を入力し作成
Queueの作成
- 作成したStorageAccountのQueue画面から作成
- Queue名を入力し作成
CosmosDBのコンテナを作成
CosmosDBに、データベースとコンテナを作成します。
ここで設定したデータベース名とコンテナ名は、FunctionのPythonソースコードに記載しますので控えておきましょう。
Functionの作成
-
Functionリソースを作成する。
ランタイム:Python
OS:Linux
ホスティング:Function Premium -
FunctionのVnet統合を設定する。
FunctionがQueueTriggerする仕組みは、Functionからのポーリングする仕組みになっています。そのため、QueueのStorageAccountのプライベートエンドポイントに接続できる仮想ネットワーク内にFunctionを配置する必要があります。- 払いだされたFunctionリソースのネットワーク画面から、仮想ネットワーク統合を選択する。
※1. のFunctionを払い出す際の画面で設定することもできます。
- 仮想ネットワーク統合の追加をクリック
- サブネットを設定して保存
- 払いだされたFunctionリソースのネットワーク画面から、仮想ネットワーク統合を選択する。
Function上で動くPythonコードの実装
QueueTriggerで実行されるPythonコードを実装します。
まずQueueメッセージに含まれるBlobドキュメント名を取得し()、そのドキュメント名からBlobドキュメントを取得します。
取得したBlobドキュメントの内容をCosmosDBに登録します。
import azure.functions as func
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as Exceptions
from azure.storage.blob import BlobServiceClient
from logging import getLogger
import os, uuid
logging = getLogger(__name__)
COSMOS_ACCOUNT_URI = os.environ['COSMOS_ACCOUNT_URI']
COSMOS_ACCOUNT_KEY = os.environ['COSMOS_ACCOUNT_KEY']
COSMOS_DATABASE_NAME = 'shrkmn-doc'
COSMOS_CONTAINER_NAME = 'shrkmn-doc'
BLOB_CONTAINER_NAME = 'shrkmn-doc-container'
blob_connection_string = os.getenv("AZURE_STORAGE_ACCESS_URL")
storage_access_key = os.getenv("AZURE_STORAGE_ACCESS_KEY")
app = func.FunctionApp()
@app.function_name(name="shrkmn_doc_func")
@app.queue_trigger(arg_name="azqueue", queue_name="shrkmn-doc-queue",
connection="AZURE_STORAGE_ACCESS_URL")
def shrkmn_doc_func(azqueue: func.QueueMessage):
#QueueのメッセージにBlobドキュメント名が格納される
blob_doc_name = azqueue.get_body().decode('utf-8')
logging.info('blob_doc_name: %s', blob_doc_name)
#Blobドキュメントの内容を取得する
try:
logging.info('get document content of blob storage.')
blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string,storage_access_key)
blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=blob_doc_name)
downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')
blob_text = downloader.readall()
logging.info('blob_text: %s', blob_text)
except Exception as ex:
logging.error('Exception: %s', ex)
#CosmosDBに登録する
#CosmosDBClientを作成
client = cosmos_client.CosmosClient(COSMOS_ACCOUNT_URI, {'masterKey': COSMOS_ACCOUNT_KEY}, user_agent="AssistantApp", user_agent_overwrite=True)
try:
logging.info('create item for cosmosdb.')
#CosmosDBに登録するitemを定義
#idにはuuidを設定
item = {
"id": str(uuid.uuid4()),
"content": blob_text
}
logging.info('item: %s', item)
#CosmosDBのデータベースを取得
database = client.get_database_client(COSMOS_DATABASE_NAME)
#CosmosDBのコンテナーを取得
container = database.get_container_client(COSMOS_CONTAINER_NAME)
#CosmosDBに登録する
container.create_item(body=item)
except Exceptions.CosmosHttpResponseError as e:
logging.error('CosmosHttpResponseError: %s', e.message)
logging.info('finish function.')
ローカル環境で実行する場合、.envファイルに環境変数の定義が必要です。
COSMOS_ACCOUNT_URI="COSMOSアカウントのURI"
COSMOS_ACCOUNT_KEY="COSMOSアカウントのキー"
AZURE_STORAGE_ACCESS_URL="StorageAccountの接続文字列"
AZURE_STORAGE_ACCESS_KEY="StorageAccountのアクセスキー"
FunctionにPythonコードをデプロイ
VSCodeのAzure Functions拡張機能を使い、FunctionにPythonコードをデプロイします。
- AzureFunction拡張機能タブから、Functionアイコンをクリックし、「Deploy to Function App」を選択。
- デプロイ先のFunctionを選択。
稼働確認
- BlobStorageにこのようなサンプルドキュメントを格納。
- Queueにファイル名(ここでは「sample-blob.txt」)をメッセージとして登録。
- QueueTriggerでFunctionが動き、CosmosDBにアイテムが作成されることを確認。
contentフィールドに、BlobStorageに登録したドキュメントの内容が格納されています。
これで、Queueメッセージを介してBlobStorageのドキュメント更新をFunctionに通知することができました。
まとめ
今回の構築で、BlobStorageのドキュメント更新をFunctionに通知することができました。
ここの実装ではBlobStorageのドキュメントの内容をそのままCosmosDBに登録していますが、実際のシステムでは、ドキュメントの内容を加工して登録することが多いと思います。
OpenAIのRAGシステムを構築する場合、OpenAIが推論しやすい形式に変換したりする処理を実装することが望ましいです。
もともとのドキュメントはBlobに格納され、加工されたドキュメントはCosmosDBに格納するというように、BlobとCosmosDBで保管するデータの役割を分けることができます。
Discussion