💨

GCPを利用した基盤構築及びデータ取得作業の定期実行について①

2022/09/13に公開

はじめに

これは、Web APIとGCPを利用し5日間の天候データ取得の作業を自動化した記事です。
これらの記事を参考にしています。
https://qiita.com/atsuken/items/23c0f0395514cafaf47d
https://qiita.com/matsu0130/items/4849ebb3681e827274d3

必要なもの・作業の流れ

・OpenweathermapのAPIキー
・GCPのアカウント(要はgoogleアカウント)

Openweathermapは、世界の天気に関する情報を取得することが出来るサービスです。
https://openweathermap.org/
APIキーはアカウントを作成すれば発行されると思います。(ログインして[API keys]をクリック)

今回は、無料で利用できる5日間の天気データを取得し、GCPのバゲット及びBigQueryのテーブルに保存する作業を自動化していきます。具体的には以下の通りです。

  1. Cloud Schedulerで指定した時間にPub/Subへメッセージを送信
  2. メッセージを受信したPub/SubがCloud Functionsを起動
  3. 起動したCloud Functionsが以下の作業を実行
     ・Openweathermapからデータを取得し、データフレームに変換
     ・データフレームをCloud Storageにcsvファイルとして保存
     ・データフレームをBigQueryのテーブルに登録

bigqueryのテーブル、Cloud Storageのバケット、Pub/Subを、上の2つの記事に従い先に作成しています。
bigqueryのテーブルはこのように作りました。(中身は空のテーブル)

Cloud Functionsの設定

・文字列をまとめたsettings.pyファイル
・api経由で天候データ取得を実行するopenweather.pyファイル
・データをGCSへcsvファイルで保存するstorage.pyファイル
・BigQueryのテーブルにデータを送るbigquery.pyファイル
・Cloud Functionsで実行するmain.pyファイル
・実行に必要な外部ライブラリを記載するrequirement.txt
以上5つのファイルと1つのテキストの形でコードを記述しています。

環境:第1世代
リージョン:us-west1
トリガー:Pub/Sub
割り当てられたメモリ:512 MB
タイムアウト:60 秒
ランタイム:Python3.8
エントリポイント:main
ソースコード:インラインエディタ

settings.py
city = '取得したい地名を入力' #ex)'Tokyo,JP'、'London,GB'
API_KEY = '自分のOpenweathermap APIを入力'
url = "http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={key}&lang=ja&units=metric"
project_id = '自分のproject_idを入力'
bucket_name = 'csvファイルを保存する先のbucket_nameを入力' #ex)'wm'

units=metric:摂氏の温度を取得、lang=ja:日本語で出力

openweather.py
from datetime import datetime, timedelta, timezone
import json
import pandas as pd
import requests
import settings

# api経由で天候データ取得を実行する関数
def openweathermap():
    # Web APIから天気データを取得(json形式で取得することになる)
    url = settings.url.format(city=settings.city, key=settings.API_KEY)
    json_data = requests.get(url).json()

    tz = timezone(timedelta(hours=+9), 'JST') # 日本のタイムゾーンを作成
    j, m, p, s, wea, wind = [], [], [], [], [], []
    # 取得したデータを加工し、リストに追加する
    for dat in json_data['list']:
        jst = str(datetime.fromtimestamp(dat["dt"], tz))[:-9] # UTCの時間をJSTに変換
        j.append(jst)
        m.append(pd.Series(dat['main']))
        p.append(dat['pop'])
        s.append(pd.Series(dat['sys']))
        wea.append(pd.Series(dat['weather'][0]))
        wind.append(pd.Series(dat['wind']))
    
    # リストをデータフレームに変換し、連結する。
    dt = pd.Series(j, name='time')
    dfm = pd.DataFrame(m).drop(columns=['temp_kf'])
    dfp = pd.Series(p, name='pop')
    dfs = pd.DataFrame(s)
    dfw1 = pd.DataFrame(wea).drop(columns=['id','icon'])
    dfw2 = pd.DataFrame(wind)    
    df = pd.concat([dt, dfm, dfp, dfs, dfw1, dfw2], axis=1)
    return df
storage.py
from google.cloud import storage as gcs
import openweather
import datetime
import settings

# データをGCSへcsvファイルで保存する関数
def send(df):
    project_id = settings.project_id 
    bucket_name = settings.bucket_name #ex)settings.pyより'wm'
    gcs_path_1 = "data/openweathermap_data" # ファイルまでのパス
    # ファイル名でデータを取得した日付がわかるようにする
    to_day = str(datetime.date.today()) # import datetimeを書く必要がある。
    to_day = to_day.replace("-","")
    
    client = gcs.Client()
    bucket = client.get_bucket(bucket_name)
    
    blob_gcs_1 = bucket.blob(gcs_path_1 + "_" + to_day + ".csv")
    blob_gcs_1.upload_from_string(data=df.to_csv(index=False))

次のような形でcsvファイルに保存される。
wm/data/openweathermap_data_20220909.csv

datetime.date.today()で日付を付けるには「import datetime」でないと上手くいかない。
「from datetime import datetime, timedelta, timezone」で行うとエラーが発生する。

bigquery.py
from google.cloud import bigquery as gbq
import storage
import openweather
import settings

# BigQueryのテーブルにデータをインサートする関数
def insert(df):
    client = gbq.Client(settings.project_id)
    table = client.get_table("プロジェクト名.データセット名.テーブル名")
    client.insert_rows(table, df.values.tolist())
main.py
import settings
import openweather
import storage
import bigquery

# Cloud Functionsで実行する関数
def main(event, context):
    df = openweather.openweathermap()
    storage.send(df)
    bigquery.insert(df)

実行する関数の引数には(event, context)を設定する必要がある。(無いと上手くいかない)

requirements.txt
# Function dependencies, for example:
# package>=version
requests>=2.28.1
pandas>=1.3.5
google-cloud-storage>=1.33.0
google-cloud-bigquery>=2.20.0

標準ライブラリ(jsonなど)は記載しなくて良い。

Cloud Schedulerの設定

こちらも上にある参考記事に従い作成しました。
ただジョブ作成の頻度をunix-cronという見慣れない形式で作成することに注意が必要です。
(私は初めて知りました。)
私は22:00で5日おきに作成したかったので「0 22 */5 * *」と書きました。
書き方については以下の記事を参考にしています。
https://www.yoheim.net/blog.php?q=20190902
https://beyondjapan.com/blog/2019/12/cron-interval-45min/
https://pentan.info/server/cron/cron_off_by_one.html

リージョン:us-west1
頻度:0 22 */5 * *
タイムゾーン:日本標準時(JST)
ターゲットタイプ:Pub/Sub

まとめ

ここまで設定すれば後は、Cloud SchedulerとOpenweathermapのAPIを起動させておけば自動でデータを取得することが出来るようになります。
一応自分でやった時は上手くできたので大丈夫だと思います。
注意点として、OpenweathermapのAPIは起動直後だと上手くデータを取得できないので予め起動しておく必要があります(20分前ぐらいで大丈夫です)。
起動するだけならスマートフォンでもできるので、難しくはないです。

Discussion