🐵

メトリクスフィルター設定後、自動的にCloudWatchAlarmが作成されるようにしてみた

2022/12/25に公開

はじめに

こんにちは、山田です。
今回はメトリクスフィルターが設定されたら、自動的にCloudWatchAlarmが作成されるようにLambda関数を作成したので記載していきます。
よろしくお願いいたします。

概要図

概要図は以下の通りです。

①:メトリクスフィルターとCLoudWatchAlarmの一覧を取得する。
②:取得した結果、CloudWatchAlarmに名前がないものに関しては、アラームを作成する。
※前提条件※
メトリクスフィルター名とCLoudWatchAlarm名は同名とします。

Lambda関数

作成したLambda関数は以下になります。

import json
import boto3
import time

cloudwatch_client = boto3.client('cloudwatch')
logs_client = boto3.client('logs')

def get_ClodWatchAlarm():
   response = cloudwatch_client.describe_alarms()
   cloudwatchalarm=[]
   count = 0
   for i in range(len(response['MetricAlarms'])):
       cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
       count+=1
   count = 0
   while 'nextToken' in response:
       response = cloudwatch_client.describe_alarms(nextToken=response['nextToken'])
       for i in range(len(response['MetricAlarms'])):
           cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
           count+=1
   return cloudwatchalarm
   
def get_MetricsFilter():
   response = logs_client.describe_metric_filters()
   metricsfilter=[]
   count = 0
   for i in range(len(response['metricFilters'])):
       metricsfilter.append(response['metricFilters'][i]['filterName'])
       count+=1
   count = 0
   while 'nextToken' in response:
       response = logs_client.describe_alarms(nextToken=response['nextToken'])
       for i in range(len(response['metricFilters'])):
           metricsfilter.append(response['metricFilters'][i]['filterName'])
           count+=1
   return metricsfilter
   
def get_MetricsFilterPattern():
   response = logs_client.describe_metric_filters()
   metricsfilterpattern=[]
   count = 0
   for i in range(len(response['metricFilters'])):
       metricsfilterpattern.append(response['metricFilters'][i]['filterPattern'])
       count+=1
   count = 0
   while 'nextToken' in response:
       response = logs_client.describe_alarms(nextToken=response['nextToken'])
       for i in range(len(response['metricFilters'])):
           metricsfilterpattern.append(response['metricFilters'][i]['filterPattern'])
           count+=1
   return metricsfilterpattern

def get_LogGroup():
   response = logs_client.describe_metric_filters()
   loggroupname = []
   count = 0
   for i in range(len(response['metricFilters'])):
       loggroupname.append(response['metricFilters'][i]['logGroupName'])
       count+=1
   return loggroupname

def put_LogEvent(metricsfilterpattern):
   cloudwatchalarm = get_ClodWatchAlarm()
   metricsfilter = get_MetricsFilter()
   loggroup = get_LogGroup()
   metricsfilter_2 = []
   count=0
   for item in metricsfilter:
       if item not in cloudwatchalarm:
           res = logs_client.describe_log_streams(
           logGroupName=(loggroup[metricsfilter.index(item)]),
           logStreamNamePrefix=(loggroup[metricsfilter.index(item)]),
           )
           seq_token=res['logStreams'][0]['uploadSequenceToken']
           response = logs_client.put_log_events(
           logGroupName=loggroup[metricsfilter.index(item)],
           logStreamName=loggroup[metricsfilter.index(item)],
           logEvents=[
               {
               'timestamp': int(time.time()) * 1000,
               'message': metricsfilterpattern[metricsfilter.index(item)]
               },
           ],
           sequenceToken = seq_token
           )
           metricsfilter_2.append(item)
   return metricsfilter_2

           
def lambda_handler(event, context):
   client = boto3.client('cloudwatch')
   list_cloudwatchalarm = get_ClodWatchAlarm()
   list_metricsfilter_1 = get_MetricsFilter()
   list_loggroup = get_LogGroup()
   list_metricsfilterpattern = get_MetricsFilterPattern()
   list_metricsfilter_2 = put_LogEvent(list_metricsfilterpattern)
   time.sleep(10)
   for item in list_metricsfilter_2:
       response=cloudwatch_client.put_metric_alarm(
       AlarmName=item,
       AlarmDescription=item,
       ActionsEnabled=True,
       AlarmActions=[
        'SNSのARN',
       ],
       MetricName=item,
       Namespace='Logs',
       Statistic='Sum',
       Period=300,
       Unit='Seconds',
       Threshold=1,
       EvaluationPeriods=1,
       ComparisonOperator='GreaterThanOrEqualToThreshold'
       )

CloudWatchAlarm一覧取得

CloudWatchAlarm一覧を取得するコードは以下になります。

def get_ClodWatchAlarm():
   response = cloudwatch_client.describe_alarms()
   cloudwatchalarm=[]
   count = 0
   for i in range(len(response['MetricAlarms'])):
       cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
       count+=1
   count = 0
   while 'nextToken' in response:
       response = cloudwatch_client.describe_alarms(nextToken=response['nextToken'])
       for i in range(len(response['MetricAlarms'])):
           cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
           count+=1
   return cloudwatchalarm

CloudWacthAlarmの情報を取得します。

response = cloudwatch_client.describe_alarms()

空の配列を用意します。変数countには「0」を代入します。
len関数を用いてCloudWatchAlarmの数を取得し、range関数を用いて連続数を取得します。
取得した連続数だけfor文にて繰り返し処理を実行します。処理の最後には、変数countの値が+1されるように設定します。

cloudwatchalarm=[]
count = 0
for i in range(len(response['MetricAlarms'])):
  cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
  count+=1

()test = ['a' , 'b' , 'c']
range(len(test))
--> 0,1,2

responseの中に、「'nextToken'」 の記載がなくなるまで、for文にて繰り返し処理を実行します。

count = 0
while 'nextToken' in response:
  response = cloudwatch_client.describe_alarms(nextToken=response['nextToken'])
    for i in range(len(response['MetricAlarms'])):
      cloudwatchalarm.append(response['MetricAlarms'][i]['AlarmName'])
        count+=1

メトリクスフィルター・フィルターパターン一覧取得

メトリクスフィルター・フィルターパターン一覧を取得するコードは以下になります。

def get_MetricsFilter():
    response = logs_client.describe_metric_filters()
    metricsfilter=[]
    count = 0
    for i in range(len(response['metricFilters'])):
        metricsfilter.append(response['metricFilters'][i]['filterName'])
        count+=1
    count = 0
    while 'nextToken' in response:
        response = logs_client.describe_alarms(nextToken=response['nextToken'])
        for i in range(len(response['metricFilters'])):
            metricsfilter.append(response['metricFilters'][i]['filterName'])
            count+=1
    return metricsfilter
    
def get_MetricsFilterPattern():
    response = logs_client.describe_metric_filters()
    metricsfilterpattern=[]
    count = 0
    for i in range(len(response['metricFilters'])):
        metricsfilterpattern.append(response['metricFilters'][i]['filterPattern'])
        count+=1
    count = 0
    while 'nextToken' in response:
        response = logs_client.describe_alarms(nextToken=response['nextToken'])
        for i in range(len(response['metricFilters'])):
            metricsfilterpattern.append(response['metricFilters'][i]['filterPattern'])
            count+=1
    return metricsfilterpattern

メトリクスフィルター・フィルターパターン一覧取得に関しても、上記のCLoudWatchAlaram取得の仕組みは同様です。

ロググループ一覧取得

ロググループ一覧を取得するコードは以下になります。

def get_LogGroup():
    response = logs_client.describe_metric_filters()
    loggroupname = []
    count = 0
    for i in range(len(response['metricFilters'])):
        loggroupname.append(response['metricFilters'][i]['logGroupName'])
        count+=1
    return loggroupname

ロググループ一覧に関しては、メトリクスフィルタが設定済みのロググループのみを取得するようにしています。

ログイベント発行

ログイベントを発行するコードは以下になります。

def put_LogEvent(metricsfilterpattern):
   cloudwatchalarm = get_ClodWatchAlarm()
   metricsfilter = get_MetricsFilter()
   loggroup = get_LogGroup()
   metricsfilter_2 = []
   count=0
   for item in metricsfilter:
       if item not in cloudwatchalarm:
           res = logs_client.describe_log_streams(
           logGroupName=(loggroup[metricsfilter.index(item)]),
           logStreamNamePrefix=(loggroup[metricsfilter.index(item)]),
           )
           seq_token=res['logStreams'][0]['uploadSequenceToken']
           response = logs_client.put_log_events(
           logGroupName=loggroup[metricsfilter.index(item)],
           logStreamName=loggroup[metricsfilter.index(item)],
           logEvents=[
               {
               'timestamp': int(time.time()) * 1000,
               'message': metricsfilterpattern[metricsfilter.index(item)]
               },
           ],
           sequenceToken = seq_token
           )
           metricsfilter_2.append(item)
   return metricsfilter_2

CloudWatchAlarm一覧とメトリクスフィルター一覧を比較し、CloudWatchAlarm一覧に名前がないメトリクスフィルターが設定されているロググループにログを発行します。
発行するログのメッセージに関しては、メトリクスフィルターのフィルターパターンに記載してある文字列を発行します。

cloudwatchalarm = get_ClodWatchAlarm()
metricsfilter = get_MetricsFilter()
loggroup = get_LogGroup()

CloudWatchAlarm、メトリクスフィルター、ロググループの一覧を取得します。

metricsfilter_2 = []

空の配列を用意します。

count=0
   for item in metricsfilter:
       if item not in cloudwatchalarm:
           res = logs_client.describe_log_streams(
           logGroupName=(loggroup[metricsfilter.index(item)]),
           logStreamNamePrefix=(loggroup[metricsfilter.index(item)]),
           )
           seq_token=res['logStreams'][0]['uploadSequenceToken']
           response = logs_client.put_log_events(
           logGroupName=loggroup[metricsfilter.index(item)],
           logStreamName=loggroup[metricsfilter.index(item)],
           logEvents=[
               {
               'timestamp': int(time.time()) * 1000,
               'message': metricsfilterpattern[metricsfilter.index(item)]
               },
           ],
           sequenceToken = seq_token
           )

CloudWatchAlarm一覧に名前がないメトリクスフィルターが設定されているロググループにログを発行します。
発行するログのメッセージに関しては、メトリクスフィルターのフィルターパターンに記載してある文字列を発行します。

metricsfilter_2.append(item)
return metricsfilter_2

ログ発行後、空の配列にメトリクスフィルター名を追加します
戻り値として、「metiricsfilter_2」 を返します。

CloudWatchAlarm作成

CloudWatchAlarmを作成するコードは以下になります。

def lambda_handler(event, context):
    list_cloudwatchalarm = get_ClodWatchAlarm()
    list_metricsfilter_1 = get_MetricsFilter()
    list_loggroup = get_LogGroup()
    list_metricsfilterpattern = get_MetricsFilterPattern()
    list_metricsfilter_2 = put_LogEvent(list_metricsfilterpattern)
    time.sleep(10)
    for item in list_metricsfilter_2:
        response=cloudwatch_client.put_metric_alarm(
        AlarmName=item,
        AlarmDescription=item,
        ActionsEnabled=True,
        AlarmActions=[
         'SNSのARN',
        ],
        MetricName=item,
        Namespace='Logs',
        Statistic='Sum',
        Period=300,
        Unit='Seconds',
        Threshold=1,
        EvaluationPeriods=1,
        ComparisonOperator='GreaterThanOrEqualToThreshold'
        )

上記に記載した関数をそれぞれ実行し、最後にCloudWatchAlarmを作成するコードを記載しています。

list_cloudwatchalarm = get_ClodWatchAlarm()
list_metricsfilter_1 = get_MetricsFilter()
list_loggroup = get_LogGroup()
list_metricsfilterpattern = get_MetricsFilterPattern()
list_metricsfilter_2 = put_LogEvent(list_metricsfilterpattern)

上記記載の関数をそれぞれ実行し、情報を変数に代入しています。

time.sleep(10)

CloudWatchにメトリクスが反映されるのを待つため、CloudWatchAlarmの作成を10秒待機します。

for item in list_metricsfilter_2:
    response=cloudwatch_client.put_metric_alarm(
    AlarmName=item, #アラーム名
    AlarmDescription=item, #アラームの説明
    ActionsEnabled=True, 
    AlarmActions=[
     'SNSのARN', #Alarm状態時の通知先
    ],
    MetricName=item, #メトリクス名
    Namespace='Logs', #名前空間
    Statistic='Sum', #合計
    Period=300, #時間
    Unit='Seconds', #単位(秒)
    Threshold=1, #しきい値
    EvaluationPeriods=1, #評価期間
    ComparisonOperator='GreaterThanOrEqualToThreshold' #しきい値以上の時にアラーム発行
    )

メトリクスフィルター名と同名のCLoudWatchAlarmを作成します。

動作確認

動作確認のためにメトリクスフィルター設定後に、作成したLambda関数を実行します。

メトリクスフィルターを設定します。

Lambda関数を実行します。

CloudWatchAlarmが作成されていることが確認できれば完了です。
今回は手動で実行しましたが、EventBridgeにスケジューリングすれば定期的に実行できます。

終わりに

今回は、メトリクスフィルターが設定されたら、自動的にCloudWatchAlarmが作成されるLambda関数を作成したので記載していきました。
Lambda関数の練習がてらに、CloudWatchAlarmを自動化するコードを作成してみました。作成しみたはいいものの、自分の実力はまだまだなんだと実感しました。
これからも、自動化できる箇所についてはLambda関数を用いて自動化していこうと思います。
拝見いただきありがとうございました。

Discussion