Closed3
PythonでWebSocketのサンプル
これに向けた素振りとして。
WebSocket、HTTP上で双方向にやりとりできる、程度の理解しかなかったので、以下をまずはサラッと見てみた。
Pythonだとwebsocketsライブラリでシンプルに使えるらしい。
ChatGPTに書いてもらった。aioconsoleは非同期で標準入力を受けるために必要。
$ pip install websockets aioconsole
サーバ側
import asyncio
import websockets
from datetime import datetime
# 認証トークンの定義
TOKEN = "mysecrettoken"
# 認証付きハンドラー
async def handler(websocket, path):
# 初めにクライアントからトークンを受け取る
try:
token = await websocket.recv()
if token != TOKEN:
print(f"Invalid token from {websocket.remote_address}")
await websocket.close() # 認証失敗時は接続を切断
return
print(f"Client {websocket.remote_address} authenticated successfully.")
except websockets.ConnectionClosed:
print(f"Connection closed during authentication from {websocket.remote_address}.")
return
# クライアントに、5秒ごとに時刻を送信する
async def send_time_messages():
try:
while True:
await asyncio.sleep(5)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"Server Time: {current_time}"
await websocket.send(message)
print(f"Sent: {message}")
except websockets.ConnectionClosed:
print(f"Connection with {websocket.remote_address} closed.")
# クライアントからのメッセージを受信する
async def receive_messages():
try:
async for message in websocket:
print(f"Received from client: {message}")
except websockets.ConnectionClosed:
print(f"Connection with {websocket.remote_address} closed.")
# メッセージ送信と受信を並行して実行
try:
await asyncio.gather(send_time_messages(), receive_messages())
except websockets.ConnectionClosed:
print(f"Client {websocket.remote_address} disconnected.")
# WebSocketサーバー起動
async def main():
async with websockets.serve(handler, "localhost", 8765):
print("WebSocket server started on ws://localhost:8765")
await asyncio.Future() # サーバーを永久に動作させる
if __name__ == "__main__":
asyncio.run(main())
クライアント側
import asyncio
import websockets
import aioconsole
# 認証トークンの定義
TOKEN="mysecrettoken"
async def client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
# 認証トークンを送信
await websocket.send(TOKEN)
async def send_user_input():
print("Enter a message to send to the server!")
while True:
user_input = await aioconsole.ainput()
await websocket.send(user_input)
print(f"Sent: {user_input}")
async def receive_messages():
while True:
message = await websocket.recv()
print(f"Received from server: {message}")
await asyncio.gather(send_user_input(), receive_messages())
if __name__ == "__main__":
asyncio.run(client())
サーバ、クライアント、それぞれを別ターミナルで起動。
$ python server.py
$ python client.py
ChatGPT,いい感じに書いてくれたんだけど、公式のサンプルも実際に試してみるつもり。
Fletを使ったGUIクライアントをChatGPTとやり取りしながら書いた。サーバは同じでよい。
gui_client.py
import asyncio
import websockets
import flet as ft
from threading import Thread
# 認証トークンの定義
TOKEN = "mysecrettoken"
# WebSocketサーバーのURI
URI = "ws://localhost:8765"
# WebSocketクライアント
class WebSocketClient:
def __init__(self, uri, update_ui_callback):
self.uri = uri
self.websocket = None
self.update_ui_callback = update_ui_callback
self.connected = False
def connect(self):
self.loop = asyncio.new_event_loop()
Thread(target=self._start_loop, daemon=True).start()
def _start_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self._connect())
async def _connect(self):
try:
self.websocket = await websockets.connect(self.uri)
await self.websocket.send(TOKEN)
self.connected = True
self.update_ui_callback("Connected to server.")
await self.receive_messages()
except Exception as e:
self.update_ui_callback(f"Connection failed: {str(e)}")
async def receive_messages(self):
try:
while True:
message = await self.websocket.recv()
self.update_ui_callback(f"Server: {message}")
except websockets.ConnectionClosed:
self.update_ui_callback("Connection closed.")
self.connected = False
def send_message(self, message):
if self.connected:
asyncio.run_coroutine_threadsafe(self._send_message(message), self.loop)
async def _send_message(self, message):
if self.websocket is not None and self.connected:
await self.websocket.send(message)
self.update_ui_callback(f"Client: {message}")
# Flet UI
def main(page: ft.Page):
page.title = "WebSocket Client Interface"
page.vertical_alignment = ft.MainAxisAlignment.CENTER
messages_log = ft.TextField(value="", width=400, height=300, read_only=True, multiline=True)
message_input = ft.TextField(label="Your Message", width=400)
client = WebSocketClient(URI, lambda msg: append_message(msg, messages_log, page))
def append_message(msg, messages_log, page):
messages_log.value += msg + "\n"
page.update()
def connect_click(e):
client.connect()
def send_message_click(e):
if message_input.value:
client.send_message(message_input.value)
message_input.value = ""
page.update()
connect_button = ft.ElevatedButton(text="Connect to Server", on_click=connect_click)
send_button = ft.ElevatedButton(text="Send Message", on_click=send_message_click)
page.add(
connect_button,
message_input,
send_button,
messages_log,
)
if __name__ == "__main__":
ft.app(target=main)
このスクラップは2ヶ月前にクローズされました