📖
【真似厳禁】SSHをDiscordのBOTで実装してみた
コード
import discord
from discord.ext import commands
import asyncio
import os
from collections import deque
import time
from dotenv import load_dotenv
# 設定
load_dotenv()
TOKEN = os.getenv('DISCORD_BOT_TOKEN')
intents = discord.Intents.default()
intents.message_content = True # メッセージ内容の取得を有効化
ALLOWED_CHANNEL_ID = channelID # コマンド実行専用のチャンネルID
ALLOWED_USER_IDS = [userID] # コマンド実行できるユーザーのID
MAX_OUTPUT_LENGTH = 1900 # Discordの文字数制限を考慮
LOG_BUFFER_SIZE = 10 # ログバッファのサイズ
RATE_LIMIT_SECONDS = 2 # レート制限
class CommandBot(commands.Bot):
def __init__(self):
super().__init__(command_prefix='!', intents=intents)
self.log_buffer = deque(maxlen=LOG_BUFFER_SIZE)
self.last_command_time = {}
self.user_cwd = {} # カレントディレクトリを継承
self.user_process = {} # ユーザーごとの実行中プロセス
async def on_ready(self):
print(f'{self.user} has connected to Discord!')
bot = CommandBot()
@bot.command(name='exec')
async def execute_command(ctx, *, command):
# チャンネルとユーザーの検証
if ctx.channel.id != ALLOWED_CHANNEL_ID:
return
if ctx.author.id not in ALLOWED_USER_IDS:
await ctx.send("権限がありません。")
return
user_id = ctx.author.id
current_time = time.time()
if user_id in bot.last_command_time:
if current_time - bot.last_command_time[user_id] < RATE_LIMIT_SECONDS:
await ctx.send("コマンドの実行間隔が短すぎます。")
return
bot.last_command_time[user_id] = current_time
# 既にプロセスが動いている場合
if user_id in bot.user_process:
await ctx.send("既にコマンドが実行中です。`!input` で入力できます。")
return
# カレントディレクトリ取得
cwd = bot.user_cwd.get(user_id, os.getcwd())
# cdコマンドの特別処理
if command.strip().startswith('cd '):
path = command.strip()[3:].strip()
new_path = os.path.abspath(os.path.join(cwd, path))
if os.path.isdir(new_path):
bot.user_cwd[user_id] = new_path
await ctx.send(f"ディレクトリを変更しました: {new_path}")
else:
await ctx.send("そのディレクトリは存在しません。")
return
# 危険なコマンドのフィルタリング
dangerous_commands = ['rm -rf', 'format', 'del /f', 'shutdown']
if any(danger in command.lower() for danger in dangerous_commands):
await ctx.send("危険なコマンドは実行できません。")
return
try:
# コマンド実行
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
cwd=cwd
)
bot.user_process[user_id] = process
await ctx.send(f"コマンド実行中: `{command}`\n`!input` で入力できます。")
# 出力を非同期で読み取ってDiscordに送信
async def read_output():
buffer = b""
while True:
chunk = await process.stdout.read(1024)
if not chunk:
break
buffer += chunk
# 改行で分割して送信
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
decoded_line = line.decode('utf-8', errors='ignore').rstrip()
if decoded_line:
bot.log_buffer.append(decoded_line)
await send_output(ctx, [decoded_line])
# 残りのバッファも送信
if buffer:
decoded_line = buffer.decode('utf-8', errors='ignore').rstrip()
if decoded_line:
bot.log_buffer.append(decoded_line)
await send_output(ctx, [decoded_line])
# エラー出力も処理
stderr = await process.stderr.read()
if stderr:
error_msg = stderr.decode('utf-8', errors='ignore')
await ctx.send(f"エラー:\n```\n{error_msg[:MAX_OUTPUT_LENGTH]}\n```")
# プロセス終了時に削除
bot.user_process.pop(user_id, None)
await ctx.send("コマンド実行完了")
bot.loop.create_task(read_output())
except Exception as e:
bot.user_process.pop(user_id, None)
await ctx.send(f"エラーが発生しました: {str(e)}")
# !inputでユーザー入力も受け付けられるゾ
@bot.command(name='input')
async def input_command(ctx, *, user_input):
if ctx.channel.id != ALLOWED_CHANNEL_ID or ctx.author.id not in ALLOWED_USER_IDS:
return
user_id = ctx.author.id
process = bot.user_process.get(user_id)
if not process:
await ctx.send("実行中のコマンドがありません。")
return
try:
process.stdin.write((user_input + '\n').encode('utf-8'))
await process.stdin.drain()
await ctx.send(f"入力を送信しました: {user_input}")
except Exception as e:
await ctx.send(f"入力送信時にエラー: {str(e)}")
# ログが多すぎたらMAX_OUTPUT_LENGTHに基づいて省略
async def send_output(ctx, lines):
output = '\n'.join(lines)
if len(output) > MAX_OUTPUT_LENGTH:
output = output[:MAX_OUTPUT_LENGTH] + '\n... (省略)'
await ctx.send(f"```\n{output}\n```")
# エラー起きたら停止
@bot.event
async def on_command_error(ctx, error):
if isinstance(error, commands.CommandNotFound):
return
await ctx.send(f"エラー: {str(error)}")
# ログ表示コマンド
# !logで表示
@bot.command(name='logs')
async def show_logs(ctx):
if ctx.channel.id != ALLOWED_CHANNEL_ID or ctx.author.id not in ALLOWED_USER_IDS:
return
if bot.log_buffer:
logs = '\n'.join(bot.log_buffer)
await ctx.send(f"最近のログ:\n```\n{logs[:MAX_OUTPUT_LENGTH]}\n```")
else:
await ctx.send("ログがありません。")
# 強制終了コマンド
# !stopで停止
@bot.command(name='stop')
async def stop_command(ctx):
if ctx.channel.id != ALLOWED_CHANNEL_ID or ctx.author.id not in ALLOWED_USER_IDS:
return
user_id = ctx.author.id
process = bot.user_process.get(user_id)
if not process:
await ctx.send("停止できる実行中のコマンドがありません。")
return
try:
process.kill()
await ctx.send("コマンドを停止しました。")
except Exception as e:
await ctx.send(f"停止時にエラー: {str(e)}")
finally:
bot.user_process.pop(user_id, None)
if __name__ == '__main__':
bot.run(TOKEN)
Discussion
試みとしは面白い発想ですが、絶対に使わないでくださいと言えるくらい危険です。
たとえ個人のみのサーバーで利用するにしてもそれ以上のリスクがあります。
入力内容、標準出力、標準エラー出力はdiscord側に保存される
当然ながらメッセージはDiscordサーバーを経由するためメセージとして保管されます。
保管はさすがに暗号化されていると思いますが、その暗号化キーもDiscord側が持ってるので復号可能です。
さすがにDiscord社自体がそうするとは考えにくいですが、攻撃を受けて暗号化されたデータや鍵が流出しないとも限りません。
SSHの場合はSSHクライアントとSSHサーバーの間の経路では全て暗号化されてます。
E2E暗号化とも呼ばれます。
ここ1年以内でDiscordメッセージの内容がE2E暗号化される話があったような気もしますが詳しくは知りません。
標準出力からの転送が多いとDiscordメッセージも長くなるため、Discord側から不審に思われるリスクもあるかもしれません。
コマンドエスケープが不適切(botコード上の脆弱性)
rm -rfなどを危険コマンドとしてフィルタリングしていますが、コマンドとオプション間のスペースが2個に増えるとシェル構文としては有効なままフィルタリングを回避することになります。。スペースで分割したり、ダブルクォーテーションで括られていたらただの文字列として扱う、しかしエスケープ、eval、環境変数、パイプなどを使ってコマンド文字列を組み上げて実行などでコマンド実行できるため、文字列を見るだけでは対処不可能です。
これらはコマンドインジェクションや、任意コード実行という脆弱性にも分類されます。
ユーザーの入力内容でコマンド実行するというのはそれだけリスクがあります。
Discordアカウント乗っ取りリスク
乗っ取りに関してある程度の認識はあるみたいですが、実際乗っ取られた場合は相手にDiscordだけでなくBot稼働サーバーでも文字通りやりたい放題されてしまいます。
そのサーバーが大量のスパムDMやメール、ウイルスつき添付ファイル送信、DDoS攻撃サーバーの1つとして使われるなどで、サーバーの持ち主はあなた、つまりあなたが加害者と認定される恐れがあります。
以上のことから極めて危険であるので、絶対に真似しないよう注意書きをしておいたほうが良いです。
なるほど、そこまでの考えが及んでいませんでした
そのように注意書きをしておきます!