📘

BigQueryのクエリ結果をCloudFunctionsとPub/Subを使用して定期的にslackに投稿する

2021/06/29に公開

はじめに

やりたかったことはタイトルの通りです。
世の先人たちが様々な方法で実装していますが、私なりに実装した方法をここに残します。
またこれまでpublicな場所で情報を発信したことがないので、その練習と併せて本記事を公開しています。

実装図

処理の流れ

簡単2ステップです。

  1. BigQueryのクエリスケジュールで定期的にクエリをテーブルに発行。処理終了時にPub/Subへトピックを発行するように設定
  2. Pub/SubのトピックをトリガーにしたCloud Functionsでslackに実行結果を投稿

処理の詳細

BigQueryのクエリスケジュールで定期的にクエリをテーブルに発行。処理終了時にPub/Subへトピックを発行するように設定

公式ページを参考に設定しました。
コンソールで実施する場合は、特段迷うことなく設定可能でした。
(前段のクエリを作成する方が不慣れで私は時間がかかりました。
JoinやらWhereの条件でとても苦戦しました。)

https://cloud.google.com/bigquery/docs/scheduling-queries?hl=ja

Pub/SubのトピックをトリガーにしたCloud Functionsでslackに実行結果を投稿

Pub/Subは作成されている前提です。
コンソールでポチポチで10分もあれば作成できました。

https://cloud.google.com/pubsub/docs/quickstart-console?hl=ja

Cloud FunctionはPub/Subをトリガーとしてコンソールから作成しました。

https://cloud.google.com/functions/docs/quickstart-go?hl=ja

関数を作成すると、Pub/Subからの通知を受ける処理が記載されたmain.goの
コードが表示されるので、そちらを修正していきます。

実際のコードは以下の通りです。
クエリ結果をそのままJSONに突っ込んでslackへ投げることができなかったので、
一度Stringに変換した上で1行に結合して1つづつ投げています。
これは纏めたりした方が絶対にquota等に引っかかりづらくなる筈なので、
将来的にそこを直したいと考えております。

// Package p contains a Pub/Sub Cloud Function.
package p

import (
	"context"
	"encoding/json"
	"log"
	"fmt"
	"net/http"
	"net/url"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

var (
	IncomingUrl string = "Slackから発行されたURL"
)

type payload struct {
	Text string `json:"text"`
}

func post2slack(mes string) (err error) {
	a := payload{}
	a.Text = mes
	p, err := json.Marshal(a)

	if err != nil {
		return err
	}
	resp, err := http.PostForm(IncomingUrl, url.Values{"payload": {string(p)}})
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil
}

// PubSubMessage is the payload of a Pub/Sub event. Please refer to the docs for
// additional information regarding Pub/Sub events.
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	//log.Println(string(m.Data))
	projectID := "自分のプロジェクト"
	ctx = context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	q := client.Query(
		"SQLクエリ")
	// Location must match that of the dataset(s) referenced in the query.
	q.Location = "データセットのロケーション"
	// Run the query and print results when the query job is completed.
	job, err := q.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if err := status.Err(); err != nil {
		return err
	}
	log.Println("Start reading row and post")
	it, err := job.Read(ctx)
	for {
		var row []bigquery.Value
		var concat string
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		//fmt.Println(row)
		for _, v := range row {
			if v != nil {
				//fmt.Println(i, v.(string))
				concat += v.(string) + " "
			}
		}
		
		post2slack(concat)
	}
	log.Println("End reading row and post")
	return nil
}

これで問題なければslackにアプリから投稿されます。

終わりに

いつもAWSを使用していますが初めてBigQueryとCloud Functionsを触りました。
どれもサービス間を繋げやすくてとても感心しました。
エラーも全てコンソールで確認できるので試行錯誤が捗りました。

作成した関数はクエリの結果をもう少し加工してslackに投稿したい等、
改善したいことが多々出てきています。
空いている時間にまた修正していきたいと思います。

Discussion