📝 Azure Learning メモ

Cosmos DB エミュレーターの構築
Docker Compose を使った Cosmos DB エミュレーターの構築
Docker Compose を利用して、Azure Cosmos DB エミュレーターを構築する手順について。
ドキュメントは Docker での構築手順が記載されているが、Docker Compose で構築した。
Docker Compose ファイル
services:
cosmosdb-emulator:
image: mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest
container_name: cosmosdb-emulator
ports:
- "8081:8081"
- "10250-10255:10250-10255"
environment:
AZURE_COSMOS_EMULATOR_PARTITION_COUNT: 10
AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE: "true"
volumes:
- cosmosdbdata:/data
networks:
- cosmosdb-network
healthcheck:
test:
[
"CMD",
"curl",
"-f",
"https://localhost:8081/_explorer/emulator.pem",
]
interval: 30s
timeout: 10s
retries: 5
cosmosdbdata: {}
networks:
cosmosdb-network:
driver: bridge
Ubuntu でのエミュレーターの TLS/SSL 証明書をインポートする手順は次の通り。
Cosmos DB エミュレーターの TLS/SSL 証明書のインポート (Ubuntu)
curl --insecure https://localhost:8081/_explorer/emulator.pem -o /tmp/cosmosdb-emulator-cert.crt || { echo 'Failed to download certificate'; exit 1; }
sudo cp /tmp/cosmosdb-emulator-cert.crt /usr/local/share/ca-certificates/ || { echo 'Failed to copy certificate'; exit 1; }
sudo update-ca-certificates || { echo 'Failed to update CA certificates'; exit 1; }
rm /tmp/cosmosdb-emulator-cert.crt

VS Code を使った Go での Azure Functions の作成
VS Code を利用し、Go 言語で Azure Functions を作成し、ローカルで実行および Azure にデプロイする手順について。

ローカル環境での Azure Functions の開発
Azure Functions をローカル環境で開発し、デバッグする手順。必要なツールのセットアップから、ローカルで関数を実行するまでの方法について。

Azure Cosmos DB の Go SDK を使った CRUD 操作
Azure Cosmos DB の Go SDK を使って、Cosmos DB に対して CRUD 操作を行う手順について。
Azure Cosmos DB の環境変数の設定
local.settings.json
ファイルに Cosmos DB の接続情報を設定します。
COSOMOSDB_KEY
には、Azure Cosmos DB のアカウントのプライマリ キーを設定します。
プライマリーキーはhttps://localhost:8081/_explorer/index.html で確認できます。
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "custom",
"AzureWebJobsStorage": "",
"COSMOSDB_ENDPOINT": "https://localhost:8081",
"COSMOSDB_KEY": "your_cosmosdb_key"
}
}
Azure Cosmos DB の Go SDK のインストール
details Go SDK のインストール
go get -u github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos
CRUD 操作のサンプルコード
動的な ID を含むパスを処理する RESTful API を構築するために、function.json ファイルの route プロパティを次のように設定します。
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": ["get", "post", "put", "delete"],
"route": "items/{id?}"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
CRUD 操作のサンプルコード
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"log/slog"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/google/uuid"
)
// Itemはデータベース内のアイテムを表します。
type Item struct {
ID string `json:"id"`
Name string `json:"name"`
Category string `json:"category"`
Price float64 `json:"price"`
}
var (
cosmosClient *azcosmos.Client
databaseName = "SampleDB"
containerName = "Items"
)
// init は、環境変数から CosmosDB の接続情報を取得し、クライアントを初期化します。
func init() {
endpoint := os.Getenv("COSMOSDB_ENDPOINT")
key := os.Getenv("COSMOSDB_KEY")
if endpoint == "" || key == "" {
slog.Error("環境変数が設定されていません", "COSMOSDB_ENDPOINT", endpoint, "COSMOSDB_KEY", key)
os.Exit(1)
}
cred, err := azcosmos.NewKeyCredential(key)
if err != nil {
slog.Error("クレデンシャルの作成に失敗しました", "error", err)
os.Exit(1)
}
cosmosClient, err = azcosmos.NewClientWithKey(endpoint, cred, nil)
if err != nil {
slog.Error("Cosmosクライアントの作成に失敗しました", "error", err)
os.Exit(1)
}
}
// createItem は、POST リクエストを処理してデータベースに新しいアイテムを作成します。
func createItem(w http.ResponseWriter, r *http.Request) {
ctx := context.TODO()
var item Item
if err := json.NewDecoder(r.Body).Decode(&item); err != nil {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "リクエストペイロードが無効です"})
slog.Error("リクエストペイロードが無効です", "error", err)
return
}
item.ID = uuid.NewString()
itemData, _ := json.Marshal(item)
partitionKey := azcosmos.NewPartitionKeyString(item.Category)
container, err := cosmosClient.NewContainer(databaseName, containerName)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
slog.Error("コンテナの作成に失敗しました", "error", err)
return
}
_, err = container.CreateItem(ctx, partitionKey, itemData, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの作成に失敗しました: %v", err)})
slog.Error("アイテムの作成に失敗しました", "error", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(item)
slog.Info("アイテムを作成しました", "item", item)
}
// getItem は、GET リクエストを処理して指定された ID とカテゴリのアイテムを取得します。
func getItem(w http.ResponseWriter, r *http.Request) {
ctx := context.TODO()
itemID := r.PathValue("id")
category := r.URL.Query().Get("category")
if category == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "クエリパラメータとしてカテゴリが必要です"})
slog.Error("カテゴリパラメータが不足しています", "category", category)
return
}
partitionKey := azcosmos.NewPartitionKeyString(category)
container, err := cosmosClient.NewContainer(databaseName, containerName)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
slog.Error("コンテナの作成に失敗しました", "error", err)
return
}
readResponse, err := container.ReadItem(ctx, partitionKey, itemID, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの読み取りに失敗しました: %v", err)})
slog.Error("アイテムの読み取りに失敗しました", "error", err)
return
}
var item Item
json.Unmarshal(readResponse.Value, &item)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(item)
slog.Info("アイテムを取得しました", "item", item)
}
// updateItem は、PUT リクエストを処理してデータベースの既存アイテムを更新します。
func updateItem(w http.ResponseWriter, r *http.Request) {
ctx := context.TODO()
itemID := r.PathValue("id")
var item Item
if err := json.NewDecoder(r.Body).Decode(&item); err != nil {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "リクエストペイロードが無効です"})
slog.Error("リクエストペイロードが無効です", "error", err)
return
}
item.ID = itemID
itemData, _ := json.Marshal(item)
partitionKey := azcosmos.NewPartitionKeyString(item.Category)
container, err := cosmosClient.NewContainer(databaseName, containerName)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
slog.Error("コンテナの作成に失敗しました", "error", err)
return
}
_, err = container.ReplaceItem(ctx, partitionKey, item.ID, itemData, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの更新に失敗しました: %v", err)})
slog.Error("アイテムの更新に失敗しました", "error", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"message": "アイテムの更新に成功しました。"})
slog.Info("アイテムを更新しました", "item", item)
}
// deleteItem は、DELETE リクエストを処理してデータベースからアイテムを削除します。
func deleteItem(w http.ResponseWriter, r *http.Request) {
ctx := context.TODO()
itemID := r.PathValue("id")
category := r.URL.Query().Get("category")
if category == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "クエリパラメータとしてカテゴリが必要です"})
slog.Error("カテゴリパラメータが不足しています", "category", category)
return
}
partitionKey := azcosmos.NewPartitionKeyString(category)
container, err := cosmosClient.NewContainer(databaseName, containerName)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
slog.Error("コンテナの作成に失敗しました", "error", err)
return
}
_, err = container.DeleteItem(ctx, partitionKey, itemID, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの削除に失敗しました: %v", err)})
slog.Error("アイテムの削除に失敗しました", "error", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"message": "アイテムの削除に成功しました。"})
slog.Info("アイテムを削除しました", "itemID", itemID)
}
func main() {
listenAddr := ":7071"
if val, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT"); ok {
listenAddr = ":" + val
}
// サブルーティングを含むHTTPマルチプレクサの設定
mux := http.NewServeMux()
mux.HandleFunc("POST /api/items", createItem)
mux.HandleFunc("GET /api/items/{id}", getItem)
mux.HandleFunc("PUT /api/items/{id}", updateItem)
mux.HandleFunc("DELETE /api/items/{id}", deleteItem)
slog.Info("サーバーを起動します", "address", listenAddr)
log.Fatal(http.ListenAndServe(listenAddr, mux))
}

curl
を使った API のテスト
サンプルコードのcurl
で各エンドポイントをテストするためのサンプルコマンド
API のテスト
1. アイテムの作成 (POST /api/items)
curl -X POST http://localhost:7071/api/items -H "Content-Type: application/json" -d '{
"name": "Sample Item",
"category": "Electronics",
"price": 99.99
}'
2. アイテムの取得 (GET /api/items/{id})
{id}
は作成されたアイテムの ID に置き換えてください(例として sample-id
を使用しています)。
curl -X GET "http://localhost:7071/api/items/sample-id?category=Electronics"
3. アイテムの更新 (PUT /api/items/{id})
{id}
は更新したいアイテムの ID に置き換えてください。
curl -X PUT "http://localhost:7071/api/items/sample-id" -H "Content-Type: application/json" -d '{
"name": "Updated Item",
"category": "Electronics",
"price": 149.99
}'
4. アイテムの削除 (DELETE /api/items/{id})
{id}
は削除したいアイテムの ID に置き換えてください。
curl -X DELETE "http://localhost:7071/api/items/sample-id?category=Electronics"

VS Code と Python を使用して Azure に関数を作成する
VS Code と Python を使用して Azure Functions を作成し、ローカルで実行および Azure にデプロイする手順について。
Azurite を docker-compose で起動する
Python で Azure Functions を開発する際に、Azurite を使用して Azure Storage エミュレーターをローカルで起動する必要がある。
Docker Compose ファイルの Azurite の設定
services:
azurite:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
- "10001:10001"
- "10002:10002"
Azure Storage Explorer のインストール
Microsoft Azure Storage Explorer は、Windows、macOS、Linux での Azure Storage データの操作を容易にするスタンドアローン アプリです。
Azure Storage BLOB、ファイル、キュー、テーブルに加えて、Azure Data Lake Storage エンティティと Azure マネージド ディスクのアップロード、ダウンロード、管理を行うことができます。ストレージのアクセス許可とアクセス制御、階層、規則を構成できます。

Azure Container Apps

Go Web アプリを Azure Container Apps にデプロイする
Container Registry の価格
Dockerfile のサンプル

デプロイコマンド
Azure Functions
func azure functionapp publish <FunctionAppName>
Container Apps
func azurecontainerapps deploy --name <APP_NAME> --environment <ENVIRONMENT_NAME> --storage-account <STORAGE_CONNECTION> --resource-group <RESOURCE_GROUP> --image-name <IMAGE_NAME> [--registry-password] [--registry-server] [--registry-username]

Durable Functions
サンプルコード

Durable Functions でBluePrintを利用してファイル分割
Durable Functions では、ブループリントもサポートされています。 Durable Functions アプリのブループリントを作成するには、こちらに示すように、azure-functions-durable Blueprint クラスを使用してオーケストレーション、アクティビティ、およびエンティティ トリガーとクライアント バインドを登録します。 その後、結果のブループリントを通常どおりに登録できます。 例については、サンプルをご覧ください。

Azure Durable Functions: Python(Blueprint)での汎用的な入力・出力管理
以下のサンプルは、Azure Durable Functionsを利用して、入力(Input)と出力(Output)を汎用的かつ厳密に扱う方法を解説しています。特に、Pydantic を活用したデータバリデーションと整合性保証がポイントです。
実装の流れ
-
Pydanticモデルで入力・出力を管理
- 入力(MyInput): クライアントからのリクエストデータを検証。
- 出力(MyOutput): 各ステップの結果を統一フォーマットで管理。
-
Blueprintでワークフローを定義
- HTTPトリガー → オーケストレーター → アクティビティ → クライアントへレスポンス。
- 各ステップで Pydantic を利用してデータの整合性を保証。
1. モデル定義
from pydantic import BaseModel
from typing import Optional
class MyInput(BaseModel):
name: str
age: int
class MyOutput(BaseModel):
greeting: str
comment: Optional[str] = None
-
MyInput: クライアントが送信するデータ形式を定義(例:
{"name": "Alice", "age": 25}
)。 -
MyOutput: 処理結果のフォーマットを統一(例:
{"greeting": "Hello, Alice!", "comment": "You are 25 years old."}
)。
2. Blueprintによるワークフロー定義
(1) HTTPトリガー
クライアントからJSONデータを受け取り、Pydanticでバリデーション後にオーケストレーターを起動します。
import azure.durable_functions as df
import azure.functions as func
from pydantic import ValidationError
from .function_app import MyInput
bp = df.Blueprint()
@bp.route(route="start_example", methods=["POST"])
@bp.durable_client_input(client_name="client")
async def start_example(req: func.HttpRequest, client):
try:
body = req.get_json()
input_data = MyInput(**body) # Pydanticで入力検証
except (ValueError, ValidationError) as e:
return func.HttpResponse(f"Invalid input: {str(e)}", status_code=400)
instance_id = await client.start_new(
orchestration_function_name="my_orchestrator",
client_input=input_data.model_dump_json() # JSON形式で渡す
)
return client.create_check_status_response(req, instance_id)
(2) オーケストレーター
オーケストレーターは、以下の手順を実行します:
- 入力データを取得し、Pydanticモデルで再検証。
- アクティビティを呼び出して結果を取得。
- 最終出力をPydanticモデルで整形し、JSON形式で返却。
@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context: df.DurableOrchestrationContext):
# 入力データを取得し、Pydanticで再検証
input_data = MyInput(**context.get_input())
# アクティビティを呼び出し
activity_result = yield context.call_activity(
"my_activity", input_data.model_dump_json()
)
# 結果を整形して返却
result = MyOutput(**activity_result)
return result.model_dump_json()
(3) アクティビティ
アクティビティでは、入力データを処理し、Pydanticモデルを用いて結果を整形します。
@bp.activity_trigger(input_name="data")
def my_activity(data: str) -> dict:
from pydantic import ValidationError
from .function_app import MyInput, MyOutput
try:
# 入力データをPydanticで検証
input_data = MyInput.parse_raw(data)
except ValidationError as e:
raise ValueError(f"MyInput validation error: {e}")
# 処理結果を作成
greeting = f"Hello, {input_data.name}!"
comment = f"You are {input_data.age} years old."
result = MyOutput(greeting=greeting, comment=comment)
return result.model_dump_json()

Azure Functions で Key Vaultを参照する
キー コンテナー参照の形式は @Microsoft.KeyVault({referenceString})
です。ここで、{referenceString0}
は次のいずれかの形式です。

Cosmos DB for NoSQL のベクトル検索機能
Azure Cosmos DB for NoSQL のベクトル検索機能を使用して、ベクトルデータベースを作成し、ベクトルのインデックス作成とクエリを実行する方法について。
Python で Azure Cosmos DB for NoSQL 内のベクトルのインデックス作成とクエリを実行する。
Cosmos DB for NoSQL アカウントをセットアップしてデータベースを作成する
Azure portal で新しい Azure Cosmos DB for NoSQL アカウントを作成し、データ エクスプローラーを使用してデータベースとコンテナーを構成します。最後に、コンテナーにサンプルデータを追加し、基本的なクエリを実行します。