😊

迷惑メールのヘッダー情報を解析するPythonのプログラム

2024/07/07に公開

時々 acmailer の脆弱性が放置されたレンタルサーバーから迷惑メールが届くことがあるので、
送信元などの情報を整理するために Python で emlファイルのヘッダー情報を解析し .xslx や .csv にまとめるプログラムを作成した。

機能

  • 指定ディレクトリ内のEMLファイルを解析
  • メールヘッダー情報を抽出
  • 結果をExcel(.xlsx)またはCSV(.csv)形式で出力

出力される項目

  1. Date
  2. Subject
  3. From
  4. To
  5. Reply-To
  6. X-Mailer
  7. Message-Id
  8. First Received
  9. Second Received
  10. Last Received
  11. Return-Path

WEB版

https://csecinf.pythonanywhere.com/

コマンドライン版

python this_program.py -d input_directory -o output.xlsx --format xlsx --sort asc --iso-date

プログラムコード

# 指定したディレクトリ内の eml のメールヘッダーを解析し、結果を .xslx や .csv にまとめるプログラム
# 使用方法
# python this_program.py -d input_directory -o output.xlsx --format xlsx --sort asc --iso-date                                                 

import os
import csv
import email
import argparse
from email.header import decode_header
from email.utils import parsedate_to_datetime
from email.policy import default
from datetime import datetime
import pytz
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Side
from openpyxl.worksheet.page import PageMargins

def parse_arguments():
    """コマンドライン引数を解析します。"""
    parser = argparse.ArgumentParser(description="EMLファイルから情報を抽出し、CSVまたはXLSXに出力します。")
    parser.add_argument("-d", "--directory", type=str, required=True, help="解析対象のファイル群が格納されたディレクトリ。")
    parser.add_argument("-o", "--output", type=str, required=True, help="出力するファイルの名前。")
    parser.add_argument("--sort", type=str, choices=["asc", "desc"], help="Dateに基づいて並び替える方向。'asc'は古い順、'desc'は新しい順。")
    parser.add_argument("--format", type=str, choices=["csv", "xlsx"], default="csv", help="出力フォーマット。'csv'または'xlsx'。デフォルトは'csv'。")
    parser.add_argument("--iso-date", action="store_true", help="日時をISO形式で出力します。")
    return parser.parse_args()

def find_eml_files(directory):
    """指定されたディレクトリ以下のすべての.emlファイルのパスを返します。"""
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.eml'):
                yield os.path.join(root, file)

def decode_header_value(val):
    """ヘッダーの値をデコードして返します。"""
    decoded, charset = decode_header(val)[0]
    if isinstance(decoded, bytes):
        try:
            return decoded.decode(charset or 'utf-8')
        except UnicodeDecodeError:
            return decoded.decode('iso-8859-1')
    return decoded

def extract_info_from_eml(file_path, iso_date=False):
    """指定された.emlファイルから必要な情報を抽出します。"""
    with open(file_path, 'r', encoding='utf-8') as f:
        msg = email.message_from_file(f, policy=default)
        received_headers = msg.get_all('Received')
        first_received = received_headers[-1] if received_headers else ""
        last_received = received_headers[0] if received_headers else ""
        second_received = received_headers[-2] if len(received_headers) >= 2 else ""  # 2番目のReceivedを取得
        date = msg.get("Date", "")
        try:
            date_obj = parsedate_to_datetime(date)
            if iso_date:
                date = date_obj.isoformat()
            else:
                date = date_obj.strftime("%a, %d %b %Y %H:%M:%S %z")
        except Exception:
            date_obj = datetime.now(pytz.utc)  # 日付解析に失敗した場合、現在時刻を使用
            date = date_obj.isoformat() if iso_date else date
        data = {
            "Date": date,
            "Subject": decode_header_value(msg.get("Subject", "")),
            "From": msg.get("From", ""),
            "To": msg.get("To", ""),
            "Reply-To": msg.get("Reply-To", ""),
            "X-Mailer": msg.get("X-Mailer", ""),
            "Message-Id": msg.get("Message-ID", ""),
            "First Received": first_received,
            "Second Received": second_received,  # 2番目のReceivedをデータに追加
            "Last Received": last_received,
            "Return-Path": msg.get("Return-Path", ""),
            "DateObj": date_obj,
        }
        return data

def write_to_csv(data, output_filename, sort_order=None):
    """抽出されたデータをShift_JIS形式の指定されたファイル名に出力します。"""
    headers = ["Date", "Subject", "From", "To", "Reply-To", "X-Mailer", "Message-Id", "First Received", "Second Received", "Last Received", "Return-Path"]
    if sort_order:
        data.sort(key=lambda x: x["DateObj"], reverse=True if sort_order == "desc" else False)
    with open(output_filename, 'w', newline='', encoding='shift_jis', errors='ignore') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()
        for row in data:
            del row["DateObj"] # 出力前にDateObjを削除
            writer.writerow(row)

def write_to_xlsx(data, output_filename, sort_order=None):
    """抽出されたデータをXLSX形式の指定されたファイル名に出力します。"""
    headers = ["Date", "Subject", "From", "To", "Reply-To", "X-Mailer", "Message-Id", "First Received", "Second Received", "Last Received", "Return-Path"]
    if sort_order:
        data.sort(key=lambda x: x["DateObj"], reverse=True if sort_order == "desc" else False)
    
    wb = Workbook()
    ws = wb.active
    
    # 余白を「狭い」に設定
    ws.page_margins = PageMargins(left=0.25, right=0.25, top=0.75, bottom=0.75, header=0.3, footer=0.3)
    
    # 印刷の向きを「横」に設定
    ws.page_setup.orientation = ws.ORIENTATION_LANDSCAPE

    # 罫線のスタイル設定
    thin_border = Border(left=Side(style='thin'), 
                         right=Side(style='thin'), 
                         top=Side(style='thin'), 
                         bottom=Side(style='thin'))
    
    # タイトル行を追加し、スタイルを設定
    for col_num, header in enumerate(headers, start=1):
        cell = ws.cell(row=1, column=col_num, value=header)
        cell.border = thin_border
        cell.alignment = Alignment(horizontal='center', vertical='center')
    
    # データ行を追加し、スタイルを設定
    for row_num, row_data in enumerate(data, start=2):
        for col_num, key in enumerate(headers, start=1):
            value = row_data.get(key, "")
            if isinstance(value, str):
                # 不正な文字を置換または削除
                value = ''.join(c for c in value if c.isprintable())
            cell = ws.cell(row=row_num, column=col_num, value=value)
            cell.border = thin_border
            # DateObjは実際のデータとして出力しないので、ここでの処理は不要
            if key != "DateObj":
                cell.alignment = Alignment(vertical='top')
    
    # ファイルに保存
    wb.save(output_filename)

def main():
    args = parse_arguments()
    all_data = []
    for eml_file in find_eml_files(args.directory):
        try:
            data = extract_info_from_eml(eml_file, args.iso_date)
            all_data.append(data)
            print(f"{eml_file} からデータを抽出しました。")
        except Exception as e:
            print(f"Error processing {eml_file}: {e}")
    if args.format == "csv":
        write_to_csv(all_data, args.output, args.sort)
    elif args.format == "xlsx":
        write_to_xlsx(all_data, args.output, args.sort)
    print(f"処理が完了しました。出力ファイル: {args.output}")

if __name__ == "__main__":
    main()

注意事項

  • 迷惑メールの送信元サーバーを管理しているレンタルサーバー事業者の通報窓口に、所定の書式で通報すると対応してもらえます。
  • 高性能なフィルターを使用することで、ほとんどの迷惑メールを防ぐことができます。

Discussion