SAM + Go + CloudWatch で特定ワードのツイートを定期的にDynamoDBに入れる
Twitter の API を使って、特定の単語が含まれているツイートだけを DB に入れるというのをやってみたので、記録を残してみます!
定期的に処理を実行して、API を叩いて DB に入れるのを繰り返す必要があるので、SAM + Lambda + CloudWatch を使って実装します。
個人的な好みというだけで今回は Go を使用しますが、実行時間の速さやシングルバイナリであることから、lambda などのサーバーレスとは相性が良いようです。
SAM とは
SAM(Serverless Application Model)とは、AWS CloudFormation を拡張したもので、yaml で書いた設定を CLI から簡単に構成できるものです。
API gateway + Lambda を用いた サーバーレス API のサンプルを用意してくれているので、それを少し書き換えるだけで今回やりたいことは実現できそうです。
やりたいこと
- lambda で Twitter の API を叩いて特定ワードが含まれるツイートを取得
- そのツイートの内容、投稿者、ID、日時を DynamoDB に保存
- 上記を時間を指定して定期実行(今回は 5 分に一回)
それではやってみましょう!
事前準備
まず、Twitter の API と DynamoDB を使う準備をしておきます。
1.Twitter API のセットアップ
Twitter の API ですが、やや準備が面倒です。
↓ から、twitter のプロジェクトを登録します。
どういった用途で API を使いたいかを英語で簡単に書かないといけないので、頑張って書きましょう笑
色々設定してプロジェクトを作ると、以下のように API Key
、API Secret Key
、Access Token
、Access Token Secret
が発行できます。
2.DynamoDB でテーブル作成
次は DynamoDB でツイートを格納するテーブルを作成しましょう
テーブル名やプライマリーキー名はなんでも大丈夫ですが、今回はテーブル名はcrawled_tweet
、プライマリーキー名はtweet_id
で作ります。
そして、TTL(Time To Live)を設定して保存したツイートは一定期間後に自動で削除されるようにしておきましょう。
そうしないと、定期実行されて無限に格納されてしまいます。
テーブルの詳細から、「TTL の管理」を開き、TTL 属性にexpired_at
を設定しましょう。
以上が終わったら、AWS_ACCEESS_KEY
、AWS_SECRET_ACCEESS_KEY
、 API Key
、API Secret Key
、Access Token
、Access Token Secret
の計6つを.env ファイルに記述しておきます。
AWS_ACCEESS_KEY=...
AWS_SECRET_ACCEESS_KEY=...
CONSUMER_KEY=...
CONSUMER_SECRET=...
ACCESS_TOKEN=...
ACCESS_TOKEN_SECRET=...
SAM のセットアップ
次に SAM CLI をインストールしましょう。
mac の場合は homebrew が入っていればインストール出来ると思いますが、詳しくはこちらを参照してください。
brew が入っていれば、以下でインストール出来ると思います。
$brew tap aws/tap
$brew install aws-sam-cli
SAM CLI がインストールできたら、sam init
コマンドで環境構築をしましょう。
今回は Go を使用するので、以下のようになります。
$sam init --runtime go1.x --name crawl-tweet-sample
テンプレートをどうするかとか色々聞かれますが、とりあえず全部1にしましょう。
以下のような構成が出来ていると思います。
sample-project
├── Makefile
├── README.md
├── hello-world
├── main.go
└── main_test.go
└── template.yaml
main.go
には実行したい関数が記述してあり、template.yaml
にはデプロイの設定が書いてあります。
まずはtemplate.yaml
を以下のように変えてみます。
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
crawl-tweet-project
Sample SAM Template for crawl-tweet-project
# 渡したいパラメータ名と型を指定
Parameters:
AwsAccessKey:
Type: String
AwsSecretAccessKey:
Type: String
ConsumerKey:
Type: String
ConsumerSecret:
Type: String
AccessToken:
Type: String
AccessTokenSecret:
Type: String
Globals:
Function:
Timeout: 5
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
# 関数がある場所を指定
CodeUri: crawl-tweet/
Handler: crawl-tweet
Runtime: go1.x
Tracing: Active
# EventsはScheduledEventに変える
Events:
ScheduledEvent:
Type: Schedule
Properties:
Schedule: cron(0/5 * * * ? *)
# 上のParameterでもらった値を環境変数にセット
Environment:
Variables:
AWS_ACCEESS_KEY: !Ref AwsAccessKey
AWS_SECRET_ACCEESS_KEY: !Ref AwsSecretAccessKey
CONSUMER_KEY: !Ref ConsumerKey
CONSUMER_SECRET: !Ref ConsumerSecret
ACCESS_TOKEN: !Ref AccessToken
ACCESS_TOKEN_SECRET: !Ref AccessTokenSecret
元のテンプレートだと、Events には APIGateway を使っているのですが、そこを cloudwatch の events に変えましょう。Schedule の値は cron 式で入力できて、今回は0/5 * * * ? *
とすることで5分に一回 lambda 関数を実行するよう指定します。
クローン式は分 時間 日 月 曜日 年
の順で指定できます。分のフィールドを0/5
とすると、「0 分スタートで 5 分毎に実行してね」という意味になります。
また、AWS と TwitterAPI のキーを公開したくないので、コマンドで実行やデプロイする際に渡します。そのために Parameters を設定しておきましょう。
Makefile を作っておく
次に Makefile を作って、ビルドやローカルでの実行を簡単にしておきましょう。作らなくても実行などできるのですが、コマンドからパラメータを渡すとかなり長くなってしまうので、短縮して書いておいた方が楽ですね。
include .env
$(eval export $(shell sed -ne 's/ *#.*$//; /./ s/=.*$$// p' .env))
.PHONY: build
# ビルド
build:
sam build
# lambdaにデプロイ
deploy:
sam deploy --stack-name crawl-tweet-sample --parameter-overrides AwsAccessKey=${AWS_ACCEESS_KEY} AwsSecretAccessKey=${AWS_SECRET_ACCEESS_KEY} ConsumerKey=${CONSUMER_KEY} ConsumerSecret=${CONSUMER_SECRET} AccessToken=${ACCESS_TOKEN} AccessTokenSecret=${ACCESS_TOKEN_SECRET}
# デプロイしたlambdaの削除
delete:
aws cloudformation delete-stack --stack-name crawl-tweet-sample
# ローカルで実行
invoke:
sam build && sam local invoke --parameter-overrides AwsAccessKey=${AWS_ACCEESS_KEY} AwsSecretAccessKey=${AWS_SECRET_ACCEESS_KEY} ConsumerKey=${CONSUMER_KEY} ConsumerSecret=${CONSUMER_SECRET} AccessToken=${ACCESS_TOKEN} AccessTokenSecret=${ACCESS_TOKEN_SECRET}
main.go の実装
次は実際に lambda が実行する関数を実装します。
全体は以下のようになります。
package main
import (
"log"
"net/url"
"os"
"strconv"
"time"
"github.com/ChimeraCoder/anaconda"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/guregu/dynamo"
)
// ①dynamoDBに入れたい情報のstructの定義
// User つぶやいたユーザ情報
type User struct {
ID int64 `dynamo:"id"`
Name string `dynamo:"name"`
ScreenName string `dynamo:"screen_name"`
}
// Tweet 参加を募集するツイート
type Tweet struct {
ID string `dynamo:"tweet_id"` //パーティションキー
FullText string `dynamo:"full_text"`
TweetedAt int64 `dynamo:"tweeted_at"` //dynamodbでソート出来るようにUNIX時間
ExpiredAt int64 `dynamo:"expired_at"`
User User `dynamo:"user"`
}
// NewTweet Tweetのメンバを初期化する関数
func NewTweet() Tweet {
tweet := Tweet{}
tweet.ID = ""
tweet.FullText = ""
tweet.TweetedAt = 0
tweet.ExpiredAt = 0
tweet.User = User{}
return tweet
}
func crawlTweets() {
// ②Twitter APIと AWSの認証
const tableName = "crawled_tweet"
creds := credentials.NewStaticCredentials(os.Getenv("AWS_ACCEESS_KEY"), os.Getenv("AWS_SECRET_ACCEESS_KEY"), "") //第3引数はtoken
sess, _ := session.NewSession(&aws.Config{
Credentials: creds,
Region: aws.String("ap-northeast-1")},
)
db := dynamo.New(sess)
table := db.Table(tableName)
anaconda.SetConsumerKey(os.Getenv("CONSUMER_KEY"))
anaconda.SetConsumerSecret(os.Getenv("CONSUMER_SECRET"))
api := anaconda.NewTwitterApi(os.Getenv("ACCESS_TOKEN"), os.Getenv("ACCESS_TOKEN_SECRET"))
// 検索するオプション
v := url.Values{}
v.Set("count", "30")
searchResult, _ := api.GetSearch("#Golang", v)
// ③DynamoDBにツイートを保存
// 文字列→日付に変換するレイアウト
var layout = "Mon Jan 2 15:04:05 +0000 2006"
// 期限を一日後に設定する
expiredAt := time.Now().AddDate(0, 0, 1).Unix()
for _, tweet := range searchResult.Statuses {
newTweet := NewTweet()
tweetedTime, _ := time.Parse(layout, tweet.CreatedAt)
newTweet.ID = tweet.IdStr
newTweet.FullText = tweet.FullText
newTweet.TweetedAt = tweetedTime.Unix()
newTweet.ExpiredAt = expiredAt
newTweet.User = User{
tweet.User.Id,
tweet.User.Name,
tweet.User.ScreenName,
}
if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
log.Println(err.Error())
} else {
log.Println("Success!")
}
}
}
func main() {
// ラムダ実行
lambda.Start(crawlTweets)
}
順番に説明していきます。
① 構造体の定義
まず、API から取れる Tweet を格納するために構造体Tweet
とUser
を定義しています。
// User つぶやいたユーザ情報
type User struct {
ID string `dynamo:"id"`
Name string `dynamo:"name"`
ScreenName string `dynamo:"screen_name"`
}
// Tweet 参加を募集するツイート
type Tweet struct {
ID string `dynamo:"tweet_id"` //パーティションキー
FullText string `dynamo:"full_text"`
TweetedAt int64 `dynamo:"tweeted_at"` //dynamodbでソート出来るようにUNIX時間
ExpiredAt int64 `dynamo:"expired_at"`
User User `dynamo:"user"`
}
// NewTweet Tweetのメンバを初期化する関数
func NewTweet() Tweet {
tweet := Tweet{}
tweet.ID = ""
tweet.FullText = ""
tweet.TweetedAt = 0
tweet.ExpiredAt = 0
tweet.User = User{}
return tweet
}
この中でdynamo:"tweet_id"
などのタグをメンバに定義することで、dynamoDB に保存するカラム名を指定できます。
また、初期化をする関数NewTweet
を用意してます。
② 認証
そして、crawlTweet 関数の中で API を叩いて DynamoDB に入れる処理を書いています。
Go で dynamoDB を扱う SDK は guregu/dynamo を利用しています。
公式の SDKもあるのですが、ちょっと使いにくいのでこちらのほうがおすすめです。
また、Go で Twitter の API を利用するライブラリもあるので、そちらを使用します。
ほんとなんでもライブラリがやってくれて、便利な世の中ですね…笑
これらを使って以下のように aws と twitterAPI の認証を行います。
twitter の認証の際に、先程発行された4つのキーを使います。
const tableName = "crawled_tweet"
// 認証
creds := credentials.NewStaticCredentials(os.Getenv("AWS_ACCEESS_KEY"), os.Getenv("AWS_SECRET_ACCEESS_KEY"), "") //第3引数はtoken
sess, _ := session.NewSession(&aws.Config{
Credentials: creds,
Region: aws.String("ap-northeast-1")},
)
db := dynamo.New(sess)
table := db.Table(tableName)
anaconda.SetConsumerKey(os.Getenv("CONSUMER_KEY"))
anaconda.SetConsumerSecret(os.Getenv("CONSUMER_SECRET"))
api := anaconda.NewTwitterApi(os.Getenv("ACCESS_TOKEN"), os.Getenv("ACCESS_TOKEN_SECRET"))
この認証を行った後、以下のようにv.Set
でオプションをつけて API を叩くことが出来ます。
v := url.Values{}
v.Set("count", "30")
searchResult, _ := api.GetSearch("#Golang", v)
この例だと最新30件を取得できます。他にも、ツイートした時間や言語などでも絞ることができます。
詳しくは以下の API リファレンスを参照してみてください。
オプションをつけた後は、api.GetSearch("#Golang", v)
で#Golang が含まれているツイートを取得しています
③DynamoDB に入れる
API から取得した結果を、以下のように DynamoDB に入れていきます。
// 文字列→日付に変換するレイアウト
var layout = "Mon Jan 2 15:04:05 +0000 2006"
// 期限を一日後に設定する
expiredAt := time.Now().AddDate(0, 0, 1).Unix()
for _, tweet := range searchResult.Statuses {
newTweet := NewTweet()
tweetedTime, _ := time.Parse(layout, tweet.CreatedAt)
newTweet.ID = tweet.IdStr
newTweet.FullText = tweet.FullText
newTweet.TweetedAt = tweetedTime.Unix()
newTweet.ExpiredAt = expiredAt
newTweet.User = User{
tweet.User.Id,
tweet.User.Name,
tweet.User.ScreenName,
}
if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
log.Println(err.Error())
} else {
log.Println("Success!")
}
}
TweetedAt(ツイートした時刻)を DB に入れる際は、Unix 時間に変換しています。
日付をそのまま入れても良いのですが、DynamoDB には文字列型か数値型しか挿入できないので、そうすると日付が文字列として入ってしまいます。
なので、Unix 時間にしたほうが後々ソートや検索がしやすいと思います。
もう一つ注意なのは、api から取れる tweet の ID はtweet.IdStr
を使うことです。
uint 型の tweet.Id の値も存在するのですが、こちらはツイート ID の桁数が大きすぎて、Javascript でこの値を使おうとすると勝手に下位桁が0で丸められてしまいます。(公式ドキュメントでも id_str を使うよう推奨してますね)
そして最後に、取得したツイートの ID, 内容、ツイートした時間、ユーザの ID と名前を挿入しています。"attribute_not_exists(tweet_id)"
の条件をつけることで、tweet_id が存在しないときだけ挿入を行います。
ローカルで動作確認
以下のコマンドでビルド, 実行してみましょう
$ make build
$ make invoke
そして DynamoDB を見てみると、追加されているのが確認できます!
ですがよく見ると、RT と文頭についている、 リツイートされたツイートも一つのツイートとしてカウントされてしまうので、重複した内容が入ってしまっています。
これをなくしたい場合は以下のように、tweet.RetweetedStatus
の値によって分岐しましょう。
if tweet.RetweetedStatus == nil {
newTweet.ID = tweet.IdStr
newTweet.FullText = tweet.FullText
newTweet.TweetedAt = tweetedTime.Unix()
newTweet.ExpiredAt = expiredAt
newTweet.User = User{
tweet.User.Id,
tweet.User.Name,
tweet.User.ScreenName,
}
if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
log.Println(err.Error())
} else {
log.Println("Success!")
}
} else {
newTweet.ID = tweet.RetweetedStatus.IdStr
newTweet.FullText = tweet.RetweetedStatus.FullText
newTweet.TweetedAt = tweetedTime.Unix()
newTweet.ExpiredAt = expiredAt
newTweet.User = User{
tweet.RetweetedStatus.User.Id,
tweet.RetweetedStatus.User.Name,
tweet.RetweetedStatus.User.ScreenName,
}
if err := table.Put(newTweet).Run(); err != nil {
log.Println(err.Error())
} else {
log.Println("Success")
}
}
tweet.RetweetedStatus
が nil でなければリツイートされたツイートになります。その場合、tweet.RetweetedStatus.IdStr
(リツイート元のツイート ID)を tweet_id として dynamoDB に入れると、リツイート元が DB に入っていれば保存されません。
一度 DB を空にして、再度実行してみると、いい感じに保存できています!
デプロイ
最後に作った関数を lambda にデプロイしましょう。
以下のコマンドでデプロイできます
$ make deploy
または、sam deploy --guided
とすれば、対話型でスタックの名前やパラメータの値をセットできます。ですが、make コマンドで一発で実行したほうが楽ですね
デプロイが成功したら、lambda のコンソールを開いてみましょう。
下のように、template.yml で書いた関数名とスタック名が入ったものが登録されていると思います。
ログを見てみると、ちゃんと 5 分ごとに実行されているのがわかります!
まとめ
SAM と TwitterAPI を使って Twitter からツイートをクローリングしてみました。
SAM を使えばめちゃめちゃ簡単に開発&デプロイができたので、ちょっと試したいときや個人でなにか作りたいときにはとても便利ですね。
今回つくったもののソースコードは以下に公開してます。
指摘や補足などあれば教えていただけると幸いです。
Discussion