CloudWatchLogsからS3にログを集約してAthenaでクエリする

TL;DR

  • CloudWatchLogsの内容を複数のLogGroupに跨って分析
  • ログの日時を絞り込んで検索できるように(日時はJSTで)
  • 検索対象とするLogGroupは任意に指定できるように
  • なるべく低コストで。なるべく運用負荷がかからないように。

経緯

LambdaやFargateからのログはCloudWatchLogsにそれぞれLogGroup/LogStreamが分かれた形で出力されます。 serverless構成が多くなったりすると、これらを時系列で横断的にログ確認したくなるのですが CloudWatchでクエリーするのは面倒だったりレスポンスが遅かったり。

スタンダードな構成としてはCloudWatchLogsからOpenSearchService(ES)に連携して Kibanaとかでログ分析する形になると思いますが、これだとデータ保持やインスタンス時間課金が気になるので 今回は除外しました。

もっと簡単に日時範囲だけを指定してサッと横断的に生ログを確認する方法がほしくなったのでやってみた。

今回作った最終アウトプット

先に結果から。 以下のように Athena で日時を指定して生ログを検索することができます

picture 1

なるべく手軽にログ検索できるように以下のことができるようになっています

  • 複数のLogGroup/LogStreamを横断的に検索
    • LogGroup/LogStreamの内容をS3にまとめて検索
  • どんなフォーマットのログでも対応
    • 生ログをそのまま出力
    • それぞれのアプリケーションのログフォーマットが統一されていなくてもOK
      • ただの文字列/JSON文字列/CSV/TSVなど混在していてもOK
  • S3に出力する対象のLogGroupを指定できる
    • 必要なLogGroupだけS3に出力
    • 後から追加・削除も簡単
  • 年月日時分秒で絞り込み
    • ログ発生の秒範囲まで指定して検索できる
    • UTCでなくJSTで検索できる
  • ほぼリアルタイムで反映
    • バッチでなくstreamで処理することでログ内容をすぐに反映
  • なるべく低コスト
    • S3を年月日時でパーティションすることでスキャン範囲を効率化
    • CloudWatchLogsに貯めるよりS3のほうが安価
      • さらに古いログはLifeCycleでGlacier行き
      • CloudWatchLogsの保持期間を短くできる
    • serverlessで組むことで時間課金を極力下げる

アーキテクチャ

アーキテクチャ

方式検討

まずCloudWatchLogsの内容をS3に出力する部分です。

CloudWatchLogsからS3に出力する方法はいくつか考えられます。

  1. CloudWatchLogsのS3ログエクスポート機能を使う
  2. CloudWatchLogsのsubscription filter + LambdaでS3に出力する
  3. CloudWatchLogsのsubscription filter + Kinesis DataStreams + LambdaでS3に出力する
  4. CloudWatchLogsのsubscription filter + Kinesis Firehose + LambdaでS3に出力する

CloudWatchLogsのS3ログエクスポート機能は非常に簡単で、実行するだけで出力してくれるのですが、 S3に出力されるログがスペース区切りの文字列形式で、かつtimestampがUTCだったりするため 分析するにはS3出力後に二次加工が必要になりそうです。

それ以外の方法だと基本、どこかでLambdaを使って整形する必要がありそうですが、 どのみちUTC→JST変換などカスタム処理を入れたいので避けられないかと。

ということで今回は 4.CloudWatchLogsのsubscription filter + Kinesis Firehose + LambdaでS3に出力する を採用しました

またS3に出力したログを検索・分析するには Athena を使うと手っ取り早いです。 Athena ではデータのスキーマ(型)が必要になります。 これは Athenacreate table で自分で作ることもできますが Glue のCrawlerを使うことで自動化することが可能です。

こんな感じで作成していきます。

作成前に知っておくといいこと

CloudWatchLogs -> subscription filter -> Kinesis Firehose(+ Lambda) -> S3 を実装するうえで 色々とハマりポイントがあるのでまとめておきます。

FirehoseでLambda整形は必須

CloudWatchLogsからFirehoseに連携してAthenaで検索したい場合、 今回のUTC -> JST変換みたいな固有の要件がない場合でも FirehoseでのTransform Lambdaが必須になります。

理由は以下のとおり

  1. Athenaは1行に1ログレコードの形式じゃないとダメ
  2. CloudWatchLogsからのログはそのままだと改行コードつかない

つまり今の流れでAthenaで分析できるようにするには 最低でもLambdaを使ってレコード毎の改行をつける必要があります。

Kinesis Firehoseのインプットはbase64 + gzipされてる

Firehoseから呼び出されるLambdaでの考慮事項です。 ログレコードを取得するにはLambda内でbase64デコードとgunzipしてあげる必要があります

Firehoseは出力を自動でgzipしてくれない

picture 2

あたかもgzipしてくれそうな設定がありますが、これを GZIP にして S3に出力されたファイルを確認したら中身は圧縮されていませんでした。 出力されたS3 Objectのファイル名につく拡張子は .gz ってなってるけど・・・

仕方がないのでLambdaで出力時にgzipして対応しました

※上記はFirehoseでLambda変換使わない時はgzipしてくれるかもしれません(未検証)

FirehoseのtimestampはUTC

FirehoseではS3に出力する際にprefixを指定する機能があります。 また、以下のようにFirehoseに連携された時点の timestamp を使って年月日などにすることが可能になっています。

s3://bucket-name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

これなら簡単にS3をpartitioningできる・・・と思いましたがタイムゾーンがUTCなのです。 そうなるとAthenaで検索する時にJSTとの差を考慮しなくてはならずめんどくさい・・・

これに対応するにはLambdaでJSTにしたものを使ってdynamic partitioningをしてあげる必要があります。 具体的には

  • Firehoseから呼ばれるLambdaで metadata.partitionKeys を一緒に返却するようにする
  • Firehoseの設定でS3 prefixに上記設定したkeyを使って設定する

ということになります。

作成手順と解説

Transform Lambda

Firehoseでの肝になります。 まず全量です。

ポイントを解説していきます。

Firehoseのtransformとして使うLambdaは FirehoseTransformatonHandler を型にするとよいです。

1
2
3
4
5
import { FirehoseTransformationHandler } from 'aws-lambda'

export const handler: FirehoseTransformationHandler = async event => {
  return { records: xxxxx }
}

引数 event は以下のようになっています

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
export interface FirehoseTransformationEvent {
  invocationId: string
  deliveryStreamArn: string
  sourceKinesisStreamArn?: string | undefined
  region: string
  records: FirehoseTransformationEventRecord[]
}

export interface FirehoseTransformationEventRecord {
  recordId: string
  approximateArrivalTimestamp: number
  /** Base64 encoded */
  data: string
  kinesisRecordMetadata?: FirehoseRecordMetadata | undefined
}

この records->data の中にログデータが入っていますが、records->data

  • 複数件のログが入っている
  • base64 + gzipで圧縮されている

ので注意が必要です。

返り値 records は以下の型の配列になっています。

1
2
3
4
5
6
7
export interface FirehoseTransformationResultRecord {
  recordId: string
  result: FirehoseRecordTransformationStatus
  /** Encode in Base64 */
  data: string
  metadata?: FirehoseTransformationMetadata
}
  • recordId: handlerの引数 event.records->recordId を設定します。
  • result: Ok Dropped ProcessingFailed のいずれかの値を設定します。
  • data: 出力データを設定します。base64 でencodeしたものを設定します。
  • metadata: dynamic partitioningで利用する partitionKeys を設定します(後述)

Firehoseからのログデータをbase64 decodeしてgunzipします。

解凍したデータは JSON.parse することで CloudWatchLogsDecodedData 型として扱うことができます。

1
2
3
4
const rawLogs = zlib
  .gunzipSync(Buffer.from(record.data, 'base64'))
  .toString('utf-8')
const logRecord = JSON.parse(rawLogs) as CloudWatchLogsDecodedData

フォーマット変換の本体部分です。

filter しているのは不要なログを除外したいためです。

Lambdaでこのように判定してもOKですが、
CloudWatchLogsのsubscription filterのfilter条件でそもそも除外してもよいでしょう

datetime をログデータとして埋め込んでいます。ここでJSTでformatしています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const records = logRecord.logEvents
  .filter(
    e =>
      // drop lambda control message
      !e.message.startsWith('START RequestId:') &&
      !e.message.startsWith('REPORT RequestId:')
  )
  .map(e =>
    JSON.stringify({
      timestamp: e.timestamp,
      datetime: formatTZ(new Date(e.timestamp), 'yyyyMMddHHmmss'),
      message: e.message,
    })
  )

フォーマット変換したログデータをbase64 + gzip圧縮します。

1
2
3
4
5
const data = GZIP_COMPRESS
  ? zlib
      .gzipSync(Buffer.from(`${records.join(`\n`)}\n`, 'utf8'))
      .toString('base64')
  : Buffer.from(`${records.join(`\n`)}\n`, 'utf8').toString('base64')

ここで ログデータとともに、metadata.partitionKeys に年月日などを設定して返却しています。

ここで値を設定しておくと、Firehose側でs3に出力する際のprefixとして利用できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
return {
  result: 'Ok',
  recordId: record.recordId,
  data,
  metadata: {
    partitionKeys: {
      year: formatTZ(date, 'yyyy'),
      month: formatTZ(date, 'MM'),
      day: formatTZ(date, 'dd'),
      hour: formatTZ(date, 'HH'),
    },
  },
}

AWS Resources

CDKで作成していきます。ポイントだけ解説します。

S3 Bucket

まずS3Bucketを作ります。lifecycleで古くなったログはglacierに置くようにしていますが このあたりは好きに設定してOKです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const s3Bucket = new aws_s3.Bucket(this, 'LogBucket', {
  bucketName: `${PREFIX}-logs`,
  objectOwnership: aws_s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
  blockPublicAccess: aws_s3.BlockPublicAccess.BLOCK_ALL,
  removalPolicy: RemovalPolicy.RETAIN,
  autoDeleteObjects: false,
  versioned: false,
  publicReadAccess: false,
  encryption: aws_s3.BucketEncryption.S3_MANAGED,
  lifecycleRules: [
    {
      abortIncompleteMultipartUploadAfter: Duration.days(30),
      transitions: [
        {
          storageClass: aws_s3.StorageClass.INFREQUENT_ACCESS,
          transitionAfter: Duration.days(30),
        },
        {
          storageClass: aws_s3.StorageClass.GLACIER,
          transitionAfter: Duration.days(730),
        },
      ],
    },
  ],
})

IAM Roles

Transform Lambdaの実行ロールを作ります。

service-role/AWSLambdaKinesisExecutionRole をつけてあげる必要があります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const lambdaRole = new aws_iam.Role(this, 'LambdaRole', {
  roleName: `${PREFIX}-firehose-transform-role`,
  assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com'),
  path: '/',
  managedPolicies: [
    aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
      'service-role/AWSLambdaBasicExecutionRole'
    ),
    aws_iam.ManagedPolicy.fromAwsManagedPolicyName('AWSXRayDaemonWriteAccess'),
    aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
      'service-role/AWSLambdaKinesisExecutionRole'
    ),
  ],
})

Firehose甩の実行ロールを作ります。

主に S3 Kinesis Lambda CloudWatchLogs あたりのアクセス権が必要になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const deliveryStreamRole = new aws_iam.Role(this, 'DeliveryStreamRole', {
  roleName: `${PREFIX}-firehose-delivery-role`,
  assumedBy: new aws_iam.ServicePrincipal(`firehose.amazonaws.com`),
  inlinePolicies: {
    policy: new aws_iam.PolicyDocument({
      statements: [
        new aws_iam.PolicyStatement({
          effect: aws_iam.Effect.ALLOW,
          actions: [
            's3:AbortMultipartUpload',
            's3:GetBucketLocation',
            's3:GetObject',
            's3:ListBucket',
            's3:ListBucketMultipartUploads',
            's3:PutObject',
          ],
          resources: [bucket.bucketArn, `${bucket.bucketArn}/*`],
        }),
        new aws_iam.PolicyStatement({
          effect: aws_iam.Effect.ALLOW,
          actions: [
            'kinesis:DescribeStream',
            'kinesis:GetShardIterator',
            'kinesis:GetRecords',
            'kinesis:ListShards',
          ],
          resources: ['*'],
        }),
        new aws_iam.PolicyStatement({
          effect: aws_iam.Effect.ALLOW,
          actions: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'],
          resources: ['*'],
        }),
        new aws_iam.PolicyStatement({
          effect: aws_iam.Effect.ALLOW,
          actions: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'],
          resources: ['*'],
        }),
        new aws_iam.PolicyStatement({
          effect: aws_iam.Effect.ALLOW,
          actions: [
            'logs:GetLogEvents',
            'logs:PutLogEvents',
            'logs:CreateLogStream',
            'logs:DescribeLogStreams',
            'logs:CreateLogGroup',
          ],
          resources: ['*'],
        }),
      ],
    }),
  },
})

subscription filterに付与する実行ロールを作っておきます。

これは後でLogGroupにsubscription filterをつける時に必要になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
const subscriptionFilterRole = new aws_iam.Role(
  this,
  'SubscriptionFilterRole',
  {
    roleName: `${PREFIX}-cloudwatchlogs-subscription-role`,
    assumedBy: new aws_iam.ServicePrincipal(`logs.amazonaws.com`),
    inlinePolicies: {
      policy: new aws_iam.PolicyDocument({
        statements: [
          new aws_iam.PolicyStatement({
            effect: aws_iam.Effect.ALLOW,
            actions: ['firehose:*'],
            resources: [
              `arn:aws:firehose:${this.region}:${this.account}:deliverystream/${PREFIX}*`,
            ],
          }),
        ],
      }),
    },
  }
)

Glueの実行ロールを作っておきます。

crawl対象(ログを出力しておくS3 bucket)に対して GetObject できるようにアクセス権を追加しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
const glueRole = new aws_iam.Role(this, 'GlueRole', {
  roleName: `${PREFIX}-cwl-export-glue-role`,
  assumedBy: new aws_iam.ServicePrincipal('glue.amazonaws.com'),
  path: '/',
  managedPolicies: [
    aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
      'service-role/AWSGlueServiceRole'
    ),
  ],
})

glueRole.addToPolicy(
  new aws_iam.PolicyStatement({
    actions: ['s3:GetObject'],
    resources: [logBucket.bucketArn, `${logBucket.bucketArn}/*`],
  })
)

Lambda

Transform Lambdaを作ります。

typescript で作っているので NodejsFunction を使います

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const transformLambda = new NodejsFunction(this, 'TransformLambda', {
  functionName: `${PREFIX}-firehose-transform-function`,
  entry: 'lambda/firehose-transformer/cloudwatchlogs-transformer.ts',
  runtime: aws_lambda.Runtime.NODEJS_16_X,
  timeout: Duration.seconds(300),
  architecture: Architecture.ARM_64,
  role: lambdaRole,
  memorySize: 256,
  logRetention: RetentionDays.ONE_WEEK,
  tracing: aws_lambda.Tracing.ACTIVE,
})

Firehose

Firehoseを作ります。

ポイントはhighlightした部分です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
const dynamicPartitions: string[] = [
  'year=!{partitionKeyFromLambda:year}',
  'month=!{partitionKeyFromLambda:month}',
  'day=!{partitionKeyFromLambda:day}',
  'hour=!{partitionKeyFromLambda:hour}',
]

const deliveryStream = new aws_kinesisfirehose.CfnDeliveryStream(
  this,
  'DeliveryStream',
  {
    deliveryStreamName: `${PREFIX}-cloudwatchlogs-delivery-stream`,
    deliveryStreamType: 'DirectPut',
    extendedS3DestinationConfiguration: {
      bucketArn: bucket.bucketArn,
      roleArn: deliveryStreamRole.roleArn,
      cloudWatchLoggingOptions: {
        enabled: true,
        logGroupName: `${PREFIX}-CloudWatchLogs-DeliveryStream`,
        logStreamName: `${PREFIX}-CloudWatchLogs-DeliveryStream-logstream`,
      },
      bufferingHints: {
        intervalInSeconds: 60,
        sizeInMBs: 64, // at least 64MB when dynamic partitioning enabled
      },

      // configuration for transform by lambda
      processingConfiguration: {
        enabled: true,
        processors: [
          {
            parameters: [
              {
                parameterName: 'LambdaArn',
                parameterValue: transformLambda.functionArn,
              },
            ],
            type: 'Lambda',
          },
        ],
      },

      // configuration for dynamic partition
      prefix: `cloudwatchlogs/${dynamicPartitions.join('/')}/`,
      errorOutputPrefix: `cloudwatchlogs-error/!{firehose:error-output-type}`,
      dynamicPartitioningConfiguration: {
        enabled: true,
      },

      // compress output
      compressionFormat: 'GZIP',
    },
  }
)
  • s3のprefixとしてdynamic partitioningを利用します。
    • !{partitionKeyFromLambda:xxxxx} と指定することでLambdaで出力した metadata.partitionKeys の値を利用できます
    • year=xxxx/month=xx のように指定するとそのままS3のprefixになります
      • prefixに key=value 形式で設定しておくと後でGlueがcrawlした時、partition nameとして自動的に year などをschemaに設定してくれます
    • dynamicPartitioningConfiguration をonにしておく必要があります
  • compressionFormatGZIP を指定するとS3に出力されるS3オブジェクト名に拡張子 .gz が付与されます。

Glue

GlueのDatabaseとCrawlerを作ります。

ここではCrawlerは1日1回実行にしていますが用途に合わせて変更しておくとよいと思います。

Crawlerで引っ掛けておかないとS3にobjectが出力されていてもAthenaの検索に当たりません。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
const database = new aws_glue.CfnDatabase(this, 'GlueDatabase', {
  catalogId: this.account,
  databaseInput: {
    name: `${PREFIX}-cwl-export-database`,
  },
})

new aws_glue.CfnCrawler(this, 'GlueCrawler', {
  role: glueRole.roleArn,
  name: `${PREFIX}-cwl-export-glue-crawler`,
  databaseName: database.ref,
  targets: {
    s3Targets: [
      {
        path: `s3://${logBucket.bucketName}/`,
      },
    ],
  },
  tablePrefix: 'cwl_export_',
  recrawlPolicy: { recrawlBehavior: 'CRAWL_NEW_FOLDERS_ONLY' },
  schedule: { scheduleExpression: 'cron(0 15 * * ? *)' }, // every 24:00(JST)
})

Subscription filter

最後に出力対象とするLogGroupにSubscription Filterを貼ります。

ここではManagementConsoleから作ってみます。

CloudWatch -> Logs -> Log groups -> 対象とするLog group -> Subscription filters -> Create Kinesis Firehose subscription filter を選択

  • Kinesis Firehose delivery stream: 作成したfirehoseを指定
  • Select an existing role: 作成したsubscription甩のIAM roleを指定
  • Subscription filter name: 好きな名前

あとはデフォルトのままでOKです。

結果確認

ここまでの手順で作成した後、CloudWatchLogsにログを出力したタイミングで自動的にS3にファイルが出力されると思います。

picture 0

また、Glueを開くとデータカタログでS3に出力したファイルのスキーマを見ることができます。

まだCrawlerが実行されていない場合、Glueから手動でCrawlerを実行してみてください

picture 1

ここまでくれば後はAthenaを開いて、データカタログを指定してクエリするだけです。

picture 2