Open15

goで依存したものの処理を非同期・同期を意識せず使ってみたい

podhmopodhmo

このような計算を考える

  • 雑にchannelを組み合わせると並行処理ができる
  • genericsを使えば、特定の型に依存した処理にする必要がなくなる?
  • (エラー処理はどうしようか?)
  • (context cancelの対応はどうしようか?)
  • (実際のコードへの適用はどうしようか?)
  • 実行とトレースを分離したい
podhmopodhmo

同期処理

----------------------------------------
sync
----------------------------------------
21:17:27.759349 value(10)
21:17:27.867438 value(20)
21:17:28.382795 add(10,20) = 30
21:17:28.491254 value(30)
21:17:28.592238 value(40)
21:17:29.097075 add(30,40) = 70
21:17:29.628750 add(30,70) = 100
100

code

package main

import (
	"fmt"
	"log"
	"time"
)

func main() {
	log.SetFlags(log.Lmicroseconds)
	runSync()
}

func runSync() {
	now := time.Now()
	log.Printf("start %s", now)
	defer func() { log.Printf("end with %s", time.Since(now)) }()

	fmt.Println("----------------------------------------")
	fmt.Println("sync")
	fmt.Println("----------------------------------------")
	fmt.Println(add(add(value(10), value(20)), add(value(30), value(40))))
}

func value(x int) int {
	time.Sleep(100 * time.Millisecond)
	log.Printf("value(%d)", x)
	return x
}

func add(x, y int) int {
	time.Sleep(500 * time.Millisecond)
	log.Printf("add(%d,%d) = %d", x, y, x+y)
	return x + y
}
podhmopodhmo

非同期処理

----------------------------------------
async
----------------------------------------
21:19:24.971010 value(20)
21:19:24.971216 value(10)
21:19:24.971246 value(30)
21:19:24.971230 value(40)
21:19:25.372763 add(10,20) = 30
21:19:25.372893 add(30,40) = 70
21:19:25.372976 add(30,70) = 100
100
21:19:25.373035 end with 502.53423ms

code

package main

import (
	"fmt"
	"log"
	"time"
)

func main() {
	log.SetFlags(log.Lmicroseconds)
	run()
}

func run() {
	now := time.Now()
	log.Printf("start %s", now)
	defer func() { log.Printf("end with %s", time.Since(now)) }()

	fmt.Println("----------------------------------------")
	fmt.Println("async")
	fmt.Println("----------------------------------------")
	fmt.Println(<-add(add(value(10), value(20)), add(value(30), value(40))))
}

func value(x int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		time.Sleep(100 * time.Millisecond)
		log.Printf("value(%d)", x)
		ch <- x
	}()
	return ch
}
func add(xCh, yCh <-chan int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		time.Sleep(500 * time.Millisecond)
		x := <-xCh
		y := <-yCh
		log.Printf("add(%d,%d) = %d", x, y, x+y)
		ch <- x + y
	}()
	return ch
}
podhmopodhmo

graphのような構造を作る

こんな感じ。

package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"time"
)

type Node interface {
	Do() int
	Dump(w io.Writer)
}

type ValueNode struct {
	N int
}

func (n *ValueNode) Do() int {
	x := n.N
	time.Sleep(100 * time.Millisecond)
	log.Printf("value(%d)", x)
	return x
}
func (n *ValueNode) Dump(w io.Writer) {
	fmt.Fprintf(w, "value(%d)", n.N)
}

type AddNode struct {
	X Node
	Y Node
}

func (n *AddNode) Do() int {
	x := n.X.Do()
	y := n.Y.Do()
	time.Sleep(500 * time.Millisecond)
	log.Printf("add(%d,%d) = %d", x, y, x+y)
	return x + y
}
func (n *AddNode) Dump(w io.Writer) {
	fmt.Fprintf(w, "add(")
	n.X.Dump(w)
	fmt.Fprintf(w, ", ")
	n.Y.Dump(w)
	fmt.Fprintf(w, ")")
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	now := time.Now()
	log.Printf("start %s", now)
	defer func() { log.Printf("end with %s", time.Since(now)) }()

	fmt.Println("----------------------------------------")
	fmt.Println("sync")
	fmt.Println("----------------------------------------")
	g := &AddNode{&AddNode{&ValueNode{10}, &ValueNode{20}}, &AddNode{&ValueNode{30}, &ValueNode{40}}}
	g.Dump(os.Stdout)
	fmt.Println("")
	fmt.Println("----------------------------------------")
	fmt.Println(g.Do())
}
podhmopodhmo

実行結果

----------------------------------------
sync
----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
21:36:12.097043 value(10)
21:36:12.197277 value(20)
21:36:12.699242 add(10,20) = 30
21:36:12.807172 value(30)
21:36:12.908077 value(40)
21:36:13.417616 add(30,40) = 70
21:36:13.934240 add(30,70) = 100
100
21:36:13.934361 end with 1.937804514s
podhmopodhmo

非同期用のインターフェイスを定義してみる

type Node interface {
	Do() int
	Dump(w io.Writer)
}

ここから

type Node interface {
	Do() <-chan int
	Dump(w io.Writer)
}
podhmopodhmo

こんな感じのコードになる。

package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"time"
)

type Node interface {
	Do() <-chan int
	Dump(w io.Writer)
}

type ValueNode struct {
	N int
}

func (n *ValueNode) Do() <-chan int {
	x := n.N
	ch := make(chan int)
	go func() {
		defer close(ch)
		time.Sleep(100 * time.Millisecond)
		log.Printf("value(%d)", x)
		ch <- x
	}()
	return ch
}
func (n *ValueNode) Dump(w io.Writer) {
	fmt.Fprintf(w, "value(%d)", n.N)
}

type AddNode struct {
	X Node
	Y Node
}

func (n *AddNode) Do() <-chan int {
	ch := make(chan int)
	xCh := n.X.Do()
	yCh := n.Y.Do()
	go func() {
		defer close(ch)
		time.Sleep(500 * time.Millisecond)
		x := <-xCh
		y := <-yCh
		log.Printf("add(%d,%d) = %d", x, y, x+y)
		ch <- x + y
	}()
	return ch
}
func (n *AddNode) Dump(w io.Writer) {
	fmt.Fprintf(w, "add(")
	n.X.Dump(w)
	fmt.Fprintf(w, ", ")
	n.Y.Dump(w)
	fmt.Fprintf(w, ")")
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	now := time.Now()
	log.Printf("start %s", now)
	defer func() { log.Printf("end with %s", time.Since(now)) }()

	fmt.Println("----------------------------------------")
	fmt.Println("async")
	fmt.Println("----------------------------------------")
	g := &AddNode{&AddNode{&ValueNode{10}, &ValueNode{20}}, &AddNode{&ValueNode{30}, &ValueNode{40}}}
	g.Dump(os.Stdout)
	fmt.Println("")
	fmt.Println("----------------------------------------")
	fmt.Println(<-g.Do())
}
podhmopodhmo

実行結果

----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
21:37:57.632474 value(20)
21:37:57.632510 value(40)
21:37:57.632540 value(30)
21:37:57.632550 value(10)
21:37:58.028902 add(10,20) = 30
21:37:58.029148 add(30,40) = 70
21:37:58.029197 add(30,70) = 100
100
21:37:58.029363 end with 501.744277ms
podhmopodhmo

graphをgenericsにする

こんな感じでインターフェイスをgenericsにする。

type Node[T any] interface {
	Do() <-chan T
	Dump(w io.Writer)
}
podhmopodhmo

Uopは不要ではあるけれど。。

package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"time"
)

type Node[T any] interface {
	Do() <-chan T
	Dump(w io.Writer)
}

type ValueNode[T any] struct {
	X T
}

func (n *ValueNode[T]) Do() <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		time.Sleep(100 * time.Millisecond)
		x := n.X
		log.Printf("value(%v)", x)
		ch <- x
	}()
	return ch
}
func (n *ValueNode[T]) Dump(w io.Writer) {
	fmt.Fprintf(w, "value(%v)", n.X)
}

type UopNode[T any] struct {
	X    Node[T]
	name string
	uOp  func(T) T
}

func (n *UopNode[T]) Do() <-chan T {
	xCh := n.X.Do()
	ch := make(chan T)
	go func() {
		defer close(ch)
		time.Sleep(100 * time.Millisecond)
		x := <-xCh
		log.Printf("%s(%v)", n.name, x)
		ch <- n.uOp(x)
	}()
	return ch
}
func (n *UopNode[T]) Dump(w io.Writer) {
	fmt.Fprintf(w, "%s(", n.name)
	n.X.Dump(w)
	fmt.Fprintf(w, ")")
}

type BopNode[T any] struct {
	X    Node[T]
	Y    Node[T]
	name string
	bOp  func(x, y T) T
}

func (n *BopNode[T]) Do() <-chan T {
	ch := make(chan T)
	xCh := n.X.Do()
	yCh := n.Y.Do()
	go func() {
		defer close(ch)
		time.Sleep(500 * time.Millisecond)
		x := <-xCh
		y := <-yCh
		ans := n.bOp(x, y)
		log.Printf("%s(%v,%v) = %v", n.name, x, y, ans)
		ch <- ans
	}()
	return ch
}
func (n *BopNode[T]) Dump(w io.Writer) {
	fmt.Fprintf(w, "%s(", n.name)
	n.X.Dump(w)
	fmt.Fprintf(w, ", ")
	n.Y.Dump(w)
	fmt.Fprintf(w, ")")
}

func value(n int) *ValueNode[int] {
	return &ValueNode[int]{X: n}
}
func add(x, y Node[int]) *BopNode[int] {
	return &BopNode[int]{X: x, Y: y, name: "add", bOp: func(x, y int) int { return x + y }}
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	now := time.Now()
	log.Printf("start %s", now)
	defer func() { log.Printf("end with %s", time.Since(now)) }()

	fmt.Println("----------------------------------------")
	fmt.Println("async")
	fmt.Println("----------------------------------------")
	g := add(add(value(10), value(20)), add(value(30), value(40)))
	g.Dump(os.Stdout)
	fmt.Println("")
	fmt.Println("----------------------------------------")
	fmt.Println(<-g.Do())
}
podhmopodhmo
----------------------------------------
async
----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
22:38:54.318238 value(20)
22:38:54.318270 value(40)
22:38:54.318310 value(30)
22:38:54.318319 value(10)
22:38:54.729963 add(30,40) = 70
22:38:54.730106 add(10,20) = 30
22:38:54.730494 add(30,70) = 100
100
22:38:54.730622 end with 513.706739ms

グラフの定義と実行の同期/非同期を分けたい場合はどうすれば良いんだろう?
Do()がgraphに依存してしまっているので分けられない。

podhmopodhmo

ASTのように考えてしまえば、dumpはわざわざ各structに定義しなくても良いのかもしれない。

podhmopodhmo

graphの構築とgraphの実行を分けてあげればtraceができるかもしれない。

podhmopodhmo

クロージャのようにしてしまうと実行せずができなくなる。

podhmopodhmo

固定値を依存の一部としてグラフを作りたい