🫗

巨大ファイルをメモリに載せない

に公開

この記事は AI Shift Advent Calendar 2025 の1日目の記事です。

AI Shift で AI Worker の開発・運用に携わっています、鈴木 (@amata1219) と申します。

直近の業務では、AI Worker のファイル処理能力の抜本的な強化に取り組んできました。もともとは、単に取り扱い可能なファイルのサイズ上限を引き上げたいとの要望から始まりました。

しかし、定数の値を 1024 * 1024 から 1024 * 1024 * 1024 に書き換える1行のプルリクエストでは、この問題は解決しませんでした。いくつかの改修を経て、現在は数GB以上のファイルを取り扱えるプログラムになっています。

本記事ではスケール可能性の高い効率的なファイル処理の実装方法を紹介します。

なぜ1行のPRじゃ解決できない?

巨大なファイルを扱うことは容易ではないためです。

ハードウェアの限界があります。サーバーの限られたメモリの中でファイルを扱わなければなりません。実際にファイルサイズ上限を引き上げただけでは、ローカル環境のサーバーは巨大ファイルの受信とともに Out of Memory でクラッシュしました。もしメモリを増やすとしても、クラウドサービスでは費用が高くつく可能性があります。

また、 I/O 性能が悪化します。ディスクの読み書きに時間を要するようになり、性能上の新たなボトルネックとなる可能性があります。これはユーザー体験を損ねたり予測困難なディスク読み書き時間を考慮した複雑なアプリケーションコードを生んだりする原因になります。

これらの問題を解決するために、ファイルを処理するアルゴリズムを変える必要があります。

わかった、どう変えればいい?

スケール可能性の高い効率的なファイル処理は、主に以下のアルゴリズムに支えられています。

まず、ストリーミングが欠かせません。サーバーのメモリに載りきらないサイズのファイルは、少しずつ読み込み処理するといった工夫が必要です。一括で行っていたファイルのアップロード・ダウンロード処理をストリーミング化が求められます。

次に、ストリーミングパースが求められます。従来の処理では CSV や PDF ファイルを一括で処理していましたが、ストリーミング化以降はファイル全体を一度に処理することはできません。機能を維持するためには、チャンク単位での処理に切り替えなければなりません。

そして、並列分散処理もあると望ましいです。小さいファイルを一度に処理するのとは異なり、巨大ファイルのチャンクを全て処理しきるのは時間がかかります。現実的には十分に効率的なアルゴリズムであることが要求されます。

OK, やってみよう!

以降はスケール可能性の高い効率的なファイル処理の具体的な実装方法を説明します。

実際のファイル処理のコードは TypeScript で記述されています。確実に動作する説明・例示のために TypeScript を用いますが、考え方は他のプログラミング言語にも通用する内容です。

以下では Node.js 環境を前提としています。また、読みやすさのため、変更の無い箇所は基本的に省略しています。

ストリーミングする

まず、multipart/form-data でサーバーに一括でファイルをアップロードする処理から始めます。ここにストリーミング、ストリーミングパース、並列分散処理といった概念を一つひとつ導入し、処理の特性の変化を観察しながら理解を深めます。

import { Hono } from 'hono'

const app = new Hono()

app.post('/upload', async (c) => {
  const body = await c.req.parseBody()
  
  // `'file'` フィールドにファイルが1つ格納されていることを想定します
  const file = body['file']
  if (!(file instanceof File)) {
    return c.json({ error: 'file フィールドが存在しないか不正です' }, 400)
  }

  const arrayBuffer = await file.arrayBuffer()
  const buffer = Buffer.from(arrayBuffer)

  await handleUploadedFile(buffer)

  return c.json({ message: 'ok' })
})

async function handleUploadedFile(buffer: Buffer): Promise<void> {
 // ...
}

この処理は file フィールドに格納されたファイル全体をメモリに読み込みます。その後、Node.js の Buffer に変換しています。例えば、1MB のファイルを受信したら、1MB のメモリが消費されます。ArrayBufferBuffer 分の消費メモリも考慮すると、実際は 1MB 以上のメモリが使用されます。

サーバーの使用可能なメモリ量にも依りますが、取り扱い可能なファイルのサイズ上限を 10MB 程度に設定していれば問題無いかもしれません。ただ、偶然にも複数のユーザーが同時に大きいファイルをアップロードし、OOM が発生する可能性も排除することはできません。上限を 100MB, 1GB と引き上げる可能性があれば、いずれはファイルがメモリに載りきらなくなります。

この問題の解決策として、ファイルのアップロード・ダウンロード処理をストリーミング化します。

import Busboy from 'busboy'
import { Readable } from 'node:stream'
import type { ReadableStream } from 'node:stream/web'

app.post('/upload', async (c) => {
  const contentType = c.req.header('content-type')
  if (!contentType?.startsWith('multipart/form-data')) {
    return c.json({ error: 'Content-Type は multipart/form-data が必要です' }, 400)
  }
  
  const body = c.req.raw.body
  if (!body) {
    return c.json({ error: '空のボディです' }, 400)
  }

  const busboy = Busboy({
    headers: { 'content-type': contentType },
    highWaterMark: 1024 * 64, // busboy 本体の High Water Mark
    fileHwm: 1024 * 1024 // 各 file ストリームの High Water Mark
  })
  const nodeReadable = Readable.fromWeb(body as ReadableStream)
  
  try {
    await new Promise<void>((resolve, reject) => {
      busboy.on('file', (fieldname, file) => {
        if (fieldname !== 'file') {
          file.resume() // ここでは想定外のフィールドは捨てています
          return
        }
        // 簡略化のためここでは `await` していません。実際には並列度の制御やエラーハンドリングが必要です。
        file.on('data', (chunk: Buffer) => { void handleUploadedFileChunk(chunk) })
        file.on('end', () => { void handleUploadedFileChunk(Buffer.alloc(0)) })
      })
      busboy.on('error', (err) => reject(err as Error)) // エラー
      busboy.on('finish', () => resolve(null)) // 正常終了
      nodeReadable.pipe(busboy)
    })
  } catch {
    return c.json({ error: 'アップロードエラー' }, 500)
  }
  return c.json({ message: 'ok' })
})

async function handleUploadedFileChunk(chunk: Buffer) {
  // ...
}

ファイルを小さなチャンクに分割して逐次的に読み込んでいます。これにより常にメモリ使用量は低く抑えられます。

busboy は Node.js 向けの multipart/form-data ストリーミングパーサーです。ストリーミングパース自体は次節で説明しますが、busboy はストリーミングしながらリクエストからファイルやフィールドの情報を抽出して提供します。

High Water Mark はストリームの内部にあるバッファに溜められるデータサイズの閾値を意味します。busboy 本体と各 file ストリームそれぞれが個別に Node.js のストリームを持ちます。それぞれに対し High Water Mark を設定できます。fileHwm はバッファのサイズ上限として機能し、file.on('data', ...) で受け取るチャンクのサイズは概ねそれ以下になります。

もし GCS や S3 へのアップロードを行っている場合は、ダイレクトアップロードの利用を検討してください。クライアントから直接クラウドストレージにファイルをアップロードするため、メモリや帯域などのコストを外部に逃がせます。また、再開可能なアップロードがサポートされている場合は、それを利用することでより確実に巨大ファイルを保存することができます。

これで大きなファイルを受信しても、消費メモリはファイルサイズに比例せず、チャンクサイズと同時処理数の積程度に抑えられます。

ストリーミングパースする

これで全ての問題は解決したように見えます。本当でしょうか?

従来はファイル全体を取得し、その上で処理していました。ただ、現在得られるのはよく分からない境界で千切られた複数のチャンクです。このチャンク単位で処理を行わなければなりません。仮にチャンクを溜めて全結合してはストリーミングの意味が無いためです。

チャンクは見方を変えれば破損したデータのように見えるかもしれません。せめて元のファイルの形式に基づいて扱えるようにしたいです。ここにストリーミングパースが使えます。ストリーミングでデータを読み込みながらも、処理可能な単位に切り出します。

ここでは Node.js の stream.Transform API を利用したストリーミングパーサーを作成します。

import { Transform, TransformCallback } from 'node:stream'

class CsvParseStream extends Transform {
  private _buffer = ''

  constructor() {
    super({ readableObjectMode: true })
  }

  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
    this._buffer += chunk.toString('utf8')
    
     // 実際の CSV ではクォート中の改行など複雑な仕様があるため、後ほど紹介する `csv-parse` のようなライブラリ利用を推奨します
    const [parsedRows, remain] = parseCsv(this._buffer)
    this._buffer = remain
    for (const row of parsedRows) {
      const ok = this.push(row)  // ここで object を下流に流しています
      if (!ok) {
        // `push()` が false なら readable のバッファが満杯になっています
        // Node が自動でバックプレッシャーを効かせるため、ここで無理に push し続けないのが望ましいです
        break
      }
    }
    callback()
  }

  _flush(callback: TransformCallback) {
    // バッファを利用し最終行のデータを flush する
    callback()
  }
}

import * as fs from 'fs'

fs.createReadStream('input.csv')
  .pipe(new CsvParseStream())
  .on('data', (row) => {
    // ...
  })

stream.Transform は、書き込み側 (writable) と読み取り側 (readable) のバッファを内部に持ちます。on('data', ...) が遅い場合は、readable のバッファが次第に High Water Mark に達し、それが write() の返り値が false になることを通じて writable 側にも伝播します。それを受けて Node の pipe()fs.ReadStreampause() します。stream.Transform を継承することで、Node のバックプレッシャーの仕組みの恩恵に浴することができます。

先述のストリーミング処理のコードに、このパーサーを組み込むと以下の通りになります。CSV のレコード1行1行を値として取得できるようになりました。従来のコードが CSV ファイルを読み込み、行毎に処理を行っていた場合は、そのループ内の処理を転用できます。

busboy.on('file', (fieldname, file) => {
  const parser = new CsvParseStream()
  parser.on('data', (row) => { void processRow(row) })
  parser.on('error', (err) => {})
  parser.on('end', () => {})
  file.pipe(parser)
})

busboy 自体もストリーミングパーサーであることを考えれば、各種データのストリーミングパースに特化したパイプラインを連結することで、multipart/form-data から CSV レコード1行1行を読み取る処理までを実現していると言えます。

一部のメジャーな種類のファイルであれば、そのストリーミングパース機能を提供するライブラリが公開されています。CSV ファイルであれば csv-parse というパッケージがあります。自前でパース処理を書くのは手間であるため、ここでは便利なライブラリを利用します。

import { parse } from 'csv-parse'

busboy.on('file', (fieldname, file) => {
  const parser = parse(/* cf. https://csv.js.org/parse/options/ */)
  parser.on('data', (row) => { void processRow(row) })
  parser.on('error', (err) => {})
  parser.on('end', () => {})
  file.pipe(parser)
})

大きなファイルをストリーミングで受信しながら処理できるようになりました。

並列分散処理する

巨大ファイルを処理できるようになったと言っても、現実的には速度の問題があります。ファイルのサイズに比例してチャンク数が増加するため、チャンク1つ当たりの処理に要する時間との積以上は待つことになります。

チャンク間の多様な依存関係を考慮する必要が無い場合は、並列化により速度を改善できます。

const CONCURRENCY = 10

busboy.on('file', (fieldname, file) => {
  let active = 0
  let ended = false
  const pending = new Set<Promise<void>>()

  const parser = parse(/* cf. https://csv.js.org/parse/options/ */)
  parser.on('data', (row) => {
    active++
    const p = (async () => {
      try {
        await processRow(row)
      } catch (err) {
        // ...
      } finally {
        active--
        pending.delete(p)
        if (active < CONCURRENCY && !ended) parser.resume()
      }
    })()
    pending.add(p)
    if (active >= CONCURRENCY) parser.pause()
  })
  parser.on('end', async () => {
    ended = true
    await Promise.all(pending) // 簡略化していますが実際は HTTP ハンドラー内で `await` できるようにします
  })
  file.pipe(parser)
})

この並列化では1つのファイルを同一のサーバー内で処理する制約が残ります。サーバーの処理性能がファイルの処理速度を律速します。その制約を撤廃し、より高度な並列化を実現する場合は、分散処理も導入します。

const queuePublisher = new QueuePublisher()

parser.on('data', (row) => {
  void queuePublisher.writeMessage({ row })
})

キューイングシステムを利用した分散処理については、CyberAgent Developers Advent Calendar 2025 で公開した「本番で死なないキューコンシューマー設計論」が参考になるかもしれません。

const queueConsumer = new QueueConsumer()

const messages = queueConsumer.readMessages()
await Promise.all(messages.map(async (message) => { await handleMessage(message) }))

async function handleMessage(message) {
  //...
}

まとめ

スケール可能性の高い効率的なファイル処理には以下の要素が重要でした。

一番にストリーミングで使用メモリを常に低く抑える必要があります。次に、チャンク単位でファイル処理を可能にするためにストリーミングパースを行います。さらに、効率性のため並列分散処理を導入することが望ましいです。

これまでに紹介した、ストリーミング、ストリーミングパース、並列分散処理を取り入れたプログラムは以下のようになります。

// app.ts
import Busboy from 'busboy'
import { parse } from 'csv-parse'
import { Hono } from 'hono'
import { Readable } from 'node:stream'

const app = new Hono()

const queuePublisher = new QueuePublisher()

app.post('/upload', async (c) => {
  const contentType = c.req.header('content-type')
  if (!contentType?.startsWith('multipart/form-data')) {
    return c.json({ error: 'Content-Type は multipart/form-data が必要です' }, 400)
  }
  
  const body = c.req.raw.body
  if (!body) {
    return c.json({ error: '空のボディです' }, 400)
  }

  const busboy = Busboy({
    headers: { 'content-type': contentType },
    highWaterMark: 1024 * 64,
    fileHwm: 1024 * 1024
  })
  const nodeReadable = Readable.fromWeb(body as any)
  
  try {
    await new Promise<void>((resolve, reject) => {
      busboy.on('file', (fieldname, file) => {
        if (fieldname !== 'file') {
          file.resume()
          return
        }
        const parser = parse(/* cf. https://csv.js.org/parse/options/ */)
        parser.on('data', (row) => { void queuePublisher.writeMessage({ row }) })
        parser.on('error', (err) => {})
        file.pipe(parser)
      })
      busboy.on('error', (err) => reject(err as Error))
      busboy.on('finish', () => resolve(null))
      nodeReadable.pipe(busboy)
    })
  } catch {
    return c.json({ error: 'アップロードエラー' }, 500)
  }
  return c.json({ message: 'ok' })
})

// consumer.ts
const queueConsumer = new QueueConsumer()
const messages = queueConsumer.readMessages()
await Promise.all(messages.map(async (message) => { await handleMessage(message) }))

スケール可能性の高い効率的なファイル処理に辿り着きました。

採用情報

AI Shiftではエンジニアの採用に力を入れています!
少しでも興味を持っていただけましたら、カジュアル面談でお話しませんか?
(オンライン・19時以降の面談も可能です!)

【面談フォームはこちら】
https://hrmos.co/pages/cyberagent-group/jobs/1826557091831955459


明日は AI Shift のチーフエバンジェリストである及川信太郎さんの記事です。

AI Shift Tech Blog

Discussion