📲

scrapyでプレスのPDFファイルを収集してLLMで要約する

2024/02/04に公開

PythonのスクレイピングフレームワークScrapyを利用して、プレスなどのPDFファイルを収集、LLM(OpenAI API)でサマリを作成し、結果をデータベースに格納するという一連の流れを実現してみました。Scrapy、OpenAI APIともごく基本的な使い方しかしていませんが、あっという間に自動収集するプログラムが作成できましたので紹介します。

Scrapyで収集したPDFファイルの内容をLLMに読み込ませてサマリーを作成

今回やりたいことは以下のとおりです。

  1. Scrapyで特定のWebサイトの最新情報(今回はプレスリリースのタイトル、日付、PDFファイルなど)を取得する
  2. 取得したPDFファイルを開いて内容を読み込む
  3. LLM(OpenAI API)を利用して300文字程度のサマリーを作成
  4. 1.で取得したタイトル、日付、URLなどとともにサマリーをデータベース(MySQL)に保存

情報収集目的でScrapyを利用する事例はよくあると思いますが、今回はLLMにサマリーを作成してもらうところまで、Scrapyの仕組みで実現してみました。PDFファイルを開いて読むのは面倒なので、短いサマリーだけを見て、興味深そうなものだけを読むことができると、情報収集の速度も上がります。

今回の環境は以下のとおりです。

  • Windows 10
  • Python 3.11.6
  • Scrapy 2.11.0
  • PyMuPDF 1.23.16

今回は収集対象がPDFファイルなので、その中身のテキストを取得するのにPyMuPDFを利用します。Windows10上で開発しましたが、丸ごとUbuntu 22.04に持っていってもそのまま動作しました。

なお、今回は鉄道関係の情報収集を目的として、鉄道会社のプレスリリースの取得を題材としています。このあたりは、各自、興味のある分野に置き換えて読んでいただければと思います。

Scrapyの環境構築

Scrapyをインストールしたあと、プロジェクトを作成します。ここではtrainnewsというプロジェクトを作成しています。

$ pip install scrapy
$ scrapy startproject trainnews

配下にtrainnewsディレクトリが作成され、以下のようなファイルツリーが生成されます。Scrapyの開発では、主にspiderpipeline.pyをいじっていきます。

D:\DOCUMENTS\WORK\TRAINNEWS
│  scrapy.cfg
│  
├─download
├─log
│      scrapy.log
│      
└─trainnews
    │  items.py
    │  middlewares.py
    │  pipelines.py
    │  settings.py
    │  __init__.py
    │  
    └─spiders
            __init__.py

設定ファイルsettings.pyを開き、以下の行を修正または追加します。

settings.py
# 同時に実施するスクレイピングの数
CONCURRENT_REQUESTS = 1

# ダウンロード間隔
DOWNLOAD_DELAY = 3

# スクレイピング結果のエンコード
FEED_EXPORT_ENCODING = "utf-8"

CONCURRENT_REQUESTSDOWNLOAD_DELAYは、スクレイピングする先のWebサイトに過度の負荷を生じさせないようにするための設定です。また、FEED_EXPORT_ENCODINGは、スクレイピング結果をJSONファイルとして出力するときのエンコードを設定します。日本語のWebサイトを対象とする場合には設定しておきましょう。

Scrapyについて詳しくない方は、「Scrapyチュートリアル」がわかりやすいです。

https://doc-ja-scrapy.readthedocs.io/ja/latest/intro/tutorial.html

Itemの作成

Itemは、スクレイピングで取得したデータを格納しておくためのクラスです。データを取得したあとの処理を行うときも、このItemを利用します。

プロジェクトを作成した時に自動生成されているitems.pyに追記していきます。今回は、以下のようにしました。

items.py
class TrainnewsItem(scrapy.Item):
    # define the fields for your item here like:
    title = scrapy.Field()		# プレスのタイトル
    date = scrapy.Field()		# 日付
    company = scrapy.Field()	# 会社名
    category = scrapy.Field()	# カテゴリ
    file_urls = scrapy.Field()	# PDFファイルのURL
    files = scrapy.Field()		# ファイルダウンロード時に利用
    file_paths = scrapy.Field()	# 同上
    summary = scrapy.Field()	# サマリ

このように、<フィールド名> = scrapy.Field()という形で追加していきます。

Spiderの作成

Scrapyでは、実際にWebサイトをクロールするSpiderというクラスを作成していきます。Spiderクラスは、Webサイトを読み込み、HTMLファイルをパースし、必要な情報を抽出する役目を担います。

scrapy genspiderコマンドで、Spiderのひな型を作成できます。第1引数にSpiderの名前を、第2引数にスクレイピングするドメイン名を与えてコマンドを実行します。ここでは、JR北海道のWebサイトを設定してみます。

$ scrapy genspider jrhokkaido jrhokkaido.co.jp

spidersディレクトリの下に、jrhokkaido.pyが作成されていますので、これをベースに、HTMLのパース処理と、パースして取得したデータをItemsに追加する処理を書いていきます。

jrhokkaido.py
class JrhokkaidoSpider(scrapy.Spider):
    name = "jrhokkaido"
    allowed_domains = ["jrhokkaido.co.jp"]
    start_urls = ["https://www.jrhokkaido.co.jp/CM/Info/press/presstop.html"]

    def parse(self, response):
        for fragment in response.css('ul.press-page-list li').getall()[:10]:

            fragment_response = scrapy.http.HtmlResponse(
                url="",
                body=fragment,
                encoding='utf-8')

            item = TrainnewsItem()

            # 日付
            date = fragment_response.css('time::text').get()
            dt = datetime.datetime.strptime(date, '%Y.%m.%d')
            item['date'] = dt.strftime('%Y-%m-%d')

            # タイトル
            title_texts = fragment_response.css('div a span::text').getall()
            item['title'] = ' '.join(title_texts)

            # URL
            url =  fragment_response.css('div a::attr(href)').get()
            item['file_urls'] = [response.urljoin(url)]

            item['company'] = "JR北海道"

            yield item

parse()関数に処理を書いていきますが、引数のresponseに取得したHTMLが格納されています。response.css(<CSSセレクタ>)またはresponse.path(<XPath)で、指定したセレクタやXPathのノード一覧を取得します。

その後、取得したHTMLノードからテキストなど必要なデータを取得し、適切な型や形式に変換してからItemに格納していきます。

最後に yield Itemでデータを抽出します。このあとのデータ処理のPipelineへデータを引き渡す場合も同様です。

Pipelineで後処理を記述

Scrapyでは、SpiderでWebサイトから取得したデータを後処理する仕組みとしてPipelineが備わっています。Pipelineはその名のとおり、定義した処理をパイプライン的につなげることができる仕組みです。

今回は、このPipelineで、PDFファイルのダウンロード、読み取り(解析)、LLMによるサマリー作成を実現します。

Pipelineの処理は、pipelines.pyにClassとして記述していきます。各クラスのprocess_item()メソッドで処理を実施し、最後にitemを返すと、次のPipelineに引き渡されます。

settings.pyにITEM_PIPELINESを設定

Pipelineを利用するには、pipelines.pyで定義するPipelineクラスを、settings.pyITEM_PIPELINESに設定しておきます。

settings.py
ITEM_PIPELINES = {
    "trainnews.pipelines.CheckDBPipeline": 100,
    "trainnews.pipelines.PDFDownloadPipeline": 200,
    "trainnews.pipelines.NewsSummaryPipeline": 300,
    "trainnews.pipelines.InsertDBPipeline": 400,
    "trainnews.pipelines.DeletePDFFilePipeline": 500,
}

このように設定すると、数値の小さい順にPipelineが実行されます。

今回設定するPipeline処理の流れは、以下のようになります。

以下では、各処理を実現するPipelineについて詳しく見ていきます。

データベース関連

まずは、Webサイトから取得したデータをデータベースに保存するための処理です。具体的には、以下の2つの処理が必要となりますので、それぞれ別々のPipelineのクラスを定義します。

  • CheckDBPipeline: クローラが取得したデータがデータベースに登録されているかをチェックする
  • InsertDBPipeline: 実際にデータをデータベースに追加(INSERT)する

Pipelineは処理毎にクラスを定義するのですが、データベースの実体は一つです。CheckDBPipelineInsertDBPipelineでそれぞれデータベースに接続→処理→切断をするのは効率が悪いので、少し工夫します。

具体的には、これらクラスの基底クラスとなるDatabaseBasePipelineを定義し、その中でデータベースへの接続や切断を実施します。ScrapyのPipelineクラスには、Spiderの開始時や終了時、Pipelineクラスがインスタンス化されるときに呼び出される特別なメソッドがありますので、これを活用します。

pipelines.py
class DatabaseBasePipeline:
    """
    Database Pipelineの基底クラス
    """
    def __init__(self, db_param):
        self.db_param = db_param
        self.conn = None

    @classmethod
    def from_crawler(cls, crawler):

        DEBUG = os.getenv('DEBUG', 'False') == 'True'

        # .envファイルからDB接続のパラメータを取得
        # MySQL parameters
        db_param = {
            'name': os.environ['DATABASE_NAME'],
            'host': os.environ['DATABASE_HOST'],
            'user': os.environ['DATABASE_USER'],
            'port': int(os.environ['DATABASE_PORT']),
            'password': os.environ['DATABASE_PASSWORD'],
        }

        return cls(db_param=db_param)


    def open_spider(self, spider):

            # MySQLに接続
            self.conn = pymysql.connect(
                host=self.db_param['host'],
                port=self.db_param['port'],
                user=self.db_param['user'],
                password=self.db_param['password'],
                db=self.db_param['name'],
            )

    def close_spider(self, spider):
        self.conn.close()

from_crawler()メソッドはクラスメソッドとして定義すると、Pipelineクラスがインスタンス化されるときに呼ばれます。環境変数ファイル.envからデータベース接続に必要な情報を読み出して、クラス変数self.db_paramとして保存し、最後にreturn cls(db_param=db_param)としてインスタンスを作成して返します。

open_spider()メソッドは、Spiderの開始時に呼び出されます。ここで、from_crawler()で読みだしておいたdb_paramを利用してデータベースに接続します。データベースへのconnectionをself.connとして保存しておきます。

close_spider()メソッドは、Spiderの終了時に呼び出されますので、データベースへの接続を切断します。

基底クラスでここまでの処理を記述しておけば、あとの処理は淡々と書くだけです。

すでにデータベースにデータが格納されているかを確認するCheckDBPipelineは以下のようになります。

pipelines.py
class CheckDBPipeline(DatabaseBasePipeline):
    """
    DBに記事の情報が保存されているかをチェックする
    すでに保存されていればItemをDropする
    """

    def process_item(self, item, spider):
        # DBにすでにデータが存在するかを確認する
        conn = self.conn
        cur = conn.cursor()
        cur.execute('SELECT `id` FROM `train_news` WHERE `title` = %s;',(item['title'], ) )
        
        if cur.fetchone() != None:
            # すでにDBに存在する場合
            raise DropItem(f"Item title = {item['title']} already exists.")
        
        # DBに存在しない場合は次のPipelineへ引き渡す
        return item

Pipelineの処理を実行するのはprocess_item()メソッドです。基底クラスのself.connで設定したデータベースへのコネクションを取得してデータベースを操作します。

CheckDBPipeline.process_item()では、すでにWebサイトから取得されたデータであればデータベースにレコードが存在するはずなので、それをチェックしています。すでにデータベースに存在する場合には、raise DropItem()Itemをドロップします。ドロップすると、このあとの処理は実施されません。そうでなければ、return ItemItemを次のPipelineへ引き渡します。

pipelines.py
class InsertDBPipeline(DatabaseBasePipeline):
    """
    Itemをデータベースに保存するPipeline
    """

    def process_item(self, item, spider):

        # DBに書き込む
        conn = self.conn
        cur = conn.cursor()
        sql = "INSERT INTO `train_news` (`title`, `date`, `company`, `category`, `summary`, `url`) VALUES (%s, %s, %s, %s, %s, %s)"

        data = (
            item['title'],
            item['date'],
            item['company'],
            item['category'],
            item['summary'],
            item['file_urls'][0]
            )

        cur.execute(sql, data)
        conn.commit()

        # logging
        logging.info('Insert record for  %s.', item['title'])

        return item

InsertDBPipelineは、Itemの内容をデータベースに書き込むPipelineです。CheckDBPipelineとの間に、PDFファイルを読み込む処理と、PDFファイルのサマリーを作成する処理が入っているので、クラスを分けています。ただ、CheckDBPipelineと同じ基底クラスを継承することで、self.connを利用してデータベースにアクセスすることができます。

PDFファイルのダウンロード

CheckDBPipelineのあと、データベースにまだデータが登録されていない場合には、PDFファイルをダウンロードし、内容を解析して、LLMでサマリを作成します。

PDFをダウンロードするPDFDownloadPipelineは、Scrapyで用意されているFilesPipelineを継承して作成します。

pipelines.py
class PDFDownloadPipeline(FilesPipeline):
    """
    PDFファイルをダウンロードするPipeline
    """

    def file_path(self, request, item, response=None, info=None):
        file_path = os.path.basename(urlparse(request.url).path)
        return file_path

    def item_completed(self, results, item, info):

        # ファイルストアのディレクトリを取得
        settings = get_project_settings()
        files_store = settings.get('FILES_STORE')

        # PDFファイルがダウンロードされたかを調べる
        # PDFファイルがない場合はItemをDropする
        file_paths = [os.path.join(files_store, x['path']) for ok, x in results if ok]
        if not file_paths:
            raise DropItem("Item contains no files")
        
        # itemにファイルのパスを追加
        item['file_paths'] = file_paths

        return item

FilesPipelineを利用するには、Itemに以下の属性を設定しておきます。

  • file_urls: ダウンロードするファイルのURLを設定(リスト形式で複数設定可能)
  • files: FilesPipelineがダウンロードした結果を保存する属性

これだけ設定しておけば、特に処理を記述しなくてもファイルがダウンロードされます。が、ファイル名を指定したり、ダウンロード結果によってItemの処理を変更したい場合には、上記のコードのようにFilesPipelineを継承したクラスを作成して、各処理をオーバーライドします。

まず、file_path()メソッドをオーバーライドして、ファイルのパス名を設定しています。ここでは、PDFファイルのURLからファイルのパスを作成しています。

次に、item_completed()メソッドをオーバーライドして、ファイルがダウンロードされたあとの処理を記述します。ここでは、ファイルのパスをItemfile_paths属性にリストとして格納します。ファイルをダウンロードするディレクトリは、あらかじめsettings.pyFILES_STOREに設定しておきます。もし一つもファイルがダウンロードできていなければ、これ以上の処理ができないので、Itemをドロップします。

FilesPipelineの詳しい使用方法については、ScrapyのWebサイトをご確認ください。

https://docs.scrapy.org/en/latest/topics/media-pipeline.html

PDFファイルの読み込みとサマリの作成

次に、ダウンロードしたPDFファイルを読み込んでテキストを抽出し、LLMに渡してサマリを作成します。

pipelines.py
class NewsSummaryPipeline:
    """
    PDFファイルを読み込み、記事のサマリを作成するPipeline
    """
    def process_item(self, item, spider):

        # check files filed
        if 'file_paths' not in item:
            raise DropItem("Item does not have file_paths field.")

        # extract text from pdf file
        file_path = item['file_paths'][0]
        if not file_path:
            raise DropItem("Item file not found.")
        texts = pdf_loader(file_path)

        # create news summary with OpenAI API
        result = create_summary(texts[0])
        item['summary'] = result['summary']
        item['category'] = result['category']

        # logging
        logging.info('Created summary for %s with OpenAI API.', item['title'])

        return item

PDFDownloadPipelineでダウンロードしたPDFファイルのパス名をItem['file_paths']に格納したので、それをもとにPDFファイルを読み込みます。今回はダウンロードされるPDFファイルは一つのみなので、item['file_paths'][0]とします。

PDFファイルを読み込んでテキストを抽出するpdf_loader()は以下のとおりです。

simple_pdf_loader.py
import fitz
import re
import unicodedata

'''
テキストのクリーニング
改行を削除、全角英数字文字を半角に変換
'''
def clean_and_convert(text):
    # 2つ以上連続する空白を1つに変換
    single_space_text = re.sub(r'\s{2,}', ' ', text)
    
    # 改行のみを削除
    no_newlines_text = re.sub(r'\n', '', single_space_text)
    
    # 全角の英数字を半角に変換
    converted_text = unicodedata.normalize('NFKC', no_newlines_text)
    
    return converted_text

'''
PDFファイルからテキストを抽出
- ページ単位に抽出したテキストの配列を返す
'''
def pdf_loader(pdf_filename: str) -> list[str]:

    # open file
    doc = fitz.open(pdf_filename)

    # get page_count
    page_count = doc.page_count

    # get texts
    texts = []
    for page in range(page_count):
        text = doc.get_page_text(page)
        texts.append(clean_and_convert(text))
    
    return texts

関数pdf_loader()では、PyMuPDFを用いてPDFファイルを読み込み、get_page_text()メソッドでテキストのみを抽出しています。このあと、抽出したテキストをLLMに送ってサマリを作成するため、clean_and_convert()で最低限のクリーニング処理を実施します。

次に、PDFファイルから抽出したテキストをLLMに渡して、サマリを作成します。具体的な処理は、関数create_summary()で以下のように記述しています。

create_summary.py
import json
import logging
from retry import retry
from openai import OpenAI

GPT_MODEL = 'gpt-3.5-turbo-1106'

system_prompt = '''
あなたはユーザが提示するニュースリリースの文章を要約して、ニュース記事風に回答するチャットボットです。
summary(ニュース記事風の要約)は200~300文字程度で回答してください。
companyにはニュースリリースを発行した会社名を、categoryには記事のカテゴリを以下の分類から回答してください。
分類: お知らせ きっぷ 観光列車 ダイヤ改正 旅行商品 臨時列車 イベント・キャンペーン 安全 経営 その他
全てJSON形式で、日本語で回答してください。
'''

assistant_prompt = '''
    {"summary": "",
     "company": "",
     "category": ""}
'''

@retry(delay=2, backoff=4, max_delay=60, logger=logging)
def openai_api(text: str):
    """
    OpenAI API
    """
    client = OpenAI(
        timeout=60.0,
        max_retries=3,
    )

    res = client.chat.completions.create(
        model=GPT_MODEL,
        messages=[
            { 'role': 'system', 'content': system_prompt },
            { 'role': 'assistant', 'content': assistant_prompt },
            { 'role': 'user', 'content': text },
        ],
        response_format={ 'type': 'json_object' },
    )

    return res

def create_summary(text: str) -> dict:

    res = openai_api(text)
    return json.loads(res.choices[0].message.content)

PDFから抽出したテキストのうち1ページ目をプロンプトに挿入して、OpenAIのAPIを呼び出しているだけです。response_format={ 'type': 'json_object' }でJSONモードを指定して、回答をJSON形式でもらうようにしています。JSONの形式はプロンプトに含めています。

ここでは、サマリだけでなく、記事のカテゴリもLLMに選択させています。カテゴリの選択肢をプロンプトに示して、その中から選択させるようにしていますが、おおむね期待どおりに回答してくれます。

このあと、前述のデータベース関連のところで紹介したInsertDBPipelineで、サマリやカテゴリなども含めて、データをデータベースに書き込みます。

PDFファイルの削除

最後に、ダウンロードしたPDFファイルを削除します。今回は、PDFファイルそのものは保存しないことにしているので、不要なファイルが溜まらないように、Pipelineの最後で削除する処理を入れています。

pipelines.py
class DeletePDFFilePipeline:
    """
    PDFファイルを削除するPipeline
    """

    def process_item(self, item, spider):

        for file_path in item['file_paths']:
            if os.path.exists(file_path):
                os.remove(file_path)

        return item

ダウンロードしたファイルのパス名はItem['file_paths']にリスト形式で保存されているので、それを読み込んで、ファイルが存在すればos.remove()で削除します。

動作確認

それでは動作確認をしてみます。Scrapyを実行するには、以下のように、scrapy crawlのあとにSpiderの名前を指定します。

$ scrapy crawl jrhokkaido

コンソールにずらずらっとログが出力されます。ログをよく見れば、想定どおりに動作しているのかはわかりますが、今回はデータベースに結果を入れているので、その結果をJSON形式で読み出してみます。

以下、例として一部の抜粋を示します。

 {'category': '臨時列車',
  'company': 'JR北海道',
  'created_at': datetime.datetime(2024, 1, 19, 23, 44, 32),
  'date': datetime.date(2024, 1, 19),
  'id': 65,
  'summary': 'JR北海道は、2024年3月から6月までの特急列車についての運転日程を発表しました。特に、261系5000代「はまなす」 編成および「ラベンダー」編成での運転列車が注目されています。北海道内を結ぶ人気のある路線に加え、フラノラベンダーエクスプレスも運行される予定です。乗客の利便性を図るため、多彩な運行パターンが組まれており、旅行者にとって魅力的な選択肢が提供されます。',
  'title': '261 系5000 代「はまなす」・「ラベンダー」編成で運転する特急列車について(3~6月分)',
  'url': 'https://www.jrhokkaido.co.jp/CM/Info/press/pdf/20240119_KO_hamalabe3-6.pdf'},
 {'category': '臨時列車',
  'company': 'JR北海道',
  'created_at': datetime.datetime(2024, 1, 19, 23, 44, 38),
  'date': datetime.date(2024, 1, 19),
  'id': 66,
  'summary': 'JR北海道は2024年3月1日から6月30日までの122日間、春の臨時列車を計352本運行することを発表しました。新幹線や 在来線を含む列車は、ゴールデンウィーク期間などの利用が多い時期に増発されます。道南・道東・道北方面においてもSL列車や特急列車が運行され、新緑の季節に合わせた列車も運転される予定です。',
  'title': '春の臨時列車のお知らせ ~3月から6月に運 転する列車です~',
  'url': 'https://www.jrhokkaido.co.jp/CM/Info/press/pdf/20240119_KO_harurin.pdf'}

LLMに生成させたsummarycategoryを含めて、正常に動作しているようですね。

現在は、今回ご紹介した仕組みを利用して情報収集しています。毎日10~20程度のプレスを取得して、OpenAI APIを用いてサマリを作成していますが、GPT-3.5-turboを利用して0.03ドル/日(数円/日)くらいのコストしかかかっていません。もちろん、プロンプトに含めるテキストのサイズに依存しますので、たとえばPDFファイル全体のテキストを含めると、もっとコストがかかるはずです。

まとめ

Scrapyを利用して、企業の公式なプレスやニュースリリースのPDFファイルを取得し、LLMに要約させた結果も含めてデータベースに格納するという処理を実現しました。

Scrapyを利用したのは初めてでしたが、ちょっとしたお作法を覚えるだけで、スクレイピングが簡単にできるようになります。Pipelineの仕組みもあるので、データを取得したあとの処理もいろいろできて便利ですね。

LLMにサマリを作成してもらうところまでやると、PDFファイルを開くことすらせずに内容をざっと把握することができますので、情報収集のスピードが格段に上がります。

Discussion