scrapyでプレスのPDFファイルを収集してLLMで要約する
PythonのスクレイピングフレームワークScrapyを利用して、プレスなどのPDFファイルを収集、LLM(OpenAI API)でサマリを作成し、結果をデータベースに格納するという一連の流れを実現してみました。Scrapy、OpenAI APIともごく基本的な使い方しかしていませんが、あっという間に自動収集するプログラムが作成できましたので紹介します。
Scrapyで収集したPDFファイルの内容をLLMに読み込ませてサマリーを作成
今回やりたいことは以下のとおりです。
- Scrapyで特定のWebサイトの最新情報(今回はプレスリリースのタイトル、日付、PDFファイルなど)を取得する
- 取得したPDFファイルを開いて内容を読み込む
- LLM(OpenAI API)を利用して300文字程度のサマリーを作成
- 1.で取得したタイトル、日付、URLなどとともにサマリーをデータベース(MySQL)に保存
情報収集目的でScrapyを利用する事例はよくあると思いますが、今回はLLMにサマリーを作成してもらうところまで、Scrapyの仕組みで実現してみました。PDFファイルを開いて読むのは面倒なので、短いサマリーだけを見て、興味深そうなものだけを読むことができると、情報収集の速度も上がります。
今回の環境は以下のとおりです。
- Windows 10
- Python 3.11.6
- Scrapy 2.11.0
- PyMuPDF 1.23.16
今回は収集対象がPDFファイルなので、その中身のテキストを取得するのにPyMuPDF
を利用します。Windows10上で開発しましたが、丸ごとUbuntu 22.04に持っていってもそのまま動作しました。
なお、今回は鉄道関係の情報収集を目的として、鉄道会社のプレスリリースの取得を題材としています。このあたりは、各自、興味のある分野に置き換えて読んでいただければと思います。
Scrapyの環境構築
Scrapyをインストールしたあと、プロジェクトを作成します。ここではtrainnews
というプロジェクトを作成しています。
$ pip install scrapy
$ scrapy startproject trainnews
配下にtrainnews
ディレクトリが作成され、以下のようなファイルツリーが生成されます。Scrapyの開発では、主にspider
とpipeline.py
をいじっていきます。
D:\DOCUMENTS\WORK\TRAINNEWS
│ scrapy.cfg
│
├─download
├─log
│ scrapy.log
│
└─trainnews
│ items.py
│ middlewares.py
│ pipelines.py
│ settings.py
│ __init__.py
│
└─spiders
__init__.py
設定ファイルsettings.py
を開き、以下の行を修正または追加します。
# 同時に実施するスクレイピングの数
CONCURRENT_REQUESTS = 1
# ダウンロード間隔
DOWNLOAD_DELAY = 3
# スクレイピング結果のエンコード
FEED_EXPORT_ENCODING = "utf-8"
CONCURRENT_REQUESTS
とDOWNLOAD_DELAY
は、スクレイピングする先のWebサイトに過度の負荷を生じさせないようにするための設定です。また、FEED_EXPORT_ENCODING
は、スクレイピング結果をJSONファイルとして出力するときのエンコードを設定します。日本語のWebサイトを対象とする場合には設定しておきましょう。
Scrapyについて詳しくない方は、「Scrapyチュートリアル」がわかりやすいです。
Itemの作成
Item
は、スクレイピングで取得したデータを格納しておくためのクラスです。データを取得したあとの処理を行うときも、このItem
を利用します。
プロジェクトを作成した時に自動生成されているitems.py
に追記していきます。今回は、以下のようにしました。
class TrainnewsItem(scrapy.Item):
# define the fields for your item here like:
title = scrapy.Field() # プレスのタイトル
date = scrapy.Field() # 日付
company = scrapy.Field() # 会社名
category = scrapy.Field() # カテゴリ
file_urls = scrapy.Field() # PDFファイルのURL
files = scrapy.Field() # ファイルダウンロード時に利用
file_paths = scrapy.Field() # 同上
summary = scrapy.Field() # サマリ
このように、<フィールド名> = scrapy.Field()
という形で追加していきます。
Spiderの作成
Scrapyでは、実際にWebサイトをクロールするSpider
というクラスを作成していきます。Spider
クラスは、Webサイトを読み込み、HTMLファイルをパースし、必要な情報を抽出する役目を担います。
scrapy genspider
コマンドで、Spiderのひな型を作成できます。第1引数にSpiderの名前を、第2引数にスクレイピングするドメイン名を与えてコマンドを実行します。ここでは、JR北海道のWebサイトを設定してみます。
$ scrapy genspider jrhokkaido jrhokkaido.co.jp
spiders
ディレクトリの下に、jrhokkaido.py
が作成されていますので、これをベースに、HTMLのパース処理と、パースして取得したデータをItems
に追加する処理を書いていきます。
class JrhokkaidoSpider(scrapy.Spider):
name = "jrhokkaido"
allowed_domains = ["jrhokkaido.co.jp"]
start_urls = ["https://www.jrhokkaido.co.jp/CM/Info/press/presstop.html"]
def parse(self, response):
for fragment in response.css('ul.press-page-list li').getall()[:10]:
fragment_response = scrapy.http.HtmlResponse(
url="",
body=fragment,
encoding='utf-8')
item = TrainnewsItem()
# 日付
date = fragment_response.css('time::text').get()
dt = datetime.datetime.strptime(date, '%Y.%m.%d')
item['date'] = dt.strftime('%Y-%m-%d')
# タイトル
title_texts = fragment_response.css('div a span::text').getall()
item['title'] = ' '.join(title_texts)
# URL
url = fragment_response.css('div a::attr(href)').get()
item['file_urls'] = [response.urljoin(url)]
item['company'] = "JR北海道"
yield item
parse()
関数に処理を書いていきますが、引数のresponse
に取得したHTMLが格納されています。response.css(<CSSセレクタ>)
またはresponse.path(<XPath)
で、指定したセレクタやXPathのノード一覧を取得します。
その後、取得したHTMLノードからテキストなど必要なデータを取得し、適切な型や形式に変換してからItem
に格納していきます。
最後に yield Item
でデータを抽出します。このあとのデータ処理のPipeline
へデータを引き渡す場合も同様です。
Pipelineで後処理を記述
Scrapyでは、SpiderでWebサイトから取得したデータを後処理する仕組みとしてPipelineが備わっています。Pipelineはその名のとおり、定義した処理をパイプライン的につなげることができる仕組みです。
今回は、このPipelineで、PDFファイルのダウンロード、読み取り(解析)、LLMによるサマリー作成を実現します。
Pipelineの処理は、pipelines.py
にClassとして記述していきます。各クラスのprocess_item()
メソッドで処理を実施し、最後にitem
を返すと、次のPipelineに引き渡されます。
settings.pyにITEM_PIPELINESを設定
Pipelineを利用するには、pipelines.py
で定義するPipelineクラスを、settings.py
のITEM_PIPELINES
に設定しておきます。
ITEM_PIPELINES = {
"trainnews.pipelines.CheckDBPipeline": 100,
"trainnews.pipelines.PDFDownloadPipeline": 200,
"trainnews.pipelines.NewsSummaryPipeline": 300,
"trainnews.pipelines.InsertDBPipeline": 400,
"trainnews.pipelines.DeletePDFFilePipeline": 500,
}
このように設定すると、数値の小さい順にPipelineが実行されます。
今回設定するPipeline処理の流れは、以下のようになります。
以下では、各処理を実現するPipelineについて詳しく見ていきます。
データベース関連
まずは、Webサイトから取得したデータをデータベースに保存するための処理です。具体的には、以下の2つの処理が必要となりますので、それぞれ別々のPipelineのクラスを定義します。
-
CheckDBPipeline
: クローラが取得したデータがデータベースに登録されているかをチェックする -
InsertDBPipeline
: 実際にデータをデータベースに追加(INSERT)する
Pipelineは処理毎にクラスを定義するのですが、データベースの実体は一つです。CheckDBPipeline
とInsertDBPipeline
でそれぞれデータベースに接続→処理→切断をするのは効率が悪いので、少し工夫します。
具体的には、これらクラスの基底クラスとなるDatabaseBasePipeline
を定義し、その中でデータベースへの接続や切断を実施します。ScrapyのPipelineクラスには、Spiderの開始時や終了時、Pipelineクラスがインスタンス化されるときに呼び出される特別なメソッドがありますので、これを活用します。
class DatabaseBasePipeline:
"""
Database Pipelineの基底クラス
"""
def __init__(self, db_param):
self.db_param = db_param
self.conn = None
@classmethod
def from_crawler(cls, crawler):
DEBUG = os.getenv('DEBUG', 'False') == 'True'
# .envファイルからDB接続のパラメータを取得
# MySQL parameters
db_param = {
'name': os.environ['DATABASE_NAME'],
'host': os.environ['DATABASE_HOST'],
'user': os.environ['DATABASE_USER'],
'port': int(os.environ['DATABASE_PORT']),
'password': os.environ['DATABASE_PASSWORD'],
}
return cls(db_param=db_param)
def open_spider(self, spider):
# MySQLに接続
self.conn = pymysql.connect(
host=self.db_param['host'],
port=self.db_param['port'],
user=self.db_param['user'],
password=self.db_param['password'],
db=self.db_param['name'],
)
def close_spider(self, spider):
self.conn.close()
from_crawler()
メソッドはクラスメソッドとして定義すると、Pipelineクラスがインスタンス化されるときに呼ばれます。環境変数ファイル.env
からデータベース接続に必要な情報を読み出して、クラス変数self.db_param
として保存し、最後にreturn cls(db_param=db_param)
としてインスタンスを作成して返します。
open_spider()
メソッドは、Spiderの開始時に呼び出されます。ここで、from_crawler()
で読みだしておいたdb_param
を利用してデータベースに接続します。データベースへのconnectionをself.conn
として保存しておきます。
close_spider()
メソッドは、Spiderの終了時に呼び出されますので、データベースへの接続を切断します。
基底クラスでここまでの処理を記述しておけば、あとの処理は淡々と書くだけです。
すでにデータベースにデータが格納されているかを確認するCheckDBPipeline
は以下のようになります。
class CheckDBPipeline(DatabaseBasePipeline):
"""
DBに記事の情報が保存されているかをチェックする
すでに保存されていればItemをDropする
"""
def process_item(self, item, spider):
# DBにすでにデータが存在するかを確認する
conn = self.conn
cur = conn.cursor()
cur.execute('SELECT `id` FROM `train_news` WHERE `title` = %s;',(item['title'], ) )
if cur.fetchone() != None:
# すでにDBに存在する場合
raise DropItem(f"Item title = {item['title']} already exists.")
# DBに存在しない場合は次のPipelineへ引き渡す
return item
Pipelineの処理を実行するのはprocess_item()
メソッドです。基底クラスのself.conn
で設定したデータベースへのコネクションを取得してデータベースを操作します。
CheckDBPipeline.process_item()
では、すでにWebサイトから取得されたデータであればデータベースにレコードが存在するはずなので、それをチェックしています。すでにデータベースに存在する場合には、raise DropItem()
でItem
をドロップします。ドロップすると、このあとの処理は実施されません。そうでなければ、return Item
でItem
を次のPipelineへ引き渡します。
class InsertDBPipeline(DatabaseBasePipeline):
"""
Itemをデータベースに保存するPipeline
"""
def process_item(self, item, spider):
# DBに書き込む
conn = self.conn
cur = conn.cursor()
sql = "INSERT INTO `train_news` (`title`, `date`, `company`, `category`, `summary`, `url`) VALUES (%s, %s, %s, %s, %s, %s)"
data = (
item['title'],
item['date'],
item['company'],
item['category'],
item['summary'],
item['file_urls'][0]
)
cur.execute(sql, data)
conn.commit()
# logging
logging.info('Insert record for %s.', item['title'])
return item
InsertDBPipeline
は、Item
の内容をデータベースに書き込むPipelineです。CheckDBPipeline
との間に、PDFファイルを読み込む処理と、PDFファイルのサマリーを作成する処理が入っているので、クラスを分けています。ただ、CheckDBPipeline
と同じ基底クラスを継承することで、self.conn
を利用してデータベースにアクセスすることができます。
PDFファイルのダウンロード
CheckDBPipeline
のあと、データベースにまだデータが登録されていない場合には、PDFファイルをダウンロードし、内容を解析して、LLMでサマリを作成します。
PDFをダウンロードするPDFDownloadPipeline
は、Scrapyで用意されているFilesPipeline
を継承して作成します。
class PDFDownloadPipeline(FilesPipeline):
"""
PDFファイルをダウンロードするPipeline
"""
def file_path(self, request, item, response=None, info=None):
file_path = os.path.basename(urlparse(request.url).path)
return file_path
def item_completed(self, results, item, info):
# ファイルストアのディレクトリを取得
settings = get_project_settings()
files_store = settings.get('FILES_STORE')
# PDFファイルがダウンロードされたかを調べる
# PDFファイルがない場合はItemをDropする
file_paths = [os.path.join(files_store, x['path']) for ok, x in results if ok]
if not file_paths:
raise DropItem("Item contains no files")
# itemにファイルのパスを追加
item['file_paths'] = file_paths
return item
FilesPipeline
を利用するには、Item
に以下の属性を設定しておきます。
-
file_urls
: ダウンロードするファイルのURLを設定(リスト形式で複数設定可能) -
files
:FilesPipeline
がダウンロードした結果を保存する属性
これだけ設定しておけば、特に処理を記述しなくてもファイルがダウンロードされます。が、ファイル名を指定したり、ダウンロード結果によってItemの処理を変更したい場合には、上記のコードのようにFilesPipeline
を継承したクラスを作成して、各処理をオーバーライドします。
まず、file_path()
メソッドをオーバーライドして、ファイルのパス名を設定しています。ここでは、PDFファイルのURLからファイルのパスを作成しています。
次に、item_completed()
メソッドをオーバーライドして、ファイルがダウンロードされたあとの処理を記述します。ここでは、ファイルのパスをItem
のfile_paths
属性にリストとして格納します。ファイルをダウンロードするディレクトリは、あらかじめsettings.py
のFILES_STORE
に設定しておきます。もし一つもファイルがダウンロードできていなければ、これ以上の処理ができないので、Item
をドロップします。
FilesPipeline
の詳しい使用方法については、ScrapyのWebサイトをご確認ください。
PDFファイルの読み込みとサマリの作成
次に、ダウンロードしたPDFファイルを読み込んでテキストを抽出し、LLMに渡してサマリを作成します。
class NewsSummaryPipeline:
"""
PDFファイルを読み込み、記事のサマリを作成するPipeline
"""
def process_item(self, item, spider):
# check files filed
if 'file_paths' not in item:
raise DropItem("Item does not have file_paths field.")
# extract text from pdf file
file_path = item['file_paths'][0]
if not file_path:
raise DropItem("Item file not found.")
texts = pdf_loader(file_path)
# create news summary with OpenAI API
result = create_summary(texts[0])
item['summary'] = result['summary']
item['category'] = result['category']
# logging
logging.info('Created summary for %s with OpenAI API.', item['title'])
return item
PDFDownloadPipeline
でダウンロードしたPDFファイルのパス名をItem['file_paths']
に格納したので、それをもとにPDFファイルを読み込みます。今回はダウンロードされるPDFファイルは一つのみなので、item['file_paths'][0]
とします。
PDFファイルを読み込んでテキストを抽出するpdf_loader()
は以下のとおりです。
import fitz
import re
import unicodedata
'''
テキストのクリーニング
改行を削除、全角英数字文字を半角に変換
'''
def clean_and_convert(text):
# 2つ以上連続する空白を1つに変換
single_space_text = re.sub(r'\s{2,}', ' ', text)
# 改行のみを削除
no_newlines_text = re.sub(r'\n', '', single_space_text)
# 全角の英数字を半角に変換
converted_text = unicodedata.normalize('NFKC', no_newlines_text)
return converted_text
'''
PDFファイルからテキストを抽出
- ページ単位に抽出したテキストの配列を返す
'''
def pdf_loader(pdf_filename: str) -> list[str]:
# open file
doc = fitz.open(pdf_filename)
# get page_count
page_count = doc.page_count
# get texts
texts = []
for page in range(page_count):
text = doc.get_page_text(page)
texts.append(clean_and_convert(text))
return texts
関数pdf_loader()
では、PyMuPDF
を用いてPDFファイルを読み込み、get_page_text()
メソッドでテキストのみを抽出しています。このあと、抽出したテキストをLLMに送ってサマリを作成するため、clean_and_convert()
で最低限のクリーニング処理を実施します。
次に、PDFファイルから抽出したテキストをLLMに渡して、サマリを作成します。具体的な処理は、関数create_summary()
で以下のように記述しています。
import json
import logging
from retry import retry
from openai import OpenAI
GPT_MODEL = 'gpt-3.5-turbo-1106'
system_prompt = '''
あなたはユーザが提示するニュースリリースの文章を要約して、ニュース記事風に回答するチャットボットです。
summary(ニュース記事風の要約)は200~300文字程度で回答してください。
companyにはニュースリリースを発行した会社名を、categoryには記事のカテゴリを以下の分類から回答してください。
分類: お知らせ きっぷ 観光列車 ダイヤ改正 旅行商品 臨時列車 イベント・キャンペーン 安全 経営 その他
全てJSON形式で、日本語で回答してください。
'''
assistant_prompt = '''
{"summary": "",
"company": "",
"category": ""}
'''
@retry(delay=2, backoff=4, max_delay=60, logger=logging)
def openai_api(text: str):
"""
OpenAI API
"""
client = OpenAI(
timeout=60.0,
max_retries=3,
)
res = client.chat.completions.create(
model=GPT_MODEL,
messages=[
{ 'role': 'system', 'content': system_prompt },
{ 'role': 'assistant', 'content': assistant_prompt },
{ 'role': 'user', 'content': text },
],
response_format={ 'type': 'json_object' },
)
return res
def create_summary(text: str) -> dict:
res = openai_api(text)
return json.loads(res.choices[0].message.content)
PDFから抽出したテキストのうち1ページ目をプロンプトに挿入して、OpenAIのAPIを呼び出しているだけです。response_format={ 'type': 'json_object' }
でJSONモードを指定して、回答をJSON形式でもらうようにしています。JSONの形式はプロンプトに含めています。
ここでは、サマリだけでなく、記事のカテゴリもLLMに選択させています。カテゴリの選択肢をプロンプトに示して、その中から選択させるようにしていますが、おおむね期待どおりに回答してくれます。
このあと、前述のデータベース関連のところで紹介したInsertDBPipeline
で、サマリやカテゴリなども含めて、データをデータベースに書き込みます。
PDFファイルの削除
最後に、ダウンロードしたPDFファイルを削除します。今回は、PDFファイルそのものは保存しないことにしているので、不要なファイルが溜まらないように、Pipelineの最後で削除する処理を入れています。
class DeletePDFFilePipeline:
"""
PDFファイルを削除するPipeline
"""
def process_item(self, item, spider):
for file_path in item['file_paths']:
if os.path.exists(file_path):
os.remove(file_path)
return item
ダウンロードしたファイルのパス名はItem['file_paths']
にリスト形式で保存されているので、それを読み込んで、ファイルが存在すればos.remove()
で削除します。
動作確認
それでは動作確認をしてみます。Scrapyを実行するには、以下のように、scrapy crawl
のあとにSpiderの名前を指定します。
$ scrapy crawl jrhokkaido
コンソールにずらずらっとログが出力されます。ログをよく見れば、想定どおりに動作しているのかはわかりますが、今回はデータベースに結果を入れているので、その結果をJSON形式で読み出してみます。
以下、例として一部の抜粋を示します。
{'category': '臨時列車',
'company': 'JR北海道',
'created_at': datetime.datetime(2024, 1, 19, 23, 44, 32),
'date': datetime.date(2024, 1, 19),
'id': 65,
'summary': 'JR北海道は、2024年3月から6月までの特急列車についての運転日程を発表しました。特に、261系5000代「はまなす」 編成および「ラベンダー」編成での運転列車が注目されています。北海道内を結ぶ人気のある路線に加え、フラノラベンダーエクスプレスも運行される予定です。乗客の利便性を図るため、多彩な運行パターンが組まれており、旅行者にとって魅力的な選択肢が提供されます。',
'title': '261 系5000 代「はまなす」・「ラベンダー」編成で運転する特急列車について(3~6月分)',
'url': 'https://www.jrhokkaido.co.jp/CM/Info/press/pdf/20240119_KO_hamalabe3-6.pdf'},
{'category': '臨時列車',
'company': 'JR北海道',
'created_at': datetime.datetime(2024, 1, 19, 23, 44, 38),
'date': datetime.date(2024, 1, 19),
'id': 66,
'summary': 'JR北海道は2024年3月1日から6月30日までの122日間、春の臨時列車を計352本運行することを発表しました。新幹線や 在来線を含む列車は、ゴールデンウィーク期間などの利用が多い時期に増発されます。道南・道東・道北方面においてもSL列車や特急列車が運行され、新緑の季節に合わせた列車も運転される予定です。',
'title': '春の臨時列車のお知らせ ~3月から6月に運 転する列車です~',
'url': 'https://www.jrhokkaido.co.jp/CM/Info/press/pdf/20240119_KO_harurin.pdf'}
LLMに生成させたsummary
やcategory
を含めて、正常に動作しているようですね。
現在は、今回ご紹介した仕組みを利用して情報収集しています。毎日10~20程度のプレスを取得して、OpenAI APIを用いてサマリを作成していますが、GPT-3.5-turboを利用して0.03ドル/日(数円/日)くらいのコストしかかかっていません。もちろん、プロンプトに含めるテキストのサイズに依存しますので、たとえばPDFファイル全体のテキストを含めると、もっとコストがかかるはずです。
まとめ
Scrapyを利用して、企業の公式なプレスやニュースリリースのPDFファイルを取得し、LLMに要約させた結果も含めてデータベースに格納するという処理を実現しました。
Scrapyを利用したのは初めてでしたが、ちょっとしたお作法を覚えるだけで、スクレイピングが簡単にできるようになります。Pipelineの仕組みもあるので、データを取得したあとの処理もいろいろできて便利ですね。
LLMにサマリを作成してもらうところまでやると、PDFファイルを開くことすらせずに内容をざっと把握することができますので、情報収集のスピードが格段に上がります。
Discussion