📘
BigQueryのクエリ結果をCloudFunctionsとPub/Subを使用して定期的にslackに投稿する
はじめに
やりたかったことはタイトルの通りです。
世の先人たちが様々な方法で実装していますが、私なりに実装した方法をここに残します。
またこれまでpublicな場所で情報を発信したことがないので、その練習と併せて本記事を公開しています。
実装図
処理の流れ
簡単2ステップです。
- BigQueryのクエリスケジュールで定期的にクエリをテーブルに発行。処理終了時にPub/Subへトピックを発行するように設定
- Pub/SubのトピックをトリガーにしたCloud Functionsでslackに実行結果を投稿
処理の詳細
BigQueryのクエリスケジュールで定期的にクエリをテーブルに発行。処理終了時にPub/Subへトピックを発行するように設定
公式ページを参考に設定しました。
コンソールで実施する場合は、特段迷うことなく設定可能でした。
(前段のクエリを作成する方が不慣れで私は時間がかかりました。
JoinやらWhereの条件でとても苦戦しました。)
Pub/SubのトピックをトリガーにしたCloud Functionsでslackに実行結果を投稿
Pub/Subは作成されている前提です。
コンソールでポチポチで10分もあれば作成できました。
Cloud FunctionはPub/Subをトリガーとしてコンソールから作成しました。
関数を作成すると、Pub/Subからの通知を受ける処理が記載されたmain.goの
コードが表示されるので、そちらを修正していきます。
実際のコードは以下の通りです。
クエリ結果をそのままJSONに突っ込んでslackへ投げることができなかったので、
一度Stringに変換した上で1行に結合して1つづつ投げています。
これは纏めたりした方が絶対にquota等に引っかかりづらくなる筈なので、
将来的にそこを直したいと考えております。
// Package p contains a Pub/Sub Cloud Function.
package p
import (
"context"
"encoding/json"
"log"
"fmt"
"net/http"
"net/url"
"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
)
var (
IncomingUrl string = "Slackから発行されたURL"
)
type payload struct {
Text string `json:"text"`
}
func post2slack(mes string) (err error) {
a := payload{}
a.Text = mes
p, err := json.Marshal(a)
if err != nil {
return err
}
resp, err := http.PostForm(IncomingUrl, url.Values{"payload": {string(p)}})
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}
// PubSubMessage is the payload of a Pub/Sub event. Please refer to the docs for
// additional information regarding Pub/Sub events.
type PubSubMessage struct {
Data []byte `json:"data"`
}
// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
//log.Println(string(m.Data))
projectID := "自分のプロジェクト"
ctx = context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()
q := client.Query(
"SQLクエリ")
// Location must match that of the dataset(s) referenced in the query.
q.Location = "データセットのロケーション"
// Run the query and print results when the query job is completed.
job, err := q.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
return err
}
log.Println("Start reading row and post")
it, err := job.Read(ctx)
for {
var row []bigquery.Value
var concat string
err := it.Next(&row)
if err == iterator.Done {
break
}
if err != nil {
return err
}
//fmt.Println(row)
for _, v := range row {
if v != nil {
//fmt.Println(i, v.(string))
concat += v.(string) + " "
}
}
post2slack(concat)
}
log.Println("End reading row and post")
return nil
}
これで問題なければslackにアプリから投稿されます。
終わりに
いつもAWSを使用していますが初めてBigQueryとCloud Functionsを触りました。
どれもサービス間を繋げやすくてとても感心しました。
エラーも全てコンソールで確認できるので試行錯誤が捗りました。
作成した関数はクエリの結果をもう少し加工してslackに投稿したい等、
改善したいことが多々出てきています。
空いている時間にまた修正していきたいと思います。
Discussion