🐙

Azure Durable Functions を Logic Apps や Azure Data Factory で利用する (1/2)

2024/01/22に公開
1

はじめに

今回は、Azure Durable Functions の非同期 HTTP API パターンを実装し、別のコンポーネントから利用する例を紹介したいと思います。Logic Apps の Http アクションは、基本的には、240秒の制限[1]があることから、重たい処理をする場合は、Durable Functions を使う必要があります。
前半で、Durable Functions の実装を紹介し、Logic Apps で利用する方法を示します。後半ではAzure Data Factory でマネージドID経由で利用する方法を示します。

Durable Functions とは、Azure Functions の機能の一部となり、下記がその要点となります。

  1. ステートフルなワークフロー: 通常のAzure Functionsはステートレスですが、Durable Functionsは状態を保持し、複数の関数呼び出し間でデータを維持できます。

  2. オーケストレーション機能: 複数の関数を一連のワークフローとしてコントロールし、管理します。

  3. 耐久性と再開能力: プロセスの状態が自動的に保存され、中断後に最後の状態から再開できます。

  4. 複雑なタスクの単純化: 長期間実行されるタスクや複雑なタスクを容易に扱えるようにします。

これにより、複数ステップのプロセスや、外部イベントに依存するワークフローを簡単に作成し、管理できます。

本記事の ソースコードは、Gitに登録しています。

https://github.com/yutaka-art/DurableFunctionsSample

アーキテクチャ構成は下記のような感じです。

Durable Functions の実装

Functions は、インプロセスモデル(dotnet)とし、AzureFunctionsVersion は v4 を利用します。
.NETの分離されたワーカープロセスモデル(dotnet-isolated)の場合、IActionResult が使えないので注意してください。

下記を参考に実装します。
https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=csharp

ソリューション構成は下記のような感じです。

FunctionsStartup を実装します。

Startup.cs
Startup.cs
[assembly: FunctionsStartup(typeof(DurableFunction_Worker.Startup))]
namespace DurableFunction_Worker
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            var context = builder.GetContext();
            var services = builder.Services;

            services.Configure<MySettings>(context.Configuration.GetSection("Function"));
            services.AddTransient<IDiceService, DiceService>();
        }

        public override void ConfigureAppConfiguration(IFunctionsConfigurationBuilder builder)
        {
            var context = builder.GetContext();

            builder.ConfigurationBuilder
                .AddJsonFile(Path.Combine(context.ApplicationRootPath, "appsettings.json"), optional: true, reloadOnChange: false)
                .AddJsonFile(Path.Combine(context.ApplicationRootPath, $"appsettings.{context.EnvironmentName}.json"), optional: true, reloadOnChange: false)
                .AddEnvironmentVariables();


            if (context.EnvironmentName != "Development")
            {
                var config = builder.ConfigurationBuilder.Build();
                builder.ConfigurationBuilder
                    .AddAzureKeyVault(new Uri(config["Function:KeyVaultUrl"]), new DefaultAzureCredential());
            }
        }
    }
}

エントリポイントは下記です。
Trigger と Orchestrator と Activity を構成します。

HttpFunction.cs
HttpFunction.cs
namespace DurableFunction_Worker
{
    public class HttpFunction
    {
        #region Variable・Const
        private readonly MySettings Settings;
        private readonly ILogger Logger;
        private readonly IDiceService DiceService;
        #endregion

        #region [EntryPoint]
        public HttpFunction(IOptions<MySettings> optionsAccessor, ILoggerFactory loggerFactory, IDiceService diceService)
        {
            this.Settings = optionsAccessor.Value;
            this.Logger = loggerFactory.CreateLogger<HttpFunction>();
            this.DiceService = diceService;
        }
        #endregion

        #region [Normal Functions]
        /// <summary>
        /// Execute trigger
        /// </summary>
        /// <param name="req"></param>
        /// <returns></returns>
        [FunctionName("ExecuteTrigger")]
        public async Task<IActionResult> RunExecuteTrigger(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req)
        {
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var target = JsonConvert.DeserializeObject<ReceiveModel>(requestBody);

            var returnModel = new ReturnModel();

            try
            {
                returnModel.ProceedTime = await this.DiceService.RollDiceUntilAsync(target.TargetValue);
                // 返却用モデル生成
                returnModel.IsSucceed = true;
            }
            catch (Exception ex)
            {
                // 返却用モデル生成
                returnModel.IsSucceed = false;
                returnModel.ProceedTime = 0;
                returnModel.Exception = ex.ToString();
            }

            return new OkObjectResult(JsonConvert.SerializeObject(returnModel, Formatting.Indented));
        }
        #endregion

        #region [Durable Functions]
        [FunctionName(nameof(ExecuteOrchestrator))]
        public async Task<ReturnModel> ExecuteOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var receivePram = context.GetInput<ReceiveModel>();
            var output = new ReturnModel();
            output = await context.CallActivityAsync<ReturnModel>(nameof(ExecuteActivity), receivePram);

            return output;
        }

        [FunctionName(nameof(ExecuteActivity))]
        public async Task<ReturnModel> ExecuteActivity([ActivityTrigger] ReceiveModel receivePram, ILogger logger)
        {
            var returnModel = new ReturnModel();

            try
            {
                returnModel.ProceedTime = await this.DiceService.RollDiceUntilAsync(receivePram.TargetValue);
                // 返却用モデル生成
                returnModel.IsSucceed = true;
            }
            catch (Exception ex)
            {
                // 返却用モデル生成
                returnModel.IsSucceed = false;
                returnModel.ProceedTime = 0;
                returnModel.Exception = ex.ToString();
            }

            return returnModel;
        }

        [FunctionName(nameof(DurableExecuteTrigger))]
        public async Task<IActionResult> DurableExecuteTrigger(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger logger)
        {
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var target = JsonConvert.DeserializeObject<ReceiveModel>(requestBody);

            string instanceId = await starter.StartNewAsync(nameof(ExecuteOrchestrator), target);
            logger.LogInformation("Created new orchestration with instance ID = {instanceId}", instanceId);

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
        #endregion
    }
}

Azure へデプロイした後、OAuth認証を設定します。

Logic Apps からの呼び出し

Azure Functions コネクタを利用する場合

同じテナント内の、Azure Functionsを呼び出す場合は、Logic Apps 内の Azure Functions コネクタを利用できます。

認証情報を指定します。

  • 認証の種類:Active Directory OAuth
  • 機関:https://login.windows.net
  • テナント:%テナントID%
  • 対象ユーザー:Entra IDに登録されている、アプリケーション ID の URI
  • クライアントID:Entra IDに登録されている、アプリケーションID
  • 資格情報の種類:シークレット
  • シークレット:Entra IDに登録されている、シークレットValue
    ※シークレットはAADの証明書とシークレットで設定した、シークレットの値→Functionの環境変数に設定されているもの

右側にある・・・より、設定を選択し、非同期パターンをOnにすることで、202 (Accepted) 応答で処理要求が受領されていることをリモート サーバーが示す場合、Logic Apps エンジンは、最終状態になるまで、応答の場所ヘッダーで指定された URL をポーリングし続けます。

応答を作成し、うまくCallできるか確認しておきましょう。

HTTP コネクタを利用する場合

別テナントの、Azure Functionsを呼び出す場合は、Logic Apps 内の HTTP コネクタを利用することになります。

アクセストークンを取得します。

  • URI:https://login.windows.net/%テナントID%/oauth2/v2.0/token
  • ヘッダー:content-type、application/x-www-form-urlencoded
  • 本文:client_id=%クライアントID%&client_secret=%クライアントシークレット%&grant_type=client_credentials&scope=%アプリケーション ID の URI%/.default

JSON の解析 コネクタを利用し、HTTPコネクタの結果を受け取ります。

コンテンツ:HTTPコネクタの本文
スキーマ:下記

json
{
    "properties": {
        "access_token": {
            "type": "string"
        },
        "expires_in": {
            "type": "integer"
        },
        "ext_expires_in": {
            "type": "integer"
        },
        "token_type": {
            "type": "string"
        }
    },
    "type": "object"
}

続いて、再度、HTTPコネクタを利用して、Azure Functions を指定します。

  • URI:https://%Azure Functions エンドポイント%.azurewebsites.net/api/DurableExecuteTrigger
  • ヘッダー:Authorization、Bearer @{body('JSON_の解析:アクセストークンの受け取り')?['access_token']}
  • %Azure Functions へのパラメータ%

同様に、右側にある・・・より、設定を選択し、非同期パターンをOnにすることで、202 (Accepted) 応答で処理要求が受領されていることをリモート サーバーが示す場合、Logic Apps エンジンは、最終状態になるまで、応答の場所ヘッダーで指定された URL をポーリングし続けます。

応答を作成し、うまくCallできるか確認しておきましょう。

まとめ

本記事では、Azure Durable Functions の非同期 HTTP API パターンの実装方法と、Logic Apps からの利用方法について説明しました。重要なポイントは以下の通りです。

Durable Functions の概要と特徴

  • ステートフルなワークフローをサポートし、複数の関数呼び出し間でのデータの維持が可能。
    ・オーケストレーション機能により、複数の関数を一連のワークフローとして管理。
    ・耐久性と再開能力を有し、プロセスの状態が自動保存され、中断後も最後の状態から再開可能。
    ・長期間実行されるタスクや複雑なタスクの単純化。

  • Azure Functions のデプロイとセットアップ
    ・.NETの分離されたワーカープロセスモデルとインプロセスモデルの選択。
    ・ソリューションの構成、FunctionsStartup の実装、エントリポイントの設定。

  • Logic Apps からの呼び出し方法
    ・Azure Functions コネクタとHTTPコネクタの使用方法。
    ・認証情報の設定と非同期パターンの活用。

  • Azure Data Factory での利用
    後編の記事で詳細に説明します。

この記事を通じて、Durable Functions を使った複雑なワークフローの実装と、他のAzureサービスとの統合方法について理解を深めることができます。技術的な詳細とソースコードは、GitHubリポジトリ Git で公開しています。

今後もAzure Durable Functions を活用して、より効率的で堅牢なクラウドベースのソリューションを開発していくための知見を共有していきたいと思います。

References

https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=csharp

https://learn.microsoft.com/ja-jp/azure/logic-apps/logic-apps-limits-and-config?tabs=consumption

https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-billing

脚注
  1. https://learn.microsoft.com/ja-jp/azure/logic-apps/logic-apps-limits-and-config?tabs=consumption#timeout-duration ↩︎

GitHubで編集を提案

Discussion

yutakaosadayutakaosada
  • HTTPによるアクセストークン取得部のパラメータ値の一部が誤っていたため訂正しました