Open11

Web Streams APIのReadableStreamBYOBReader

bellbindbellbind

ReadableStreamとReader

read()することで次々とオブジェクトを受け取れるstreamオブジェクトがReadableStream
stream.getReader()で獲得し、実際にread()メソッドを持っているアダプタがReader

ただし、read()できるReaderインスタンスは一つだけであり、その権利はreleaseLock()することで開放される。

たとえば現在、fetch()ResponsebodyプロパティはReadableStreamのインスタンスとなっている。

fetchのreader利用例
const response = await fetch("http:/example.com");
const readable = response.body;
const reader = readable.getReader();
const decoder = new TextDecoder();
while (true) {
  const {done, value} = await reader.read();
  if (done) break;
  console.log(decoder.decode(value, {stream: true}));
}
reader.releaseLock();

ちなみにReadableStreamasyncIteratorを実装しているのでfor awaitループからも利用可能。

fetchのfor awaitループ利用例
const response = await fetch("http:/example.com");
const readable = response.body;
const decoder = new TextDecoder();
for await (const value of readable) {
   console.log(decoder.decode(value, {stream: true}));
}
bellbindbellbind

BYOBなbytes Reader

BYOBは"Bring Your Own Buffer"の頭文字。

readするときに、呼び出し側で用意したサイズのTypedArrayバッファビューを渡せば、値を詰めてもらえる、仕組み。streamで扱う対象がバイトストリームであるとき限定の特別な機能。

要素サイズの決まっているバイナリフォーマットの読み出しなどで、バイトサイズを指定して、値を読み込むときに役立つ。

利用例
const reader = readable.getReader({mode: "byob"}); // modeオプションが必須
{
  const {value, done} = await reader.read(new Uint32Array(4)); // 引数にTypedArray(`DataView`も可)インスタンスを渡す
  console.assert(value instanceof Uint32Array); // 引数と同じ型のTypedArrayがvalueに入る
}

注意点は、引数のTypedArrayインスタンスはそれ以降普通には利用できなくなることである。
具体的には参照するview.bufferがdetachされ、保持する値にアクセスできなくなる(byteLengthにはアクセスできる)。

  • NOTE: 決してdetachされないSharedArrayBufferをbufferとして参照するTypedArrayインスタンスもview引数には使用できない

このため、他で使用しているTypedArrayのsubarray()で作成したTypedArrayインスタンスを渡すことなどは、危険な使用法である。

読み込みサイズが短いときのvalue

Readableのsourceのpull()メソッドの実装次第で渡す量は自由に決められるので、不足分を埋め込む実装も可能。

ただしpull()実装で、のこり3バイトしかないときに3バイトとして返した(respond(3))場合は以下のようになる。

  • read(new Uint8Array(8)): valuelengthが3、byteLengthが3、buffer.byteLengthが8のUint8Array
  • read(new Uint16Array(4)): valuelengthが1でbyteLengthが2、buffer.byteLengthが8のUint16Array

sourceのpull()実装では、残りが要素サイズより小さい時の処理に注意する必要があるだろう。
(close()したつもりがclose()されてない可能性がある)。

bellbindbellbind

type: "bytes"なReadableStream

ReadableStreamのコンストラクタ引数sourceで、source.type"bytes"のときのみ、getReader({mode: "byob"})が使用可能となる。

const byobReadable = new ReadableStream({
  type: "bytes",
  async start(controller) {...},
  async pull(controller) {...},
  ...
});
const byobReader = byobReadable.getReader({mode: "byob"});
const {done, value} = await byobReader.read(new DataView(new ArrayBuffer(4)));

mode: "byob"なreaderでreader.read(view)が呼び出されると、source.pull(controller)が呼び出される。
type: "bytes"なとき、controllerは、ReadableBytesStreamControllerのインスタンスが渡る(start`も)。

ReadableBytesStreamControllerでは、controller.byobRequestプロパティが存在する。
このcontroller.byobRequestcontroller.byobRequest.viewプロパティには、getReader(view)viewと同じサイズのUint8Arrayオブジェクトが渡る。
async pull(controller)の中では、このcontroller.byobRequest.view.bufferに対し値を書き込み、書き込んだサイズを、controller.byobRequest.respond(size)で渡すことでawait reader.read(view)が完了する。

ただし、以下の状況では、byobRequestプロパティはnullになる。

  • start(controller)controller
  • source.autoAllocateChunkSizeが設定されていない場合に、reader = readable.getReader()reader.read()で呼び出されたときのpull(controller)

もし source.autoAllocateChunkSizeが設定されていれば、pull(controller)ではつねにcontroller.byobRequestが利用できる状態になる。この場合、nullにならずそのviewプロパティにはautoAllocateChunkSizeUint8Arrayが入る。

byobRequestnullの場合は、controller.enqueue(chunk)によってデータを渡すことができる。

コード例
// readable sourceの例
const byob127source = () => ({
  type: "bytes",
  autoAllocateChunkSize: 12,
  async start(controller) {
    console.log(controller.byobRequest, controller.desiredSize);
    this.remains = 127;
  },
  async pull(controller) {
    const view = controller.byobRequest.view; // as Uint8Array
    const size = Math.min(this.remains, view.byteLength);
    for (let i = 0; i < size; i++) view[i] = this.remains - i;
    this.remains -= size;
    controller.byobRequest.respond(size);
    if (this.remains === 0) controller.close();
  },
});

{// 利用側の例
  const readable = new ReadableStream(byob127source());
  const buffs = [];
  const reader = readable.getReader({mode: "byob"});
  for (let i = 0; i < 15; i++) {
    const view = new Int16Array(5); // read(view)したら再利用できないので毎回newする
    try {
       const {done, value} = await reader.read(view); // mode: "byob"でのreadではviewは必須
       if (done) break;
       buffs.push(value); // valueはUint16Array
    } catch (error) {
      console.log(error); //書き込みサイズに端数がでるとき(例: Int16Arrayで奇数サイズの書き込み)
    }
  }
}

注意点

  • await reader.read()が進行するには、async pull()からcontroller.byobRequest.respond(size)(もしくはrespondWithNewView(view))を呼びだす必要がある。
    • このため、終了しているとしてpull()内で controller.close()を呼んだだけではawait状態は解除されない
    • controller.close()する前では、byobRequest.respond(size)sizeで0を渡すとエラーが発生する
    • 正常に振る舞わせるには、最後のrespond()をしたあとで、次のpull()が 呼び出される前にcontroller.close()しなくてはいけない。(つまり、読み込み対象がちょうど終わりでも先読みする必要がある)
    • 正常に振る舞わせるには、最後のrespond(size)をしたあとでcontroller.close()するか、そのあとのpull()の中でcontroller.close()してからrespond(0)をする。
yskszk63yskszk63

失礼いたします。
こちらのスクラップ、大変参考になりました。

byobRequest.respond(size)のsizeで0を渡すとエラーが発生する

controller.close()を呼び出したあとはbyobRequest.respond(0)が有効なようです。
GitHubのIssueですが、下記を参考にしました。
https://github.com/whatwg/streams/issues/1170#issuecomment-933674328

↓実験したコードです。
https://gist.github.com/yskszk63/460760bbefa4ef24c9589dcf132a8232

bellbindbellbind

情報ありがとうございます。
respond(0)するにはその前にclose()することで可能、という仕様だったのですね。

以下のコードで確認できました。

// readable sourceの例
const byob128source = () => ({
  type: "bytes",
  autoAllocateChunkSize: 12,
  async start(controller) {
    console.log(controller.byobRequest, controller.desiredSize);
    this.remains = 128;
  },
  async pull(controller) {
    const view = controller.byobRequest.view; // as Uint8Array
    console.log(view.byteLength); // close後は呼ばれない
    const size = Math.min(this.remains, view.byteLength);
    for (let i = 0; i < size; i++) view[i] = this.remains - i;
    this.remains -= size;
    if (this.remains === 0 && size === 0) controller.close(); //sizeが0のときrespond(0)より先にclose()する
    controller.byobRequest.respond(size);
   },
});

{// 利用側の例
  const readable = new ReadableStream(byob127source());
  const buffs = [];
  const reader = readable.getReader({mode: "byob"});
  for (let i = 0; i < 10; i++) {
    const view = new Int8Array(16); // read(view)したら再利用できないので毎回newする
    //const view = new Int8Array(15); // read(view)したら再利用できないので毎回newする
    try {
      const {done, value} = await reader.read(view); // mode: "byob"でのreadではviewは必須
      console.log(done, value);
      ///if (done) break;
      buffs.push(value);
    } catch (error) {
      console.log(error); //書き込みサイズに端数がでるとき(例: Int16Arrayで奇数サイズの書き込み)
    }
  }
}
bellbindbellbind

Web API DecompressionStreamとBYOB

Web APIには、Uint8ArrayReadableStreamを、gzipやzlib deflateで圧縮・展開するCompressionStreamDecompressionStreamが提案され、すでにchromeブラウザやdenoには実装されている。

これらはTransformStreamとして実装されており、readable.pipeThrough(new DecompressionStream("gzip"))のように使用する。

しかし、現状TransformStream自体がtype: "bytes"に対応しておらず、つまりgetReadable({mode: "byob"})で利用することができない(ただし、readableTypeプロパティが予約されてはある)。

このためCompressionStream/DecompressionStreamともに、固定長(最大byteLength = 65536)のUint8Arrayを受け取る形式でしか利用できない。

(そこで、以下2パターンでBYOBエミュレーションを実装してみた)

bellbindbellbind

BYOBエミュレーション1: read(view)を可能にするReaderのラッパークラス

mimic-byob-reader.js
// Emulate BYOB reader for Uint8Array reader of DecompressionStream readable
export const MimicBYOBReader = class {
  #closed; #reader; #chunk; #done;
  constructor(u8reader) {
    this.#reader = u8reader;
    this.#chunk = null;
    this.#done = false;
    this.#closed = false;
  }
  get closed() {return this.#reader.closed;}
  cancel() {return this.#reader.cancel();}
  releaseLock() {return this.#reader.releaseLock();}

  //NOTE: Web API BYOBReader's passed view `buffer` is "detached" (it cannot access view's array values)
  async read(view) {
    if (view.byteLength === 0) throw new TypeError("it must be view.byteLength > 0");
    let u8view = new Uint8Array(view.buffer, view.byteOffset, view.byteLength);
    if (this.#closed) return {done: true, value: new view.constructor(view.buffer, view.byteOffset, 0)};
    while (!this.#chunk || this.#chunk.byteLength < u8view.byteLength) {//[chunk shorter than view]
      if (this.#chunk) {
        u8view.set(this.#chunk);
        u8view = u8view.subarray(this.#chunk.byteLength);
      }
      const {done, value} = await this.#reader.read();
      this.#done = done;
      if (done) {//[just after the last chunk]
        this.#closed = true;
        this.#chunk = null;
        const blen = u8view.byteOffset - view.byteOffset, bpe = view.BYTES_PER_ELEMENT ?? 1;
        const len = Math.trunc(blen / bpe) * bpe;
        return {done: blen === 0, value: new view.constructor(view.buffer, view.byteOffset, len)};
      }
      if (!(value instanceof Uint8Array)) throw new TypeError(`Must be Uint8Array reader but: ${value}`);
      this.#chunk = value;
    }
    u8view.set(this.#chunk.subarray(0, u8view.byteLength));
    this.#chunk = this.#chunk.subarray(u8view.byteLength);
    return {done: false, value: new view.constructor(view.buffer, view.byteOffset, view.length)};
  }
};
使用例
import {MimicBYOBReader} from "./mimic-byob-reader.js";

...
const reader = new MimicBYOBReader(readable.getReader());
const {done, view} = await reader.read(new Uint8Array(10));

制限

  • read(view)viewの上書きはしないが、detachもしないので再利用できてしまう
bellbindbellbind

BYOBエミュレーション2: BYOB可能なTransformStream実装

byob-transform.js
// BYOB emulation as TransformStream for `u8readable.pipeThrough(new BYOBTransform())`
const newQueue = () => {
  const [gets, polls] = [[], []];
  const next = () => new Promise(get => polls.length > 0 ? polls.shift()(get) : gets.push(get));
  const poll = () => new Promise(poll => gets.length > 0 ? poll(gets.shift()) : polls.push(poll));
  const push = async value => (await poll())({value, done: false});
  const close = async () => (await poll())({done: true});
  return {next, push, close, [Symbol.asyncIterator]() {return this}};
};

export const BYOBTransform = class {
  constructor(transform = {}) {
    const queue = newQueue();
    let chunk = null;
    this.readable = new ReadableStream({
      type: "bytes",
      autoAllocateChunkSize: transform.autoAllocateChunkSize,
      async pull(controller) {
        let view = controller.byobRequest.view;
        while (!chunk || chunk.byteLength < view.byteLength) {
          if (chunk) {
            view.set(chunk);
            view = view.subarray(chunk.byteLength);
          }
          const {done, value} = await queue.next();
          chunk = value;
          if (done) {
            const size = view.byteOffset - controller.byobRequest.view.byteOffset;
            if (size === 0) controller.close();
            controller.byobRequest.respond(size);
            if (size > 0) controller.close();
            return;
          }
        }
        view.set(chunk.subarray(0, view.byteLength));
        chunk = chunk.subarray(view.byteLength);
        const size = view.byteOffset + view.byteLength - controller.byobRequest.view.byteOffset;
        controller.byobRequest.respond(size);
      },
    });
    this.writable = new WritableStream({
      async write(chunk, controller) {await queue.push(chunk);},
      async close(controller) {await queue.close();},
    });
  }
};
使用例
import {BYOBTransform} from "./byob-transform.js";

...
const reader = readable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
const {done, view} = await reader.read(new Uint8Array(10));
bellbindbellbind

DecompressionStreamでBYOB読み込みする例: MNISTデータ読み込み

main.js
import {MimicBYOBReader} from "./mimic-byob-reader.js";
import {BYOBTransform} from "./byob-transform.js";

// MNIST data from: http://yann.lecun.com/exdb/mnist/
const mnistUrl = {
  train: {
    images: "./train-images-idx3-ubyte.gz",
    labels: "./train-labels-idx1-ubyte.gz",
  },
  t10k: {
    images: "./t10k-images-idx3-ubyte.gz",
    labels: "./t10k-labels-idx1-ubyte.gz",
  },
};
//[MNIST gzip decompressed file format]
// images:
//  0-3: magic = 2051 (Big Endian)
//  4-7: image count = train 60000 | t10k 10000 (Big Endian)
//  8-11: image width = 28 (Big Endian)
//  12-15: image height = 28 (Big Endian)
//  16-799:  28x28 pixel bytes(white 0-255 black) of image[0]
//  800-1583: 28x28 pixel bytes(white 0-255 black) of image[1]
//  ...
// labels:
//  0-3: magic = 2049 (Big Endian)
//  4-7: image count = train 60000 | t10k 10000 (Big Endian)
//  8: a number value(0-9) of image[0] 
//  9: a number value(0-9) of image[1]
//  ...
//

// load mnist images and labels with urls
const loadMnist = async function* (urls) {
  const imageReadable = (await fetch(urls.images)).body.pipeThrough(new DecompressionStream("gzip"));
  const labelReadable = (await fetch(urls.labels)).body.pipeThrough(new DecompressionStream("gzip"));
  
  const imageReader = imageReadable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
  //const labelReader = labelReadable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
  //const imageReader = new MimicBYOBReader(imageReadable.getReader());
  const labelReader = new MimicBYOBReader(labelReadable.getReader());
  try {
    const imageMagic = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    if (imageMagic !== 2051) throw new TypeError("invalid magic of images file");
    const labelMagic = (await labelReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    if (labelMagic !== 2049) throw new TypeError("invalid magic of labels file");
    
    const count = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    const labelCount = (await labelReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    if (count !== labelCount) throw new TypeError(`mismatched counts: images, labels: ${count}, ${labelCount}`);
    
    const width = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    const height = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
    if (width !== 28 || height !== 28) throw new TypeError(`invalid image size(28x28): ${width}x${height}`);
    
    const bytes = width * height, images = new Array(count), labels = new Array(count);
    for (let i = 0; i < count; i++) {
      const image = (await imageReader.read(new Uint8Array(bytes))).value;
      const label = (await labelReader.read(new Uint8Array(1))).value;
      yield {image, label};
      [images[i], labels[i]] = [image, label];
    }
    return {images, labels};
  } finally {
    imageReader.releaseLock();
    labelReader.releaseLock();
  }
};

// mnist image view for HTML canvas
const toImageData = image => {
  const id = new ImageData(28, 28);
  for (let x = 0; x < 28; x++) for (let y = 0; y < 28; y++) {
    const i = y * 28 + x, offs = i * 4;
    id.data[offs] = id.data[offs + 1] = id.data[offs + 2] = 255 - image[i];
    id.data[offs + 3] = 255;
  }
  return id;
};
const toCanvas = image => {
  const canvas = document.createElement("canvas");
  canvas.width = canvas.height = 28;
  canvas.style.borderStyle = "solid";
  canvas.getContext("2d").putImageData(toImageData(image), 0, 0);
  return canvas;
};


//[example] show MNIST images
for await (const {image, label} of loadMnist(mnistUrl.t10k)) {
  const canvas = toCanvas(image);
  canvas.title = label;
  document.body.append(canvas);
  //await new Promise(f => requestAnimationFrame(f));
}
bellbindbellbind

type: "bytes"なReaderの読み込みパターン

以下は、実験用のReadableStreamのsource。指定した数ぶん降順で整数値を書き込む。最後に書き込む値は1。

実験用ReadableSource
const byob = (remains, autoAllocateChunkSize) => ({
  type: "bytes",
  autoAllocateChunkSize: autoAllocateChunkSize,
  async start(controller) {
    this.remains = remains;
  },
  async pull(controller) {
    const view = controller.byobRequest ? controller.byobRequest.view : new Uint8Array(12);
    const size = Math.min(this.remains, view.byteLength);
    for (let i = 0; i < size; i++) view[i] = this.remains - i;
    this.remains -= size;
    controller.byobRequest ? controller.byobRequest.respond(size) : controller.enqueue(view.slice(0, size));
    if (this.remains === 0) controller.close();
  },
});

1. Default Readerでchunkを読み込むパターン

const readable128 = new ReadableStream(byob(128, 12));
const buffs = [];
const reader = readable128.getReader();
for (;;) {
  const {done, value} = await reader.read();
  if (done) break;
  buffs.push(value);
}
  • Default Readerでは、donetrueのときはvalueundefinedになる

2. BYOB Readerでchunkを読み込むパターン

const readable128 = new ReadableStream(byob(128, 12));
const buffs = [];
const reader = readable128.getReader({mode: "byob"});
for (;;) {
  const {done, value} = await reader.read(new Uint8Array(12));
  if (done) break;
  buffs.push(value);
}
  • BYOB Readerでは、donetrueのときはvalueは長さ0のTypedArrayになる

3. BYOB Readerで1バッファに追加書き込みするパターン

const readable128 = new ReadableStream(byob(128, 12));
let buf = new ArrayBuffer(150), offs = 0; // 上書きする変数
const reader = readable128.getReader({mode: "byob"});
for (;;) {
  const {done, value} = await reader.read(new Uint8Array(buf, offs, 10));
  buf = value.buffer; // 注1: もとのbufは使用不能になっているので、bufはvalue.bufferで必ず上書きすること
  offs += value.byteLength; // 注2: 最後の読み込みではvalue.byteLengthはview.byteLength以下になりうる
  if (done) break; // 注3: done == trueのときでも、value.bufferを取り出したあとにbreakすること
}
console.log(new Uint8Array(buf, 0, offs));
  • 他で使われているarraybufferに上書きできるわけではないので注意。readに渡したbufのbufferはdetachされ、その値にアクセスできなくなる。
  • BYOB Readerでは、結果のvalue.bufferには、 引数のview.bufferの値がコピーされているので、連結されたbufferを得ることができる
  • breakの実行位置に注意