RxJS を使ってあらゆるWeb素材を抽象化し、fp-tsで自在に読み込む
素材の抽象化
画像、動画、音声、Three.jsの素材、etc...、Webで使う素材は様々。
これら全てに個別の読み込み処理を書くよりも、素材として一つに抽象化できれば扱いやすい。
事前にローディング画面を表示しつつ使う素材を一気に読み込む
使うタイミングで都度読み込む
など読み込みを行うタイミングも様々だが、ここではシンプルに
- 読み込まれていなかったら読み込みを開始し、読み込みが完了したら素材を返す
- すでに読み込みが完了していたらその素材を返す
ようにできれば使い勝手が良さそうだ。
TypeScript と RxJS で抽象クラスを定義する
Promise を使ってもよいが、RxJSを使う理由としてはオペレータが充実していて、後述するが実際に使う際に直列、並列、もしくは最初に一つだけ読み込んでその後複数読み込む、などなど様々な処理がしやすいため。
構成要素はシンプルで
- 素材のurl
src: string
- 素材そのもの
asset$: Observable<T>
の2つとする。
これを TypeScript を用いて抽象クラスを書くと下記のようになる。
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators'
export abstract class ObservableAsset<T> {
readonly src: string
private cache$: Observable<T> | undefined = undefined
constructor(src: string) {
this.src = src
}
get asset$() {
if (!this.cache$) {
this.cache$ = this.load().pipe(shareReplay(1))
}
return this.cache$
}
clear = () => {
this.unload?.()
this.cache$ = undefined
}
protected abstract load: () => Observable<T>
protected abstract unload: (() => void) | undefined
}
素材を T
として Generics で書いている。
実際に使う素材は asset$
の Observable<T>
で、まだ読み込まれていなかったら load()
メソッドで読み込みを開始して shareReplay(1)
でキャッシュ化、それを cache$
に格納し、すでに読み込みが完了していたらその素材(cache$
)を返すようにしている。
clear()
と unload()
を用意しているのは、使わなくなった場合にメモリを解放するため。解放処理自体は unload()
に書く。
以下はこの ObservableAsset
をもとに、素材別に実体のクラスを書いていく。
画像 ObservableImage
import { Observable } from 'rxjs'
import { ObservableAsset } from './ObservableAsset'
export type ObservableImageProps =
| {
type: 'loaded'
value: HTMLImageElement
}
| {
type: 'loading'
value: {
loaded: number
total: number
}
}
export class ObservableImage extends ObservableAsset<ObservableImageProps> {
protected load = () => {
const observable = new Observable<ObservableImageProps>((subscriber) => {
const image = new Image()
image.onload = () => {
subscriber.next({ type: 'loaded', value: image })
subscriber.complete()
}
image.onprogress = (progress) => {
if (progress.lengthComputable && progress.total > 0) {
const result = {
type: 'loading' as const,
value: {
loaded: progress.loaded,
total: progress.total,
},
}
subscriber.next(result)
}
}
image.onerror = (error) => {
subscriber.error(error)
}
image.src = this.src
})
return observable
}
protected unload = undefined
}
素材を T
として表すと書いたが、実際には素材の状態を書いている。
ここでは ObservableImageProps
が T
にあたる。
状態は読込済みと読込中の2種類があり、それぞれ loaded
, loading
で表していて、value
で読込済みの場合 HTMLImageElement
、読込中の場合 total
と loaded
で進捗状況を取得できるようにしている。
unload
は clear
で cache$
の参照が無くなると自動で削除されるため[1]実装していない。
この ObservableImage
を実際に使うには下記の様に書く。
const image = new ObservableImage('画像のパス')
const subscription = image.asset$.subscribe((asset) => {
if (asset.type === 'loaded') {
// 読み込み完了処理
// asset.value に HTMLImageElement が入っている
} else if (asset.type === 'loading') {
// 読込中の処理
// total と loaded で読み込みの進捗を計算できる
const { total, loaded } = asset.value
const progress = loaded / total
}
})
// 使わなくなったら unsubscribe
subscription.unsubscribe()
動画 ObservableVideo
import { Observable } from 'rxjs'
import axios from 'axios'
import * as O from 'fp-ts/lib/Option'
import { ObservableAsset } from './ObservableAsset'
export type ObservableVideoProps =
| {
type: 'loaded'
value: string
}
| {
type: 'loading'
value: {
loaded: number
total: number
}
}
export class ObservableVideo extends ObservableAsset<ObservableVideoProps> {
private videoSrc: O.Option<string> = O.none
protected load = () => {
const observable = new Observable<ObservableVideoProps>((subscriber) => {
const cancelToken = axios.CancelToken.source()
axios({
url: this.src,
method: 'GET',
timeout: 10 * 1000, // ms
responseType: 'blob',
onDownloadProgress: (progress: ProgressEvent<EventTarget>) => {
if (progress.lengthComputable && progress.total > 0) {
const result = {
type: 'loading' as const,
value: {
loaded: progress.loaded,
total: progress.total,
},
}
subscriber.next(result)
}
},
cancelToken: cancelToken.token,
})
.then((response) => {
const data = new Blob([response.data], { type: 'video/mp4' })
const url = URL.createObjectURL(data)
this.videoSrc = O.some(url)
subscriber.next({ type: 'loaded', value: url })
subscriber.complete()
})
.catch((error) => {
subscriber.error(error)
})
return () => {
cancelToken.cancel()
}
})
return observable
}
protected unload = () => {
if (O.isSome(this.videoSrc)) {
URL.revokeObjectURL(this.videoSrc.value)
}
this.videoSrc = O.none
}
}
動画の場合は若干特殊でバイナリーデータとして読み込む。Blobを作成し、URL.createObjectURL()
を呼び出して、Blob
を URL
に変換している。
この場合は unload 処理が必要なので URL.revokeObjectURL
で解放している。
videoSrc
に関数型言語ライブラリ fp-ts の Option
を使っていて、ここでは詳細な説明は省くが string | undefinde
のようなものだと考えてよい。undefinde
の場合は Option.none
、string
の場合は Option.some
、で value
に string
が入っているイメージだ。
またここでは axios を使っているが、ky などを使っても同様のものが書けるはず。
Three.js のテキスチャ ObservableThreeTexture
最後に Three.js のテキスチャのクラスを紹介する。
やっていることは Three.js が用意している TextureLoader
を使っているだけで前述の例とほぼ同様だ。
import { Observable } from 'rxjs'
import { Texture, TextureLoader } from 'three'
import { ObservableAsset } from '.ObservableAsset'
export type ThreeTextureProps = {
width: number
height: number
texture: Texture
}
export type ObservableThreeTextureProps =
| {
type: 'loaded'
value: ThreeTextureProps
}
| {
type: 'loading'
value: {
loaded: number
total: number
}
}
export class ObservableThreeTexture extends ObservableAsset<ObservableThreeTextureProps> {
protected load = () => {
const observable = new Observable<ObservableThreeTextureProps>((subscriber) => {
const loader = new TextureLoader()
loader.load(
this.src,
(data) => {
const image = data.image as unknown as { width: number; height: number }
const result = {
type: 'loaded' as const,
value: {
width: image.width,
height: image.height,
texture: data,
},
}
subscriber.next(result)
subscriber.complete()
},
(progress) => {
if (progress.lengthComputable && progress.total > 0) {
const result = {
type: 'loading' as const,
value: {
loaded: progress.loaded,
total: progress.total,
},
}
subscriber.next(result)
}
},
(error) => {
subscriber.error(error)
},
)
})
return observable
}
protected unload = undefined
}
読み込み処理応用
以下は大量の素材を、32素材ずつに分けて読み込む処理。
fp-ts
を使って素材の読み込みを32のチャンク(chunksof)に分けて並列(merge)に読み込み、それらを直列(concat)に繋いで読み込んでいる。
このような処理を書くときに fp-ts
は pipe
を使って関数合成ができ、コードも読みやすくなるため大変便利。
import { Observable, concat, merge, map } from 'rxjs'
import * as A from 'fp-ts/lib/Array'
import { pipe } from 'fp-ts/function'
const loadAssets$ = <T, U extends ObservableAsset<T>>(assets: Array<U>, numLoad: number) => {
// 複数のチャンクに分けて並列読み込み
const loadObservables = pipe(
assets,
A.map((asset) => asset.asset$),
A.chunksOf(numLoad), // 同時に読み込む数
A.map((o) => merge(...o)),
)
const loader$ = concat(...loadObservables).pipe(
map((asset, index) => ({ asset: asset, count: index + 1, total: assets.length })),
)
return loader$
}
const assets = [
new ObservableImage('パス'),
new ObservableVideo('パス'),
new ObservableThreeTexture('パス'),
// ...以下いくつでも
]
const numLoad = 32 // 同時に読み込む数
const subscription = loadAssets$(assets, numLoad).subscribe({
next: ({ count, total }) => {
// 進捗(いくつ読み込まれたか)
const progress = count / total
// 進捗処理を書く
},
complete: () => {
// 全て読み込み完了
},
})
上のコードを少し変えて、最初の1素材だけはすぐに読み込んで、残りはチャンクに分けて並列読み込む場合を考える。その場合は、畳み込み関数の foldLeft を使って配列の最初の要素と残りに分けてやればよい。
const loadAssets2$ = <T, U extends ObservableAsset<T>>(assets: Array<U>, numLoad: number) => {
const loadObservables = pipe(
assets,
A.map((asset) => asset.asset$),
A.foldLeft(
() => [[]],
(head, tail) => [[head], ...A.chunksOf(numLoad)(tail)],
),
A.map((o) => merge(...o)),
)
const loader$ = concat(...loadObservables).pipe(
map((asset, index) => ({ asset: asset, count: index + 1, total: assets.length })),
)
return loader$
}
最初に読み込んだ素材を取り急ぎ表示したい場合は、assets[0]
にあたる素材を別の場所で subscribe
しておけば、読み込み処理と表示処理を別にし、宣言的に書ける。
-
実際にはWebブラウザの任意のタイミング ↩︎
Discussion