🗑️

[Python] Gmailから指定したメールを削除する

2022/06/19に公開

はじめに

いままでGmailの自動振り分けを使っていなかったために要らないメールをちまちまと削除していました。
それに疲れたので、Pythonで指定したメールを削除できるようにしてみました。
自動振り分けを設定したらあまり必要でないのですが、何かの役に立つかと思い残しておきます。

環境

Python
Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)]

コード

Google API を利用するプロジェクトの作成が完了し、認証も済んでいるものとします。

下記のコードでは、

  • 送信元(From)のメールアドレスが addresses に含まれている
  • 実行時から30日より前

の条件を満たすメールをゴミ箱へ送ります。

import os.path
import datetime
import re
import functools

# from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# OAuth scopes requested when loading or creating credentials in getCredentials.
# Only gmail.modify is active; the full-access scope is left commented out.
SCOPES = [
  # "https://mail.google.com/", # full access
  "https://www.googleapis.com/auth/gmail.modify",  # trash
]

################################################################################
##
################################################################################

# True when the two points in time are further apart than delta.
# The default datetime.timedelta(hours=9) is the JST (Japan Standard Time, UTC+0900) offset.
def compareTime(dt1, dt2, delta, timeZone=datetime.timedelta(hours=9)):
  """Return whether the gap between dt1 and dt2 exceeds delta.

  Both the gap and delta are taken as absolute values, so argument order
  and the sign of delta do not matter. The comparison is strict: a gap
  exactly equal to delta yields False.

  timeZone is accepted but not used by the comparison.
  """
  gap = abs(dt1 - dt2)
  threshold = abs(delta)
  return gap > threshold

def getCredentials(path):
  """Return Gmail API credentials, authorizing interactively when required.

  Credentials cached in 'token.json' are reused while they are valid. If they
  have expired and carry a refresh token they are refreshed; otherwise the
  browser-based consent flow is started from the client secrets file.
  Refreshed or newly obtained credentials are written back to 'token.json'.

  Args:
    path: Path to the OAuth client secrets JSON file.

  Returns:
    The credentials object to pass to googleapiclient's build().
  """
  # Imported locally because the module-level import of Request is commented out.
  from google.auth.transport.requests import Request

  tokenPath = 'token.json'
  creds = None
  if os.path.exists(tokenPath):
    creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)

  # A cached token that is still valid needs no further work.
  if creds is not None and creds.valid:
    return creds

  if creds is not None and creds.expired and creds.refresh_token:
    creds.refresh(Request())
  else:
    # No usable cached token: fall back to the interactive consent flow.
    flow = InstalledAppFlow.from_client_secrets_file(path, SCOPES)
    creds = flow.run_local_server(port=0)

  with open(tokenPath, 'w') as file:
    file.write(creds.to_json())
  return creds


def getMessages(service, labelIds=None, user_id='me', pageToken=None):
  """Fetch one page of messages and return their formatted header info.

  Args:
    service: Gmail API service object.
    labelIds: Label filter forwarded to messages().list().
    user_id: Mailbox owner; 'me' by default.
    pageToken: Token of the page to fetch, or None for the first page.

  Returns:
    A tuple (infos, nextPageToken). infos is a list of dicts produced by
    getInfo + formatInfo, one per message on the page. nextPageToken is None
    when the response carries no further page token.
  """
  lt = service.users().messages().list(userId=user_id, labelIds=labelIds, pageToken=pageToken).execute()
  result = []
  # The response may carry no "messages" key when nothing matches, so
  # default to an empty list instead of raising KeyError.
  for message in lt.get("messages", []):
    # One extra request per message to obtain its headers.
    headers = service.users().messages().get(userId=user_id, id=message["id"]).execute().get("payload", {}).get("headers", [])
    result.append(formatInfo(getInfo(message["id"], headers)))
  return result, lt.get("nextPageToken", None)


def getAllMessages(service, labelIds=None, user_id='me'):
  """Collect the formatted info of every message by walking all result pages.

  Calls getMessages repeatedly, starting without a page token, and keeps
  going until a page comes back with no next-page token.

  Returns:
    A single list holding the entries of every page, in page order.
  """
  collected = []
  token = None
  while True:
    page, token = getMessages(service, labelIds, user_id, token)
    collected.extend(page)
    if token is None:
      break
  return collected


def filterMessageFrom(info, addresses):
  """Return True when the message's sender address is one of `addresses`.

  Args:
    info: Message info dict; the sender address is read from its "From" key.
    addresses: Container of sender addresses to match against.

  Returns:
    True if info has a "From" value contained in addresses, else False.
    A message without a "From" entry never matches.
  """
  sender = info.get("From")
  return sender is not None and sender in addresses


def filterMessageDate(info, days):
  """Return True when the message's date is more than `days` days from now.

  Args:
    info: Message info dict whose "Date" value is a timezone-aware datetime
      (as produced by formatInfo).
    days: Age threshold in days.

  Returns:
    The result of compareTime for the message date, the current time and
    a timedelta of `days` days.
  """
  # Aware "now" (UTC+9) so it can be subtracted from the aware message date.
  now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
  # compareTime is defined in this file; the previous "TU." prefix referred
  # to a name that is never defined or imported here.
  return compareTime(info["Date"], now, datetime.timedelta(days=days))


def getInfo(mId, lt):
  """Build an info dict from a message id and its list of header entries.

  Args:
    mId: Message id, stored under the "id" key.
    lt: Iterable of header dicts, each with "name" and "value" keys.

  Returns:
    A dict with "id" plus whichever of the Date, Subject, From and To
    headers appear in lt. If a header occurs more than once, the last
    value wins.
  """
  wanted = ["Date", "Subject", "From", "To"]
  picked = {header["name"]: header["value"] for header in lt if header["name"] in wanted}
  return {"id": mId, **picked}


def convertTimeZone(string):
  """Map a time zone abbreviation to its "+HHMM" / "-HHMM" UTC offset.

  Known abbreviations are UTC, GMT, EST, CST and JST. Any other value
  yields an empty string.
  """
  offsets = dict(UTC="+0000", GMT="+0000", EST="-0500", CST="-0600", JST="+0900")
  try:
    return offsets[string]
  except KeyError:
    return ""


def convertDate(string):
  """Parse the text of a mail "Date" header into a timezone-aware datetime.

  Accepted shapes are "[Www, ]D Mon YYYY HH:MM:SS" followed by either a
  three-letter zone abbreviation known to convertTimeZone or a numeric
  "+HHMM" / "-HHMM" offset (optionally followed by " (ZZZ)").

  Args:
    string: Raw Date header value.

  Returns:
    A datetime carrying the parsed UTC offset.

  Raises:
    ValueError: If the text does not match the expected layout or uses a
      zone abbreviation that convertTimeZone does not know.
  """
  # Raw strings keep the regex escapes (\d, \() from being read as
  # (invalid) string escapes.
  reDate = re.compile(
    r"(?:[A-Za-z]{3}, )?"
    r"(?P<datetime>\d{1,2} [A-Za-z]{3} \d{4} \d{2}:\d{2}:\d{2}) "
    r"(?:(?P<timeZone>[A-Z]{3})|(?:(?P<offset>[+-]\d{4})(?: \([A-Z]{3}\))?))"
  )
  datetimeFormat = "%d %b %Y %H:%M:%S %z"
  match = reDate.search(string)
  if match is None:
    raise ValueError(f"unsupported Date header format: {string!r}")
  tz = match["timeZone"]
  if tz is not None:
    offset = convertTimeZone(tz)
    if not offset:
      # An empty offset would only fail later inside strptime with a
      # less helpful message.
      raise ValueError(f"unknown time zone abbreviation {tz!r} in Date header: {string!r}")
  else:
    offset = match["offset"]
  return datetime.datetime.strptime(f"{match['datetime']} {offset}", datetimeFormat)


def convertFrom(string):
  """Split a "From" header value into its address and display name.

  Parsing is delegated to email.utils.parseaddr, so forms such as
  'Name <addr>', '"Quoted, Name" <addr>', '<addr>' and a bare 'addr'
  are all reduced to the plain address.

  Args:
    string: Raw From header value.

  Returns:
    A tuple (address, name). name is None when the header carries no
    display name. If no address can be extracted, the input string is
    returned unchanged as the address with name None.
  """
  from email.utils import parseaddr

  name, address = parseaddr(string)
  if not address:
    # Nothing parseable: hand the original text back untouched.
    return string, None
  return address, (name if name else None)


def formatInfo(info):
  """Return a copy of info with the Date and From entries converted.

  "Date" (when present) is replaced by the result of convertDate.
  "From" (when present) is replaced by the address returned by
  convertFrom, and the accompanying name is stored under "FromName".
  All other entries are copied as-is; the input dict is not modified.
  """
  formatted = dict(info)
  if "Date" in info:
    formatted["Date"] = convertDate(info["Date"])
  if "From" in info:
    address, name = convertFrom(info["From"])
    formatted["From"] = address
    formatted["FromName"] = name
  return formatted


def trash(service, infos, user_id='me'):
  """Print each message info and move that message to the trash.

  Args:
    service: Gmail API service object.
    infos: Iterable of info dicts; each must have an "id" key.
    user_id: Mailbox owner; 'me' by default.
  """
  for entry in infos:
    print(entry)
    request = service.users().messages().trash(userId=user_id, id=entry["id"])
    request.execute()


def getLabels(service):
  """Return the label list of the 'me' mailbox, or [] if the response has none."""
  response = service.users().labels().list(userId='me').execute()
  labels = response.get('labels', [])
  return labels


if __name__ == "__main__":
  # Script entry point: trash every message that was sent from one of
  # `addresses` and whose date passes the `days` check in filterMessageDate.
  path = r"client_secret.json"
  addresses = [
    "atmarkit-mail@noreply.itmedia.jp",
    "support@codezine.jp",
    "itmedia-seminar@noreply.itmedia.jp",
  ]
  days = 30

  creds = getCredentials(path)
  try:
    service = build('gmail', 'v1', credentials=creds)
    # getLabels(service) can be printed here to inspect the available labels,
    # and getAllMessages(service, labelIds="INBOX") narrows the scan to one label.
    candidates = getAllMessages(service)
    # The date check only runs for messages that already matched on sender.
    targets = [
      info for info in candidates
      if filterMessageFrom(info, addresses=addresses) and filterMessageDate(info, days=days)
    ]
    trash(service, targets)
  except HttpError as error:
    print(f'An error occurred: {error}')

Discussion