Closed3

PythonでWebSocketのサンプル

kun432kun432

これに向けた素振りとして。

https://openai.com/index/introducing-the-realtime-api/

WebSocket、HTTP上で双方向にやりとりできる、程度の理解しかなかったので、以下をまずはサラッと見てみた。

https://zenn.dev/nameless_sn/articles/websocket_tutorial

https://learn.microsoft.com/ja-jp/archive/msdn-magazine/2012/december/windows-8-networking-windows-8-and-the-websocket-protocol

Pythonだとwebsocketsライブラリでシンプルに使えるらしい。

https://websockets.readthedocs.io/en/stable/

ChatGPTに書いてもらった。aioconsoleは非同期で標準入力を受けるために必要。

$ pip install websockets aioconsole

サーバ側

import asyncio
import websockets
from datetime import datetime

# 認証トークンの定義
TOKEN = "mysecrettoken"

# 認証付きハンドラー
async def handler(websocket, path):
    # 初めにクライアントからトークンを受け取る
    try:
        token = await websocket.recv()
        if token != TOKEN:
            print(f"Invalid token from {websocket.remote_address}")
            await websocket.close()  # 認証失敗時は接続を切断
            return
        print(f"Client {websocket.remote_address} authenticated successfully.")
    except websockets.ConnectionClosed:
        print(f"Connection closed during authentication from {websocket.remote_address}.")
        return

    # クライアントに、5秒ごとに時刻を送信する
    async def send_time_messages():
        try:
            while True:
                await asyncio.sleep(5)
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                message = f"Server Time: {current_time}"
                await websocket.send(message)
                print(f"Sent: {message}")
        except websockets.ConnectionClosed:
            print(f"Connection with {websocket.remote_address} closed.")

    # クライアントからのメッセージを受信する
    async def receive_messages():
        try:
            async for message in websocket:
                print(f"Received from client: {message}")
        except websockets.ConnectionClosed:
            print(f"Connection with {websocket.remote_address} closed.")

    # メッセージ送信と受信を並行して実行
    try:
        await asyncio.gather(send_time_messages(), receive_messages())
    except websockets.ConnectionClosed:
        print(f"Client {websocket.remote_address} disconnected.")

# WebSocketサーバー起動
async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket server started on ws://localhost:8765")
        await asyncio.Future()  # サーバーを永久に動作させる

if __name__ == "__main__":
    asyncio.run(main())

クライアント側

import asyncio
import websockets
import aioconsole

# 認証トークンの定義
TOKEN="mysecrettoken"

async def client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # 認証トークンを送信
        await websocket.send(TOKEN)

        async def send_user_input():
            print("Enter a message to send to the server!")
            while True:
                user_input = await aioconsole.ainput()
                await websocket.send(user_input)
                print(f"Sent: {user_input}")

        async def receive_messages():
            while True:
                message = await websocket.recv()
                print(f"Received from server: {message}")

        await asyncio.gather(send_user_input(), receive_messages())

if __name__ == "__main__":
    asyncio.run(client())

サーバ、クライアント、それぞれを別ターミナルで起動。

$ python server.py
$ python client.py

kun432kun432

ChatGPT,いい感じに書いてくれたんだけど、公式のサンプルも実際に試してみるつもり。

kun432kun432

Fletを使ったGUIクライアントをChatGPTとやり取りしながら書いた。サーバは同じでよい。

gui_client.py
import asyncio
import websockets
import flet as ft
from threading import Thread

# 認証トークンの定義
TOKEN = "mysecrettoken"

# WebSocketサーバーのURI
URI = "ws://localhost:8765"

# WebSocketクライアント
class WebSocketClient:
    def __init__(self, uri, update_ui_callback):
        self.uri = uri
        self.websocket = None
        self.update_ui_callback = update_ui_callback
        self.connected = False

    def connect(self):
        self.loop = asyncio.new_event_loop()
        Thread(target=self._start_loop, daemon=True).start()

    def _start_loop(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect())

    async def _connect(self):
        try:
            self.websocket = await websockets.connect(self.uri)
            await self.websocket.send(TOKEN)
            self.connected = True
            self.update_ui_callback("Connected to server.")
            await self.receive_messages()
        except Exception as e:
            self.update_ui_callback(f"Connection failed: {str(e)}")

    async def receive_messages(self):
        try:
            while True:
                message = await self.websocket.recv()
                self.update_ui_callback(f"Server: {message}")
        except websockets.ConnectionClosed:
            self.update_ui_callback("Connection closed.")
            self.connected = False

    def send_message(self, message):
        if self.connected:
            asyncio.run_coroutine_threadsafe(self._send_message(message), self.loop)

    async def _send_message(self, message):
        if self.websocket is not None and self.connected:
            await self.websocket.send(message)
            self.update_ui_callback(f"Client: {message}")

# Flet UI
def main(page: ft.Page):
    page.title = "WebSocket Client Interface"
    page.vertical_alignment = ft.MainAxisAlignment.CENTER

    messages_log = ft.TextField(value="", width=400, height=300, read_only=True, multiline=True)
    message_input = ft.TextField(label="Your Message", width=400)

    client = WebSocketClient(URI, lambda msg: append_message(msg, messages_log, page))

    def append_message(msg, messages_log, page):
        messages_log.value += msg + "\n"
        page.update()

    def connect_click(e):
        client.connect()

    def send_message_click(e):
        if message_input.value:
            client.send_message(message_input.value)
            message_input.value = ""
            page.update()

    connect_button = ft.ElevatedButton(text="Connect to Server", on_click=connect_click)
    send_button = ft.ElevatedButton(text="Send Message", on_click=send_message_click)

    page.add(
        connect_button,
        message_input,
        send_button,
        messages_log,
    )

if __name__ == "__main__":
    ft.app(target=main)

このスクラップは2ヶ月前にクローズされました