🔥

仕事で使うための Cloudflare Workers 入門 Day 3 - Queue/CronTriggers/Workflow

に公開

(速習のためのハンズオン資料です。口頭で捕捉します)

定期実行やワークフロー周りを解説します。そもそも何ができるかを認識するためことを目的として書いています。

セットアップ

$ npm create cloudflare
  • HellWorld
  • Queue
  • TypeScript

Cloudflare Queues

https://www.cloudflare.com/ja-jp/developer-platform/products/cloudflare-queues/

Cloudflare Queue には Consumer(消費する側) と Producer(生成する側があります) があります。

今回は同じ Worker 実装が同時に処理する設定です。

  "queues": {
    "consumers": [
      {
        "queue": "my-queue"
      }
    ],
    "producers": [
      {
        "binding": "MY_QUEUE",
        "queue": "my-queue"
      }
    ]
  },
export default {
  async fetch(req, env, ctx): Promise<Response> {
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    });
    return new Response("Sent message to the queue");
  },
  async queue(batch, env): Promise<void> {
    for (let message of batch.messages) {
      console.log(
        `message ${message.id} processed: ${JSON.stringify(message.body)}`
      );
    }
  },
} satisfies ExportedHandler<Env, Error>;

これは即座に queue() ハンドラが実行されるのではなく、デフォルトで最小 10 件、最速で 5 秒ごとにバッチされます。

https://developers.cloudflare.com/queues/configuration/batching-retries/

Cron Triggers

定期実行ジョブ

  "triggers": {
    "crons": ["*/2 * * * *"]
  }

同じ Worker 実装に、 Cron Trigger から queue を追加する実装を足します。

export default {
  async fetch(req, env, ctx): Promise<Response> {
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    });
    return new Response("Sent message to the queue");
  },
  async queue(batch, env): Promise<void> {
    for (let message of batch.messages) {
      console.log(
        `message ${message.id} processed: ${JSON.stringify(message.body)}`
      );
    }
  },
  // 追加
  async scheduled(
    controller: ScheduledController,
    env: Env,
    ctx: ExecutionContext
  ) {
    await env.MY_QUEUE.send({
      url: "https://example.com",
      method: "GET",
      headers: {
        "Content-Type": "application/json",
      },
    });
    console.log("cron processed");
  },
} satisfies ExportedHandler<Env, Error>;

Workflows

Cloudflare Workflows は Durable Object として状態を持つタスク実行の仕組みです。

ステップ実行や、ステップ毎のリトライ機構を持ちます。

https://developers.cloudflare.com/workflows/get-started/guide/

Cloudflare Workers は CPU やバックグラウンドジョブの実行制限がありますが Workflow だとタスクを分割しながら長時間実行できます。

(公式サンプルをみるに、どうやらベクトル検索のインデックスを想定しているように見えます)

ワークフロー定義は公式の実装をそのまま参考にします。

  "workflows": [
    {
      // name of your workflow
      "name": "workflows-starter",
      // binding name env.MYWORKFLOW
      "binding": "MY_WORKFLOW",
      // this is class that extends the Workflow class in src/index.ts
      "class_name": "MyWorkflow"
    }
  ]
import {
  WorkflowEntrypoint,
  WorkflowStep,
  WorkflowEvent,
} from "cloudflare:workers";

// User-defined params passed to your workflow
type Params = {
  email: string;
  metadata: Record<string, string>;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    const files = await step.do("my first step", async () => {
      // Fetch a list of files from $SOME_SERVICE
      return {
        files: [
          "doc_7392_rev3.pdf",
          "report_x29_final.pdf",
          "memo_2024_05_12.pdf",
          "file_089_update.pdf",
          "proj_alpha_v2.pdf",
          "data_analysis_q2.pdf",
          "notes_meeting_52.pdf",
          "summary_fy24_draft.pdf",
        ],
      };
    });

    const apiResponse = await step.do("some other step", async () => {
      let resp = await fetch("https://api.cloudflare.com/client/v4/ips");
      return await resp.json<any>();
    });

    await step.sleep("wait on something", "1 minute");

    await step.do(
      "make a call to write that could maybe, just might, fail",
      // Define a retry strategy
      {
        retries: {
          limit: 5,
          delay: "5 second",
          backoff: "exponential",
        },
        timeout: "15 minutes",
      },
      async () => {
        // Do stuff here, with access to the state from our previous steps
        if (Math.random() > 0.5) {
          throw new Error("API call to $STORAGE_SYSTEM failed");
        }
      }
    );
  }
}

コード読んだ感じだと、PDF リソースを取得して、保存するのがステップになっていますね。

これを実行開始するための fetch handler を実装してみましょう。

export default {
  async fetch(req, env, ctx): Promise<Response> {
    if (req.url.endsWith("/workflow")) {
      // Spawn a new instance and return the ID and status
      let instance = await env.MY_WORKFLOW.create();
      return Response.json({
        id: instance.id,
        details: await instance.status(),
      });
    }
    const url = new URL(req.url);
    if (url.pathname.endsWith("/workflow/status")) {
      let id = url.searchParams.get("instanceId");
      if (id) {
        let instance = await env.MY_WORKFLOW.get(id);
        return Response.json({
          status: await instance.status(),
        });
      }
      return new Response("Missing instanceId", { status: 400 });
    }
    // ...
  },
  // ...
};
## ワークフロー開始
$ curl http://localhost:8787/workflow
{"id":"2abd7118-907c-4fab-b852-38af84c6796a","details":{"status":"running","__LOCAL_DEV_STEP_OUTPUTS":[],"output":null}}

## ステータス取得
$ curl http://localhost:8787/workflow/status?instanceId=2abd7118-907c-4fab-b852-38af84c6796a
{"status":{"status":"running","__LOCAL_DEV_STEP_OUTPUTS":[{"files":["doc_7392_rev3.pdf","report_x29_final.pdf","memo_2024_05_12.pdf","file_089_update.pdf","proj_alpha_v2.pdf","data_analysis_q2.pdf","notes_meeting_52.pdf","summary_fy24_draft.pdf"]},{"result":{"ipv4_cidrs":["173.245.48.0/20","103.21.244.0/22","103.22.200.0/22","103.31.4.0/22","141.101.64.0/18","108.162.192.0/18","190.93.240.0/20","188.114.96.0/20","197.234.240.0/22","198.41.128.0/17","162.158.0.0/15","104.16.0.0/13","104.24.0.0/14","172.64.0.0/13","131.0.72.0/22"],"ipv6_cidrs":["2400:cb00::/32","2606:4700::/32","2803:f800::/32","2405:b500::/32","2405:8100::/32","2a06:98c0::/29","2c0f:f248::/32"],"etag":"38f79d050aa027e3be3865e495dcc9bc"},"success":true,"errors":[],"messages":[]}],"output":null}}

# しばらく待ってから実行
$ curl -X GET "http://localhost:8787/workflow/status?instanceId=2abd7118-907c-4fab-b852-38af84c6796a"
{"status":{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":[{"files":["doc_7392_rev3.pdf","report_x29_final.pdf","memo_2024_05_12.pdf","file_089_update.pdf","proj_alpha_v2.pdf","data_analysis_q2.pdf","notes_meeting_52.pdf","summary_fy24_draft.pdf"]},{"result":{"ipv4_cidrs":["173.245.48.0/20","103.21.244.0/22","103.22.200.0/22","103.31.4.0/22","141.101.64.0/18","108.162.192.0/18","190.93.240.0/20","188.114.96.0/20","197.234.240.0/22","198.41.128.0/17","162.158.0.0/15","104.16.0.0/13","104.24.0.0/14","172.64.0.0/13","131.0.72.0/22"],"ipv6_cidrs":["2400:cb00::/32","2606:4700::/32","2803:f800::/32","2405:b500::/32","2405:8100::/32","2a06:98c0::/29","2c0f:f248::/32"],"etag":"38f79d050aa027e3be3865e495dcc9bc"},"success":true,"errors":[],"messages":[]},null]}}%

Discussion