Closed
5

GCPのCloud Composerを触ってみる

クイックスタート  |  Cloud Composer  |  Google Cloudを見ながら、Cloud Composerを触ってみる。

準備

GCPのコンソールのメニューからComposerをクリックすると、以下のようなページが出てくるのでAPIを有効にする。

有効になると「環境」というページに遷移するので、「環境の作成」をする。

最初の2項目だけ変更して、あとはデフォルトのまま「作成」ボタンを押す。

注: システムが環境を作成するまでに約 25 分かかります。

ということなので、気長に待つ。

Google Cloud SDKの準備

環境を作るのになんかやたら時間かかるので、その間にmacOS 用のクイックスタート  |  Cloud SDK のドキュメント  |  Google Cloudを参考にGoogle Cloud SDKを準備しておく。

macOSなのでHomebrewで入れる。

$ brew install --cask google-cloud-sdk

以下のような文面が表示されるので、使ってるシェルに合わせて設定を追加しておく。

google-cloud-sdk is installed at /usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk. Add your profile:

  for bash users
    source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.bash.inc"
    source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.bash.inc"

  for zsh users
    source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.zsh.inc"
    source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.zsh.inc"

  for fish users
    source "/usr/local/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.fish.inc"

SDKの初期化をする。

$ gcloud init

ブラウザが開いて認証したり、Compute Engineを有効にしてる場合はデフォルトのリージョンを選択したりする。

$ gcloud auth list
  Credentialed Accounts
ACTIVE  ACCOUNT
*       kentarok@gmail.com

To set the active account, run:
    $ gcloud config set account `ACCOUNT`

ちゃんと動いてるぽい。

DAGの作成

SDKの準備をしているうちに環境ができあがった。以下のコードをドキュメントからコピーして、ローカルにファイルを作成する。

quickstart.py
import datetime

import airflow
from airflow.operators import bash_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

以下の「DAGのフォルダ」カラムのDAGsというところをクリックすると、クラウドストレージのページが開かれる。

そこに↑で作ったファイルをアップロードする。

Web UIでDAGを確認する

DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG のスケジュールが設定されます。DAG が Airflow ウェブ インターフェースに表示されるまでに数分かかる場合があります。

ということなので、自動的にAirflowに追加されるようだ。環境の一覧にあるリンクからアクセスすると、こんな感じでquickstart.pyで定義したDAGが表示されている。

ちゃんと動いたっぽい。

後始末

課金されたくないので消しちゃう。

  1. バケットを消す
  2. Pub/Subトピックを削除
  3. 環境を削除
このスクラップは7日前にクローズされました
ログインするとコメントできます