📖

Java製クローラーフレームワークの実装

2024/08/10に公開

イントロダクション

有名なクローラーフレームワークに、Scrapyがあります。今回、Javaで同じような動きをするフレームワークを実装してみました。
アーキテクチャは、完全にScrapyのアーキテクチャ概観ページから真似しています。
https://doc-ja-scrapy.readthedocs.io/ja/latest/topics/architecture.html
フレームワーク名はKaibutsuにしました。
https://github.com/heruoji/Kaibutsu

Scrapyのコード自体は全く見ておらず、あくまで勉強用に想像して実装した偽物なので、悪しからず・・・!!

モジュール構成

Kaibutsuフレームワークは、以下の主要なモジュールで構成されています:

  • Applicationモジュール:エントリーポイントや設定の読み込み、エンジンの初期化を担当します。
  • Containerモジュール:依存性注入コンテナとして機能し、主要なコンポーネントを動的にインスタンス化します。今は実質ただのファクトリになっています。
  • Coreモジュール:実際のクローリング処理を行うメインのモジュールです。
├── application/
├── container/
└── core/

Coreモジュールのコンポーネント

└── core/
    ├── downloader/
    ├── engine/
    ├── itempipeline/
    ├── scheduler/
    └── parser/

Coreモジュールは、実際にクロール処理を行う中心のモジュールで、以下のコンポーネントから構成されています。

  • Engine:各コンポーネントを制御してクローリング処理を行います。Coreモジュールの司令塔の役割になります。
  • Downloader:指定されたURLのWebページを取得します。静的ページ取得用のStaticDownloaderと、動的ページ取得用のDynamicDownloaderを用意しました。
  • Scheduler:Downlaoderへ渡すリクエストの管理を行います。リクエスト間隔を設定できます。
  • Parser:Downloaderが取得したWebページから、任意のデータと新しいリクエストをスクレイプします。
  • ItemPipeline:取得したデータの後処理を行います。データのクリーニングやファイルへの出力などを行うことができます。

クロール処理の流れ

Coreモジュールのクロール処理全体の流れは、以下になります。

  1. EngineがParserで指定した最初のリクエストを、Schedulerに渡します。
  2. Schedulerはリクエストを管理し、指定された間隔でEngineに渡します。Engineは、Schedulerから受け取ったリクエストを、Downloaderに渡します。
  3. Downloaderはリクエストを受け取ると、Webページを取得して、Engineに返します。
  4. Engineは、Downlaoderから受け取ったWebページを、スクレイプのためにParserに渡します。
  5. Parserは、Webページからデータ(item)と新しいクロール先のURLをスクレイプします。それらをまとめてEngineに返します。
  6. EngineはParserから受け取ったスクレイプ結果のうち、新しいリクエストをSchedulerに渡し、データをItemPipelineに渡します。
  7. ItemPipelineでは、データに対してファイル出力などの任意の後処理を行います。
  8. Schedulerからリクエストがなくなるまで、2に戻ります。

フレームワーク利用者側でParser、Item、ItemPipelineの実装クラスを用意することで、任意のWebサイトをクロールすることができます。

フレームワーク利用方法

例として、https://quotes.toscrape.com/ に載っている著者の名言をスクレイプし、ファイルに出力する処理を行いたいと思います。
フレームワーク利用者側で以下のファイルを用意する必要があります。

  1. Parser実装クラス
  2. Item実装クラス(抽出したいデータのDTO)
  3. ItemPipeline実装クラス(抽出したデータの後処理の実装クラス)
  4. 設定ファイル

Parser実装クラス

Parser実装クラスでは、実際のWebページに対するスクレイプ処理を実装します。使い方はScrapyにおけるSpiderとほとんど同じです。
startRequestは、クロールを開始するページのDownloaderRequestクラスを返します。
parseMainとparseAuthorはユーザー側で実装したメソッドであり、メソッド名に決まりはありません。DownloaderRequestでクロール対象のURLと、そのページに対して実行するメソッド名を指定します。例えば、startRequestでは、https://quotes.toscrape.com というページに対して、parseMainメソッドでスクレイプを行うように指定しており、同じようにparseMainメソッドでは、各著者ページに対して、parseAuthorメソッドでスクレイプを実行するように指定しています。

public class QuoteParser implements Parser {

    public DownloaderRequest startRequest() {
        return new DownloaderRequest("https://quotes.toscrape.com", "parseMain");
    }

    public ParserResponse parseMain(DownloaderResponse downloaderResponse) {
        List<DownloaderRequest> newDownloaderRequests = downloaderResponse.getJsoupElements(".author + a").stream().map(link -> new DownloaderRequest(link.absUrl("href"), "parseAuthor")).collect(Collectors.toList());
        newDownloaderRequests.addAll(downloaderResponse.getJsoupElements("li.next a").stream().map(link -> new DownloaderRequest(link.absUrl("href"), "parseMain")).toList());

        return ParserResponse.fromNewRequests(newDownloaderRequests);
    }

    public ParserResponse parseAuthor(DownloaderResponse downloaderResponse) {
        String name = downloaderResponse.getJsoupElements("h3.author-title").text();
        String birthday = downloaderResponse.getJsoupElements(".author-born-date").text();
        String bio = downloaderResponse.getJsoupElements(".author-description").text();
        Author author = new Author();
        author.name = name;
        author.birthday = birthday;
        author.bio = bio;
        return ParserResponse.fromItems(Collections.singletonList(author));
    }
}

Item実装クラス

Item実装クラスは、取得したいデータのDTOクラスです。今回の場合は、著者の名前、誕生日、紹介文を取得します。

public class Author implements Item {
    public String name;
    public String birthday;
    public String bio;

    @Override
    public String toString() {
        return String.format("{ name : %s, birthday : %s, bio : %s }", name, birthday, bio);
    }
}

ItemPipeline実装クラス

ItemPipeline実装クラスでは、取得したItemに対してどのような後処理を行うかを指定します。フレームワーク側では、Printerという実装クラスを用意しており、コンソールに出力することができます。今回の要件では、csv出力を行いたいため、AuthorCsvWriterクラスを用意します。
初期化処理のopenメソッド、終了時処理のcloseメソッド、データに対する処理のprocessメソッドを実装する必要があります。

public class AuthorCsvWriter implements ItemPipeline {
    private CSVPrinter csvPrinter;

    @Override
    public void open() {
        try {
            Writer writer = new FileWriter("authors.csv");
            this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT.withHeader("name", "birthday", "bio"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        try {
            csvPrinter.flush();
            this.csvPrinter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Item process(Item item) {
        try {
            Author author = (Author) item;
            csvPrinter.printRecord(author.name, author.birthday, author.bio);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return item;
    }
}

設定ファイル

設定ファイルでは、以下の項目を指定します。

parser=QuoteParser //使用するParser実装クラス
dynamic=false //静的ページを取得するかどうか
itemPipelines=Printer,AuthorCsvWriter //使用するItemPipeline実装クラス
interval=1000 //SchedulerからDownloaderにリクエストを渡す間隔。ミリ秒
parserPackage=org.example.kaibutsu.parser //Parser実装クラスのパッケージ
itemPipelinesPackage=org.example.kaibutsu.itempipeline //ItemPipeline実装クラスのパッケージ

各モジュールの詳細

Applicationモジュール

Applicationモジュールは、フレームワークのエントリーポイントとなります。以下のクラスを含んでいます

application/
├── config/
│   ├── Config.java
│   ├── ConfigLoader.java
│   └── ConfigException.java
└── Kaibutsu.java

Kaibutsuクラスがアプリケーションを実行するメインクラスです。引数で指定された設定ファイルを読み込んでDIを行い、エンジンを初期化して実行することで、任意のWebサイトをクロールすることができます。

public class Kaibutsu {
    private static final Logger logger = LoggerFactory.getLogger(Kaibutsu.class);

    public static void main(String[] args) {
        run(args[0]);
    }

    public static void run(String config) {
        if (config.isEmpty()) {
            logger.error("設定ファイル名が指定されていません。プログラムを実行するには有効な設定ファイル名を引数に指定してください。");
            throw new IllegalArgumentException("設定ファイル名が指定されていません。");
        }
        try {
            GodzillaEngine godzillaEngine = initializeEngine(config);
            godzillaEngine.run();
        } catch (Exception e) {
            Thread.currentThread().interrupt();
            logger.error("エンジンの初期化または実行中にエラーが発生しました: ", e);
        }
    }

    private static GodzillaEngine initializeEngine(String configName) {
        Config config = ConfigLoader.load(configName);
        Scheduler scheduler = new Scheduler(config.intervalMillSeconds);
        Downloader downloader = Container.buildDownloader(config.dynamic);
        Tsuchigumo tsuchigumo = Container.buildTsuchigumo(config.tsuchigumoPackage, config.tsuchigumo);
        List<MagatamaPipeline> magatamaPipelines = Container.buildMagatamaPipelines(config.magatamaPipelinesPackage, Arrays.asList(config.magatamaPipelines));

        return new GodzillaEngine(scheduler, downloader, tsuchigumo, magatamaPipelines);
    }
}

Containerモジュール

Containerモジュールは、フレームワークの主要なコンポーネントを動的にインスタンス化します。

container/
├── ClassFinder.java
├── Container.java
└── ContainerException.java
public class Container {

    public static Parser buildParser(String targetPackage, String parserName) {
        Set<Class<? extends Parser>> parserClasses = getParserClasses(targetPackage);
        for (Class<? extends Parser> clazz : parserClasses) {
            if (clazz.getSimpleName().equals(parserName)) {
                return instantiateParser(clazz);
            }
        }
        throw new ContainerException("指定された名前のParserが見つかりませんでした。名前:" + parserName);
    }

    private static Set<Class<? extends Parser>> getParserClasses(String targetPackage) {
        return ClassFinder.getSubClasses(targetPackage, Parser.class);
    }

    private static Parser instantiateParser(Class<?> clazz) {
        try {
            return (Parser) clazz.getConstructor().newInstance();
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
                 NoSuchMethodException e) {
            throw new ContainerException("Parserの初期化に失敗しました。", e);
        }
    }

    public static Downloader buildDownloader(boolean usePlaywright) {
        if (usePlaywright) {
            return new DynamicDownloader();
        } else {
            return new StaticDownloader();
        }
    }

    public static List<ItemPipeline> buildItemPipelines(String targetPackage, List<String> names) {
        if (names.isEmpty()) {
            return Collections.emptyList();
        }
        List<ItemPipeline> itemPipelines = new ArrayList<>();
        for (String name : names) {
            ItemPipeline itemPipeline = buildItemPipeline(targetPackage, name);
            itemPipelines.add(itemPipeline);
        }
        return itemPipelines;
    }

    private static ItemPipeline buildItemPipeline(String targetPackage, String name) {
        if (name.equals("Printer")) {
            return new Printer();
        }
        Set<Class<? extends ItemPipeline>> itemPipelineClasses = getItemPipelineClasses(targetPackage);
        for (Class<? extends ItemPipeline> clazz : itemPipelineClasses) {
            if (clazz.getSimpleName().equals(name)) {
                return instantiateItemPipeline(clazz);
            }
        }
        throw new ContainerException("指定された名前のItemPipelineが見つかりませんでした。名前:" + name);
    }

    private static Set<Class<? extends ItemPipeline>> getItemPipelineClasses(String targetPackage) {
        return ClassFinder.getSubClasses(targetPackage, ItemPipeline.class);
    }

    private static ItemPipeline instantiateItemPipeline(Class<?> clazz) {
        try {
            return (ItemPipeline) clazz.getConstructor().newInstance();
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
                 NoSuchMethodException e) {
            throw new ContainerException("ItemPipelineの初期化に失敗しました", e);
        }
    }

}

Coreモジュール

coreモジュールは、実際のクロール処理を行う本体部分です。以下のコンポーネントから成ります。

└── core/
    ├── downloader/
    ├── engine/
    ├── itempipeline/
    ├── scheduler/
    └── parser/

Downloaderコンポーネント

├── downloader/
│   ├── Downloader.java
│   ├── DownloaderException.java
│   ├── DynamicDownloader.java
│   ├── DownloaderRequest.java
│   ├── DownloaderResponse.java
│   └── StaticDownloader.java

Downloaderは、指定されたURLのページをダウンロードするためのメソッドを定義します。

package org.example.kaibutsu.core.downloader;

import reactor.core.publisher.Mono;

public interface Downloader {
    Mono<DownloaderResponse> download(DownloaderRequest downloaderRequest);

    void close();
}

具体的な実装クラスとして、静的コンテンツ用のStaticDownloaderと、動的コンテンツ用のDynamicDownloaderがあります。これらのどちらを利用するかはConfig設定ファイルで指定します。
StaticDownloaderクラスは、springframework.web.reactiveのWebClientを使用して非同期に静的コンテンツを取得します。

package org.example.kaibutsu.core.downloader;

import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class StaticDownloader implements Downloader {

    private static final int MAX_REDIRECTS = 5;
    private static final int MAX_IN_MEMORY_SIZE = 1024 * 1024;

    private final WebClient webClient = WebClient.builder()
            .exchangeStrategies(ExchangeStrategies.builder()
                    .codecs(configure -> configure
                            .defaultCodecs()
                            .maxInMemorySize(MAX_IN_MEMORY_SIZE))
                    .build())
            .build();

    @Override
    public Mono<DownloaderResponse> download(DownloaderRequest downloaderRequest) {
        return download(downloaderRequest, 0);
    }

    private Mono<DownloaderResponse> download(DownloaderRequest downloaderRequest, int redirectCount) {
        return webClient.get()
                .uri(downloaderRequest.getUrl())
                .exchangeToMono(clientResponse -> {
                    if (clientResponse.statusCode().isError()) {
                        return clientResponse.createException().flatMap(ex -> Mono.error(new DownloaderException("レスポンスのステータスコードが不正です。リクエスト:" + downloaderRequest, ex)));
                    } else if (clientResponse.statusCode().is3xxRedirection()) {
                        String newUrl = clientResponse.headers().header("Location").stream().findFirst().orElse(null);
                        if (newUrl == null) {
                            return Mono.error(new DownloaderException("リダイレクト先のURLが見つかりませんでした。リクエスト:" + downloaderRequest));
                        }
                        if (redirectCount >= MAX_REDIRECTS) {
                            return Mono.error(new DownloaderException("リダイレクトの上限回数に達しました。リクエスト:" + downloaderRequest));
                        }
                        DownloaderRequest newDownloaderRequest = downloaderRequest.cloneWithNewUrl(newUrl);
                        return download(newDownloaderRequest, redirectCount + 1);
                    } else {
                        return clientResponse.bodyToMono(byte[].class)
                                .map(body -> new DownloaderResponse(
                                        downloaderRequest.getUrl(),
                                        body,
                                        downloaderRequest
                                ));
                    }
                })
                .onErrorResume(e -> Mono.error(new DownloaderException("ダウンロード中にエラーが発生しました。リクエストURL:" + downloaderRequest.getUrl(), e)));
    }

    @Override
    public void close() {
    }
}

対して、DynamicDownloaderクラスは、Playwrightを使用して動的なWebページを取得します。

Engineコンポーネント

GodzillaEngineクラスは、クローリングのメインロジックを担当しており、Scheduler、Downloader、Parser、ItemPipelineコンポーネントの処理を制御し、全体の処理を管理します。Project Reactorライブラリを用いることで、リクエストの各コンポーネント処理を非同期で効率的に行います。

├── engine/
│  └── Engine.java
package org.example.kaibutsu.core.engine;

import org.example.kaibutsu.core.downloader.Downloader;
import org.example.kaibutsu.core.downloader.DownloaderException;
import org.example.kaibutsu.core.downloader.DownloaderRequest;
import org.example.kaibutsu.core.downloader.DownloaderResponse;
import org.example.kaibutsu.core.itempipeline.ItemPipeline;
import org.example.kaibutsu.core.scheduler.Scheduler;
import org.example.kaibutsu.core.parser.Item;
import org.example.kaibutsu.core.parser.Parser;
import org.example.kaibutsu.core.parser.ParserException;
import org.example.kaibutsu.core.parser.ParserResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class Engine {
    private static final Logger logger = LoggerFactory.getLogger(Engine.class);
    private final Scheduler scheduler;
    private final Downloader downloader;
    private final Parser parser;
    private final List<ItemPipeline> itemPipelines;
    private volatile boolean isRunning = true;
    private final Set<DownloaderRequest> inProgress = ConcurrentHashMap.newKeySet();
    private final Set<String> visitedUrls = ConcurrentHashMap.newKeySet();
    private final Set<String> errorUrls = ConcurrentHashMap.newKeySet();
    private final CountDownLatch latch = new CountDownLatch(1);

    public Engine(Scheduler scheduler, Downloader downloader, Parser parser, List<ItemPipeline> itemPipelines) {
        this.scheduler = scheduler;
        this.downloader = downloader;
        this.parser = parser;
        this.itemPipelines = itemPipelines;
        initializeItemPipeline();
    }

    public void run() throws InterruptedException {
        addRequestToScheduler(parser.startRequest());
        crawl();
        waitCompletion();
    }

    private void addRequestToScheduler(DownloaderRequest downloaderRequest) {
        inProgress.add(downloaderRequest);
        scheduler.addRequest(downloaderRequest);
    }

    private void crawl() {
        scheduler.requestStream()
                .flatMap(this::processRequest)
                .subscribe(this::onNext, this::onError, this::onComplete);
    }

    private Mono<DownloaderRequest> processRequest(DownloaderRequest downloaderRequest) {
        return download(downloaderRequest)
                .publishOn(Schedulers.parallel())
                .flatMap(this::parse)
                .doOnNext(this::addNewRequestsToScheduler)
                .flatMap(parserResponse -> processItemPipelines(parserResponse, downloaderRequest))
                .onErrorResume(e -> {
                    handleError(e, downloaderRequest);
                    return Mono.empty();
                })
                .doFinally(signalType -> {
                    inProgress.remove(downloaderRequest);
                });
    }

    private Mono<DownloaderResponse> download(DownloaderRequest downloaderRequest) {
        return downloader.download(downloaderRequest);
    }

    private Mono<ParserResponse> parse(DownloaderResponse downloaderResponse) {
        return parser.parse(downloaderResponse);
    }

    private void addNewRequestsToScheduler(ParserResponse parserResponse) {
        parserResponse.newDownloaderRequests.stream()
                .filter(newRequest -> !visitedUrls.contains(newRequest.getUrl()))
                .forEach(newRequest -> {
                    visitedUrls.add(newRequest.getUrl());
                    addRequestToScheduler(newRequest);
                });
    }

    private Mono<DownloaderRequest> processItemPipelines(ParserResponse parserResponse, DownloaderRequest downloaderRequest) {
        return Flux.fromIterable(parserResponse.items)
                .flatMap(this::processItemThroughPipeline)
                .then(Mono.just(downloaderRequest));
    }

    private Mono<Item> processItemThroughPipeline(Item item) {
        return Flux.fromIterable(itemPipelines)
                .flatMap(pipeline -> Mono.fromCallable(() -> pipeline.process(item)))
                .last();
    }

    private void initializeItemPipeline() {
        itemPipelines.forEach(ItemPipeline::open);
    }

    private void waitCompletion() throws InterruptedException {
        startShutdownMonitor();
        latch.await();
    }

    private void startShutdownMonitor() {
        Flux.interval(Duration.ofSeconds(1))
                .takeWhile(i -> isRunning)
                .subscribe(i -> {
                    if (inProgress.isEmpty() && scheduler.isEmpty()) {
                        isRunning = false;
                        scheduler.terminate();
                    }
                });
    }

    private void handleError(Throwable e, DownloaderRequest downloaderRequest) {
        if (e instanceof DownloaderException) {
            logError("Downloader", downloaderRequest.getUrl(), e);
        } else if (e instanceof ParserException) {
            logError("Parser", downloaderRequest.getUrl(), e);
        } else {
            logError("Processing", downloaderRequest.getUrl(), e);
        }
        errorUrls.add(downloaderRequest.getUrl());
    }

    private void logError(String component, String url, Throwable e) {
        logger.error("{} error for URL: {}", component, url, e);
    }

    private void onNext(DownloaderRequest downloaderRequest) {
        logger.info("次のURLのクロールが完了しました:" + downloaderRequest.getUrl());
    }

    private void onError(Throwable throwable) {
    }

    private void onComplete() {
        logger.info("全てのクロールが完了しました。");
        itemPipelines.forEach(ItemPipeline::close);
        downloader.close();
        latch.countDown();
    }
}

Schedulerコンポーネント

Schedulerクラスは、リクエストの管理と処理のタイミングを制御します。リクエストをキューに追加し、指定された間隔で処理します。

├── scheduler/
│   ├── Scheduler.java
│   └── SchedulerException.java
package org.example.kaibutsu.core.scheduler;

import org.example.kaibutsu.core.downloader.DownloaderRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Scheduler {

    private final BlockingQueue<DownloaderRequest> downloaderRequestQueue = new LinkedBlockingQueue<>(100);

    private final Sinks.Many<DownloaderRequest> sink = Sinks.many().unicast().onBackpressureBuffer();

    private final int intervalMillSeconds;

    public Scheduler(int intervalMillSeconds) {
        this.intervalMillSeconds = intervalMillSeconds;
    }

    public Flux<DownloaderRequest> requestStream() {
        return sink.asFlux()
                .delayElements(Duration.ofMillis(intervalMillSeconds));
    }

    public void addRequest(DownloaderRequest downloaderRequest) {
        try {
            downloaderRequestQueue.put(downloaderRequest);
        } catch (InterruptedException e) {
            throw new SchedulerException("リクエストのスケジューラーへの追加に失敗しました。\nリクエスト:" + downloaderRequest + "\nエラーメッセージ:" + e.getMessage(), e);
        }
        emitRequests();
    }

    private void emitRequests() {
        DownloaderRequest nextDownloaderRequest;
        while ((nextDownloaderRequest = downloaderRequestQueue.poll()) != null) {
            sink.emitNext(nextDownloaderRequest, ((signalType, emitResult) -> Sinks.EmitResult.FAIL_OVERFLOW.equals(emitResult)));
        }
    }

    public void terminate() {
        emitRequests();
        sink.tryEmitComplete();
    }

    public boolean isEmpty() {
        return downloaderRequestQueue.isEmpty();
    }
}

Parserコンポーネント

Parserは、Downloaderが取得したWebページをスクレイプし、データの抽出と新しいリクエストの生成を行います。フレームワークの使用者はParserの実装クラスを用意することで、任意のWebページをクロールすることができます。
Itemインターフェースは、スクレイプしたいデータのインターフェースです。抽出したいデータに合わせて、利用者側で任意の実装クラスを用意します。

└── parser/
    ├── Item.java
    ├── Parser.java
    ├── ParserException.java
    └── ParserResponse.java

Parserは、ScrapyにおけるSpiderに該当します。クロールを開始する最初のDownloaderRequestを返すためのstartRequestメソッドと、任意のページに対して実行するパース処理用のメソッドを実装する必要があります。DownloaderRequestには、取得ページのURLと一緒に、取得したページに対してParser実装クラスのどのメソッドを実行するのかコールバックメソッド名として指定する必要があります。
Parserインターフェースのparseメソッドでは、Downloaderが取得したWebページに対して、コールバックメソッド名で指定した実装クラスのメソッドを実行します。

package org.example.kaibutsu.core.parser;

import org.example.kaibutsu.core.downloader.DownloaderRequest;
import org.example.kaibutsu.core.downloader.DownloaderResponse;
import reactor.core.publisher.Mono;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public interface Parser {
    DownloaderRequest startRequest();

    default Mono<ParserResponse> parse(DownloaderResponse downloaderResponse) {
        Method[] declaredMethods = this.getClass().getDeclaredMethods();

        for (Method method : declaredMethods) {
            if (method.getName().equals(downloaderResponse.getCallbackKey())) {
                try {
                    return Mono.just((ParserResponse) method.invoke(this, downloaderResponse));
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new ParserException("次のメソッドの呼び出しに失敗しました:" + downloaderResponse.getCallbackKey(), e);
                }
            }
        }

        throw new ParserException("次のメソッドが見つかりませんでした:" + downloaderResponse.getCallbackKey());
    }
}

ItemPipelineコンポーネント

ItemPipelineは、Webページから抽出したデータの後処理を管理します。フレームワークの使用者が要件に応じたItemPipelineの実装クラスを用意することで、データに対して任意の後処理を行うことができます。
例えば、Printerクラスはデータをコンソールに出力します。

├── itempipeline/
│   ├── ItemPipeline.java
│   └── Printer.java
public interface ItemPipeline {

    void open();

    void close();

    Item process(Item item);
}
public class Printer implements ItemPipeline {
    @Override
    public void open() {
    }

    @Override
    public void close() {
    }

    @Override
    public Item process(Item item) {
        System.out.println(item);
        return item;
    }
}

結論

今回は、Scrapyのような機能を持つJava製のウェブクローリングフレームワーク、Kaibutsuを開発しました。怪物というより子鹿のような状態ですが、自分で一から構築したことで親としての愛情がわきました。今後は次のような機能強化を考えているので、Kaibutsuの進化にぜひご期待ください。

  • 何よりもまずテストコードを(絶対に!)追加し、バグを修正する。
  • 各ドメインごとにスケジューラーのキューを用意し、複数のドメインを並行してクロールする機能を実装する。
  • クロールの進行状態や収集したデータを記録することで、障害発生時でも途中から再開できるようにする。
  • よく使用されるデータ処理(ファイル書き込み、データベース保存など)を標準のItemPipelineで提供し、開発者が一般的なデータ処理タスクは実装しなくて良くする。
  • ハッシュなどでコンテンツ持っておくことで、既にクロール済みのコンテンツでURLが違うページを重複してクロールしないようにする。
  • KaibutsuをMavenライブラリとして公開し、より多くのJava開発者が容易に利用できるようにする。
  • Scrapyのstartprojectコマンドのように、Kaibutsuプロジェクトを簡単に始められるCLIツールを開発する。

Discussion