🗺️

地理データ処理を Luigi でつなげた

2024/12/17に公開

pyspa Advent Calendar 2024 17 日目。昨日は @otiai10組織の成長を妨げる「100 点を目指す病」の考察 でした。

TL;DR

  • 小規模なデータ処理の自動化を相談された。フルタイムのソフトウェアエンジニアを雇うほどの規模ではなく、ローカルコンピューターでの自動化で十分だという。
  • 指定した領域の航空写真と高度データをダウンロードし、統合したファイルを生成する。
  • 処理のパイプラインを、Luigi フレームワークでちゃちゃっと作った。

前提と課題

知人の仕事を手伝うことになった。フルタイムのソフトウェアエンジニアを必要とするほどの規模ではないし、完全自動化サービスにする余裕もない。ローカルコンピューターでの自動化で十分な案件だ。

データ処理担当者が、処理パイプラインの各ステップを実装できている。ソフトウェアエンジニアリング専任ではない。完全な自動化は必要なく、必要に応じてちょいちょい手動での操作がある。エラーが発生した場合は再実行できれば十分で、これまでもそのように段階的に進めてきたという。

こういった手作業を効率化にしたいという要望があり、手動と自動の中間的なアプローチを目指していた。

機能要件

地理的なタイルのリストを入力として受け取るのが、起点になる。高緯度地域をばっさり切り捨てて、長方形に写像する Web メルカトル図法を使って、ある特定地域を長方形で分割する。その分割したのをタイルと呼ぶ。たぶん。 https://maps.gsi.go.jp/development/siyou.html

各タイルについて対応する航空写真と GSM データ(Ground Surface Model データ; 地表面の標高や地形を表現するデジタルデータで、TIFF 形式で保存され地理座標情報と紐付けられている)をダウンロードする。航空写真は PNG ファイルとして保存されるが、この段階では地理的な座標情報は含まれていない。

一方、GSM データは TIFF 形式で保存され、画像上の各ポイントが実際の地理座標と紐付けられている。この地理情報を活用して、航空写真のピクセル輝度値を地理座標系にマッピングし、最終的に GeoTIFF という形式で出力する。

解決策

Luigi を使って、パイプライン処理を作った。

地理系のライブラリは、conda を使えば一発でインストールできるが、それ以外の場合は依存ライブラリの設定が面倒になったりする。Python のパッケージング周りは相変わらず複雑なので、識者から概説を聞きたいところだ。

ダウンロード処理の失敗に備えて、リトライも有効にする。それでも時々エラーは発生する。ただし、ほとんどは接続の問題なので、再実行すれば解決する。

def main():
    runall = RunAll(input='/path/to/input', ...)
    luigi.build([runall], local_scheduler=True)

RunAll という疑似タスクを実行する。

import luigi

class RunAll(luigi.WrapperTask):
    """すべてのタスクを定義、実行する"""

		# インスタンス化するとき受け取るパラメーター
    input = luigi.Parameter()

    def requires(self):
        """実行するべきタスクを生成"""

        # タイル定義を読み込む
        with open(self.tiles_path) as fin:
            reader = csv.DictReader(fin)
            tiles = [t for t in reader]  # -> [{...}, {...}, ...]

        # タイルごとに複数のタスクを作る
        for tile in tiles:
            # タスクを作成
            yield DownloadDSM(...)  # 高度データ
            yield BuildOrthoGeoTIFF(...)  # 地理情報つき航空写真

Luigi の Task クラスの requires 関数は、そのタスクを開始するために必要な要件を返す。ただし、WrapperTask クラスをタスクランナー的に使う場合は、requires 関数に最終成果物を yield させる。

import requests

class DownloadDSM(_BaseImageTask):
    """高度データをダウンロードするタスク"""
    retry_count = 3

    def run(self):
		    """タスクの実行"""
	      # 画像をダウンロード
        res = requests.get(..., params={...})

        # ファイルに保存
        with self.output().open('w') as fout:
            fout.write(res.content)

    def output(self):
   	    """タスクの出力は、ローカルファイル"""
        return luigi.LocalTarget('...', format=luigi.format.Nop)

luigi のタスクランナーが run() メソッドを実行する。

output() メソッドはローカルファイルを表すオブジェクトを返す。用途はつある。まず run() メソッドでこれを使う。さらに、後続のタスクがこれを参照しているような気がする。

class BuildOrthoGeoTIFF(_BaseImageTask):
    """2つのファイルから、別ファイルを生成するタスク"""
    def requires(self):
        # DSMダウンロードタスクと オルソ画像ダウンロードタスクに依存
        # 各タスクの output() メソッドの戻り値、つまりファイルが存在していれば
        # このタスクを開始する
        return {
            'dsmTask': DownlaodDSM(...),  # 前述のタスク
            'orthoTask': DownloadOrthoImage(...),  # このブログでは省略
        }

    def run(self):
        ...

まとめ

お互いの信用があったので、もめなかった。相手はぽんぽん要求を盛らなかったし、作りっぱなしでよいという握りもあった。こちらも有給休暇の消化中に、無責任に作ってみるから、役に立つなら使ってみてよという感じ。

知らない者どうしだと、そうはいかないだろうから、安易にこんな仕事を勧めるつもりはない。

でも可能性を探りたいかもなと、ちょっと思った。そこそこの品質の隙間仕事を、そこそこの報酬で。でもクラウドソーシングとは元々そういうのを狙っていて、今の状況なのかも知れない。新しい職場はエンプラ営業担当なので、まったく逆向きなんだけど。

おまけ

データ処理とかパインプライン処理とか未経験だけど、たしか何年も前に @IanMLewis が社内勉強会で Luigi を紹介したのを、ぼんやり覚えていた。

明日は @nikuyoshi です。

Discussion