🎏

Web Stream APIを使ってサーバーのメモリより大きなデータを扱う

に公開

はじめに

Web Stream APIを試してみました!

それまであまり技術イベントに参加したことはなかったのですが、2024年10月のNihonbashi.jsに参加し、詳しい方のお話を聞いて興味を持ったのがきっかけです

前提

Web Stream APIとは?

こちらのスライドの解説が非常にわかりやすいです
https://speakerdeck.com/tasshi/tskaigi-2025-web-streams-api

MDNでは下記のように紹介されています

ストリーム API を使用すると、 JavaScript がネットワーク経由で受信したデータのストリームにプログラムでアクセスし、開発者の希望どおりに処理できます。

https://developer.mozilla.org/ja/docs/Web/API/Streams_API

こちらの記事はどんなコードを書けばよいのか簡潔に紹介しています
https://developer.chrome.com/docs/capabilities/web-apis/fetch-streaming-requests?hl=ja

紹介すること

下記の検証の過程でやってみたことやハマったことを紹介します

  • RDBにある10万レコード分のデータをCSVダウンロードする
  • CSVにある10万レコード分のデータをアップロードし、RDBに書き込む

紹介しないこと

  • Web Stream APIの適切な使い方
  • CSVアップロード、ダウンロードの適切な設計

技術スタック

TypeScriptに慣れているので、React, NestJS(express, prisma)、MySQLを使って検証しました
検証で使ったリポジトリはこちら
https://github.com/tatsuya-asami/web-stream-api-trial/tree/main

わかったこと

  • nodeのStreamとWeb Stream APIは別のAPI
  • nodeでもWeb Stream APIは使える
    • node:stream/webのimportをしなくても型エラーにならないことがあるので注意
  • HTTP/1.xではWeb Stream APIを使ったアップロードは出来なかった
    • HTTP/2またはHTTP/3を使えばできるが、expressで素直にHTTP/2やHTTP/3を使うのは難しかったので、リバースプロキシを使った
  • 2025年6月時点では、アップロードで使用するfetch APIのduplexパラメータはPCのChrome, Edge, Operaでしか使えない
    • モバイルやSafari, Firefoxでは使えない
  • ブラウザのタブが非アクティブな場合では処理が停止することがあった
    • 停止しないこともあり、再現もしなくなったので、発生条件は不明

本文

ローカル環境でWeb Stream APIを試すための準備

ちょっと大きいデータを扱うとすぐ落ちるDockerコンテナを作成

まず大量のデータをサーバー上で展開出来ないことを確認するために、貧弱なサーバー環境を作ります
今回はDocker Composeでmem_limitを70mにするとちょうどよかったです
少量のデータであれば動きますが、大量のデータをまとめて扱おうとするとコンテナがシャットダウンします

docker-compose.ymlとDockerfile

docker-compose.yml

version: '3.8'

services:
  db:
    image: mysql:8.4.3
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: webstream
    ports:
      - '3307:3306'
    volumes:
      - mysql_data_volume:/var/lib/mysql
    networks:
      - app_network
    restart: unless-stopped

  app:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: nestjs_app_service

    mem_limit: 70m # ちょっと大きいデータを扱うと落ちるように調整

    environment:
      - NODE_ENV=production
      - DATABASE_URL=mysql://user:password@db:3306/webstream
      - PORT=3000
      - ./.env
    depends_on:
      - db
    restart: unless-stopped
    networks:
      - app_network

  nginx:
    image: nginx:1.27-alpine
    container_name: nginx_reverse_proxy
    restart: unless-stopped
    ports:
      - '80:80'
      - '443:443'
    volumes:
      - ./nginx/default.conf:/etc/nginx/conf.d/default.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
      - ./nginx/logs:/var/log/nginx
    depends_on:
      - app
    networks:
      - app_network

networks:
  app_network:
    driver: bridge

volumes:
  mysql_data_volume:

Dockerfile

FROM node:22-alpine

WORKDIR /app

RUN npm install -g pnpm

COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile

COPY . .
RUN pnpm dlx prisma generate
RUN pnpm run build

EXPOSE 3000
CMD ["pnpm", "run", "start:prod"]

nginxでリバースプロキシ

HTTP/1.xではWeb Stream APIをリクエストにのせると接続が拒否されてしまいます
expressでHTTP/2にすることを試みましたが、上手く動かせませんでした(適切なやり方が出来ていなかった可能性はあります)
ブラウザとリバースプロキシの間をHTTP/2で繋ぎ、リバースプロキシとサーバーの間をHTTP/1.1で繋ぐことで動かすことが出来ました

ちなみにfastifyではexperimentalでHTTP2をサポオートしているようです

  • node組み込みのhttp2をexpressで使ってみたが、上手く動かず
  • node-spdyを使ったところ動きましたが、1回目のリクエストが必ず失敗するなど不安定な挙動があった

expressのissueを見るとまだ頑張っている最中なようでしたので、リバースプロキシを使って検証環境を作りました

nginx/default.conf
server {
    listen 80;
    listen [::]:80;
    server_name localhost;

    location / {
        return 301 https://$host$request_uri;
    }
}

server {
    listen 443 ssl http2;
    listen [::]:443 ssl http2;
    server_name localhost;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

	client_max_body_size 1G;
    client_header_timeout 60s;
    client_body_timeout   1800s;
    send_timeout          1800s;

    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_prefer_server_ciphers off;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384;

    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    location / {
        proxy_pass http://app:3000;

        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        proxy_buffering off;
        proxy_request_buffering off;

        proxy_connect_timeout 75s;
        proxy_send_timeout   1800s;
        proxy_read_timeout   1800s;
    }
}

HTTPS通信をするための証明書発行はmkcertを使うと簡単でした

これで小さいデータを扱うことはできますが、大きいデータを扱うとダウンするサーバーができました

検証用データ

UserUserProfileというリレーションシップを持ったテーブルを用意し、10万件のシードデータを投入します。ORMにはPrismaを使っています。

schema.prisma
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

// Looking for ways to speed up your queries, or scale easily with your serverless or edge functions?
// Try Prisma Accelerate: https://pris.ly/cli/accelerate-init

generator client {
  provider = "prisma-client-js"
  output   = "../generated/prisma"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}

model User {
  id          String           @id @default(uuid(7))
  name        String
  email       String
  password    String
  createdAt   DateTime      @default(now())
  updatedAt   DateTime      @updatedAt
  UserProfile UserProfile[]
}

model UserProfile {
  id          Int      @id @default(autoincrement())
  userId      String
  age         Int
  phoneNumber String
  address     String
  city        String
  country     String
  zipCode     String
  occupation  String
  department  String
  isActive    Boolean  @default(true)
  lastLogin   DateTime
  bio         String   @db.Text
  loginCount  Int      @default(0)
  createdAt   DateTime @default(now())
  updatedAt   DateTime @updatedAt

  user User @relation(fields: [userId], references: [id], onDelete: Cascade)
}

seed/index.ts

import { PrismaClient, Prisma } from '../generated/prisma/client';
import { faker } from '@faker-js/faker/locale/ja';

const prisma = new PrismaClient();

const USER_COUNT = 100000;
const BATCH_SIZE = 1000;

async function main() {
console.log(Starting to seed ${USER_COUNT} users...);
faker.seed(1234);

await prisma.userProfile.deleteMany({});
await prisma.user.deleteMany({});

for (let i = 0; i < USER_COUNT; i += BATCH_SIZE) {
const userBatch: Prisma.UserCreateManyInput[] = [];
const batchSize = Math.min(BATCH_SIZE, USER_COUNT - i);

console.log(
  `Creating batch ${i / BATCH_SIZE + 1} of ${Math.ceil(USER_COUNT / BATCH_SIZE)} users`,
);

for (let j = 0; j < batchSize; j++) {
  const user: Prisma.UserCreateManyInput = {
    name: faker.person.fullName(),
    email: faker.internet.email(),
    password: faker.internet.password({ length: 10 }),
  };

  userBatch.push(user);
}

await prisma.user.createMany({
  data: userBatch,
});

const createdUsers = await prisma.user.findMany({
  skip: i,
  take: batchSize,
  orderBy: { id: 'asc' },
});

console.log(
  `Creating batch ${i / BATCH_SIZE + 1} of ${Math.ceil(USER_COUNT / BATCH_SIZE)} profiles`,
);

// Create profiles for these users
const profileBatch: Prisma.UserProfileCreateManyInput[] = [];

createdUsers.forEach((user) => {
  const profile: Prisma.UserProfileCreateManyInput = {
    userId: user.id,
    age: faker.number.int({ min: 18, max: 80 }),
    phoneNumber: faker.phone.number(),
    address: faker.location.streetAddress(),
    city: faker.location.city(),
    country: faker.location.country(),
    zipCode: faker.location.zipCode(),
    occupation: faker.person.jobTitle(),
    department: faker.commerce.department(),
    isActive: faker.datatype.boolean(),
    lastLogin: faker.date.past(),
    bio: faker.lorem.paragraphs(2),
    loginCount: faker.number.int({ min: 0, max: 100 }),
  };

  profileBatch.push(profile);
});

// Create profiles in batch
await prisma.userProfile.createMany({
  data: profileBatch,
});

}

const userCount = await prisma.user.count();
const profileCount = await prisma.userProfile.count();

console.log(Seeding complete!);
console.log(Created ${userCount} users);
console.log(Created ${profileCount} user profiles);
}

main()
.then(() => {
console.log('Seeding completed successfully!');
return prisma.$disconnect();
})
.catch(async (e) => {
console.error('Error during seeding:', e);
await prisma.$disconnect();
process.exit(1);
});

RDBにあるデータをCSVダウンロードする

まずはRDBにある10万レコード分のデータをCSVダウンロードしてみます

サーバーサイドの処理概要

RDBに10万レコード存在する場合、全てのデータをnodeサーバー上に展開するとダウンしますが、200レコードであればnodeサーバーに展開可能です

200レコードを1チャンクとしてCSVダウンロードをしてみます

各処理を少し解説します

レスポンスはNestJSのStreamableFileにした

node stream APIにはWeb Stream APIを変換するためのtoWeb, fromWebメソッドがあります
node22ではexperimentalですが、node24ではstableになっているようなので、使ってみました
NestJSのにはStreamableFileが用意されており、簡単に実装できたため、返却前にnodeのstreamに変換することにしました

// app.controller.ts
import { Controller, Get, StreamableFile, Header } from '@nestjs/common';
import { Readable } from 'node:stream';

 @Get('/csv/from/db')
  @Header('Content-Type', 'text/csv; charset=utf-8')
  @Header('Content-Disposition', 'attachment; filename="users_from_db.csv"')
  getCsvFromDb(): StreamableFile {
    const webReadableStream = this.appService.findAllUsersToCsv();

    const nodeReadableStream = Readable.fromWeb(webReadableStream);

    return new StreamableFile(nodeReadableStream);
  }

// app.service.ts
findAllUsersToCsv() {
  const readable: ReadableStream<UserWithProfile[]> =
    this.findUserService.findAllUsersObjectStream();
  return readable.pipeThrough(new UsersToCsvStringifyTransformer());
}

ReadableStreamを使い、200件ずつRDBからデータを取得する

ReadableStreamのpullメソッドを使います
全レコードから200件ずつセレクトし、全てのレコードを処理すると処理を終了します

startメソッドで再帰的な処理を実行するサンプルがあり、どちらを使うのが望ましいのか紛らわしかったですが、

ストリームの内部にあるチャンクのキューがいっぱいになっていない場合、最高水準点に達するまで繰り返し呼び出されます。 pull() がプロミスを返す場合、そのプロミスが満たされるまで再び呼び出されません

とあるので、pullを使用するのは適切なようです

node:stream/webをimportしなくても型エラーは出ませんが、明示的にimportが必要でした
tsconfigなどの設定次第かと思いますが、しばらくハマったので気をつけましょう

findUserStream.service.ts
import { Injectable } from '@nestjs/common';
import { PrismaService } from 'src/prisma.service';
import {
  ReadableStream,
  ReadableStreamDefaultController,
} from 'node:stream/web';
import { Prisma } from 'generated/prisma/client';

export type UserWithProfile = Prisma.UserGetPayload<{
  include: { UserProfile: true };
}>;

@Injectable()
export class FindUserStreamService {
  constructor(private prisma: PrismaService) {}

  findAllUsersObjectStream(): ReadableStream<UserWithProfile[]> {
    const batchSize = 200;
    let cursorId: string | undefined = undefined;
    let isFetching: boolean = false;
    let streamCancelled: boolean = false;

    const prismaService = this.prisma;

    return new ReadableStream<UserWithProfile[]>({
      async pull(
        controller: ReadableStreamDefaultController<UserWithProfile[]>,
      ) {
        if (isFetching || streamCancelled) {
          return;
        }
        isFetching = true;

        try {
          const usersWithProfilesBatch = await prismaService.user.findMany({
            take: batchSize,
            skip: cursorId ? 1 : undefined,
            cursor: cursorId ? { id: cursorId } : undefined,
            orderBy: { id: 'asc' },
            include: { UserProfile: true },
          });

          if (streamCancelled) {
            isFetching = false;

            return;
          }

          if (usersWithProfilesBatch.length === 0) {
            controller.close(); // データがなければストリームを正常終了
            isFetching = false;
            return;
          }

          controller.enqueue(usersWithProfilesBatch);

          cursorId =
            usersWithProfilesBatch[usersWithProfilesBatch.length - 1].id;

          if (usersWithProfilesBatch.length < batchSize) {
            // これが最後のバッチなので、ストリームを正常終了
            controller.close();
          }
        } catch (error) {
          console.error(
            'UserService: Error fetching users in ReadableStream pull:',
            error,
          );
          controller.error(error);
          streamCancelled = true;
        } finally {
          isFetching = false;
        }
      },
    });
  }
}

UsersToCsvでCSVに変換する

node-csvというライブラリを使用して、CSVに変換します
startメソッドでnode-csvのstringifierのイベントを登録し、transformメソッドで変換を行っています

UsersToCsv.ts
import {
  TransformStream,
  TransformStreamDefaultController,
} from 'node:stream/web';
import { stringify } from 'csv';
import { UserWithProfile } from 'src/findUserStream.service';

export class UsersToCsvStringifyTransformer extends TransformStream<
  UserWithProfile[],
  string
> {
  constructor() {
    const csvHeaders = [
      'id',
      'name',
      'email',
      'password',
      'createdAt',
      'updatedAt',
      'id',
      'userId',
      'age',
      'phoneNumber',
      'address',
      'city',
      'country',
      'zipCode',
      'occupation',
      'department',
      'isActive',
      'lastLogin',
      'bio',
      'loginCount',
      'createdAt',
      'updatedAt',
    ] as const;
    type CsvHeaders = (typeof csvHeaders)[number];

    const stringifier = stringify({
      header: true,
      columns: csvHeaders,
      quoted_string: true,
    });

    super({
      start: (controller: TransformStreamDefaultController<string>) => {
        stringifier.on('data', (chunk: Buffer) => {
          controller.enqueue(chunk.toString());
        });
        stringifier.on('error', (err) => {
          console.error(
            'UsersToCsvStringifyTransformer: Error from csv-stringify:',
            err,
          );
          controller.error(err);
        });
        stringifier.on('finish', () => {
          console.log(
            'UsersToCsvStringifyTransformer: csv-stringify finished.',
          );
        });
      },

      transform: (userBatch: UserWithProfile[]) => {
        for (const userWithProfiles of userBatch) {
          const profile = userWithProfiles.UserProfile?.[0];

          const record: Record<CsvHeaders, any> = {
            id: userWithProfiles.id,
            name: userWithProfiles.name,
            email: userWithProfiles.email,
            password: userWithProfiles.password,
            createdAt: userWithProfiles.createdAt.toISOString(),
            updatedAt: userWithProfiles.updatedAt.toISOString(),
            userId: profile?.userId,
            age: profile?.age,
            phoneNumber: profile?.phoneNumber,
            address: profile?.address,
            city: profile?.city,
            country: profile?.country,
            zipCode: profile?.zipCode,
            occupation: profile?.occupation,
            department: profile?.department,
            isActive: profile?.isActive,
            lastLogin: profile?.lastLogin
              ? profile.lastLogin.toISOString()
              : null,
            bio: profile?.bio,
            loginCount: profile?.loginCount,
          };

          const writeSuccess = stringifier.write(record);
          if (!writeSuccess) {
            console.warn(
              'UsersToCsvStringifyTransformer: csv-stringify backpressure.',
            );
          }
        }
      },

      flush: (controller: TransformStreamDefaultController<string>) => {
        stringifier.end(() => {
          controller.terminate();
        });
      },
    });
  }
}

ブラウザではダウンロード状況もわかるようにし、レスポンスをBlobに変換する

fetchStreamDataのレスポンスをBlobに変換→リンクを作成→ダウンロード→リンク削除をします
少しずつ処理が行われているのがわかるように、createProgressStreamで進捗状況がわかるようにしましたが、なくてもダウンロードには差し支えありません

ダウンロード状況

useStreamDownload.ts
import { useState } from 'react';
import { BASE_URL, type FetchStatus } from '../constants';

export const useStreamDownload = (path: string) => {
  const [downloadingStream, setDownloadingStream] =
    useState<FetchStatus>('idle');
  const [percent, setPercent] = useState(0);

  const getStream = async () => {
    console.time('useStreamDownload');
    try {
      setDownloadingStream('loading');
      setPercent(0);

      const response = await fetchStreamData(path);

      const progressStream = createProgressStream((progress) => {
        setPercent(progress);
      });

      const downloadStream = response.body?.pipeThrough(progressStream);
      if (!downloadStream) {
        throw new Error('Failed to create download stream');
      }

      const blob = await streamToBlob(downloadStream);

      downloadBlob(blob, 'downloaded_data.csv');

      setDownloadingStream('success');
    } catch (error) {
      console.error('Error downloading stream:', error);
      setDownloadingStream('error');
      setPercent(0);
    }
    console.timeEnd('useStreamDownload');
  };

  return { getStream, downloadingStream, percent };
};

const fetchStreamData = async (path: string): Promise<Response> => {
  const response = await fetch(`${BASE_URL}/${path}`, {
    method: 'GET',
    headers: {
      Accept: 'text/plain',
      'Content-Type': 'text/plain',
    },
  });

  if (!response.ok) {
    throw new Error(`API error: ${response.status} ${response.statusText}`);
  }

  return response;
};

const createProgressStream = (
  onProgress: (receivedLength: number) => void,
): TransformStream => {
  let receivedLength = 0;

  return new TransformStream({
    async transform(chunk, controller) {
      receivedLength += chunk.length;

      onProgress(receivedLength);

      controller.enqueue(chunk);
    },
  });
};

const streamToBlob = async (stream: ReadableStream): Promise<Blob> => {
  return new Response(stream).blob();
};

const downloadBlob = (blob: Blob, fileName: string): void => {
  const url = URL.createObjectURL(blob);
  const link = document.createElement('a');
  link.href = url;
  link.setAttribute('download', fileName);

  document.body.appendChild(link);
  link.click();
  document.body.removeChild(link);

  setTimeout(() => {
    URL.revokeObjectURL(url);
  }, 100);
};

ダウンロードでの背圧を確認

Web Stream APIには背圧(backpressure)があります
これはチャンクが受け付けられない場合に、手前の処理に停止信号を出し、メモリを圧迫しないようにするような処理です
ストリームがサーバーを介している場合でも背圧は効きます

試しに

  1. useStreamDownloadでチャンクごとに50mms待たせる
  2. useStreamDownloadでチャンクごとに10mms待たせる
  3. インターネット通信を遅くする
  4. 何もブロックがない状態(元のコード)

で計測してみると、下記の結果になりました

条件 合計処理時間 ネットワーク応答時間
1 チャンクごとに50ms待機 115.99秒 45.77s
2 チャンクごとに10ms待機 28.86秒 9.05s
3 インターネット通信を遅くする 85.88秒 1.4min (84s)
4 何もブロックがない状態 9.20秒 9.15s

合計処理時間

ダウンロード時間

1、2では下記のコードで待機させています

const downloadStream = response.body?.pipeThrough(progressStream).pipeThrough(
  new TransformStream({
    async transform() {
      await new Promise((resolve) => setTimeout(resolve, 50));
    },
  }),
);

いずれもサーバーが落ちずに処理ができているので、サーバーがさばける範囲で負荷が分散できているのがわかります

1番目の検証ではネットワーク応答時間がも合計処理時間も増えていますが、2番目の検証では、ネットワーク応答時間はブロックがない状態とほぼ変わりません
これは2番目の検証ではサーバー側が調整をしなくてもブラウザが十分に処理できるので、ネットワーク通信は何もブロックがない状態と変わらないのだと思います

また、3番目の検証では、合計処理時間は遅いものの、ネットワーク応答時間と合計処理時間がほぼ変わらないことから、ネットワーク通信が終わってからの処理はすぐに終わっていることがわかります

これでダウンロードで検証したいことは一通り試せました!

CSVをアップロードし、RDBに保存する

次はCSVにある10万レコード分のデータをアップロードし、RDBに書き込んでみます

ブラウザではアップロード状況がわかるようにし、ReadableStreamをリクエストボディに乗せる

先ほどダウンロードしたCSVを使いアップロードしてみます
duplex: 'half'を付与しないとエラーになってしまいます
2025年6月時点では、fetch APIのduplexパラメータは、PCのChrome, Edge, Operaでしか使えないようです

アップロードでも進捗状況の表示は不要ですが、せっかくなので表示しました
アップロード時はファイルサイズがわかっているので、進捗状況をパーセント表示できます

useStreamUpload.ts
import { useState } from 'react';
import { BASE_URL, type FetchStatus } from '../constants';

export const useStreamUpload = (url: string) => {
  const [uploadingCsvStream, setUploadingCsvStream] =
    useState<FetchStatus>('idle');
  const [uploadProgress, setUploadProgress] = useState(0);

  const postCsvStream = async (file: File) => {
    const [controller, timeoutId] = setupUploadTimeout(file.name);

    try {
      setUploadingCsvStream('loading');
      setUploadProgress(0);

      const totalBytes = file.size;
      console.log(`Starting upload of ${file.name} (${totalBytes} bytes)`);

      const progressStream = createProgressStream(
        totalBytes,
        (progressPercent, processedBytes) => {
          setUploadProgress(progressPercent);
          console.log(
            `Upload progress: ${progressPercent}%, sent ${processedBytes}/${totalBytes} bytes`,
          );
        },
      );

      const fileStream = file.stream().pipeThrough(progressStream);
      const result = await uploadFileStream(
        url,
        file,
        fileStream,
        controller.signal,
      );

      clearTimeout(timeoutId);
      console.log('Upload complete:', result);
      setUploadingCsvStream('success');
      return result;
    } catch (error) {
      console.error('Error uploading stream:', error);
      setUploadingCsvStream('error');
      setUploadProgress(0);
    }
  };

  return {
    postCsvStream,
    uploadingCsvStream,
    uploadProgress,
  };
};

const timeoutMs = 20 * 60 * 1000;

const setupUploadTimeout = (fileName: string): [AbortController, number] => {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => {
    console.warn(
      `Client-side upload timeout of ${timeoutMs}ms reached for file ${fileName}. Aborting fetch.`,
    );
    controller.abort();
  }, timeoutMs);

  return [controller, timeoutId];
};

const createProgressStream = (
  totalBytes: number,
  onProgress: (progressPercent: number, processedBytes: number) => void,
): TransformStream => {
  let processedBytes = 0;

  return new TransformStream({
    transform(chunk, controller) {
      processedBytes += chunk.byteLength;
      const progressPercent = Math.round((processedBytes / totalBytes) * 100);
      onProgress(progressPercent, processedBytes);
      controller.enqueue(chunk);
    },
  });
};

const uploadFileStream = async (
  url: string,
  file: File,
  fileStream: ReadableStream,
  signal: AbortSignal,
) => {
  const response = await fetch(`${BASE_URL}/${url}`, {
    method: 'POST',
    headers: {
      'Content-Type': 'text/csv',
      'Content-Disposition': `attachment; filename="${file.name}"`,
    },
    body: fileStream,
    // @ts-expect-error 型が存在しない https://github.com/node-fetch/node-fetch/issues/1769
    duplex: 'half',
    signal,
  });

  if (!response.ok) {
    throw new Error(`API error: ${response.status} ${response.statusText}`);
  }

  return response.json();
};

サーバーサイドの処理概要

アップロードでは変換処理を細かくわけました
200件ずつRDBにインサートしていきます

A. TextDecoderStream: バイナリ → UTF-8テキスト
B. CsvToStringArray: CSV → 文字列配列
C. StringArrayToEntity: 文字列配列 → エンティティ
D. DbWriter: エンティティ → DB書き込み(200件貯まる毎に書き込み)

nodeのReadableをWeb Stream APIに変換し、各処理を行う

Web Stream APIを試したいので、nodeのReadableをWeb Stream APIに変換します
TextDecoderStreamはビルトインのメソッドです
pipeThroughはいくつも繋げられるので、細かく処理を分けてみました

// app.controller.ts
@Post('/csv/insert/stream')
async insertCsvStream(@Req() req: RawBodyRequest<Request>) {
  return this.appService.insertCsvStreamToDb(req);
}

// app.service.ts
async insertCsvStreamToDb(readableStream: Readable) {
  const webReadableStream: ReadableStream<Uint8Array> =
    Readable.toWeb(readableStream);

  await webReadableStream
    .pipeThrough(new TextDecoderStream('utf-8'))
    .pipeThrough(new CsvToStringArray())
    .pipeThrough(new StringArrayToEntity())
    .pipeTo(new DbWriter(this.prismaService));

  return { success: true };
}

それぞれの処理について解説していきます

テキストに変換されたCSVを文字列の配列に変換する

node-csvを使って文字列の配列に変換します
ダウンロードのときと同様、startメソッドが呼ばれたタイミングで、node-csvのイベントを登録します

CsvToStringArray.ts
import { TransformStream } from 'node:stream/web';
import { parse } from 'csv';

export class CsvToStringArray extends TransformStream<string, string[]> {
  constructor() {
    const parserInstance = parse({
      delimiter: ',',
      skip_empty_lines: true,
      from_line: 2,
    });

    super({
      start: (controller) => {
        parserInstance.on('data', (data: string[]) => {
          controller.enqueue(data);
        });

        parserInstance.on('error', (err: Error) => {
          console.error('CSV parsing error in CsvRecordOutput:', err);
          controller.error(err);
        });
      },
      transform: (chunk: string) => {
        parserInstance.write(chunk);
      },
      flush: (controller) => {
        parserInstance.end(() => {
          console.log('CsvRecordOutput: Parser finished, closing controller.');
          controller.terminate();
        });
      },
    });
  }
}

文字列の配列をRDBに保存できる形に変換する

ここはただの変換処理です
ストリームの検証とは無関係ですが、Userの主キーをRDBのオートインクリメントにしていると、インサートするまでUserProfileuserIdがわからないので、保存処理がややこしくなりそうです

StringArrayToEntity.ts
import { Prisma } from 'generated/prisma/client';
import { TransformStream } from 'node:stream/web';
import { v7 } from 'uuid';

export class StringArrayToEntity extends TransformStream<
  string[],
  {
    user: Prisma.UserCreateInput;
    userProfile: Prisma.UserProfileCreateManyInput;
  }
> {
  constructor() {
    super({
      transform(lines, controller) {
        const [
          _id,
          name,
          email,
          password,
          _createdAt,
          _updatedAt,
          __id,
          _userId,
          age,
          phoneNumber,
          address,
          city,
          country,
          zipCode,
          occupation,
          department,
          isActive,
          lastLogin,
          bio,
          loginCount,
          __createdAt,
          __updatedAt,
        ] = lines;

        const userId = v7();

        const user: Prisma.UserCreateInput = {
          id: userId,
          name,
          email,
          password,
        };
        const userProfile: Prisma.UserProfileCreateManyInput = {
          userId,
          age: parseInt(age, 10),
          phoneNumber,
          address,
          city,
          country,
          zipCode,
          occupation,
          department,
          isActive: isActive === 'true',
          lastLogin: new Date(lastLogin),
          bio,
          loginCount: parseInt(loginCount, 10),
        };

        controller.enqueue({
          user,
          userProfile,
        });
      },

      flush(controller) {
        controller.terminate();
      },
    });
  }
}

200件貯まる毎にRDBに書き込む

200件貯まるたびに書き込んでいきます
こちらもストリームの検証とは関係ないですが、CSVで大量のデータを登録する際、エラーハンドリングやトランザクション管理は考えることが多そうだと思いました

DbWriter.ts
import { Prisma } from 'generated/prisma/client';
import { WritableStream } from 'node:stream/web';
import { PrismaService } from 'src/prisma.service';

export class DbWriter extends WritableStream {
  private chunks: {
    user: Prisma.UserCreateInput;
    userProfile: Prisma.UserProfileCreateManyInput;
  }[] = [];
  private readonly chunkSize = 200;

  constructor(private readonly prismaService: PrismaService) {
    super({
      write: async (args: {
        user: Prisma.UserCreateInput;
        userProfile: Prisma.UserProfileCreateManyInput;
      }) => {
        this.chunks.push(args);

        if (this.chunks.length >= this.chunkSize) {
          await this.writeChunkToDb();
        }
      },
      close: async () => {
        if (this.chunks.length > 0) {
          await this.writeChunkToDb();
        }
      },
    });
  }

  private async writeChunkToDb(): Promise<void> {
    try {
      await this.prismaService.$transaction(async (tx) => {
        await tx.user.createMany({
          data: this.chunks.map((chunk) => chunk.user),
        });
        await tx.userProfile.createMany({
          data: this.chunks.map((chunk) => chunk.userProfile),
        });
      });
      console.log(`Inserted ${this.chunks.length} users into the database.`);
      this.chunks = []; // Reset chunks
    } catch (error) {
      console.error('Error inserting users into the database:', error);
    }
  }
}

アップロードでの背圧を確認

ダウンロード時と同様、背圧が効いていることを検証します

  1. dockerコンテナのメモリを増やし、DbWriterでのチャンクを10000件にし、チャンクごとに5000mms待たせる
  2. DbWriterでチャンクを書き込むごとに20mms待たせる
  3. 何もブロックがない状態(元のコード)

コンテナのメモリがギリギリだと変化がわかりにくいので、メモリを増やした状態でも検証してみました
アップロードでは全ての処理が終わったあとにレスポンスを返すので、ブラウザのネットワーク応答時間が合計処理時間とほぼ同じになります

レコード 条件 合計処理時間(ネットワーク応答時間)
1 コンテナメモリ増量 + 10000件チャンク + 5000ms待機 1.3min (78s)
2 200件書き込むごとに20ms待機 36.72s
3 何もブロックがない状態 28.24s

アップロード時の背圧

動画がアップロードできないので文章での説明になりますが、1つ目の検証ではサーバーがたくさんのデータを受け入れることができたので、ブラウザ側のアップロード状況はしばらく進むと数秒止まるを繰り返していました
2つ目の検証ではなだらかにアップロードが進みますが、少し待機させた分何もブロックがない状態よりはアップロードが遅くなっています

アップロード時も背圧が効いていることを確認できました!

ブラウザのタブが非アクティブになると処理が止まることがある

ダウンロード中、アップロード中にタブを非アクティブにすると処理が中断し、タブをアクティブにすると再開することがありました
Chromeのメモリセーバーの設定を調整をしても解消しなかったはずなのですが、全く再現しなくなってしまいました・・・
おそらくブラウザがよしなにやってくれているので、非アクティブ時にも確実に処理を続けるならService Workerの使用などを検討した方が良いのかもしれません

その他

今回は使いませんでしたが、cancelメソッドストリームの圧縮や展開を行うことができる組み込みのCompression Stream APIなど、便利な機能が色々あるようです

まとめ

実際にダウンロード、アップロード機能を作る場合には、エラーハンドリング、バリデーション、アップロード途中で失敗した場合はレコードを全て消すのか残すのかなど様々な検討が必要だと思います

また、非同期処理ではあるものの、処理の開始から終了まで長い時間かかるのが想定されるので、タイムアウトの時間をはじめ様々な考慮も必要になると思います

クラウドサービスを利用したアップロードを検討した方が良い場面が多いかもしれませんが、大量データの場合でもサーバーサイドで自由に処理ができるのは魅力だと思いました!

コミューン株式会社

Discussion