
Sending Daily GCP Usage Cost Notifications to Slack with Airflow

Published 2022/04/22

Overview

I use GCP for personal projects, and I want to avoid "cloud bankruptcy"; ideally I'd notice runaway costs before a budget alert ever fires. To that end, this post builds a DAG that combines the Cloud Billing export-to-BigQuery feature with Airflow to post the daily usage costs to Slack.

Goals

  • Post the daily usage costs to Slack
    • Broken down by project and by service name.
  • Clean up the already-used (i.e. already-notified) data in BigQuery
    • Save the notified data locally as CSV.
    • Also delete data six or more days old, to reduce the stored volume.

Steps

Preparation: export Cloud Billing data to BigQuery

Once the export is enabled, records like the following (three sample rows shown here) land in the BigQuery table:

{  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T02:00:00Z",  "usage_end_time": "2022-04-20T03:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_dataset"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T05:55:41.063Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "933.0",    "unit": "bytes",    "amount_in_pricing_units": "8.6892396211624146e-07",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
{  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T00:00:00Z",  "usage_end_time": "2022-04-20T01:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_project"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T04:04:18.373Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "8498.0",    "unit": "bytes",    "amount_in_pricing_units": "7.9143792390823364e-06",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
{  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T00:00:00Z",  "usage_end_time": "2022-04-20T01:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_dataset"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T04:04:18.373Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "4609.0",    "unit": "bytes",    "amount_in_pricing_units": "4.2924657464027405e-06",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
  • Of the exported fields, we use .export_time to select rows for a given date, .project.name for the project name, .service.description for the service name, and .cost for the cost per project and service.
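
As a quick sanity check that these fields contain what we expect, the export table can be queried directly with the google-cloud-bigquery client (a sketch; the table name is a placeholder for your own billing export table, and application-default credentials are assumed):

from google.cloud import bigquery

client = bigquery.Client()  # assumes application-default credentials

# Placeholder table name: substitute your own billing export table.
query = """
SELECT project.name, service.description, cost, export_time
FROM `my_dataset.gcp_billing_export_v1_XXX`
WHERE CAST(export_time AS date) = '2022-04-20'
LIMIT 5
"""
for row in client.query(query).result():
    print(dict(row))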

DAG overview

  • prepare task
    • Setup task: decides which data date to fetch for manual vs. scheduled runs, pushes values to XCom, and so on.
  • main_task_group group
    • extract: extracts data from BigQuery with SQL and writes it to a CSV file.
    • transform: processes the CSV produced by extract and assembles the text to post to Slack (a standalone sketch follows this list).
    ProjectA,ServiceA,0.0,
    ProjectA,ServiceB,0.0,
    ProjectA,ServiceC,155.30447499999997,
    
    For example, given data like the above, text like the following is assembled:
    GCP usage costs for 2022-04-14
    ProjectA----
    ・ServiceC ¥155
    ・ServiceA ¥0
    ・ServiceB ¥0
    
    It was ¥100 or more.
    
    • slack_notify: posts the assembled text to Slack.
  • clean_group group
    • clean_bq: deletes data six or more days old.
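
As promised above, here is a minimal standalone sketch of what transform and create_text do (the same grouping and text-assembly logic as in main.py below, with the sample data inlined):

rows = [
    ("ProjectA", "ServiceA", "0.0"),
    ("ProjectA", "ServiceB", "0.0"),
    ("ProjectA", "ServiceC", "155.30447499999997"),
]

# Group (service, cost) pairs by project name.
result = {}
for project, service, cost in rows:
    result.setdefault(project, []).append({"service": service, "cost": cost})

# Assemble the Slack text, listing services in descending cost order.
text = "GCP usage costs for 2022-04-14"
total = 0
for project, values in result.items():
    text += f"\n{project}----"
    for value in sorted(values, key=lambda v: float(v["cost"]), reverse=True):
        cost = int(float(value["cost"]))  # truncate to whole yen
        text += f"\n・{value['service']} ¥{cost}"
        total += cost
text += "\n\n" + ("It was ¥100 or more." if total >= 100 else "It was less than ¥100.")
print(text)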

Variables to set

  • bigquery_sa_keypath: path to the BigQuery service account key file
  • slack_token_api1: Slack API token
  • cost_dataset: BigQuery dataset name
  • cost_table: BigQuery table name
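
For reference, a sketch of registering these from Python (placeholder values; Variable.set needs an initialized Airflow metadata DB, and the CLI command "airflow variables set" works just as well):

from airflow.models import Variable

# Placeholder values: substitute your own path, token, and names.
Variable.set("bigquery_sa_keypath", "/path/to/sa_key.json")
Variable.set("slack_token_api1", "xoxb-your-token")
Variable.set("cost_dataset", "my_dataset")
Variable.set("cost_table", "my_cost_table")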

main.py

import uuid
from datetime import timedelta
from typing import Iterator
from airflow import DAG
from airflow.models import Variable
from airflow.macros import ds_add
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow_dag_sample.operators.slack_api_post_operator\
    import SlackAPIPostOperatorAPI1
from airflow_dag_sample.operators.bigquery_operator import BigQueryOperator


default_args = {
    'owner': 'owner',
    'depends_on_past': False,
    'start_date': days_ago(2),
}


def get_extract_date(ti):
    return ti.xcom_pull(task_ids='prepare', key='extract_date')


def get_delete_date(ti):
    return ti.xcom_pull(task_ids='prepare', key='delete_date')


def _macros():
    return {
        'bq_table': f"{Variable.get('cost_dataset')}.{Variable.get('cost_table')}",
        'extract_date': get_extract_date,
        'delete_date': get_delete_date,
    }


with DAG(
    'test_slack',
    default_args=default_args,
    description='DAG that posts the daily GCP usage costs to Slack',
    schedule_interval=timedelta(days=1),
    tags=["work"],
    user_defined_macros=_macros(),
) as dag:
    dag.doc_md = """\
    ## DAG that posts GCP usage costs to Slack
    ### Variables used
    - bigquery_sa_keypath
    - slack_token_api1
    - cost_dataset
    - cost_table
    """

    def prepare(**kwargs):
        # For a manually triggered run, 'ds' is the trigger date itself, so
        # use yesterday's date; for a scheduled run, 'ds' (the logical date)
        # is already the previous day.
        if kwargs['dag_run'].external_trigger \
             and not kwargs['dag_run'].is_backfill:
            extract_date = kwargs['yesterday_ds']
            delete_date = ds_add(kwargs['yesterday_ds'], -5)
        else:
            extract_date = kwargs['ds']
            delete_date = ds_add(kwargs['ds'], -5)
        kwargs['ti'].xcom_push(key='extract_date', value=extract_date)
        kwargs['ti'].xcom_push(key='delete_date', value=delete_date)
        # Output destination for the CSV
        local_log_dir = '/var/log/bigquery/'
        suffix = str(uuid.uuid4()).replace('-', '_')
        kwargs['ti'].xcom_push(
            key='local_log_filepath',
            value=f'{local_log_dir}{extract_date}_{suffix}.csv')

    prepare_task = PythonOperator(
        task_id="prepare",
        python_callable=prepare,
    )

    with TaskGroup(
         "main_task_group",
         tooltip="Extract the GCP costs for extract_date from BQ, transform them, and send them to Slack"
         ) as main_group:
        def reader(filepath: str) -> Iterator[list]:
            # Yield each CSV row as a list of string fields
            with open(filepath) as f:
                for row in f:
                    yield row.split(',')

        def check_total(total_cost: int) -> str:
            if total_cost >= 100:
                return "It was ¥100 or more."
            else:
                return "It was less than ¥100."

        def create_text(extract_date: str, result_dict: dict) -> str:
            text = f"GCP usage costs for {extract_date}"
            total_cost = 0
            for project, values in result_dict.items():
                text += f"\n{project}----"
                # Sort by cost in descending order (cast to float first:
                # the CSV values are strings, and comparing them
                # lexicographically would misorder e.g. '9.0' and '155.0')
                for value in sorted(
                        values, key=lambda v: float(v['cost']), reverse=True):
                    cost = int(float(value['cost']))
                    text += f"\n・{value['service']} ¥{cost}"
                    total_cost += cost
            text += f"\n\n{check_total(total_cost)}"
            return text

        def transform(**kwargs):
            filepath = kwargs['ti'].xcom_pull(key='local_log_filepath')
            result_dict = {}
            for row in reader(filepath):
                # Group (service, cost) pairs under their project name
                result_dict.setdefault(row[0], []).append(
                    dict(service=row[1], cost=row[2]))
            kwargs['ti'].xcom_push(
                key='slack_text',
                value=create_text(kwargs['extract_date'], result_dict))

        extract_task = BigQueryOperator(
            task_id='extract',
            sql='sql/extract_cost.sql',
            filepath="{{ ti.xcom_pull(key='local_log_filepath') }}",
            do_xcom_push=False,
        )

        transform_task = PythonOperator(
            task_id="transform",
            op_kwargs={"extract_date": '{{ extract_date(ti) }}'},
            python_callable=transform,
        )

        slack_notify_task = SlackAPIPostOperatorAPI1(
            task_id="slack_notify",
            text="{{ ti.xcom_pull(key='slack_text')}}",
            channel="costs"
        )

        extract_task >> transform_task >> slack_notify_task

    with TaskGroup(
         "clean_group", tooltip="Clean up the BQ data") as clean_group:

        clean_bq_task = BigQueryOperator(
            task_id='clean_bq',
            sql="DELETE FROM {{ bq_table }} "
                "WHERE cast(export_time as date) <= '{{ delete_date(ti) }}'",
            do_xcom_push=False,
        )

    prepare_task >> main_group >> clean_group
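
As a quick check on the date arithmetic in prepare, ds_add can be exercised on its own (it simply shifts a YYYY-MM-DD date string by N days):

from airflow.macros import ds_add

# With ds = '2022-04-20', delete_date becomes five days earlier.
print(ds_add('2022-04-20', -5))  # -> '2022-04-15'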

sql/extract_cost.sql

The SQL template used by extract_task:

SELECT project.name AS project_name, service.description AS service_name,
 SUM(cost) AS total_cost
FROM {{ bq_table }} 
WHERE CAST(export_time as date) = '{{ extract_date(ti) }}'
GROUP BY project_name, service_name
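
To see the query that actually reaches BigQuery, the template can be rendered by hand with Jinja (a sketch; the table name and date are placeholders standing in for the user-defined macros):

from jinja2 import Template

sql = """\
SELECT project.name AS project_name, service.description AS service_name,
 SUM(cost) AS total_cost
FROM {{ bq_table }}
WHERE CAST(export_time as date) = '{{ extract_date(ti) }}'
GROUP BY project_name, service_name
"""

rendered = Template(sql).render(
    ti=None,  # stand-in; the real macro reads the date from XCom via ti
    bq_table='my_dataset.my_cost_table',   # placeholder
    extract_date=lambda ti: '2022-04-20',  # placeholder macro
)
print(rendered)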

BigQueryOperator

Created by extending BaseOperator (https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py).

from google.cloud import bigquery
from google.oauth2 import service_account
from airflow.models import Variable
from airflow.models import BaseOperator

class BigQueryOperator(BaseOperator):

    template_fields = ('sql', 'filepath')
    template_ext = ('.sql',)
    ui_color = '#db7093'

    def __init__(self, sql, filepath=None, *args, **kwargs):
        """
        :param sql: the query to execute; works with both .sql file paths and raw queries
        :param filepath: where to save the query result as CSV; if None, nothing is written to file
        """
        super(BigQueryOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.filepath = filepath


    def execute(self, context):
        _credentials = service_account.Credentials.from_service_account_file(
            Variable.get("bigquery_sa_keypath"),
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        _client = bigquery.Client(
            credentials=_credentials,
            project=_credentials.project_id,
        )
        query_job = _client.query(self.sql)
        result = query_job.result()

        if self.filepath is not None:
            # Write each row as comma-separated fields (with a trailing
            # comma, which the reader in main.py tolerates)
            with open(self.filepath, 'w') as f:
                for row in result:
                    for field in row:
                        f.write(f"{field},")
                    f.write("\n")
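
For completeness, a hedged usage sketch (placed inside a "with DAG(...)" block; the table name is a placeholder): the operator accepts either a raw query or a path to a .sql template, and skips the file write when filepath is omitted:

count_task = BigQueryOperator(
    task_id='count_rows',
    sql='SELECT COUNT(*) AS n FROM my_dataset.my_cost_table',  # placeholder
    # filepath omitted: the query runs, but no CSV is written
)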

SlackAPIPostOperator

Created by extending SlackAPIOperator (https://github.com/apache/airflow/blob/main/airflow/providers/slack/operators/slack.py). Using it as-is would have been fine, but I wanted to change ui_color, and passing the token as an argument every time the operator is used felt tedious, so it ended up in this shape.

import json
from typing import Any, List, Optional, Sequence
from airflow.models import Variable
from airflow.providers.slack.operators.slack import SlackAPIOperator


class SlackAPIPostOperator(SlackAPIOperator):
    """
    Posts messages to a slack channel
    Examples:
    https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_modules/airflow/providers/slack/operators/slack.html#SlackAPIPostOperator
    """
    template_fields: Sequence[str] = (
        'username', 'text', 'attachments', 'blocks', 'channel')

    ui_color = '#b6f0dd'

    def __init__(
        self,
        channel: str = '#general',
        username: str = '467',
        text: str = 'No message has been set.',
        icon_url: str = 'https://raw.githubusercontent.com/apache/'
        'airflow/main/airflow/www/static/pin_100.png',
        attachments: Optional[List] = None,
        blocks: Optional[List] = None,
        **kwargs,
    ) -> None:
        self.method = 'chat.postMessage'
        self.channel = channel
        self.username = username
        self.text = text
        self.icon_url = icon_url
        self.attachments = attachments or []
        self.blocks = blocks or []
        super().__init__(method=self.method, **kwargs)

    def construct_api_call_params(self) -> Any:
        self.api_params = {
            'channel': self.channel,
            'username': self.username,
            'text': self.text,
            'icon_url': self.icon_url,
            'attachments': json.dumps(self.attachments),
            'blocks': json.dumps(self.blocks),
        }


class SlackAPIPostOperatorAPI1(SlackAPIPostOperator):
    def __init__(self, **kwargs):
        self.token = Variable.get("slack_token_api1")
        super().__init__(token=self.token, **kwargs)
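
Binding the token inside a small subclass keeps the DAG code free of credential plumbing. Following the same pattern, a second workspace or token (hypothetical Variable name below) would get its own subclass:

class SlackAPIPostOperatorAPI2(SlackAPIPostOperator):
    def __init__(self, **kwargs):
        # Hypothetical second token stored in its own Airflow Variable
        self.token = Variable.get("slack_token_api2")
        super().__init__(token=self.token, **kwargs)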

Result

When the DAG runs, a notification like the example text shown in the transform step above should arrive in the channel specified for SlackAPIPostOperatorAPI1.

Summary

That was a walkthrough of a DAG that posts the daily GCP usage costs to Slack. If anything in the implementation is inappropriate, or there is a better way to do this, I would be glad to hear about it.
