Zenn

Fastly Compute HTTP リクエストの作成 (3) 並列/ストリーミング処理

2023/12/16に公開

この記事は Fastly Compute (旧 Compute@Edge) 一人アドベントカレンダー 15 日目の記事です。

Fastly Compute で最も多く利用することになるであろう HTTP リクエストの作成について紹介するシリーズ(Fastly Compute HTTP リクエストの作成)の第三回目です。エッジでコードを書く時の背景やモチベーションとして、複数のバックエンドからのレスポンスを集約したいから HTTP リクエストを並列で送りたいとか、レイテンシやメモリ管理の観点でストリーミング処理を実装したい時などがそれなりの頻度で出てきます。そうした並列処理とストリーミング処理について、実は公式 Web サイトの方だと実装例が掲載されてたりもするのですが、それらの記載も分散してしまっていたりして、まとまった形で紹介されることが少なくとっつきづらくなりがちなように思われたので、本稿で少し取り扱ってみます。

並列処理の実装方法

Go と JavaScript の実装は非常にシンプルで、それぞれ Goroutine と Promise を使うことで実現可能です。(引用元の公式サンプル)

main.go
func main() {
  fsthttp.ServeFunc(func(ctx context.Context, w fsthttp.ResponseWriter, r *fsthttp.Request) {
    fmt.Println("Starting")
    begin := time.Now()

    var wg sync.WaitGroup
    for _, url := range []string{
      "https://http-me.glitch.me/dog?wait=3000",
      "https://http-me.glitch.me/cat?wait=500",
    } {
      wg.Add(1)
      go func(url string) {
        defer wg.Done()
        fmt.Println("Starting", url)
        req, _ := fsthttp.NewRequest(fsthttp.MethodGet, url, nil)
        req.CacheOptions.Pass = true
        resp, _ := req.Send(ctx, "origin_0")
        _, _ = io.Copy(io.Discard, resp.Body)
        fmt.Println("finished", url)
      }(url)
    }
    wg.Wait()
    fmt.Println("finished after ", time.Since(begin))
  })
}
index.js
addEventListener("fetch", (event) => event.respondWith(handleRequest(event)));

async function handleRequest(_event) {
  const pending_reqs = [
    fetch("https://http-me.glitch.me/dog?wait=3000", { backend: "origin_0" }),
    fetch("https://http-me.glitch.me/dog?cat=500", { backend: "origin_0" }),
  ];
  const logs = [];
  const responses = [];
  await Promise.all(
    pending_reqs.map(async (req, idx) => {
      logs.push('REQ' + idx);
      const r = await req;
      const t = await r.text();
      logs.push('RESP' + idx);
      responses.push(t);
    })
  );
  console.log(logs.join(' '));
  return new Response(`${responses[0]} responded first, ${responses[1]} responded next`, {
    status: 200,
  });
}

Rust の場合、SDK から並列処理を実装するために専用の API (send_async(), send_async_streaming())が提供されておりこちらを利用するのが便利です。[1]実装例は以下の通りです。

use fastly::{Error, Request, Response};

#[fastly::main]
fn main(_req: Request) -> Result<Response, Error> {
    let req1 = Request::get("https://http-me.glitch.me/dog?wait=3000").send_async("origin_0")?;
    let req2 = Request::get("https://http-me.glitch.me/cat?wait=500").send_async("origin_0")?;
    let pending_reqs = vec![req1, req2];

    let (resp, pending_reqs) = fastly::http::request::select(pending_reqs);
    let mut resp1 = resp?;
    let (resp, _pending_reqs) = fastly::http::request::select(pending_reqs);
    let mut resp2 = resp?;

    let resp = Response::from_body(format!(
        "{} responded first, {} responded next",
        resp1.take_body_str(),
        resp2.take_body_str()
    ));
    Ok(resp)
}

ストリーミング処理の実装方法

編集と文字数の都合でストリーミング処理については早送りして、サンプルコードへのリンクを簡単に紹介することに留めます(ToDo: 後日時間が出来たら詳しく書く)

  • Rust で使えるサンプル
    • fastly::http::body::Body::read_chunks(doc) を使って chunk 毎にレスポンスを読み込みストリーム処理 (source)
  • Go で使えるサンプル
  • JavaScript で使えるサンプル
    • [TransformStream](https://js-compute-reference-docs.edgecompute.app/docs/globals/TransformStream/) コンストラクタを拡張した独自定義の ReplacerStream と、それを引数に受け取る [[ReadableSream.pipeThrough()](https://js-compute-reference-docs.edgecompute.app/docs/globals/ReadableStream/prototype/pipeThrough)] によってストリーム処理を実装 (例1, 例2)

まとめ

本稿では Compute を使いこなす中できっと出てくるであろう並列/ストリーミング処理について実装例をいくつか紹介しました。明日からは Cache API を使った揮発性ストレージへの読み書きについて概要を紹介していきたいと思います。

脚注
  1. 余談ですが、これらの API は Fastly Hostcall のインターフェースをそのまま利用したものとなっています。Rust SDK ではこのように低レベルの操作が可能なことが特色と言えます。 ↩︎

Discussion

ログインするとコメントできます