🦴
GCPの使用料金を日次でSlackに通知する with Airflow
概要
私用でGCPを使用しているが、クラウド破産を避けたい。。予算アラートで知らされる前に気づきたい。。。ために、「CloudBillingのBigQueryへのエクスポート機能」と「Airflow」を用いて日次で使用料金をSlackに通知するDAGを作成する。
やりたいこと
- 日次で使用料金をSlackに通知
- この時、プロジェクトとサービス名毎の料金もわかるように。
- BigQuery内にある使用したデータ(通知済みのデータ)の整理
- 通知済みのデータはローカルにCSVとして保存する。
- また、6日以上前のデータはデータ量削減のために削除する。
手順
事前準備:Cloud Billing データを BigQuery にエクスポートする
- こちらにまとめられている通りに行いました。
- https://cloud.google.com/billing/docs/how-to/export-data-bigquery?hl=ja
- 「標準的な使用料金データ」を使いました。
- 諸々設定するとBigQuery内にDatasetが作られ以下のようなデータが作られます。
- 各カラムについてはこちら
- 念のために隠した方が良いようなところのvalueは「XXX」にしてあります。
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T02:00:00Z", "usage_end_time": "2022-04-20T03:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_dataset" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T05:55:41.063Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "933.0", "unit": "bytes", "amount_in_pricing_units": "8.6892396211624146e-07", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T00:00:00Z", "usage_end_time": "2022-04-20T01:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_project" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T04:04:18.373Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "8498.0", "unit": "bytes", "amount_in_pricing_units": "7.9143792390823364e-06", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T00:00:00Z", "usage_end_time": "2022-04-20T01:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_dataset" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T04:04:18.373Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "4609.0", "unit": "bytes", "amount_in_pricing_units": "4.2924657464027405e-06", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
- 出力されたデータのうち、「.export_time」で任意の日付の識別を、「.project.name」でプロジェクト名、「.service.description」でサービス名、「.cost」でプロジェクトのサービス毎の料金の識別として、使用していきます。
DAG全体図
- prepare task
- 各種準備のためのタスク(手動実行・自動実行の場合の取得するデータ日付の切り分け、Xcomのpushなど)
- main_task_group group
- extract:BigQueryからsqlを用いてデータをCSVに出力する。
- transform:extractで出力したCSVデータを加工し、Slackに通知できるようなtextを組み立てる。
例えば上記のようなデータの場合->以下のようなtextを組み立てる。プロジェクトA,サービスA,0.0, プロジェクトA,サービスB,0.0, プロジェクトA,サービスC,155.30447499999997,
2022-04-14のGCP使用料金 プロジェクトA---- ・サービスC ¥156 ・サービスA ¥0 ・サービスB ¥0 100円以上でした。
- slack_notify:Slackに通知する。
- clean_group group
- clean_bq:6日以上前のデータを削除する。
設定するVariables
- bigquery_sa_keypath: Biqqueryのサービスアカウントのkeyがあるpath
- slack_token_API1: slackapiのtoken
- cost_dataset: bqのdataset
- cost_table: bqのtable
main.py
import uuid
from datetime import timedelta
from operator import itemgetter
from airflow import DAG
from airflow.models import Variable
from airflow.macros import ds_add
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
from airflow_dag_sample.operators.slack_api_post_operator\
import SlackAPIPostOperatorAPI1
from airflow_dag_sample.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner': 'owner',
'depends_on_past': False,
'start_date': days_ago(2),
}
def get_extract_date(ti):
return ti.xcom_pull(task_ids='prepare', key='extract_date')
def get_delete_date(ti):
return ti.xcom_pull(task_ids='prepare', key='delete_date')
def _macros():
return {
'bq_table': f"{Variable.get('cost_dataset')}.dataset.{Variable.get('cost_table')}",
'extract_date': get_extract_date,
'delete_date': get_delete_date,
}
with DAG(
'test_slack',
default_args=default_args,
description='日々のGCP使用料をSlackに通知するDAG',
schedule_interval=timedelta(days=1),
tags=["work"],
user_defined_macros=_macros(),
) as dag:
dag.doc_md = """\
## GCPの使用料金をSlackに通知するDAG
### 使用するVariable
- bigquery_sa_keypath
- slack_token_hatiware
- cost_dataset
- cost_table
"""
def prepare(**kwargs):
if kwargs['dag_run'].external_trigger \
and not kwargs['dag_run'].is_backfill:
extract_date = kwargs['yesterday_ds']
delete_date = ds_add(kwargs['yesterday_ds'], -5)
else:
extract_date = kwargs['ds']
delete_date = ds_add(kwargs['ds'], -5)
kwargs['ti'].xcom_push(key='extract_date', value=extract_date)
kwargs['ti'].xcom_push(key='delete_date', value=delete_date)
# CSVの出力先
local_log_dir = '/var/log/bigquery/'
suffix = str(uuid.uuid4()).replace('-', '_')
kwargs['ti'].xcom_push(
key='local_log_filepath',
value=f'{local_log_dir}{extract_date}_{suffix}.csv')
prepare_task = PythonOperator(
task_id="prepare",
python_callable=prepare,
)
with TaskGroup(
"main_task_group", tooltip="bqからextract_dateのgcp利用料金を抽出・加工し、Slackに送る"
) as main_group:
def reader(filepath: str) -> list:
with open(filepath) as f:
for row in f:
yield row.split(',')
def check_total(total_cost: str) -> str:
if total_cost >= 100:
return "100円以上でした。"
else:
return "100円未満でした。"
def create_text(extract_date: str, result_dict: dict) -> str:
text = f"{extract_date}のGCP使用料金"
total_cost = 0
for project, values in result_dict.items():
text = text+f"\n{project}----"
# costが大きい順に並び替えて取り出す
for value in sorted(
values, key=itemgetter('cost'), reverse=True):
cost = int(float(value['cost']))
text = text+f"\n・{value['service']} ¥{cost}"
total_cost += cost
text = text+f"\n\n{check_total(total_cost)}"
return text
def transform(**kwargs):
filepath = kwargs['ti'].xcom_pull(key='local_log_filepath')
result_dict = {}
for row in reader(filepath):
if result_dict.get(row[0], None) is not None:
result_dict[row[0]] = \
list(result_dict[row[0]])\
+ [dict(service=row[1], cost=row[2])]
else:
result_dict[row[0]] = \
[dict(service=row[1], cost=row[2])]
kwargs['ti'].xcom_push(
key='slack_text',
value=create_text(kwargs['extract_date'], result_dict))
extract_task = BigQueryOperator(
task_id='extract',
sql='sql/extract_cost.sql',
filepath="{{ ti.xcom_pull(key='local_log_filepath') }}",
do_xcom_push=False,
)
transform_task = PythonOperator(
task_id="transform",
op_kwargs={"extract_date": '{{ extract_date(ti) }}'},
python_callable=transform,
)
slack_notify_task = SlackAPIPostOperatorAPI1(
task_id="slack_notify",
text="{{ ti.xcom_pull(key='slack_text')}}",
channel="costs"
)
extract_task >> transform_task >> slack_notify_task
with TaskGroup(
"clean_group", tooltip="bqデータの整理") as clean_group:
clean_bq_task = BigQueryOperator(
task_id='clean_bq',
sql="DELETE FROM {{ bq_table }} \
WHERE cast(export_time as date) <= '{{ delete_date(ti) }}'",
do_xcom_push=False,
)
clean_bq_task
prepare_task >> main_group >> clean_group
sql/extract.sql
extract_taskで使用するsqlのtemplate
SELECT project.name AS project_name, service.description AS service_name,
SUM(cost) AS total_cost
FROM {{ bq_table }}
WHERE CAST(export_time as date) = '{{ extract_date(ti) }}'
GROUP BY project_name, service_name
BigQueryOperator
BaseOperator(https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py)を拡張して作成しました。
from google.cloud import bigquery
from google.oauth2 import service_account
from airflow.models import Variable
from airflow.models import BaseOperator
class BigQueryOperator(BaseOperator):
template_fields = ('sql','filepath')
template_ext = ('.sql')
ui_color = '#db7093'
def __init__(self, sql, filepath=None, *args, **kwargs):
"""
:param sql: execute query. Works with both file paths and raw queries
:param filepath: save the query execution result. if None, does not save to file
"""
super(BigQueryOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.filepath = filepath
def execute(self, *args, **kwargs):
_credentials = service_account.Credentials.from_service_account_file(
Variable.get("bigquery_sa_keypath"),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
_client = bigquery.Client(
credentials=_credentials,
project=_credentials.project_id,
)
query_job = _client.query(self.sql)
result = query_job.result()
if self.filepath is not None:
with open(self.filepath, 'w') as f:
for rows in result:
[f.write(f"{row},") for row in rows]
f.write("\n")
else:
return
SlackAPIPostOperator
SlackAPIOperator(https://github.com/apache/airflow/blob/main/airflow/providers/slack/operators/slack.py)を拡張して作成しました。。そのまま使っても問題なかったのですが、ui_colorを変えたかったのと、Operatorを使用するときにTokenを引数に都度設定するのが煩雑だったのでこのような形に。
import json
from airflow.models import Variable
from airflow.operators.slack_operator import SlackAPIOperator
from typing import Any, List, Optional, Sequence
class SlackAPIPostOperator(SlackAPIOperator):
"""
Posts messages to a slack channel
Examples:
https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_modules/airflow/providers/slack/operators/slack.html#SlackAPIPostOperator
"""
template_fields: Sequence[str] = (
'username', 'text', 'attachments', 'blocks', 'channel')
ui_color = '#b6f0dd'
def __init__(
self,
channel: str = '#general',
username: str = '467',
text: str = 'No message has been set.',
icon_url: str = 'https://raw.githubusercontent.com/apache/'
'airflow/main/airflow/www/static/pin_100.png',
attachments: Optional[List] = None,
blocks: Optional[List] = None,
**kwargs,
) -> None:
self.method = 'chat.postMessage'
self.channel = channel
self.username = username
self.text = text
self.icon_url = icon_url
self.attachments = attachments or []
self.blocks = blocks or []
super().__init__(method=self.method, **kwargs)
def construct_api_call_params(self) -> Any:
self.api_params = {
'channel': self.channel,
'username': self.username,
'text': self.text,
'icon_url': self.icon_url,
'attachments': json.dumps(self.attachments),
'blocks': json.dumps(self.blocks),
}
class SlackAPIPostOperatorAPI1(SlackAPIPostOperator):
def __init__(self, **kwargs):
self.token = Variable.get("slack_token_api1")
super().__init__(token=self.token, **kwargs)
実行結果
実行するとSlackAPIPostOperatorAPI1に指定したchannelに以下のような通知が来るかと思います。
まとめ
GCPの使用料金を日次でSlackに通知するDAGについて紹介でした。コードに実装が不適切であったり、良いやり方があればご指摘いただけると嬉しいです。
Discussion