🐦

TwitterAPI v2のStreamでTwitter自動リプライするよ

2022/02/13に公開

AI近藤春菜を作ってます

現在の状況などはこちらを更新してます。
https://zenn.dev/ryo427/scraps/1ef49c70a2faee

内容をざっくり

この記事ではPythonを用いてTwitterでの自動リプライを行います。
自身が受け取った投稿に対して自動で返信ができるということです。
TwitterAPIv2で動きます。
何が良いかというとAPI申請レベルがEssential(登録のみ)で動かせるということです。
仕組みとしてはStreamで監視し、tweepyでリプライツイートをします。

準備

Twitter APIキーの取得がまだの方は以下の記事を参考にしてみてください。

https://tech-lab.sios.jp/archives/21238
https://zenn.dev/kg0r0/articles/8b1cfe654a1cee#developer-portal

設定部分

さっそくpythonで書いていきましょう。
各種キーに関してはあちこちで使うのでグローバル変数に入れています。

import requests
import os
import json
from requests_oauthlib import OAuth1Session
import time
import traceback
import tweepy
from dotenv import load_dotenv
load_dotenv()

consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']
access_token = os.environ['ACCESS_TOKEN']
access_token_secret = os.environ['ACCESSS_TOKEN_SECRET']
bearer_token = os.environ['BEARER_TOKEN']

Client = tweepy.Client(bearer_token, consumer_key, consumer_secret, access_token, access_token_secret)

dotenvを使用していますが、不要なら

#from dotenv import load_dotenv
#load_dotenv() 
consumer_key = 取得したキー

のように書き換えても構いません。
5つのキーが設定出来たら次に進みます。

認証とStream用設定関数

ここは特に書き換えなくても使えますのでそのままコピーでOKです

def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def get_rules():
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()

def delete_all_rules(rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))

監視対象の設定

rulesで監視対象を設定します。
今回は自分へのリプライを監視したいので
"value":"to:アカウント名"とします。
AI春菜の場合@haruna_machineですので
"value":"to:haruna_machine"になります。
https://twitter.com/haruna_machine

def set_rules(delete):
    rules = [
        {
            "value":"to:haruna_machine" ##←ここを書き換えるよ
        }
    ]
    payload = {"add": rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))

Stream用関数

本記事のメインとなるStream関数です。
この関数を実行すると指定した内容での監視が開始されます。
そのままコピーするとリプライをくれた相手に自動で「リプライありがとう!」と返事をしますので好きな内容に書き換えましょう。

def get_stream(headers):
    run = 1
    while run:
        try:
            with requests.get(
                "https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
            ) as response:
                print(response.status_code)
                if response.status_code != 200:
                    raise Exception(
                        "Cannot get stream (HTTP {}): {}".format(
                            response.status_code, response.text
                        )
                    )
                for response_line in response.iter_lines():
                    if response_line:
                        json_response = json.loads(response_line)
                        tweet_id = json_response["data"]["id"] #ツイートID
                        reply_text=json_response["data"]["text"] #相手の送ってきた内容
                        
			###ここで自分のリプライの内容を設定します
			text ="リプライありがとう!"
			
			print(text)
                        Client.create_tweet(
                            text=text,
                            in_reply_to_tweet_id =tweet_id)


        except ChunkedEncodingError as chunkError:
            print(traceback.format_exc())
            time.sleep(6)
            continue
        
        except ConnectionError as e:
            print(traceback.format_exc())
            run+=1
            if run <10:
                time.sleep(6)
                print("再接続します",run+"回目")
                continue
            else:
                run=0
        except Exception as e:
            # some other error occurred.. stop the loop
            print("Stopping loop because of un-handled error")
            print(traceback.format_exc())
            run = 0
	    
class ChunkedEncodingError(Exception):
    pass

main処理

そのままコピーでOKです。

def main():
    rules = get_rules()
    delete = delete_all_rules(rules)
    set = set_rules(delete)
    get_stream(set)
 
if __name__ == "__main__":
    main()

実行してみる

実行するとこんな感じです。

実行中にリプライをもらうと

実行画面はこんな感じでまた新たなリプを監視します。

以上!

ローカルやEC2上で実行してる間はリプライをし続けてくれます。
相手のツイートの内容を取得したい場合reply_textに入っていますので
print(reply_text)などで取得してください。

Discussion