Open34

iterator adapterもっといろいろアイデア帳

ngicksngicks
  • iteratorのteeing
  • token streamのteeingはEither[L, R any]を作るときデコード処理で常にほしくなる
  • 多分iteratorでも欲しくなることがある。

io.TeeReaderになぞらえるとこう?

func TeeSeq[V any](seq iter.Seq[V], pusher func(v V) bool) iter.Seq[V] {
	return func(yield func(V) bool) {
		for v := range seq {
			if !pusher(v) {
				return
			}
			if !yield(v) {
				return
			}
		}
	}
}

io.Readerio.Writerってメソッド名が違うだけで関数シグネチャは一緒なんですよね。io.Writerに当たるものにfunc(v V) boolを持ってくるのが最も適切なのかは確信が持てない。

となると当然io.Pipeをなぞらえたものも欲しくなる

io.Pipeも内部的にchannelでやり取りするのでそこもなぞらえてそのように実装。
さらにio.Pipeとは違い、バッファサイズも好きにいじっていいように。多分同じgoroutineで全部処理させたいケースもあるだろうから。

var (
	_ hiter.IntoIterable[any] = (*Pipe[any])(nil)
)

type Pipe[V any] struct {
	c         chan V
	mu        sync.Mutex
	closeOnce sync.Once
	closed    chan struct{}
}

func NewPipe[V any](n int) *Pipe[V] {
	if n < 0 {
		// panic?
		n = 0
	}
	p := &Pipe[V]{
		c:      make(chan V, n),
		closed: make(chan struct{}),
	}
	return p
}

func (p *Pipe[V]) Close() {
	p.closeOnce.Do(func() {
		close(p.closed)
		p.mu.Lock()
		defer p.mu.Unlock()
		close(p.c)
	})
}

func (p *Pipe[V]) Push(v V) bool {
	select {
	case <-p.closed:
		return false
	default:
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	select {
	case p.c <- v:
		return true
	case <-p.closed:
		return false
	}
}

func (p *Pipe[V]) TryPush(v V) (ok, pushed bool) {
	select {
	case <-p.closed:
		return false, false
	default:
	}
	if !p.mu.TryLock() {
		return false, false
	}
	defer p.mu.Unlock()
	select {
	case p.c <- v:
		return true, true
	default:
		return true, false
	}
}

func (p *Pipe[V]) IntoIter() iter.Seq[V] {
	return hiter.Chan(context.Background(), p.c)
}

buffer size > 0がありうるならStopは内部チャネルを閉じざるを得ない。閉じずにhiter.Chanに渡したctxをキャンセルしてしまうとチャネルのバッファにある要素が宙に浮いたまま消えてしまうため。

ngicksngicks

実は前の記事を書いていた時点でTeeの実装はしてたんだけど検証するのがめんどくさくなって消しちゃった。その時の実装はIter() (iter.Seq[V], iter.Seq[V])とするもので、内部的にはchannelを介して値を送るというものだった。

ngicksngicks

あとからshared bufferに値を書き込んでそれぞれのiteratorから読み取り、全員読み取ったら次の値をpopulateするという方式にしたほうがいいと思ったが、これがgoroutineがうまいこと互い違いに起きて動作させようとすると結構大変で結局channelでタイミングをそろえることになり、複雑な割にうれしくないので今のteeの形にすることにした

ngicksngicks

Close, Pushなどがgoroutine safeである理由はないんだし少し簡素化

type Pipe[V any] struct {
	c      chan V
	closed chan struct{}
}

func NewPipe[V any](n int) *Pipe[V] {
	if n < 0 {
		// panic?
		n = 0
	}
	p := &Pipe[V]{
		c:      make(chan V, n),
		closed: make(chan struct{}),
	}
	return p
}

func (p *Pipe[V]) Close() {
	select {
	case <-p.closed:
		return
	default:
	}
	close(p.closed)
	close(p.c)
}

func (p *Pipe[V]) Push(v V) bool {
	select {
	case <-p.closed:
		return false
	default:
	}
	select {
	case p.c <- v:
		return true
	case <-p.closed:
		return false
	}
}

func (p *Pipe[V]) TryPush(v V) (ok, pushed bool) {
	select {
	case <-p.closed:
		return false, false
	default:
	}
	select {
	case p.c <- v:
		return true, true
	default:
		return true, false
	}
}

func (p *Pipe[V]) IntoIter() iter.Seq[V] {
	return hiter.Chan(context.Background(), p.c)
}
ngicksngicks
// Cycle converts seq into an infinite iterator by repeatedly calling seq.
// seq is assumed to be finite but pure and reusable.
// The iterator may yield forever if seq is repeatable; stopping it is caller's responsibility.
func Cycle[V any](seq iter.Seq[V]) iter.Seq[V] {
	return func(yield func(V) bool) {
		for {
			for v := range seq {
				if !yield(v) {
					return
				}
			}
		}
	}
}

// CycleBuffered is like [Cycle] but seq is called only once.
// Values from seq is buffered and from second time and on,
// the iterator uses buffered contents.
//
// seq must be finite and small, otherwise huge amount of memory will be consumed.
func CycleBuffered[V any](seq iter.Seq[V]) iter.Seq[V] {
	return func(yield func(V) bool) {
		var buf []V
		for v := range seq {
			if !yield(v) {
				return
			}
			buf = append(buf, v)
		}
		for _, v := range buf {
			if !yield(v) {
				return
			}
		}
	}
}

(あとなんかあったんだけど忘れちゃった)

ngicksngicks

samber/loPartitionByの前振り

func MapKeys[M ~map[K]V, K comparable, V any](m M, keys iter.Seq[K]) iter.Seq2[K, V] {
	return func(yield func(K, V) bool) {
		for k := range keys {
			if !yield(k, m[k]) {
				return
			}
		}
	}
}
ngicksngicks

そしてこう

func Partition[K comparable, V any](f func(V) K, seq iter.Seq[V]) iter.Seq2[int, V] {
	return func(yield func(int, V) bool) {
		m := map[K]int{}
		for v := range seq {
			k := f(v)
			order, ok := m[k]
			if !ok {
				order = len(m)
				m[k] = order
			}
			if !yield(order, v) {
				return
			}
		}
	}
}
func Example_partition() {
	mm := hiter.ReduceGroup(
		func(accumulator []int, current int) []int { return append(accumulator, current) },
		[]int(nil),
		hiter.Partition(
			func(i int) string {
				if i < 0 {
					return "negative"
				} else if i%2 == 0 {
					return "even"
				}
				return "odd"
			},
			slices.Values([]int{-2, -1, 0, 1, 2, 3, 4, 5}),
		),
	)

	fmt.Printf(
		"%#v\n",
		slices.Collect(
			hiter.OmitF(
				hiter.MapKeys(
					mm,
					hiter.Range(0, len(mm)),
				),
			),
		),
	)
	// Output:
	// [][]int{[]int{-2, -1}, []int{0, 2, 4}, []int{1, 3, 5}}
}

(名前が思いつかない。Partitionじゃ多分意味が分からないので変える必要がある)

ngicksngicks
  • readerへの変換
    ネタすぎるかなと思って実装やめたけどあると便利かも
    hiter.Writeと同じような様式で
ngicksngicks
func Reader[V any](marshaler func(V) ([]byte, error), seq *Resumable[V]) io.ReadCloser {
	return &iterReader[V]{
		marshaler: marshaler,
		seq:       seq,
	}
}

type iterReader[V any] struct {
	marshaler func(V) ([]byte, error)
	seq       *Resumable[V]
	buf       []byte
	err       error
}

func (r *iterReader[V]) Read(p []byte) (n int, err error) {
	if r.err != nil {
		return 0, r.err
	}
	if len(r.buf) > 0 {
		n = copy(p, r.buf)
		p = p[n:]
		r.buf = r.buf[n:]
		if len(r.buf) > 0 {
			return
		}
	}
	next, ok := hiter.First(r.seq.IntoIter())
	if !ok {
		err = io.EOF
		r.err = err
		return
	}
	r.buf, err = r.marshaler(next)
	if err != nil {
		return
	}
	nn := copy(p, r.buf)
	n += nn
	r.buf = r.buf[nn:]
	return
}

func (r *iterReader[V]) Close() error {
	r.seq.Stop()
	return nil
}
ngicksngicks

reflectでstructのフィールドを列挙するiterator

公開フィールドは全て同じ型でforEachしたいみたいなことをそれなりに頻繁に実装してるので欲しい

ngicksngicks

Readdirもiteratorにできる
Goは8kib分getdentt64をバッファーしてしまうためメモリコストに寄与するかは分からない

ngicksngicks

以下のようになる。いるかなあこれ。

func Readdir[R interface {
	Readdir(n int) ([]fs.FileInfo, error)
}](r R) iter.Seq2[fs.FileInfo, error] {
	return func(yield func(fs.FileInfo, error) bool) {
		for {
			// 64 dirents in a batch.
			dirents, err := r.Readdir(1 << 6)
			for _, dirent := range dirents {
				if !yield(dirent, nil) {
					return
				}
			}
			if err != nil {
				if err == io.EOF {
					return
				}
				yield(nil, err)
				return
			}
		}
	}
}
ngicksngicks

Duration
iterationされるたびtimerをResetしてdurationだけ待つ
time.Tickerと違ってyieldが長い時間かかる場合、処理が終わってからdurationまてる

ngicksngicks

テストが面倒だし使用機会そんななさそうだし消した

ngicksngicks

IndexAccesibleAll

https://pkg.go.dev/github.com/ngicks/go-iterator-helper@v0.0.15/hiter#IndexAccessible
のAll版

types.MethodSetをiterateするのはよくするが、別に範囲とか指定する必要なくて毎回hiter.Range(0, mset.Len())としているがこれがめんどくさい

Backward版はおそらく需要がないので作らない

ngicksngicks

ところでIndexAccessibleよりも名前はAtterのほうが良かったかも 単純で名前から予測がつく

ngicksngicks

破壊的変更になっちゃうけど多分使ってる人は私しかいないのでAtterAllとAtterRangeにしちゃおうかしら