Web Stream APIを使ってサーバーのメモリより大きなデータを扱う
はじめに
Web Stream APIを試してみました!
それまであまり技術イベントに参加したことはなかったのですが、2024年10月のNihonbashi.jsに参加し、詳しい方のお話を聞いて興味を持ったのがきっかけです
前提
Web Stream APIとは?
こちらのスライドの解説が非常にわかりやすいです
MDNでは下記のように紹介されています
ストリーム API を使用すると、 JavaScript がネットワーク経由で受信したデータのストリームにプログラムでアクセスし、開発者の希望どおりに処理できます。
こちらの記事はどんなコードを書けばよいのか簡潔に紹介しています
紹介すること
下記の検証の過程でやってみたことやハマったことを紹介します
- RDBにある10万レコード分のデータをCSVダウンロードする
- CSVにある10万レコード分のデータをアップロードし、RDBに書き込む
紹介しないこと
- Web Stream APIの適切な使い方
- CSVアップロード、ダウンロードの適切な設計
技術スタック
TypeScriptに慣れているので、React, NestJS(express, prisma)、MySQLを使って検証しました
検証で使ったリポジトリはこちら
わかったこと
- nodeのStreamとWeb Stream APIは別のAPI
- nodeでもWeb Stream APIは使える
-
node:stream/web
のimportをしなくても型エラーにならないことがあるので注意
-
- HTTP/1.xではWeb Stream APIを使ったアップロードは出来なかった
- HTTP/2またはHTTP/3を使えばできるが、expressで素直にHTTP/2やHTTP/3を使うのは難しかったので、リバースプロキシを使った
- 2025年6月時点では、アップロードで使用するfetch APIのduplexパラメータはPCのChrome, Edge, Operaでしか使えない
- モバイルやSafari, Firefoxでは使えない
- ブラウザのタブが非アクティブな場合では処理が停止することがあった
- 停止しないこともあり、再現もしなくなったので、発生条件は不明
本文
ローカル環境でWeb Stream APIを試すための準備
ちょっと大きいデータを扱うとすぐ落ちるDockerコンテナを作成
まず大量のデータをサーバー上で展開出来ないことを確認するために、貧弱なサーバー環境を作ります
今回はDocker Composeでmem_limit
を70mにするとちょうどよかったです
少量のデータであれば動きますが、大量のデータをまとめて扱おうとするとコンテナがシャットダウンします
docker-compose.ymlとDockerfile
docker-compose.yml
version: '3.8'
services:
db:
image: mysql:8.4.3
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: user
MYSQL_PASSWORD: password
MYSQL_DATABASE: webstream
ports:
- '3307:3306'
volumes:
- mysql_data_volume:/var/lib/mysql
networks:
- app_network
restart: unless-stopped
app:
build:
context: .
dockerfile: Dockerfile
container_name: nestjs_app_service
mem_limit: 70m # ちょっと大きいデータを扱うと落ちるように調整
environment:
- NODE_ENV=production
- DATABASE_URL=mysql://user:password@db:3306/webstream
- PORT=3000
- ./.env
depends_on:
- db
restart: unless-stopped
networks:
- app_network
nginx:
image: nginx:1.27-alpine
container_name: nginx_reverse_proxy
restart: unless-stopped
ports:
- '80:80'
- '443:443'
volumes:
- ./nginx/default.conf:/etc/nginx/conf.d/default.conf:ro
- ./ssl:/etc/nginx/ssl:ro
- ./nginx/logs:/var/log/nginx
depends_on:
- app
networks:
- app_network
networks:
app_network:
driver: bridge
volumes:
mysql_data_volume:
Dockerfile
FROM node:22-alpine
WORKDIR /app
RUN npm install -g pnpm
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile
COPY . .
RUN pnpm dlx prisma generate
RUN pnpm run build
EXPOSE 3000
CMD ["pnpm", "run", "start:prod"]
nginxでリバースプロキシ
HTTP/1.xではWeb Stream APIをリクエストにのせると接続が拒否されてしまいます
expressでHTTP/2にすることを試みましたが、上手く動かせませんでした(適切なやり方が出来ていなかった可能性はあります)
ブラウザとリバースプロキシの間をHTTP/2で繋ぎ、リバースプロキシとサーバーの間をHTTP/1.1で繋ぐことで動かすことが出来ました
ちなみにfastifyではexperimentalでHTTP2をサポオートしているようです
- node組み込みのhttp2をexpressで使ってみたが、上手く動かず
- node-spdyを使ったところ動きましたが、1回目のリクエストが必ず失敗するなど不安定な挙動があった
expressのissueを見るとまだ頑張っている最中なようでしたので、リバースプロキシを使って検証環境を作りました
nginx/default.conf
server {
listen 80;
listen [::]:80;
server_name localhost;
location / {
return 301 https://$host$request_uri;
}
}
server {
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name localhost;
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
client_max_body_size 1G;
client_header_timeout 60s;
client_body_timeout 1800s;
send_timeout 1800s;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers off;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384;
access_log /var/log/nginx/access.log;
error_log /var/log/nginx/error.log;
location / {
proxy_pass http://app:3000;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_request_buffering off;
proxy_connect_timeout 75s;
proxy_send_timeout 1800s;
proxy_read_timeout 1800s;
}
}
HTTPS通信をするための証明書発行はmkcertを使うと簡単でした
これで小さいデータを扱うことはできますが、大きいデータを扱うとダウンするサーバーができました
検証用データ
User
とUserProfile
というリレーションシップを持ったテーブルを用意し、10万件のシードデータを投入します。ORMにはPrismaを使っています。
schema.prisma
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
// Looking for ways to speed up your queries, or scale easily with your serverless or edge functions?
// Try Prisma Accelerate: https://pris.ly/cli/accelerate-init
generator client {
provider = "prisma-client-js"
output = "../generated/prisma"
}
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
model User {
id String @id @default(uuid(7))
name String
email String
password String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
UserProfile UserProfile[]
}
model UserProfile {
id Int @id @default(autoincrement())
userId String
age Int
phoneNumber String
address String
city String
country String
zipCode String
occupation String
department String
isActive Boolean @default(true)
lastLogin DateTime
bio String @db.Text
loginCount Int @default(0)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
}
seed/index.ts
import { PrismaClient, Prisma } from '../generated/prisma/client';
import { faker } from '@faker-js/faker/locale/ja';
const prisma = new PrismaClient();
const USER_COUNT = 100000;
const BATCH_SIZE = 1000;
async function main() {
console.log(Starting to seed ${USER_COUNT} users...
);
faker.seed(1234);
await prisma.userProfile.deleteMany({});
await prisma.user.deleteMany({});
for (let i = 0; i < USER_COUNT; i += BATCH_SIZE) {
const userBatch: Prisma.UserCreateManyInput[] = [];
const batchSize = Math.min(BATCH_SIZE, USER_COUNT - i);
console.log(
`Creating batch ${i / BATCH_SIZE + 1} of ${Math.ceil(USER_COUNT / BATCH_SIZE)} users`,
);
for (let j = 0; j < batchSize; j++) {
const user: Prisma.UserCreateManyInput = {
name: faker.person.fullName(),
email: faker.internet.email(),
password: faker.internet.password({ length: 10 }),
};
userBatch.push(user);
}
await prisma.user.createMany({
data: userBatch,
});
const createdUsers = await prisma.user.findMany({
skip: i,
take: batchSize,
orderBy: { id: 'asc' },
});
console.log(
`Creating batch ${i / BATCH_SIZE + 1} of ${Math.ceil(USER_COUNT / BATCH_SIZE)} profiles`,
);
// Create profiles for these users
const profileBatch: Prisma.UserProfileCreateManyInput[] = [];
createdUsers.forEach((user) => {
const profile: Prisma.UserProfileCreateManyInput = {
userId: user.id,
age: faker.number.int({ min: 18, max: 80 }),
phoneNumber: faker.phone.number(),
address: faker.location.streetAddress(),
city: faker.location.city(),
country: faker.location.country(),
zipCode: faker.location.zipCode(),
occupation: faker.person.jobTitle(),
department: faker.commerce.department(),
isActive: faker.datatype.boolean(),
lastLogin: faker.date.past(),
bio: faker.lorem.paragraphs(2),
loginCount: faker.number.int({ min: 0, max: 100 }),
};
profileBatch.push(profile);
});
// Create profiles in batch
await prisma.userProfile.createMany({
data: profileBatch,
});
}
const userCount = await prisma.user.count();
const profileCount = await prisma.userProfile.count();
console.log(Seeding complete!
);
console.log(Created ${userCount} users
);
console.log(Created ${profileCount} user profiles
);
}
main()
.then(() => {
console.log('Seeding completed successfully!');
return prisma.$disconnect();
})
.catch(async (e) => {
console.error('Error during seeding:', e);
await prisma.$disconnect();
process.exit(1);
});
RDBにあるデータをCSVダウンロードする
まずはRDBにある10万レコード分のデータをCSVダウンロードしてみます
サーバーサイドの処理概要
RDBに10万レコード存在する場合、全てのデータをnodeサーバー上に展開するとダウンしますが、200レコードであればnodeサーバーに展開可能です
200レコードを1チャンクとしてCSVダウンロードをしてみます
各処理を少し解説します
レスポンスはNestJSのStreamableFileにした
node stream APIにはWeb Stream APIを変換するためのtoWeb, fromWebメソッドがあります
node22ではexperimentalですが、node24ではstableになっているようなので、使ってみました
NestJSのにはStreamableFileが用意されており、簡単に実装できたため、返却前にnodeのstreamに変換することにしました
// app.controller.ts
import { Controller, Get, StreamableFile, Header } from '@nestjs/common';
import { Readable } from 'node:stream';
@Get('/csv/from/db')
@Header('Content-Type', 'text/csv; charset=utf-8')
@Header('Content-Disposition', 'attachment; filename="users_from_db.csv"')
getCsvFromDb(): StreamableFile {
const webReadableStream = this.appService.findAllUsersToCsv();
const nodeReadableStream = Readable.fromWeb(webReadableStream);
return new StreamableFile(nodeReadableStream);
}
// app.service.ts
findAllUsersToCsv() {
const readable: ReadableStream<UserWithProfile[]> =
this.findUserService.findAllUsersObjectStream();
return readable.pipeThrough(new UsersToCsvStringifyTransformer());
}
ReadableStreamを使い、200件ずつRDBからデータを取得する
ReadableStreamのpullメソッドを使います
全レコードから200件ずつセレクトし、全てのレコードを処理すると処理を終了します
startメソッドで再帰的な処理を実行するサンプルがあり、どちらを使うのが望ましいのか紛らわしかったですが、
ストリームの内部にあるチャンクのキューがいっぱいになっていない場合、最高水準点に達するまで繰り返し呼び出されます。 pull() がプロミスを返す場合、そのプロミスが満たされるまで再び呼び出されません
とあるので、pullを使用するのは適切なようです
node:stream/web
をimportしなくても型エラーは出ませんが、明示的にimportが必要でした
tsconfigなどの設定次第かと思いますが、しばらくハマったので気をつけましょう
findUserStream.service.ts
import { Injectable } from '@nestjs/common';
import { PrismaService } from 'src/prisma.service';
import {
ReadableStream,
ReadableStreamDefaultController,
} from 'node:stream/web';
import { Prisma } from 'generated/prisma/client';
export type UserWithProfile = Prisma.UserGetPayload<{
include: { UserProfile: true };
}>;
@Injectable()
export class FindUserStreamService {
constructor(private prisma: PrismaService) {}
findAllUsersObjectStream(): ReadableStream<UserWithProfile[]> {
const batchSize = 200;
let cursorId: string | undefined = undefined;
let isFetching: boolean = false;
let streamCancelled: boolean = false;
const prismaService = this.prisma;
return new ReadableStream<UserWithProfile[]>({
async pull(
controller: ReadableStreamDefaultController<UserWithProfile[]>,
) {
if (isFetching || streamCancelled) {
return;
}
isFetching = true;
try {
const usersWithProfilesBatch = await prismaService.user.findMany({
take: batchSize,
skip: cursorId ? 1 : undefined,
cursor: cursorId ? { id: cursorId } : undefined,
orderBy: { id: 'asc' },
include: { UserProfile: true },
});
if (streamCancelled) {
isFetching = false;
return;
}
if (usersWithProfilesBatch.length === 0) {
controller.close(); // データがなければストリームを正常終了
isFetching = false;
return;
}
controller.enqueue(usersWithProfilesBatch);
cursorId =
usersWithProfilesBatch[usersWithProfilesBatch.length - 1].id;
if (usersWithProfilesBatch.length < batchSize) {
// これが最後のバッチなので、ストリームを正常終了
controller.close();
}
} catch (error) {
console.error(
'UserService: Error fetching users in ReadableStream pull:',
error,
);
controller.error(error);
streamCancelled = true;
} finally {
isFetching = false;
}
},
});
}
}
UsersToCsvでCSVに変換する
node-csvというライブラリを使用して、CSVに変換します
startメソッドでnode-csvのstringifierのイベントを登録し、transformメソッドで変換を行っています
UsersToCsv.ts
import {
TransformStream,
TransformStreamDefaultController,
} from 'node:stream/web';
import { stringify } from 'csv';
import { UserWithProfile } from 'src/findUserStream.service';
export class UsersToCsvStringifyTransformer extends TransformStream<
UserWithProfile[],
string
> {
constructor() {
const csvHeaders = [
'id',
'name',
'email',
'password',
'createdAt',
'updatedAt',
'id',
'userId',
'age',
'phoneNumber',
'address',
'city',
'country',
'zipCode',
'occupation',
'department',
'isActive',
'lastLogin',
'bio',
'loginCount',
'createdAt',
'updatedAt',
] as const;
type CsvHeaders = (typeof csvHeaders)[number];
const stringifier = stringify({
header: true,
columns: csvHeaders,
quoted_string: true,
});
super({
start: (controller: TransformStreamDefaultController<string>) => {
stringifier.on('data', (chunk: Buffer) => {
controller.enqueue(chunk.toString());
});
stringifier.on('error', (err) => {
console.error(
'UsersToCsvStringifyTransformer: Error from csv-stringify:',
err,
);
controller.error(err);
});
stringifier.on('finish', () => {
console.log(
'UsersToCsvStringifyTransformer: csv-stringify finished.',
);
});
},
transform: (userBatch: UserWithProfile[]) => {
for (const userWithProfiles of userBatch) {
const profile = userWithProfiles.UserProfile?.[0];
const record: Record<CsvHeaders, any> = {
id: userWithProfiles.id,
name: userWithProfiles.name,
email: userWithProfiles.email,
password: userWithProfiles.password,
createdAt: userWithProfiles.createdAt.toISOString(),
updatedAt: userWithProfiles.updatedAt.toISOString(),
userId: profile?.userId,
age: profile?.age,
phoneNumber: profile?.phoneNumber,
address: profile?.address,
city: profile?.city,
country: profile?.country,
zipCode: profile?.zipCode,
occupation: profile?.occupation,
department: profile?.department,
isActive: profile?.isActive,
lastLogin: profile?.lastLogin
? profile.lastLogin.toISOString()
: null,
bio: profile?.bio,
loginCount: profile?.loginCount,
};
const writeSuccess = stringifier.write(record);
if (!writeSuccess) {
console.warn(
'UsersToCsvStringifyTransformer: csv-stringify backpressure.',
);
}
}
},
flush: (controller: TransformStreamDefaultController<string>) => {
stringifier.end(() => {
controller.terminate();
});
},
});
}
}
ブラウザではダウンロード状況もわかるようにし、レスポンスをBlobに変換する
fetchStreamDataのレスポンスをBlobに変換→リンクを作成→ダウンロード→リンク削除をします
少しずつ処理が行われているのがわかるように、createProgressStream
で進捗状況がわかるようにしましたが、なくてもダウンロードには差し支えありません
useStreamDownload.ts
import { useState } from 'react';
import { BASE_URL, type FetchStatus } from '../constants';
export const useStreamDownload = (path: string) => {
const [downloadingStream, setDownloadingStream] =
useState<FetchStatus>('idle');
const [percent, setPercent] = useState(0);
const getStream = async () => {
console.time('useStreamDownload');
try {
setDownloadingStream('loading');
setPercent(0);
const response = await fetchStreamData(path);
const progressStream = createProgressStream((progress) => {
setPercent(progress);
});
const downloadStream = response.body?.pipeThrough(progressStream);
if (!downloadStream) {
throw new Error('Failed to create download stream');
}
const blob = await streamToBlob(downloadStream);
downloadBlob(blob, 'downloaded_data.csv');
setDownloadingStream('success');
} catch (error) {
console.error('Error downloading stream:', error);
setDownloadingStream('error');
setPercent(0);
}
console.timeEnd('useStreamDownload');
};
return { getStream, downloadingStream, percent };
};
const fetchStreamData = async (path: string): Promise<Response> => {
const response = await fetch(`${BASE_URL}/${path}`, {
method: 'GET',
headers: {
Accept: 'text/plain',
'Content-Type': 'text/plain',
},
});
if (!response.ok) {
throw new Error(`API error: ${response.status} ${response.statusText}`);
}
return response;
};
const createProgressStream = (
onProgress: (receivedLength: number) => void,
): TransformStream => {
let receivedLength = 0;
return new TransformStream({
async transform(chunk, controller) {
receivedLength += chunk.length;
onProgress(receivedLength);
controller.enqueue(chunk);
},
});
};
const streamToBlob = async (stream: ReadableStream): Promise<Blob> => {
return new Response(stream).blob();
};
const downloadBlob = (blob: Blob, fileName: string): void => {
const url = URL.createObjectURL(blob);
const link = document.createElement('a');
link.href = url;
link.setAttribute('download', fileName);
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
setTimeout(() => {
URL.revokeObjectURL(url);
}, 100);
};
ダウンロードでの背圧を確認
Web Stream APIには背圧(backpressure)があります
これはチャンクが受け付けられない場合に、手前の処理に停止信号を出し、メモリを圧迫しないようにするような処理です
ストリームがサーバーを介している場合でも背圧は効きます
試しに
-
useStreamDownload
でチャンクごとに50mms待たせる -
useStreamDownload
でチャンクごとに10mms待たせる - インターネット通信を遅くする
- 何もブロックがない状態(元のコード)
で計測してみると、下記の結果になりました
条件 | 合計処理時間 | ネットワーク応答時間 | |
---|---|---|---|
1 | チャンクごとに50ms待機 | 115.99秒 | 45.77s |
2 | チャンクごとに10ms待機 | 28.86秒 | 9.05s |
3 | インターネット通信を遅くする | 85.88秒 | 1.4min (84s) |
4 | 何もブロックがない状態 | 9.20秒 | 9.15s |
1、2では下記のコードで待機させています
const downloadStream = response.body?.pipeThrough(progressStream).pipeThrough(
new TransformStream({
async transform() {
await new Promise((resolve) => setTimeout(resolve, 50));
},
}),
);
いずれもサーバーが落ちずに処理ができているので、サーバーがさばける範囲で負荷が分散できているのがわかります
1番目の検証ではネットワーク応答時間がも合計処理時間も増えていますが、2番目の検証では、ネットワーク応答時間はブロックがない状態とほぼ変わりません
これは2番目の検証ではサーバー側が調整をしなくてもブラウザが十分に処理できるので、ネットワーク通信は何もブロックがない状態と変わらないのだと思います
また、3番目の検証では、合計処理時間は遅いものの、ネットワーク応答時間と合計処理時間がほぼ変わらないことから、ネットワーク通信が終わってからの処理はすぐに終わっていることがわかります
これでダウンロードで検証したいことは一通り試せました!
CSVをアップロードし、RDBに保存する
次はCSVにある10万レコード分のデータをアップロードし、RDBに書き込んでみます
ブラウザではアップロード状況がわかるようにし、ReadableStreamをリクエストボディに乗せる
先ほどダウンロードしたCSVを使いアップロードしてみます
duplex: 'half'
を付与しないとエラーになってしまいます
2025年6月時点では、fetch APIのduplexパラメータは、PCのChrome, Edge, Operaでしか使えないようです
アップロードでも進捗状況の表示は不要ですが、せっかくなので表示しました
アップロード時はファイルサイズがわかっているので、進捗状況をパーセント表示できます
useStreamUpload.ts
import { useState } from 'react';
import { BASE_URL, type FetchStatus } from '../constants';
export const useStreamUpload = (url: string) => {
const [uploadingCsvStream, setUploadingCsvStream] =
useState<FetchStatus>('idle');
const [uploadProgress, setUploadProgress] = useState(0);
const postCsvStream = async (file: File) => {
const [controller, timeoutId] = setupUploadTimeout(file.name);
try {
setUploadingCsvStream('loading');
setUploadProgress(0);
const totalBytes = file.size;
console.log(`Starting upload of ${file.name} (${totalBytes} bytes)`);
const progressStream = createProgressStream(
totalBytes,
(progressPercent, processedBytes) => {
setUploadProgress(progressPercent);
console.log(
`Upload progress: ${progressPercent}%, sent ${processedBytes}/${totalBytes} bytes`,
);
},
);
const fileStream = file.stream().pipeThrough(progressStream);
const result = await uploadFileStream(
url,
file,
fileStream,
controller.signal,
);
clearTimeout(timeoutId);
console.log('Upload complete:', result);
setUploadingCsvStream('success');
return result;
} catch (error) {
console.error('Error uploading stream:', error);
setUploadingCsvStream('error');
setUploadProgress(0);
}
};
return {
postCsvStream,
uploadingCsvStream,
uploadProgress,
};
};
const timeoutMs = 20 * 60 * 1000;
const setupUploadTimeout = (fileName: string): [AbortController, number] => {
const controller = new AbortController();
const timeoutId = setTimeout(() => {
console.warn(
`Client-side upload timeout of ${timeoutMs}ms reached for file ${fileName}. Aborting fetch.`,
);
controller.abort();
}, timeoutMs);
return [controller, timeoutId];
};
const createProgressStream = (
totalBytes: number,
onProgress: (progressPercent: number, processedBytes: number) => void,
): TransformStream => {
let processedBytes = 0;
return new TransformStream({
transform(chunk, controller) {
processedBytes += chunk.byteLength;
const progressPercent = Math.round((processedBytes / totalBytes) * 100);
onProgress(progressPercent, processedBytes);
controller.enqueue(chunk);
},
});
};
const uploadFileStream = async (
url: string,
file: File,
fileStream: ReadableStream,
signal: AbortSignal,
) => {
const response = await fetch(`${BASE_URL}/${url}`, {
method: 'POST',
headers: {
'Content-Type': 'text/csv',
'Content-Disposition': `attachment; filename="${file.name}"`,
},
body: fileStream,
// @ts-expect-error 型が存在しない https://github.com/node-fetch/node-fetch/issues/1769
duplex: 'half',
signal,
});
if (!response.ok) {
throw new Error(`API error: ${response.status} ${response.statusText}`);
}
return response.json();
};
サーバーサイドの処理概要
アップロードでは変換処理を細かくわけました
200件ずつRDBにインサートしていきます
A. TextDecoderStream: バイナリ → UTF-8テキスト
B. CsvToStringArray: CSV → 文字列配列
C. StringArrayToEntity: 文字列配列 → エンティティ
D. DbWriter: エンティティ → DB書き込み(200件貯まる毎に書き込み)
nodeのReadableをWeb Stream APIに変換し、各処理を行う
Web Stream APIを試したいので、nodeのReadableをWeb Stream APIに変換します
TextDecoderStreamはビルトインのメソッドです
pipeThroughはいくつも繋げられるので、細かく処理を分けてみました
// app.controller.ts
@Post('/csv/insert/stream')
async insertCsvStream(@Req() req: RawBodyRequest<Request>) {
return this.appService.insertCsvStreamToDb(req);
}
// app.service.ts
async insertCsvStreamToDb(readableStream: Readable) {
const webReadableStream: ReadableStream<Uint8Array> =
Readable.toWeb(readableStream);
await webReadableStream
.pipeThrough(new TextDecoderStream('utf-8'))
.pipeThrough(new CsvToStringArray())
.pipeThrough(new StringArrayToEntity())
.pipeTo(new DbWriter(this.prismaService));
return { success: true };
}
それぞれの処理について解説していきます
テキストに変換されたCSVを文字列の配列に変換する
node-csvを使って文字列の配列に変換します
ダウンロードのときと同様、startメソッドが呼ばれたタイミングで、node-csv
のイベントを登録します
CsvToStringArray.ts
import { TransformStream } from 'node:stream/web';
import { parse } from 'csv';
export class CsvToStringArray extends TransformStream<string, string[]> {
constructor() {
const parserInstance = parse({
delimiter: ',',
skip_empty_lines: true,
from_line: 2,
});
super({
start: (controller) => {
parserInstance.on('data', (data: string[]) => {
controller.enqueue(data);
});
parserInstance.on('error', (err: Error) => {
console.error('CSV parsing error in CsvRecordOutput:', err);
controller.error(err);
});
},
transform: (chunk: string) => {
parserInstance.write(chunk);
},
flush: (controller) => {
parserInstance.end(() => {
console.log('CsvRecordOutput: Parser finished, closing controller.');
controller.terminate();
});
},
});
}
}
文字列の配列をRDBに保存できる形に変換する
ここはただの変換処理です
ストリームの検証とは無関係ですが、User
の主キーをRDBのオートインクリメントにしていると、インサートするまでUserProfile
のuserId
がわからないので、保存処理がややこしくなりそうです
StringArrayToEntity.ts
import { Prisma } from 'generated/prisma/client';
import { TransformStream } from 'node:stream/web';
import { v7 } from 'uuid';
export class StringArrayToEntity extends TransformStream<
string[],
{
user: Prisma.UserCreateInput;
userProfile: Prisma.UserProfileCreateManyInput;
}
> {
constructor() {
super({
transform(lines, controller) {
const [
_id,
name,
email,
password,
_createdAt,
_updatedAt,
__id,
_userId,
age,
phoneNumber,
address,
city,
country,
zipCode,
occupation,
department,
isActive,
lastLogin,
bio,
loginCount,
__createdAt,
__updatedAt,
] = lines;
const userId = v7();
const user: Prisma.UserCreateInput = {
id: userId,
name,
email,
password,
};
const userProfile: Prisma.UserProfileCreateManyInput = {
userId,
age: parseInt(age, 10),
phoneNumber,
address,
city,
country,
zipCode,
occupation,
department,
isActive: isActive === 'true',
lastLogin: new Date(lastLogin),
bio,
loginCount: parseInt(loginCount, 10),
};
controller.enqueue({
user,
userProfile,
});
},
flush(controller) {
controller.terminate();
},
});
}
}
200件貯まる毎にRDBに書き込む
200件貯まるたびに書き込んでいきます
こちらもストリームの検証とは関係ないですが、CSVで大量のデータを登録する際、エラーハンドリングやトランザクション管理は考えることが多そうだと思いました
DbWriter.ts
import { Prisma } from 'generated/prisma/client';
import { WritableStream } from 'node:stream/web';
import { PrismaService } from 'src/prisma.service';
export class DbWriter extends WritableStream {
private chunks: {
user: Prisma.UserCreateInput;
userProfile: Prisma.UserProfileCreateManyInput;
}[] = [];
private readonly chunkSize = 200;
constructor(private readonly prismaService: PrismaService) {
super({
write: async (args: {
user: Prisma.UserCreateInput;
userProfile: Prisma.UserProfileCreateManyInput;
}) => {
this.chunks.push(args);
if (this.chunks.length >= this.chunkSize) {
await this.writeChunkToDb();
}
},
close: async () => {
if (this.chunks.length > 0) {
await this.writeChunkToDb();
}
},
});
}
private async writeChunkToDb(): Promise<void> {
try {
await this.prismaService.$transaction(async (tx) => {
await tx.user.createMany({
data: this.chunks.map((chunk) => chunk.user),
});
await tx.userProfile.createMany({
data: this.chunks.map((chunk) => chunk.userProfile),
});
});
console.log(`Inserted ${this.chunks.length} users into the database.`);
this.chunks = []; // Reset chunks
} catch (error) {
console.error('Error inserting users into the database:', error);
}
}
}
アップロードでの背圧を確認
ダウンロード時と同様、背圧が効いていることを検証します
- dockerコンテナのメモリを増やし、
DbWriter
でのチャンクを10000件にし、チャンクごとに5000mms待たせる -
DbWriter
でチャンクを書き込むごとに20mms待たせる - 何もブロックがない状態(元のコード)
コンテナのメモリがギリギリだと変化がわかりにくいので、メモリを増やした状態でも検証してみました
アップロードでは全ての処理が終わったあとにレスポンスを返すので、ブラウザのネットワーク応答時間が合計処理時間とほぼ同じになります
レコード | 条件 | 合計処理時間(ネットワーク応答時間) |
---|---|---|
1 | コンテナメモリ増量 + 10000件チャンク + 5000ms待機 | 1.3min (78s) |
2 | 200件書き込むごとに20ms待機 | 36.72s |
3 | 何もブロックがない状態 | 28.24s |
動画がアップロードできないので文章での説明になりますが、1つ目の検証ではサーバーがたくさんのデータを受け入れることができたので、ブラウザ側のアップロード状況はしばらく進むと数秒止まるを繰り返していました
2つ目の検証ではなだらかにアップロードが進みますが、少し待機させた分何もブロックがない状態よりはアップロードが遅くなっています
アップロード時も背圧が効いていることを確認できました!
ブラウザのタブが非アクティブになると処理が止まることがある
ダウンロード中、アップロード中にタブを非アクティブにすると処理が中断し、タブをアクティブにすると再開することがありました
Chromeのメモリセーバーの設定を調整をしても解消しなかったはずなのですが、全く再現しなくなってしまいました・・・
おそらくブラウザがよしなにやってくれているので、非アクティブ時にも確実に処理を続けるならService Workerの使用などを検討した方が良いのかもしれません
その他
今回は使いませんでしたが、cancelメソッドやストリームの圧縮や展開を行うことができる組み込みのCompression Stream APIなど、便利な機能が色々あるようです
まとめ
実際にダウンロード、アップロード機能を作る場合には、エラーハンドリング、バリデーション、アップロード途中で失敗した場合はレコードを全て消すのか残すのかなど様々な検討が必要だと思います
また、非同期処理ではあるものの、処理の開始から終了まで長い時間かかるのが想定されるので、タイムアウトの時間をはじめ様々な考慮も必要になると思います
クラウドサービスを利用したアップロードを検討した方が良い場面が多いかもしれませんが、大量データの場合でもサーバーサイドで自由に処理ができるのは魅力だと思いました!
Discussion