
Building an AWS Cost and Usage Report Dashboard in New Relic

Published on 2024/08/27

TL;DR

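  • Visualized AWS Cost and Usage Report (CUR) data in a New Relic dashboard
  • Wrote our own Lambda function to send the data to New Relic
  • Did not adopt the approach from the reference article, which uses New Relic's official Lambda and Grok-based parsing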

References

https://docs.aws.amazon.com/ja_jp/cur/latest/userguide/what-is-cur.html
https://docs.newrelic.com/jp/docs/logs/log-api/introduction-log-api/
https://newrelic.com/jp/blog/how-to-relic/aws-cur-visualise

Background

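With the New Relic rollout largely complete, our next step was a FinOps initiative. Infrastructure accounts for most of our costs, so we decided to make those costs visible and give development teams a way to become cost-aware.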

Result


New Relic dashboard

Note: sorry, I can't show much of it here.

Why we didn't adopt the official approach

The official integration method (reference)

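  • The Cost and Usage Report is delivered to an S3 bucket as gzip-compressed CSV
  • Each delivery triggers New Relic's official Lambda for the Log API
  • The data is sent to the Log API as raw CSV, and New Relic's Parse feature converts it to JSON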

The problem we ran into

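  • The set of header columns in the Cost and Usage Report changes depending on the data it contains
    • New Relic's Parse feature assumes a fixed header and cannot adapt to this, so keys and values were frequently misaligned during the JSON conversion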
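The root cause is that a fixed column list maps values by position. Once a report adds or drops a column, every value after that point lands under the wrong key. The sketch below uses a hypothetical helper, not the article's code, to show the alternative: take the keys from each file's own header line. The article's Lambda relies on the same idea when it reads the header before parsing. The sketch assumes no field contains a quoted comma or newline.

```typescript
// Hypothetical helper: build one object per data row, keyed by the header line
// of the same file, so added or removed CUR columns never shift values.
// Assumption: no field contains a quoted comma or newline. Real CUR files can,
// so a full CSV parser (e.g. csv-parse with `columns: true`) is safer in practice.
function rowsToRecords(csvText: string): Record<string, string>[] {
  const lines = csvText.split(/\r?\n/).filter((line) => line.length > 0);
  if (lines.length === 0) return [];

  const header = lines[0].split(',');
  return lines.slice(1).map((line) => {
    const values = line.split(',');
    const record: Record<string, string> = {};
    header.forEach((column, i) => {
      // A missing trailing value becomes '' rather than shifting later columns
      record[column] = values[i] ?? '';
    });
    return record;
  });
}

// Two reports with different column sets still map correctly.
const march = rowsToRecords('lineItem/UsageAccountId,lineItem/BlendedCost\n111,1.5');
const april = rowsToRecords(
  'lineItem/UsageAccountId,resourceTags/user:Service,lineItem/BlendedCost\n111,api,2.0',
);
console.log(march[0]['lineItem/BlendedCost']); // 1.5
console.log(april[0]['lineItem/BlendedCost']); // 2.0
```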

The reference article was more than a year old, so after consulting with New Relic we decided that writing our own Lambda was the best option.

Building our own

Approach

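  • The Cost and Usage Report is delivered to an S3 bucket as gzip-compressed CSV
  • Each delivery triggers our own Lambda
    • The Lambda decompresses the CSV fetched from S3, converts it to JSON, and compresses it again
    • During the conversion, unneeded data such as zero-cost records is dropped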
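The decompression step can also run as a stream, so the decompressed CSV never has to sit in memory as one Buffer. This is a minimal sketch that uses only Node's `zlib`, `stream/promises` and `fs`. `gunzipToFile` is a hypothetical name. The article's Lambda takes a different route: it calls `zlib.unzipSync` on the whole object and then writes the result to EFS.

```typescript
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';
import * as zlib from 'zlib';

// Hypothetical helper: stream-decompress a gzip source straight into a file.
// Only small chunks are in memory at any time, whereas zlib.unzipSync returns
// the entire decompressed CSV as a single Buffer.
async function gunzipToFile(source: Readable, destPath: string): Promise<void> {
  await pipeline(source, zlib.createGunzip(), fs.createWriteStream(destPath));
}

// Usage with an in-memory gzip buffer. In the Lambda, `source` would be the S3
// object body as a stream and `destPath` a file under the EFS mount (assumptions).
const gz = zlib.gzipSync('lineItem/BlendedCost\n1.5\n');
const dest = path.join(os.tmpdir(), `cur-sketch-${Date.now()}.csv`);
gunzipToFile(Readable.from([gz]), dest).then(() => {
  console.log(fs.readFileSync(dest, 'utf8'));
  fs.unlinkSync(dest);
});
```

With the v2 SDK used in the article, `new AWS.S3().getObject(params).createReadStream()` returns a readable stream that could serve as `source`. The up-front gzip magic-number check would then have to peek at the first chunk.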

Implementation considerations

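  • The New Relic Log API limits the payload size, so data is sent gzip-compressed
  • The New Relic Log API also limits the number of requests, so records are sent in batches rather than one at a time
  • Once decompressed, the CSV can reach several GB
    • Lambda's /tmp ephemeral storage is 512 MB by default (configurable up to 10,240 MB), so we used EFS to store the file temporarily
  • S3 event notifications are delivered at least once, so a single upload can trigger multiple executions (reference)
    • Duplicate executions inflate the totals, so this time we used DynamoDB to prevent them
    • SQS might have been a better fit
    • I panicked when the amounts on the dashboard suddenly doubled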
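The payload-size and request-count limits can be handled together. The approach is to batch records, compress each batch, and split any batch whose gzip output is still over the limit. In this sketch, the 1 MB figure is an assumption taken from New Relic's Log API documentation, so check the current value. `toGzipPayloads` and `sendSequentially` are hypothetical helpers, and `post` stands in for the actual HTTP request.

```typescript
import * as zlib from 'zlib';

// Assumption: the 1 MB (10^6 bytes) per-POST limit documented for the New Relic
// Log API at the time of writing; check the current limits before relying on it.
const MAX_PAYLOAD_BYTES = 1_000_000;

// Compress a batch as one JSON array. If the gzip output exceeds the limit,
// split the batch in half and recurse. A single record that is still too large
// is returned as-is (the API would reject it).
function toGzipPayloads(records: object[], maxBytes = MAX_PAYLOAD_BYTES): Buffer[] {
  if (records.length === 0) return [];
  const gz = zlib.gzipSync(JSON.stringify(records));
  if (gz.length <= maxBytes || records.length === 1) return [gz];

  const mid = Math.ceil(records.length / 2);
  return [
    ...toGzipPayloads(records.slice(0, mid), maxBytes),
    ...toGzipPayloads(records.slice(mid), maxBytes),
  ];
}

// Send payloads one after another and await each one, so no request is still
// in flight when the handler returns. `post` is a stand-in for the HTTP call.
async function sendSequentially(
  payloads: Buffer[],
  post: (body: Buffer) => Promise<void>,
): Promise<void> {
  for (const body of payloads) {
    await post(body);
  }
}

// Usage with a fake sender that only reports payload sizes.
const records = Array.from({ length: 5000 }, (_, i) => ({
  'lineItem/BlendedCost': (i % 7).toString(),
  logtype: 'cur-integration',
}));
sendSequentially(toGzipPayloads(records), async (body) => {
  console.log(`would POST ${body.length} gzip bytes`);
});
```

Sequential sending is the simplest way to stay under a request-rate limit. If throughput becomes a problem, a small fixed concurrency would be the next step.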

Architecture

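  • S3
    • Bucket that stores the Cost and Usage Report
  • Lambda
    • Must be attached to a VPC in order to mount EFS (the subnet then needs a NAT gateway, or VPC endpoints for the AWS services, to reach S3, DynamoDB, and the New Relic API)
  • EFS
    • Temporary storage for the decompressed CSV file
  • CloudWatch
    • Stores the Lambda execution logs
  • DynamoDB
    • Prevents a single trigger from running the Lambda more than once
  • ECR
    • Manages the container image for the Lambda runtime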
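De-duplicating with DynamoDB only works if "is there a lock?" and "take the lock" happen as one atomic step. A separate GetItem followed by a PutItem leaves a window in which two invocations both see no lock. The simulation below shows the difference using a hypothetical in-memory table, not the DynamoDB API. In DynamoDB itself, the atomic form corresponds to a PutItem with `ConditionExpression: 'attribute_not_exists(LockID)'`.

```typescript
// Hypothetical in-memory stand-in for the lock table; NOT the DynamoDB API.
const tick = (): Promise<void> => new Promise<void>((resolve) => setImmediate(() => resolve()));

class FakeLockTable {
  private readonly items = new Set<string>();

  // Like GetItem: is a lock present?
  async get(lockId: string): Promise<boolean> {
    await tick();
    return this.items.has(lockId);
  }

  // Like an unconditional PutItem.
  async put(lockId: string): Promise<void> {
    await tick();
    this.items.add(lockId);
  }

  // Like PutItem with ConditionExpression 'attribute_not_exists(LockID)':
  // the check and the write happen in one step on the server side.
  async putIfAbsent(lockId: string): Promise<boolean> {
    await tick();
    if (this.items.has(lockId)) return false;
    this.items.add(lockId);
    return true;
  }
}

// Separate read, then write: two concurrent callers can both see "no lock".
async function checkThenPut(table: FakeLockTable, lockId: string): Promise<boolean> {
  if (await table.get(lockId)) return false;
  await table.put(lockId);
  return true;
}

async function countWinners(): Promise<{ racy: number; atomic: number }> {
  const key = 'example-bucket/example-key.csv.gz';
  const racyTable = new FakeLockTable();
  const racy = await Promise.all([checkThenPut(racyTable, key), checkThenPut(racyTable, key)]);
  const atomicTable = new FakeLockTable();
  const atomic = await Promise.all([atomicTable.putIfAbsent(key), atomicTable.putIfAbsent(key)]);
  return { racy: racy.filter(Boolean).length, atomic: atomic.filter(Boolean).length };
}

countWinners().then(({ racy, atomic }) => {
  console.log(`racy=${racy}, atomic=${atomic}`); // racy=2, atomic=1
});
```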

Note: see the code below for the other resources.

Implementation details

Infrastructure resources (Terraform module)
modules/cost_usage_report/main.tf
# =======================
# vpc
# =======================
data "aws_vpc" "this" {
  id = var.vpc_id
}

data "aws_subnet" "this" {
  id = var.subnet_id
}

# =======================
# cloud_watch
# =======================
resource "aws_cloudwatch_log_group" "this" {
  name = "/aws/lambda/${aws_lambda_function.this.function_name}"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

# =======================
# dynamodb
# =======================
resource "aws_dynamodb_table" "this" {
  name           = "cost_usage_report"
  billing_mode   = "PROVISIONED"
  hash_key       = "LockID"
  write_capacity = 1
  read_capacity  = 1

  attribute {
    name = "LockID"
    type = "S"
  }
}

# =======================
# ecr
# =======================
resource "aws_ecr_repository" "this" {
  name                 = "cost-usage-report"
  image_tag_mutability = "MUTABLE"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

# =======================
# efs
# =======================
resource "aws_efs_file_system" "this" {
  encrypted  = true
  kms_key_id = data.aws_kms_key.this.arn
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_efs_mount_target" "this" {
  file_system_id  = aws_efs_file_system.this.id
  subnet_id       = data.aws_subnet.this.id
  security_groups = [aws_security_group.efs.id]
}

resource "aws_efs_access_point" "this" {
  file_system_id = aws_efs_file_system.this.id

  root_directory {
    path = "/mnt/cost-usage-report"
    creation_info {
      owner_gid   = 1000
      owner_uid   = 1000
      permissions = "755"
    }
  }

  posix_user {
    gid = 1000
    uid = 1000
  }
}

# =======================
# iam
# =======================
resource "aws_iam_role" "this" {
  name = "cur-integration-role"
  assume_role_policy = templatefile("${path.module}/template/iam_policy_trust_rel.json", {
    service = "lambda.amazonaws.com"
  })
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_iam_role_policy" "s3" {
  name = "s3"
  role = aws_iam_role.this.id
  policy = templatefile("${path.module}/template/iam_policy_s3.json", {
    s3_bucket_arn = data.aws_s3_bucket.this.arn
  })
}

resource "aws_iam_role_policy" "cloud_watch" {
  name = "cloud_watch"
  role = aws_iam_role.this.id
  policy = templatefile("${path.module}/template/iam_policy_cloudwatch.json", {
    account_id    = data.aws_caller_identity.this.account_id
    log_group_arn = aws_cloudwatch_log_group.this.arn
  })
}

resource "aws_iam_role_policy" "efs" {
  name = "efs"
  role = aws_iam_role.this.id
  policy = templatefile("${path.module}/template/iam_policy_efs.json", {
    efs_file_system_id = aws_efs_file_system.this.id
  })
}

resource "aws_iam_role_policy" "network_interface" {
  name = "network_interface"
  role = aws_iam_role.this.id
  policy = templatefile("${path.module}/template/iam_policy_network_interface.json", {
    vpc_id    = var.vpc_id
    subnet_id = var.subnet_id
  })
}

resource "aws_iam_role_policy" "dynamodb" {
  name = "dynamodb"
  role = aws_iam_role.this.id
  policy = templatefile("${path.module}/template/iam_policy_dynamodb.json", {
    dynamodb_arn = aws_dynamodb_table.this.arn
  })
}

# =======================
# kms  
# =======================
data "aws_kms_key" "this" {
  key_id = "alias/aws/elasticfilesystem"
}

# =======================
# lambda
# =======================
resource "aws_lambda_function" "this" {
  architectures = [
    "arm64",
  ]
  function_name = "cost-usage-report"
  role          = aws_iam_role.this.arn
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.this.repository_url}:latest"
  timeout       = 900
  memory_size   = 5248
  file_system_config {
    arn              = aws_efs_access_point.this.arn
    local_mount_path = "/mnt/cost-usage-report"
  }

  vpc_config {
    subnet_ids         = [data.aws_subnet.this.id]
    security_group_ids = [aws_security_group.lambda.id]
  }
  environment {
    variables = {
      NEW_RELIC_LICENSE_KEY = jsondecode(data.aws_secretsmanager_secret_version.this.secret_string)["API_LICENSE_KEY"],
      NODE_ENV              = "production",
    }
  }

  lifecycle {
    ignore_changes = [
      image_uri
    ]
  }

  depends_on = [aws_efs_mount_target.this]

  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_lambda_permission" "this" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.this.arn
  principal     = "s3.amazonaws.com"
  source_arn    = data.aws_s3_bucket.this.arn
}

resource "aws_lambda_function_event_invoke_config" "this" {
  function_name          = aws_lambda_function.this.function_name
  maximum_retry_attempts = 0
}

resource "aws_s3_bucket_notification" "this" {
  bucket = data.aws_s3_bucket.this.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.this.arn
    events = [
      "s3:ObjectCreated:*",
    ]
    filter_prefix = var.filter_prefix
    filter_suffix = ".gz"
  }

  depends_on = [aws_lambda_permission.this]
}

# =======================
# s3
# =======================
data "aws_s3_bucket" "this" {
  bucket = var.bucket_name
}

# =======================
# secret manager
# =======================
data "aws_secretsmanager_secret" "this" {
  name = "newrelic"
}

data "aws_secretsmanager_secret_version" "this" {
  secret_id = data.aws_secretsmanager_secret.this.id
}

# =======================
# security group for lambda
# =======================
resource "aws_security_group" "lambda" {
  name        = "cost-usage-report-lambda"
  description = "Allow all outbound traffic"
  vpc_id      = data.aws_vpc.this.id
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_vpc_security_group_egress_rule" "lambda_v4" {
  security_group_id = aws_security_group.lambda.id
  cidr_ipv4         = "0.0.0.0/0"
  ip_protocol       = "-1"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_vpc_security_group_egress_rule" "lambda_v6" {
  security_group_id = aws_security_group.lambda.id
  cidr_ipv6         = "::/0"
  ip_protocol       = "-1"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

# ==================================
# security group for efs
# ==================================
resource "aws_security_group" "efs" {
  name        = "cost-usage-report-efs"
  description = "Allow NFS inbound traffic from the Lambda security group and all outbound traffic"
  vpc_id      = data.aws_vpc.this.id
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_vpc_security_group_ingress_rule" "name" {
  security_group_id            = aws_security_group.efs.id
  from_port                    = 2049
  to_port                      = 2049
  ip_protocol                  = "tcp"
  referenced_security_group_id = aws_security_group.lambda.id
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_vpc_security_group_egress_rule" "efs_v4" {
  security_group_id = aws_security_group.efs.id
  cidr_ipv4         = "0.0.0.0/0"
  ip_protocol       = "-1"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

resource "aws_vpc_security_group_egress_rule" "efs_v6" {
  security_group_id = aws_security_group.efs.id
  cidr_ipv6         = "::/0"
  ip_protocol       = "-1"
  tags = {
    Name    = "cost-usage-report"
    Service = "cost-usage-report"
  }
}

# =======================
# variables
# =======================
variable "bucket_name" {
  description = "The name of the S3 bucket to store the cost usage report"
  type        = string
}

variable "filter_prefix" {
  description = "The prefix to filter the S3 bucket for"
  type        = string
}

variable "vpc_id" {
  description = "The VPC ID to deploy the cost usage report lambda function"
  type        = string
}

variable "subnet_id" {
  description = "The subnet ID to deploy the cost usage report lambda function"
  type        = string
}
modules/cost_usage_report/template/iam_policy_cloudwatch.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:ap-northeast-1:${account_id}:*"
    },
    {
      "Effect": "Allow",
      "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": ["${log_group_arn}:*"]
    }
  ]
}
modules/cost_usage_report/template/iam_policy_dynamodb.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": ["${dynamodb_arn}"]
    }
  ]
}
modules/cost_usage_report/template/iam_policy_efs.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticfilesystem:ClientMount",
        "elasticfilesystem:ClientWrite",
        "elasticfilesystem:DescribeMountTargets"
      ],
      "Resource": "*"
    }
  ]
}
modules/cost_usage_report/template/iam_policy_network_interface.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface"
      ],
      "Resource": "*"
    }
  ]
}

modules/cost_usage_report/template/iam_policy_s3.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion", "s3:ListBucket"],
      "Resource": ["${s3_bucket_arn}", "${s3_bucket_arn}/*"]
    }
  ]
}
modules/cost_usage_report/template/iam_policy_trust_rel.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "${service}"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
New Relic dashboard (Terraform module)
modules/dashboard/cost_usage_report/main.tf
resource "newrelic_one_dashboard_json" "this" {
  json = templatefile("${path.module}/template/dashboard.json", {
    account_id = var.account_id
  })
}
modules/dashboard/cost_usage_report/variables.tf
variable "account_id" {
  description = "Account ID"
  type        = string
}
modules/dashboard/cost_usage_report/template/dashboard.json
{
  "name": "AWS CostUsageReport",
  "description": null,
  "permissions": "PUBLIC_READ_WRITE",
  "pages": [
    {
      "name": "全体",
      "description": null,
      "widgets": [
        {
          "title": "【全体】月間予算利用率",
          "layout": {
            "column": 1,
            "row": 1,
            "width": 3,
            "height": 2
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.billboard"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT \n    (max(`provider.actualAmount`) / max(`provider.limitAmount`)) * 100 as '%'\nFROM FinanceSample SINCE 1 day ago"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【全体】月間予算上限額",
          "layout": {
            "column": 4,
            "row": 1,
            "width": 3,
            "height": 2
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.billboard"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT \n    max(`provider.limitAmount`) as '$'\nFROM FinanceSample SINCE 1 day ago"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【全体】月間想定合計利用額",
          "layout": {
            "column": 7,
            "row": 1,
            "width": 3,
            "height": 2
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.billboard"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    latest(`provider.forecastedAmount`) as '$' \nFROM FinanceSample SINCE 1 day ago"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【全体】月間合計利用額",
          "layout": {
            "column": 10,
            "row": 1,
            "width": 3,
            "height": 2
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.billboard"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(`provider.actualAmount`) as '$'\nFROM FinanceSample SINCE 1 day ago"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【サービス別】月間利用額総額",
          "layout": {
            "column": 1,
            "row": 3,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.pie"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": true
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost) as 'total'\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  TIMESERIES 1 hours FACET `resourceTags/user:Service` LIMIT MAX\n    ) SINCE 24 hours ago FACET `resourceTags/user:Service` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【サービス別】月間利用額推移",
          "layout": {
            "column": 7,
            "row": 3,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.stacked-bar"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost) as 'total'\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  TIMESERIES 4 hours facet `resourceTags/user:Service` LIMIT MAX\n    ) SINCE this month TIMESERIES 1 day facet `resourceTags/user:Service` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【AWSリソース別】月間利用額",
          "layout": {
            "column": 1,
            "row": 6,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.pie"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": true
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost) as 'total'\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  TIMESERIES 1 hours facet `product/ProductName`\n        LIMIT MAX\n    ) SINCE 24 hours ago facet `product/ProductName` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【AWSリソース別】月間利用額推移",
          "layout": {
            "column": 7,
            "row": 6,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.stacked-bar"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost)\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  TIMESERIES 4 hours facet `product/ProductName` LIMIT MAX\n    ) SINCE this month TIMESERIES 1 day facet `product/ProductName` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        }
      ]
    },
    {
      "name": "サービス別",
      "description": null,
      "widgets": [
        {
          "title": "【サービス別】月間利用額",
          "layout": {
            "column": 1,
            "row": 1,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.pie"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": true
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost) as 'total'\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  \n        AND \n            `resourceTags/user:Service` IN ({{serviceName}})\n        TIMESERIES \n            1 hours \n        facet \n           `resourceTags/user:Service`,`product/ProductName`\n        LIMIT MAX\n    ) SINCE 24 hours ago facet `product/ProductName` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        },
        {
          "title": "【サービス別】月間利用額推移",
          "layout": {
            "column": 7,
            "row": 1,
            "width": 6,
            "height": 3
          },
          "linkedEntityGuids": null,
          "visualization": {
            "id": "viz.stacked-bar"
          },
          "rawConfiguration": {
            "facet": {
              "showOtherSeries": false
            },
            "legend": {
              "enabled": true
            },
            "nrqlQueries": [
              {
                "accountIds": [
                  ${account_id}
                ],
                "query": "SELECT\n    max(cost) as 'total'\nFROM\n    (\n        SELECT\n            sum(numeric(`lineItem/BlendedCost`)) as cost\n        FROM\n            Log\n        WHERE\n            `logtype` = 'cur-integration'  \n        AND \n            `resourceTags/user:Service` IN ({{serviceName}})\n        TIMESERIES 4 hours facet `resourceTags/user:Service`,`product/ProductName`\n        LIMIT MAX\n    ) SINCE this month TIMESERIES 1 day facet `product/ProductName` LIMIT MAX"
              }
            ],
            "platformOptions": {
              "ignoreTimeRange": false
            }
          }
        }
      ]
    }
  ],
  "variables": [
    {
      "name": "serviceName",
      "items": null,
      "defaultValues": [],
      "nrqlQuery": {
        "accountIds": [
          ${account_id}
        ],
        "query": "FROM Log SELECT uniques(`resourceTags/user:Service`) WHERE `resourceTags/user:Service` not like '%resourceTags%'SINCE this month"
      },
      "options": {
        "ignoreTimeRange": false
      },
      "title": "serviceName",
      "type": "NRQL",
      "isMultiSelection": null,
      "replacementStrategy": "STRING"
    }
  ]
}
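Most widgets above use a nested query. The inner query takes `sum(numeric(`lineItem/BlendedCost`))` per time bucket, and the outer query takes `max()` over those buckets. The article does not state why; one plausible reading is that each CUR delivery is a cumulative month-to-date snapshot, ingested within a single bucket. Under that assumption, the largest bucket sum approximates the latest month-to-date total. It also explains why a duplicated run doubled the dashboard: both copies fall into the same bucket. The sketch reproduces that arithmetic with a hypothetical `maxOfBucketSums` helper.

```typescript
type CostLog = { timestamp: number; cost: number };

// Hypothetical helper mirroring the nested NRQL used by the widgets:
// inner `sum(numeric(lineItem/BlendedCost)) ... TIMESERIES <bucket>`,
// outer `max(cost)` over the resulting buckets.
function maxOfBucketSums(logs: CostLog[], bucketMs: number): number {
  const sums = new Map<number, number>();
  for (const { timestamp, cost } of logs) {
    const bucket = Math.floor(timestamp / bucketMs);
    sums.set(bucket, (sums.get(bucket) ?? 0) + cost);
  }
  return sums.size === 0 ? 0 : Math.max(...Array.from(sums.values()));
}

const HOUR = 60 * 60 * 1000;
// Two deliveries of a cumulative month-to-date report (100, then 130),
// each ingested within a single hour.
const delivery1: CostLog[] = [
  { timestamp: 1 * HOUR, cost: 60 },
  { timestamp: 1 * HOUR, cost: 40 },
];
const delivery2: CostLog[] = [
  { timestamp: 9 * HOUR, cost: 70 },
  { timestamp: 9 * HOUR, cost: 60 },
];
console.log(maxOfBucketSums([...delivery1, ...delivery2], HOUR)); // 130
// Processing the second file twice puts both copies in the same bucket.
console.log(maxOfBucketSums([...delivery1, ...delivery2, ...delivery2], HOUR)); // 260
```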
Lambda function
costUsageReport.ts
import { S3Event } from 'aws-lambda';
import { parse, Parser } from 'csv-parse';
import AWS from 'aws-sdk';
import dotenv from 'dotenv';
import path from 'path';
import fs from 'fs';
import zlib from 'zlib';
import axiosRetry from 'axios-retry';
import axios from 'axios';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
  DeleteCommand,
  DynamoDBDocumentClient,
  GetCommand,
  PutCommand,
} from '@aws-sdk/lib-dynamodb';

/**
 * Lambda handler
 */
export const handler = async (event: S3Event): Promise<void> => {
  dotenv.config();

  const dynamoDbHandlerProps = createDynamoDbHandlerProps(event);
  const csvFileDir = createUniqueFilePath();

  const checkTransactionFunc = await checkTransaction(dynamoDbHandlerProps);
  const startTransactionFunc = await startTransaction(dynamoDbHandlerProps);
  const endTransactionFunc = await endTransaction(dynamoDbHandlerProps);
  const writeBufferToFileFunc = writeBufferToFile(csvFileDir);
  const sendLogFunc = sendLog(csvFileDir);

  // Release the lock only if this invocation acquired it. Otherwise a duplicate
  // invocation would delete the lock still held by the one doing the work.
  let lockAcquired = false;

  await checkTransactionFunc(event)
    .then(startTransactionFunc)
    .then((acquiredEvent) => {
      lockAcquired = true;
      return acquiredEvent;
    })
    .then(getBodyFromS3)
    .then(validateS3Object)
    .then(decompressGzip)
    .then(writeBufferToFileFunc)
    .then(sendLogFunc)
    .then(() => console.log('送信処理が成功しました'))
    .catch(handleError)
    .finally(async () => {
      deleteFile(csvFileDir);
      if (lockAcquired) await endTransactionFunc();
      console.log('処理を終了します');
    });
};

/**
 * Errors
 */
class S3ObjectEmptyError extends Error {}
class S3ObjectNotFoundError extends Error {}
class InvalidFileFormatError extends Error {}
class DuplicateTransactionError extends Error {}

/**
 * Types
 */
type ParseJson = { [key: string]: string | number | boolean | object };
type DynamoDbHandlerProps = {
  tableName: string;
  client: DynamoDBDocumentClient;
  lockId: string;
};

/**
 * Functions
 */
function createUniqueFilePath(): string {
  const timestamp = new Date().getTime();
  const testCsvFileDir = `../test/${timestamp}_output.csv`;
  const prdCsvFileDir = `/mnt/cost-usage-report/${timestamp}_output.csv`;

  return process.env.NODE_ENV === 'test' ? testCsvFileDir : prdCsvFileDir;
}

function createDynamoDbHandlerProps(event: S3Event): DynamoDbHandlerProps {
  const bucketName = event.Records[0].s3.bucket.name;
  const objectKey = decodeURIComponent(
    event.Records[0].s3.object.key.replace(/\+/g, ' '),
  );

  const lockId = `${bucketName}/${objectKey}`;
  const client = DynamoDBDocumentClient.from(
    new DynamoDBClient({
      region: 'ap-northeast-1',
      ...(process.env.NODE_ENV === 'test' && {
        endpoint: 'http://localhost:8000',
      }),
    }),
  );
  const tableName = 'cost_usage_report';

  return { client, lockId, tableName } as const;
}

async function checkTransaction(
  props: DynamoDbHandlerProps,
): Promise<(event: S3Event) => Promise<S3Event>> {
  return async (event) => {
    const getCommand = new GetCommand({
      TableName: props.tableName,
      Key: {
        LockID: props.lockId,
      },
    });

    await props.client.send(getCommand).then((result) => {
      if (result.Item) {
        throw new DuplicateTransactionError(
          '同一ファイルに対する処理が既に実行されています',
        );
      }
    });

    console.log('Transactionが開始されていないため処理を続行します');
    return event;
  };
}

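async function startTransaction(
  props: DynamoDbHandlerProps,
): Promise<(event: S3Event) => Promise<S3Event>> {
  return async (event) => {
    console.log('Transactionを開始します');
    const putCommand = new PutCommand({
      TableName: props.tableName,
      Item: {
        LockID: props.lockId,
      },
      // Make check-and-set atomic: the write fails if another invocation
      // created the lock between our GetItem and this PutItem
      ConditionExpression: 'attribute_not_exists(LockID)',
    });

    await props.client.send(putCommand).catch((error) => {
      if (error?.name === 'ConditionalCheckFailedException') {
        throw new DuplicateTransactionError(
          '同一ファイルに対する処理が既に実行されています',
        );
      }
      throw error;
    });

    return event;
  };
}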

async function endTransaction(
  props: DynamoDbHandlerProps,
): Promise<() => Promise<void>> {
  return async () => {
    console.log('Transactionを終了します');
    const deleteCommand = new DeleteCommand({
      TableName: props.tableName,
      Key: {
        LockID: props.lockId,
      },
    });

    await props.client.send(deleteCommand);
  };
}

async function getBodyFromS3(event: S3Event): Promise<Buffer> {
  console.log('s3からデータを取得します');
  const bucketName = event.Records[0].s3.bucket.name;
  const objectKey = decodeURIComponent(
    event.Records[0].s3.object.key.replace(/\+/g, ' '),
  );

  // Catch only the request failure, so an empty body is reported as
  // S3ObjectEmptyError instead of being rewrapped as S3ObjectNotFoundError
  const object = await new AWS.S3()
    .getObject({ Bucket: bucketName, Key: objectKey })
    .promise()
    .catch((error) => {
      throw new S3ObjectNotFoundError(
        `S3からファイルが取得できませんでした: ${error}`,
      );
    });

  if (!object.Body) {
    throw new S3ObjectEmptyError('S3から取得したファイルが空です');
  }
  return object.Body as Buffer;
}

function validateS3Object(s3Object: Buffer): Buffer {
  console.log('ファイル形式の検証をします');
  const isGzip = (buffer: Buffer): boolean => {
    // gzip magic number: byte 0 must be 0x1f and byte 1 must be 0x8b, in that order
    return buffer.length >= 2 && buffer[0] === 0x1f && buffer[1] === 0x8b;
  };

  if (!isGzip(s3Object)) {
    throw new InvalidFileFormatError('取得したファイルの形式が不正です');
  }

  return s3Object;
}

function decompressGzip(buffer: Buffer): Buffer {
  console.log('ファイルの解凍をします');
  return zlib.unzipSync(buffer);
}

function writeBufferToFile(fileDir: string): (buffer: Buffer) => void {
  return (buffer: Buffer) => {
    console.log('ファイルの書き込み処理を行います');
    fs.writeFileSync(path.resolve(__dirname, fileDir), buffer);
  };
}

// Register the retry interceptor once, when the module is loaded. Registering it
// inside send() would add one more interceptor per request, and they would pile
// up on the shared axios instance across warm Lambda invocations.
axiosRetry(axios, {
  retries: 3,
  retryDelay: axiosRetry.exponentialDelay,
  retryCondition: () => true,
  onRetry(retryCount, error) {
    console.log(`リトライ試行回数 #${retryCount}`);
    console.error(
      `ステータスコード :${error.status}/ message:${error.message}`,
    );
  },
});

function sendLog(csvFileDir: string): () => Promise<void> {
  const compress = (data: object[]): Buffer => {
    return zlib.gzipSync(JSON.stringify(data));
  };

  const createReadCsvStream = (fileDir: string) => {
    return fs.createReadStream(path.resolve(__dirname, fileDir));
  };

  const processObjectToSend = (object: ParseJson): ParseJson | undefined => {
    const zeroCost = 0;
    const isZeroCost =
      object['lineItem/BlendedCost'] == '' ||
      parseFloat(object['lineItem/BlendedCost'] as string) === zeroCost;

    if (isZeroCost) {
      return undefined;
    }

    const emptyValueRemovedObject: ParseJson = JSON.parse(
      JSON.stringify(object),
      (key, value) => {
        return value === '' ? undefined : value;
      },
    );

    return {
      ...emptyValueRemovedObject,
      logtype: 'cur-integration',
    };
  };

  const createParser = async (fileDir: string): Promise<Parser> => {
    return new Promise((resolve, reject) => {
      const headerParser = parse({ columns: false, to_line: 1 });
      createReadCsvStream(fileDir)
        .pipe(headerParser)
        .on('readable', () => {
          resolve(parse({ columns: headerParser.read(), from_line: 2 }));
        })
        .on('error', (err) => {
          reject(err);
        });
    });
  };

  const send = async (data: Buffer): Promise<void> => {
    if (data.length === 0) {
      return;
    }

    const url = 'https://log-api.newrelic.com/log/v1';
    const headers = {
      'Content-Type': 'application/json',
      'Content-Encoding': 'gzip',
      'Api-Key': process.env.NEW_RELIC_LICENSE_KEY as string,
    };

    await axios.post(url, data, { headers });
  };

  return async () => {
    console.log('ログを送信します');
    const chunkSize = process.env.NODE_ENV == 'test' ? 1 : 5000;
    const resultList: object[] = [];
    const parser = await createParser(csvFileDir);

    // Read records with for await and await every send(), so each batch is
    // delivered before more rows are parsed and no request is still in flight
    // when the handler returns and Lambda freezes the execution environment.
    for await (const record of createReadCsvStream(csvFileDir).pipe(parser)) {
      const result = processObjectToSend(record as ParseJson);
      if (result) resultList.push(result);

      if (resultList.length >= chunkSize) {
        console.log(`ログを${resultList.length}件送信します`);
        await send(compress(resultList));
        resultList.length = 0;
      }
    }

    if (resultList.length > 0) {
      console.log(`ログを${resultList.length}件送信します`);
      await send(compress(resultList));
    }
  };
}

function deleteFile(fileDir: string): void {
  if (!fs.existsSync(path.resolve(__dirname, fileDir))) {
    return;
  }

  console.log(`ファイルを削除します: ${fileDir}`);
  fs.unlinkSync(path.resolve(__dirname, fileDir));
}

function handleError(error: Error) {
  // Stopping because of a duplicate execution counts as a normal exit
  if (error instanceof DuplicateTransactionError) {
    console.warn(`処理中断:${error.message}`);
    return;
  }

  console.error(`処理失敗:${error.message}`);
  throw error;
}
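An alternative way to structure the streaming loop is to pull records with `for await` and send one batch at a time. This is a sketch under stated assumptions: `batch`, `sendInBatches`, and the `Sender` type are hypothetical names, and the payload (a gzip-compressed JSON array) mirrors the `Content-Encoding: gzip` request above rather than the article's own `compress` helper. Node.js readable streams are async-iterable, so if `createParser` returns a csv-parse stream (as the `.read()` and `'readable'` usage suggests), it could presumably be passed as `source` as-is.

```typescript
import { gzipSync } from 'zlib';

// Hypothetical sender: posts one gzip-compressed JSON payload (e.g. to the Log API).
type Sender = (payload: Buffer) => Promise<void>;

// Group items from a sync or async iterable into arrays of at most `size` items.
async function* batch<T>(
  source: AsyncIterable<T> | Iterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  if (size < 1) throw new RangeError('size must be at least 1');
  let current: T[] = [];
  for await (const item of source) {
    current.push(item);
    if (current.length >= size) {
      yield current;
      current = [];
    }
  }
  // Emit the final partial batch, if any
  if (current.length > 0) yield current;
}

// Send batches one at a time. While `send` is awaited, no further items are pulled
// from `source`, and a failed request rejects the returned promise.
async function sendInBatches<T>(
  source: AsyncIterable<T> | Iterable<T>,
  size: number,
  send: Sender,
): Promise<number> {
  let sent = 0;
  for await (const chunk of batch(source, size)) {
    await send(gzipSync(JSON.stringify(chunk)));
    sent += chunk.length;
  }
  return sent;
}
```

With a batch size of 5000 as in the article, a call along the lines of `sendInBatches(parser, 5000, send)` would keep at most one request in flight. That trades some throughput for a bounded request rate and natural backpressure on the file being read, whereas firing requests concurrently is faster but can run into the API's request limits on multi-GB reports.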

costUsageReport.test.ts
import { S3Event } from 'aws-lambda';
import { handler } from './costUsageReport';
import AWS from 'aws-sdk';
import zlib from 'zlib';
import axios from 'axios';
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';

const s3EventMock: S3Event = {
  Records: [
    {
      eventVersion: '2.1',
      eventSource: 'aws:s3',
      awsRegion: 'us-east-1',
      eventTime: '2021-01-01T12:00:00.000Z',
      eventName: 'ObjectCreated:Put',
      userIdentity: {
        principalId: 'EXAMPLE',
      },
      requestParameters: {
        sourceIPAddress: '127.0.0.1',
      },
      responseElements: {
        'x-amz-request-id': 'EXAMPLE123456789',
        'x-amz-id-2': 'EXAMPLE123/EXAMPLE123456789/EXAMPLE123456789',
      },
      s3: {
        s3SchemaVersion: '1.0',
        configurationId: 'testConfigRule',
        bucket: {
          name: 'example-bucket',
          ownerIdentity: {
            principalId: 'EXAMPLE',
          },
          arn: 'arn:aws:s3:::example-bucket',
        },
        object: {
          key: 'example-key.csv.gz',
          size: 1024,
          eTag: '123456789abcdef123456789abcdef12',
          sequencer: '1234567890ABCDEF',
        },
      },
    },
  ],
};

jest.mock('aws-sdk', () => {
  const s3Mock = {
    getObject: jest.fn().mockReturnThis(),
    promise: jest.fn(),
  };
  return { S3: jest.fn(() => s3Mock) };
});

jest.mock('axios', () => {
  const axios = jest.requireActual('axios');
  const axiosMock = {
    ...axios,
    post: jest.fn(), // Comment this line out to actually send logs to New Relic
  };
  return axiosMock;
});

describe('costUsageReport', () => {
  beforeEach(() => {
    // Make the object key unique per test case so the DynamoDB-based duplicate guard does not skip processing
    s3EventMock.Records[0].s3.object.key = `example-key-${new Date().getTime()}.csv.gz`;

    jest.clearAllMocks();
  });

  it('succeeds when the CSV is well-formed', async () => {
    const csvContent = 'name,age\n"John",30\nDoe,40';
    const gzipContent = zlib.gzipSync(csvContent);

    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: gzipContent,
    });

    await expect(handler(s3EventMock)).resolves.not.toThrow();
    expect(axios.post).toHaveBeenCalledTimes(2);
  });

  it('does not send CSV records more than once', async () => {
    const csvContent = 'name,age\n"John",30\nDoe,40';
    const gzipContent = zlib.gzipSync(csvContent);

    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: gzipContent,
    });

    await handler(s3EventMock);
    // With a chunk size of 1 in the test environment, two records mean exactly two requests
    expect(axios.post).toHaveBeenCalledTimes(2);
  });

  it('throws when the CSV is malformed', async () => {
    const csvContent = '{"name": "John", "age": 30}';
    const gzipContent = zlib.gzipSync(csvContent);

    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: gzipContent,
    });

    await expect(handler(s3EventMock)).rejects.toThrow();
  });

  it('throws when the S3 object body is empty', async () => {
    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: undefined,
    });

    await expect(handler(s3EventMock)).rejects.toThrow();
  });

  it('throws when the file is not gzip-compressed', async () => {
    const csvContent = 'name,age\n"John",30\nDoe,40';

    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: csvContent,
    });

    await expect(handler(s3EventMock)).rejects.toThrow();
  });

  it('completes normally even when a duplicate error occurs', async () => {
    // To reproduce a duplicate error, mock the DynamoDB client's send so it returns an existing LockID
    jest.spyOn(DynamoDBDocumentClient, 'from').mockReturnValue({
      send: jest
        .fn()
        .mockResolvedValueOnce({ Item: { LockID: 'existing-lock-id' } }),
    } as any);

    const csvContent = 'name,age\n"John",30\nDoe,40';
    const gzipContent = zlib.gzipSync(csvContent);

    const s3 = new AWS.S3();
    (s3.getObject().promise as jest.Mock).mockResolvedValue({
      Body: gzipContent,
    });

    await expect(handler(s3EventMock)).resolves.not.toThrow();
    expect(axios.post).toHaveBeenCalledTimes(0);
  });
});
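The duplicate-execution guard can also be placed behind a small interface, which makes the de-duplication logic testable without a real DynamoDB table or a spy on `DynamoDBDocumentClient.from`. In the sketch below, `LockStore`, `InMemoryLockStore`, `lockIdFor`, and `runOnce` are hypothetical names, not part of the article's code. A DynamoDB-backed `LockStore` would typically use a conditional write such as `ConditionExpression: 'attribute_not_exists(LockID)'` (the attribute name is taken from the test mock above), so that checking for the lock and inserting it happen atomically; the in-memory version relies on `has` and `add` running with no `await` in between.

```typescript
// Hypothetical abstraction over the lock table used to prevent duplicate runs.
interface LockStore {
  /** Resolves true if the lock was newly acquired, false if it already existed. */
  tryAcquire(lockId: string): Promise<boolean>;
}

// In-memory stand-in for tests; not a replacement for the DynamoDB table in production.
class InMemoryLockStore implements LockStore {
  private readonly held = new Set<string>();

  async tryAcquire(lockId: string): Promise<boolean> {
    // No await between the check and the insert, so concurrent callers cannot both succeed
    if (this.held.has(lockId)) return false;
    this.held.add(lockId);
    return true;
  }
}

// Assumption: bucket + key + eTag identifies one write of a CUR object.
function lockIdFor(bucket: string, key: string, eTag: string): string {
  return `${bucket}/${key}#${eTag}`;
}

// Runs `work` only if this lock id has not been acquired before.
async function runOnce(
  store: LockStore,
  lockId: string,
  work: () => Promise<void>,
): Promise<'processed' | 'skipped'> {
  if (!(await store.tryAcquire(lockId))) {
    return 'skipped';
  }
  await work();
  return 'processed';
}
```

Including the `eTag` in the lock id is an assumption: redelivered copies of the same S3 event share one id, while an object rewritten with different content gets a new one. Note also that in this sketch the lock is not released when `work` throws, so a retried invocation would be skipped; whether to delete the lock on failure is a design decision.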

Conclusion

Checking costs in the AWS console can be a bit of a hurdle for developers, but now that cost data is visible on the New Relic dashboard, checking it has become much easier.

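In addition, filtering by tags lets us see costs per development team and per system, so, just as with metrics, we can visualize the state of our systems and more readily turn what we see into improvements.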

レバテック開発部 (Levtech Development Department)
