dkronのソースコードリーディングメモ
https://github.com/distribworks/dkron のソースコードを読んでみる
dkron
distributed cronっていう意味っぽい?多分。
このwhite paperにインスパイされたそう。
まず、これを読んでみる。
最初にそもそもcrondとは?みたいな話がある。
確かにcrond(crontab)の実装とか知らんなぁ。
crondの信頼性の話をしている。
crondはcrondが起動していなかった間に実行予定だったジョブは保持されないとか、マシーンが故障していると、スケジューラーが動いていないとダメとか色々と問題がある。
anacronっていうコマンドがあるらしく、それは一つ目の問題に対処できるらしい。
crond自体の性質以外にも、crondによって実行されるjobに関しても問題がある。
冪等性があるかどうかによって実行されなかったjobをどう扱うかは話が変わってくるよね。っていう話。
cronをスケールさせる上で、上記のような問題があるが、どうしましょう?っていう話。
そのためにどんな設計変更が必要なのか?を以下につづける。
設計変更の部分は翻訳を読んでもいまいち分からなかった。後でもう一回読み返す。
Extended InfrasturctureとExtended Requirementのところ。
何はともあれgoogleは以下の選択をした。
cronが管理するジョブの状態を保持するためにcronシステム自体に情報をもたせる。理由は以下の通り。
- GFS(Google File System)やHDFS(Hadoop Distributed File System)は使わなかった。なぜならそれらは非常に大きなファイルのために設計されており、cronジョブの状態といった小さいデータを管理するにはオーバースペックで、そのためのオーバーヘッドが高い。
- また、cronみたいに広範囲に影響を与えるシステムの依存関係は少ないほうが良いと判断したから。
複数マシンにcronサービスのレプリカをデプロイして、それぞれの状態の一貫性を維持するためにPaxosという合意形成アルゴリズムを使う。
Paxosはk8s周辺でよく見るRaft https://raft.github.io/ の祖先っぽい。
で、具体的にはPaxosの亜種であるFast Paxosというアルゴリズムを使っている。
Fast Paxosはmasterとslaveがいる。それぞれの役割は以下の通り。
- master
- 内部にcron job schedulerを保持して、jobの起動時刻になったらPaxosを通じて他のreplicaにjobの状態を同期する。状態とはjobの開始、終了とかのこと。
- slave
- こちらもmasterと同期されたcron job schedulerを保持して、Paxosを通じてmasterから同期されたjobの状態を保持する。
- masterが死んだときに1分以内にmasterとして選出される。
状態を保持するための工夫が紹介されている。Storing Stateの部分。ここめちゃくちゃ面白い。
まず、cronシステムのステートを保持するためには
- 状態を変更するログ
- ログから算出されたスナップショット
の2つが重要な要素になる。
ログは各レプリカに保存されて、スナップショットは分散ファイルシステムに保存される。
これはスナップショットがあれば、ログがなくても最小限の被害に抑えることができるからこうなっている。最小限の被害とはスナップショット保存時以降の状態変化が失われることを意味する。
Paxosは上記のロジックを構成するアルゴリズムとなっている。
Running Large Cron jobでは、巨大なデータセンターでcron jobを動かすむずかしさについて書かれている。1つのcron jobがデータセンターにかなり大きな影響を与える可能性がある。その上そんなjobが様々なシステムから呼び出される。
これを軽減するためにcrontabのフォーマットを拡張した。?を導入した。
?は指定した箇所でどんな値でも許容するということである。時間の箇所に指定したら、0~23がcronシステムによって選択される。
ただ、これを導入してもなおスパイクが起きてしまう。これはやはり特定の時間に実行する要求があるjobがあるから仕方ない。
ここからはdkronのreadmeを読んでみる。
まず、いくつかの単語をピックアップする。
dkronはraftとserfによってすごい耐障害性を提供しているみたいなことが書かれている。
RaftはPaxosの後継のアルゴリズムであり、合意形成アルゴリズムの一種である。具体的にどんなアルゴリズムなのかはわかってない。ので、調べたいところ。
Serfは聞いたことなかったけど、どうもHashiCorp社が作っているクラスタ管理用ツールっぽい。
go.modをみる限りだとserfはパッケージとしても使えるっぽい。ということは、serfを組み込めばnodeレベルの分散システムは楽に作れるのか、、、?
まずはRaftとserfについてドキュメントを読んでみようと思う。
現時点では、serfをパッケージとして使うっていうのが理解できない。
ラフトとは?
Raftはコンセンサス・アルゴリズムであり、理解しやすいように設計されている。耐障害性と性能はPaxosと同等です。その違いは、比較的独立した部分問題に分解され、実用的なシステムに必要なすべての主要部分にきれいに対応していることです。私たちは、Raftによってコンセンサスがより多くの人々に利用されるようになること、そして、より多くの人々が、現在よりも質の高い様々なコンセンサスベースのシステムを開発できるようになることを望んでいる。
コンセンサスとは何か?
コンセンサスとは、フォールト・トレラントな分散システムにおける基本的な問題である。コンセンサスとは、複数のサーバーが値について合意することである。一旦、ある値に関する決定に達すると、その決定は最終的なものとなる。典型的なコンセンサス・アルゴリズムでは、サーバーの過半数が利用可能であれば、コンセンサスは進展する。例えば、5台のサーバーからなるクラスタは、2台のサーバーが故障しても、稼働し続けることができる。それ以上の数のサーバーが故障すると、コンセンサスは進まなくなる(ただし、間違った結果を返すことはない)。
通常、コンセンサスは、フォールト・トレラント・システムを構築するための一般的なアプローチである、複製されたステート・マシンのコンテキストで発生する。各サーバーはステートマシンとログを持つ。ステートマシンは、ハッシュテーブルなど、フォールトトレラントにしたいコンポーネントである。クライアントからは、クラスタ内の少数のサーバーが故障しても、信頼できる単一のステートマシンとやりとりしているように見える。各ステートマシンは、ログからコマンドを入力する。ハッシュテーブルの例では、ログにはset x to 3のようなコマンドが含まれる。コンセンサスアルゴリズムは、サーバーのログ内のコマンドに合意するために使用される。コンセンサスアルゴリズムは、いずれかのステートマシンがset x to 3をn番目のコマンドとして適用した場合、他のステートマシンが異なるn番目のコマンドを適用することがないようにしなければならない。その結果、各ステートマシンは同じ一連のコマンドを処理し、同じ一連の結果を生成し、同じ一連の状態に到達する。
と書いてある。
dkronでは上記のライブラリを使っている。
hashicorpがRaftの解説を書いてる。
RaftはPaxosをベースにしたコンセンサス・アルゴリズムである。Paxosと比較して、Raftはステートが少なく、よりシンプルで理解しやすいアルゴリズムに設計されている。
Raftについて議論する際に知っておくべき重要な用語がいくつかある:
ログ - Raftシステムの主要な作業単位はログ・エントリです。一貫性の問題は、複製されたログに分解することができる。ログは順序付けられたエントリのシーケンスです。エントリには、ノードの追加、サービスの追加、新しいキーと値のペアなど、クラスタのあらゆる変更が含まれます。すべてのメンバがエントリとその順序に同意している場合、ログは一貫していると考えます。
FSM - 有限ステートマシン。FSMは、それらの間の遷移を持つ有限状態の集まりです。新しいログが適用されると、FSMは状態間を遷移する。つまり、動作は決定論的でなければならない。
ピアセット - ピアセットは、ログ複製に参加するすべてのメンバーのセットです。Consulの目的では、すべてのサーバーノードはローカルデータセンターのピアセットに含まれます。
定足数 - 定数とは、ピアセットからのメンバーの過半数です。サイズNのセットに対して、定足数は少なくとも(N/2)+1のメンバーを必要とします。例えば、ピアセットに5人のメンバーがいる場合、クォーラムを形成するには3つのノードが必要です。定足数のノードが何らかの理由で利用できなくなると、クラスタは利用できなくなり、新しいログをコミットできなくなります。
コミットされたエントリ - ノードのクォーラムに永続的に保存されると、エントリがコミットされたとみなされます。エントリがコミットされると、それを適用できます。
リーダー - 任意の時点で、ピアセットは1つのノードをリーダーに選出します。リーダーは、新しいログエントリーを取り込み、フォロワーにレプリケートし、エントリーがいつコミットされたとみなされるかを管理する責任を負う。
Raftは複雑なプロトコルであるため、ここでは詳細な説明は省略する(より包括的な説明を希望される方には、この論文に完全な仕様が掲載されている)。しかし、メンタルモデルを構築するのに役立つかもしれない高レベルの説明を提供することを試みる。
ラフト・ノードは常に、フォロワー、候補、リーダーの3つの状態のいずれかにある。すべてのノードは、最初はフォロワーとしてスタートします。この状態では、ノードはリーダーからのログ・エントリーを受け入れ、投票を行うことができる。しばらくエントリが受信されない場合、ノードは候補者状態に自己昇格します。候補状態では、ノードは仲間からの投票を要求します。候補が定足数の票を獲得すると、リーダーに昇格する。リーダーは新しいログエントリーを受け入れ、他のすべてのフォロワーにレプリケートしなければならない。さらに、古い読み出しが許容されない場合、すべてのクエリーはリーダーに対しても実行されなければならない。
クラスターがリーダーを持つと、新しいログエントリーを受け入れることができるようになる。クライアントは、リーダーに新しいログエントリを追加するよう要求することができます(Raftの観点では、ログエントリは不透明なバイナリブロブです)。その後、リーダーはそのエントリを耐久性のあるストレージに書き込み、フォロワーのクォーラムにレプリケートしようとする。いったんログエントリーがコミットされると、それを有限ステートマシンに適用することができる。有限状態マシンはアプリケーション固有のもので、Consulの場合、クラスタの状態を維持するためにMemDBを使用している。Consulの書き込みは、コミットされ適用されるまでブロックされる。これは、クエリの一貫モードと併用することで、書き込み後の読み込みセマンティクスを実現する。
明らかに、レプリケートされたログが無制限に成長することは望ましくありません。Raftは、現在の状態がスナップショットされ、ログがコンパクト化されるメカニズムを提供する。FSMは抽象化されているため、FSMの状態を復元すると、古いログの再生と同じ状態にならざるを得ません。これにより、Raftはある時点のFSMの状態をキャプチャし、その状態に到達するために使用されたすべてのログを削除することができます。これはユーザーの介入なしに自動的に実行され、ログの再生に費やす時間を最小限に抑えながら、ディスクの無制限な使用を防ぎます。MemDBを使用する利点の1つは、古い状態がスナップショットされている間でもConsulが新しいトランザクションを受け入れ続け、可用性の問題を防ぐことができることです。
Consensusは、クォーラムが利用可能な時点までフォールトトレラントである。クォーラムノードが利用できない場合、ログエントリーを処理したり、ピアメンバーシップを推論したりすることはできない。たとえば、2つのピアしかないとします:クォーラムのサイズも2であるため、ログエントリーをコミットするには両方のノードが同意する必要があります。AまたはBのどちらかが失敗すると、クォーラムに達することができなくなります。これは、クラスタがノードを追加または削除できず、追加のログエントリをコミットできないことを意味します。その結果、利用できなくなります。この時点で、AまたはBのいずれかを削除し、残りのノードをブートストラップモードで再起動するには、手動による介入が必要になります。
3ノードのRaftクラスタは1つのノード障害に耐えることができますが、5ノードのクラスタは2つのノード障害に耐えることができます。推奨される構成は、データセンターあたり3台または5台のConsulサーバーを稼働させることです。これにより、パフォーマンスを大きく犠牲にすることなく、可用性を最大限に高めることができます。以下の配置表は、クラスタサイズのオプションとフォールトトレランをまとめたものです。
Raftがどういうものかはなんとなーーーくわかった。
serfについて読んでみる。
k8sのnode管理だけある、みたいなイメージに近いのかな?
k8sはnode上のpodなど多くのリソースを管理できるようになっているが、serfはシンプルにクラスタに参加するnodeの管理を行っているっぽい。
コードをどう読んでいくか。
docker-comopseを見ると、dkron agentというコマンドを使ってdkronのクラスタを構成しているっぽい。
この辺りのコマンドの詳細を見てみる。
まず、dkron clusterは複数のnodeで構成されている。
全てのnodeでジョブが実行される。
nodeには2つの役割がある。
- server
- agent
serverは
ジョブの実行を指示し、ジョブのスケジューリングに使用され、データストレージを処理し、リーダー選挙に参加することです。
と書かれている。
で、agentは
Dkronエージェントはクラスターメンバーであり、ジョブの実行を処理し、スクリプトを実行し、結果の出力をサーバーに返します。
と書かれている。
本来であれば、ipを直接指定する必要があるが、meta tagなどのデータを指定することでクラスタ形成ができる。
オプションを提供するためにgo-discoverというhashicorp製のライブラリを使っている。
クォーラムってなんぞや?はこれがわかりやすい。
plugin開発が可能。
drkonプロセスとプラグインのプロセスは別になっていて、gRPCで通信するらしい。
何はともあれ https://github.com/distribworks/dkron/blob/v3.2.6/cmd/agent.go を読むと良さそう。
関係ないけど、buntdbの作者の方、すごすぎ開発者でビビる。
こんな人いるんか、、、。
agentの起動はこちらから。
このあたりからプラグインとエージェント起動の処理があるのが分かる。
いったんプラグインは置いといて、エージェント周りの処理を読んでいく。
golangの実装例を読んでいると、こうやって構造体にconfigを設定するパターン多いよね。
agent#Startメソッドはこんな感じ
その前にbindとadvertiseの意味はこんな感じらしい。
bind addressは、他のマシンから何という名前で呼ばれたかでメッセージを受け取る/受け取らない(というか見える見えない)を変えるものです。
ConsulやVaultは複数のサーバでクラスタを組むため、他のノードに対して「私のIPはこれですよー。このIPでアクセスしてねー」と伝える必要があります。
そのための設定というわけですね。
agent#Startは以下を実施する。
- ロガーの初期化
- BindAddr, HTTPAddr, AdvertiseAddrを初期化、設定値としてセットする
a. BindAddr マシンに付与された1つ以上のIPアドレスからどのIPを利用するか決定する
b. HTTPAddr Web UI用のアドレス
c. AdvertiseAddr クラスター内の他のnodeに知らせるために使われるAddressでデフォルトはBindAddressが使われる。 - serfをセットアップする
a. serf.Serfインスタンスを生成する。生成する際にコマンドライン引数に渡されたオプションを使って設定を行っている。 - sertクラスタにジョインする
- メトリクスの収集を設定する
a. https://dkron.io/docs/usage/metrics/ - expvarでnodenameを公開する
- dkron agentをserver mode or agent modeのどちらかで起動する
a. agentモードの場合、gRPCでAgenServiceをインターフェイスとして待ち受ける
b. serverモードの場合、gRPCでAgentService, DkronServiceをインターフェイスとして待ち受けつつ、Raftも待ち受ける。このときRaftとgRPCサーバーはcmuxパッケージを使って1つのポートで待ち受けて、cmuxでコネクションがどちらか(Raft or gRPC)に振り分けられる。 - タグを設定する
- serfクラスターから発信されたイベントをハンドルするgorutineを起動する
- serfクラスタにjoinする
興味深いので、こちらに記載する。
configに設定されたクラウドプロパイダーのメタデータから割り出したIPアドレス or 直接指定されたIPアドレスに対してserf.Joinを実行する。serf.Joinが何やってるのかまではいったん追わない。
この辺のコードのjoinフィールドの使い方が勉強になった。
構造体にフィールドを定義しておいて、実態を切り替えられるようにしてある。
今回はagent#Joinを渡すことで、このメソッドを内部で使えるようにしている。
- dkron agentをserver mode or agent modeのどちらかで起動する
Agent ServerはgRPCサーバーでproto/dkron.protoから生成されたrpcを備えている。要するにAgentとしてのインターフェイス、実装をgRPC, pbでやってるっぽい。なるほどー。
Serverはproto/dkron.protoから生成されたrpcを備えたgRPCサーバー、raftを起動している。
cmuxというライブラリが何なのかわからないので、調べてみる。
そもそもmuxとは?っていうのはgorilla/muxがわかりやすかった。
gorilla/mux パッケージは、入ってきたリクエストをそれぞれのハンドラに マッチングするためのリクエストルータとディスパッチャを実装しています。
mux という名前は "HTTP request multiplexer" の略です。標準のhttp.ServeMuxのように、mux.Routerは入ってくるリクエストを登録されたルートのリストと照合し、URLやその他の条件にマッチしたルートのハンドラを呼び出します。主な機能は以下の通り:
http.Handlerインターフェースを実装しているので、標準のhttp.ServeMuxと互換性があります。
リクエストはURLホスト、パス、パスプレフィックス、スキーム、ヘッダとクエリ値、HTTPメソッド、またはカスタムマッチャーを使用してマッチさせることができます。
URLホスト、パス、クエリー値は、オプションの正規表現で変数を持つことができます。
登録されたURLは、リソースへの参照を維持するのに役立つビルド、または "リバース "することができます。
ルートはサブルーターとして使うことができます。ネストされたルートは親ルートがマッチした場合のみテストされます。これはホスト、パスのプレフィックス、その他の繰り返される属性のような共通の条件を共有するルートのグループを定義するのに便利です。ボーナスとして、これはリクエストのマッチングを最適化します。
ルーティングとハンドラの登録ができるものをmux(multiplexer)と呼ぶっぽい。
cmuxはconnection multiplexerと呼ぶらしい。
見た感じだと1つのポートで複数プロトコルを受け付けてそれを各ハンドラーに割り振るみたいな感じ?
ServerはDkron ServiceとAgent ServiceをgRPCサーバーとして起動している。
agent#StartServerの最後の方でgorutineとしてmonitorLeadershipというメソッドが呼ばれており、読んでみた。細部はややこしくて諦めたが、見た感じだとRaftクラスタ内のリーダーロールとしてやるべきいくつかのメンテナンス処理を行うようだ。agentがリーダーじゃない場合は何もしないっぽい。
ややこしそうなのはリーダーロール時に実行されるこの辺りの処理。
expvarという簡単にHTTP経由でjson形式のメトリクスを用意できるパッケージが標準にある。すげー。
よくOSSとかでanonymous reportがあるけど、それ専用のパッケージがあるっぽい。
Dkronとしての機能はgRPCを読めば分かりそう。
分散システムとしてどうやって成立してるのか?はRaftとSerfを調べる必要がありそう。