Kinesis Data Firehoseで受け取ったレコードをLambdaでフィルタリングする
Kinesis Data Firehoseにおいて、データソースから受け取ったレコードをフィルタリングしたい場合があると思います。
CloudWatch Logsのサブスクリプションフィルターを使っている場合には、あらかじめ送信するデータのみFirehoseに渡されるとは思いますが、今回はFirehoseに設定するデータ変換Lambdaにおいてフィルタリングする方法を紹介します。
※情報にたどり着くまでに時間がかかったため、備忘も込めて・・・。。
結論
データ変換用のLambdaで受け取った各レコードのパラメータのうち、result
を編集してreturnする。
Ok :Firehoseの送信先に送信する
Dropped:Firehoseの送信先に送信しない(=データをフィルタ可能)
{
"recordId": "<recordId from the Lambda input>",
"result": "Ok",
"data": "<Base64 encoded Transformed data>"
},
{
"recordId": "<recordId from the Lambda input>",
"result": "Dropped",
"data": "<Base64 encoded Transformed data>"
}
以下、公式ドキュメント。
レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。
データ変換用Lambdaで受け取る/返すレコードについて
クラスメソッドさんの以下の記事が大変参考になりました。
- 基本的に使用するのは、recordsの中身
- 特にrecordIdはresponseで使用する
- recordsの各要素のdataは各データをBase64エンコードしたもの
使用例 -AWS WAFのログをフィルタする-
今回はAWS WAFのログについて、BLOCKした際のログのみをFirehoseの送信先に送信するようにLambdaを実装します。
WAFのログについて
AWS WAFのログですが、ログフィールドでWAFによるアクションの判別が可能です。
BLOCKの場合はaction
フィールドに「BLOCK」と記録されます。
※COUNTなどを判別したい場合はnonTerminatingMatchingRules
の中のaction
を参照する必要があります。この辺りは別途書ければ書きたいと思います。
詳細は公式ドキュメントを参照ください。
また、WAFのログの例についても公式ドキュメントを参照ください。
データ変換用のLambdaについて
Lambdaは「Process records sent to an Amazon Data Firehose streams」というブループリントを使用します。
修正前のコード(ブループリント)は以下の通りです。
import base64
print('Loading function')
def lambda_handler(event, context):
output = []
for record in event['records']:
print(record['recordId'])
payload = base64.b64decode(record['data']).decode('utf-8')
# Do custom processing on the payload here
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['records'])))
return {'records': output}
ブループリントを元に、AWS WAFのログのうち、action
に記録されているWAFアクションを判断してフィルタする処理を加えます。
実装したコードは以下の通りです。
check_waf_action
という関数を定義し、AWS WAFのログアクションを判断、result
のパラメータを返しています。
import base64
import json
print('Loading function')
# WAFのアクションを確認する関数
def check_waf_action(payload):
# dict型に変換
waf_log = json.loads(payload)
# actionを判定
# BLOCKの場合はOk => 送信
# それ以外はDropped => 非送信
if(waf_log['action'] == 'BLOCK'):
return 'Ok'
return 'Dropped'
def lambda_handler(event, context):
output = []
for record in event['records']:
print(record['recordId'])
payload = base64.b64decode(record['data']).decode('utf-8')
# Do custom processing on the payload here
# check_waf_action関数を呼び出す
result = check_waf_action(payload)
output_record = {
'recordId': record['recordId'],
'result': result,
'data': record['data']
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['records'])))
return {'records': output}
おわりに
Kinesis Data Firehoseに設定できるデータ変換Lambdaにおいて、レコードをフィルタリングする処理についてまとめました。
本記事がどなたかの参考になれば幸いです。
参考
Discussion