Goのnet/httpのServer側デザインの良さについて語る(Listen編)
Goのnet/httpのServerのデザインの良さ
net/httpのコードリーディングをしていてepollとscheduler、goroutineがうまく協調してデザインしてあり上手く回っているものだな・・・と思ったので紹介します。
といっても下記の記事で解説されているので、ここではもうちょっと全体の流れをソースコードを詳しく見てみて追っていきます。
参考
最初に全体像の説明
上記画像の感じですね。
epollとかGoRoutineとかが絡んできます。
上記画像はコネクション処理編での動作なのでListen,Acceptはそれまでの溜めみたいなものとなっています。
Listen編 -> Accept編 -> コネクション処理編と続きます。
epoll is 何
下記の解説が詳しいです。ざくっと言うと非同期IOを実現させるためのものです。(厳密な非同期IOの定義とは違うかもしれませんが容赦ください)
同期IOだと下記はRead終了までブロックしてしまいます。someTaskはRead完了まで実行されません。
ret := SomeHeavyRead()
someTask()
非同期IOだとReadが終わらなくてもsomeTaskが実行されます。
ret := SomeAsyncHeavyRead()
someTask()
同期IO,非同期IO,ブロックング、ノンブロッキングの定義等は下記参考にすると良いかもしれません。
どうやってepollを使って非同期IOを実現するか、epollの使い方
epollの内部動作ではなく使い方の説明です。
必要な道具は4つ
- epoll_create1() - epollFDの作成
- epoll_ctl() - 監視対象のfdを登録
- events配列の作成 - epoll_waitで監視対象のfdを受け取る際のバッファ
- epoll_wait - イベントがあったfdを取得してeventsに入れる
流れとしては下記のような感じです。
Goではnetpollという部分で監視の部分のepoll_waitを担当します。
登録の部分はlistenでもacceptでもコネクション処理でもやります。ただ処理としては全部下記みたいにepoll_ctlするだけとなっています。
int epfd = epoll_create1(0);
struct epoll_event events[10];
//ev.eventsはどんなイベントを監視するか、ev.data.fdはどのfdを監視するか
struct epoll_event ev;
ev.events = EPOLLIN;//受信イベント
ev.data.fd = someFD; //監視対象のfdをセット
//監視対象のfdをepollに追加
epoll_ctl(epfd,EPOLL_CTL_ADD,someFD,&ev);
for(;;){
//epoll_waitで監視対象のfd達が準備完了かどうか見て、準備完了のものだけをeventsに写し取る
numFds = epoll_wait(epfd,events,10,-1);
...
}
一つだけカギとなる部分としてGoではev.dataのところにpollDescという構造体を入れることです。
ev.data.fd = pollDesc.fdとなり、
pollDesc.rg or pollDesc.wgにはGoRoutineが入ることだけを押さえておきます。
type pollDesc struct {
...
fd uintptr
rg uintptr // pdReady, pdWait, G waiting for read or nil
wg uintptr // pdReady, pdWait, G waiting for write or nil
...
}
参考
Listenの処理を見ていく
それではListenを見ていきます。
Listenでの働きとしてはlisten由来のfdをsocketで作り、そのfdをepollに登録して監視対象にすることです。
ListenAndServe -> net.Listen -> ListenConfig.Listen -> sysListener.ListenTCP -> internetSocket
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr) <-★
if err != nil {
return err
}
return srv.Serve(ln)
}
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
return l, nil
}
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
raddr = raddr.toLocal(net)
}
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
listenは内部でsocketを実行しています。引き続きsocketから見ていきましょう。
socket -> netFD.listenStream -> netFD.init -> fd.Init -> pollDesc.init
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto) //★ここでlisten用のfdを作成
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil { //★netFDを作成
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { <-★
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil { <-★
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
func (fd *netFD) init() error {
errcall, err := fd.pfd.Init(fd.net, true)
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return err
}
fd.unix.go
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}
fd_poll_runtime
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit) <-☆1
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) <-☆2
//ここのfdはsocketでとってきたlistenで使う用のfd
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
FDがたくさん出てきてややこしいですが、関係としては下図のようになっています。
FD.SysfdとpollDesc.fdがfdの本体で、pollDescのrg,wgでGoroutineを扱うことができます。
Goroutineを扱う部分はAccept編で見ることになります。
中でも今回はpollDescが一番重要です。
pollDesc.init ~ netpollopen
さて、ようやくここまできてepollが絡んできます。
pollDesc.init()の1でepollを作り、2でlistenで使うfdをepollの監視対象にします。
1 epollの作成
netpoll.go
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
netpoll_epoll.go
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) <---ここでepollを作成
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) //ここでもepollに監視対象を登録しているのですが本筋じゃないのでスルー
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
2 listenのfdをepollに登録
netpoll.go
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc() <-------★pollDesdcを作成
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd <-pollDesc.fdにlistenFDをセット
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0 <-ここではGoroutineをセットせずに0のまま、accept編で役立ちます。
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
pd.self = pd
unlock(&pd.lock)
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd <-ev.dataにpdを入れています。取り出しの時に役立ちます。
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
さて、ここまででepollへlistenのfd(listenFDと呼びます)を登録することができました。
次回はaccept編で、登録したlistenFDがどう使われるか見ていきましょう。
次回
Discussion