リアルタイムなストリーム処理/分散処理に思いを巡らせる

5 min read読了の目安(約5300字

ここ最近、ちょっとしたリアルタイムなストリーム処理と分散処理について触れる機会があったので、頭の整理も兼ねて得た知見をまとめてみる。

やりたいこととしては、IoTからAWSへ流れてくるデータをIoT単位である程度リアルタイムに計算処理することだった。
バッチ処理であれば定期的にDBなどの永続化機構から取り出してIoT単位で計算処理をすれば良いのだが、ある程度リアルタイムに処理する必要があり、どうしたものかという問題に直面した。
流れとしては(雰囲気として)以下の感じである。

いわゆる、ストリーム処理と分散処理に関する話だ(と、思っている)。

前提

  • 開発に使える時間と人員はほぼ無いこともあり、クラウドネイティブ(AWS)で殴りたい
    • できるだけ自前でリソースを作ったり管理しない方向で考えている

考え方

ストリーム処理と分散処理がキーになりそうなので、コンセプトをサラッとまとめる(より正確な意味は参考先のリンクなどを御覧ください)。

1. ストリーム is 何?

AWS公式が出しているストリーミングデータとはによると、

ストリーミングデータは、数千ものデータソースによって継続的に生成されるデータです。通常は、小さなサイズ (キロバイト単位) で同時にデータレコードが送信されます。ストリーミングデータには、お客様のモバイルアプリケーションやウェブアプリケーションで顧客によって生成されるログファイル、e コマースでの購入内容、ゲーム内でのプレイヤーのアクティビティ、さらにはソーシャルネットワーク、証券取引所の立会場、または地理空間サービスからの情報、およびデータセンター内の接続されたデバイスや計器からのテレメトリなど、広範なデータがあります。

とのこと。

重要そうな点はざっくりと

  • 大量のデータ
    • データ単位では小さいが、それが大量にある感じ
  • データ間で何らかの関係性を担保されている必要がありそう
    • 例えば「ゲーム内でのプレイヤーのアクティビティ」であれば、 時系列ユーザー単位特定のグループ単位 において意味を見いだせるようにする必要がありそう

上記に挙げた図であれば、

  • IoTがたくさんのデータを投げて来る
  • IoT単位で計算処理をする必要がある

といった所がストリームの要素を持っていると思う。

2. 分散処理 is 何?

IDC Frontier 分散処理によると、

分散処理とは、処理速度の向上とサーバー負荷軽減のために、1つの処理を分散して行う方式のことです。

分散処理には、1台のコンピュータに多数のプロセッサを搭載して処理する方法と、大規模データ処理を複数のサーバーに分散させ、処理結果をネットワーク上で共有する方法の2種類があります。

とのこと。

上記のIoTの例を出すと、IoTに対する計算処理を1台のサーバー(1つのプロセスやスレッドでもいいかもしれない)で処理していくと、IoTの台数が増えると1台のサーバーでは捌ききれなくなる(インスタンスのスペックを上げて殴るのはできるが、例えば1億台のIoTがアクセスしてくるとスペックで殴るのにも限界が来るだろう)。IoT単位で並行処理や並列処理をしたり、場合によってはサーバー自体分ける必要があるかもしれない。

そんな感じで、データをある特定の意味を持つ塊に分けて、それぞれの塊毎に処理をする感じである(あくまでも雰囲気であり、詳しいことは調べてみてください)。

難しさ

上記に挙げたストリームによる順序保証と大量データに伴う分散処理が合わさることで、実装としてどうしよう問題が出てくる。(大規模)分散処理に詳しい方であればSpark Streamingなどが解決案として思い浮かぶのかもしれないが、こちとら単なるバックエンドエンジニアなんだ。もう少し優しい問題でもバチは当たらんだろというお気持ちではある(ただの愚痴である)。

ぱっと考えたところの懸念点(というか頭の痛くなる点)は以下かなと思う

  • そもそも大規模分散システムなんぞ組んでる知見も時間も人員も居ないんじゃ 限られた時間内で動作保証と品質担保するのムズい
    • もちろん検証が必要だろうし、トラブルシュートどうするよ
  • ストリーム処理による順序保証はクラウドネイティブなサービスを使うことでいけそうだが、出口ではどうやってそれをIoT単位で捌くんだ?
  • 自前でバッチ用サーバー建ててストリーム用サービスの出口に繋いだとしても、結局IoT単位で分割して処理するには作り込みが要りそう...
    • 作り込みはもちろんのこと、自前でサーバーを建てると今度は管理が辛そう...

とまぁ、頭を悩ませたワケである。

解決法は?

結局は「解決策は時と場合による」というものであり銀の弾丸はないだろう。
以下に挙げたのはあくまでも解決策のうちの一つといったものであるので、他に方法があれば是非ともコメント頂きたい。

ストリーム処理

IoTが投げてくるデータ量,頻度,IoT自体の数にもよるが、AWSでは主にAmazon Simple Queue ServiceAmazon Kinesisを使う方法がありそう。

1. SQS

SQSはキューイングサービスであり、FIFO(先入れ先出し)にも対応しているので、順序保証の点では利用できると考える。このFIFOには メッセージグループID というシャードのような分割機構があり、FIFOの担保はこのメッセージグループID単位で実行されるので、IoTのID毎にメッセージグループIDを割り当てることでIoT間は気にしなくて良いようになりそうだ。

SQSのFIFOの仕様としては以下の感じである(公式より拝借)

高スループット: デフォルトでは、FIFO キューは毎秒最大 300 件のメッセージをサポートします (毎秒送信、受信、または削除オペレーション 300 件)。オペレーションあたり 10 件のメッセージ (最大) というバッチ処理を実行する場合、FIFO キューは 1 秒あたり最大 3,000 件のメッセージをサポートできます。クォータの引き上げをリクエストする場合は、サポートリクエストを提出してください。

1 回だけの処理: メッセージは 1 度配信されると、ユーザーがそれを処理して削除するまでは使用可能な状態に保たれます。キューでメッセージの重複が起きることはありません。

1秒間の最大オペレーション数が300件という制限があるので、それを念頭に技術選定を行う必要がある。扱う対象のストリームの頻度と規模がどれくらいになりそうかを検討した上で使うのが良いだろう。

参考にした記事

2. Kinesis

Kinesisはデータストリーム用のサービスであり、テキストから動画まで様々なものに対応している。データストリームがしたければひとまずKinesisを選択しておくのが無難といったところだろう。

Kinesisにはシャードという機構を有しており、シャード単位でグルーピングしてその中で順序を担保するような仕組みになっている。シャードはパーティーションキー(Kinesisへデータ送信時に指定するパラメータの1つ)というものを元に作成するため、Kinesisへデータ送信する際にパーティーションキーを設定することで、ハンドリングしたい単位でシャードをつくることができる。

IoTが持つユニークなID単位でパーティーションキーを設定しておくことでIoT単位に順序保証されたストリームを作ることが出来る。

注意点としてはシャード単位で課金されるので、シャード数が増えればその分費用も掛かるだろうということだ。費用がどれくらい掛かりそうかは設定するシャード数をおよそ検討して予め試算しておくことが賢明だろう。

参考にした記事

分散処理

ゴツい機構を自前で構築する余裕がない前提であれば、AWS Lambdaが無難だろうと思っている。SQSもKinesisもLambdaとのインテグレーションを有しているので、簡単にサービスを連携できる。

特にKinesisに関しては、Lambdaで単一のシャードからポーリングする際の同時実行数を設定できる。これを1に設定しておくことで、シャード内では確実に直列処理を実行できるようになるのではないかと思う(もちろん要検証ではあるだろうが確度はかなり高いのではなかろうか)。

未知と遭遇したときにどうすればいいのか?

  • 問題にぶち当たったときには、それが何に起因するものなのかを紐解いていくのが良いだろう
    • 複雑怪奇なものをそのまま取り扱うのはメンタル的にもくるものがある
    • 紐解く過程で問題の本質や関係性が見えてくるので、作戦を練りやすくなるだろう
  • 大体の問題の本質は一般的なものに着地することが多い
    • 「この問題に直面しているのがこの世で俺だけだ!」という状況でなければ、基本的に誰かが何かの解決策や情報を持っていることがある(ググろう)
      • もしそれが世界初のものであれば、おめでとう!記事にして後世に残そう!
    • その問題が一般的なものに起因するものであれば、おそらく解決策も転がっているだろう
      • AWSと絡めて調べることで、だいたいそれに特化したサービスにたどり着けるのではなかろうか

まとめ

この業界で何年やってても知らないことは山ほど出てくる。これだからソフトウェア・エンジニアリングってやつはやめられないんだ。

その他参考