Open2

Golang 😢

110416110416

firestore の巨大な collection を traverse する.

GetAll でまとめて取得するのは(成功する時もあるが)大量のメモリを消費する. アプリケーションのメモリが足りないと OOM になる.

// materialize all documents on memory at once
// not practical for a large collection
snapshots, err := firestoreClient.Collection("/path/to/collection").
    Documents(ctx).
    GetAll()

Documents(ctx) は、Next() で lazy に要素を取得する firestore.DocumentIterator を返す.
これを利用するとうまくいきそうだが残念ながら collection が巨大だとエラーになる. "The datastore operation timed out, or the data was temporarily unavailable" などと出るはず.

// Iterate the collection lazily, one document per Next() call.
docs := firestoreClient.Collection("/path/to/collection").
	Documents(ctx)
// Release the iterator's resources even if the loop exits early.
defer docs.Stop()

for {
	item, err := docs.Next()
	if err == iterator.Done {
		// No more documents.
		break
	}
	if err != nil {
		// On a huge collection this is where errors such as
		// "The datastore operation timed out, or the data was temporarily
		// unavailable" show up; do not ignore them.
		return err // assumes the enclosing function returns an error
	}
	// do something with item
	_ = item
}

正解(?) は、巨大なコレクションを扱うには OrderBy と StartAfter を組み合わせてページングすること.


collection := firestoreClient.Collection("/path/to/collection")
// Pointer so that nil means "no page has been fetched yet".
var lastDocument *firestore.DocumentSnapshot
batchSize := 100
for {
	// Note: Firestore's OrderBy leaves out documents that do not have
	// <orderingField>; ordering by firestore.DocumentID avoids that.
	query := collection.
		OrderBy("<orderingField>", firestore.Asc). // firestore.Desc also works.
		Where("<field>", "==", "<value>").         // Where is optional (may need a composite index).
		Limit(batchSize)

	if lastDocument != nil {
		// Resume right after the last document of the previous page. Passing
		// the snapshot (rather than its <orderingField> value) lets Firestore
		// also use the document ID as a tie-breaker, so documents sharing the
		// same <orderingField> value across a page boundary are not skipped.
		query = query.StartAfter(lastDocument)
	}
	// Fetch at most batchSize documents: from the beginning when lastDocument
	// is nil, otherwise starting right after lastDocument.
	batch, err := query.Documents(ctx).GetAll()
	if err != nil {
		return err // assumes the enclosing function returns an error
	}
	// do something with the document snapshots in batch
	// ...

	// A short (or empty) page means there is no next page.
	if len(batch) < batchSize {
		break
	}
	// Assign (not :=) so the outer variable is updated for the next iteration.
	lastDocument = batch[len(batch)-1]
}

https://github.com/googleapis/google-cloud-dotnet/issues/4561

110416110416

同じ型を返す失敗しうる非同期関数を、最大並列数の上限付きで並列実行し、エラーの情報を失わず順序を保ってリストに格納する処理 (Scala / cats-effect 版と Go 版).

import cats.*
import cats.syntax.all.*
import cats.effect.*

// Run n failable IO actions with at most 10 in flight at a time.
// `.attempt` turns each IO[A] into IO[Either[Throwable, A]], so a failure is
// kept as a Left instead of failing the whole batch, and parSequenceN returns
// the results in the same order as the input list.
// IO.parSequenceN is used directly because the `.parSequenceN` extension
// syntax comes from cats.effect.syntax.all, which is not imported above.
IO.parSequenceN(10)(
  List.fill(n)(IO("failable operation").attempt)
)

package main

import (
	"fmt"
	"sync"
)

// outcome pairs the value returned by one task with the error it returned,
// so collected results keep error information next to each value.
type outcome struct {
	value int
	err   error
}

// runBounded calls task(i) for every i in [0, n) with at most limit calls
// running at the same time. It returns one outcome per index in input order,
// where outcomes[i] belongs to task(i). A limit below 1 is treated as 1, and
// n <= 0 returns nil.
func runBounded(n, limit int, task func(index int) (int, error)) []outcome {
	if n <= 0 {
		return nil
	}
	if limit < 1 {
		limit = 1
	}

	outcomes := make([]outcome, n)
	// Buffered channel used as a counting semaphore.
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		// Acquire before spawning so that at most limit goroutines exist at once.
		sem <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }()
			value, err := task(i)
			// Each goroutine writes only its own slot, so no lock is needed.
			outcomes[i] = outcome{value: value, err: err}
		}(i)
	}

	// Wait for every goroutine; this also makes their writes visible here.
	wg.Wait()
	return outcomes
}

func main() {
	// Number of tasks to run and how many of them may run concurrently.
	const (
		numRoutines    = 5
		maxConcurrency = 3
	)

	outcomes := runBounded(numRoutines, maxConcurrency, func(index int) (int, error) {
		// Simulate some asynchronous work. In a real-world scenario, replace
		// this with the actual routine and return a non-nil error (for
		// example errors.New("Something went wrong")) when it fails.
		return 42, nil
	})

	// outcomes is already in input order; each error stays attached to
	// its index instead of being discarded.
	results := make([]int, numRoutines)
	for i, o := range outcomes {
		if o.err != nil {
			fmt.Printf("Error in routine %d: %v\n", i, o.err)
			// Handle the error as needed.
			continue
		}
		results[i] = o.value
	}

	// Print the ordered results.
	fmt.Println(results)
}