🗑️
[Python] Gmailから指定したメールを削除する
はじめに
いままでGmailの自動振り分けを使っていなかったために要らないメールをちまちまと削除していました。
それに疲れたので、Pythonで指定したメールを削除できるようにしてみました。
自動振り分けを設定したらあまり必要でないのですが、何かの役に立つかと思い残しておきます。
環境
Python
Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)]
コード
Goole API を利用するプロジェクトを作成が完了し、認証も済んでいるとします。
下記のコードでは、
- 送り先のメールアドレスが
addresses
に含まれている - 実行時から30日より前
の条件を満たすメールをゴミ箱へ送ります。
import os.path
import datetime
import re
import functools
# from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
SCOPES = [
# "https://mail.google.com/", # full access
"https://www.googleapis.com/auth/gmail.modify", # trash
]
################################################################################
##
################################################################################
# 時刻差がdelta以上ならばTrue
# datetime.timedelta(hours=9)は、JST 日本標準時 UTC+0900
def compareTime(dt1, dt2, delta, timeZone=datetime.timedelta(hours=9)):
return abs(dt1 - dt2) > abs(delta)
def getCredentials(path):
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
else:
flow = InstalledAppFlow.from_client_secrets_file(path, SCOPES)
creds = flow.run_local_server(port=0)
with open('token.json', 'w') as file:
file.write(creds.to_json())
return creds
def getMessages(service, labelIds=None, user_id='me', pageToken=None):
result = []
lt = service.users().messages().list(userId=user_id, labelIds=labelIds, pageToken=pageToken).execute()
for i, message in enumerate(lt["messages"]):
headers = service.users().messages().get(userId=user_id, id=message["id"]).execute().get("payload", {}).get("headers", [])
info = getInfo(message["id"], headers)
info = formatInfo(info)
result.append(info)
return result, lt.get("nextPageToken", None)
def getAllMessages(service, labelIds=None, user_id='me'):
result = []
pageToken = None
temp, pageToken = getMessages(service, labelIds, user_id, pageToken)
result += temp
while pageToken is not None:
temp, pageToken = getMessages(service, labelIds, user_id, pageToken)
result += temp
return result
def filterMessageFrom(info, addresses):
return True if info["From"] in addresses else False
def filterMessageDate(info, days):
now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
return TU.compareTime(info["Date"], now, datetime.timedelta(days=days))
def getInfo(mId, lt):
item = ["Date", "Subject", "From", "To"]
result = {"id": mId}
for d in lt:
if d["name"] in item:
result[d["name"]] = d["value"]
return result
def convertTimeZone(string):
converter = {
"UTC": "+0000",
"GMT": "+0000",
"EST": "-0500",
"CST": "-0600",
"JST": "+0900",
}
return converter.get(string, "")
def convertDate(string):
reDate = re.compile("(?:[A-Za-z]{3}, )?(?P<datetime>\d{1,2} [A-Za-z]{3} \d{4} \d{2}:\d{2}:\d{2}) (?:(?P<timeZone>[A-Z]{3})|(?:(?P<offset>[+-]\d{4})(?: \([A-Z]{3}\))?))")
datetimeFormat = "%d %b %Y %H:%M:%S %z"
match = reDate.search(string)
dt = match["datetime"]
tz = match["timeZone"]
if tz is not None:
offset = convertTimeZone(tz)
else:
offset = match["offset"]
return datetime.datetime.strptime(f"{dt} {offset}", datetimeFormat)
def convertFrom(string):
reFrom = re.compile("(?P<name>.+) +<(?P<address>.+)>")
match = reFrom.search(string)
if match is not None:
return match["address"], match["name"]
else:
return string, None
def formatInfo(info):
result = info.copy()
for k, v in info.items():
if k == "Date":
result[k] = convertDate(v)
elif k == "From":
result[k], result["FromName"] = convertFrom(v)
return result
def trash(service, infos, user_id='me'):
for info in infos:
print(info)
service.users().messages().trash(userId=user_id, id=info["id"]).execute()
def getLabels(service):
return service.users().labels().list(userId='me').execute().get('labels', [])
if __name__ == "__main__":
path = r"client_secret.json"
addresses = [
"atmarkit-mail@noreply.itmedia.jp",
"support@codezine.jp",
"itmedia-seminar@noreply.itmedia.jp",
]
days = 30
creds = getCredentials(path)
try:
service = build('gmail', 'v1', credentials=creds)
# labels = getLabels(service)
# print(labels)
# infos = getAllMessages(service, labelIds="INBOX")
infos = getAllMessages(service)
filterMessageFrom = functools.partial(filterMessageFrom, addresses=addresses)
infos = list(filter(filterMessageFrom, infos))
filterMessageDate = functools.partial(filterMessageDate, days=days)
infos = list(filter(filterMessageDate, infos))
trash(service, infos)
except HttpError as error:
print(f'An error occurred: {error}')
Discussion