💽

Azure SQL データ ベースの監査ログを Event Hub 経由で Azure Data Explorer に保存

に公開

はじめに

SQL データベースの監査ログを取得するには、ストレージ アカウントに保存する方法が一般的ですが、以下のような課題があります:

  • BLOBファイルがテキストデータではないため、SQL Server Management Studio(SSMS)などの専用ツールが必要
  • ログの検索性が低く、大量のデータから特定の情報を見つけることが困難

また、Log Analytics ワークスペースに保存する方法もありますが、運用コストが高くなる傾向があります。そこで今回は、Event Hub と Azure Data Explorer(ADX)を組み合わせた手法を検証してみました。

事前準備

以下のAzureリソースを事前に準備する必要があります。

Event Hub

  • Event Hub 名前空間と Event Hub インスタンスを作成します。
  • SKU、スループット単位(TU)、パーティション数については、想定されるログ量に応じて適切に設定してください。

Azure Data Explorer (ADX)

  • ADX クラスターを作成します。
  • SKU やコンピューティング リソースについては、ログの処理量と保存期間を考慮して選択してください。

設定

SQL サーバーもしくは SQL データベースの [監査] から [イベント ハブ] を設定します。


Event Hub のメトリックで [Incoming Messages] が増えることを確認します。


ADX の [データベース] > [+データベースの追加] から新規データベースを作成します。


ADX のポータル (https://リソース名.centralus.kusto.windows.net) に移動し、[ホーム] > [データの取得] をクリックします。


[Event Hubs] をクリックします。


テーブル名を追加し、ログを転送している Event Hub を指定します。


以下のように表示されるため、そのまま [終了] をクリックします。
もしここで何も表示されなければ、Event Hub 側にログが来ていないため、設定やメトリックを再確認します。

以下の画面で [閉じる] をクリックします。


Azure ポータルの ADX メニューの [データベース] から今回作成したデータベースをクリックし [データ接続] を開くと、ログ取り込みの設定が確認可能です。

動作確認

ADX のポータルで対象のテーブルを検索します。以下のように 1 レコードで表示されます。Log Analytics ワークスペースと異なり、JSON 形式のビューが展開されます。


検索しやすいようにパースします。これで検索可能な状態になります。


パース用クエリはこちらです。

SqlAuditLogs
| mv-expand records
| extend record = parse_json(records)
| extend props = record.properties
| project
    PartitionId = tostring(record.PartitionId),
    originalEventTimestamp = todatetime(record.originalEventTimestamp),
    ['time'] = todatetime(record.['time']),
    resourceId = tostring(record.resourceId),
    category = tostring(record.category),
    operationName = tostring(record.operationName),
    location = tostring(record.location),
    LogicalServerName = tostring(record.LogicalServerName),
    SubscriptionId = tostring(record.SubscriptionId),
    ResourceGroup = tostring(record.ResourceGroup),
    securable_class_type = tostring(props.securable_class_type),
    affected_rows = toint(props.affected_rows),
    server_principal_name = tostring(props.server_principal_name),
    server_instance_name = tostring(props.server_instance_name),
    statement = tostring(props.statement),
    additional_information = tostring(props.additional_information),
    server_principal_sid = tostring(props.server_principal_sid),
    user_defined_information = tostring(props.user_defined_information),
    client_ip = tostring(props.client_ip),
    database_name = tostring(props.database_name),
    event_time = todatetime(props.event_time),
    action_id = tostring(props.action_id),
    schema_name = tostring(props.schema_name),
    target_database_principal_id = toint(props.target_database_principal_id),
    is_local_secondary_replica = tostring(props.is_local_secondary_replica),
    object_name = tostring(props.object_name),
    database_principal_id = toint(props.database_principal_id),
    sequence_number = toint(props.sequence_number),
    server_principal_id = toint(props.server_principal_id),
    object_id = toint(props.object_id),
    ledger_start_sequence_number = toint(props.ledger_start_sequence_number),
    data_sensitivity_information = tostring(props.data_sensitivity_information),
    database_principal_name = tostring(props.database_principal_name),
    host_name = tostring(props.host_name),
    client_tls_version_name = tostring(props.client_tls_version_name),
    is_server_level_audit = tostring(props.is_server_level_audit),
    target_database_principal_name = tostring(props.target_database_principal_name),
    user_defined_event_id = toint(props.user_defined_event_id),
    transaction_id = toint(props.transaction_id),
    target_server_principal_id = toint(props.target_server_principal_id),
    audit_schema_version = toint(props.audit_schema_version),
    response_rows = toint(props.response_rows),
    succeeded = tostring(props.succeeded),
    action_name = tostring(props.action_name),
    class_type_description = tostring(props.class_type_description),
    database_transaction_id = toint(props.database_transaction_id),
    target_server_principal_name = tostring(props.target_server_principal_name),
    duration_milliseconds = toint(props.duration_milliseconds),
    obo_middle_tier_app_id = tostring(props.obo_middle_tier_app_id),
    event_id = tostring(props.event_id),
    connection_id = tostring(props.connection_id),
    external_policy_permissions_checked = tostring(props.external_policy_permissions_checked),
    class_type = tostring(props.class_type),
    sequence_group_id = tostring(props.sequence_group_id),
    session_id = toint(props.session_id),
    is_column_permission = tostring(props.is_column_permission),
    application_name = tostring(props.application_name),
    session_server_principal_name = tostring(props.session_server_principal_name),
    permission_bitmask = tostring(props.permission_bitmask),
    session_context = tostring(props.session_context),
    client_tls_version = toint(props.client_tls_version),
    target_server_principal_sid = tostring(props.target_server_principal_sid)

なお、Azure ポータルのデータ接続の正常性で受信・処理状況が確認可能になっています。

まとめ

今回、Event Hub と Azure Data Explorer を組み合わせた SQL 監査ログの収集方法を検証しました。SQL 監査ログは容量が大きいため、コストを抑えつつ、検索できる基盤を準備するという点では有用な選択肢かと思います。

Microsoft (有志)

Discussion