GCOM-Cの観測データをグラフ化する

2020/09/28に公開

はじめに

2020年の日本の夏は大変暑い日々でした。

2020年は史上1番目か2番目に暑い年に とあるように日本だけでなく世界的に暑い夏だったようです。

「今年が一番暑い」と毎年言っているような気がしますが、それだけ異常気象が常態化しつつあるということでしょう。

そんな地球の気象をグラフで手軽に可視化できないかと思い立ってやってみたのが本記事の内容です。

気象データのソースとしてはGCOM-Cの観測値を、技術としてはPythonとElasticsearchを利用しています。

  • グラフのデータはGCOM-Cが観測を開始した2018年以降に限られます。
  • 容量をめちゃくちゃ食います。あまり実際的ではないかもしれません。

GCOM-C(しきさい)

GCOM-Cという日本の人工衛星があります。
海洋や陸上の状態を最大250mおきに2~3日に1回観測しており、データはすべて無料で入手することができます。
データは衛星が打ち上げられた2018年以降のものに限られますが、シリーズで10年以上の観測が予定されているので、今後これだけの分解能と頻度のデータを当たり前のものとして使っていけるのはありがたいですね。

課題

人工衛星のデータは通常、観測した瞬間のデータを写真のように1つのファイルとして保存、配布されます。

衛星と地球の位置関係は常に変化しているため、ぴったり同じ地点が観測できるとは限らないし、毎日ぴったり同じ時刻に観測ができるとは限らないです[1]

そのため、東京上空を毎日観測しているデータがあっても、各ピクセルの座標(経度緯度など)はファイルごとに微妙に異なるし、観測される時刻もファイルごとに揃うことはほぼありません(hourは揃ってもminute以下ではまず揃わない)。「この地点のデータを集めたい」と思った時、大量にあるファイルの中からファイル名でだいたいの絞り込んで、さらに欲しい地点の座標に最も近いピクセルはどれで、なんて処理をせねばならず、時系列データとして人間が直感的に扱いにくいのが現状です[2]

この課題を解消すべくAnalysis Ready Dataというフォーマットが作られたりはしていますが、今回は別のアプローチを試してみたいと思います。

方針

各ピクセルのデータを時空間(Spatiotemporal)データとしてElasticsearchに放りこんでいくことにします。
Elasticsearchはオープンソースの検索エンジンです。
検索速度が速いというのもありますが、柔軟に条件を指定できるのと、経度緯度とそこからの距離[3]により位置検索ができるため今回採用しました。

環境

dockerおよびdocker-composeを利用します。
dockerのインストールは各自行ってください。

  1. gcomc-pointgit cloneしディレクトリに移動
  2. docker-compose build
  3. docker-compose up -d
  4. http://localhost:8888/ にアクセス
  5. 終わったらdocker-compose down

これでElasticsearchとPythonの環境が整いました。

  • 筆者はmac上で開発を行いました。
  • DockerのDisk image sizeは20G以上を推奨します(処理するファイル数次第ではそれでも足りません)。

Elasticsearchを試してみる

Elasticsearchでやりたいことができるか検証してみます。
work/sample_es.ipynbを開いてみてください。

from elasticsearch import Elasticsearch

es = Elasticsearch("http://es:9200")
index = 'test'
# テストなので毎回インデックスを作り直す
try:
    es.indices.delete(index)
except:
    pass
es.indices.create(index)

マッピングは以下のようにします。
「同一ファイルID内で指定した座標から最も近い値を日付順に取得」をしたいので、file_id,location,date,valueを用意します。
また、衛星データではピクセルごとに品質の目安となるQAフラグが与えられていることがよくあります。
フラグがこの値のvalueのみ取得したい、などQAフラグで絞り込みを行えるようにqaを用意しておきます。

mapping = {
  "properties": {
    "file_id": {
      "type": "keyword"
    },
    "location": {
      "type": "geo_point"
    },
    "date": {
      "type": "date"
    },
    "qa": {
        "type": "short"
    },
    "value": {
        "type": "integer"
    }
  }
}
es.indices.put_mapping(index=index, body=mapping)

テストデータを入れていきます。ドキュメントを1つずつcreateすると遅いのでbulkで一気に流し込みます。

from elasticsearch import helpers

def gendata(index, items):
    for item in items:
        yield {
            "_op_type": "create",
            "_index": index,
            "_source": item
        }
# テストデータ
items = [
    {"file_id": "Tokyo", "location": [139.74730, 35.68464], "date": "2014-01-01", "qa": [1], "value": 28},
    {"file_id": "Tokyo", "location": [139.44798, 35.69978], "date": "2015-01-01", "qa": [0, 1], "value": 24},
    {"file_id": "Tokyo", "location": [139.45015, 35.69464], "date": "2016-01-01", "qa": [2], "value": 25},
    {"file_id": "Hokkaido", "location": [142.51199, 42.81914], "date": "2013-01-01", "value": 18},
    {"file_id": "Osaka", "location": [135.60643, 34.76895], "date": "2012-01-01", "qa": [1], "value": 26},
    {"file_id": "Aichi", "location": [136.88158, 35.17095], "date": "2015-01-01", "qa": [0, 1, 2], "value": 25},
    {"file_id": "Fukuoka", "location": [130.36511, 33.57753], "date": "2016-01-01", "qa": [0], "value": 27}
]

helpers.bulk(es, gendata(index, items))

絞り込みを行わずserachすると全件(7件)取得できます。

es.search(index=index, body={})
> {'took': 13,
>  'timed_out': False,
>  '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
>  'hits': {'total': {'value': 7, 'relation': 'eq'},
>   'max_score': 1.0,
>   'hits': [{'_index': 'test',
>     '_type': '_doc',
> ...

クエリをいろいろ試してみましょう。

東京のある地点から600km圏内で絞り込み

Tokyo, Aichi, Osakaの5件が取得できます。

query = {
   "query":{
      "bool":{
         "must":{
            "match_all":{
               
            }
         },
         "filter":{
            "geo_distance":{
               "distance":"600km",
               "location":[139.45, 35.695]
            }
         }
      }
   }
}
es.search(index=index, body=query)

2013年以降、2016年より前で絞り込み

3件取得できます。

query = {
   "query":{
      "bool":{
         "must":{
            "range": {
              "date": {
                "gte": "2013-01-01", 
                "lt": "2016-01-01",
                "time_zone": "+00:00"
              }
            }
         }
      }
   }
}
es.search(index=index, body=query)

QAフラグに0と1を含む条件で絞り込み

[0, 1]と[0, 1, 2]の2件取得できます。

query = {
   "query":{
      "bool":{
         "must": [
            {
                "match": {
                  "qa": 1
                }
            },
            {
                "match": {
                  "qa": 0
                }
            }
         ]
      }
   }
}
es.search(index=index, body=query)

QAフラグに0または1を含み、2を含まない条件で絞り込み

4件取得できます。

query = {
   "query":{
      "bool":{
         "should": [
            {
                "match": {
                  "qa": 1
                }
            },
            {
                "match": {
                  "qa": 0
                }
            }
         ],
          "must_not": [
            {
                "match": {
                  "qa": 2
                }
            }
          ]
      }
   }
}
es.search(index=index, body=query)

複合条件

  • QAフラグに2を含まない
  • 東京のある地点から600km圏内
  • 時刻でソート
  • file_idが同じデータ内で距離が近い順にソートする。
query = {
   "query":{
      "bool":{
         "must_not": [
            {
                "match": {
                  "qa": 2
                }
            }
         ],
         "filter":{
            "geo_distance":{
               "distance":"600km",
               "location":[139.45, 35.695]
            }
         }
      }
   },
   "sort":[
      {
         "date":{
            "order":"asc"
         }
      }
   ],
   "collapse":{
      "field":"file_id",
      "inner_hits":{
         "name":"distance",
         "size":5,
         "sort":[
            {
               "_geo_distance":{
                  "location":{
                     "lat":35.6970,
                     "lon":139.7735
                  }
               }
            }
         ]
      }
   }
}
es.search(index=index, body=query)

['hits']['hits'][i]['inner_hits']['hits'][0]が各file_id内で最も条件に一致した値です。

どうやらElasticsearchでやりたいことが十分に実現できそうです。

データの読み取り

ダウンロード

GCOM-CのデータはG-Portalで入手できます。
アカウント(無料)を作ってから、「衛星からの検索」->「GCOM-C/SGLI」のプルダウンを開きます。
LEVEL1, LEVEL2, LEVEL3と処理レベル別にデータを選択できますが、LEVEL1は物理量に変換する前のデータで素人には扱いが難しく、LEVEL3は週や月ごとに統計処理され分解能が若干落ちたデータとなっています。
今回はせっかくなので、最大250m分解能で2~3日に1回の頻度で観測というGCOM-Cのメリットを享受できるLEVEL2のデータを選びましょう。

「L2-地表面温度」を選択します。
LEVEL2の表面温度を選択している例

歯車アイコンをクリックすると開く「詳細条件設定」で「品質」をGoodのみにしておくと、観測がうまくいかなかったデータを検索から除外することができます。
詳細条件設定

あとは「関心領域」にデータが欲しい範囲を選択して検索を行います。
範囲を指定

条件にあったファイルの一覧が表示されるので、ポチポチと1つずつダウンロードしていってください。
初めは10ファイルくらいで良いと思います。
ダウンロードしたファイルはsourceディレクトリに移動します。

HDF5ファイル

GCOM-CのデータはHDF5形式で配布されています。
ファイルの構造がどうなっているかはHDF Viewで確認できます。

GCOM-Cのデータは6種類の投影タイプが存在しますが、LEVEL2では「タイル」と「シーン」のプロダクトを扱うことができれば十分です。
GCOM-Cのデータをどのように扱えばいいか理解するには、GCOM-Cプロダクト利用の手引き
(入門編)
を一読することをオススメします。

ここでは地表面温度などの陸域プロダクトの投影タイプである「タイル」のデータを扱うコードを見ていきます。(work/gcomc_reader.py)。

import os, re, datetime, math, h5py
import numpy as np

class Tile:
    # support level: L2
    # support file: LST, AGB, LAI, VGI, CLFG, CLPR, ARNP, ARPLK
    def __init__(self, hdf5path, product = None):
        self.granule_id = os.path.splitext(os.path.basename(hdf5path))[0]
        self.observation_date = datetime.datetime(int(self.granule_id[7:11]), int(self.granule_id[11:13]), int(self.granule_id[13:15]), tzinfo=datetime.timezone.utc)
        self.nodata_value = np.NaN

        self.vtile = int(self.granule_id[21:23])
        self.htile = int(self.granule_id[23:25])
        self.product = self.granule_id[31:35].replace('_', '') if product == None else product
        self.resolution = self.granule_id[35]

        with h5py.File(hdf5path, 'r') as f:
            self.status = f['Processing_attributes'].attrs['Processing_result'][0].decode() 
            processing_ut = f['Processing_attributes'].attrs['Processing_UT'][0].decode()
            self.processed = datetime.datetime.strptime(processing_ut + '+0000', '%Y%m%d %H:%M:%S%z')
            self.algorithm_version = f['Global_attributes'].attrs['Algorithm_version'][0].decode()
            start_time = f['Global_attributes'].attrs['Image_start_time'][0].decode()
            self.start_time = datetime.datetime.strptime(start_time + '+0000', '%Y%m%d %H:%M:%S.%f%z')
            end_time = f['Global_attributes'].attrs['Image_end_time'][0].decode()
            self.end_time = datetime.datetime.strptime(end_time + '+0000', '%Y%m%d %H:%M:%S.%f%z')

            self.lower_left = [float(f['Geometry_data'].attrs['Lower_left_longitude'][0]), float(f['Geometry_data'].attrs['Lower_left_latitude'][0])]
            self.lower_right = [float(f['Geometry_data'].attrs['Lower_right_longitude'][0]), float(f['Geometry_data'].attrs['Lower_right_latitude'][0])]
            self.upper_left = [float(f['Geometry_data'].attrs['Upper_left_longitude'][0]), float(f['Geometry_data'].attrs['Upper_left_latitude'][0])]
            self.upper_right = [float(f['Geometry_data'].attrs['Upper_right_longitude'][0]), float(f['Geometry_data'].attrs['Upper_right_latitude'][0])]

            data = f['Image_data'][self.product][:].astype(np.float64)
            slope = f['Image_data'][self.product].attrs['Slope'][0]
            offset = f['Image_data'][self.product].attrs['Offset'][0]
            error_dn = f['Image_data'][self.product].attrs['Error_DN'][0]
            min_valid_dn = f['Image_data'][self.product].attrs['Minimum_valid_DN'][0]
            max_valid_dn = f['Image_data'][self.product].attrs['Maximum_valid_DN'][0]
            self.unit = f['Image_data'][self.product].attrs['Unit'][0].decode()

            with np.errstate(invalid='ignore'):
                data[data == error_dn] = self.nodata_value
                data[(data < min_valid_dn) | (data > max_valid_dn)] = self.nodata_value
            self.data = slope * data + offset

            self.lines = int(f['Image_data'].attrs['Number_of_lines'][0])
            self.pixels = int(f['Image_data'].attrs['Number_of_pixels'][0])
            self.grid_interval_num = f['Image_data'].attrs['Grid_interval'][0]

            try:
                self.qa_flags = f['Image_data']['QA_flag'][:]
                self.qa_description = f['Image_data']['QA_flag'].attrs['Data_description'][0].decode()
            except:
                self.qa_flags = None
                self.qa_description = None
            
            try:
                obs_time = f['Geometry_data']['Obs_time'][:].astype(np.float64)
                obs_time_offset = f['Geometry_data']['Obs_time'].attrs['Offset'][0]
                obs_time_slope = f['Geometry_data']['Obs_time'].attrs['Slope'][0]
                obs_time_error_dn = f['Geometry_data']['Obs_time'].attrs['Error_DN'][0]
                obs_time_min_valid_dn = f['Geometry_data']['Obs_time'].attrs['Minimum_valid_DN'][0]
                obs_time_max_valid_dn = f['Geometry_data']['Obs_time'].attrs['Maximum_valid_DN'][0]

                with np.errstate(invalid='ignore'):    
                    obs_time[obs_time == obs_time_error_dn] = self.nodata_value
                    obs_time[(obs_time < obs_time_min_valid_dn) | (obs_time > obs_time_max_valid_dn)] = self.nodata_value
                    obs_time = obs_time_slope * obs_time + obs_time_offset
                self.obs_time = obs_time * 3600.0 + np.ones((self.lines, self.pixels), dtype="float64") * self.observation_date.timestamp()
            except:
                self.obs_time = None

    def is_good(self):
        return self.status == 'Good'

    def get_point(self, line, pixel):
        value = self.data[line, pixel]     
        if np.isnan(value):
            raise Exception('error value.')

        try:
            obs_time = self.obs_time[line, pixel]
            if np.isnan(obs_time):
                raise Exception
            observation_datetime = datetime.datetime.fromtimestamp(obs_time, datetime.timezone.utc)
        except:
            observation_datetime = None
        
        vtile_num = 18
        v_pixel = int(self.lines * vtile_num)
        d = 180 / v_pixel
        NL = v_pixel  # 180 / d
        NP0 = 2 * NL
    
        lin_total = line + (self.vtile * self.lines)
        col_total = pixel + (self.htile * self.pixels)
        lat = 90 - (lin_total + 0.5) * d
        NPi = int(round(NP0 * math.cos(math.radians(lat)), 0))
        lon = 360 / NPi * (col_total - NP0 / 2 + 0.5)
        
        try:
            qa_flag = int(self.qa_flags[line, pixel])
            qa_flags = [i for i, qa in enumerate(reversed(format(qa_flag, 'b'))) if int(qa) == 1]
        except:
            qa_flags = None

        return {
            'location': [lon, lat],
            'value': value,
            'observation_datetime': observation_datetime,
            'qa_flags': qa_flags
        }

work/sample_read_hdf5.ipynbで実際にファイルを開いてみます。

import gcomc_reader

file_path = '../source/GC1SG1_20200915A01D_T0221_L2SG_LST_Q_2000.h5' # ダウンロードしたファイルに合わせて変更
r = gcomc_reader.Tile(file_path)
print('granule_id: {}'.format(r.granule_id))
print('algorithm_version: {}'.format(r.algorithm_version))
print('start_time: {}'.format(r.start_time))
print('end_time: {}'.format(r.end_time))
print('size: {}, {}'.format(r.lines, r.pixels))
print('unit: {}'.format(r.unit))
print('qa_description: {}'.format(r.qa_description))
> granule_id: GC1SG1_20200915A01D_T0221_L2SG_LST_Q_2000
> algorithm_version: 2.20
> start_time: 2020-09-15 12:29:49.200000+00:00
> end_time: 2020-09-15 17:31:48+00:00
> size: 4800, 4800
> unit: Kelvin
> qa_description: 00:no input data; 01:land/water flag(0=land/1=water); 02:Spare; 03:no CLFG; 04:no VNR/SWR; 05:Snow; 06:Sensor zenith angle > 33; 07:Sensor zenith angle > 43; 08:TR1 < 0.6; 09:RES > 1[K]; 10:RES > 2[K]; 11:Probably Cloudy; 12:Cloudy; 13:TS out of range; 14:land/water flag(0=land/1=water); 15:no input data;

観測時刻はピクセル毎に異なるため、ファイル全体の時刻情報はstartとendになります。
ファイルのサイズは4800*4800ピクセルですが、そのすべてにデータがあるわけではありません。
山陰や悪天候により十分な観測が行えなかった、対象外の領域(陸域データであれば海や湖、川)だったなどの理由でNoDATAの場合があります。
というよりファイルによっては半分以上がNoDATAなんてことがザラです。

プロダクトを生成するアルゴリズムは日々改良が加えられており、そのバージョンはalgorithm_versionで確認できます(2020/09時点では2.20が最新)。
バージョンによって後述するqa_flagsの値の意味が異なるため、過去のファイルを取得する際はまだ最新のアルゴリズムで再処理がされていない可能性があります。
その場合はファイルによってフラグの扱いを変えなければならないので注意してください。

try:
    print(r.get_point(3040,410))
except:
    print('no data')
> {'location': [69.55463368816531, 63.665625000000006], 'value': 273.3999938890338, 'observation_datetime': datetime.datetime(2020, 9, 15, 15, 51, 45, 939332, tzinfo=datetime.timezone.utc), 'qa_flags': [6, 9, 11]}

get_pointは指定した座標がNoDATAだった場合Errorを投げます。
返り値のqa_flagsは値が1だったフラグ番号の配列です。各フラグ番号の意味はqa_descriptionを確認してください。

データ投入

ではダウンロードしたファイルのデータをElasticsearchに入れていきましょう。
work/sample_insert.ipynbを開いてみてください。

地表面温度(LST)を対象とします。

from elasticsearch import Elasticsearch, helpers
from pathlib import Path
import time
import gcomc_reader

es = Elasticsearch("http://es:9200")
filename_identifier = 'LST'
product = 'LST'
source_path = "../source"

各ピクセルのデータを入れるインデックスとファイル情報と処理ステータスを入れるインデックスを用意します。

product_index = product.lower()
# es.indices.delete(product_index) #indexを作り直す場合 

if not es.indices.exists(index=product_index):
    es.indices.create(index=product_index, body={
        "mappings": {
            "properties": {
                "granule_id": {
                  "type": "keyword"
                },
                "location": {
                  "type": "geo_point"
                },
                "value": {
                    "type": "float"
                },
                "observation_datetime": {
                  "type": "date"
                },
                "qa_flags": {
                    "type": "short"
                }
            }
        }
    })
    
process_index = 'process'
# es.indices.delete(process_index) #indexを作り直す場合 

if not es.indices.exists(index=process_index):
    es.indices.create(index=process_index, body={
        "mappings": {
            "properties": {
                "location": {
                  "type": "geo_shape"
                },
                "start_datetime": {
                  "type": "date"
                },
                "end_datetime": {
                  "type": "date"
                },
                "processed_datetime": {
                  "type": "date"
                },
                "good": {
                    "type": "boolean"
                },
                "algorithm_version": {
                    "type": "keyword"
                },
                "product": {
                    "type": "keyword"
                },
                "done": {
                    "type": "boolean"
                }
            }
        }
    })

続いて、source内にあるLSTのファイルを読み込んで処理していきます。

  • ファイル名から読み取っている情報もあるので名前はダウンロードしたまま変更しないでください。
  • 分解能250m(Q)のファイルのみ処理の対象とします。
  • 品質がGood以外のファイルはスキップします。
p = Path(source_path)

for file_path in p.glob("*{}*.h5".format(filename_identifier)):
    r = gcomc_reader.Tile(str(file_path), product)
    print(r.resolution == 'Q')
    processed = es.search(index=process_index, body={"query":{"bool":{"must":[{"term":{"_id":r.granule_id}},{"term":{"done":True}}]}}})
    if processed['hits']['total']['value'] == 0 and r.resolution == 'Q':
        print("start: {}".format(r.granule_id))
        start = time.time()
        es.index(index=process_index, id=r.granule_id, body={
            "location": {
                "type" : "polygon",
                "coordinates" : [
                  [r.lower_left, r.lower_right, r.upper_right, r.upper_left, r.lower_left]
                ]
            },
            "start_datetime": r.start_time,
            "end_datetime": r.end_time,
            "processed_datetime": r.processed,
            "good": r.is_good(),
            "algorithm_version": r.algorithm_version,
            "product": r.product,
            "done": False
        })

        if r.is_good() == True:
            items = []
            for line in range(r.lines):
                for pixel in range(r.pixels):
                    try:
                        item = r.get_point(line, pixel)
                        if item['value'] is not None and item['observation_datetime'] is not None:
                            item['granule_id'] = r.granule_id
                            id = '{}_{}_{}'.format(r.granule_id, line, pixel)
                            items.append({'_op_type': 'index', '_index':product_index, '_id':id, '_source':item})

                            if len(items)>10000:
                                helpers.bulk(es, items)
                                items = []
                    except:
                        pass

            if len(items)>0:
                helpers.bulk(es, items)
        es.update(index=process_index, id=r.granule_id, body={"doc": {"done": True}})
        print("done: {}[s]".format(time.time() - start))
    else:
        print("passed: {}".format(r.granule_id))

処理時間は、筆者の環境では1ファイルにつき10~30分ほどかかりました。10ファイルくらいなら寝る前に実行しておくとちょうどいい感じでしょうか[4]

あるファイルでは4800*4800点中6368156点が有効で、サイズは875.3MBでした(1点あたり137byte目安)。ファイルによってはもっとサイズが大きくなります。
ファイルの元のサイズは10~40MBなのでずいぶんとサイズが膨らみました[5]
処理するファイル数次第ではDockerのvolumeに注意してください。そういうのを気にしたくなければdocker-compose.ymlのvolumes設定をbindにするなどいい感じに変更しましょう。

最終的に何点データが作られたかは以下のコードで確認できます。

print("created points: {}".format(es.count(index=product_index)))
print("processed files: {}".format(es.count(index=process_index)))

グラフ化

処理したファイルの数は人によって異なるでしょうが、筆者は今回タイル番号0221のファイルを2020/09/15から2020/09/25の10日分(21ファイル、179817666点)を処理しました[6]
参考に処理したファイルの一覧です。

GC1SG1_20200915A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200919A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200923A01D_T0221_L2SG_LST_Q_2000.h5
GC1SG1_20200915D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200919D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200923D01D_T0221_L2SG_LST_Q_2000.h5
GC1SG1_20200916A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200920A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200924A01D_T0221_L2SG_LST_Q_2000.h5
GC1SG1_20200916D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200920D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200924D01D_T0221_L2SG_LST_Q_2000.h5
GC1SG1_20200917A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200921A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200925A01D_T0221_L2SG_LST_Q_2000.h5
GC1SG1_20200917D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200921D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200918A01D_T0221_L2SG_LST_Q_2000.h5	
GC1SG1_20200922A01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200918D01D_T0221_L2SG_LST_Q_2000.h5	GC1SG1_20200922D01D_T0221_L2SG_LST_Q_2000.h5

work/sample_graph.ipynbを開いてみてください。
地表面温度をセ氏に直してグラフ化します。

from elasticsearch import Elasticsearch, helpers
import datetime
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.dates import date2num
import matplotlib.ticker as ticker

es = Elasticsearch("http://es:9200")
product = 'LST'
product_index = product.lower()

def draw(searched):
    def get_value(hit):
        return hit['inner_hits']['distance']['hits']['hits'][0]['_source']['value']
    
    def get_date(hit):
        return hit['inner_hits']['distance']['hits']['hits'][0]['_source']['observation_datetime']
    
    x = [datetime.datetime.fromisoformat(d) for d in map(get_date, searched['hits']['hits'])]
    y = np.array(list(map(get_value, searched['hits']['hits']))) - 273.15

    figure = plt.figure(1, figsize=(8,4))
    axes = figure.add_subplot(111, title='LST', ylabel='[deg C]')
    axes.plot(x, y, 'o-')
    axes.xaxis.set_major_locator(ticker.FixedLocator(date2num(x)))
    axes.set_xticklabels([d.strftime('%Y/%m/%d %H:%M:%S') for d in x], rotation=320, ha='left')

タイル番号0221に含まれる適当な座標(経度, 緯度)から1km以内で絞り込んでみます。
※ 座標は自分がダウンロードした領域にあわせて変更してください。

point = [75.05859375000006, 62.0043408521]

searched = es.search(index=product_index, body={
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "geo_distance": {
                    "distance": "1km",
                    "location": point
                }
            }
        }
    },
    "sort": [
        {
            "observation_datetime": {
                "order": "asc"
            }
        }
    ],
    "collapse": {
        "field": "granule_id",
        "inner_hits": {
            "name": "distance",
            "size": 1,
            "sort": [
                {
                    "_geo_distance": {
                        "location": point
                    }
                }
            ]
        }
    }
})
print(searched)
> {'took': 46, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 407, 'relation': 'eq'}, 'max_score': None, 'hits': [{'_index': 'lst', '_type': '_doc', '_id': 'GC1SG1_20200915D01D_T0221_L2SG_LST_Q_2000_3834_2505', '_score': None, '_source': {'location': [75.04846227834365, 62.01145833333334], 'value': 284.179993648082, 'observation_datetime': '2020-09-15T06:00:20.394679+00:00', 'qa_flags': [9], 'granule_id': 'GC1SG1_20200915D01D_T0221_L2SG_LST_Q_2000'}, 'fields': {'granule_id': ['GC1SG1_20200915D01D_T0221_L2SG_LST_Q_2000']}, 'sort': [1600149620394], 'inner_hits': {'distance': {'hits': {'total': {'value': 19, 'relation': 'eq'}, 'max_score': None, 'hits': [{'_index': 'lst', '_type': '_doc', '_id': 'GC1SG1_20200915D01D_T0221_L2SG_LST_Q_2000_3835_2508', '_score': None, '_source': {'location': [75.05622688039458, 62.009375000000006], 'value': 282.7999936789274,
> ...

きちんとデータが返ってきました。レスポンスタイム(took)も46msと速いです。
これをグラフにしましょう。

draw(searched)

75.05859375000006, 62.0043408521, all

今回処理していない地点を指定すると何も返ってきません。

point = [0, 0]
searched = es.search(index=product_index, body={...})
print(searched)
> {'took': 14, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}

座標で簡単に検索できることはわかったので、次はQAフラグを条件に加えてみます。
GCOM-Cの地表面温度のQAフラグはアルゴリズムver2.20では以下のようになっています。

00:no input data; 
01:land/water flag(0=land/1=water); 
02:Spare; 
03:no CLFG; 
04:no VNR/SWR; 
05:Snow; 
06:Sensor zenith angle > 33; 
07:Sensor zenith angle > 43; 
08:TR1 < 0.6; 
09:RES > 1[K]; 
10:RES > 2[K]; 
11:Probably Cloudy; 
12:Cloudy; 
13:TS out of range; 
14:land/water flag(0=land/1=water); 
15:no input data;

それぞれが何を表しているか詳しく知りたい人はJAXAのドキュメントを読み漁ってもらうとして、今回は雲っぽいと判定されているピクセル([11, 12]が1)を除外してみることにします[7]

point = [75.05859375000006, 62.0043408521]

searched = es.search(index=product_index, body={
    "query": {
        "bool": {
            "must_not" : [
                {"term":{"qa_flags": 11}},
                {"term":{"qa_flags": 12}}
            ],
            "filter": {
                "geo_distance": {
                    "distance": "1km",
                    "location": point
                }
            }
        }
    },
    "sort": [
        {
            "observation_datetime": {
                "order": "asc"
            }
        }
    ],
    "collapse": {
        "field": "granule_id",
        "inner_hits": {
            "name": "distance",
            "size": 1,
            "sort": [
                {
                    "_geo_distance": {
                        "location": point
                    }
                }
            ]
        }
    }
})
draw(searched)

75.05859375000006, 62.0043408521, not cloud
さきほどと同じ地点ですが、点がまばらになっています。

このように時空間データに一度直してしまえば、様々な条件で素早くグラフの作成が可能となります。

まとめ

Elasticsearchを利用してGCOM-Cの観測値データを時空間データとして扱う方法を紹介しました。

一度Elasticsearchに入れてしまえばデータの取り回しが速く簡便で解析も捗りそうですが、その処理には時間とリソースを要するのが難点ではあります。

実際的ではないですが、Pythonより処理速度の速い言語に切り替える、インデックスやマッピングを最適化するなど工夫のしどころはありますし、単純にクラウドを使って数の暴力でどうにかすることもできるので、興味が出たらお試しあれ。

脚注
  1. ひまわりのような静止軌道衛星は除く ↩︎

  2. GCOM-Cの陸域データが利用する「タイル」という投影タイプではピクセル毎の座標が揃えられているので扱いは比較的容易だが、それでも各ファイルから特定の座標の値を取り出すのはやはりめんどうである。 ↩︎

  3. 回転楕円体上の距離を算出してくれる。 ↩︎

  4. 2018年からのデータすべてを処理しようと思うとどれかくらいかかるでしょうか。並列処理したとしても……。これが実際的でない理由1です。 ↩︎

  5. 得たいデータの範囲や時刻、プロダクト種別を増やすと必要になる容量がどれくらいになるか、想像するだけで頭が痛くなります。これが実際的でない理由2です。 ↩︎

  6. およそ13GBです。 ↩︎

  7. QA_flag情報を参考に、0, 1, 4, 11, 12, 13, 14, 15が1のピクセルをマスク(除外)すると、品質の高いデータを絞り込めます。 ↩︎

Discussion