Web Streams APIのReadableStreamBYOBReader
Web Streams API
- WHATWG Streams Standard: https://streams.spec.whatwg.org/
- MDNの解説: https://developer.mozilla.org/ja/docs/Web/API/Streams_API
ブラウザに標準搭載されているStreams API。現在は、denoやnode.jsにも搭載されている。
(node.js-18より。node.js-17以前では、import {ReadableStream, WritableStream} from "node:stream/web";
することで使用可能)
ReadableStreamとReader
read()
することで次々とオブジェクトを受け取れるstreamオブジェクトがReadableStream
。
stream.getReader()
で獲得し、実際にread()
メソッドを持っているアダプタがReader
。
ただし、read()
できるReader
インスタンスは一つだけであり、その権利はreleaseLock()
することで開放される。
たとえば現在、fetch()
のResponse
のbody
プロパティはReadableStream
のインスタンスとなっている。
const response = await fetch("http:/example.com");
const readable = response.body;
const reader = readable.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(decoder.decode(value, {stream: true}));
}
reader.releaseLock();
ちなみにReadableStream
はasyncIterator
を実装しているのでfor await
ループからも利用可能。
const response = await fetch("http:/example.com");
const readable = response.body;
const decoder = new TextDecoder();
for await (const value of readable) {
console.log(decoder.decode(value, {stream: true}));
}
BYOBなbytes Reader
BYOBは"Bring Your Own Buffer"の頭文字。
readするときに、呼び出し側で用意したサイズのTypedArrayバッファビューを渡せば、値を詰めてもらえる、仕組み。streamで扱う対象がバイトストリームであるとき限定の特別な機能。
要素サイズの決まっているバイナリフォーマットの読み出しなどで、バイトサイズを指定して、値を読み込むときに役立つ。
const reader = readable.getReader({mode: "byob"}); // modeオプションが必須
{
const {value, done} = await reader.read(new Uint32Array(4)); // 引数にTypedArray(`DataView`も可)インスタンスを渡す
console.assert(value instanceof Uint32Array); // 引数と同じ型のTypedArrayがvalueに入る
}
注意点は、引数のTypedArrayインスタンスはそれ以降普通には利用できなくなることである。
具体的には参照するview.buffer
がdetachされ、保持する値にアクセスできなくなる(byteLength
にはアクセスできる)。
- NOTE: 決してdetachされない
SharedArrayBuffer
をbufferとして参照するTypedArrayインスタンスもview引数には使用できない
このため、他で使用しているTypedArrayのsubarray()
で作成したTypedArrayインスタンスを渡すことなどは、危険な使用法である。
value
読み込みサイズが短いときのReadableのsourceのpull()
メソッドの実装次第で渡す量は自由に決められるので、不足分を埋め込む実装も可能。
ただしpull()
実装で、のこり3バイトしかないときに3バイトとして返した(respond(3)
)場合は以下のようになる。
-
read(new Uint8Array(8))
:value
はlength
が3、byteLength
が3、buffer.byteLength
が8のUint8Array
-
read(new Uint16Array(4))
:value
はlength
が1でbyteLength
が2、buffer.byteLength
が8のUint16Array
sourceのpull()
実装では、残りが要素サイズより小さい時の処理に注意する必要があるだろう。
(close()
したつもりがclose()
されてない可能性がある)。
type: "bytes"
なReadableStream
ReadableStream
のコンストラクタ引数source
で、source.type
が"bytes"
のときのみ、getReader({mode: "byob"})
が使用可能となる。
const byobReadable = new ReadableStream({
type: "bytes",
async start(controller) {...},
async pull(controller) {...},
...
});
const byobReader = byobReadable.getReader({mode: "byob"});
const {done, value} = await byobReader.read(new DataView(new ArrayBuffer(4)));
mode: "byob"
なreaderでreader.read(view)
が呼び出されると、source.pull(controller)
が呼び出される。
type: "bytes"なとき、
controllerは、
ReadableBytesStreamControllerのインスタンスが渡る(
start`も)。
ReadableBytesStreamController
では、controller.byobRequest
プロパティが存在する。
このcontroller.byobRequest
のcontroller.byobRequest.view
プロパティには、getReader(view)
の view
と同じサイズのUint8Array
オブジェクトが渡る。
async pull(controller)
の中では、このcontroller.byobRequest.view.buffer
に対し値を書き込み、書き込んだサイズを、controller.byobRequest.respond(size)
で渡すことでawait reader.read(view)
が完了する。
ただし、以下の状況では、byobRequest
プロパティはnull
になる。
-
start(controller)
のcontroller
-
source.autoAllocateChunkSize
が設定されていない場合に、reader = readable.getReader()
のreader.read()
で呼び出されたときのpull(controller)
もし source.autoAllocateChunkSize
が設定されていれば、pull(controller)
ではつねにcontroller.byobRequest
が利用できる状態になる。この場合、null
にならずそのview
プロパティにはautoAllocateChunkSize
なUint8Array
が入る。
byobRequest
がnull
の場合は、controller.enqueue(chunk)
によってデータを渡すことができる。
// readable sourceの例
const byob127source = () => ({
type: "bytes",
autoAllocateChunkSize: 12,
async start(controller) {
console.log(controller.byobRequest, controller.desiredSize);
this.remains = 127;
},
async pull(controller) {
const view = controller.byobRequest.view; // as Uint8Array
const size = Math.min(this.remains, view.byteLength);
for (let i = 0; i < size; i++) view[i] = this.remains - i;
this.remains -= size;
controller.byobRequest.respond(size);
if (this.remains === 0) controller.close();
},
});
{// 利用側の例
const readable = new ReadableStream(byob127source());
const buffs = [];
const reader = readable.getReader({mode: "byob"});
for (let i = 0; i < 15; i++) {
const view = new Int16Array(5); // read(view)したら再利用できないので毎回newする
try {
const {done, value} = await reader.read(view); // mode: "byob"でのreadではviewは必須
if (done) break;
buffs.push(value); // valueはUint16Array
} catch (error) {
console.log(error); //書き込みサイズに端数がでるとき(例: Int16Arrayで奇数サイズの書き込み)
}
}
}
注意点
-
await reader.read()
が進行するには、async pull()
からcontroller.byobRequest.respond(size)
(もしくはrespondWithNewView(view)
)を呼びだす必要がある。- このため、終了しているとして
pull()
内でcontroller.close()
を呼んだだけではawait状態は解除されない。 -
controller.close()
する前では、byobRequest.respond(size)
のsize
で0を渡すとエラーが発生する 正常に振る舞わせるには、最後のrespond()
をしたあとで、次のpull()
が 呼び出される前にcontroller.close()
しなくてはいけない。(つまり、読み込み対象がちょうど終わりでも先読みする必要がある)- 正常に振る舞わせるには、最後の
respond(size)
をしたあとでcontroller.close()
するか、そのあとのpull()
の中でcontroller.close()
してからrespond(0)
をする。
- このため、終了しているとして
失礼いたします。
こちらのスクラップ、大変参考になりました。
byobRequest.respond(size)のsizeで0を渡すとエラーが発生する
controller.close()
を呼び出したあとはbyobRequest.respond(0)
が有効なようです。
GitHubのIssueですが、下記を参考にしました。
↓実験したコードです。
情報ありがとうございます。
respond(0)
するにはその前にclose()
することで可能、という仕様だったのですね。
以下のコードで確認できました。
// readable sourceの例
const byob128source = () => ({
type: "bytes",
autoAllocateChunkSize: 12,
async start(controller) {
console.log(controller.byobRequest, controller.desiredSize);
this.remains = 128;
},
async pull(controller) {
const view = controller.byobRequest.view; // as Uint8Array
console.log(view.byteLength); // close後は呼ばれない
const size = Math.min(this.remains, view.byteLength);
for (let i = 0; i < size; i++) view[i] = this.remains - i;
this.remains -= size;
if (this.remains === 0 && size === 0) controller.close(); //sizeが0のときrespond(0)より先にclose()する
controller.byobRequest.respond(size);
},
});
{// 利用側の例
const readable = new ReadableStream(byob127source());
const buffs = [];
const reader = readable.getReader({mode: "byob"});
for (let i = 0; i < 10; i++) {
const view = new Int8Array(16); // read(view)したら再利用できないので毎回newする
//const view = new Int8Array(15); // read(view)したら再利用できないので毎回newする
try {
const {done, value} = await reader.read(view); // mode: "byob"でのreadではviewは必須
console.log(done, value);
///if (done) break;
buffs.push(value);
} catch (error) {
console.log(error); //書き込みサイズに端数がでるとき(例: Int16Arrayで奇数サイズの書き込み)
}
}
}
DecompressionStream
とBYOB
Web API Web APIには、Uint8Array
なReadableStream
を、gzipやzlib deflateで圧縮・展開するCompressionStream
とDecompressionStream
が提案され、すでにchromeブラウザやdenoには実装されている。
これらはTransformStream
として実装されており、readable.pipeThrough(new DecompressionStream("gzip"))
のように使用する。
しかし、現状TransformStream
自体がtype: "bytes"
に対応しておらず、つまりgetReadable({mode: "byob"})
で利用することができない(ただし、readableType
プロパティが予約されてはある)。
このためCompressionStream
/DecompressionStream
ともに、固定長(最大byteLength = 65536)のUint8Array
を受け取る形式でしか利用できない。
(そこで、以下2パターンでBYOBエミュレーションを実装してみた)
read(view)
を可能にするReaderのラッパークラス
BYOBエミュレーション1: // Emulate BYOB reader for Uint8Array reader of DecompressionStream readable
export const MimicBYOBReader = class {
#closed; #reader; #chunk; #done;
constructor(u8reader) {
this.#reader = u8reader;
this.#chunk = null;
this.#done = false;
this.#closed = false;
}
get closed() {return this.#reader.closed;}
cancel() {return this.#reader.cancel();}
releaseLock() {return this.#reader.releaseLock();}
//NOTE: Web API BYOBReader's passed view `buffer` is "detached" (it cannot access view's array values)
async read(view) {
if (view.byteLength === 0) throw new TypeError("it must be view.byteLength > 0");
let u8view = new Uint8Array(view.buffer, view.byteOffset, view.byteLength);
if (this.#closed) return {done: true, value: new view.constructor(view.buffer, view.byteOffset, 0)};
while (!this.#chunk || this.#chunk.byteLength < u8view.byteLength) {//[chunk shorter than view]
if (this.#chunk) {
u8view.set(this.#chunk);
u8view = u8view.subarray(this.#chunk.byteLength);
}
const {done, value} = await this.#reader.read();
this.#done = done;
if (done) {//[just after the last chunk]
this.#closed = true;
this.#chunk = null;
const blen = u8view.byteOffset - view.byteOffset, bpe = view.BYTES_PER_ELEMENT ?? 1;
const len = Math.trunc(blen / bpe) * bpe;
return {done: blen === 0, value: new view.constructor(view.buffer, view.byteOffset, len)};
}
if (!(value instanceof Uint8Array)) throw new TypeError(`Must be Uint8Array reader but: ${value}`);
this.#chunk = value;
}
u8view.set(this.#chunk.subarray(0, u8view.byteLength));
this.#chunk = this.#chunk.subarray(u8view.byteLength);
return {done: false, value: new view.constructor(view.buffer, view.byteOffset, view.length)};
}
};
import {MimicBYOBReader} from "./mimic-byob-reader.js";
...
const reader = new MimicBYOBReader(readable.getReader());
const {done, view} = await reader.read(new Uint8Array(10));
制限
-
read(view)
のview
の上書きはしないが、detachもしないので再利用できてしまう
TransformStream
実装
BYOBエミュレーション2: BYOB可能な// BYOB emulation as TransformStream for `u8readable.pipeThrough(new BYOBTransform())`
const newQueue = () => {
const [gets, polls] = [[], []];
const next = () => new Promise(get => polls.length > 0 ? polls.shift()(get) : gets.push(get));
const poll = () => new Promise(poll => gets.length > 0 ? poll(gets.shift()) : polls.push(poll));
const push = async value => (await poll())({value, done: false});
const close = async () => (await poll())({done: true});
return {next, push, close, [Symbol.asyncIterator]() {return this}};
};
export const BYOBTransform = class {
constructor(transform = {}) {
const queue = newQueue();
let chunk = null;
this.readable = new ReadableStream({
type: "bytes",
autoAllocateChunkSize: transform.autoAllocateChunkSize,
async pull(controller) {
let view = controller.byobRequest.view;
while (!chunk || chunk.byteLength < view.byteLength) {
if (chunk) {
view.set(chunk);
view = view.subarray(chunk.byteLength);
}
const {done, value} = await queue.next();
chunk = value;
if (done) {
const size = view.byteOffset - controller.byobRequest.view.byteOffset;
if (size === 0) controller.close();
controller.byobRequest.respond(size);
if (size > 0) controller.close();
return;
}
}
view.set(chunk.subarray(0, view.byteLength));
chunk = chunk.subarray(view.byteLength);
const size = view.byteOffset + view.byteLength - controller.byobRequest.view.byteOffset;
controller.byobRequest.respond(size);
},
});
this.writable = new WritableStream({
async write(chunk, controller) {await queue.push(chunk);},
async close(controller) {await queue.close();},
});
}
};
import {BYOBTransform} from "./byob-transform.js";
...
const reader = readable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
const {done, view} = await reader.read(new Uint8Array(10));
DecompressionStreamでBYOB読み込みする例: MNISTデータ読み込み
- コード: https://gist.github.com/bellbind/b7803cd78249c95bc95f1084adb36eeb
- デモ: https://gist.githack.com/bellbind/b7803cd78249c95bc95f1084adb36eeb/raw/index.html
import {MimicBYOBReader} from "./mimic-byob-reader.js";
import {BYOBTransform} from "./byob-transform.js";
// MNIST data from: http://yann.lecun.com/exdb/mnist/
const mnistUrl = {
train: {
images: "./train-images-idx3-ubyte.gz",
labels: "./train-labels-idx1-ubyte.gz",
},
t10k: {
images: "./t10k-images-idx3-ubyte.gz",
labels: "./t10k-labels-idx1-ubyte.gz",
},
};
//[MNIST gzip decompressed file format]
// images:
// 0-3: magic = 2051 (Big Endian)
// 4-7: image count = train 60000 | t10k 10000 (Big Endian)
// 8-11: image width = 28 (Big Endian)
// 12-15: image height = 28 (Big Endian)
// 16-799: 28x28 pixel bytes(white 0-255 black) of image[0]
// 800-1583: 28x28 pixel bytes(white 0-255 black) of image[1]
// ...
// labels:
// 0-3: magic = 2049 (Big Endian)
// 4-7: image count = train 60000 | t10k 10000 (Big Endian)
// 8: a number value(0-9) of image[0]
// 9: a number value(0-9) of image[1]
// ...
//
// load mnist images and labels with urls
const loadMnist = async function* (urls) {
const imageReadable = (await fetch(urls.images)).body.pipeThrough(new DecompressionStream("gzip"));
const labelReadable = (await fetch(urls.labels)).body.pipeThrough(new DecompressionStream("gzip"));
const imageReader = imageReadable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
//const labelReader = labelReadable.pipeThrough(new BYOBTransform()).getReader({mode: "byob"});
//const imageReader = new MimicBYOBReader(imageReadable.getReader());
const labelReader = new MimicBYOBReader(labelReadable.getReader());
try {
const imageMagic = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
if (imageMagic !== 2051) throw new TypeError("invalid magic of images file");
const labelMagic = (await labelReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
if (labelMagic !== 2049) throw new TypeError("invalid magic of labels file");
const count = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
const labelCount = (await labelReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
if (count !== labelCount) throw new TypeError(`mismatched counts: images, labels: ${count}, ${labelCount}`);
const width = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
const height = (await imageReader.read(new DataView(new ArrayBuffer(4)))).value.getUint32(0, false);
if (width !== 28 || height !== 28) throw new TypeError(`invalid image size(28x28): ${width}x${height}`);
const bytes = width * height, images = new Array(count), labels = new Array(count);
for (let i = 0; i < count; i++) {
const image = (await imageReader.read(new Uint8Array(bytes))).value;
const label = (await labelReader.read(new Uint8Array(1))).value;
yield {image, label};
[images[i], labels[i]] = [image, label];
}
return {images, labels};
} finally {
imageReader.releaseLock();
labelReader.releaseLock();
}
};
// mnist image view for HTML canvas
const toImageData = image => {
const id = new ImageData(28, 28);
for (let x = 0; x < 28; x++) for (let y = 0; y < 28; y++) {
const i = y * 28 + x, offs = i * 4;
id.data[offs] = id.data[offs + 1] = id.data[offs + 2] = 255 - image[i];
id.data[offs + 3] = 255;
}
return id;
};
const toCanvas = image => {
const canvas = document.createElement("canvas");
canvas.width = canvas.height = 28;
canvas.style.borderStyle = "solid";
canvas.getContext("2d").putImageData(toImageData(image), 0, 0);
return canvas;
};
//[example] show MNIST images
for await (const {image, label} of loadMnist(mnistUrl.t10k)) {
const canvas = toCanvas(image);
canvas.title = label;
document.body.append(canvas);
//await new Promise(f => requestAnimationFrame(f));
}
type: "bytes"
なReaderの読み込みパターン
以下は、実験用のReadableStreamのsource。指定した数ぶん降順で整数値を書き込む。最後に書き込む値は1。
const byob = (remains, autoAllocateChunkSize) => ({
type: "bytes",
autoAllocateChunkSize: autoAllocateChunkSize,
async start(controller) {
this.remains = remains;
},
async pull(controller) {
const view = controller.byobRequest ? controller.byobRequest.view : new Uint8Array(12);
const size = Math.min(this.remains, view.byteLength);
for (let i = 0; i < size; i++) view[i] = this.remains - i;
this.remains -= size;
controller.byobRequest ? controller.byobRequest.respond(size) : controller.enqueue(view.slice(0, size));
if (this.remains === 0) controller.close();
},
});
1. Default Readerでchunkを読み込むパターン
const readable128 = new ReadableStream(byob(128, 12));
const buffs = [];
const reader = readable128.getReader();
for (;;) {
const {done, value} = await reader.read();
if (done) break;
buffs.push(value);
}
- Default Readerでは、
done
がtrue
のときはvalue
はundefined
になる
2. BYOB Readerでchunkを読み込むパターン
const readable128 = new ReadableStream(byob(128, 12));
const buffs = [];
const reader = readable128.getReader({mode: "byob"});
for (;;) {
const {done, value} = await reader.read(new Uint8Array(12));
if (done) break;
buffs.push(value);
}
- BYOB Readerでは、
done
がtrue
のときはvalue
は長さ0のTypedArrayになる
3. BYOB Readerで1バッファに追加書き込みするパターン
const readable128 = new ReadableStream(byob(128, 12));
let buf = new ArrayBuffer(150), offs = 0; // 上書きする変数
const reader = readable128.getReader({mode: "byob"});
for (;;) {
const {done, value} = await reader.read(new Uint8Array(buf, offs, 10));
buf = value.buffer; // 注1: もとのbufは使用不能になっているので、bufはvalue.bufferで必ず上書きすること
offs += value.byteLength; // 注2: 最後の読み込みではvalue.byteLengthはview.byteLength以下になりうる
if (done) break; // 注3: done == trueのときでも、value.bufferを取り出したあとにbreakすること
}
console.log(new Uint8Array(buf, 0, offs));
- 他で使われているarraybufferに上書きできるわけではないので注意。readに渡したbufのbufferはdetachされ、その値にアクセスできなくなる。
- BYOB Readerでは、結果の
value.buffer
には、 引数のview.buffer
の値がコピーされているので、連結されたbufferを得ることができる - breakの実行位置に注意