Open1

Pythonでメールの添付ファイルを保存

m10k1m10k1
from __future__ import annotations

import imaplib
import logging
import os
import re
import unicodedata
from datetime import datetime 
from email import message_from_bytes
from email.header import decode_header
from pathlib import Path
from typing import Optional

import yaml

DATE_FMT = "%d-%b-%Y" #IMAPの日付フォーマット


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

def iso_to_imap(date_str: Optional[str]) -> Optional[str]:
    """ISO 8601形式の日付をIMAP形式に変換"""
    if not date_str:
        return None
    
    try:
        dt = datetime.fromisoformat(date_str)
        return dt.strftime(DATE_FMT)
    except ValueError as e:
        logging.error(f"Invalid date format: {date_str} - {e}")
        return None

def build_search_criteria(sender: str, since: str, before: str) -> str:
    """検索条件を構築"""
    criteria: list[str] = [f'FROM "{sender}"']

    if since:
        criteria.append(f'SINCE {since}')
    if before:
        criteria.append(f'BEFORE {before}')
    
    return ' '.join(criteria)

def decode_filename(filename: str | bytes | None) -> Optional[str]:
    if not filename:
        return None
    
    decoded_parts = decode_header(filename)

    decoded_str = ''
    for part, encoding in decoded_parts:
        if isinstance(part, bytes):
            decoded_str += part.decode(encoding or 'utf-8', errors='replace')
        else:
            decoded_str += part
    return decoded_str

def sanitize_filename(filename):
    # Unicode正規化(NFKC)で機種依存文字を標準化
    filename = unicodedata.normalize('NFKC', filename)
    # ファイル名に使えない文字をアンダースコアに置換
    filename = re.sub(r'[\\/:*?"<>|\r\n\t]', '_', filename)
    # 制御文字や不可視文字を除去
    filename = ''.join(c for c in filename if c.isprintable())
    # 長すぎるファイル名をカット(例: 255文字まで)
    return filename[:255]

def save_attachment(part, dest_dir: Path) -> None:
    """単一パートをファイルとして保存する。存在する場合は連番を付与。"""
    filename = decode_filename(part.get_filename())
    if not filename:
        return
    filename = sanitize_filename(filename)
    path = dest_dir / filename

    # 衝突回避
    counter = 1
    while path.exists():
        path = dest_dir / f"{path.stem}_{counter}{path.suffix}"
        counter += 1

    with path.open("wb") as fp:
        fp.write(part.get_payload(decode=True))
    logging.info("Saved: %s", path)


def main():
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)

    imap_host = config['mail']['host']
    imap_user = config['mail']['user']
    imap_pass = config['mail']['password']

    attachiments_path = config['attachments']['save_path']
    os.makedirs(attachiments_path, exist_ok=True)

    sender = config['search']['sender']
    since = iso_to_imap(config['search'].get('since'))
    before = iso_to_imap(config['search'].get('before'))

    criteria = build_search_criteria(sender, since, before)

    imap = imaplib.IMAP4_SSL(imap_host)
    try:
        imap.login(imap_user, imap_pass)
        imap.select('INBOX')

        # type: 成功/失敗のステータス
        # data: 通常は要素が1つだけのリスト。シーケンス番号がスペース区切りのバイト列
        status, data = imap.search(None, criteria)  

        if status != 'OK':
            logging.error("Search error: %s", data)
            return
        
        for num in data[0].split():
            # fetch メソッドは、指定されたシーケンス番号のメールを取得します。
            # type : 成功/失敗のステータス
            # msg_data: メールの内容を含むリスト。各要素はタプル
            # (RF8822) はメール全体を取得するための引数
            # (BODY[HEADER]) はヘッダのみを取得するための引数
            # (BODY[TEXT]) は本文のみを取得するための引数
            # (FLAGS) はフラグ情報を取得するための引数
            status, msg_data = imap.fetch(num, '(RFC822)')

            if status != 'OK':
                logging.error("Fetch error for %s: %s", num, msg_data)
                continue


            # msg_data[0][1] はメールの内容を含むバイト列
            # email.message_from_bytes はバイト列から EmailMessage オブジェクトを生成します
            msg = message_from_bytes(msg_data[0][1])


            # メールは複数のバートから構成されている
            # walk() メソッドは、メールの各パートを再帰的に走査します
            # 各パートは EmailMessage オブジェクトとして表されます
            for part in msg.walk():

                # メールのパートがマルチパートである場合はスキップ
                if part.get_content_maintype() == 'multipart':
                    continue
                # 添付ファイルのパートを識別するために Content-Disposition ヘッダをチェック
                if part.get('Content-Disposition') is None:
                    continue

                # 添付ファイルのパートを保存
                save_attachment(part, Path(attachiments_path))

    finally:
        imap.logout()



if __name__ == '__main__':
    main()