TL;DR
- CloudWatchLogsの内容を複数のLogGroupに跨って分析
- ログの日時を絞り込んで検索できるように(日時はJSTで)
- 検索対象とするLogGroupは任意に指定できるように
- なるべく低コストで。なるべく運用負荷がかからないように。
経緯
LambdaやFargateからのログはCloudWatchLogsにそれぞれLogGroup/LogStreamが分かれた形で出力されます。
serverless構成が多くなったりすると、これらを時系列で横断的にログ確認したくなるのですが
CloudWatchでクエリーするのは面倒だったりレスポンスが遅かったり。
スタンダードな構成としてはCloudWatchLogsからOpenSearchService(ES)に連携して
Kibanaとかでログ分析する形になると思いますが、これだとデータ保持やインスタンス時間課金が気になるので
今回は除外しました。
もっと簡単に日時範囲だけを指定してサッと横断的に生ログを確認する方法がほしくなったのでやってみた。
今回作った最終アウトプット
先に結果から。
以下のように Athena
で日時を指定して生ログを検索することができます

なるべく手軽にログ検索できるように以下のことができるようになっています
- 複数のLogGroup/LogStreamを横断的に検索
- LogGroup/LogStreamの内容をS3にまとめて検索
- どんなフォーマットのログでも対応
- 生ログをそのまま出力
- それぞれのアプリケーションのログフォーマットが統一されていなくてもOK
- ただの文字列/JSON文字列/CSV/TSVなど混在していてもOK
- S3に出力する対象のLogGroupを指定できる
- 必要なLogGroupだけS3に出力
- 後から追加・削除も簡単
- 年月日時分秒で絞り込み
- ログ発生の秒範囲まで指定して検索できる
- UTCでなくJSTで検索できる
- ほぼリアルタイムで反映
- バッチでなくstreamで処理することでログ内容をすぐに反映
- なるべく低コスト
- S3を年月日時でパーティションすることでスキャン範囲を効率化
- CloudWatchLogsに貯めるよりS3のほうが安価
- さらに古いログはLifeCycleでGlacier行き
- CloudWatchLogsの保持期間を短くできる
- serverlessで組むことで時間課金を極力下げる
アーキテクチャ

方式検討
まずCloudWatchLogsの内容をS3に出力する部分です。
CloudWatchLogsからS3に出力する方法はいくつか考えられます。
- CloudWatchLogsのS3ログエクスポート機能を使う
- CloudWatchLogsのsubscription filter + LambdaでS3に出力する
- CloudWatchLogsのsubscription filter + Kinesis DataStreams + LambdaでS3に出力する
- CloudWatchLogsのsubscription filter + Kinesis Firehose + LambdaでS3に出力する
CloudWatchLogsのS3ログエクスポート機能は非常に簡単で、実行するだけで出力してくれるのですが、
S3に出力されるログがスペース区切りの文字列形式で、かつtimestampがUTCだったりするため
分析するにはS3出力後に二次加工が必要になりそうです。
それ以外の方法だと基本、どこかでLambdaを使って整形する必要がありそうですが、
どのみちUTC→JST変換などカスタム処理を入れたいので避けられないかと。
ということで今回は 4.CloudWatchLogsのsubscription filter + Kinesis Firehose + LambdaでS3に出力する
を採用しました
またS3に出力したログを検索・分析するには Athena
を使うと手っ取り早いです。
Athena
ではデータのスキーマ(型)が必要になります。
これは Athena
に create table
で自分で作ることもできますが
Glue
のCrawlerを使うことで自動化することが可能です。
こんな感じで作成していきます。
作成前に知っておくといいこと
CloudWatchLogs -> subscription filter -> Kinesis Firehose(+ Lambda) -> S3 を実装するうえで
色々とハマりポイントがあるのでまとめておきます。
FirehoseでLambda整形は必須
CloudWatchLogsからFirehoseに連携してAthenaで検索したい場合、
今回のUTC -> JST変換みたいな固有の要件がない場合でも
FirehoseでのTransform Lambdaが必須になります。
理由は以下のとおり
- Athenaは1行に1ログレコードの形式じゃないとダメ
- CloudWatchLogsからのログはそのままだと改行コードつかない
つまり今の流れでAthenaで分析できるようにするには
最低でもLambdaを使ってレコード毎の改行をつける必要があります。
Kinesis Firehoseのインプットはbase64 + gzipされてる
Firehoseから呼び出されるLambdaでの考慮事項です。
ログレコードを取得するにはLambda内でbase64デコードとgunzipしてあげる必要があります
Firehoseは出力を自動でgzipしてくれない

あたかもgzipしてくれそうな設定がありますが、これを GZIP
にして
S3に出力されたファイルを確認したら中身は圧縮されていませんでした。
出力されたS3 Objectのファイル名につく拡張子は .gz
ってなってるけど・・・
仕方がないのでLambdaで出力時にgzipして対応しました
※上記はFirehoseでLambda変換使わない時はgzipしてくれるかもしれません(未検証)
FirehoseのtimestampはUTC
FirehoseではS3に出力する際にprefixを指定する機能があります。
また、以下のようにFirehoseに連携された時点の timestamp
を使って年月日などにすることが可能になっています。
s3://bucket-name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
これなら簡単にS3をpartitioningできる・・・と思いましたがタイムゾーンがUTCなのです。
そうなるとAthenaで検索する時にJSTとの差を考慮しなくてはならずめんどくさい・・・
これに対応するにはLambdaでJSTにしたものを使ってdynamic partitioningをしてあげる必要があります。
具体的には
- Firehoseから呼ばれるLambdaで
metadata.partitionKeys
を一緒に返却するようにする - Firehoseの設定でS3 prefixに上記設定したkeyを使って設定する
ということになります。
作成手順と解説
Firehoseでの肝になります。
まず全量です。
ポイントを解説していきます。
Firehoseのtransformとして使うLambdaは FirehoseTransformatonHandler
を型にするとよいです。
1
2
3
4
5
| import { FirehoseTransformationHandler } from 'aws-lambda'
export const handler: FirehoseTransformationHandler = async event => {
return { records: xxxxx }
}
|
引数 event
は以下のようになっています
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| export interface FirehoseTransformationEvent {
invocationId: string
deliveryStreamArn: string
sourceKinesisStreamArn?: string | undefined
region: string
records: FirehoseTransformationEventRecord[]
}
export interface FirehoseTransformationEventRecord {
recordId: string
approximateArrivalTimestamp: number
/** Base64 encoded */
data: string
kinesisRecordMetadata?: FirehoseRecordMetadata | undefined
}
|
この records->data
の中にログデータが入っていますが、records->data
は
- 複数件のログが入っている
- base64 + gzipで圧縮されている
ので注意が必要です。
返り値 records
は以下の型の配列になっています。
1
2
3
4
5
6
7
| export interface FirehoseTransformationResultRecord {
recordId: string
result: FirehoseRecordTransformationStatus
/** Encode in Base64 */
data: string
metadata?: FirehoseTransformationMetadata
}
|
recordId
: handlerの引数 event.records->recordId
を設定します。result
: Ok
Dropped
ProcessingFailed
のいずれかの値を設定します。data
: 出力データを設定します。base64
でencodeしたものを設定します。metadata
: dynamic partitioningで利用する partitionKeys
を設定します(後述)
Firehoseからのログデータをbase64 decodeしてgunzipします。
解凍したデータは JSON.parse
することで CloudWatchLogsDecodedData
型として扱うことができます。
1
2
3
4
| const rawLogs = zlib
.gunzipSync(Buffer.from(record.data, 'base64'))
.toString('utf-8')
const logRecord = JSON.parse(rawLogs) as CloudWatchLogsDecodedData
|
フォーマット変換の本体部分です。
filter
しているのは不要なログを除外したいためです。
Lambdaでこのように判定してもOKですが、
CloudWatchLogsのsubscription filterのfilter条件でそもそも除外してもよいでしょう
datetime
をログデータとして埋め込んでいます。ここでJSTでformatしています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| const records = logRecord.logEvents
.filter(
e =>
// drop lambda control message
!e.message.startsWith('START RequestId:') &&
!e.message.startsWith('REPORT RequestId:')
)
.map(e =>
JSON.stringify({
timestamp: e.timestamp,
datetime: formatTZ(new Date(e.timestamp), 'yyyyMMddHHmmss'),
message: e.message,
})
)
|
フォーマット変換したログデータをbase64 + gzip圧縮します。
1
2
3
4
5
| const data = GZIP_COMPRESS
? zlib
.gzipSync(Buffer.from(`${records.join(`\n`)}\n`, 'utf8'))
.toString('base64')
: Buffer.from(`${records.join(`\n`)}\n`, 'utf8').toString('base64')
|
ここで ログデータとともに、metadata.partitionKeys
に年月日などを設定して返却しています。
ここで値を設定しておくと、Firehose側でs3に出力する際のprefixとして利用できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
| return {
result: 'Ok',
recordId: record.recordId,
data,
metadata: {
partitionKeys: {
year: formatTZ(date, 'yyyy'),
month: formatTZ(date, 'MM'),
day: formatTZ(date, 'dd'),
hour: formatTZ(date, 'HH'),
},
},
}
|
AWS Resources
CDKで作成していきます。ポイントだけ解説します。
S3 Bucket
まずS3Bucketを作ります。lifecycleで古くなったログはglacierに置くようにしていますが
このあたりは好きに設定してOKです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| const s3Bucket = new aws_s3.Bucket(this, 'LogBucket', {
bucketName: `${PREFIX}-logs`,
objectOwnership: aws_s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
blockPublicAccess: aws_s3.BlockPublicAccess.BLOCK_ALL,
removalPolicy: RemovalPolicy.RETAIN,
autoDeleteObjects: false,
versioned: false,
publicReadAccess: false,
encryption: aws_s3.BucketEncryption.S3_MANAGED,
lifecycleRules: [
{
abortIncompleteMultipartUploadAfter: Duration.days(30),
transitions: [
{
storageClass: aws_s3.StorageClass.INFREQUENT_ACCESS,
transitionAfter: Duration.days(30),
},
{
storageClass: aws_s3.StorageClass.GLACIER,
transitionAfter: Duration.days(730),
},
],
},
],
})
|
IAM Roles
Transform Lambdaの実行ロールを作ります。
service-role/AWSLambdaKinesisExecutionRole
をつけてあげる必要があります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| const lambdaRole = new aws_iam.Role(this, 'LambdaRole', {
roleName: `${PREFIX}-firehose-transform-role`,
assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com'),
path: '/',
managedPolicies: [
aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSLambdaBasicExecutionRole'
),
aws_iam.ManagedPolicy.fromAwsManagedPolicyName('AWSXRayDaemonWriteAccess'),
aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSLambdaKinesisExecutionRole'
),
],
})
|
Firehose甩の実行ロールを作ります。
主に S3
Kinesis
Lambda
CloudWatchLogs
あたりのアクセス権が必要になります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| const deliveryStreamRole = new aws_iam.Role(this, 'DeliveryStreamRole', {
roleName: `${PREFIX}-firehose-delivery-role`,
assumedBy: new aws_iam.ServicePrincipal(`firehose.amazonaws.com`),
inlinePolicies: {
policy: new aws_iam.PolicyDocument({
statements: [
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: [
's3:AbortMultipartUpload',
's3:GetBucketLocation',
's3:GetObject',
's3:ListBucket',
's3:ListBucketMultipartUploads',
's3:PutObject',
],
resources: [bucket.bucketArn, `${bucket.bucketArn}/*`],
}),
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: [
'kinesis:DescribeStream',
'kinesis:GetShardIterator',
'kinesis:GetRecords',
'kinesis:ListShards',
],
resources: ['*'],
}),
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'],
resources: ['*'],
}),
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'],
resources: ['*'],
}),
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: [
'logs:GetLogEvents',
'logs:PutLogEvents',
'logs:CreateLogStream',
'logs:DescribeLogStreams',
'logs:CreateLogGroup',
],
resources: ['*'],
}),
],
}),
},
})
|
subscription filterに付与する実行ロールを作っておきます。
これは後でLogGroupにsubscription filterをつける時に必要になります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| const subscriptionFilterRole = new aws_iam.Role(
this,
'SubscriptionFilterRole',
{
roleName: `${PREFIX}-cloudwatchlogs-subscription-role`,
assumedBy: new aws_iam.ServicePrincipal(`logs.amazonaws.com`),
inlinePolicies: {
policy: new aws_iam.PolicyDocument({
statements: [
new aws_iam.PolicyStatement({
effect: aws_iam.Effect.ALLOW,
actions: ['firehose:*'],
resources: [
`arn:aws:firehose:${this.region}:${this.account}:deliverystream/${PREFIX}*`,
],
}),
],
}),
},
}
)
|
Glueの実行ロールを作っておきます。
crawl対象(ログを出力しておくS3 bucket)に対して GetObject
できるようにアクセス権を追加しています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| const glueRole = new aws_iam.Role(this, 'GlueRole', {
roleName: `${PREFIX}-cwl-export-glue-role`,
assumedBy: new aws_iam.ServicePrincipal('glue.amazonaws.com'),
path: '/',
managedPolicies: [
aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSGlueServiceRole'
),
],
})
glueRole.addToPolicy(
new aws_iam.PolicyStatement({
actions: ['s3:GetObject'],
resources: [logBucket.bucketArn, `${logBucket.bucketArn}/*`],
})
)
|
Lambda
Transform Lambdaを作ります。
typescript
で作っているので NodejsFunction
を使います
1
2
3
4
5
6
7
8
9
10
11
| const transformLambda = new NodejsFunction(this, 'TransformLambda', {
functionName: `${PREFIX}-firehose-transform-function`,
entry: 'lambda/firehose-transformer/cloudwatchlogs-transformer.ts',
runtime: aws_lambda.Runtime.NODEJS_16_X,
timeout: Duration.seconds(300),
architecture: Architecture.ARM_64,
role: lambdaRole,
memorySize: 256,
logRetention: RetentionDays.ONE_WEEK,
tracing: aws_lambda.Tracing.ACTIVE,
})
|
Firehose
Firehoseを作ります。
ポイントはhighlightした部分です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| const dynamicPartitions: string[] = [
'year=!{partitionKeyFromLambda:year}',
'month=!{partitionKeyFromLambda:month}',
'day=!{partitionKeyFromLambda:day}',
'hour=!{partitionKeyFromLambda:hour}',
]
const deliveryStream = new aws_kinesisfirehose.CfnDeliveryStream(
this,
'DeliveryStream',
{
deliveryStreamName: `${PREFIX}-cloudwatchlogs-delivery-stream`,
deliveryStreamType: 'DirectPut',
extendedS3DestinationConfiguration: {
bucketArn: bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: `${PREFIX}-CloudWatchLogs-DeliveryStream`,
logStreamName: `${PREFIX}-CloudWatchLogs-DeliveryStream-logstream`,
},
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 64, // at least 64MB when dynamic partitioning enabled
},
// configuration for transform by lambda
processingConfiguration: {
enabled: true,
processors: [
{
parameters: [
{
parameterName: 'LambdaArn',
parameterValue: transformLambda.functionArn,
},
],
type: 'Lambda',
},
],
},
// configuration for dynamic partition
prefix: `cloudwatchlogs/${dynamicPartitions.join('/')}/`,
errorOutputPrefix: `cloudwatchlogs-error/!{firehose:error-output-type}`,
dynamicPartitioningConfiguration: {
enabled: true,
},
// compress output
compressionFormat: 'GZIP',
},
}
)
|
- s3のprefixとしてdynamic partitioningを利用します。
!{partitionKeyFromLambda:xxxxx}
と指定することでLambdaで出力した metadata.partitionKeys
の値を利用できますyear=xxxx/month=xx
のように指定するとそのままS3のprefixになります- prefixに
key=value
形式で設定しておくと後でGlueがcrawlした時、partition nameとして自動的に year
などをschemaに設定してくれます
dynamicPartitioningConfiguration
をonにしておく必要があります
compressionFormat
に GZIP
を指定するとS3に出力されるS3オブジェクト名に拡張子 .gz
が付与されます。
Glue
GlueのDatabaseとCrawlerを作ります。
ここではCrawlerは1日1回実行にしていますが用途に合わせて変更しておくとよいと思います。
Crawlerで引っ掛けておかないとS3にobjectが出力されていてもAthenaの検索に当たりません。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| const database = new aws_glue.CfnDatabase(this, 'GlueDatabase', {
catalogId: this.account,
databaseInput: {
name: `${PREFIX}-cwl-export-database`,
},
})
new aws_glue.CfnCrawler(this, 'GlueCrawler', {
role: glueRole.roleArn,
name: `${PREFIX}-cwl-export-glue-crawler`,
databaseName: database.ref,
targets: {
s3Targets: [
{
path: `s3://${logBucket.bucketName}/`,
},
],
},
tablePrefix: 'cwl_export_',
recrawlPolicy: { recrawlBehavior: 'CRAWL_NEW_FOLDERS_ONLY' },
schedule: { scheduleExpression: 'cron(0 15 * * ? *)' }, // every 24:00(JST)
})
|
Subscription filter
最後に出力対象とするLogGroupにSubscription Filterを貼ります。
ここではManagementConsoleから作ってみます。
CloudWatch -> Logs -> Log groups -> 対象とするLog group -> Subscription filters -> Create Kinesis Firehose subscription filter を選択
- Kinesis Firehose delivery stream: 作成したfirehoseを指定
- Select an existing role: 作成したsubscription甩のIAM roleを指定
- Subscription filter name: 好きな名前
あとはデフォルトのままでOKです。
結果確認
ここまでの手順で作成した後、CloudWatchLogsにログを出力したタイミングで自動的にS3にファイルが出力されると思います。

また、Glueを開くとデータカタログでS3に出力したファイルのスキーマを見ることができます。
まだCrawlerが実行されていない場合、Glueから手動でCrawlerを実行してみてください

ここまでくれば後はAthenaを開いて、データカタログを指定してクエリするだけです。
