SAM + Go + CloudWatch で特定ワードのツイートを定期的にDynamoDBに入れる

14 min read読了の目安(約13300字

Twitter の API を使って、特定の単語が含まれているツイートだけを DB に入れるというのをやってみたので、記録を残してみます!
定期的に処理を実行して、API を叩いて DB に入れるのを繰り返す必要があるので、SAM + Lambda + CloudWatch を使って実装します。

個人的な好みというだけで今回は Go を使用しますが、実行時間の速さやシングルバイナリであることから、lambda などのサーバーレスとは相性が良いようです。

SAM とは

SAM(Serverless Application Model)とは、AWS CloudFormation を拡張したもので、yaml で書いた設定を CLI から簡単に構成できるものです。
API gateway + Lambda を用いた サーバーレス API のサンプルを用意してくれているので、それを少し書き換えるだけで今回やりたいことは実現できそうです。

やりたいこと

  • lambda で Twitter の API を叩いて特定ワードが含まれるツイートを取得
  • そのツイートの内容、投稿者、ID、日時を DynamoDB に保存
  • 上記を時間を指定して定期実行(今回は 5 分に一回)

それではやってみましょう!

事前準備

まず、Twitter の API と DynamoDB を使う準備をしておきます。

1.Twitter API のセットアップ

Twitter の API ですが、やや準備が面倒です。

↓ から、twitter のプロジェクトを登録します。
どういった用途で API を使いたいかを英語で簡単に書かないといけないので、頑張って書きましょう笑

https://developer.twitter.com/en/portal/petition/use-case

色々設定してプロジェクトを作ると、以下のように API KeyAPI Secret KeyAccess TokenAccess Token Secret が発行できます。
api key 発行画面

2.DynamoDB でテーブル作成

次は DynamoDB でツイートを格納するテーブルを作成しましょう

テーブル名やプライマリーキー名はなんでも大丈夫ですが、今回はテーブル名はcrawled_tweet、プライマリーキー名はtweet_idで作ります。

そして、TTL(Time To Live)を設定して保存したツイートは一定期間後に自動で削除されるようにしておきましょう。
そうしないと、定期実行されて無限に格納されてしまいます。

テーブルの詳細から、「TTL の管理」を開き、TTL 属性にexpired_atを設定しましょう。
TTL設定

以上が終わったら、AWS_ACCEESS_KEYAWS_SECRET_ACCEESS_KEYAPI KeyAPI Secret KeyAccess TokenAccess Token Secretの計6つを.env ファイルに記述しておきます。

AWS_ACCEESS_KEY=...
AWS_SECRET_ACCEESS_KEY=...
CONSUMER_KEY=...
CONSUMER_SECRET=...
ACCESS_TOKEN=...
ACCESS_TOKEN_SECRET=...

SAM のセットアップ

次に SAM CLI をインストールしましょう。
mac の場合は homebrew が入っていればインストール出来ると思いますが、詳しくはこちらを参照してください。
brew が入っていれば、以下でインストール出来ると思います。

$brew tap aws/tap
$brew install aws-sam-cli

SAM CLI がインストールできたら、sam initコマンドで環境構築をしましょう。
今回は Go を使用するので、以下のようになります。

$sam init --runtime go1.x --name crawl-tweet-sample

テンプレートをどうするかとか色々聞かれますが、とりあえず全部1にしましょう。

以下のような構成が出来ていると思います。

sample-project
├── Makefile
├── README.md
├── hello-world
  ├── main.go
  └── main_test.go
└── template.yaml

main.goには実行したい関数が記述してあり、template.yamlにはデプロイの設定が書いてあります。

まずはtemplate.yamlを以下のように変えてみます。

template.yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
  crawl-tweet-project

   Sample SAM Template for crawl-tweet-project

# 渡したいパラメータ名と型を指定
Parameters:
  AwsAccessKey:
    Type: String
  AwsSecretAccessKey:
    Type: String
  ConsumerKey:
    Type: String
  ConsumerSecret:
    Type: String
  AccessToken:
    Type: String
  AccessTokenSecret:
    Type: String

Globals:
  Function:
    Timeout: 5

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      # 関数がある場所を指定
      CodeUri: crawl-tweet/
      Handler: crawl-tweet
      Runtime: go1.x
      Tracing: Active
      # EventsはScheduledEventに変える
      Events:
        ScheduledEvent:
          Type: Schedule
          Properties:
            Schedule: cron(0/5 * * * ? *)
      # 上のParameterでもらった値を環境変数にセット
      Environment:
        Variables:
          AWS_ACCEESS_KEY: !Ref AwsAccessKey
          AWS_SECRET_ACCEESS_KEY: !Ref AwsSecretAccessKey
          CONSUMER_KEY: !Ref ConsumerKey
          CONSUMER_SECRET: !Ref ConsumerSecret
          ACCESS_TOKEN: !Ref AccessToken
          ACCESS_TOKEN_SECRET: !Ref AccessTokenSecret

元のテンプレートだと、Events には APIGateway を使っているのですが、そこを cloudwatch の events に変えましょう。Schedule の値は cron 式で入力できて、今回は0/5 * * * ? *とすることで5分に一回 lambda 関数を実行するよう指定します。

クローン式は分 時間 日 月 曜日 年の順で指定できます。分のフィールドを0/5とすると、「0 分スタートで 5 分毎に実行してね」という意味になります。

また、AWS と TwitterAPI のキーを公開したくないので、コマンドで実行やデプロイする際に渡します。そのために Parameters を設定しておきましょう。

Makefile を作っておく

次に Makefile を作って、ビルドやローカルでの実行を簡単にしておきましょう。作らなくても実行などできるのですが、コマンドからパラメータを渡すとかなり長くなってしまうので、短縮して書いておいた方が楽ですね。

include .env

$(eval export $(shell sed -ne 's/ *#.*$//; /./ s/=.*$$// p' .env))

.PHONY: build
# ビルド
build:
	sam build

# lambdaにデプロイ
deploy:
	sam deploy --stack-name crawl-tweet-sample --parameter-overrides AwsAccessKey=${AWS_ACCEESS_KEY} AwsSecretAccessKey=${AWS_SECRET_ACCEESS_KEY} ConsumerKey=${CONSUMER_KEY} ConsumerSecret=${CONSUMER_SECRET} AccessToken=${ACCESS_TOKEN} AccessTokenSecret=${ACCESS_TOKEN_SECRET}

# デプロイしたlambdaの削除
delete:
	aws cloudformation delete-stack --stack-name crawl-tweet-sample

# ローカルで実行
invoke:
	sam build && sam local invoke --parameter-overrides AwsAccessKey=${AWS_ACCEESS_KEY} AwsSecretAccessKey=${AWS_SECRET_ACCEESS_KEY} ConsumerKey=${CONSUMER_KEY} ConsumerSecret=${CONSUMER_SECRET} AccessToken=${ACCESS_TOKEN} AccessTokenSecret=${ACCESS_TOKEN_SECRET}

main.go の実装

次は実際に lambda が実行する関数を実装します。
全体は以下のようになります。

main.go
package main

import (
	"log"
	"net/url"
	"os"
	"strconv"
	"time"

	"github.com/ChimeraCoder/anaconda"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"

	"github.com/guregu/dynamo"
)
// ①dynamoDBに入れたい情報のstructの定義
// User つぶやいたユーザ情報
type User struct {
	ID         int64  `dynamo:"id"`
	Name       string `dynamo:"name"`
	ScreenName string `dynamo:"screen_name"`
}

// Tweet 参加を募集するツイート
type Tweet struct {
	ID        string `dynamo:"tweet_id"` //パーティションキー
	FullText  string `dynamo:"full_text"`
	TweetedAt int64  `dynamo:"tweeted_at"` //dynamodbでソート出来るようにUNIX時間
	ExpiredAt int64  `dynamo:"expired_at"`
	User      User   `dynamo:"user"`
}

// NewTweet Tweetのメンバを初期化する関数
func NewTweet() Tweet {
	tweet := Tweet{}
	tweet.ID = ""
	tweet.FullText = ""
	tweet.TweetedAt = 0
	tweet.ExpiredAt = 0
	tweet.User = User{}
	return tweet
}

func crawlTweets() {
	// ②Twitter APIと AWSの認証
	const tableName = "crawled_tweet"

	creds := credentials.NewStaticCredentials(os.Getenv("AWS_ACCEESS_KEY"), os.Getenv("AWS_SECRET_ACCEESS_KEY"), "") //第3引数はtoken

	sess, _ := session.NewSession(&aws.Config{
		Credentials: creds,
		Region:      aws.String("ap-northeast-1")},
	)

	db := dynamo.New(sess)
	table := db.Table(tableName)

	anaconda.SetConsumerKey(os.Getenv("CONSUMER_KEY"))
	anaconda.SetConsumerSecret(os.Getenv("CONSUMER_SECRET"))

	api := anaconda.NewTwitterApi(os.Getenv("ACCESS_TOKEN"), os.Getenv("ACCESS_TOKEN_SECRET"))

	// 検索するオプション
	v := url.Values{}
	v.Set("count", "30")

	searchResult, _ := api.GetSearch("#Golang", v)

	// ③DynamoDBにツイートを保存
	// 文字列→日付に変換するレイアウト
	var layout = "Mon Jan 2 15:04:05 +0000 2006"

	// 期限を一日後に設定する
	expiredAt := time.Now().AddDate(0, 0, 1).Unix()
	for _, tweet := range searchResult.Statuses {

		newTweet := NewTweet()
		tweetedTime, _ := time.Parse(layout, tweet.CreatedAt)

		newTweet.ID = tweet.IdStr
		newTweet.FullText = tweet.FullText
		newTweet.TweetedAt = tweetedTime.Unix()
		newTweet.ExpiredAt = expiredAt
		newTweet.User = User{
			tweet.User.Id,
			tweet.User.Name,
			tweet.User.ScreenName,
		}

		if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
			log.Println(err.Error())
		} else {
			log.Println("Success!")
		}

	}

}

func main() {
	// ラムダ実行
	lambda.Start(crawlTweets)
}

順番に説明していきます。

① 構造体の定義

まず、API から取れる Tweet を格納するために構造体TweetUserを定義しています。

// User つぶやいたユーザ情報
type User struct {
	ID         string  `dynamo:"id"`
	Name       string `dynamo:"name"`
	ScreenName string `dynamo:"screen_name"`
}

// Tweet 参加を募集するツイート
type Tweet struct {
	ID        string `dynamo:"tweet_id"` //パーティションキー
	FullText  string `dynamo:"full_text"`
	TweetedAt int64  `dynamo:"tweeted_at"` //dynamodbでソート出来るようにUNIX時間
	ExpiredAt int64  `dynamo:"expired_at"`
	User      User   `dynamo:"user"`
}

// NewTweet Tweetのメンバを初期化する関数
func NewTweet() Tweet {
	tweet := Tweet{}
	tweet.ID = ""
	tweet.FullText = ""
	tweet.TweetedAt = 0
	tweet.ExpiredAt = 0
	tweet.User = User{}
	return tweet
}


この中でdynamo:"tweet_id"などのタグをメンバに定義することで、dynamoDB に保存するカラム名を指定できます。
また、初期化をする関数NewTweetを用意してます。

② 認証

そして、crawlTweet 関数の中で API を叩いて DynamoDB に入れる処理を書いています。
Go で dynamoDB を扱う SDK は guregu/dynamo を利用しています。
公式の SDKもあるのですが、ちょっと使いにくいのでこちらのほうがおすすめです。

https://github.com/guregu/dynamo

また、Go で Twitter の API を利用するライブラリもあるので、そちらを使用します。
ほんとなんでもライブラリがやってくれて、便利な世の中ですね…笑

https://github.com/ChimeraCoder/anaconda

これらを使って以下のように aws と twitterAPI の認証を行います。
twitter の認証の際に、先程発行された4つのキーを使います。

const tableName = "crawled_tweet"

// 認証
creds := credentials.NewStaticCredentials(os.Getenv("AWS_ACCEESS_KEY"), os.Getenv("AWS_SECRET_ACCEESS_KEY"), "") //第3引数はtoken

sess, _ := session.NewSession(&aws.Config{
	Credentials: creds,
	Region:      aws.String("ap-northeast-1")},
)

db := dynamo.New(sess)
table := db.Table(tableName)

anaconda.SetConsumerKey(os.Getenv("CONSUMER_KEY"))
anaconda.SetConsumerSecret(os.Getenv("CONSUMER_SECRET"))

api := anaconda.NewTwitterApi(os.Getenv("ACCESS_TOKEN"), os.Getenv("ACCESS_TOKEN_SECRET"))

この認証を行った後、以下のようにv.Setでオプションをつけて API を叩くことが出来ます。

v := url.Values{}
v.Set("count", "30")

searchResult, _ := api.GetSearch("#Golang", v)

この例だと最新30件を取得できます。他にも、ツイートした時間や言語などでも絞ることができます。
詳しくは以下の API リファレンスを参照してみてください。

https://developer.twitter.com/en/docs/twitter-api/v1/tweets/search/api-reference/get-search-tweets

オプションをつけた後は、api.GetSearch("#Golang", v)で#Golang が含まれているツイートを取得しています

③DynamoDB に入れる

API から取得した結果を、以下のように DynamoDB に入れていきます。

// 文字列→日付に変換するレイアウト
var layout = "Mon Jan 2 15:04:05 +0000 2006"

// 期限を一日後に設定する
expiredAt := time.Now().AddDate(0, 0, 1).Unix()
for _, tweet := range searchResult.Statuses {

	newTweet := NewTweet()
	tweetedTime, _ := time.Parse(layout, tweet.CreatedAt)

	newTweet.ID = tweet.IdStr
	newTweet.FullText = tweet.FullText
	newTweet.TweetedAt = tweetedTime.Unix()
	newTweet.ExpiredAt = expiredAt
	newTweet.User = User{
		tweet.User.Id,
		tweet.User.Name,
		tweet.User.ScreenName,
	}

	if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
		log.Println(err.Error())
	} else {
		log.Println("Success!")
	}

}

TweetedAt(ツイートした時刻)を DB に入れる際は、Unix 時間に変換しています。
日付をそのまま入れても良いのですが、DynamoDB には文字列型か数値型しか挿入できないので、そうすると日付が文字列として入ってしまいます。
なので、Unix 時間にしたほうが後々ソートや検索がしやすいと思います。

もう一つ注意なのは、api から取れる tweet の ID はtweet.IdStrを使うことです。
uint 型の tweet.Id の値も存在するのですが、こちらはツイート ID の桁数が大きすぎて、Javascript でこの値を使おうとすると勝手に下位桁が0で丸められてしまいます。(公式ドキュメントでも id_str を使うよう推奨してますね)

そして最後に、取得したツイートの ID, 内容、ツイートした時間、ユーザの ID と名前を挿入しています。"attribute_not_exists(tweet_id)"の条件をつけることで、tweet_id が存在しないときだけ挿入を行います。

ローカルで動作確認

以下のコマンドでビルド, 実行してみましょう

$ make build
$ make invoke

そして DynamoDB を見てみると、追加されているのが確認できます!

ですがよく見ると、RT と文頭についている、 リツイートされたツイートも一つのツイートとしてカウントされてしまうので、重複した内容が入ってしまっています。
これをなくしたい場合は以下のように、tweet.RetweetedStatusの値によって分岐しましょう。

if tweet.RetweetedStatus == nil {
	newTweet.ID = tweet.IdStr
	newTweet.FullText = tweet.FullText
	newTweet.TweetedAt = tweetedTime.Unix()
	newTweet.ExpiredAt = expiredAt
	newTweet.User = User{
		tweet.User.Id,
		tweet.User.Name,
		tweet.User.ScreenName,
	}

	if err := table.Put(newTweet).If("attribute_not_exists(tweet_id)").Run(); err != nil {
		log.Println(err.Error())
	} else {
		log.Println("Success!")
	}
} else {
	newTweet.ID = tweet.RetweetedStatus.IdStr
	newTweet.FullText = tweet.RetweetedStatus.FullText
	newTweet.TweetedAt = tweetedTime.Unix()
	newTweet.ExpiredAt = expiredAt
	newTweet.User = User{
		tweet.RetweetedStatus.User.Id,
		tweet.RetweetedStatus.User.Name,
		tweet.RetweetedStatus.User.ScreenName,
	}

	if err := table.Put(newTweet).Run(); err != nil {
		log.Println(err.Error())
	} else {
		log.Println("Success")
	}
}

tweet.RetweetedStatusが nil でなければリツイートされたツイートになります。その場合、tweet.RetweetedStatus.IdStr(リツイート元のツイート ID)を tweet_id として dynamoDB に入れると、リツイート元が DB に入っていれば保存されません。
一度 DB を空にして、再度実行してみると、いい感じに保存できています!

デプロイ

最後に作った関数を lambda にデプロイしましょう。
以下のコマンドでデプロイできます

$ make deploy

または、sam deploy --guidedとすれば、対話型でスタックの名前やパラメータの値をセットできます。ですが、make コマンドで一発で実行したほうが楽ですね

デプロイが成功したら、lambda のコンソールを開いてみましょう。
下のように、template.yml で書いた関数名とスタック名が入ったものが登録されていると思います。

ログを見てみると、ちゃんと 5 分ごとに実行されているのがわかります!

まとめ

SAM と TwitterAPI を使って Twitter からツイートをクローリングしてみました。
SAM を使えばめちゃめちゃ簡単に開発&デプロイができたので、ちょっと試したいときや個人でなにか作りたいときにはとても便利ですね。

今回つくったもののソースコードは以下に公開してます。

https://github.com/masamichhhhi/crawl-tweet-sample

指摘や補足などあれば教えていただけると幸いです。

参考