socket をざっくりキャッチアップ
Socket とは?
- Socket API は、アプリケーションがネットワーク上でデータを送受信するための仕組みを抽象化した API (Application Programming Interface) である。
- socket とは、データの送受信を行うために使うエンドポイントのことである。
- socket の種類
- Stream sockets: トランスポート層のプロトコルに TCP (Transmission Control Protocol)、SCTP (Stream Control Transmission Protocol)、DCCP (Datagram Congestion Control Protocol) などを使った connection-oriented socket のこと。
- Datagram sockets: トランスポート層のプロトコルに UDP (User Datagram Protocol) を使った connectionless socket のこと。
- Unix Domain Sockets (UDS): ローカルファイルを使って、同一ホスト上のプロセス間のコミュニケーションを実現するもの。
Socket API
- socket を使ってアプリケーション間で通信を行うための API (Application Programming Interface) のこと。
- Socket API を使ったプログラムの開発のことを、socket programming / network programming と呼ぶ。
Berkeley sockets
- Berkeley sockets standard では、Unix の哲学の「everything is a file」に則って、socket は file descriptor で表現される。
- Berkeley sockets standard は、多少の変更を加えて、de-facto standard から POSIX 仕様になった。よって、POSIX sockets は基本的に Berkeley sockets と同義であり、BSD sockets とも呼ばれる。
Socket API functions
socket()
通信のエンドポイントとなる新しい socket を作成し、その socket に対応する file descriptor を返す。
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
以下の3つの引数を取る。
-
domain
: socket の protocol family を指定する。-
AF_INET
: IPv4 Internet protocol -
AF_INET6
: IPv6 Internet protocol -
AF_UNIX
: ローカル接続
-
-
type
: socket の種類SOCK_STREAM
SOCK_DGRAM
-
protocol
: 使用する transport protocol を指定する。IPPROTO_TCP
IPPROTO_SCTP
IPPROTO_UDP
IPPROTO_DCCP
bind()
基本的にサーバー側で使用され、socket を socket address (例えば local IP address と port 番号) に紐付ける。
#include <sys/socket.h>
int bind(int sockfd, const strut sockaddr *addr, socklen_t addrlen);
以下の3つの引数を取る。
-
sockfd
: socket の file descriptor -
addr
: socket address を表現するsockaddr
構造体へのポインタ -
addrlen
:sockaddr
構造体のサイズを格納するsocklen_t
型のデータへのポインタ
listen()
サーバー側で使用され、socket を listening state に変え、入ってくる接続に備えて準備する。SOCK_STREAM
などの stream-oriented data mode のみで必要である。
#include <sys/socket.h>
int listen(int sockfd, int backlog);
以下の2つの引数を取る。
-
sockfd
: 接続待ちに使う socket の file descriptor -
backlog
: 一度に queue に入れられる pending な接続の最大数
connect()
クライアント側で使用され、接続を行うのに使用される。
#include <sys/socket.h>
int connect(int sockfd, const strut sockaddr *addr, socklen_t addrlen);
以下の2つの引数を取る。
-
sockfd
: 接続元として使用する socket の file descriptor -
addr
: 接続先の socket address を示すsockaddr
構造体へのポインタ -
addrlen
:sockaddr
構造体のサイズを格納するsocklen_t
型のデータへのポインタ
accept()
サーバー側で使用され、クライアントから受け取った新しい TCP 接続確立の要求を受け入れ、その接続用の新しい socket を作成する。各接続ごとに socket が作成され、その socket への file descriptor が返される。
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *_Nullable restrict addr, socklen_t *_Nullable restrict addrlen);
以下の3つの引数を取る。
-
sockfd
: 接続要求を queue に溜めておくための listening socket の file descriptor -
addr
: クライアントの情報を受け取るためのsockaddr
構造体へのポインタ -
addrlen
: クライアントのsockaddr
構造体のサイズ情報を受け取るためのsocklen_t
型のデータへのポインタ
send()
, sendto()
, sendmsg()
, recv()
, recvfrom()
, recvmsg()
データの送受信で使われる。
#include <sys/socket.h>
ssize_t send(int sockfd, const void buf, ssize_t len, int flags);
ssize_t sendto(int sockfd, const void buf, ssize_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t sendmsg(int sockfd, const struct msghdr *msd, int flags);
#include <sys/socket.h>
ssize_t recv(int sockfd, void buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void buf, size_t len, int flags, struct sockaddr *_Nullable restrict src_addr, socklen_t *_Nullable restrict addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
close()
socket に紐づいたリソースを解放する。
#include <unistd.h>
int close(int fd);
getaddrinfo()
, freeaddrinfo()
ホスト名、ホストアドレスを解決する。
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
int getaddrinfo(const char *restrict node, const char *restrict service, const struct addrinfo *restrict hists, struct addrinfo **restrict res);
void freeaddrinfo(struct addrinfo *res);
select()
プロセスを中断して、渡された socket のリストの1つ以上が読み書き可能な状態になるのを待つ。
#include <sys/select.h>
int select(int nfds, fd_set *_Nullable restrict readfds, fd_set *_Nullable restrict writefds, fd_set *_Nullable restrict exceptfds, struct timeval *_Nullable restrict timeout);
poll()
渡された socket のリストの状態が変わっていないか確認する。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
getsockopt()
, setsockopt()
特定の socket の特定の socket option の値を取得・設定する。
#include <sys/socket.h>
int getsockopt(int sockfd, int level, int optname, void optval, socklen_t *option);
int setsockopt(int sockfd, int level, int optname, const void optval, socklen_t optlen);
Socket Programming in Python
TCP
動作確認
先に server.py を実行する。
$ python3 server.py
次に別の端末から client.py を実行して、メッセージ「hello world」を打つ。
$ python3 client.py
Input a message: hello world
server 側で接続リクエストが来たことが表示され、「hello world」のメッセージが受け取れていることがわかります。
$ python3 server.py
2023-12-30 21:45:06.088524
Got a connection request: ('127.0.0.1', 51141)
hello world
client 側では、server がメッセージ受け取った日時が、メッセージとして返ってきているのが確認できます。
$ python3 client.py
Input a message: hello world
Got a message: 2023-12-30 21:45:06.088524
server.py
import socket
import datetime
PORT = 50000
BUFSIZE = 4096
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# bind a socket to an address
server.bind(("", PORT))
# listen connection requests on the socket
server.listen()
while True:
# wait for a connection request
client, addr = server.accept()
msg = str(datetime.datetime.now())
print(msg)
print("Got a connection request:", addr)
# receive data from the client
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
# send a message back to the client
client.sendall(msg.encode("utf-8"))
# close the connection
client.close()
client.py
import socket
HOST = "localhost"
PORT = 50000
BUFSIZE = 4096
# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect to a server
client.connect((HOST, PORT))
# send a message to the server
msg = input("Input a message: ")
client.sendall(msg.encode("utf-8"))
# receive a message from the server
data = client.recv(BUFSIZE)
print("Got a message:", data.decode("utf-8"))
# close the connection
client.close()
UDP
動作確認
まず server.py
を実行する。
$ python3 server.py
次に別の端末から client.py
を実行する。
$ python3 client.py
server 側では、client から送られてきたメッセージが表示される。
$ python3 server.py
2023-12-30 22:03:14.716057
Got a message: ('127.0.0.1', 53113)
b'hello world'
client 側では、server がメッセージを受け取った日時が、メッセージとして返ってきている。
$ python3 client.py
2023-12-30 22:03:14.716057
server.py
import socket
import datetime
PORT = 50000
BUFSIZE = 4096
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# bind the socket to an socket address
server.bind(("", PORT))
while True:
# wait for receiving a message from a client
data, client = server.recvfrom(BUFSIZE)
msg = str(datetime.datetime.now())
print(msg)
print("Got a message:", client)
print(data)
# send a message back to the client
server.sendto(msg.encode("utf-8"), client)
client.py
import socket
HOST = "localhost"
PORT = 50000
BUFSIZE = 4096
# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# send a message to a server
msg = "hello world"
client.sendto(msg.encode("utf-8"), (HOST, PORT))
# receive a message from the server
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
# close the socket
client.close()
Socket Programming in Rust
TCP Echo
Server
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
let mut buf = [0; 4096];
// receive data from the client.
stream.read(&mut buf)?;
println!("{}", std::str::from_utf8(&buf).unwrap());
// send the same data back to the client.
stream.write(&buf)?;
Ok(())
}
fn main() -> std::io::Result<()> {
// create a socket, bind it to an address and listen on it.
let listener = TcpListener::bind("127.0.0.1:50000")?;
// accept connections and process them serially.
for stream in listener.incoming() {
handle_client(stream?)?;
}
Ok(())
}
Client
use std::io::{Read, Write};
use std::net::TcpStream;
fn main() -> std::io::Result<()> {
// create a socket and connect to a server.
let mut stream = TcpStream::connect("127.0.0.1:50000")?;
// send a message to the server.
let msg = "hello world";
stream.write(msg.as_bytes())?;
// receive a message from the server.
let mut buf = [0u8; 4096];
stream.read(&mut buf)?;
println!("{}", std::str::from_utf8(&buf).unwrap());
Ok(())
}
Socket Programming in C
TCP Echo
動作確認
$ ./server
Got a request from 127.0.0.1:11418
hello world
$ ./client
hello world
server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define SERVER_PORT 50000
#define MAX_PENDING_CONN 5
#define BUFFER_SIZE 4096
int main(int argc, char *argv[])
{
int server_sock, client_sock, ret;
struct sockaddr_in server_addr, client_addr;
unsigned int client_addr_len;
char buffer[BUFFER_SIZE];
int msg_len;
// create a socket
server_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (server_sock < 0) {
perror("socket() failed");
exit(1);
}
// construct a local socket address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = SERVER_PORT;
// bind the socket to the server address
ret = bind(server_sock, (struct sockaddr *)&server_addr,
sizeof(server_addr));
if (ret < 0) {
perror("bind() failed");
exit(1);
}
// listen on the socket
ret = listen(server_sock, MAX_PENDING_CONN);
if (ret < 0) {
perror("listen() failed");
exit(1);
}
for (;;) {
// wait for a connection request from a client
client_sock = accept(server_sock,
(struct sockaddr *)&client_addr,
&client_addr_len);
if (client_sock < 0) {
perror("accept() failed");
exit(1);
}
// print the client information
printf("Got a request from %s:%d\n",
inet_ntoa(client_addr.sin_addr), client_addr.sin_port);
do {
// receive a message from the client
msg_len = recv(client_sock, buffer, BUFFER_SIZE - 1, 0);
if (msg_len < 0) {
perror("recv() failed");
exit(1);
}
buffer[msg_len] = '\0';
printf(buffer);
// send the same message back to the client
ret = send(client_sock, buffer, msg_len, 0);
if (ret != msg_len) {
perror("send() failed");
exit(1);
}
} while (msg_len > 0);
printf("\n");
// close the client socket
close(client_sock);
}
return 0;
}
client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT 50000
#define BUFFER_SIZE 4096
#define MSG "hello world"
int main(int argc, char *argv[])
{
int ret, client_sock, total;
struct sockaddr_in server_addr;
char buffer[BUFFER_SIZE];
unsigned int msg_len;
// create a socket
client_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (client_sock < 0) {
perror("socket() failed");
exit(1);
}
// construct a server socket address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
server_addr.sin_port = SERVER_PORT;
// connect to the server
ret = connect(client_sock, (struct sockaddr *)&server_addr,
sizeof(server_addr));
if (ret < 0) {
perror("connect() failed");
exit(1);
}
// send a message to the server
msg_len = strlen(MSG);
ret = send(client_sock, MSG, msg_len, 0);
if (ret != msg_len) {
perror("send()");
exit(1);
}
// receive a message from the server
total = 0;
while (total < msg_len) {
ret = recv(client_sock, buffer, BUFFER_SIZE - 1, 0);
if (ret <= 0) {
perror("recv() failed or connection closed");
exit(1);
}
total += ret;
buffer[ret] = '\0';
printf(buffer);
}
printf("\n");
// close the socket
close(client_sock);
return 0;
}
UDP Echo
動作確認
$ ./server
Got a request from 127.0.0.1:1439
hello world
Got a request from 127.0.0.1:2531
hello world
Got a request from 127.0.0.1:34967
hello world
$ ./client
hello world
$ ./client
hello world
$ ./client
hello world
server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define SERVER_PORT 50000
#define BUFFER_SIZE 4096
int main(int argc, char *argv[])
{
int ret, server_sock;
struct sockaddr_in server_addr, client_addr;
unsigned int client_addr_size, msg_len;
char buffer[BUFFER_SIZE];
// create a socket
server_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (server_sock < 0) {
perror("socket() failed");
exit(1);
}
// construct a server socket address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SERVER_PORT);
// bind the socket to the server sock address
ret = bind(server_sock, (struct sockaddr *)&server_addr,
sizeof(server_addr));
if (ret < 0) {
perror("bind() failed");
exit(1);
}
// handle requests
for (;;) {
client_addr_size = sizeof(client_addr);
// receive a message from a client
msg_len = recvfrom(server_sock, buffer, BUFFER_SIZE - 1, 0,
(struct sockaddr *)&client_addr,
&client_addr_size);
if (msg_len < 0) {
perror("recvfrom() failed");
exit(1);
}
printf("Got a request from %s:%d\n",
inet_ntoa(client_addr.sin_addr), client_addr.sin_port);
buffer[msg_len] = '\0';
printf("%s\n", buffer);
// response back to the client
ret = sendto(server_sock, buffer, msg_len, 0,
(struct sockaddr *)&client_addr,
sizeof(client_addr));
if (ret != msg_len) {
perror("sendto() failed");
exit(1);
}
}
}
client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT 50000
#define BUFFER_SIZE 4096
#define MSG "hello world"
int main(int argc, char *argv[])
{
int ret, client_sock;
struct sockaddr_in server_addr, recv_addr;
char buffer[BUFFER_SIZE];
unsigned int msg_len, recv_addr_size;
// create a socket
client_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (client_sock < 0) {
perror("socekt() failed");
exit(1);
}
// construct a server socket address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
server_addr.sin_port = htons(SERVER_PORT);
// send a message to the server
msg_len = strlen(MSG);
ret = sendto(client_sock, MSG, msg_len, 0,
(struct sockaddr *)&server_addr, sizeof(server_addr));
if (ret != msg_len) {
perror("sendto() failed");
exit(1);
}
// receive a message from the server
ret = recvfrom(client_sock, buffer, BUFFER_SIZE - 1, 0,
(struct sockaddr *)&recv_addr, &recv_addr_size);
if (ret != msg_len) {
perror("recvfrom() failed");
exit(1);
}
if (server_addr.sin_addr.s_addr != recv_addr.sin_addr.s_addr) {
fprintf(stderr, "Received a packet from an unknown source.\n");
exit(1);
}
buffer[ret] = '\0';
printf("%s\n", buffer);
// close the socket
close(client_sock);
return 0;
}
socat
(SOcket CAT)
- socat(1): Multipurpose relay - Linux man page
- 2つの双方向通信バイトストリームを確立し、それらの間でデータを転送するのに使えるコマンド。
ライフサイクル
- Init phase: CLI オプションを parse して、logging を初期化する。
- Open phase: 1つ目に渡された address を開けた後に、2つ目に渡された address を開ける。
- Transfer phase: 両方の stream の読み書きを
select()
を通して、データの受け渡しを行う。 - Closing phase: 一方の stream が EOF に到達した場合に、もう一方の stream に EOF を転送する。
Get Started
まずターミナルから以下を実行する。
$ socat tcp-listen:70000 stdout
70000 番 port で listen する TCP socket が作成され、その socket と標準出力 (stdout) が繋がれる。
次に、別のターミナルを開き、以下を実行する。
$ socat tcp-connect:localhost:70000 stdin
localhost の 70000 番 port に connect する TCP socket が作成され、その socket と標準入力 (stdin) が繋がれる。
上記の設定により、2つ目のターミナルの標準入力に入力した内容が、1つ目のターミナルの標準出力に出力されるようになる。
$ socat tcp-connect:localhost:70000 stdin
test
$ socat tcp-listen:70000 stdout
test
どちらか一方のターミナルでの Ctrl+C によって、接続が終了する。
Single-thread vs. Multi-thread
Single-thread
server 側が、同時にきた複数のリクエストを並列に処理できず、1つのリクエストを処理できてから、その次のリクエストを処理する。
1つ前の「Socket programming in Python」のサンプルを以下のように変更。
-
server.py
: リクエストへレスポンスする前に 3 秒間スリープする。 -
client.py
: 引数を1つ取り、送るメッセージにそれを含める。
動作確認
3秒に1つのリクエストしか処理できていないのがわかる。
server 側
$ python3 server.py
2023-12-30 22:16:36.365614
Got a connection request: ('127.0.0.1', 53213)
Hello from client 1
2023-12-30 22:16:39.371383
Got a connection request: ('127.0.0.1', 53214)
Hello from client 2
2023-12-30 22:16:42.373630
Got a connection request: ('127.0.0.1', 53215)
Hello from client 4
2023-12-30 22:16:45.379482
Got a connection request: ('127.0.0.1', 53216)
Hello from client 3
2023-12-30 22:16:48.381774
Got a connection request: ('127.0.0.1', 53217)
Hello from client 7
2023-12-30 22:16:51.386791
Got a connection request: ('127.0.0.1', 53218)
Hello from client 5
2023-12-30 22:16:54.390117
Got a connection request: ('127.0.0.1', 53219)
Hello from client 6
2023-12-30 22:16:57.392960
Got a connection request: ('127.0.0.1', 53220)
Hello from client 8
2023-12-30 22:17:00.395567
Got a connection request: ('127.0.0.1', 53221)
Hello from client 10
2023-12-30 22:17:03.399871
Got a connection request: ('127.0.0.1', 53222)
Hello from client 9
client 側
$ for i in {1..10}; do python3 client.py $i & done
[2] 41908
[3] 41909
[4] 41910
[5] 41911
[6] 41912
[7] 41913
[8] 41914
[9] 41915
[10] 41916
[11] 41917
server.py
import time
import socket
import datetime
PORT = 50000
BUFSIZE = 4096
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# bind a socket to an address
server.bind(("", PORT))
# listen connection requests on the socket
server.listen()
while True:
# wait for a connection request
client, addr = server.accept()
msg = str(datetime.datetime.now())
print(msg)
print("Got a connection request:", addr)
# receive data from the client
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
time.sleep(3)
# send a message back to the client
client.sendall(msg.encode("utf-8"))
# close the connection
client.close()
client.py
import sys
import socket
HOST = "localhost"
PORT = 50000
BUFSIZE = 4096
# create a socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect to a server
client.connect((HOST, PORT))
# send a message to the server
msg = f"Hello from client {sys.argv[1]}"
client.sendall(msg.encode("utf-8"))
# receive a message from the server
data = client.recv(BUFSIZE)
print("Got a message:", data.decode("utf-8"))
# close the connection
client.close()
Multi-threading
server 側が、並列して I/O を処理することができる。
以下のように変更。
-
server.py
を、リクエストを受け取ったら新しいスレッドを立ち上げ、そのスレッド内でレスポンスを返す。
動作確認
3秒待たずにどんどんリクエストを処理できているのがわかる。
server 側
$ python3 server.py
2023-12-30 22:20:44.702687
Got a connection request: ('127.0.0.1', 53440)
Hello from client 3
2023-12-30 22:20:44.706570
Got a connection request: ('127.0.0.1', 53441)
Hello from client 1
2023-12-30 22:20:44.714539
Got a connection request: ('127.0.0.1', 53442)
Hello from client 4
2023-12-30 22:20:44.717746
Got a connection request: ('127.0.0.1', 53443)
Hello from client 5
2023-12-30 22:20:44.738372
Got a connection request: ('127.0.0.1', 53444)
Hello from client 7
2023-12-30 22:20:44.740037
Got a connection request: ('127.0.0.1', 53445)
Hello from client 8
2023-12-30 22:20:44.745009
Got a connection request: ('127.0.0.1', 53446)
Hello from client 10
2023-12-30 22:20:44.751614
Got a connection request: ('127.0.0.1', 53447)
Hello from client 9
2023-12-30 22:20:44.753527
Got a connection request: ('127.0.0.1', 53448)
Hello from client 2
2023-12-30 22:20:44.753891
Got a connection request: ('127.0.0.1', 53449)
Hello from client 6
client 側
$ for i in {1..10}; do python3 client.py $i & done
[2] 42580
[3] 42581
[4] 42582
[5] 42583
[6] 42584
[7] 42585
[8] 42586
[9] 42587
[10] 42588
[11] 42589
server.py
import time
import socket
import datetime
import threading
PORT = 50000
BUFSIZE = 4096
# handler to be passed to a thread
def client_handler(client, msg):
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
time.sleep(3)
client.sendall(msg.encode("utf-8"))
client.close()
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# bind a socket to an address
server.bind(("", PORT))
# listen connection requests on the socket
server.listen()
while True:
# wait for a connection request
client, addr = server.accept()
msg = str(datetime.datetime.now())
print(msg)
print("Got a connection request:", addr)
# create a thread
p = threading.Thread(target = client_handler, args = (client, msg))
p.start()
client.py
変更なし。
Thread Pool
Multi-thread の例では、リクエストが来てから thread を作成し、その thread 内でリクエストの処理をしていた。しかし、同時に大量のリクエストが来てしまうと、その数分だけの thread が作成されるため、メモリ消費が大量に増えるなどのデメリットもある。
そこで、あらかじめ決められた数の thread を作成して thread pool を構成し、main thread で accept()
を実行せず、子 thread 内で accept()
を実行することで、thread 数に上限を設けつつ並列で処理を可能にする。
動作確認
thread pool に 3 thread を用意した場合、3 秒ごとに 3 つリクエストが処理されていることがわかる。
$ python3 server.py
2023-12-31 18:52:09.204552
2023-12-31 18:52:09.204629
Got a connection request: ('127.0.0.1', 53492)
2023-12-31 18:52:09.204702
Got a connection request: ('127.0.0.1', 53508)
Hello from client 1
Hello from client 3
Got a connection request: ('127.0.0.1', 53500)
Hello from client 2
2023-12-31 18:52:12.207817
Got a connection request: ('127.0.0.1', 53514)
2023-12-31 18:52:12.207869
Hello from client 4
Got a connection request: ('127.0.0.1', 53530)
Hello from client 5
2023-12-31 18:52:12.208049
Got a connection request: ('127.0.0.1', 53538)
Hello from client 6
2023-12-31 18:52:15.210986
Got a connection request: ('127.0.0.1', 53544)
2023-12-31 18:52:15.211048
Hello from client 7
Got a connection request: ('127.0.0.1', 53546)
Hello from client 10
2023-12-31 18:52:15.211247
Got a connection request: ('127.0.0.1', 53554)
Hello from client 9
2023-12-31 18:52:18.214153
Got a connection request: ('127.0.0.1', 53558)
Hello from client 8
server.py
import time
import datetime
import socket
import threading
PORT = 50000
BUFSIZE = 4096
NUM_THREADS = 3
def worker_thread(server):
while True:
# wait for a connection request
client, addr = server.accept()
msg = str(datetime.datetime.now())
print(msg)
print("Got a connection request:", addr)
# receive data from the client
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
time.sleep(3)
# send a message back to the client
client.sendall(msg.encode("utf-8"))
# close the client socket
client.close()
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
# bind the socket to an address
server.bind(("", PORT))
# listen on the socket
server.listen()
# create a thread pool
for _ in range(NUM_THREADS):
thread = threading.Thread(target=worker_thread, args=(server,))
thread.start()
# keep the parent alive
while True:
time.sleep(1)
client.py
変更なし。
Non-Blocking I/O
これまでは accept()
を呼び出すと、接続リクエストが来るまでは処理が止まってしまい、それ以外の処理を実行することができなかった。
socket を non-blocking mode に設定することで、accept()
を呼び出しても block されないようになる。
Busy Loop
accept()
を呼び出しても block されなくなった分、ループし続けて新しい接続が来て accept()
が成功するのを待つ。
$ python3 server.py
2023-12-31 19:04:28.662429
Got a connection request ('127.0.0.1', 32808)
Hello from client 1
2023-12-31 19:04:33.422407
Got a connection request ('127.0.0.1', 32820)
Hello from client 2
try-except を使わないと、以下のようにすぐに "Resource temporarily unavailable" エラーが出てしまう。
$ python3 server.py
Traceback (most recent call last):
File "server.py", line 26, in <module>
client, addr = server.accept()
File "/usr/lib64/python3.7/socket.py", line 212, in accept
fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable
server.py
import time
import socket
import datetime
SERVER_PORT = 50000
BUFSIZE = 4096
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
# set the socket non-blocking
server.setblocking(False)
# bind the socket to an address
server.bind(("", SERVER_PORT))
# listen on the socket
server.listen()
while True:
# try to accept a connection request and continue if it's not ready
try:
client, addr = server.accept()
except BlockingIOError:
continue
msg = str(datetime.datetime.now())
print(msg)
print("Got a connection request", addr)
# receive data from the client
data = client.recv(BUFSIZE)
print(data.decode("utf-8"))
time.sleep(3)
# send data back to the client
client.sendall(msg.encode("utf-8"))
# close the client socket
client.close()
Event Loop
server.py
import datetime
import socket
import select
SERVER_PORT = 50000
BUFSIZE = 4096
# dictionary storing sockets waiting until ready to read
read_waiters = {}
# dictionary storing sockets waiting until ready to write
write_waiters = {}
# dictionary storing clients' info
connections = {}
def accept_handler(server):
"""Handler invoked when the server socket is ready to read"""
# accept a connection request
client, addr = server.accept()
# set the client socket non-blocking mode
client.setblocking(False)
print(str(datetime.datetime.now()))
print("Got a connection request:", addr)
# add the connection to the connection dict
connections[client.fileno()] = (client, addr)
# add the client to the read-wait list
read_waiters[client.fileno()] = (recv_handler, (client.fileno(), ))
# add the server to the read-wait list to get a new connection
read_waiters[server.fileno()] = (accept_handler, (server, ))
def recv_handler(fileno):
"""Handler invoked when the client socket is ready to read"""
# retrieve the client
client, addr = connections[fileno]
# receive a message from the client
data = client.recv(BUFSIZE)
print(f"Recv from {addr}: {data}")
# add the client to the write-wait list
write_waiters[fileno] = (send_handler, (fileno, data))
def send_handler(fileno, data):
"""Handler invoked when the client socket is ready to write"""
# retrieve the client
client, addr = connections[fileno]
# send the message back to the client
client.sendall(data)
# close the connection
client.close()
connections.pop(fileno)
# create a socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
# bind the socket to an address
server.bind(("", SERVER_PORT))
# listen on the socket
server.listen()
# add the accept handler to the read-wait list
read_waiters[server.fileno()] = (accept_handler, (server, ))
while True:
# wait for one or more of waiters getting ready
rlist, wlist, _ = select.select(read_waiters.keys(), write_waiters.keys(), [], 10)
# handle ready-to-reads
for fileno in rlist:
handler, args = read_waiters.pop(fileno)
handler(*args)
# handle ready-to-writes
for fileno in wlist:
handler, args = write_waiters.pop(fileno)
handler(*args)