Zenn
Closed14

📝 Azure Learning メモ

yuji⛩️yuji⛩️

Cosmos DB エミュレーターの構築

Docker Compose を使った Cosmos DB エミュレーターの構築

Docker Compose を利用して、Azure Cosmos DB エミュレーターを構築する手順について。

ドキュメントは Docker での構築手順が記載されているが、Docker Compose で構築した。

Docker Compose ファイル
docker-compose.yml
services:
    cosmosdb-emulator:
        image: mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest
        container_name: cosmosdb-emulator
        ports:
            - "8081:8081"
            - "10250-10255:10250-10255"
        environment:
            AZURE_COSMOS_EMULATOR_PARTITION_COUNT: 10
            AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE: "true"
        volumes:
            - cosmosdbdata:/data
        networks:
            - cosmosdb-network
        healthcheck:
            test:
                [
                    "CMD",
                    "curl",
                    "-f",
                    "https://localhost:8081/_explorer/emulator.pem",
                ]
            interval: 30s
            timeout: 10s
            retries: 5

    cosmosdbdata: {}

networks:
    cosmosdb-network:
        driver: bridge

Ubuntu でのエミュレーターの TLS/SSL 証明書をインポートする手順は次の通り。

Cosmos DB エミュレーターの TLS/SSL 証明書のインポート (Ubuntu)
curl --insecure https://localhost:8081/_explorer/emulator.pem -o /tmp/cosmosdb-emulator-cert.crt || { echo 'Failed to download certificate'; exit 1; }
sudo cp /tmp/cosmosdb-emulator-cert.crt /usr/local/share/ca-certificates/ || { echo 'Failed to copy certificate'; exit 1; }
sudo update-ca-certificates || { echo 'Failed to update CA certificates'; exit 1; }
rm /tmp/cosmosdb-emulator-cert.crt
yuji⛩️yuji⛩️

VS Code を使った Go での Azure Functions の作成

VS Code を利用し、Go 言語で Azure Functions を作成し、ローカルで実行および Azure にデプロイする手順について。

yuji⛩️yuji⛩️

ローカル環境での Azure Functions の開発

Azure Functions をローカル環境で開発し、デバッグする手順。必要なツールのセットアップから、ローカルで関数を実行するまでの方法について。

yuji⛩️yuji⛩️

Azure Cosmos DB の Go SDK を使った CRUD 操作

Azure Cosmos DB の Go SDK を使って、Cosmos DB に対して CRUD 操作を行う手順について。

Azure Cosmos DB の環境変数の設定

local.settings.json ファイルに Cosmos DB の接続情報を設定します。

COSOMOSDB_KEY には、Azure Cosmos DB のアカウントのプライマリ キーを設定します。
プライマリーキーはhttps://localhost:8081/_explorer/index.html で確認できます。

local.settings.json
{
    "IsEncrypted": false,
    "Values": {
        "FUNCTIONS_WORKER_RUNTIME": "custom",
        "AzureWebJobsStorage": "",
        "COSMOSDB_ENDPOINT": "https://localhost:8081",
        "COSMOSDB_KEY": "your_cosmosdb_key"
    }
}

Azure Cosmos DB の Go SDK のインストール

details Go SDK のインストール

go get -u github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos

CRUD 操作のサンプルコード

動的な ID を含むパスを処理する RESTful API を構築するために、function.json ファイルの route プロパティを次のように設定します。

function.json
{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": ["get", "post", "put", "delete"],
      "route": "items/{id?}"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}
CRUD 操作のサンプルコード
main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"

    "log/slog"

    "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
    "github.com/google/uuid"
)

// Itemはデータベース内のアイテムを表します。
type Item struct {
    ID       string  `json:"id"`
    Name     string  `json:"name"`
    Category string  `json:"category"`
    Price    float64 `json:"price"`
}

var (
    cosmosClient  *azcosmos.Client
    databaseName  = "SampleDB"
    containerName = "Items"
)

// init は、環境変数から CosmosDB の接続情報を取得し、クライアントを初期化します。
func init() {
    endpoint := os.Getenv("COSMOSDB_ENDPOINT")
    key := os.Getenv("COSMOSDB_KEY")
    if endpoint == "" || key == "" {
        slog.Error("環境変数が設定されていません", "COSMOSDB_ENDPOINT", endpoint, "COSMOSDB_KEY", key)
        os.Exit(1)
    }

    cred, err := azcosmos.NewKeyCredential(key)
    if err != nil {
        slog.Error("クレデンシャルの作成に失敗しました", "error", err)
        os.Exit(1)
    }

    cosmosClient, err = azcosmos.NewClientWithKey(endpoint, cred, nil)
    if err != nil {
        slog.Error("Cosmosクライアントの作成に失敗しました", "error", err)
        os.Exit(1)
    }
}

// createItem は、POST リクエストを処理してデータベースに新しいアイテムを作成します。
func createItem(w http.ResponseWriter, r *http.Request) {
    ctx := context.TODO()
    var item Item
    if err := json.NewDecoder(r.Body).Decode(&item); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(map[string]string{"error": "リクエストペイロードが無効です"})
        slog.Error("リクエストペイロードが無効です", "error", err)
        return
    }
    item.ID = uuid.NewString()
    itemData, _ := json.Marshal(item)
    partitionKey := azcosmos.NewPartitionKeyString(item.Category)

    container, err := cosmosClient.NewContainer(databaseName, containerName)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
        slog.Error("コンテナの作成に失敗しました", "error", err)
        return
    }

    _, err = container.CreateItem(ctx, partitionKey, itemData, nil)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの作成に失敗しました: %v", err)})
        slog.Error("アイテムの作成に失敗しました", "error", err)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(item)
    slog.Info("アイテムを作成しました", "item", item)
}

// getItem は、GET リクエストを処理して指定された ID とカテゴリのアイテムを取得します。
func getItem(w http.ResponseWriter, r *http.Request) {
    ctx := context.TODO()
    itemID := r.PathValue("id")
    category := r.URL.Query().Get("category")
    if category == "" {
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(map[string]string{"error": "クエリパラメータとしてカテゴリが必要です"})
        slog.Error("カテゴリパラメータが不足しています", "category", category)
        return
    }

    partitionKey := azcosmos.NewPartitionKeyString(category)
    container, err := cosmosClient.NewContainer(databaseName, containerName)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
        slog.Error("コンテナの作成に失敗しました", "error", err)
        return
    }

    readResponse, err := container.ReadItem(ctx, partitionKey, itemID, nil)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの読み取りに失敗しました: %v", err)})
        slog.Error("アイテムの読み取りに失敗しました", "error", err)
        return
    }
    var item Item
    json.Unmarshal(readResponse.Value, &item)
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(item)
    slog.Info("アイテムを取得しました", "item", item)
}

// updateItem は、PUT リクエストを処理してデータベースの既存アイテムを更新します。
func updateItem(w http.ResponseWriter, r *http.Request) {
    ctx := context.TODO()
    itemID := r.PathValue("id")
    var item Item
    if err := json.NewDecoder(r.Body).Decode(&item); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(map[string]string{"error": "リクエストペイロードが無効です"})
        slog.Error("リクエストペイロードが無効です", "error", err)
        return
    }
    item.ID = itemID
    itemData, _ := json.Marshal(item)
    partitionKey := azcosmos.NewPartitionKeyString(item.Category)

    container, err := cosmosClient.NewContainer(databaseName, containerName)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
        slog.Error("コンテナの作成に失敗しました", "error", err)
        return
    }

    _, err = container.ReplaceItem(ctx, partitionKey, item.ID, itemData, nil)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの更新に失敗しました: %v", err)})
        slog.Error("アイテムの更新に失敗しました", "error", err)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{"message": "アイテムの更新に成功しました。"})
    slog.Info("アイテムを更新しました", "item", item)
}

// deleteItem は、DELETE リクエストを処理してデータベースからアイテムを削除します。
func deleteItem(w http.ResponseWriter, r *http.Request) {
    ctx := context.TODO()
    itemID := r.PathValue("id")
    category := r.URL.Query().Get("category")
    if category == "" {
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(map[string]string{"error": "クエリパラメータとしてカテゴリが必要です"})
        slog.Error("カテゴリパラメータが不足しています", "category", category)
        return
    }

    partitionKey := azcosmos.NewPartitionKeyString(category)
    container, err := cosmosClient.NewContainer(databaseName, containerName)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("コンテナの作成に失敗しました: %v", err)})
        slog.Error("コンテナの作成に失敗しました", "error", err)
        return
    }

    _, err = container.DeleteItem(ctx, partitionKey, itemID, nil)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("アイテムの削除に失敗しました: %v", err)})
        slog.Error("アイテムの削除に失敗しました", "error", err)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{"message": "アイテムの削除に成功しました。"})
    slog.Info("アイテムを削除しました", "itemID", itemID)
}

func main() {
    listenAddr := ":7071"
    if val, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT"); ok {
        listenAddr = ":" + val
    }

    // サブルーティングを含むHTTPマルチプレクサの設定
    mux := http.NewServeMux()
    mux.HandleFunc("POST /api/items", createItem)
    mux.HandleFunc("GET /api/items/{id}", getItem)
    mux.HandleFunc("PUT /api/items/{id}", updateItem)
    mux.HandleFunc("DELETE /api/items/{id}", deleteItem)

    slog.Info("サーバーを起動します", "address", listenAddr)
    log.Fatal(http.ListenAndServe(listenAddr, mux))
}

yuji⛩️yuji⛩️

curl を使った API のテスト

サンプルコードのcurl で各エンドポイントをテストするためのサンプルコマンド

API のテスト

1. アイテムの作成 (POST /api/items)

curl -X POST http://localhost:7071/api/items -H "Content-Type: application/json" -d '{
    "name": "Sample Item",
    "category": "Electronics",
    "price": 99.99
}'

2. アイテムの取得 (GET /api/items/{id})

{id} は作成されたアイテムの ID に置き換えてください(例として sample-id を使用しています)。

curl -X GET "http://localhost:7071/api/items/sample-id?category=Electronics"

3. アイテムの更新 (PUT /api/items/{id})

{id} は更新したいアイテムの ID に置き換えてください。

curl -X PUT "http://localhost:7071/api/items/sample-id" -H "Content-Type: application/json" -d '{
    "name": "Updated Item",
    "category": "Electronics",
    "price": 149.99
}'

4. アイテムの削除 (DELETE /api/items/{id})

{id} は削除したいアイテムの ID に置き換えてください。

curl -X DELETE "http://localhost:7071/api/items/sample-id?category=Electronics"
yuji⛩️yuji⛩️

VS Code と Python を使用して Azure に関数を作成する

VS Code と Python を使用して Azure Functions を作成し、ローカルで実行および Azure にデプロイする手順について。

Azurite を docker-compose で起動する

Python で Azure Functions を開発する際に、Azurite を使用して Azure Storage エミュレーターをローカルで起動する必要がある。

Docker Compose ファイルの Azurite の設定
docker-compose.yml
services:
    azurite:
        image: mcr.microsoft.com/azure-storage/azurite
        ports:
            - "10000:10000"
            - "10001:10001"
            - "10002:10002"

Azure Storage Explorer のインストール

Microsoft Azure Storage Explorer は、Windows、macOS、Linux での Azure Storage データの操作を容易にするスタンドアローン アプリです。

Azure Storage BLOB、ファイル、キュー、テーブルに加えて、Azure Data Lake Storage エンティティと Azure マネージド ディスクのアップロード、ダウンロード、管理を行うことができます。ストレージのアクセス許可とアクセス制御、階層、規則を構成できます。

yuji⛩️yuji⛩️

デプロイコマンド

Azure Functions

func azure functionapp publish <FunctionAppName>

Container Apps

func azurecontainerapps deploy --name <APP_NAME> --environment <ENVIRONMENT_NAME> --storage-account <STORAGE_CONNECTION> --resource-group <RESOURCE_GROUP> --image-name <IMAGE_NAME> [--registry-password] [--registry-server] [--registry-username]
yuji⛩️yuji⛩️

Durable Functions でBluePrintを利用してファイル分割

Durable Functions では、ブループリントもサポートされています。 Durable Functions アプリのブループリントを作成するには、こちらに示すように、azure-functions-durable Blueprint クラスを使用してオーケストレーション、アクティビティ、およびエンティティ トリガーとクライアント バインドを登録します。 その後、結果のブループリントを通常どおりに登録できます。 例については、サンプルをご覧ください。

yuji⛩️yuji⛩️

Azure Durable Functions: Python(Blueprint)での汎用的な入力・出力管理

以下のサンプルは、Azure Durable Functionsを利用して、入力(Input)と出力(Output)を汎用的かつ厳密に扱う方法を解説しています。特に、Pydantic を活用したデータバリデーションと整合性保証がポイントです。


実装の流れ

  1. Pydanticモデルで入力・出力を管理
    • 入力(MyInput): クライアントからのリクエストデータを検証。
    • 出力(MyOutput): 各ステップの結果を統一フォーマットで管理。
  2. Blueprintでワークフローを定義
    • HTTPトリガー → オーケストレーター → アクティビティ → クライアントへレスポンス。
    • 各ステップで Pydantic を利用してデータの整合性を保証。

1. モデル定義

from pydantic import BaseModel
from typing import Optional

class MyInput(BaseModel):
    name: str
    age: int

class MyOutput(BaseModel):
    greeting: str
    comment: Optional[str] = None
  • MyInput: クライアントが送信するデータ形式を定義(例: {"name": "Alice", "age": 25})。
  • MyOutput: 処理結果のフォーマットを統一(例: {"greeting": "Hello, Alice!", "comment": "You are 25 years old."})。

2. Blueprintによるワークフロー定義

(1) HTTPトリガー

クライアントからJSONデータを受け取り、Pydanticでバリデーション後にオーケストレーターを起動します。

import azure.durable_functions as df
import azure.functions as func
from pydantic import ValidationError
from .function_app import MyInput

bp = df.Blueprint()

@bp.route(route="start_example", methods=["POST"])
@bp.durable_client_input(client_name="client")
async def start_example(req: func.HttpRequest, client):
    try:
        body = req.get_json()
        input_data = MyInput(**body)  # Pydanticで入力検証
    except (ValueError, ValidationError) as e:
        return func.HttpResponse(f"Invalid input: {str(e)}", status_code=400)

    instance_id = await client.start_new(
        orchestration_function_name="my_orchestrator",
        client_input=input_data.model_dump_json()  # JSON形式で渡す
    )

    return client.create_check_status_response(req, instance_id)

(2) オーケストレーター

オーケストレーターは、以下の手順を実行します:

  1. 入力データを取得し、Pydanticモデルで再検証。
  2. アクティビティを呼び出して結果を取得。
  3. 最終出力をPydanticモデルで整形し、JSON形式で返却。
@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context: df.DurableOrchestrationContext):
    # 入力データを取得し、Pydanticで再検証
    input_data = MyInput(**context.get_input())

    # アクティビティを呼び出し
    activity_result = yield context.call_activity(
        "my_activity", input_data.model_dump_json()
    )

    # 結果を整形して返却
    result = MyOutput(**activity_result)
    return result.model_dump_json()

(3) アクティビティ

アクティビティでは、入力データを処理し、Pydanticモデルを用いて結果を整形します。

@bp.activity_trigger(input_name="data")
def my_activity(data: str) -> dict:
    from pydantic import ValidationError
    from .function_app import MyInput, MyOutput

    try:
        # 入力データをPydanticで検証
        input_data = MyInput.parse_raw(data)
    except ValidationError as e:
        raise ValueError(f"MyInput validation error: {e}")

    # 処理結果を作成
    greeting = f"Hello, {input_data.name}!"
    comment = f"You are {input_data.age} years old."
    result = MyOutput(greeting=greeting, comment=comment)

    return result.model_dump_json()

yuji⛩️yuji⛩️

Azure Functions で Key Vaultを参照する

キー コンテナー参照の形式は @Microsoft.KeyVault({referenceString}) です。ここで、{referenceString0} は次のいずれかの形式です。

yuji⛩️yuji⛩️

Cosmos DB for NoSQL のベクトル検索機能

Azure Cosmos DB for NoSQL のベクトル検索機能を使用して、ベクトルデータベースを作成し、ベクトルのインデックス作成とクエリを実行する方法について。

Python で Azure Cosmos DB for NoSQL 内のベクトルのインデックス作成とクエリを実行する。

Cosmos DB for NoSQL アカウントをセットアップしてデータベースを作成する

Azure portal で新しい Azure Cosmos DB for NoSQL アカウントを作成し、データ エクスプローラーを使用してデータベースとコンテナーを構成します。最後に、コンテナーにサンプルデータを追加し、基本的なクエリを実行します。

ベクトルデータベース機能をセットアップする方法

Python で Azure Cosmos DB for NoSQL 内のベクトルのインデックス作成とクエリを実行する

このスクラップは1ヶ月前にクローズされました
作成者以外のコメントは許可されていません