🕌

PythonのimaplibでUIDを取得する方法

2022/05/19に公開約7,300字

IMAP4メール受信のエラーリカバリーをしたい

弊社では私が作ったPythonライブラリとAPIを用いてIMAP4サーバーからメールを受信し、自動で処理しています。APIはAWS Lambda上に配置し、自動受信スクリプトは5分間隔で動かしていますが、ときどきエラーが発生します。大抵の場合は一過性のエラーなので5分後には解消していますが、問題がひとつありました。それはエラー発生により受信に失敗したメールが次のAPI呼び出しで取得されず、取りこぼしが発生することです。

IMAP4では基本的にサーバー上にメールを残します。一度受信したメールを再度取り込まないように、普通はUNSEENという検索オプションを指定してメールを受信します。受信したメールは既読状態となってUNSEENの対象外となるので、常に未読メッセージのみが受信されるわけです。ところがAPIがエラーになると、IMAP4サーバー上のメッセージは既読状態なのに、クライアントアプリには取り込まれていない状況になる場合があります。

そうなったら、クライアントアプリの状態とサーバーの状態を比較して取りこぼされたメッセージを特定し、未読状態に戻してやれば次の自動処理で取り込まれます。サーバー上のメッセージはWebメールで確認や操作ができます。今まではエラーが発生するたびに手動で作業していましたが、さすがに面倒になってきたのでエラーリカバリー処理を自動化することにしました。

IMAP4の検索オプション

エラーリカバリーをするにはIMAP4の検索オプションを理解する必要があります。IMAP4の検索オプションの説明はこちらのページがわかりやすいです。これらの検索オプションのうち、エラーリカバリーに使えそうなのは、メッセージ番号指定とUID指定です。どちらも番号で、さらに範囲指定が可能なので、最後のメッセージの番号がわかればそれより大きい番号で範囲指定すれば取りこぼした既読メッセージを再取得できます。ただし、メッセージ番号指定には少し問題があります。

メッセージ番号はサーバー上のメッセージに連番で割り当てられる相対的な値です。メッセージ番号は状況によって変化します。あるメールのメッセージ番号が100だったとします。サーバー上の古いメールを10通削除したら、同じメッセージのメッセージ番号は90になります。したがって、エラーリカバリーにメッセージ番号を用いるのは危険です。最後に受信したメールのメッセージ番号がわかっていたとしても、次のメッセージ番号はそれより大きい保証はないからです。[1]

一方UIDは一度割り当てられたら変化しません。UIDも基本的には連番なので、新しいメッセージのUIDは古いメッセージのUIDよりも必ず大きくなります。ということで、エラーリカバリーにはUIDを用いることにしました。

UIDはどこ?

以下のソースコードはIMAP4受信ライブラリの一部です。

    def fetch_mail(self, search_option='UNSEEN', timezone='Asia/Tokyo'):
        '''
        メールを受信し、内容と添付ファイルの情報を辞書形式で返す。
        '''

        result = []

        # メールサーバーに接続
        cli = imaplib.IMAP4_SSL(self.host_name)

        try:
            # 認証
            cli.login(self.user_id, self.password)

            # メールボックスを選択(標準はINBOX)
            cli.select()

            # 指定されたオプションを用いてメッセージを検索
            status, data = cli.search(None, search_option)

            # 受信エラーの場合は空の結果を返して終了
            if status == 'NO':
                return result

            # メールの解析
            for num in data[0].split():
                status, data = cli.fetch(num, '(RFC822)')
                msg = BytesParser(policy=policy.default).parsebytes(data[0][1])
                msg_id = msg.get('Message-Id', failobj='')
                from_ = msg.get('From', failobj='')
                to_ = msg.get('To', failobj='')
                cc_ = msg.get('Cc', failobj='')
                subject = msg.get('Subject', failobj='')
                date_str = msg.get('Date', failobj='')
                date_time = parsedate_to_datetime(date_str)
                if date_time:
                    # タイムゾーンを補正
                    date_time = date_time.astimezone(pytz.timezone(timezone))
                date = date_time.strftime('%Y/%m/%d') if date_time else ''
                time = date_time.strftime('%H:%M:%S') if date_time else ''
                header_text = self._get_header_text(msg)
                body, format_, charset = self._get_main_content(msg)
                attachments = self._get_attachments(msg)
                mail_data = {}
                mail_data['msg_id'] = msg_id
                mail_data['header'] = header_text
                mail_data['from'] = from_
                mail_data['to'] = to_
                mail_data['cc'] = cc_
                mail_data['subject'] = subject
                mail_data['date'] = date
                mail_data['time'] = time
                mail_data['format'] = format_
                mail_data['charset'] = charset
                mail_data['body'] = body
                mail_data['attachments'] = attachments
                result.append(mail_data)

            return result

        finally:
            cli.close()
            cli.logout()

さて、UIDはどこにあるでしょうか。

実はどこにもありません。変数の中に隠れているわけではなく、この方法ではそもそもUIDを取得できないのです。

次はUIDを取得できるようにした改良版を以下に示します。

    def fetch_mail(self, search_option='UNSEEN', timezone='Asia/Tokyo'):
        '''
        メールを受信し、内容と添付ファイルの情報を辞書形式で返す。
        '''

        result = []

        # メールサーバーに接続
        cli = imaplib.IMAP4_SSL(self.host_name)

        try:
            # 認証
            cli.login(self.user_id, self.password)

            # メールボックスを選択(標準はINBOX)
            cli.select()

            # 指定されたオプションを用いてメッセージを検索
-           status, data = cli.search(None, search_option)
+           status, data = cli.uid('search', None, search_option)

            # 受信エラーの場合は空の結果を返して終了
            if status == 'NO':
                return result

            # メールの解析
-           for num in data[0].split():
+           for uid in data[0].split():
                status, data = cli.uid('fetch', uid, '(RFC822)')
                msg = BytesParser(policy=policy.default).parsebytes(data[0][1])
                msg_id = msg.get('Message-Id', failobj='')
                from_ = msg.get('From', failobj='')
                to_ = msg.get('To', failobj='')
                cc_ = msg.get('Cc', failobj='')
                subject = msg.get('Subject', failobj='')
                date_str = msg.get('Date', failobj='')
                date_time = parsedate_to_datetime(date_str)
                if date_time:
                    # タイムゾーンを補正
                    date_time = date_time.astimezone(pytz.timezone(timezone))
                date = date_time.strftime('%Y/%m/%d') if date_time else ''
                time = date_time.strftime('%H:%M:%S') if date_time else ''
                header_text = self._get_header_text(msg)
                body, format_, charset = self._get_main_content(msg)
                attachments = self._get_attachments(msg)
                mail_data = {}
+               mail_data['uid'] = uid.decode()
                mail_data['msg_id'] = msg_id
                mail_data['header'] = header_text
                mail_data['from'] = from_
                mail_data['to'] = to_
                mail_data['cc'] = cc_
                mail_data['subject'] = subject
                mail_data['date'] = date
                mail_data['time'] = time
                mail_data['format'] = format_
                mail_data['charset'] = charset
                mail_data['body'] = body
                mail_data['attachments'] = attachments
                result.append(mail_data)

            return result

        finally:
            cli.close()
            cli.logout()

違いは3箇所ありますが、重要なのはここです。

-           status, data = cli.search(None, search_option)
+           status, data = cli.uid('search', None, search_option)

こうすることでdataの中にUIDが格納されます。

たったこれだけの違いですが、サンプルコードがなかなか見つからなくて苦労しました。公式ドキュメントのサンプルも上の書き方になっています。

とりあえずUIDを取得できるようになったので、ライブラリをバージョンアップしました。

https://pypi.org/project/imap2dict/

バージョン1.1からはUIDが返るようになっています。

imap2dict 1.1

クライアント側の対応

レスポンス項目にUIDが追加されただけなので、クライアント側の対応はかんたんです。DBにUIDを格納するフィールドを追加して、ほかの項目と一緒に保存するだけです。リカバリー処理のスクリプトも作りましたが、最初は自動実行せず、エラー発生後にリカバリー処理の動作を確認することにしました。あとはエラーが起きるのを待つばかりです。

あれ、エラーが起きないぞ

ライブラリを修正してから10日たちましたが、エラーが発生しません。以前は3日に1度は発生していたのに不思議です。原因はいまのところ不明です。IMAP4.search()IMAP4.uid()でLambda環境との相性に違いがあるのでしょうか。

やっとエラーが発生した

11日目にようやく待望の(?)エラーが発生しました。取りこぼしメッセージも発生していたので、リカバリー用のスクリプトを動かして、期待通りの動作をすることが確認できました。ちなみにUIDの範囲指定はUID 10230:10235のように指定します。Pythonの文字列操作のようにUID 10230:という指定はできません。試しに指定したら空のデータが返ってきました。エラーリカバリー時はメールの到達頻度に応じた値を指定するとよいでしょう。

脚注
  1. もちろんIMAP4サーバー上のメールを削除しない運用なら一応大丈夫ですが、オペレーションミスで消してしまうことはありえますよね。 ↩︎

Discussion

ログインするとコメントできます