iterator adapterもっといろいろアイデア帳
思いついたときに書いとくよう
- iteratorのteeing
- token streamのteeingはEither[L, R any]を作るときデコード処理で常にほしくなる
- 多分iteratorでも欲しくなることがある。
io.TeeReaderになぞらえるとこう?
func TeeSeq[V any](seq iter.Seq[V], pusher func(v V) bool) iter.Seq[V] {
return func(yield func(V) bool) {
for v := range seq {
if !pusher(v) {
return
}
if !yield(v) {
return
}
}
}
}
io.Reader
とio.Writer
ってメソッド名が違うだけで関数シグネチャは一緒なんですよね。io.Writer
に当たるものにfunc(v V) bool
を持ってくるのが最も適切なのかは確信が持てない。
となると当然io.Pipeをなぞらえたものも欲しくなる
io.Pipe
も内部的にchannelでやり取りするのでそこもなぞらえてそのように実装。
さらにio.Pipe
とは違い、バッファサイズも好きにいじっていいように。多分同じgoroutineで全部処理させたいケースもあるだろうから。
var (
_ hiter.IntoIterable[any] = (*Pipe[any])(nil)
)
type Pipe[V any] struct {
c chan V
mu sync.Mutex
closeOnce sync.Once
closed chan struct{}
}
func NewPipe[V any](n int) *Pipe[V] {
if n < 0 {
// panic?
n = 0
}
p := &Pipe[V]{
c: make(chan V, n),
closed: make(chan struct{}),
}
return p
}
func (p *Pipe[V]) Close() {
p.closeOnce.Do(func() {
close(p.closed)
p.mu.Lock()
defer p.mu.Unlock()
close(p.c)
})
}
func (p *Pipe[V]) Push(v V) bool {
select {
case <-p.closed:
return false
default:
}
p.mu.Lock()
defer p.mu.Unlock()
select {
case p.c <- v:
return true
case <-p.closed:
return false
}
}
func (p *Pipe[V]) TryPush(v V) (ok, pushed bool) {
select {
case <-p.closed:
return false, false
default:
}
if !p.mu.TryLock() {
return false, false
}
defer p.mu.Unlock()
select {
case p.c <- v:
return true, true
default:
return true, false
}
}
func (p *Pipe[V]) IntoIter() iter.Seq[V] {
return hiter.Chan(context.Background(), p.c)
}
buffer size > 0
がありうるならStop
は内部チャネルを閉じざるを得ない。閉じずにhiter.Chan
に渡したctx
をキャンセルしてしまうとチャネルのバッファにある要素が宙に浮いたまま消えてしまうため。
実は前の記事を書いていた時点でTee
の実装はしてたんだけど検証するのがめんどくさくなって消しちゃった。その時の実装はIter() (iter.Seq[V], iter.Seq[V])
とするもので、内部的にはchannelを介して値を送るというものだった。
あとからshared bufferに値を書き込んでそれぞれのiteratorから読み取り、全員読み取ったら次の値をpopulateするという方式にしたほうがいいと思ったが、これがgoroutineがうまいこと互い違いに起きて動作させようとすると結構大変で結局channelでタイミングをそろえることになり、複雑な割にうれしくないので今のteeの形にすることにした
もろにCyclicBarrierの使いどころだなと思ったり
Close, Pushなどがgoroutine safeである理由はないんだし少し簡素化
type Pipe[V any] struct {
c chan V
closed chan struct{}
}
func NewPipe[V any](n int) *Pipe[V] {
if n < 0 {
// panic?
n = 0
}
p := &Pipe[V]{
c: make(chan V, n),
closed: make(chan struct{}),
}
return p
}
func (p *Pipe[V]) Close() {
select {
case <-p.closed:
return
default:
}
close(p.closed)
close(p.c)
}
func (p *Pipe[V]) Push(v V) bool {
select {
case <-p.closed:
return false
default:
}
select {
case p.c <- v:
return true
case <-p.closed:
return false
}
}
func (p *Pipe[V]) TryPush(v V) (ok, pushed bool) {
select {
case <-p.closed:
return false, false
default:
}
select {
case p.c <- v:
return true, true
default:
return true, false
}
}
func (p *Pipe[V]) IntoIter() iter.Seq[V] {
return hiter.Chan(context.Background(), p.c)
}
// Cycle converts seq into an infinite iterator by repeatedly calling seq.
// seq is assumed to be finite but pure and reusable.
// The iterator may yield forever if seq is repeatable; stopping it is caller's responsibility.
func Cycle[V any](seq iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
for {
for v := range seq {
if !yield(v) {
return
}
}
}
}
}
// CycleBuffered is like [Cycle] but seq is called only once.
// Values from seq is buffered and from second time and on,
// the iterator uses buffered contents.
//
// seq must be finite and small, otherwise huge amount of memory will be consumed.
func CycleBuffered[V any](seq iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
var buf []V
for v := range seq {
if !yield(v) {
return
}
buf = append(buf, v)
}
for _, v := range buf {
if !yield(v) {
return
}
}
}
}
(あとなんかあったんだけど忘れちゃった)
samber/loのPartitionByの前振り
func MapKeys[M ~map[K]V, K comparable, V any](m M, keys iter.Seq[K]) iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
for k := range keys {
if !yield(k, m[k]) {
return
}
}
}
}
そしてこう
func Partition[K comparable, V any](f func(V) K, seq iter.Seq[V]) iter.Seq2[int, V] {
return func(yield func(int, V) bool) {
m := map[K]int{}
for v := range seq {
k := f(v)
order, ok := m[k]
if !ok {
order = len(m)
m[k] = order
}
if !yield(order, v) {
return
}
}
}
}
func Example_partition() {
mm := hiter.ReduceGroup(
func(accumulator []int, current int) []int { return append(accumulator, current) },
[]int(nil),
hiter.Partition(
func(i int) string {
if i < 0 {
return "negative"
} else if i%2 == 0 {
return "even"
}
return "odd"
},
slices.Values([]int{-2, -1, 0, 1, 2, 3, 4, 5}),
),
)
fmt.Printf(
"%#v\n",
slices.Collect(
hiter.OmitF(
hiter.MapKeys(
mm,
hiter.Range(0, len(mm)),
),
),
),
)
// Output:
// [][]int{[]int{-2, -1}, []int{0, 2, 4}, []int{1, 3, 5}}
}
(名前が思いつかない。Partition
じゃ多分意味が分からないので変える必要がある)
- readerへの変換
ネタすぎるかなと思って実装やめたけどあると便利かも
hiter.Write
と同じような様式で
Writeをio.Pipeで繋げばいいんだけなんだけど
func Reader[V any](marshaler func(V) ([]byte, error), seq *Resumable[V]) io.ReadCloser {
return &iterReader[V]{
marshaler: marshaler,
seq: seq,
}
}
type iterReader[V any] struct {
marshaler func(V) ([]byte, error)
seq *Resumable[V]
buf []byte
err error
}
func (r *iterReader[V]) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if len(r.buf) > 0 {
n = copy(p, r.buf)
p = p[n:]
r.buf = r.buf[n:]
if len(r.buf) > 0 {
return
}
}
next, ok := hiter.First(r.seq.IntoIter())
if !ok {
err = io.EOF
r.err = err
return
}
r.buf, err = r.marshaler(next)
if err != nil {
return
}
nn := copy(p, r.buf)
n += nn
r.buf = r.buf[nn:]
return
}
func (r *iterReader[V]) Close() error {
r.seq.Stop()
return nil
}
reflectでstructのフィールドを列挙するiterator
公開フィールドは全て同じ型でforEachしたいみたいなことをそれなりに頻繁に実装してるので欲しい
Readdirもiteratorにできる
Goは8kib分getdentt64をバッファーしてしまうためメモリコストに寄与するかは分からない
以下のようになる。いるかなあこれ。
func Readdir[R interface {
Readdir(n int) ([]fs.FileInfo, error)
}](r R) iter.Seq2[fs.FileInfo, error] {
return func(yield func(fs.FileInfo, error) bool) {
for {
// 64 dirents in a batch.
dirents, err := r.Readdir(1 << 6)
for _, dirent := range dirents {
if !yield(dirent, nil) {
return
}
}
if err != nil {
if err == io.EOF {
return
}
yield(nil, err)
return
}
}
}
}
sti(single type iterator)
やっぱメソッドチェーンで書きたい時もある
Duration
iterationされるたびtimerをResetしてdurationだけ待つ
time.Tickerと違ってyieldが長い時間かかる場合、処理が終わってからdurationまてる
テストが面倒だし使用機会そんななさそうだし消した
IndexAccesibleAll
のAll版
types.MethodSetをiterateするのはよくするが、別に範囲とか指定する必要なくて毎回hiter.Range(0, mset.Len())としているがこれがめんどくさい
Backward版はおそらく需要がないので作らない