GCPのCloud Composerを触ってみる
クイックスタート | Cloud Composer | Google Cloudを見ながら、Cloud Composerを触ってみる。
準備
GCPのコンソールのメニューからComposer
をクリックすると、以下のようなページが出てくるのでAPIを有効にする。
有効になると「環境」というページに遷移するので、「環境の作成」をする。
最初の2項目だけ変更して、あとはデフォルトのまま「作成」ボタンを押す。
注: システムが環境を作成するまでに約 25 分かかります。
ということなので、気長に待つ。
Google Cloud SDKの準備
環境を作るのになんかやたら時間かかるので、その間にmacOS 用のクイックスタート | Cloud SDK のドキュメント | Google Cloudを参考にGoogle Cloud SDKを準備しておく。
macOSなのでHomebrewで入れる。
$ brew install --cask google-cloud-sdk
以下のような文面が表示されるので、使ってるシェルに合わせて設定を追加しておく。
google-cloud-sdk is installed at /usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk. Add your profile:
for bash users
source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.bash.inc"
source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.bash.inc"
for zsh users
source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.zsh.inc"
source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.zsh.inc"
for fish users
source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.fish.inc"
SDKの初期化をする。
$ gcloud init
ブラウザが開いて認証したり、Compute Engineを有効にしてる場合はデフォルトのリージョンを選択したりする。
$ gcloud auth list
Credentialed Accounts
ACTIVE ACCOUNT
* kentarok@gmail.com
To set the active account, run:
$ gcloud config set account `ACCOUNT`
ちゃんと動いてるぽい。
DAGの作成
SDKの準備をしているうちに環境ができあがった。以下のコードをドキュメントからコピーして、ローカルにファイルを作成する。
import datetime
import airflow
from airflow.operators import bash_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with airflow.DAG(
'composer_sample_dag',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
# Print the dag_run id from the Airflow logs
print_dag_run_conf = bash_operator.BashOperator(
task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
以下の「DAGのフォルダ」カラムのDAGs
というところをクリックすると、クラウドストレージのページが開かれる。
そこに↑で作ったファイルをアップロードする。
Web UIでDAGを確認する
DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG のスケジュールが設定されます。DAG が Airflow ウェブ インターフェースに表示されるまでに数分かかる場合があります。
ということなので、自動的にAirflowに追加されるようだ。環境の一覧にあるリンクからアクセスすると、こんな感じでquickstart.py
で定義したDAGが表示されている。
ちゃんと動いたっぽい。
後始末
課金されたくないので消しちゃう。
- バケットを消す
- Pub/Subトピックを削除
- 環境を削除