🙆

Azure Durable Functions を Logic Apps や Azure Data Factory で利用する (2/2)

2024/03/03に公開

はじめに

今回は、Azure Durable Functions の非同期 HTTP API パターンを実装し、別のコンポーネントから利用する例を紹介したいと思います。Logic Apps の Http アクションは、基本的には、240秒の制限[1]があることから、重たい処理をする場合は、Durable Functions を使う必要があります。
前半で、Durable Functions の実装を紹介し、Logic Apps で利用する方法を示します。後半ではAzure Data Factory でマネージドID経由で利用する方法を示します。

今回は後半編となります

Durable Functions とは、Azure Functions の機能の一部となり、下記がその要点となります。

  1. ステートフルなワークフロー: 通常のAzure Functionsはステートレスですが、Durable Functionsは状態を保持し、複数の関数呼び出し間でデータを維持できます。

  2. オーケストレーション機能: 複数の関数を一連のワークフローとしてコントロールし、管理します。

  3. 耐久性と再開能力: プロセスの状態が自動的に保存され、中断後に最後の状態から再開できます。

  4. 複雑なタスクの単純化: 長期間実行されるタスクや複雑なタスクを容易に扱えるようにします。

これにより、複数ステップのプロセスや、外部イベントに依存するワークフローを簡単に作成し、管理できます。

本記事の ソースコードは、Gitに登録しています。

https://github.com/yutaka-art/DurableFunctionsSample

アーキテクチャ構成は下記のような感じです。

Durable Functions の実装

Functions は、インプロセスモデル(dotnet)とし、AzureFunctionsVersion は v4 を利用します。
.NETの分離されたワーカープロセスモデル(dotnet-isolated)の場合、IActionResult が使えないので注意してください。

下記を参考に実装します。
https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=csharp

ソリューション構成は下記のような感じです。

FunctionsStartup を実装します。

Startup.cs
Startup.cs
[assembly: FunctionsStartup(typeof(DurableFunction_Worker.Startup))]
namespace DurableFunction_Worker
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            var context = builder.GetContext();
            var services = builder.Services;

            services.Configure<MySettings>(context.Configuration.GetSection("Function"));
            services.AddTransient<IDiceService, DiceService>();
        }

        public override void ConfigureAppConfiguration(IFunctionsConfigurationBuilder builder)
        {
            var context = builder.GetContext();

            builder.ConfigurationBuilder
                .AddJsonFile(Path.Combine(context.ApplicationRootPath, "appsettings.json"), optional: true, reloadOnChange: false)
                .AddJsonFile(Path.Combine(context.ApplicationRootPath, $"appsettings.{context.EnvironmentName}.json"), optional: true, reloadOnChange: false)
                .AddEnvironmentVariables();


            if (context.EnvironmentName != "Development")
            {
                var config = builder.ConfigurationBuilder.Build();
                builder.ConfigurationBuilder
                    .AddAzureKeyVault(new Uri(config["Function:KeyVaultUrl"]), new DefaultAzureCredential());
            }
        }
    }
}

エントリポイントは下記です。
Trigger と Orchestrator と Activity を構成します。

HttpFunction.cs
HttpFunction.cs
namespace DurableFunction_Worker
{
    public class HttpFunction
    {
        #region Variable・Const
        private readonly MySettings Settings;
        private readonly ILogger Logger;
        private readonly IDiceService DiceService;
        #endregion

        #region [EntryPoint]
        public HttpFunction(IOptions<MySettings> optionsAccessor, ILoggerFactory loggerFactory, IDiceService diceService)
        {
            this.Settings = optionsAccessor.Value;
            this.Logger = loggerFactory.CreateLogger<HttpFunction>();
            this.DiceService = diceService;
        }
        #endregion

        #region [Normal Functions]
        /// <summary>
        /// Execute trigger
        /// </summary>
        /// <param name="req"></param>
        /// <returns></returns>
        [FunctionName("ExecuteTrigger")]
        public async Task<IActionResult> RunExecuteTrigger(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req)
        {
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var target = JsonConvert.DeserializeObject<ReceiveModel>(requestBody);

            var returnModel = new ReturnModel();

            try
            {
                returnModel.ProceedTime = await this.DiceService.RollDiceUntilAsync(target.TargetValue);
                // 返却用モデル生成
                returnModel.IsSucceed = true;
            }
            catch (Exception ex)
            {
                // 返却用モデル生成
                returnModel.IsSucceed = false;
                returnModel.ProceedTime = 0;
                returnModel.Exception = ex.ToString();
            }

            return new OkObjectResult(JsonConvert.SerializeObject(returnModel, Formatting.Indented));
        }
        #endregion

        #region [Durable Functions]
        [FunctionName(nameof(ExecuteOrchestrator))]
        public async Task<ReturnModel> ExecuteOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var receivePram = context.GetInput<ReceiveModel>();
            var output = new ReturnModel();
            output = await context.CallActivityAsync<ReturnModel>(nameof(ExecuteActivity), receivePram);

            return output;
        }

        [FunctionName(nameof(ExecuteActivity))]
        public async Task<ReturnModel> ExecuteActivity([ActivityTrigger] ReceiveModel receivePram, ILogger logger)
        {
            var returnModel = new ReturnModel();

            try
            {
                returnModel.ProceedTime = await this.DiceService.RollDiceUntilAsync(receivePram.TargetValue);
                // 返却用モデル生成
                returnModel.IsSucceed = true;
            }
            catch (Exception ex)
            {
                // 返却用モデル生成
                returnModel.IsSucceed = false;
                returnModel.ProceedTime = 0;
                returnModel.Exception = ex.ToString();
            }

            return returnModel;
        }

        [FunctionName(nameof(DurableExecuteTrigger))]
        public async Task<IActionResult> DurableExecuteTrigger(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger logger)
        {
            var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var target = JsonConvert.DeserializeObject<ReceiveModel>(requestBody);

            string instanceId = await starter.StartNewAsync(nameof(ExecuteOrchestrator), target);
            logger.LogInformation("Created new orchestration with instance ID = {instanceId}", instanceId);

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
        #endregion
    }
}

Azure へデプロイした後、OAuth認証を設定します。

Data Factory からの呼び出し

リンクサービスの構築

Functionsを呼び出すためにリンクサービスを構成します。

左ペインのカバンアイコン>リンクサービス>+新規>コンピューティング>Azure>Azure 関数をクリック

少しややこしいですが、下記のように設定をします。

  • 統合ランタイム経由で接続:AutoResolveIntegrationRuntime ※デフォルト
  • Azure 関数アプリの選択方法:Enter manually
  • 関数アプリURL:Azure FunctionsのURL
  • 認証方法:システムマネージドID
  • リソースID:OAuth認証を設定した際に生成されたEntraIDのアプリケーションIDのURL
  • ファンクションキー:Azure Functionsのアプリキー

PipelineによるFunctionsの呼び出し

Logic Appsのときと同様にポーリング処理を間に挟みます。
Pipelineを新しく作成し、Azure Functionsのコネクタを指定します。

全般タブ

  • 名前:任意の名前をつけます。
  • アクティビティの状態:アクティブ化
  • タイムアウト:デフォルト値
  • 再試行:0
  • 再試行のサイクル間隔(秒):30
  • セキュリティで保護された出力:Off
  • セキュリティで保護された入力:Off

設定タブ

  • Azure関数のリンクサービス:先ほど構築したリンクサービスを指定します。
  • 関数名:Azure Functionsの関数名を指定します。
  • メソッド:HTTPメソッドを指定します。
  • ヘッダー:呼び出す際のヘッダを指定します。
  • 本文:呼び出す際のBodyを指定します。

続いて、ポーリングする機構を設定します。
ループ処理のコンポーネントを入れて、その中に待機処理を5秒間隔に設定します。

HTTPコネクタによる、Durable FunctionsのURIを指定します。

設定タブ

  • URL:Durable FunctionsのポーリングURLを指定します。
  • メソッド:GET
  • 認証:OAuthを利用するので、システムマネージドIDを指定します。
  • リソース:Azure FunctionsのURL
  • 統合ランタイム:デフォルト

完成形は下記のJsonのようになります。

.json
{
    "name": "pipeline_sample02",
    "properties": {
        "activities": [
            {
                "name": "DurableExecuteTrigger",
                "type": "AzureFunctionActivity",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "functionName": "DurableExecuteTrigger",
                    "body": {
                        "value": "{\r\n    \"TargetValue\" : \"@{pipeline().parameters.TargetValue}\"\r\n}",
                        "type": "Expression"
                    },
                    "method": "POST"
                },
                "linkedServiceName": {
                    "referenceName": "AzureFunction2",
                    "type": "LinkedServiceReference"
                }
            },
            {
                "name": "DurableExecuteTrigger_Until",
                "type": "Until",
                "dependsOn": [
                    {
                        "activity": "DurableExecuteTrigger",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Wait5Second",
                            "type": "Wait",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "waitTimeInSeconds": 5
                            }
                        },
                        {
                            "name": "CheckStatus",
                            "type": "WebActivity",
                            "dependsOn": [
                                {
                                    "activity": "Wait5Second",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "method": "GET",
                                "url": {
                                    "value": "@activity('DurableExecuteTrigger').output.statusQueryGetUri",
                                    "type": "Expression"
                                },
                                "authentication": {
                                    "type": "MSI",
                                    "resource": "api://f90eeec8-1d7d-484f-9fc9-07f53b576f1c"
                                }
                            }
                        }
                    ],
                    "timeout": "0.12:00:00"
                }
            }
        ],
        "parameters": {
            "TargetValue": {
                "type": "int",
                "defaultValue": 25
            }
        },
        "annotations": [],
        "lastPublishTime": "2024-01-09T13:37:28Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

今すぐトリガーを作成し、うまくCallできるか確認しておきましょう。

モニターから確認します。

まとめ

本記事では、Azure Durable Functionsの非同期HTTP APIパターンを実装し、Azure Data FactoryやLogic Appsからの利用方法を詳述しました。Durable Functionsを活用することで、ステートフルなワークフローの管理や、耐久性のある長期的な処理を効率的に行うことが可能となります。特に、複数のステップを含む複雑なプロセスや、外部イベントに依存するワークフローの構築において、その利点を最大限に発揮します。

後半では、Azure Data Factoryからの呼び出しについて、マネージドIDを活用したリンクサービスの設定や、パイプライン内でのポーリング処理の実装を行いました。これにより、長時間実行される処理を効率的に管理し、監視する手法を学びました。

Durable Functionsの強力なオーケストレーション機能を活用し、Azureの他のサービスとの連携をシームレスに行うことで、クラウド環境における堅牢でスケーラブルなアプリケーションの構築が可能となります。次回は、Data Factoryでのリンクサービスやパイプラインの構成を効率化するためのツールについて紹介しますので、ぜひご期待ください。

References

https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=csharp

https://learn.microsoft.com/ja-jp/azure/data-factory/concepts-pipelines-activities?tabs=data-factory

https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-billing

脚注
  1. https://learn.microsoft.com/ja-jp/azure/logic-apps/logic-apps-limits-and-config?tabs=consumption#timeout-duration ↩︎

GitHubで編集を提案

Discussion