🌐

Goのnet/httpのServer側デザインの良さについて語る(Listen編)

2021/10/03に公開

Goのnet/httpのServerのデザインの良さ

net/httpのコードリーディングをしていてepollとscheduler、goroutineがうまく協調してデザインしてあり上手く回っているものだな・・・と思ったので紹介します。

といっても下記の記事で解説されているので、ここではもうちょっと全体の流れをソースコードを詳しく見てみて追っていきます。

参考
https://i101330.hatenablog.com/entry/2019/12/03/113645

最初に全体像の説明

上記画像の感じですね。
epollとかGoRoutineとかが絡んできます。
上記画像はコネクション処理編での動作なのでListen,Acceptはそれまでの溜めみたいなものとなっています。
Listen編 -> Accept編 -> コネクション処理編と続きます。

epoll is 何

下記の解説が詳しいです。ざくっと言うと非同期IOを実現させるためのものです。(厳密な非同期IOの定義とは違うかもしれませんが容赦ください)

同期IOだと下記はRead終了までブロックしてしまいます。someTaskはRead完了まで実行されません。

ret := SomeHeavyRead() 

someTask()

非同期IOだとReadが終わらなくてもsomeTaskが実行されます。

ret := SomeAsyncHeavyRead() 

someTask()

同期IO,非同期IO,ブロックング、ノンブロッキングの定義等は下記参考にすると良いかもしれません。
https://blog.takanabe.tokyo/2015/03/ノンブロッキングi/oと非同期i/oの違いを理解する/

どうやってepollを使って非同期IOを実現するか、epollの使い方

epollの内部動作ではなく使い方の説明です。
必要な道具は4つ

  1. epoll_create1() - epollFDの作成
  2. epoll_ctl() - 監視対象のfdを登録
  3. events配列の作成 - epoll_waitで監視対象のfdを受け取る際のバッファ
  4. epoll_wait - イベントがあったfdを取得してeventsに入れる

流れとしては下記のような感じです。
Goではnetpollという部分で監視の部分のepoll_waitを担当します。
登録の部分はlistenでもacceptでもコネクション処理でもやります。ただ処理としては全部下記みたいにepoll_ctlするだけとなっています。

 int epfd = epoll_create1(0);
 
 struct epoll_event events[10];
 
 //ev.eventsはどんなイベントを監視するか、ev.data.fdはどのfdを監視するか
 struct epoll_event ev;
 ev.events = EPOLLIN;//受信イベント
 ev.data.fd = someFD; //監視対象のfdをセット
 
 //監視対象のfdをepollに追加
 epoll_ctl(epfd,EPOLL_CTL_ADD,someFD,&ev);
 
 for(;;){
   //epoll_waitで監視対象のfd達が準備完了かどうか見て、準備完了のものだけをeventsに写し取る
   numFds  = epoll_wait(epfd,events,10,-1);
   
   ...
 }

一つだけカギとなる部分としてGoではev.dataのところにpollDescという構造体を入れることです。
ev.data.fd = pollDesc.fdとなり、
pollDesc.rg or pollDesc.wgにはGoRoutineが入ることだけを押さえておきます。

type pollDesc struct {
	...
	fd      uintptr
	rg      uintptr   // pdReady, pdWait, G waiting for read or nil
	wg      uintptr   // pdReady, pdWait, G waiting for write or nil
	...
}

参考
https://raskr.hatenablog.com/entry/2018/04/21/143825

Listenの処理を見ていく

それではListenを見ていきます。
Listenでの働きとしてはlisten由来のfdをsocketで作り、そのfdをepollに登録して監視対象にすることです。

ListenAndServe -> net.Listen -> ListenConfig.Listen -> sysListener.ListenTCP -> internetSocket

func ListenAndServe(addr string, handler Handler) error {
        server := &Server{Addr: addr, Handler: handler}
        return server.ListenAndServe()
}

func (srv *Server) ListenAndServe() error {
        if srv.shuttingDown() {
                return ErrServerClosed
        }
        addr := srv.Addr
        if addr == "" {
                addr = ":http"
        }
        ln, err := net.Listen("tcp", addr)  <-if err != nil {
                return err
        }
        return srv.Serve(ln)
}

func Listen(network, address string) (Listener, error) {
        var lc ListenConfig
        return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
        
        sl := &sysListener{
                ListenConfig: *lc,
                network:      network,
                address:      address,
        }
        var l Listener
        la := addrs.first(isIPv4)
        switch la := la.(type) {
        case *TCPAddr:
                l, err = sl.listenTCP(ctx, la) 
        case *UnixAddr:
                l, err = sl.listenUnix(ctx, la)
        default:
                return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
        }
        
        return l, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
        fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
        if err != nil {
                return nil, err
        }
        return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
        if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
                raddr = raddr.toLocal(net)
        }
        family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
        return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

listenは内部でsocketを実行しています。引き続きsocketから見ていきましょう。

socket -> netFD.listenStream -> netFD.init -> fd.Init -> pollDesc.init

 func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) //★ここでlisten用のfdを作成
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil { //★netFDを作成
		poll.CloseFunc(s)
		return nil, err
	}

	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {  <-★
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
		fd.Close()
		return nil, err
	}
	return fd, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	var err error
	if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
		return err
	}
	var lsa syscall.Sockaddr
	if lsa, err = laddr.sockaddr(fd.family); err != nil {
		return err
	}
	if ctrlFn != nil {
		c, err := newRawConn(fd)
		if err != nil {
			return err
		}
		if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
			return err
		}
	}
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	if err = fd.init(); err != nil {  <-return err
	}
	lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
	fd.setAddr(fd.addrFunc()(lsa), nil)
	return nil
}

func (fd *netFD) init() error {
    errcall, err := fd.pfd.Init(fd.net, true)
    if errcall != "" {
        err = wrapSyscallError(errcall, err)
    }
    return err
}

fd.unix.go
func (fd *FD) Init(net string, pollable bool) error {
	// We don't actually care about the various network types.
	if net == "file" {
		fd.isFile = true
	}
	if !pollable {
		fd.isBlocking = 1
		return nil
	}
	err := fd.pd.init(fd)
	if err != nil {
		// If we could not initialize the runtime poller,
		// assume we are using blocking mode.
		fd.isBlocking = 1
	}
	return err
}

fd_poll_runtime
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)  <-1
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) <-2
	//ここのfdはsocketでとってきたlistenで使う用のfd
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}

FDがたくさん出てきてややこしいですが、関係としては下図のようになっています。
FD.SysfdとpollDesc.fdがfdの本体で、pollDescのrg,wgでGoroutineを扱うことができます。
Goroutineを扱う部分はAccept編で見ることになります。
中でも今回はpollDescが一番重要です。

pollDesc.init ~ netpollopen

さて、ようやくここまできてepollが絡んできます。
pollDesc.init()の1でepollを作り、2でlistenで使うfdをepollの監視対象にします。

1 epollの作成

netpoll.go
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

netpoll_epoll.go
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)  <---ここでepollを作成
	if epfd < 0 {
		epfd = epollcreate(1024)
		if epfd < 0 {
			println("runtime: epollcreate failed with", -epfd)
			throw("runtime: netpollinit failed")
		}
		closeonexec(epfd)
	}
	r, w, errno := nonblockingPipe()
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)  //ここでもepollに監視対象を登録しているのですが本筋じゃないのでスルー
	if errno != 0 {
		println("runtime: epollctl failed with", -errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

2 listenのfdをepollに登録

netpoll.go
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc() <-------★pollDesdcを作成
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd      <-pollDesc.fdにlistenFDをセット   
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0       <-ここではGoroutineをセットせずに0のまま、accept編で役立ちます。
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	pd.self = pd
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	if errno != 0 {
		pollcache.free(pd)
		return nil, int(errno)
	}
	return pd, 0
}

netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd   <-ev.dataにpdを入れています。取り出しの時に役立ちます。
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

さて、ここまででepollへlistenのfd(listenFDと呼びます)を登録することができました。
次回はaccept編で、登録したlistenFDがどう使われるか見ていきましょう。

次回

https://zenn.dev/cube/articles/0731eb7c14bb5d

Discussion