Open2
Golang 😢
Firestore の巨大な collection を traverse する.
`GetAll` で全件をまとめて取得すると(成功することもあるが)大量のメモリを消費する. アプリケーションのメモリが足りないと OOM になる.
// materialize all documents on memory at once
// not practical for a large collection
snapshots, err := firestoreClient.Collection("/path/to/collection").
Documents(ctx).
GetAll()
`Documents(ctx)` は `Next()` で lazy に要素を取得する `firestore.DocumentIterator` を返す.
これを利用するとうまくいきそうだが残念ながら collection が巨大だとエラーになる. "The datastore operation timed out, or the data was temporarily unavailable" などと出るはず.
// Read the collection lazily, one document per Next call.
docs := firestoreClient.Collection("/path/to/collection").Documents(ctx)
// Release the iterator's resources when done, even on an early exit.
defer docs.Stop()
for {
	item, err := docs.Next()
	if err == iterator.Done {
		// All documents have been read.
		break
	}
	if err != nil {
		// For a very large collection this is where errors such as
		// "The datastore operation timed out, ..." surface.
		// NOTE: assumes the enclosing function returns an error.
		return err
	}
	// ... do something with item ...
	_ = item
}
巨大なコレクションを扱う正解(?)は、`OrderBy` と `StartAfter` を組み合わせてページングすること.
collection := firestoreClient.Collection("/path/to/collection")
// Cursor: the last document of the previous page (nil before the first page).
// It must be a pointer so it can be compared with nil.
var lastDocument *firestore.DocumentSnapshot
batchSize := 100
for {
	query := collection.
		OrderBy("<orderingField>", firestore.Asc). // firestore.Desc also works.
		Where("<field>", "==", "<value>").         // Optional; remove if not needed.
		Limit(batchSize)
	if lastDocument != nil {
		// Pass the snapshot itself rather than its field value. The client then
		// also orders by document ID as a tie-breaker, so documents sharing the
		// same <orderingField> value are not skipped at a page boundary.
		query = query.StartAfter(lastDocument)
	}
	// Fetch up to batchSize documents: from the start when lastDocument is nil,
	// otherwise from just after lastDocument.
	batch, err := query.Documents(ctx).GetAll()
	if err != nil {
		// NOTE: assumes the enclosing function returns an error.
		return err
	}
	// do something with document snapshots
	// ...
	if len(batch) < batchSize {
		// A short (or empty) page means there is no next page.
		break
	}
	// Use "=" (not ":="): ":=" would declare a new loop-local variable, leave the
	// outer cursor nil, and re-read the first page forever.
	lastDocument = batch[len(batch)-1]
}
同じ型を返す失敗しうる非同期関数を最大並列数の上限付きで並列実行しエラーの情報を失わず順序を保ってリストに格納する処理
// Runs n copies of a failable IO with at most 10 in flight at once, and
// collects every outcome (success or failure) in the original list order.
import cats.*
import cats.syntax.all.*
import cats.effect.*
// NOTE(review): `n` is not defined in this snippet; presumably it is supplied by the caller.
// IO("failable operation") is a placeholder for the real failable operation.
// `.attempt` turns each IO[String] into IO[Either[Throwable, String]], so a failure
// becomes a Left value instead of aborting the whole batch, and no error is lost.
// `.parSequenceN(10)` runs the IOs with at most 10 concurrently and keeps the list order.
// NOTE(review): in cats-effect 3 this extension appears to come from
// `cats.effect.syntax.all.*`; confirm it resolves with only the imports above.
List
.fill(n)(IO("failable operation").attempt)
.parSequenceN(10)
package main
import (
"fmt"
"sync"
)
// result holds the outcome of one routine: its value, or the error it failed
// with. Keeping both means no error information is lost when results are
// collected.
type result struct {
	value int
	err   error
}

// runBounded calls routine(i) for every i in [0, n), running at most
// maxParallel calls at a time. It returns the outcomes in index order:
// results[i] is the outcome of routine(i). If n <= 0 it returns nil.
// A maxParallel below 1 is treated as 1 (an unbuffered semaphore would deadlock).
func runBounded(n, maxParallel int, routine func(index int) (int, error)) []result {
	if n <= 0 {
		return nil
	}
	if maxParallel < 1 {
		maxParallel = 1
	}
	results := make([]result, n)
	semaphore := make(chan struct{}, maxParallel)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		// Acquire before spawning, so that at most maxParallel goroutines exist
		// at once (not just at most maxParallel doing work).
		semaphore <- struct{}{}
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			defer func() { <-semaphore }()
			value, err := routine(index)
			// Each goroutine writes only its own slot, so no lock is needed and
			// the input order is preserved.
			results[index] = result{value: value, err: err}
		}(i)
	}
	wg.Wait()
	return results
}

// work simulates one failable asynchronous operation. Replace it with the
// real routine. To report a failure, return a non-nil error,
// e.g. return 0, errors.New("something went wrong").
func work(index int) (int, error) {
	return 42, nil
}

// main runs numRoutines failable operations with at most maxParallel in
// flight. It then reports any errors and prints the successful values in
// input order (a failed slot prints as 0).
func main() {
	const (
		numRoutines = 5 // number of operations to run
		maxParallel = 3 // at most this many run concurrently
	)

	results := runBounded(numRoutines, maxParallel, work)

	values := make([]int, len(results))
	for i, r := range results {
		if r.err != nil {
			fmt.Printf("Error in routine %d: %v\n", i, r.err)
			continue
		}
		values[i] = r.value
	}

	// Print the ordered results.
	fmt.Println(values)
}