Open10

socket をざっくりキャッチアップ

zuribozuribo

Socket とは?

  • Socket API は、アプリケーションがネットワーク上でデータを送受信するための仕組みを抽象化した API (Application Programming Interface) である。
  • socket とは、データの送受信を行うために使うエンドポイントのことである。
  • socket の種類
    • Stream sockets: トランスポート層のプロトコルに TCP (Transmission Control Protocol)、SCTP (Stream Control Transmission Protocol)、DCCP (Datagram Congestion Control Protocol) などを使った connection-oriented socket のこと。
    • Datagram sockets: トランスポート層のプロトコルに UDP (User Datagram Protocol) を使った connectionless socket のこと。
    • Unix Domain Sockets (UDS): ローカルファイルを使って、同一ホスト上のプロセス間のコミュニケーションを実現するもの。
zuribozuribo

Socket API

  • socket を使ってアプリケーション間で通信を行うための API (Application Programming Interface) のこと。
  • Socket API を使ったプログラムの開発のことを、socket programming / network programming と呼ぶ。

Berkeley sockets

  • Berkeley sockets standard では、Unix の哲学の「everything is a file」に則って、socket は file descriptor で表現される。
  • Berkeley sockets standard は、多少の変更を加えて、de-facto standard から POSIX 仕様になった。よって、POSIX sockets は基本的に Berkeley sockets と同義であり、BSD sockets とも呼ばれる。

Socket API functions

socket()

通信のエンドポイントとなる新しい socket を作成し、その socket に対応する file descriptor を返す。

#include <sys/socket.h>

int socket(int domain, int type, int protocol);

以下の3つの引数を取る。

  • domain: socket の protocol family を指定する。
    • AF_INET: IPv4 Internet protocol
    • AF_INET6: IPv6 Internet protocol
    • AF_UNIX: ローカル接続
  • type: socket の種類
    • SOCK_STREAM
    • SOCK_DGRAM
  • protocol: 使用する transport protocol を指定する。
    • IPPROTO_TCP
    • IPPROTO_SCTP
    • IPPROTO_UDP
    • IPPROTO_DCCP

bind()

基本的にサーバー側で使用され、socket を socket address (例えば local IP address と port 番号) に紐付ける。

#include <sys/socket.h>

int bind(int sockfd, const strut sockaddr *addr, socklen_t addrlen);

以下の3つの引数を取る。

  • sockfd: socket の file descriptor
  • addr: socket address を表現する sockaddr 構造体へのポインタ
  • addrlen: sockaddr 構造体のサイズを格納する socklen_t 型のデータへのポインタ

listen()

サーバー側で使用され、socket を listening state に変え、入ってくる接続に備えて準備する。SOCK_STREAM などの stream-oriented data mode のみで必要である。

#include <sys/socket.h>

int listen(int sockfd, int backlog);

以下の2つの引数を取る。

  • sockfd: 接続待ちに使う socket の file descriptor
  • backlog: 一度に queue に入れられる pending な接続の最大数

connect()

クライアント側で使用され、接続を行うのに使用される。

#include <sys/socket.h>

int connect(int sockfd, const strut sockaddr *addr, socklen_t addrlen);

以下の2つの引数を取る。

  • sockfd: 接続元として使用する socket の file descriptor
  • addr: 接続先の socket address を示す sockaddr 構造体へのポインタ
  • addrlen: sockaddr 構造体のサイズを格納する socklen_t 型のデータへのポインタ

accept()

サーバー側で使用され、クライアントから受け取った新しい TCP 接続確立の要求を受け入れ、その接続用の新しい socket を作成する。各接続ごとに socket が作成され、その socket への file descriptor が返される。

#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *_Nullable restrict addr, socklen_t *_Nullable restrict addrlen);

以下の3つの引数を取る。

  • sockfd: 接続要求を queue に溜めておくための listening socket の file descriptor
  • addr: クライアントの情報を受け取るための sockaddr 構造体へのポインタ
  • addrlen: クライアントの sockaddr 構造体のサイズ情報を受け取るための socklen_t 型のデータへのポインタ

send(), sendto(), sendmsg(), recv(), recvfrom(), recvmsg()

データの送受信で使われる。

#include <sys/socket.h>

ssize_t send(int sockfd, const void buf, ssize_t len, int flags);
ssize_t sendto(int sockfd, const void buf, ssize_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t sendmsg(int sockfd, const struct msghdr *msd, int flags);
#include <sys/socket.h>

ssize_t recv(int sockfd, void buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void buf, size_t len, int flags, struct sockaddr *_Nullable restrict src_addr, socklen_t *_Nullable restrict addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);

close()

socket に紐づいたリソースを解放する。

#include <unistd.h>

int close(int fd);

getaddrinfo(), freeaddrinfo()

ホスト名、ホストアドレスを解決する。

#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

int getaddrinfo(const char *restrict node, const char *restrict service, const struct addrinfo *restrict hists, struct addrinfo **restrict res);
void freeaddrinfo(struct addrinfo *res);

select()

プロセスを中断して、渡された socket のリストの1つ以上が読み書き可能な状態になるのを待つ。

#include <sys/select.h>

int select(int nfds, fd_set *_Nullable restrict readfds, fd_set *_Nullable restrict writefds, fd_set *_Nullable restrict exceptfds, struct timeval *_Nullable restrict timeout);

poll()

渡された socket のリストの状態が変わっていないか確認する。

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

getsockopt(), setsockopt()

特定の socket の特定の socket option の値を取得・設定する。

#include <sys/socket.h>

int getsockopt(int sockfd, int level, int optname, void optval, socklen_t *option);
int setsockopt(int sockfd, int level, int optname, const void optval, socklen_t optlen);
zuribozuribo

Socket Programming in Python

TCP

動作確認

先に server.py を実行する。

$ python3 server.py

次に別の端末から client.py を実行して、メッセージ「hello world」を打つ。

$ python3 client.py
Input a message: hello world

server 側で接続リクエストが来たことが表示され、「hello world」のメッセージが受け取れていることがわかります。

$ python3 server.py
2023-12-30 21:45:06.088524
Got a connection request: ('127.0.0.1', 51141)
hello world

client 側では、server がメッセージ受け取った日時が、メッセージとして返ってきているのが確認できます。

$ python3 client.py
Input a message: hello world
Got a message: 2023-12-30 21:45:06.088524

server.py

import socket
import datetime

PORT = 50000
BUFSIZE = 4096

# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# bind a socket to an address
server.bind(("", PORT))

# listen connection requests on the socket
server.listen()

while True:
	# wait for a connection request
	client, addr = server.accept()

	msg = str(datetime.datetime.now())
	print(msg)
	print("Got a connection request:", addr)

	# receive data from the client
	data = client.recv(BUFSIZE)
	print(data.decode("utf-8"))

	# send a message back to the client
	client.sendall(msg.encode("utf-8"))

	# close the connection
	client.close()

client.py

import socket

HOST = "localhost"
PORT = 50000
BUFSIZE = 4096

# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# connect to a server
client.connect((HOST, PORT))

# send a message to the server
msg = input("Input a message: ")
client.sendall(msg.encode("utf-8"))

# receive a message from the server
data = client.recv(BUFSIZE)
print("Got a message:", data.decode("utf-8"))

# close the connection
client.close()

UDP

動作確認

まず server.py を実行する。

$ python3 server.py

次に別の端末から client.py を実行する。

$ python3 client.py

server 側では、client から送られてきたメッセージが表示される。

$ python3 server.py
2023-12-30 22:03:14.716057
Got a message: ('127.0.0.1', 53113)
b'hello world'

client 側では、server がメッセージを受け取った日時が、メッセージとして返ってきている。

$ python3 client.py
2023-12-30 22:03:14.716057

server.py

import socket
import datetime

PORT = 50000
BUFSIZE = 4096

# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# bind the socket to an socket address
server.bind(("", PORT))

while True:
	# wait for receiving a message from a client
	data, client = server.recvfrom(BUFSIZE)

	msg = str(datetime.datetime.now())
	print(msg)
	print("Got a message:", client)
	print(data)

	# send a message back to the client
	server.sendto(msg.encode("utf-8"), client)

client.py

import socket

HOST = "localhost"
PORT = 50000
BUFSIZE = 4096

# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# send a message to a server
msg = "hello world"
client.sendto(msg.encode("utf-8"), (HOST, PORT))

# receive a message from the server
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))

# close the socket
client.close()
zuribozuribo

Socket Programming in Rust

TCP Echo

Server

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buf = [0; 4096];

    // receive data from the client.
    stream.read(&mut buf)?;
    println!("{}", std::str::from_utf8(&buf).unwrap());

    // send the same data back to the client.
    stream.write(&buf)?;

    Ok(())
}

fn main() -> std::io::Result<()> {
    // create a socket, bind it to an address and listen on it.
    let listener = TcpListener::bind("127.0.0.1:50000")?;

    // accept connections and process them serially.
    for stream in listener.incoming() {
        handle_client(stream?)?;
    }

    Ok(())
}

Client

use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // create a socket and connect to a server.
    let mut stream = TcpStream::connect("127.0.0.1:50000")?;

    // send a message to the server.
    let msg = "hello world";
    stream.write(msg.as_bytes())?;

    // receive a message from the server.
    let mut buf = [0u8; 4096];
    stream.read(&mut buf)?;
    println!("{}", std::str::from_utf8(&buf).unwrap());

    Ok(())
}
zuribozuribo

Socket Programming in C

TCP Echo

動作確認

$ ./server
Got a request from 127.0.0.1:11418
hello world
$ ./client
hello world

server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define SERVER_PORT 50000
#define MAX_PENDING_CONN 5
#define BUFFER_SIZE 4096

int main(int argc, char *argv[])
{
	int server_sock, client_sock, ret;
	struct sockaddr_in server_addr, client_addr;
	unsigned int client_addr_len;
	char buffer[BUFFER_SIZE];
	int msg_len;

	// create a socket
	server_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (server_sock < 0) {
		perror("socket() failed");
		exit(1);
	}

	// construct a local socket address
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = SERVER_PORT;

	// bind the socket to the server address
	ret = bind(server_sock, (struct sockaddr *)&server_addr,
		   sizeof(server_addr));
	if (ret < 0) {
		perror("bind() failed");
		exit(1);
	}

	// listen on the socket
	ret = listen(server_sock, MAX_PENDING_CONN);
	if (ret < 0) {
		perror("listen() failed");
		exit(1);
	}

	for (;;) {
		// wait for a connection request from a client
		client_sock = accept(server_sock,
				     (struct sockaddr *)&client_addr,
				     &client_addr_len);
		if (client_sock < 0) {
			perror("accept() failed");
			exit(1);
		}

		// print the client information
		printf("Got a request from %s:%d\n",
		       inet_ntoa(client_addr.sin_addr), client_addr.sin_port);

		do {
			// receive a message from the client
			msg_len = recv(client_sock, buffer, BUFFER_SIZE - 1, 0);
			if (msg_len < 0) {
				perror("recv() failed");
				exit(1);
			}
			buffer[msg_len] = '\0';
			printf(buffer);

			// send the same message back to the client
			ret = send(client_sock, buffer, msg_len, 0);
			if (ret != msg_len) {
				perror("send() failed");
				exit(1);
			}
		} while (msg_len > 0);
		printf("\n");

		// close the client socket
		close(client_sock);
	}

	return 0;
}

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT 50000
#define BUFFER_SIZE 4096
#define MSG "hello world"

int main(int argc, char *argv[])
{
	int ret, client_sock, total;
	struct sockaddr_in server_addr;
	char buffer[BUFFER_SIZE];
	unsigned int msg_len;

	// create a socket
	client_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (client_sock < 0) {
		perror("socket() failed");
		exit(1);
	}

	// construct a server socket address
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
	server_addr.sin_port = SERVER_PORT;

	// connect to the server
	ret = connect(client_sock, (struct sockaddr *)&server_addr,
		      sizeof(server_addr));
	if (ret < 0) {
		perror("connect() failed");
		exit(1);
	}

	// send a message to the server
	msg_len = strlen(MSG);
	ret = send(client_sock, MSG, msg_len, 0);
	if (ret != msg_len) {
		perror("send()");
		exit(1);
	}

	// receive a message from the server
	total = 0;
	while (total < msg_len) {
		ret = recv(client_sock, buffer, BUFFER_SIZE - 1, 0);
		if (ret <= 0) {
			perror("recv() failed or connection closed");
			exit(1);
		}
		total += ret;
		buffer[ret] = '\0';
		printf(buffer);
	}
	printf("\n");

	// close the socket
	close(client_sock);
	return 0;
}

UDP Echo

動作確認

$ ./server
Got a request from 127.0.0.1:1439
hello world
Got a request from 127.0.0.1:2531
hello world
Got a request from 127.0.0.1:34967
hello world
$ ./client
hello world

$ ./client
hello world

$ ./client
hello world

server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define SERVER_PORT 50000
#define BUFFER_SIZE 4096

int main(int argc, char *argv[])
{
	int ret, server_sock;
	struct sockaddr_in server_addr, client_addr;
	unsigned int client_addr_size, msg_len;
	char buffer[BUFFER_SIZE];

	// create a socket
	server_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
	if (server_sock < 0) {
		perror("socket() failed");
		exit(1);
	}

	// construct a server socket address
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = htons(SERVER_PORT);

	// bind the socket to the server sock address
	ret = bind(server_sock, (struct sockaddr *)&server_addr,
		   sizeof(server_addr));
	if (ret < 0) {
		perror("bind() failed");
		exit(1);
	}

	// handle requests
	for (;;) {
		client_addr_size = sizeof(client_addr);

		// receive a message from a client
		msg_len = recvfrom(server_sock, buffer, BUFFER_SIZE - 1, 0,
				   (struct sockaddr *)&client_addr,
				   &client_addr_size);
		if (msg_len < 0) {
			perror("recvfrom() failed");
			exit(1);
		}

		printf("Got a request from %s:%d\n",
		       inet_ntoa(client_addr.sin_addr), client_addr.sin_port);

		buffer[msg_len] = '\0';
		printf("%s\n", buffer);

		// response back to the client
		ret = sendto(server_sock, buffer, msg_len, 0,
			     (struct sockaddr *)&client_addr,
			     sizeof(client_addr));
		if (ret != msg_len) {
			perror("sendto() failed");
			exit(1);
		}
	}
}

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT 50000
#define BUFFER_SIZE 4096
#define MSG "hello world"

int main(int argc, char *argv[])
{
	int ret, client_sock;
	struct sockaddr_in server_addr, recv_addr;
	char buffer[BUFFER_SIZE];
	unsigned int msg_len, recv_addr_size;

	// create a socket
	client_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
	if (client_sock < 0) {
		perror("socekt() failed");
		exit(1);
	}

	// construct a server socket address
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
	server_addr.sin_port = htons(SERVER_PORT);

	// send a message to the server
	msg_len = strlen(MSG);
	ret = sendto(client_sock, MSG, msg_len, 0,
		     (struct sockaddr *)&server_addr, sizeof(server_addr));
	if (ret != msg_len) {
		perror("sendto() failed");
		exit(1);
	}

	// receive a message from the server
	ret = recvfrom(client_sock, buffer, BUFFER_SIZE - 1, 0,
		       (struct sockaddr *)&recv_addr, &recv_addr_size);
	if (ret != msg_len) {
		perror("recvfrom() failed");
		exit(1);
	}

	if (server_addr.sin_addr.s_addr != recv_addr.sin_addr.s_addr) {
		fprintf(stderr, "Received a packet from an unknown source.\n");
		exit(1);
	}

	buffer[ret] = '\0';
	printf("%s\n", buffer);

	// close the socket
	close(client_sock);
	return 0;
}

zuribozuribo

socat (SOcket CAT)

ライフサイクル

  1. Init phase: CLI オプションを parse して、logging を初期化する。
  2. Open phase: 1つ目に渡された address を開けた後に、2つ目に渡された address を開ける。
  3. Transfer phase: 両方の stream の読み書きを select() を通して、データの受け渡しを行う。
  4. Closing phase: 一方の stream が EOF に到達した場合に、もう一方の stream に EOF を転送する。

Get Started

まずターミナルから以下を実行する。

$ socat tcp-listen:70000 stdout

70000 番 port で listen する TCP socket が作成され、その socket と標準出力 (stdout) が繋がれる。

次に、別のターミナルを開き、以下を実行する。

$ socat tcp-connect:localhost:70000 stdin

localhost の 70000 番 port に connect する TCP socket が作成され、その socket と標準入力 (stdin) が繋がれる。

上記の設定により、2つ目のターミナルの標準入力に入力した内容が、1つ目のターミナルの標準出力に出力されるようになる。

$ socat tcp-connect:localhost:70000 stdin
test
$ socat tcp-listen:70000 stdout
test

どちらか一方のターミナルでの Ctrl+C によって、接続が終了する。

zuribozuribo

Single-thread vs. Multi-thread

Single-thread

server 側が、同時にきた複数のリクエストを並列に処理できず、1つのリクエストを処理できてから、その次のリクエストを処理する。

1つ前の「Socket programming in Python」のサンプルを以下のように変更。

  • server.py: リクエストへレスポンスする前に 3 秒間スリープする。
  • client.py: 引数を1つ取り、送るメッセージにそれを含める。

動作確認

3秒に1つのリクエストしか処理できていないのがわかる。

server 側

$ python3 server.py
2023-12-30 22:16:36.365614
Got a connection request: ('127.0.0.1', 53213)
Hello from client 1
2023-12-30 22:16:39.371383
Got a connection request: ('127.0.0.1', 53214)
Hello from client 2
2023-12-30 22:16:42.373630
Got a connection request: ('127.0.0.1', 53215)
Hello from client 4
2023-12-30 22:16:45.379482
Got a connection request: ('127.0.0.1', 53216)
Hello from client 3
2023-12-30 22:16:48.381774
Got a connection request: ('127.0.0.1', 53217)
Hello from client 7
2023-12-30 22:16:51.386791
Got a connection request: ('127.0.0.1', 53218)
Hello from client 5
2023-12-30 22:16:54.390117
Got a connection request: ('127.0.0.1', 53219)
Hello from client 6
2023-12-30 22:16:57.392960
Got a connection request: ('127.0.0.1', 53220)
Hello from client 8
2023-12-30 22:17:00.395567
Got a connection request: ('127.0.0.1', 53221)
Hello from client 10
2023-12-30 22:17:03.399871
Got a connection request: ('127.0.0.1', 53222)
Hello from client 9

client 側

$ for i in {1..10}; do python3 client.py $i & done
[2] 41908
[3] 41909
[4] 41910
[5] 41911
[6] 41912
[7] 41913
[8] 41914
[9] 41915
[10] 41916
[11] 41917

server.py

import time
import socket
import datetime

PORT = 50000
BUFSIZE = 4096

# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# bind a socket to an address
server.bind(("", PORT))

# listen connection requests on the socket
server.listen()

while True:
    # wait for a connection request
    client, addr = server.accept()

    msg = str(datetime.datetime.now())
    print(msg)
    print("Got a connection request:", addr)

    # receive data from the client
    data = client.recv(BUFSIZE)
    print(data.decode("utf-8"))

    time.sleep(3)

    # send a message back to the client
    client.sendall(msg.encode("utf-8"))

    # close the connection
    client.close()

client.py

import sys
import socket

HOST = "localhost"
PORT = 50000
BUFSIZE = 4096

# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# connect to a server
client.connect((HOST, PORT))

# send a message to the server
msg = f"Hello from client {sys.argv[1]}"
client.sendall(msg.encode("utf-8"))

# receive a message from the server
data = client.recv(BUFSIZE)
print("Got a message:", data.decode("utf-8"))

# close the connection
client.close()

Multi-threading

server 側が、並列して I/O を処理することができる。

以下のように変更。

  • server.py を、リクエストを受け取ったら新しいスレッドを立ち上げ、そのスレッド内でレスポンスを返す。

動作確認

3秒待たずにどんどんリクエストを処理できているのがわかる。

server 側

$ python3 server.py
2023-12-30 22:20:44.702687
Got a connection request: ('127.0.0.1', 53440)
Hello from client 3
2023-12-30 22:20:44.706570
Got a connection request: ('127.0.0.1', 53441)
Hello from client 1
2023-12-30 22:20:44.714539
Got a connection request: ('127.0.0.1', 53442)
Hello from client 4
2023-12-30 22:20:44.717746
Got a connection request: ('127.0.0.1', 53443)
Hello from client 5
2023-12-30 22:20:44.738372
Got a connection request: ('127.0.0.1', 53444)
Hello from client 7
2023-12-30 22:20:44.740037
Got a connection request: ('127.0.0.1', 53445)
Hello from client 8
2023-12-30 22:20:44.745009
Got a connection request: ('127.0.0.1', 53446)
Hello from client 10
2023-12-30 22:20:44.751614
Got a connection request: ('127.0.0.1', 53447)
Hello from client 9
2023-12-30 22:20:44.753527
Got a connection request: ('127.0.0.1', 53448)
Hello from client 2
2023-12-30 22:20:44.753891
Got a connection request: ('127.0.0.1', 53449)
Hello from client 6

client 側

$ for i in {1..10}; do python3 client.py $i & done
[2] 42580
[3] 42581
[4] 42582
[5] 42583
[6] 42584
[7] 42585
[8] 42586
[9] 42587
[10] 42588
[11] 42589

server.py

import time
import socket
import datetime
import threading

PORT = 50000
BUFSIZE = 4096

# handler to be passed to a thread
def client_handler(client, msg):
    data = client.recv(BUFSIZE)
    print(data.decode("utf-8"))

    time.sleep(3)

    client.sendall(msg.encode("utf-8"))
    client.close()

# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# bind a socket to an address
server.bind(("", PORT))

# listen connection requests on the socket
server.listen()

while True:
    # wait for a connection request
    client, addr = server.accept()

    msg = str(datetime.datetime.now())
    print(msg)
    print("Got a connection request:", addr)

    # create a thread
    p = threading.Thread(target = client_handler, args = (client, msg))
    p.start()

client.py

変更なし。

zuribozuribo

Thread Pool

Multi-thread の例では、リクエストが来てから thread を作成し、その thread 内でリクエストの処理をしていた。しかし、同時に大量のリクエストが来てしまうと、その数分だけの thread が作成されるため、メモリ消費が大量に増えるなどのデメリットもある。

そこで、あらかじめ決められた数の thread を作成して thread pool を構成し、main thread で accept() を実行せず、子 thread 内で accept() を実行することで、thread 数に上限を設けつつ並列で処理を可能にする。

動作確認

thread pool に 3 thread を用意した場合、3 秒ごとに 3 つリクエストが処理されていることがわかる。

$ python3 server.py
2023-12-31 18:52:09.204552
2023-12-31 18:52:09.204629
Got a connection request: ('127.0.0.1', 53492)
2023-12-31 18:52:09.204702
Got a connection request: ('127.0.0.1', 53508)
Hello from client 1
Hello from client 3
Got a connection request: ('127.0.0.1', 53500)
Hello from client 2
2023-12-31 18:52:12.207817
Got a connection request: ('127.0.0.1', 53514)
2023-12-31 18:52:12.207869
Hello from client 4
Got a connection request: ('127.0.0.1', 53530)
Hello from client 5
2023-12-31 18:52:12.208049
Got a connection request: ('127.0.0.1', 53538)
Hello from client 6
2023-12-31 18:52:15.210986
Got a connection request: ('127.0.0.1', 53544)
2023-12-31 18:52:15.211048
Hello from client 7
Got a connection request: ('127.0.0.1', 53546)
Hello from client 10
2023-12-31 18:52:15.211247
Got a connection request: ('127.0.0.1', 53554)
Hello from client 9
2023-12-31 18:52:18.214153
Got a connection request: ('127.0.0.1', 53558)
Hello from client 8

server.py

import time
import datetime
import socket
import threading

PORT = 50000
BUFSIZE = 4096
NUM_THREADS = 3

def worker_thread(server):
    while True:
        # wait for a connection request
        client, addr = server.accept()

        msg = str(datetime.datetime.now())
        print(msg)
        print("Got a connection request:", addr)

        # receive data from the client
        data = client.recv(BUFSIZE)
        print(data.decode("utf-8"))

        time.sleep(3)

        # send a message back to the client
        client.sendall(msg.encode("utf-8"))

        # close the client socket
        client.close()


# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)

# bind the socket to an address
server.bind(("", PORT))

# listen on the socket
server.listen()

# create a thread pool
for _ in range(NUM_THREADS):
    thread = threading.Thread(target=worker_thread, args=(server,))
    thread.start()

# keep the parent alive
while True:
    time.sleep(1)

client.py

変更なし。

zuribozuribo

Non-Blocking I/O

これまでは accept() を呼び出すと、接続リクエストが来るまでは処理が止まってしまい、それ以外の処理を実行することができなかった。

socket を non-blocking mode に設定することで、accept() を呼び出しても block されないようになる。

Busy Loop

accept() を呼び出しても block されなくなった分、ループし続けて新しい接続が来て accept() が成功するのを待つ。

$ python3 server.py
2023-12-31 19:04:28.662429
Got a connection request ('127.0.0.1', 32808)
Hello from client 1
2023-12-31 19:04:33.422407
Got a connection request ('127.0.0.1', 32820)
Hello from client 2

try-except を使わないと、以下のようにすぐに "Resource temporarily unavailable" エラーが出てしまう。

$ python3 server.py
Traceback (most recent call last):
  File "server.py", line 26, in <module>
    client, addr = server.accept()
  File "/usr/lib64/python3.7/socket.py", line 212, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable

server.py

import time
import socket
import datetime

SERVER_PORT = 50000
BUFSIZE = 4096

# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)

# set the socket non-blocking
server.setblocking(False)

# bind the socket to an address
server.bind(("", SERVER_PORT))

# listen on the socket
server.listen()

while True:
    # try to accept a connection request and continue if it's not ready
    try:
        client, addr = server.accept()
    except BlockingIOError:
        continue

    msg = str(datetime.datetime.now())
    print(msg)
    print("Got a connection request", addr)

    # receive data from the client
    data = client.recv(BUFSIZE)
    print(data.decode("utf-8"))

    time.sleep(3)

    # send data back to the client
    client.sendall(msg.encode("utf-8"))

    # close the client socket
    client.close()

Event Loop

server.py

import datetime
import socket
import select

SERVER_PORT = 50000
BUFSIZE = 4096

# dictionary storing sockets waiting until ready to read
read_waiters = {}

# dictionary storing sockets waiting until ready to write
write_waiters = {}

# dictionary storing clients' info
connections = {}


def accept_handler(server):
    """Handler invoked when the server socket is ready to read"""

    # accept a connection request
    client, addr = server.accept()

    # set the client socket non-blocking mode
    client.setblocking(False)

    print(str(datetime.datetime.now()))
    print("Got a connection request:", addr)

    # add the connection to the connection dict
    connections[client.fileno()] = (client, addr)

    # add the client to the read-wait list
    read_waiters[client.fileno()] = (recv_handler, (client.fileno(), ))

    # add the server to the read-wait list to get a new connection
    read_waiters[server.fileno()] = (accept_handler, (server, ))


def recv_handler(fileno):
    """Handler invoked when the client socket is ready to read"""

    # retrieve the client
    client, addr = connections[fileno]

    # receive a message from the client
    data = client.recv(BUFSIZE)

    print(f"Recv from {addr}: {data}")

    # add the client to the write-wait list
    write_waiters[fileno] = (send_handler, (fileno, data))


def send_handler(fileno, data):
    """Handler invoked when the client socket is ready to write"""

    # retrieve the client
    client, addr = connections[fileno]

    # send the message back to the client
    client.sendall(data)

    # close the connection
    client.close()
    connections.pop(fileno)


# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)

# bind the socket to an address
server.bind(("", SERVER_PORT))

# listen on the socket
server.listen()

# add the accept handler to the read-wait list
read_waiters[server.fileno()] = (accept_handler, (server, ))

while True:
    # wait for one or more of waiters getting ready
    rlist, wlist, _ = select.select(read_waiters.keys(), write_waiters.keys(), [], 10)

    # handle ready-to-reads
    for fileno in rlist:
        handler, args = read_waiters.pop(fileno)
        handler(*args)

    # handle ready-to-writes
    for fileno in wlist:
        handler, args = write_waiters.pop(fileno)
        handler(*args)