🐿️

Kinesis Data AnalyticsのBeamチュートリアル

2021/04/25に公開

Kinesiss Data AnalyticsでのApache Beamのチュートリアルでハマったのでメモ。

背景知識

Kinesis Data Analytics(以下、KDA)

公式ページ
AWSのManaged Apache Flinkです。

公式ページ
分散データ処理エンジンです。Apache Sparkや、GCPのCloud Dataflowの仲間です。
マスコットのリスがカワイイです。

Apache Beam

公式ページ
FlinkやDataflowで使えるAPI・SDKです。Beamで書くことで、

  • 分散データ処理エンジン間を移行しやすくなる(ある程度)
  • ストリーミングとバッチを同じように書ける
  • 入出力(Source, Sink)などのライブラリを共通して使える

などのメリットがあります。
マスコットのホタルがカワイイです。

Fink・KDAは自前のAPI・SDKでも、Beamでも書くことができます。
(DataflowはBeamのみ)
今回やったチュートリアルは、Apache BeamでKDAのアプリケーションを実行する回です。

チュートリアルの概要

Kinesis Data Streamsから入力を受け取り、CloudWatch LogsとKines Data Streamに出力するBeamアプリケーションを構築するチュートリアルです。おおまかには下の流れです。

  1. 関連リソースの準備
    • jarを置くS3
    • 入出力に使うKinesis Data Stream
  2. ダミーの入力をKinesis Data Streamに出力するPythonスクリプトの作成
  3. サンプルJavaコードをダウンロード・ビルド・アップロード
  4. KDAアプリケーションの設定
  5. アプリケーションの起動

チュートリアルなので書いている通りにやれば動くはずですが、いくつか戸惑った点がありました。

戸惑った点

  1. ライブラリのバージョン
  2. Kinesis Data Streamの確認の仕方
  3. アプリケーションの終了・Checkpoint
  4. IAMの使いまわし注意

ライブラリのバージョン

2021/04/25の時点では、サンプルコード記載の依存関係そのままでは、

  • コンパイルには成功
  • KDAでアプリケーションを実行するとエラーがCloudWatch Logsに出力される
  • 処理が実行されない

という不具合があります。issue が報告されているので、いずれ訂正されると思います。

Kinesis Data Streamのデータ確認

このチュートリアルでは、Kinesis Data Streamsにデータを書き込みますが、その確認方法は記載されていません(※)。Kinesis Data Streamsのドキュメントの方に、CLIでデータを確認する方法が記載されているので、そちらを参考にしましょう。

なお、上記ドキュメントに記載されています(※※)が、空のデータが返されることもある(Data Stream上にあったとしても)点には注意が必要です。
Kinesis Data FirehoseでS3にデータを置いて確認したり、下記のようにループすると良いと思います。

# (Streamの最初から見る場合)
# first=$(aws kinesis --profile プロファイル名 get-records --shard-iterator  $(aws kinesis --profile プロファイル名 get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name ExampleOutputStream | jq  -r .ShardIterator))

# このスクリプト起動後のデータから見る場合
first=$(aws kinesis --profile プロファイル名 get-records --shard-iterator  $(aws kinesis --profile プロファイル名 get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type LATEST --stream-name ExampleOutputStream | jq  -r .ShardIterator))
echo $first
itr=$(echo $first | jq -r .NextShardIterator)

while true
do
    echo $itr
    m=$(aws kinesis --profile adminuser get-records --shard-iterator $itr)
    # メッセージの中身見る場合は、base64 -dなどにパイプしてデコード
    echo $m
    itr=$(echo $m | jq -r .NextShardIterator)
    sleep 1
done

※ チュートリアルの記載は、さらっと「You can check the Kinesis Data Analytics metrics on the CloudWatch console to verify that the application is working. 」
※※ 「you may receive zero or more records even if there are records in your stream, and any records returned may not represent all the records currently in your stream」

アプリケーションの終了・Checkpoint

KDAの管理画面には、アプリケーションを止める選択肢が3つあります。

  1. 「停止」
  2. 「スナップショットを作成せずに停止」
  3. 「削除」

それぞれの詳しい説明を見つけられなかったのですが、下の挙動かな?と思っています。

  • 「停止」は、アプリケーションの設定・状態(スナップショット)を残したアプリケーションの停止
  • 「スナップショットを作成せずに停止」は、設定は残すが状態は残さないアプリケーションの停止
  • 「削除」は、設定も情報も含めたアプリケーションの停止・削除

このうち、今回のチュートリアルで選ぶのは「削除」です。

なお、「停止」を選ぶと、待てど待てども、アプリケーションを終了することができませんでした。
BeamでないFlinkのチュートリアルのアプリケーションでは、「停止」が出来たので、KDAでBeamを使う上での制限かもしれない?と思っています。
(原因不明なので、ご存知の方いれば情報くださいませ…)

IAMの使いまわし注意

KDAのアプリケーションを作成する時に、アプリケーションが使用するIAMを作成、もしくは、既存のIAMを選択します。
新規作成の時は大丈夫なのですが、他のKDAアプリケーションで使ったIAMを使い回す時は
CloudWatch Logsの権限を割り当てることに注意してください。

新規作成IAMでは、アプリケーション名に対応するロググループのPutEvents権限だけが設定されるため、そのまま流用すると、CloudWatch Logsにログが出ない現象に直面する羽目になります。
(*か対応するアプリケーションのロググループを設定すればO.K.)

(付録)ライブラリのバージョンが合わない時のエラー

長いです。初見でバージョンの不具合とわかるのは難しいと思います。

"message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: java.lang.VerifyError: Bad type on operand stack\nException Details:\n Location:\n org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial\n Reason:\n Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/streaming/api/transformations/StreamTransformation'\n Current Frame:\n bci: @467\n flags: { }\n locals: { 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/beam/sdk/transforms/PTransform', 'org/apache/beam/runners/flink/FlinkStreamingTranslationContext', 'java/lang/String', 'org/apache/beam/sdk/values/PCollection', 'org/apache/beam/sdk/values/WindowingStrategy', 'org/apache/beam/sdk/coders/KvCoder', 'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder', 'org/apache/beam/runners/flink/translation/types/CoderTypeInformation', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector', 'org/apache/flink/streaming/api/datastream/KeyedStream', 'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn', 'org/apache/beam/runners/core/SystemReduceFn', 'org/apache/beam/sdk/coders/Coder', 'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List', 'org/apache/flink/api/java/tuple/Tuple2', 'org/apache/beam/sdk/values/TupleTag', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n stack: { uninitialized 455, uninitialized 455, 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n Bytecode:\n 0000000: 2cb8 000b 4e2c 2bb6 0003 c000 043a 0419\n 0000010: 04b6 0005 3a05 1904 b600 0cc0 000d 3a06\n 0000020: 1906 b600 0e19 06b6 000f 1904 b600 05b6\n 0000030: 0006 b600 10b8 0011 3a07 2c19 04b6 0012\n 0000040: 3a08 1907 1904 b600 05b6 0006 b600 10b8\n 0000050: 0013 3a09 bb00 1459 1909 b700 153a 0a19\n 0000060: 08bb 0016 592c b600 17b7 0018 b600 1919\n 0000070: 0ab6 001a 121b b600 1c3a 0bbb 001d 5919\n 0000080: 06b6 000e b700 1e3a 0c19 0b19 0cb6 001f\n 0000090: 3a0d 2bc0 0008 b600 203a 0e19 06b6 000e\n 00000a0: 190e 1904 b600 21b6 0022 1906 b800 23b8\n 00000b0: 0024 3a0f 2c2c 2bb6 0025 c000 04b6 0026\n 00000c0: 3a10 2c2c 2bb6 0025 c000 04b6 0027 3a11\n 00000d0: 2bc0 0008 b600 093a 1219 12b9 000a 0100\n 00000e0: 9900 61bb 0028 5912 29b7 002a 3a13 bb00\n 00000f0: 2b59 190f 2d19 0919 13b8 002c bb00 2d59\n 0000100: 1913 1910 b700 2e19 05bb 002f 59b7 0030\n 0000110: b800 2c2c b600 1719 06b6 000e 190c b700\n 0000120: 313a 1419 0d2d 1911 1914 b600 322d b600\n 0000130: 333a 152c 2c2b b600 2519 15b6 0034 a700\n 0000140: af19 122c b800 353a 13bb 0028 5912 29b7\n 0000150: 002a 3a14 bb00 2b59 190f 2d19 0919 14b8\n 0000160: 002c bb00 2d59 1914 1910 b700 2e19 0519\n 0000170: 13b4 0036 c000 3719 122c b600 1719 06b6\n 0000180: 000e 190c b700 313a 15bb 0038 5919 0db6\n 0000190: 0039 1913 b400 3ac0 003b b600 3cb6 003d\n 00001a0: 2bb6 003e 1915 1911 190d b600 3fb7 0040\n 00001b0: 3a16 1916 190d b600 41b6 0042 1916 190d\n 00001c0: b600 4301 b600 44bb 0045 592a 190d b600\n 00001d0: 4619 16b7 0047 3a17 190d b600 4619 16b6\n 00001e0: 0048 2c2c 2bb6 0025 1917 b600 34b1 \n Stackmap Table:\n full_frame(@321,{Object[#73],Object[#164],Object[#165],Object[#166],Object[#4],Object[#100],Object[#13],Object[#167],Object[#59],Object[#125],Object[#20],Object[#59],Object[#29],Object[#168],Object[#136],Object[#169],Object[#170],Object[#171],Object[#172]},{})\n same_frame_extended(@493)\n\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 6 more\nCaused by: java.lang.VerifyError: Bad type on operand stack\nException Details:\n Location:\n org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial\n Reason:\n Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/streaming/api/transformations/StreamTransformation'\n Current Frame:\n bci: @467\n flags: { }\n locals: { 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/beam/sdk/transforms/PTransform', 'org/apache/beam/runners/flink/FlinkStreamingTranslationContext', 'java/lang/String', 'org/apache/beam/sdk/values/PCollection', 'org/apache/beam/sdk/values/WindowingStrategy', 'org/apache/beam/sdk/coders/KvCoder', 'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder', 'org/apache/beam/runners/flink/translation/types/CoderTypeInformation', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector', 'org/apache/flink/streaming/api/datastream/KeyedStream', 'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn', 'org/apache/beam/runners/core/SystemReduceFn', 'org/apache/beam/sdk/coders/Coder', 'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List', 'org/apache/flink/api/java/tuple/Tuple2', 'org/apache/beam/sdk/values/TupleTag', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n stack: { uninitialized 455, uninitialized 455, 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n Bytecode:\n 0000000: 2cb8 000b 4e2c 2bb6 0003 c000 043a 0419\n 0000010: 04b6 0005 3a05 1904 b600 0cc0 000d 3a06\n 0000020: 1906 b600 0e19 06b6 000f 1904 b600 05b6\n 0000030: 0006 b600 10b8 0011 3a07 2c19 04b6 0012\n 0000040: 3a08 1907 1904 b600 05b6 0006 b600 10b8\n 0000050: 0013 3a09 bb00 1459 1909 b700 153a 0a19\n 0000060: 08bb 0016 592c b600 17b7 0018 b600 1919\n 0000070: 0ab6 001a 121b b600 1c3a 0bbb 001d 5919\n 0000080: 06b6 000e b700 1e3a 0c19 0b19 0cb6 001f\n 0000090: 3a0d 2bc0 0008 b600 203a 0e19 06b6 000e\n 00000a0: 190e 1904 b600 21b6 0022 1906 b800 23b8\n 00000b0: 0024 3a0f 2c2c 2bb6 0025 c000 04b6 0026\n 00000c0: 3a10 2c2c 2bb6 0025 c000 04b6 0027 3a11\n 00000d0: 2bc0 0008 b600 093a 1219 12b9 000a 0100\n 00000e0: 9900 61bb 0028 5912 29b7 002a 3a13 bb00\n 00000f0: 2b59 190f 2d19 0919 13b8 002c bb00 2d59\n 0000100: 1913 1910 b700 2e19 05bb 002f 59b7 0030\n 0000110: b800 2c2c b600 1719 06b6 000e 190c b700\n 0000120: 313a 1419 0d2d 1911 1914 b600 322d b600\n 0000130: 333a 152c 2c2b b600 2519 15b6 0034 a700\n 0000140: af19 122c b800 353a 13bb 0028 5912 29b7\n 0000150: 002a 3a14 bb00 2b59 190f 2d19 0919 14b8\n 0000160: 002c bb00 2d59 1914 1910 b700 2e19 0519\n 0000170: 13b4 0036 c000 3719 122c b600 1719 06b6\n 0000180: 000e 190c b700 313a 15bb 0038 5919 0db6\n 0000190: 0039 1913 b400 3ac0 003b b600 3cb6 003d\n 00001a0: 2bb6 003e 1915 1911 190d b600 3fb7 0040\n 00001b0: 3a16 1916 190d b600 41b6 0042 1916 190d\n 00001c0: b600 4301 b600 44bb 0045 592a 190d b600\n 00001d0: 4619 16b7 0047 3a17 190d b600 4619 16b6\n 00001e0: 0048 2c2c 2bb6 0025 1917 b600 34b1 \n Stackmap Table:\n full_frame(@321,{Object[#73],Object[#164],Object[#165],Object[#166],Object[#4],Object[#100],Object[#13],Object[#167],Object[#59],Object[#125],Object[#20],Object[#59],Object[#29],Object[#168],Object[#136],Object[#169],Object[#170],Object[#171],Object[#172]},{})\n same_frame_extended(@493)\n\n\tat org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.<clinit>(FlinkStreamingTransformTranslators.java:156)\n\tat org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.enterCompositeTransform(FlinkStreamingPipelineTranslator.java:103)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)\n\tat org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)\n\tat org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)\n\tat org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)\n\tat org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)\n\tat org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:117)\n\tat org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)\n\tat com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:86)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",

Discussion