🗂

【QuickSight】休眠ユーザーアカウントの炙り出しをStep Functionsで自動化してみる。

に公開

概要

きたる、QuickSightのユーザー料金改定に伴い、こまめなアカウント管理をした方がよさそうということで、Step Functionsをメインに特定の日数ログインしていないアカウントをあぶりだすものになります。

使用するサービス

・Step Functions
・EventBridge
・CloudTrail
・S3
・Athena
・DynamoDB (GSI設定あり)
・SNS(Lambda)

全体イメージ図

流れ一覧

① CloudTrailの情報をAthena経由で取得するようにデータベースを作成
② Step Functionsで毎日、前日分のセッション情報を取得(今回はQuickSightのGetDashboard)し、DynamoDBに格納
③ Step FunctionsでDynamoDBに格納されている情報を取得して、指定の期限を過ぎているユーザーを抽出してSlackに送信

CloudTrailの情報をAthena経由で取得するようにデータベースを作成

CloudTrailを有効にしている大前提ですが、QuickSightのAPI ListUsersなどで、最終のログインセッションタイムなどが取得できないため、CloudTrail上のGetDashboardイベントを取得して、その情報を元に、アカウントの管理をする構成をとるためにAthenaでデータベースの定義をします。

実際の定義は下記のとおりです。

CREATE EXTERNAL TABLE cloudtrail_quicksight.ctrail_pp_ymd (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
PARTITIONED BY (region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://BUCKET/HOGEHOGE/CloudTrail/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.day.type'='integer',
    'projection.day.range'='01,31',
    'projection.day.digits'='2',
    'projection.month.type'='integer',
    'projection.month.range'='01,12',
    'projection.month.digits'='2',
    'projection.year.type'='integer',
    'projection.year.range'='2010,2100',
    'projection.region.type' = 'enum',
    'projection.region.values'='us-east-1,ap-northeast-1',
    'storage.location.template'='s3://BUCKET/HOGEHOGE/CloudTrail/${region}/${year}/${month}/${day}',
    'classification'='cloudtrail',
    'compressionType'='gzip',
    'typeOfData'='file',
    'classification'='cloudtrail'
);

参考URL:
https://repost.aws/ja/knowledge-center/athena-cloudtrail-data-timeout

パーティションを切らないと、毎回フルでスキャンするようになってしまうので、年月日でパーティションを切るような形にします。

Step Functionsで毎日、前日分のセッション情報を取得し、DynamoDBに格納

全体像

JSON定義:

{
  "Comment": "A description of my state machine",
  "StartAt": "変数定義",
  "States": {
    "変数定義": {
      "Type": "Pass",
      "Next": "Athena StartQueryExecution",
      "Assign": {
        "year": "{% $fromMillis($millis()+32400000-86400000, '[Y0001]') %}",
        "month": "{% $fromMillis($millis()+32400000-86400000, '[M01]') %}",
        "day": "{% $fromMillis($millis()+32400000-86400000, '[D01]') %}",
        "Database": "cloudtrail_quicksight"
      }
    },
    "Athena StartQueryExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Arguments": {
        "QueryString": "{% \"SELECT useridentity.userName AS user_email, MAX(eventtime) AS last_login_time FROM ctrail_pp_ymd WHERE eventsource = 'quicksight.amazonaws.com' AND eventname = 'GetDashboard' AND year = '\" & $year & \"' AND month = '\" & $month & \"' AND day = '\" & $day & \"' GROUP BY useridentity.userName ORDER BY last_login_time DESC;\" %}",
        "QueryExecutionContext": {
          "Database": "{% $Database %}"
        },
        "ResultConfiguration": {
          "OutputLocation": "s3://HOGEHOGE/"
        },
        "WorkGroup": "primary"
      },
      "Next": "Wait",
      "Assign": {
        "QueryExecutionId": "{% $states.result.QueryExecutionId %}"
      }
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 3,
      "Next": "Athena GetQueryExecution"
    },
    "Athena GetQueryExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:getQueryExecution",
      "Arguments": {
        "QueryExecutionId": "{% $QueryExecutionId %}"
      },
      "Next": "State = SUCCEEDED"
    },
    "State = SUCCEEDED": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "Athena GetQueryResults",
          "Condition": "{% $states.input.QueryExecution.Status.State = \"SUCCEEDED\" %}"
        }
      ],
      "Default": "Wait"
    },
    "Athena GetQueryResults": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Arguments": {
        "MaxResults": 100,
        "QueryExecutionId": "{% $QueryExecutionId %}"
      },
      "Next": "Map",
      "Assign": {
        "ColumnInfo": "{% $states.result.ResultSet.ResultSetMetadata.ColumnInfo %}"
      },
      "Output": "{% $states.result.ResultSet.Rows %}"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "ヘッダーを無視",
        "States": {
          "ヘッダーを無視": {
            "Type": "Choice",
            "Choices": [
              {
                "Next": "終了",
                "Condition": "{% $states.input.Data[0].VarCharValue = \"user_email\" %}"
              }
            ],
            "Default": "DynamoDB PutItem"
          },
          "DynamoDB PutItem": {
            "Type": "Task",
            "Resource": "arn:aws:states:::dynamodb:putItem",
            "Arguments": {
              "TableName": "quicksight-user-logintime",
              "Item": {
                "user_email": {
                  "S": "{% $states.input.Data[0].VarCharValue %}"
                },
                "last_login_time": {
                  "S": "{% $states.input.Data[1].VarCharValue %}"
                },
                "account_status": {
                  "S": "active"
                },
                "updated_at": {
                  "S": "{% $now() %}"
                }
              }
            },
            "Next": "終了"
          },
          "終了": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}

細かく説明

変数定義

最初のPassステートで、変数を定義します。あとで見やすいのでPassステートを入れていますが、削りたいのであれば、Athena StartQueryExecutionでまとめることもできます。(見にくいと思いますが…)

"Assign": {
        "year": "{% $fromMillis($millis()+32400000-86400000, '[Y0001]') %}",
        "month": "{% $fromMillis($millis()+32400000-86400000, '[M01]') %}",
        "day": "{% $fromMillis($millis()+32400000-86400000, '[D01]') %}",
        "Database": "cloudtrail_quicksight"
      }

$fromMillis($millis()+32400000-86400000, '[Y0001]')とすることで、現在のUNIXタイムから、-9時間した日時に、-1日するという形になります。
実際にとれる値は、

こんな感じです。

Athena StartQueryExecution

QueryStringのところでクエリをstringで埋め込みます。

"Arguments": {
        "QueryString": "{% \"SELECT useridentity.userName AS user_email, MAX(eventtime) AS last_login_time FROM ctrail_pp_ymd WHERE eventsource = 'quicksight.amazonaws.com' AND eventname = 'GetDashboard' AND year = '\" & $year & \"' AND month = '\" & $month & \"' AND day = '\" & $day & \"' GROUP BY useridentity.userName ORDER BY last_login_time DESC;\" %}",
        "QueryExecutionContext": {
          "Database": "{% $Database %}"
        },
        "ResultConfiguration": {
          "OutputLocation": "s3://HOGEHOGE/"
        },
        "WorkGroup": "primary"
      },

投げたいクエリだけ整理すると、

SELECT 
    useridentity.userName AS user_email,
    MAX(eventtime) AS last_login_time
FROM 
    cloudtrail_quicksight.ctrail_pp_ymd
WHERE 
    eventsource = 'quicksight.amazonaws.com'
    AND eventname = 'GetDashboard'
    AND year = '2025'
    AND month = '07'
    AND day = '03'
GROUP BY 
    useridentity.userName
ORDER BY 
    last_login_time DESC;

こんな感じです。

DynamoDB PutItem

クエリでとってきたものを、DynamoDBにPutします。後ほど使うので、account_statusを付与します。

"Item": {
        "user_email": {
          "S": "{% $states.input.Data[0].VarCharValue %}"
        },
        "last_login_time": {
          "S": "{% $states.input.Data[1].VarCharValue %}"
        },
        "account_status": {
          "S": "active"
        },
        "updated_at": {
          "S": "{% $now() %}"
        }
      }
    }

これで、こんな感じでアカウント情報が取得できます。

Step FunctionsでDynamoDBに格納されている情報を取得して、指定の期限を過ぎているユーザーを抽出してSlackに送信

全体像

JSON定義:

{
  "Comment": "A description of my state machine",
  "StartAt": "active-Query",
  "States": {
    "active-Query": {
      "Type": "Task",
      "Arguments": {
        "TableName": "quicksight-user-logintime",
        "IndexName": "account_status-index",
        "KeyConditionExpression": "account_status = :val",
        "ExpressionAttributeValues": {
          ":val": {
            "S": "active"
          }
        }
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
      "Next": "Map",
      "Output": "{% $states.result.Items %}"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "90日以上アクセス無し。",
        "States": {
          "90日以上アクセス無し。": {
            "Type": "Pass",
            "End": true,
            "Output": "{% ($toMillis($states.input.last_login_time.S) < ($millis() - (86400000*90))) ? $states.input : null %}"
          }
        }
      },
      "Next": "CheckIfUsersExist",
      "Output": "{% $exists($states.result[$ != null]) ? $states.result[$ != null] : [] %}"
    },
    "CheckIfUsersExist": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "pending-Query",
          "Condition": "{% $count($states.input) = 0 %}"
        }
      ],
      "Default": "UpdateUsers"
    },
    "pending-Query": {
      "Type": "Task",
      "Arguments": {
        "TableName": "quicksight-user-logintime",
        "IndexName": "account_status-index",
        "KeyConditionExpression": "account_status = :val",
        "ExpressionAttributeValues": {
          ":val": {
            "S": "pending"
          }
        }
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
      "Output": "{% $states.result.Items %}",
      "Next": "SNS Publish"
    },
    "UpdateUsers": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "UpdateItem",
        "States": {
          "UpdateItem": {
            "Type": "Task",
            "Arguments": {
              "TableName": "quicksight-user-logintime",
              "Key": {
                "user_email": "{% $states.input.user_email %}"
              },
              "UpdateExpression": "SET account_status = :newStatus, updated_at = :updatedAt",
              "ExpressionAttributeValues": {
                ":newStatus": {
                  "S": "pending"
                },
                ":updatedAt": {
                  "S": "{% $now() %}"
                }
              }
            },
            "Resource": "arn:aws:states:::aws-sdk:dynamodb:updateItem",
            "End": true
          }
        }
      },
      "Next": "pending-Query",
      "Output": "{% $states.input %}"
    },
    "SNS Publish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Arguments": {
        "Message": "{% $states.input %}",
        "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:quicksight-notifications"
      },
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}

細かく説明

Query

GSIを設定済み(account_statusをPKに)のDynamoDBからQueryしてデータを取得します。account_statusがactiveのみを抽出したユーザーを取得します。

"Arguments": {
    "TableName": "quicksight-user-logintime",
    "IndexName": "account_status-index",
    "KeyConditionExpression": "account_status = :val",
    "ExpressionAttributeValues": {
      ":val": {
        "S": "active"
      }
    }
  }

90日以上アクセス無し

今回は、適当に90日以上のアクセス無しを検知するようにしています。

"90日以上アクセス無し。": {
        "Type": "Pass",
        "End": true,
        "Output": "{% ($toMillis($states.input.last_login_time.S) < ($millis() - (86400000*90))) ? $states.input : null %}"
      }
    }
  }

三項演算子で、

($toMillis($states.input.last_login_time.S) < ($millis() - (86400000*90))) ? $states.input : null

とすることで、UNIXタイムが90日より前になっているユーザーのみアウトプットして、それ以外はnullで返すようにしています。

($millis() - (86400000*90))ここが90日前のUNIXタイムを表しています。

そして、MapステートのOutputに、

"Output": "{% $exists($states.result[$ != null]) ? $states.result[$ != null] : [] %}"

として、nullを出力に含めないようにし、何もヒットしなかった場合は、[]を出力するようにします。

CheckIfUsersExist

指定した日付より前のログインセッションのユーザーのみ、active_statusの変更をしたいので、Checkステートを入れます。条件式は、{% $count($states.input) = 0 %}で、Mapの出力が[]以外の場合UpdateItemを行います。

"Choices": [
    {
      "Next": "pending-Query",
      "Condition": "{% $count($states.input) = 0 %}"
    }
  ]

UpdateItem

今回検出したアカウントに関して、account_statusをpendingに更新します。

"Arguments": {
      "TableName": "quicksight-user-logintime",
      "Key": {
        "user_email": "{% $states.input.user_email %}"
      },
      "UpdateExpression": "SET account_status = :newStatus, updated_at = :updatedAt",
      "ExpressionAttributeValues": {
        ":newStatus": {
          "S": "pending"
        },
        ":updatedAt": {
          "S": "{% $now() %}"
        }
      }
    }

SNS publish

最後に、取得した情報をSlackに展開してお知らせします。
連携しているLambdaはこちらです。

import json
import urllib.request
import os
from datetime import datetime, timedelta, timezone

SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']

def lambda_handler(event, context):

    print(event)
    # SNS レコードを取得
    sns = event['Records'][0]['Sns']
    raw_message = sns['Message']
    topic_arn = sns.get('TopicArn', '')

    # JSON をデコードし、リスト or 単独オブジェクト対応
    try:
        parsed = json.loads(raw_message)
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        return

    if isinstance(parsed, dict):
        items = [parsed]
    elif isinstance(parsed, list):
        items = parsed
    else:
        print(f"Unexpected message type: {type(parsed)}")
        return

    # 各ユーザー情報をパースして行を生成
    lines = []
    for entry in items:
        utc_str = entry.get('last_login_time', {}).get('S', '')
        try:
            dt_utc = datetime.fromisoformat(utc_str.replace('Z', '+00:00'))
            ts_jst = dt_utc.astimezone(timezone(timedelta(hours=9))).strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            ts_jst = utc_str or '不明'

        email = entry.get('user_email', {}).get('S', '不明')
        lines.append(f"• *{email}*  — 最終ログイン: `{ts_jst}`")

    # 候補なしの場合フォールバック
    if not lines:
        lines = ["(削除候補アカウントはありませんでした)"]

    # Slack 用メッセージを組み立て
    slack_message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "【二フラム】QuickSight アカウント削除候補",
                    "emoji": True
                }
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*以下のアカウントが削除対象として絞り込まれました*:\n" + "\n".join(lines)
                }
            },
            {"type": "divider"}
        ]
    }
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(slack_message).encode('utf-8'),
        headers={'Content-Type': 'application/json'}
    )
    try:
        with urllib.request.urlopen(req) as res:
            print(f"Slack 通知ステータス: {res.status}")
    except Exception as e:
        print(f"Slack 通知失敗: {e}")
        raise

送られるメッセージはこんな感じ。

さいごに

今回はアカウントをあぶりだすまでをやりましたが、ここまでであればQuickSightのアカウント一覧で簡単に見れますので、次はここから削除をするものを作っていきたいと思います。自動で問答無用に削除するのか、なにかアクションを加えるのかは考え中です。

今回EventBridgeの部分は省きましたが、適切なタイミングでスケジュールすればいいと思います。
最終ログインの取得に関しては毎日で、おしらせに関しては週1回など。

Discussion