🔌

【Websockets】P2P地震速報のWebsocketとMisskey.pyを使って地震botを作ろう!

2023/06/28に公開

はじめに

ほぼ1年間記事が思いつかずに投稿してなかった人です。
今回は、QuakeWatchというMisskey/Discordの地震情報Botを作ってみたので備忘録程度に書いていきます。

動作環境

Windows11 22H2
Python 3.11.3

ライブラリ

  • folium
  • fastapi
  • discord-webhook
  • misskey.py
  • websockets
  • httpx
  • uvicorn[standard]
  • playwright

完成形

image.png

簡単なmapを生成してみる

まず、foliumをimportします。

import folium
from folium.features import CustomIcon #CustomIcon

次に、地図を生成し、保存します。

m = folium.Map(
    location=[35.6894, 139.6917], #東京都の経度緯度
    tiles="cartodbdark_matter", #テーマ設定
    zoom_start=7 #個人的に拡大率7が一番良かった
)

m.save("index.html")

生成後のhtmlは以下のようになります。
image.png

震度分布図を生成してみよう

まず、以下のライブラリをimportしてください。

from fastapi import FastAPI, status, Response
import uvicorn
import base64
import binascii
import json
import uuid
import folium
import os
import httpx
from playwright.sync_api import sync_playwright
from folium.features import CustomIcon
import io

FastAPIを読み込み(?)ます。

app = FastAPI()

getリクエストの/generate_mapを定義します。

@app.get("/generate_map")
def generate_map(map_data: str, response: Response):

uuidモジュールを利用して、ファイル名を生成して、変数「file_name」に格納します。

file_name = uuid.uuid4()

次に、foliumでmapを生成します。

   jl = json.loads(map_data) #map_dataをstringとして受け取り、デコードしたものをjsonでロードする
   m = folium.Map(
       location=[jl["earthquake"]["hypocenter"]["latitude"], jl["earthquake"]["hypocenter"]["longitude"]],
       tiles="cartodbdark_matter",
       zoom_start=7
   )

震源地に❌を表示します。❌の絵文字は、事前に保存しておいてください。

    icon = CustomIcon(
        icon_image = "images/x.png",
        icon_size = (25, 25),
        icon_anchor = (30, 30),
        popup_anchor = (3, 3)
    )
    folium.Marker(
        location = [jl["earthquake"]["hypocenter"]["latitude"], jl["earthquake"]["hypocenter"]["longitude"]],
        icon=icon
    ).add_to(m)

jl["points"]の長さをlenで取得して、rangeで長さ分ループします。
そして、httpx.getで、pointsの地名から、経度緯度を取得します。

   for i in range(len(jl["points"])):
      resp = httpx.get(url=f'https://msearch.gsi.go.jp/address-search/AddressSearch?q={jl["points"][i]["pref"]}{jl["points"][i]["addr"]}')
      rj = resp.json()

以下のように震度ごとにアイコンの画像を変更します。
そして、完了後に、folium.Markerで、マーカーを設置します。

            if jl["points"][i]["scale"] == 10:
                icon = CustomIcon(
                    icon_image = "images/震度1_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 20:
                icon = CustomIcon(
                    icon_image = "images/震度2_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 30:
                icon = CustomIcon(
                    icon_image = "images/震度3_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 40:
                icon = CustomIcon(
                    icon_image = "images/震度4_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 45:
                icon = CustomIcon(
                    icon_image = "images/震度5-_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 50:
                icon = CustomIcon(
                    icon_image = "images/震度5+_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 55:
                icon = CustomIcon(
                    icon_image = "images/震度6-_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 60:
                icon = CustomIcon(
                    icon_image = "images/震度6+_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 70:
                icon = CustomIcon(
                    icon_image = "images/震度7_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            folium.Marker(
                location = [rj[0]["geometry"]["coordinates"][1], rj[0]["geometry"]["coordinates"][0]],
                icon=icon
            ).add_to(m)

次に、m.saveでhtmlを保存します。

m.save(f"temp/{file_name}.html")

playwrightを読み込み、ロードが完了したら、スクリーンショットを保存します。保存後に、htmlを削除します。

        with sync_playwright() as p:
            browser = p.chromium.launch()
            page = browser.new_page()
            page.goto(f"file://{os.path.abspath('.')}/temp/{file_name}.html")
            def on_load():
                page.screenshot(path=f'temp/{file_name}.png', full_page=True)
            page.wait_for_load_state("domcontentloaded", timeout=30000)
            page.add_listener("load", on_load)
        os.remove(f"temp/{file_name}.html")

pngをbase64でエンコードして、returnでjsonを返却します。

        with open(f"temp/{file_name}.png", "rb") as image_file:
            data = base64.b64encode(image_file.read())
        os.remove(f"temp/{file_name}.png")
        return {"message": "成功", "content": data, "uuid": file_name}

そして最後に、uvicorn.runでサーバーを起動します。

if __name__ == "__main__":
    uvicorn.run(app, port=8000)

全体のコード

backend.py

backend.py
from fastapi import FastAPI, status, Response
import uvicorn
import base64
import binascii
import json
import uuid
import folium
import os
import httpx
from playwright.sync_api import sync_playwright
from folium.features import CustomIcon

description = """
QuakeWatch向けバックエンドAPI
"""

app = FastAPI(
    title="QuakeWatch Backend API",
    description=description,
)

@app.get("/generate_map")
def generate_map(map_data: str, response: Response):
        file_name = uuid.uuid4()
        jl = json.loads(map_data) #map_dataをstringとして受け取り、デコードしたものをjsonでロードする
        print([jl["earthquake"]["hypocenter"]["latitude"], jl["earthquake"]["hypocenter"]["longitude"]])
        m = folium.Map(
            location=[jl["earthquake"]["hypocenter"]["latitude"], jl["earthquake"]["hypocenter"]["longitude"]],
            tiles="cartodbdark_matter",
            zoom_start=7
        )
        icon = CustomIcon(
            icon_image = "images/x.png",
            icon_size = (25, 25),
            icon_anchor = (30, 30),
            popup_anchor = (3, 3)
        )
        folium.Marker(
            location = [jl["earthquake"]["hypocenter"]["latitude"], jl["earthquake"]["hypocenter"]["longitude"]],
            icon=icon
        ).add_to(m)
        for i in range(len(jl["points"])):
            print(i)
            resp = httpx.get(url=f'https://msearch.gsi.go.jp/address-search/AddressSearch?q={jl["points"][i]["pref"]}{jl["points"][i]["addr"]}')
            rj = resp.json()
            print(type(jl["points"][i]["scale"]))
            print(type(str(jl["points"][i]["scale"])))
            if jl["points"][i]["scale"] == 10:
                icon = CustomIcon(
                    icon_image = "images/震度1_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 20:
                icon = CustomIcon(
                    icon_image = "images/震度2_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 30:
                icon = CustomIcon(
                    icon_image = "images/震度3_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 40:
                icon = CustomIcon(
                    icon_image = "images/震度4_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 45:
                icon = CustomIcon(
                    icon_image = "images/震度5-_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 50:
                icon = CustomIcon(
                    icon_image = "images/震度5+_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 55:
                icon = CustomIcon(
                    icon_image = "images/震度6-_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 60:
                icon = CustomIcon(
                    icon_image = "images/震度6+_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            if jl["points"][i]["scale"] == 70:
                icon = CustomIcon(
                    icon_image = "images/震度7_bg.png",
                    icon_size = (25, 25),
                    icon_anchor = (30, 30),
                    popup_anchor = (3, 3)
                )
            folium.Marker(
                location = [rj[0]["geometry"]["coordinates"][1], rj[0]["geometry"]["coordinates"][0]],
                icon=icon
            ).add_to(m)

        m.save(f"temp/{file_name}.html")
        with sync_playwright() as p:
            browser = p.chromium.launch()
            page = browser.new_page()
            page.goto(f"file://{os.path.abspath('.')}/temp/{file_name}.html")
            def on_load():
                page.screenshot(path=f'temp/{file_name}.png', full_page=True)
            page.wait_for_load_state("domcontentloaded", timeout=30000)
            page.add_listener("load", on_load)
        os.remove(f"temp/{file_name}.html")
        with open(f"temp/{file_name}.png", "rb") as image_file:
            data = base64.b64encode(image_file.read())
        os.remove(f"temp/{file_name}.png")
        return {"message": "成功", "content": data, "uuid": file_name}
    
if __name__ == "__main__":
    uvicorn.run(app, port=8000)

クライアント部分

完成形

Misskey
Discord

作成

まず、importでライブラリを読み込みます。

import asyncio, json, aiohttp
from websockets import client
from websockets.exceptions import ConnectionClosed
import websockets
import io
from discord_webhook import DiscordWebhook, DiscordEmbed
import base64
import httpx as requests
from datetime import datetime
from misskey import Misskey, MiAuth
import misskey
import os
import configparser
import webbrowser
import logging

あれこれ書きます。

inifile = configparser.ConfigParser()
inifile.read('config.ini', encoding="utf-8")
gateway_url = "wss://api.p2pquake.net/v2/ws"
genmap = "http://localhost:8000/generate_map?map_data="
gateway_dummy = "ws://localhost:8765/"

class Color:
	BLACK          = '\033[30m'#(文字)黒
	RED            = '\033[31m'#(文字)赤
	GREEN          = '\033[32m'#(文字)緑
	YELLOW         = '\033[33m'#(文字)黄
	BLUE           = '\033[34m'#(文字)青
	MAGENTA        = '\033[35m'#(文字)マゼンタ
	CYAN           = '\033[36m'#(文字)シアン
	WHITE          = '\033[37m'#(文字)白
	COLOR_DEFAULT  = '\033[39m'#文字色をデフォルトに戻す
	BOLD           = '\033[1m'#太字
	UNDERLINE      = '\033[4m'#下線
	INVISIBLE      = '\033[08m'#不可視
	REVERCE        = '\033[07m'#文字色と背景色を反転
	BG_BLACK       = '\033[40m'#(背景)黒
	BG_RED         = '\033[41m'#(背景)赤
	BG_GREEN       = '\033[42m'#(背景)緑
	BG_YELLOW      = '\033[43m'#(背景)黄
	BG_BLUE        = '\033[44m'#(背景)青
	BG_MAGENTA     = '\033[45m'#(背景)マゼンタ
	BG_CYAN        = '\033[46m'#(背景)シアン
	BG_WHITE       = '\033[47m'#(背景)白
	BG_DEFAULT     = '\033[49m'#背景色をデフォルトに戻す
	RESET          = '\033[0m'#全てリセット

class ColoredFormatter(logging.Formatter):
    def format(self, record):
        # ログレベルに応じて色を設定
        if record.levelno == logging.DEBUG:
            color = '\033[37m' # White
        elif record.levelno == logging.INFO:
            color = '\033[32m' # Green
        elif record.levelno == logging.WARNING:
            color = '\033[33m' # Yellow
        elif record.levelno == logging.ERROR:
            color = '\033[31m' # Red
        elif record.levelno == logging.CRITICAL:
            color = '\033[35m' # Magenta

        # ログメッセージをカラーフォーマットで出力
        message = super().format(record)
        message = color + message + '\033[0m'
        return message

# ログの設定
logging.basicConfig(
    format="{asctime} {websocket.id} {websocket.remote_address[0]} {message}",
    level=logging.INFO,
    style="{",
)

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(ColoredFormatter())
logging.getLogger().addHandler(handler)

class LoggerAdapter(logging.LoggerAdapter):
    """Add connection ID and client IP address to websockets logs."""
    def process(self, msg, kwargs):
        try:
            websocket = kwargs["extra"]["websocket"]
        except KeyError:
            return msg, kwargs
        xff = websocket.request_headers.get("X-Forwarded-For")
        return f"{websocket.id} {xff} {msg}", kwargs

ConnectionClosedエラーが発生しても再接続できるようにします。

async def connect():
    print(f"{Color.GREEN}[INFO]{Color.RESET}P2P地震速報WebSocketAPIに接続しています...")
    while True:
        async for ws in client.connect(gateway_url):

接続後にprintで出力して、

            print(f"{Color.GREEN}[INFO]{Color.RESET}P2P地震速報WebSocketAPIに接続しました!")
            try:
                while True:
                    recv = await ws.recv()
                    print(f"{Color.YELLOW}[LOG]{Color.RESET}\n------------------------------------------------------\n{recv}\n------------------------------------------------------")
                    lj = json.loads(recv)

地震情報をキャッチしたら、震度情報などを変数に格納します。
mtやsindoto10codeに-1があるのは、稀に-1が出る場合があり、それによるKeyErrorが起こる可能性を減らすためです。

                    if lj["code"] == 551:
                        sindo = str(lj["earthquake"]["maxScale"])
                        magnitude = str(lj["earthquake"]["hypocenter"]["magnitude"])
                        depth = str(lj["earthquake"]["hypocenter"]["depth"])
                        name = lj["earthquake"]["hypocenter"]["name"]
                        mt = {
                            "-1": "Unknown",
                            "10": "震度1",
                            "20": "震度2",
                            "30": "震度3",
                            "40": "震度4",
                            "50": "震度5弱",
                            "55": "震度5強",
                            "60": "震度6弱",
                            "65": "震度6強",
                            "70": "震度7",
                        }
                        sindoto10code = {
                            "-1": 16777215,
                            "10": 3955330,
                            "20": 1999590,
                            "30": 7923420,
                            "40": 16777110,
                            "50": 16765440,
                            "55": 16750080,
                            "60": 15741440,
                            "65": 12451840,
                            "70": 9175080,
                        }

config.iniにdiscordのwebhookURLが書き込まれているかを確認し、書き込まれていた場合は処理を実行します。

                        dt = datetime.strptime(lj["earthquake"]["time"], '%Y/%m/%d %H:%M:%S')
                        result_str = dt.strftime('%Y年%m月%d日 %H時%M分') #時間の形式を変更する
                        async with aiohttp.ClientSession() as session:
                            async with session.get(url=genmap + recv) as resp:
                                rj = await resp.json()
                                WEBHOOK_URL = inifile.get('DISCORD', 'WEBHOOK_URL')
                                if WEBHOOK_URL == "":
                                    pass
                                else:
                                    payload2 = {
                                        "payload_json" : {
                                            "username"      :"QuakeWatch👀",
                                            "embeds": [
                                                {
                                                    "title"         : "地震発生",
                                                    "description"   : f'{result_str}頃、マグニチュード{magnitude}、最大{mt[sindo]}の地震が発生しました。深さは{depth}km、発生場所は{name}です。詳細は以下の画像をご覧ください。',
                                                    "url"           : "http://www.seis.bosai.go.jp/",
                                                    "color"         : sindoto10code[sindo],
                                                    "footer": {
                                                        "text"      : "ソース: 気象庁",
                                                    },
                                                    "thumbnail": {
                                                        "url"       : "attachment://eew2.png"
                                                    },
                                                    "image": {
                                                        "url"       : "attachment://eew.png"
                                                    }
                                                }
                                            ],
                                        }
                                    }

BytesIOでレスポンスのbase64から読み込んだ画像を添付して、埋め込みに埋め込みます。

                                    with open(f"images/{sindo}.png", 'rb') as f:
                                        eew2 = f.read()
                                    eew = io.BytesIO(base64.b64decode(rj["content"].encode('utf-8')))
                                    files  = {
                                        "logo_bg" : ( "eew2.png", eew2 ),
                                        "logo_effect" : ( "eew.png", eew ),
                                    }
                                    payload2['payload_json'] = json.dumps( payload2['payload_json'], ensure_ascii=False )
                                    async with requests.AsyncClient() as httpx:
                                        res = await httpx.post(WEBHOOK_URL, files = files  , data = payload2 )

misskeyへ投稿&画像アップロードします。

                                if not inifile.get('MISSKEY', 'TOKEN') == "":
                                    mk = Misskey(inifile.get('MISSKEY', 'SERVER'), i=inifile.get('MISSKEY', 'TOKEN'))
                                    f = io.BytesIO(base64.b64decode(rj["content"].encode('utf-8')))
                                    data = mk.drive_files_create(f, name="eew.png")
                                    new_note = mk.notes_create(text=f'{result_str}頃、マグニチュード{magnitude}、最大{mt[sindo]}の地震が発生しました。深さは{depth}km、発生場所は{name}です。詳細は以下の画像をご覧ください。', file_ids=[data["id"]])

全体コード

client.py

client.py
import io
import asyncio, json, aiohttp
from websockets import client
from websockets.exceptions import ConnectionClosed
import websockets
from discord_webhook import DiscordWebhook, DiscordEmbed
import base64
import httpx as requests
from datetime import datetime
from misskey import Misskey, MiAuth
import misskey
import os
import configparser
import webbrowser
import logging
import io

inifile = configparser.ConfigParser()
inifile.read('config.ini', encoding="utf-8")
gateway_url = "wss://api.p2pquake.net/v2/ws"
genmap = "http://localhost:8000/generate_map?map_data="
gateway_dummy = "ws://localhost:8765/"

class Color:
	BLACK          = '\033[30m'#(文字)黒
	RED            = '\033[31m'#(文字)赤
	GREEN          = '\033[32m'#(文字)緑
	YELLOW         = '\033[33m'#(文字)黄
	BLUE           = '\033[34m'#(文字)青
	MAGENTA        = '\033[35m'#(文字)マゼンタ
	CYAN           = '\033[36m'#(文字)シアン
	WHITE          = '\033[37m'#(文字)白
	COLOR_DEFAULT  = '\033[39m'#文字色をデフォルトに戻す
	BOLD           = '\033[1m'#太字
	UNDERLINE      = '\033[4m'#下線
	INVISIBLE      = '\033[08m'#不可視
	REVERCE        = '\033[07m'#文字色と背景色を反転
	BG_BLACK       = '\033[40m'#(背景)黒
	BG_RED         = '\033[41m'#(背景)赤
	BG_GREEN       = '\033[42m'#(背景)緑
	BG_YELLOW      = '\033[43m'#(背景)黄
	BG_BLUE        = '\033[44m'#(背景)青
	BG_MAGENTA     = '\033[45m'#(背景)マゼンタ
	BG_CYAN        = '\033[46m'#(背景)シアン
	BG_WHITE       = '\033[47m'#(背景)白
	BG_DEFAULT     = '\033[49m'#背景色をデフォルトに戻す
	RESET          = '\033[0m'#全てリセット

class ColoredFormatter(logging.Formatter):
    def format(self, record):
        # ログレベルに応じて色を設定
        if record.levelno == logging.DEBUG:
            color = '\033[37m' # White
        elif record.levelno == logging.INFO:
            color = '\033[32m' # Green
        elif record.levelno == logging.WARNING:
            color = '\033[33m' # Yellow
        elif record.levelno == logging.ERROR:
            color = '\033[31m' # Red
        elif record.levelno == logging.CRITICAL:
            color = '\033[35m' # Magenta

        # ログメッセージをカラーフォーマットで出力
        message = super().format(record)
        message = color + message + '\033[0m'
        return message

# ログの設定
logging.basicConfig(
    format="{asctime} {websocket.id} {websocket.remote_address[0]} {message}",
    level=logging.INFO,
    style="{",
)

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(ColoredFormatter())
logging.getLogger().addHandler(handler)

class LoggerAdapter(logging.LoggerAdapter):
    """Add connection ID and client IP address to websockets logs."""
    def process(self, msg, kwargs):
        try:
            websocket = kwargs["extra"]["websocket"]
        except KeyError:
            return msg, kwargs
        xff = websocket.request_headers.get("X-Forwarded-For")
        return f"{websocket.id} {xff} {msg}", kwargs

async def connect():
    print(f"{Color.GREEN}[INFO]{Color.RESET}P2P地震速報WebSocketAPIに接続しています...")
    while True:
        async for ws in client.connect(gateway_url):
            print(f"{Color.GREEN}[INFO]{Color.RESET}P2P地震速報WebSocketAPIに接続しました!")
            try:
                while True:
                    recv = await ws.recv()
                    print(f"{Color.YELLOW}[LOG]{Color.RESET}\n------------------------------------------------------\n{recv}\n------------------------------------------------------")
                    lj = json.loads(recv)
                    if lj["code"] == 551:
                        sindo = str(lj["earthquake"]["maxScale"])
                        magnitude = str(lj["earthquake"]["hypocenter"]["magnitude"])
                        depth = str(lj["earthquake"]["hypocenter"]["depth"])
                        name = lj["earthquake"]["hypocenter"]["name"]
                        mt = {
                            "-1": "Unknown",
                            "10": "震度1",
                            "20": "震度2",
                            "30": "震度3",
                            "40": "震度4",
                            "50": "震度5弱",
                            "55": "震度5強",
                            "60": "震度6弱",
                            "65": "震度6強",
                            "70": "震度7",
                        }
                        sindoto10code = {
                            "-1": 16777215,
                            "10": 3955330,
                            "20": 1999590,
                            "30": 7923420,
                            "40": 16777110,
                            "50": 16765440,
                            "55": 16750080,
                            "60": 15741440,
                            "65": 12451840,
                            "70": 9175080,
                        }
                        dt = datetime.strptime(lj["earthquake"]["time"], '%Y/%m/%d %H:%M:%S')
                        result_str = dt.strftime('%Y年%m月%d日 %H時%M分')
                        async with aiohttp.ClientSession() as session:
                            async with session.get(url=genmap + recv) as resp:
                                rj = await resp.json()
                                WEBHOOK_URL = inifile.get('DISCORD', 'WEBHOOK_URL')
                                if WEBHOOK_URL == "":
                                    pass
                                else:
                                    payload2 = {
                                        "payload_json" : {
                                            "username"      :"QuakeWatch👀",
                                            "embeds": [
                                                {
                                                    "title"         : "地震発生",
                                                    "description"   : f'{result_str}頃、マグニチュード{magnitude}、最大{mt[sindo]}の地震が発生しました。深さは{depth}km、発生場所は{name}です。詳細は以下の画像をご覧ください。',
                                                    "url"           : "http://www.seis.bosai.go.jp/",
                                                    "color"         : sindoto10code[sindo],
                                                    "footer": {
                                                        "text"      : "ソース: 気象庁",
                                                    },
                                                    "thumbnail": {
                                                        "url"       : "attachment://eew2.png"
                                                    },
                                                    "image": {
                                                        "url"       : "attachment://eew.png"
                                                    }
                                                }
                                            ],
                                        }
                                    }
                                    with open(f"images/{sindo}.png", 'rb') as f:
                                        eew2 = f.read()
                                    with open(f'temp\\{rj["uuid"]}.png', 'rb') as f:
                                        eew = f.read()
                                    files_qiita  = {
                                        "logo_bg" : ( "eew2.png", eew2 ),
                                        "logo_effect" : ( "eew.png", eew ),
                                    }
                                    payload2['payload_json'] = json.dumps( payload2['payload_json'], ensure_ascii=False )
                                    async with requests.AsyncClient() as httpx:
                                        res = await httpx.post(WEBHOOK_URL, files = files_qiita  , data = payload2 )
                                if not inifile.get('MISSKEY', 'TOKEN') == "":
                                    mk = Misskey(inifile.get('MISSKEY', 'SERVER'), i=inifile.get('MISSKEY', 'TOKEN'))
                                    f = io.BytesIO(base64.b64decode(rj["content"].encode('utf-8')))
                                    data = mk.drive_files_create(f, name="eew.png")
                                    new_note = mk.notes_create(text=f'{result_str}頃、マグニチュード{magnitude}、最大{mt[sindo]}の地震が発生しました。深さは{depth}km、発生場所は{name}です。詳細は以下の画像をご覧ください。', file_ids=[data["id"]])
                                    os.remove(f'temp\\{rj["uuid"]}.png')
                    elif lj["code"] == 556:
                        mt = {
                            "-1": "Unknown",
                            "10": "震度1",
                            "20": "震度2",
                            "30": "震度3",
                            "40": "震度4",
                            "50": "震度5弱",
                            "55": "震度5強",
                            "60": "震度6弱",
                            "65": "震度6強",
                            "70": "震度7",
                        }
                        sindoto10code = {
                            "-1": 16777215,
                            "10": 3955330,
                            "20": 1999590,
                            "30": 7923420,
                            "40": 16777110,
                            "50": 16765440,
                            "55": 16750080,
                            "60": 15741440,
                            "65": 12451840,
                            "70": 9175080,
                        }
                        dt = datetime.strptime(lj["issue"]["time"], '%Y/%m/%d %H:%M:%S')
                        result_str = dt.strftime('%Y年%m月%d日 %H時%M分')
                        mk = Misskey(inifile.get('MISSKEY', 'SERVER'), i=inifile.get('MISSKEY', 'TOKEN'))
                        mk.notes_create(text=f'【緊急地震速報(警報)】{result_str}頃、マグニチュード{lj["earthquake"]["hypocenter"]["magnitude"]}の地震が発生しました。震源の深さは約{lj["earthquake"]["hypocenter"]["depth"]}km、発生場所は{lj["earthquake"]["hypocenter"]["name"]}です。この情報には誤差を含む場合があります。', file_ids=[data["id"]])
                    if not lj["code"] == 555:
                        if not lj["code"] == 9611:
                            payload2 = {
                                "payload_json" : {
                                    "username" :"QuakeWatch👀 Logger",
                                    "embeds": [
                                        {
                                            "title" : f"Code: {lj['code']}",
                                            "description" : json.dumps(lj, ensure_ascii=False, indent=4),
                                        }
                                    ],
                                }
                            }
            except ConnectionClosed:
                print(f"{Color.GREEN}[INFO]{Color.RESET}接続が終了しました。再接続しています...")
                continue

if __name__ == "__main__":
    asyncio.run(connect())

config.ini

[MISSKEY] #Misskeyへの送信を利用する際に参照するセクションです。
TOKEN = misskeyのtoken
SERVER = misskeyのサーバー(misskey.ioなど)

[DISCORD] #DiscordのWebhookへの送信を利用する際に参照するセクションです。
WEBHOOK_URL = discordのwebhookURL
# WEBHOOK_URLの末尾に?wait=trueをつけてください。

あとがき

この記事は夜遅くに書いたのを少し修正(コードの一部)しただけなので、おかしなところがあるかもしれません。

参考

https://qiita.com/ABBBB/items/e6bdf7fc94b8f6f72a01

https://zenn.dev/sion_pn/articles/85c22b9cdad159

https://misskeypy.readthedocs.io/ja/latest/upload-file.html

ソースコード

一部は異なります。

https://github.com/sonyakun/QuakeWatch

Discussion