Open15
goで依存したものの処理を非同期・同期を意識せず使ってみたい
このような計算を考える
- 雑にchannelを組み合わせると並行処理ができる
- genericsを使えば、特定の型に依存した処理にする必要がなくなる?
- (エラー処理はどうしようか?)
- (context cancelの対応はどうしようか?)
- (実際のコードへの適用はどうしようか?)
- 実行とトレースを分離したい
同期処理
----------------------------------------
sync
----------------------------------------
21:17:27.759349 value(10)
21:17:27.867438 value(20)
21:17:28.382795 add(10,20) = 30
21:17:28.491254 value(30)
21:17:28.592238 value(40)
21:17:29.097075 add(30,40) = 70
21:17:29.628750 add(30,70) = 100
100
code
package main
import (
"fmt"
"log"
"time"
)
func main() {
log.SetFlags(log.Lmicroseconds)
runSync()
}
func runSync() {
now := time.Now()
log.Printf("start %s", now)
defer func() { log.Printf("end with %s", time.Since(now)) }()
fmt.Println("----------------------------------------")
fmt.Println("sync")
fmt.Println("----------------------------------------")
fmt.Println(add(add(value(10), value(20)), add(value(30), value(40))))
}
func value(x int) int {
time.Sleep(100 * time.Millisecond)
log.Printf("value(%d)", x)
return x
}
func add(x, y int) int {
time.Sleep(500 * time.Millisecond)
log.Printf("add(%d,%d) = %d", x, y, x+y)
return x + y
}
非同期処理
----------------------------------------
async
----------------------------------------
21:19:24.971010 value(20)
21:19:24.971216 value(10)
21:19:24.971246 value(30)
21:19:24.971230 value(40)
21:19:25.372763 add(10,20) = 30
21:19:25.372893 add(30,40) = 70
21:19:25.372976 add(30,70) = 100
100
21:19:25.373035 end with 502.53423ms
code
package main
import (
"fmt"
"log"
"time"
)
func main() {
log.SetFlags(log.Lmicroseconds)
run()
}
func run() {
now := time.Now()
log.Printf("start %s", now)
defer func() { log.Printf("end with %s", time.Since(now)) }()
fmt.Println("----------------------------------------")
fmt.Println("async")
fmt.Println("----------------------------------------")
fmt.Println(<-add(add(value(10), value(20)), add(value(30), value(40))))
}
func value(x int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
time.Sleep(100 * time.Millisecond)
log.Printf("value(%d)", x)
ch <- x
}()
return ch
}
func add(xCh, yCh <-chan int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
time.Sleep(500 * time.Millisecond)
x := <-xCh
y := <-yCh
log.Printf("add(%d,%d) = %d", x, y, x+y)
ch <- x + y
}()
return ch
}
graphのような構造を作る
こんな感じ。
package main
import (
"fmt"
"io"
"log"
"os"
"time"
)
type Node interface {
Do() int
Dump(w io.Writer)
}
type ValueNode struct {
N int
}
func (n *ValueNode) Do() int {
x := n.N
time.Sleep(100 * time.Millisecond)
log.Printf("value(%d)", x)
return x
}
func (n *ValueNode) Dump(w io.Writer) {
fmt.Fprintf(w, "value(%d)", n.N)
}
type AddNode struct {
X Node
Y Node
}
func (n *AddNode) Do() int {
x := n.X.Do()
y := n.Y.Do()
time.Sleep(500 * time.Millisecond)
log.Printf("add(%d,%d) = %d", x, y, x+y)
return x + y
}
func (n *AddNode) Dump(w io.Writer) {
fmt.Fprintf(w, "add(")
n.X.Dump(w)
fmt.Fprintf(w, ", ")
n.Y.Dump(w)
fmt.Fprintf(w, ")")
}
func main() {
log.SetFlags(log.Lmicroseconds)
now := time.Now()
log.Printf("start %s", now)
defer func() { log.Printf("end with %s", time.Since(now)) }()
fmt.Println("----------------------------------------")
fmt.Println("sync")
fmt.Println("----------------------------------------")
g := &AddNode{&AddNode{&ValueNode{10}, &ValueNode{20}}, &AddNode{&ValueNode{30}, &ValueNode{40}}}
g.Dump(os.Stdout)
fmt.Println("")
fmt.Println("----------------------------------------")
fmt.Println(g.Do())
}
実行結果
----------------------------------------
sync
----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
21:36:12.097043 value(10)
21:36:12.197277 value(20)
21:36:12.699242 add(10,20) = 30
21:36:12.807172 value(30)
21:36:12.908077 value(40)
21:36:13.417616 add(30,40) = 70
21:36:13.934240 add(30,70) = 100
100
21:36:13.934361 end with 1.937804514s
非同期用のインターフェイスを定義してみる
type Node interface {
Do() int
Dump(w io.Writer)
}
ここから
type Node interface {
Do() <-chan int
Dump(w io.Writer)
}
こんな感じのコードになる。
package main
import (
"fmt"
"io"
"log"
"os"
"time"
)
type Node interface {
Do() <-chan int
Dump(w io.Writer)
}
type ValueNode struct {
N int
}
func (n *ValueNode) Do() <-chan int {
x := n.N
ch := make(chan int)
go func() {
defer close(ch)
time.Sleep(100 * time.Millisecond)
log.Printf("value(%d)", x)
ch <- x
}()
return ch
}
func (n *ValueNode) Dump(w io.Writer) {
fmt.Fprintf(w, "value(%d)", n.N)
}
type AddNode struct {
X Node
Y Node
}
func (n *AddNode) Do() <-chan int {
ch := make(chan int)
xCh := n.X.Do()
yCh := n.Y.Do()
go func() {
defer close(ch)
time.Sleep(500 * time.Millisecond)
x := <-xCh
y := <-yCh
log.Printf("add(%d,%d) = %d", x, y, x+y)
ch <- x + y
}()
return ch
}
func (n *AddNode) Dump(w io.Writer) {
fmt.Fprintf(w, "add(")
n.X.Dump(w)
fmt.Fprintf(w, ", ")
n.Y.Dump(w)
fmt.Fprintf(w, ")")
}
func main() {
log.SetFlags(log.Lmicroseconds)
now := time.Now()
log.Printf("start %s", now)
defer func() { log.Printf("end with %s", time.Since(now)) }()
fmt.Println("----------------------------------------")
fmt.Println("async")
fmt.Println("----------------------------------------")
g := &AddNode{&AddNode{&ValueNode{10}, &ValueNode{20}}, &AddNode{&ValueNode{30}, &ValueNode{40}}}
g.Dump(os.Stdout)
fmt.Println("")
fmt.Println("----------------------------------------")
fmt.Println(<-g.Do())
}
実行結果
----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
21:37:57.632474 value(20)
21:37:57.632510 value(40)
21:37:57.632540 value(30)
21:37:57.632550 value(10)
21:37:58.028902 add(10,20) = 30
21:37:58.029148 add(30,40) = 70
21:37:58.029197 add(30,70) = 100
100
21:37:58.029363 end with 501.744277ms
graphをgenericsにする
こんな感じでインターフェイスをgenericsにする。
type Node[T any] interface {
Do() <-chan T
Dump(w io.Writer)
}
Uopは不要ではあるけれど。。
package main
import (
"fmt"
"io"
"log"
"os"
"time"
)
type Node[T any] interface {
Do() <-chan T
Dump(w io.Writer)
}
type ValueNode[T any] struct {
X T
}
func (n *ValueNode[T]) Do() <-chan T {
ch := make(chan T)
go func() {
defer close(ch)
time.Sleep(100 * time.Millisecond)
x := n.X
log.Printf("value(%v)", x)
ch <- x
}()
return ch
}
func (n *ValueNode[T]) Dump(w io.Writer) {
fmt.Fprintf(w, "value(%v)", n.X)
}
type UopNode[T any] struct {
X Node[T]
name string
uOp func(T) T
}
func (n *UopNode[T]) Do() <-chan T {
xCh := n.X.Do()
ch := make(chan T)
go func() {
defer close(ch)
time.Sleep(100 * time.Millisecond)
x := <-xCh
log.Printf("%s(%v)", n.name, x)
ch <- n.uOp(x)
}()
return ch
}
func (n *UopNode[T]) Dump(w io.Writer) {
fmt.Fprintf(w, "%s(", n.name)
n.X.Dump(w)
fmt.Fprintf(w, ")")
}
type BopNode[T any] struct {
X Node[T]
Y Node[T]
name string
bOp func(x, y T) T
}
func (n *BopNode[T]) Do() <-chan T {
ch := make(chan T)
xCh := n.X.Do()
yCh := n.Y.Do()
go func() {
defer close(ch)
time.Sleep(500 * time.Millisecond)
x := <-xCh
y := <-yCh
ans := n.bOp(x, y)
log.Printf("%s(%v,%v) = %v", n.name, x, y, ans)
ch <- ans
}()
return ch
}
func (n *BopNode[T]) Dump(w io.Writer) {
fmt.Fprintf(w, "%s(", n.name)
n.X.Dump(w)
fmt.Fprintf(w, ", ")
n.Y.Dump(w)
fmt.Fprintf(w, ")")
}
func value(n int) *ValueNode[int] {
return &ValueNode[int]{X: n}
}
func add(x, y Node[int]) *BopNode[int] {
return &BopNode[int]{X: x, Y: y, name: "add", bOp: func(x, y int) int { return x + y }}
}
func main() {
log.SetFlags(log.Lmicroseconds)
now := time.Now()
log.Printf("start %s", now)
defer func() { log.Printf("end with %s", time.Since(now)) }()
fmt.Println("----------------------------------------")
fmt.Println("async")
fmt.Println("----------------------------------------")
g := add(add(value(10), value(20)), add(value(30), value(40)))
g.Dump(os.Stdout)
fmt.Println("")
fmt.Println("----------------------------------------")
fmt.Println(<-g.Do())
}
----------------------------------------
async
----------------------------------------
add(add(value(10), value(20)), add(value(30), value(40)))
----------------------------------------
22:38:54.318238 value(20)
22:38:54.318270 value(40)
22:38:54.318310 value(30)
22:38:54.318319 value(10)
22:38:54.729963 add(30,40) = 70
22:38:54.730106 add(10,20) = 30
22:38:54.730494 add(30,70) = 100
100
22:38:54.730622 end with 513.706739ms
グラフの定義と実行の同期/非同期を分けたい場合はどうすれば良いんだろう?
Do()がgraphに依存してしまっているので分けられない。
ASTのように考えてしまえば、dumpはわざわざ各structに定義しなくても良いのかもしれない。
graphの構築とgraphの実行を分けてあげればtraceができるかもしれない。
クロージャのようにしてしまうと実行せずができなくなる。
固定値を依存の一部としてグラフを作りたい