😊
迷惑メールのヘッダー情報を解析するPythonのプログラム
時々 acmailer の脆弱性が放置されたレンタルサーバーから迷惑メールが届くことがあるので、
送信元などの情報を整理するために Python で emlファイルのヘッダー情報を解析し .xslx や .csv にまとめるプログラムを作成した。
機能
- 指定ディレクトリ内のEMLファイルを解析
- メールヘッダー情報を抽出
- 結果をExcel(.xlsx)またはCSV(.csv)形式で出力
出力される項目
- Date
- Subject
- From
- To
- Reply-To
- X-Mailer
- Message-Id
- First Received
- Second Received
- Last Received
- Return-Path
WEB版
https://csecinf.pythonanywhere.com/
コマンドライン版
python this_program.py -d input_directory -o output.xlsx --format xlsx --sort asc --iso-date
プログラムコード
# 指定したディレクトリ内の eml のメールヘッダーを解析し、結果を .xslx や .csv にまとめるプログラム
# 使用方法
# python this_program.py -d input_directory -o output.xlsx --format xlsx --sort asc --iso-date
import os
import csv
import email
import argparse
from email.header import decode_header
from email.utils import parsedate_to_datetime
from email.policy import default
from datetime import datetime
import pytz
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Side
from openpyxl.worksheet.page import PageMargins
def parse_arguments():
"""コマンドライン引数を解析します。"""
parser = argparse.ArgumentParser(description="EMLファイルから情報を抽出し、CSVまたはXLSXに出力します。")
parser.add_argument("-d", "--directory", type=str, required=True, help="解析対象のファイル群が格納されたディレクトリ。")
parser.add_argument("-o", "--output", type=str, required=True, help="出力するファイルの名前。")
parser.add_argument("--sort", type=str, choices=["asc", "desc"], help="Dateに基づいて並び替える方向。'asc'は古い順、'desc'は新しい順。")
parser.add_argument("--format", type=str, choices=["csv", "xlsx"], default="csv", help="出力フォーマット。'csv'または'xlsx'。デフォルトは'csv'。")
parser.add_argument("--iso-date", action="store_true", help="日時をISO形式で出力します。")
return parser.parse_args()
def find_eml_files(directory):
"""指定されたディレクトリ以下のすべての.emlファイルのパスを返します。"""
for root, _, files in os.walk(directory):
for file in files:
if file.endswith('.eml'):
yield os.path.join(root, file)
def decode_header_value(val):
"""ヘッダーの値をデコードして返します。"""
decoded, charset = decode_header(val)[0]
if isinstance(decoded, bytes):
try:
return decoded.decode(charset or 'utf-8')
except UnicodeDecodeError:
return decoded.decode('iso-8859-1')
return decoded
def extract_info_from_eml(file_path, iso_date=False):
"""指定された.emlファイルから必要な情報を抽出します。"""
with open(file_path, 'r', encoding='utf-8') as f:
msg = email.message_from_file(f, policy=default)
received_headers = msg.get_all('Received')
first_received = received_headers[-1] if received_headers else ""
last_received = received_headers[0] if received_headers else ""
second_received = received_headers[-2] if len(received_headers) >= 2 else "" # 2番目のReceivedを取得
date = msg.get("Date", "")
try:
date_obj = parsedate_to_datetime(date)
if iso_date:
date = date_obj.isoformat()
else:
date = date_obj.strftime("%a, %d %b %Y %H:%M:%S %z")
except Exception:
date_obj = datetime.now(pytz.utc) # 日付解析に失敗した場合、現在時刻を使用
date = date_obj.isoformat() if iso_date else date
data = {
"Date": date,
"Subject": decode_header_value(msg.get("Subject", "")),
"From": msg.get("From", ""),
"To": msg.get("To", ""),
"Reply-To": msg.get("Reply-To", ""),
"X-Mailer": msg.get("X-Mailer", ""),
"Message-Id": msg.get("Message-ID", ""),
"First Received": first_received,
"Second Received": second_received, # 2番目のReceivedをデータに追加
"Last Received": last_received,
"Return-Path": msg.get("Return-Path", ""),
"DateObj": date_obj,
}
return data
def write_to_csv(data, output_filename, sort_order=None):
"""抽出されたデータをShift_JIS形式の指定されたファイル名に出力します。"""
headers = ["Date", "Subject", "From", "To", "Reply-To", "X-Mailer", "Message-Id", "First Received", "Second Received", "Last Received", "Return-Path"]
if sort_order:
data.sort(key=lambda x: x["DateObj"], reverse=True if sort_order == "desc" else False)
with open(output_filename, 'w', newline='', encoding='shift_jis', errors='ignore') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer.writeheader()
for row in data:
del row["DateObj"] # 出力前にDateObjを削除
writer.writerow(row)
def write_to_xlsx(data, output_filename, sort_order=None):
"""抽出されたデータをXLSX形式の指定されたファイル名に出力します。"""
headers = ["Date", "Subject", "From", "To", "Reply-To", "X-Mailer", "Message-Id", "First Received", "Second Received", "Last Received", "Return-Path"]
if sort_order:
data.sort(key=lambda x: x["DateObj"], reverse=True if sort_order == "desc" else False)
wb = Workbook()
ws = wb.active
# 余白を「狭い」に設定
ws.page_margins = PageMargins(left=0.25, right=0.25, top=0.75, bottom=0.75, header=0.3, footer=0.3)
# 印刷の向きを「横」に設定
ws.page_setup.orientation = ws.ORIENTATION_LANDSCAPE
# 罫線のスタイル設定
thin_border = Border(left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin'))
# タイトル行を追加し、スタイルを設定
for col_num, header in enumerate(headers, start=1):
cell = ws.cell(row=1, column=col_num, value=header)
cell.border = thin_border
cell.alignment = Alignment(horizontal='center', vertical='center')
# データ行を追加し、スタイルを設定
for row_num, row_data in enumerate(data, start=2):
for col_num, key in enumerate(headers, start=1):
value = row_data.get(key, "")
if isinstance(value, str):
# 不正な文字を置換または削除
value = ''.join(c for c in value if c.isprintable())
cell = ws.cell(row=row_num, column=col_num, value=value)
cell.border = thin_border
# DateObjは実際のデータとして出力しないので、ここでの処理は不要
if key != "DateObj":
cell.alignment = Alignment(vertical='top')
# ファイルに保存
wb.save(output_filename)
def main():
args = parse_arguments()
all_data = []
for eml_file in find_eml_files(args.directory):
try:
data = extract_info_from_eml(eml_file, args.iso_date)
all_data.append(data)
print(f"{eml_file} からデータを抽出しました。")
except Exception as e:
print(f"Error processing {eml_file}: {e}")
if args.format == "csv":
write_to_csv(all_data, args.output, args.sort)
elif args.format == "xlsx":
write_to_xlsx(all_data, args.output, args.sort)
print(f"処理が完了しました。出力ファイル: {args.output}")
if __name__ == "__main__":
main()
注意事項
- 迷惑メールの送信元サーバーを管理しているレンタルサーバー事業者の通報窓口に、所定の書式で通報すると対応してもらえます。
- 高性能なフィルターを使用することで、ほとんどの迷惑メールを防ぐことができます。
Discussion