このドキュメントでは、ログ分析を使用するようにアップグレードされたログバケットに保存されているログエントリに対するサンプルクエリについて説明します。これらのバケットでは、Google Cloud コンソールの [Log Analytics] ページから SQL クエリを実行できます。その他のサンプルについては、logging-analytics-samples
と security-analytics
の GitHub リポジトリをご覧ください。
このドキュメントでは、SQL やログエントリの転送と保存の方法については説明しません。これらのトピックについては、次のステップ セクションをご覧ください。
準備
[Log Analytics] ページで、このドキュメントに示すクエリを使用するには、TABLE を、クエリを実行するビューに対応するテーブルの名前に置き換えます。テーブル名の形式は
project_ID.region.bucket_ID.view_ID
です。ビューのテーブル名は [Log Analytics] ページで確認できます。ログビューのデフォルトのクエリでは、FROM
ステートメントのテーブル名が一覧表示されます。デフォルト クエリにアクセスする方法については、ログビューをクエリするをご覧ください。このドキュメントに示すクエリを [BigQuery Studio] ページで使用するには、TABLE をテーブルへのパスに置き換えます。たとえば、プロジェクト
myproject
にあるリンクされたデータセットmydataset
でビュー_AllLogs
をクエリするには、このフィールドをmyproject.mydataset._AllLogs
に設定します。Google Cloud コンソールで [BigQuery] ページに移動します。
このページは、検索バーを使用して見つけることもできます。
[Log Analytics] ページを開くには、次の手順を行います。
-
Google Cloud コンソールで、[ログ分析] ページに移動します。
検索バーを使用してこのページを検索する場合は、小見出しが [Logging] である結果を選択します。
省略可: ログビューのテーブル スキーマを特定するには、[ログビュー] リストでビューを見つけて、ビューの名前を選択します。
テーブルのスキーマが表示されます。[フィルタ] フィールドを使用して、特定のフィールドを見つけることができます。スキーマは変更できません。
-
ログをフィルタする
SQL クエリは、処理するテーブルの行を決定してから、その行をグループ化し、集計オペレーションを実行します。グループ化オペレーションと集計オペレーションが記載されていない場合、クエリの結果には、フィルタ オペレーションによって選択された行が含まれます。このセクションのサンプルでは、フィルタについて説明します。
時間でフィルタする
クエリの期間を設定するには、期間セレクタを使用することをおすすめします。このセレクタは、クエリで WHERE
句に timestamp
フィールドが指定されていない場合に自動的に使用されます。
たとえば、過去 1 週間のデータを表示するには、期間セレクタで [過去 7 日間] を選択します。また、期間セレクタを使用して、開始時刻と終了時刻の指定、表示時間の指定、タイムゾーンの変更を行うこともできます。
WHERE
句に timestamp
フィールドを含めると、期間セレクタの設定は使用されません。次の例では、TIMESTAMP_SUB
関数を使用してデータをフィルタリングしています。これにより、現在の時刻から遡る間隔を指定できます。
WHERE
timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
時間でフィルタする方法の詳細については、時間関数とタイムスタンプ関数をご覧ください。
リソースでフィルタする
リソースでフィルタリングするには、resource.type
の制限を追加します。
たとえば、次のクエリは、直近 1 時間のデータを読み取り、リソースタイプが gce_instance
と一致する行を保持してから、最大 100 個のエントリを並べ替えて表示します。
SELECT
timestamp, log_name, severity, json_payload, resource, labels
FROM
`TABLE`
WHERE
resource.type = "gce_instance"
ORDER BY timestamp ASC
LIMIT 100
重大度でフィルタする
severity = 'ERROR'
などの制限を使用して、特定の重大度でフィルタできます。別の方法は、IN
ステートメントを使用して、有効な値のセットを指定することです。
たとえば、次のクエリは、直近 1 時間のデータを読み取り、値が 'INFO'
または 'ERROR'
の severity
フィールドを含む行のみを保持します。
SELECT
timestamp, log_name, severity, json_payload, resource, labels
FROM
`TABLE`
WHERE
severity IS NOT NULL AND
severity IN ('INFO', 'ERROR')
ORDER BY timestamp ASC
LIMIT 100
上記のクエリは、severity
フィールドの値でフィルタします。ログの重大度の数値でフィルタするクエリを作成することもできます。たとえば、severity
行を次の行に置き換えると、重大度が NOTICE
以上のログエントリすべてが返されます。
severity_number IS NOT NULL AND
severity_number > 200
列挙される値の詳細については、LogSeverity
をご覧ください。
ログ名でフィルタする
ログ名でフィルタするには、log_name
フィールドか log_id
フィールドの値に制限を追加します。log_name
フィールドには、リソースパスが入ります。つまり、このフィールドは、projects/myproject/logs/mylog
のような値になります。log_id
フィールドには、mylog
などのログ名のみが保存されます。
たとえば、次のクエリでは、直近 1 時間のデータを読み取り、log_id
フィールドの値が cloudaudit.googleapis.com/data_access
の行を保持してから、結果を並べ替えて表示します。
SELECT
timestamp, log_id, severity, json_payload, resource, labels
FROM
`TABLE`
WHERE
log_id = "cloudaudit.googleapis.com/data_access"
ORDER BY timestamp ASC
LIMIT 100
リソースラベルでフィルタする
モニタリング対象リソース記述子のほとんどは、特定のリソースの識別に使用されるラベルを定義します。たとえば、Compute Engine インスタンスの記述子には、ゾーン、プロジェクト ID、インスタンス ID のラベルがあります。ログエントリが書き込まれると、各フィールドに値が割り当てられます。そのような例を次に示します。
{
type: "gce_instance"
labels: {
instance_id: "1234512345123451"
project_id: "my-project"
zone: "us-central1-f"
}
}
labels
フィールドのデータ型は JSON であるため、クエリに resource.labels.zone = "us-centra1-f"
などの制限を含めると構文エラーになります。データ型が JSON のフィールド値を取得するには、関数 JSON_VALUE
を使用します。
たとえば、次のクエリは、最新のデータを読み取り、リソースが us-central1-f
ゾーンにある Compute Engine インスタンスの行を保持します。
SELECT
timestamp, log_name, severity, JSON_VALUE(resource.labels.zone) AS zone, json_payload, resource, labels
FROM
`TABLE`
WHERE
resource.type = "gce_instance" AND
JSON_VALUE(resource.labels.zone) = "us-central1-f"
ORDER BY timestamp ASC
LIMIT 100
JSON データを取得して変換できるすべての関数については、JSON 関数をご覧ください。
HTTP リクエストでフィルタする
HTTP リクエストや応答に対応する行のみが含まれるようにテーブルをフィルタするには、http_request IS NOT NULL
制限を追加します。
SELECT
timestamp, log_name, severity, http_request, resource, labels
FROM
`TABLE`
WHERE
http_request IS NOT NULL
ORDER BY timestamp
LIMIT 100
次のクエリには、GET
や POST
リクエストに対応する行のみが含まれています。
SELECT
timestamp, log_name, severity, http_request, resource, labels
FROM
`TABLE`
WHERE
http_request IS NOT NULL AND
http_request.request_method IN ('GET', 'POST')
ORDER BY timestamp ASC
LIMIT 100
HTTP ステータスでフィルタする
HTTP ステータスでフィルタリングするには、http_request.status
フィールドを定義するように WHERE
句を変更します。
SELECT
timestamp, log_name, http_request.status, http_request, resource, labels
FROM
`TABLE`
WHERE
http_request IS NOT NULL AND
http_request.status IS NOT NULL
ORDER BY timestamp ASC
LIMIT 100
フィールドに格納されているデータの種類を見極めるには、スキーマを確認するか、フィールドを表示します。上記のクエリの結果は、http_request.status
フィールドに整数値が格納されていることを示しています。
JSON 型のフィールドでフィルタする
データ型が JSON の列から値を抽出するには、関数 JSON_VALUE
を使用します。
次のクエリについて考えてみましょう。
SELECT
json_payload
FROM
`TABLE`
WHERE
json_payload.status IS NOT NULL
と
SELECT
json_payload
FROM
`TABLE`
WHERE
JSON_VALUE(json_payload.status) IS NOT NULL
上述のクエリは、列 json_payload
の値をテストします。この列の内容は、ログエントリの内容によって決まります。どちらのクエリでも、json_payload
というラベルの付いた列がない行は破棄されます。
これら 2 つのクエリは、NULL
に対してテストされる内容を定義する最後の行に違いがあります。2 行あるテーブルについて考えてみましょう。一方の行では、json_payload
列が次の形式になっています。
{
status: {
measureTime: "1661517845"
}
}
もう一方の行では、json_payload
列が異なる構造になっています。
{
@type: "type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished"
jobName: "projects/my-project/locations/us-central1/jobs/test1"
relativeUrl: "/food=cake"
status: "NOT_FOUND"
targetType: "APP_ENGINE_HTTP"
}
上記の行はどちらも制限 json_payload.status IS NOT NULL
を満たしています。つまり、クエリの結果には、両方の行が含まれます。ただし、制限が JSON_VALUE(json_payload.status) IS NOT NULL
の場合、結果には 2 番目の行のみが含まれます。
正規表現でフィルタする
正規表現と一致する部分文字列を返すには、関数 REGEXP_EXTRACT
を使用します。この関数の戻り値の型は、STRING
か BYTES
のいずれかです。
次のクエリは、受信した最新のログエントリを表示し、それらのエントリを json_payload.jobName
フィールドで保持して、test
で始まる名前の一部分を表示しています。
SELECT
timestamp, REGEXP_EXTRACT(JSON_VALUE(json_payload.jobName), r".*(test.*)$") AS name,
FROM
`TABLE`
WHERE
json_payload.jobName IS NOT NULL
ORDER BY timestamp DESC
LIMIT 20
他の例については、REGEXP_EXTRACT
のドキュメントをご覧ください。使用できる他の正規表現の例については、関数、演算子、条件をご覧ください。
この例に示すクエリは、効率的ではありません。図に示すような部分文字列の一致の場合は、CONTAINS_SUBSTR
関数を使用します。
ログエントリをグループ化して集計する
このセクションは、前述のサンプルに基づき、テーブルの行をグループ化して集計する方法について説明します。グループ化を指定せずに集計を指定すると、SQL は WHERE
句を満たすすべての行を 1 つのグループとして扱うため、1 つの結果が出力されます。
すべての SELECT
式はグループ フィールドに含めるか、集計する必要があります。
時間ごとにグループ化する
データを時間でグループ化するには、関数 TIMESTAMP_TRUNC
を使用します。この関数は、MINUTE
のような指定した粒度までタイムスタンプを切り詰めます。たとえば、粒度が MINUTE
に設定されている場合、15:30:11
のタイムスタンプ(hours:minutes:seconds
形式)は、15:30:00
になります。
次のクエリは、期間選択ツールで指定された間隔で受信したデータを読み取り、json_payload.status
フィールドの値が NULL ではない行を保持します。
クエリは、1 時間ごとの各行のタイムスタンプを切り詰め、切り詰められたタイムスタンプとステータスで行をグループ化します。
SELECT
TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
JSON_VALUE(json_payload.status) AS status,
COUNT(*) AS count
FROM
`TABLE`
WHERE
json_payload IS NOT NULL AND
JSON_VALUE(json_payload.status) IS NOT NULL
GROUP BY hour,status
ORDER BY hour ASC
他の例については、TIMESTAMP_TRUNC
のドキュメントをご覧ください。他の時間ベースの関数については、日時関数をご覧ください。
リソースごとにグループ化する
次のクエリは、直近 1 時間のデータを読み取り、リソースタイプ別に行をグループ化します。次に、各タイプの行数をカウントし、2 つの列を持つテーブルを返します。最初の列にはリソースタイプを列挙し、2 番目の列にはそのリソースタイプの行数が入ります。
SELECT
resource.type, COUNT(*) AS count
FROM
`TABLE`
GROUP BY resource.type
LIMIT 100
重大度でグループ化する
次のクエリは、直近 1 時間のデータを読み取り、重大度フィールドを持つ行を保持します。次に、このクエリは、重大度で行をグループ化し、各グループの行数をカウントします。
SELECT
severity, COUNT(*) AS count
FROM
`TABLE`
WHERE
severity IS NOT NULL
GROUP BY severity
ORDER BY severity
LIMIT 100
log_id
でグループ化
次のクエリ結果は 2 つの列を持つテーブルです。最初の列にはログ名が表示され、2 番目の列にはそのログに書き込まれたログエントリの数が表示されます。このクエリは、エントリ数で結果を並べ替えます。
SELECT
log_id, COUNT(*) AS count
FROM
`TABLE`
GROUP BY log_id
ORDER BY count DESC
LIMIT 100
HTTP リクエストの平均レイテンシを計算する
次のクエリでは、複数の列でのグループ化と平均値の計算を示します。クエリは、HTTP リクエストに含まれる URL と、labels.checker_location
フィールドの値で行をグループ化します。行をグループ化した後、クエリは各グループの平均レイテンシを計算します。
SELECT
JSON_VALUE(labels.checker_location) AS location,
AVG(http_request.latency.seconds) AS secs, http_request.request_url
FROM
`TABLE`
WHERE
http_request IS NOT NULL AND
http_request.request_method IN ('GET')
GROUP BY http_request.request_url, location
ORDER BY location
LIMIT 100
上記の式では、labels
のデータ型が JSON であるため、labels.checker_location
フィールドの値を抽出するために JSON_VALUE
が必要です。ただし、http_request.latency.seconds
フィールドからの値の抽出には、この関数を使用しません。後者のフィールドのデータ型は整数です。
サブネットワーク テストの平均送信バイト数を計算する
次のクエリは、場所ごとに送信された平均バイト数を表示する方法を示しています。
このクエリは、直近 1 時間のデータを読み取り、リソースタイプの列が gce_subnetwork
で、json_payload
列が NULL ではない行のみを保持します。次に、このクエリによって、リソースのロケーション別に行がグループ化されます。データが数値として保存される前述の例とは異なり、bytes_sent
フィールドの値は文字列であるため、平均を計算する前に値を FLOAT64
に変換する必要があります。
SELECT JSON_VALUE(resource.labels.location) AS location,
AVG(CAST(JSON_VALUE(json_payload.bytes_sent) AS FLOAT64)) AS bytes
FROM
`TABLE`
WHERE
resource.type = "gce_subnetwork" AND
json_payload IS NOT NULL
GROUP BY location
LIMIT 100
上記のクエリの結果は、各行にロケーションと、そのロケーションに送信された平均バイト数が記載されたテーブルになります。
JSON データを取得して変換できるすべての関数については、JSON 関数をご覧ください。
CAST
などの変換関数の詳細については、変換関数をご覧ください。
パターンに一致するフィールドがあるログエントリをカウントする
正規表現と一致する部分文字列を返すには、関数 REGEXP_EXTRACT
を使用します。この関数の戻り値の型は、STRING
か BYTES
のいずれかです。
次のクエリは、json_payload.jobName
フィールドの値が NULL ではないログエントリを保持します。
次に、test
で始まる名前の接尾辞でエントリをグループ化します。最後に、クエリは各グループのエントリ数をカウントします。
SELECT
REGEXP_EXTRACT(JSON_VALUE(json_payload.jobName), r".*(test.*)$") AS name,
COUNT(*) AS count
FROM
`TABLE`
WHERE
json_payload.jobName IS NOT NULL
GROUP BY name
ORDER BY count
LIMIT 20
他の例については、REGEXP_EXTRACT
のドキュメントをご覧ください。使用できる他の正規表現の例については、関数、演算子、条件をご覧ください。
クロスカラム検索
このセクションでは、テーブルの複数の列を検索する際に使用できる 2 つの方法について説明します。
トークンベースの検索
一連の検索キーワードに一致するエントリのテーブルを検索するには、関数 SEARCH
を使用します。この関数では、検索する場所と検索クエリの 2 つのパラメータが必要です。SEARCH
関数にはデータの検索方法に関する特定のルールがあるため、SEARCH
のドキュメントを一読することをおすすめします。
次のクエリは、フィールドが「35.193.12.15」と完全に一致する行のみを保持します。
SELECT
timestamp, log_id, proto_payload, severity, resource.type, resource, labels
FROM
`TABLE` AS t
WHERE
proto_payload IS NOT NULL AND
log_id = "cloudaudit.googleapis.com/data_access" AND
SEARCH(t,"`35.193.12.15`")
ORDER BY timestamp ASC
LIMIT 20
上記のクエリでは、バッククォートが検索対象の値をラップしています。これにより、SEARCH
関数はフィールド値とバッククォートの間の値との完全一致を検索します。
クエリ文字列でバッククォートが省略されている場合、クエリ文字列は SEARCH
のドキュメントで定義されているルールに基づいて分割されます。たとえば、次のステートメントを実行すると、クエリ文字列は「35」、「193」、「12」、「15」の 4 つのトークンに分割されます。
SEARCH(t,"35.193.12.15")
上記の SEARCH
ステートメントでは、1 つのフィールドが 4 つのトークンすべてに一致する場合、行と一致します。トークンの順序は関係ありません。
クエリには、複数の SEARCH
ステートメントを含めることができます。たとえば、上記のクエリでは、ログ ID のフィルタを次のようなステートメントに置き換えることができます。
SEARCH(t,"`cloudaudit.googleapis.com/data_access`")
上記のステートメントはテーブル全体を検索しますが、元のステートメントは log_id
列のみを検索します。
列に対して複数の検索を行うには、個々の文字列をスペースで区切ります。たとえば、次のステートメントは、フィールドに「Hello World」、「happy」、「days」を含む行を照合します。
SEARCH(t,"`Hello World` happy days")
最後に、テーブル全体ではなく、テーブルの特定の列を検索することもできます。たとえば、次のステートメントは text_payload
と json_payload
という名前の列のみを検索します。
SEARCH((text_payload, json_payload) ,"`35.222.132.245`")
SEARCH
関数のパラメータの処理方法については、BigQuery リファレンス ページの検索機能をご覧ください。
部分文字列の検索
式に値が存在するかどうかを判断するために大文字と小文字を区別しないテストを実行するには、関数 CONTAINS_SUBSTR
を使用します。
この関数は、値が存在する場合は TRUE
、存在しない場合は FALSE
を返します。検索値は STRING
リテラルである必要があり、リテラル NULL
ではありません。
たとえば、次のクエリは、タイムスタンプが特定の時間範囲内にある特定の IP アドレスを持つすべてのデータアクセス監査ログエントリを取得します。最後に、クエリ結果を並べ替えて、古いものから 20 件の結果を表示します。
SELECT
timestamp, log_id, proto_payload, severity, resource.type, resource, labels
FROM
`TABLE` AS t
WHERE
proto_payload IS NOT NULL AND
log_id = "cloudaudit.googleapis.com/data_access" AND
CONTAINS_SUBSTR(t,"35.193.12.15")
ORDER BY timestamp ASC
LIMIT 20
上記のクエリでは、部分文字列テストが実行されます。したがって、「35.193.12.152」を含む行は CONTAINS_SUBSTR
ステートメントと一致します。
複数のソースのデータを統合
クエリ ステートメントは、1 つ以上のテーブルまたは式をスキャンし、計算結果の行を返します。たとえば、クエリ ステートメントを使用して、さまざまなテーブルやデータセットの SELECT
ステートメントの結果をマージし、結合データから列を選択できます。
2 つのテーブルのデータを結合で結合
2 つのテーブルの情報を組み合わせるには、いずれかの JOIN 演算子を使用します。使用する結合の種類と条件句によって、行の結合方法と破棄方法が決まります。
次のクエリは、同じトレーススパンによって書き込まれた 2 つの異なるテーブルの行から json_payload
フィールドを取得します。クエリは、両方のテーブルの span_id
列と trace
列の値が一致する行の 2 つのテーブルに対して内部 JOIN
を実行します。この結果により、TABLE_1 から取得した timestamp
、severity
、json_payload
の各フィールドがクエリから選択されます。json_payload
フィールドは TABLE_2 と、2 つのテーブルが結合された span_id
フィールドと trace
フィールドの値は、最大 100 行を返します。
SELECT
a.timestamp, a.severity, a.json_payload, b.json_payload, a.span_id, a.trace
FROM `TABLE_1` a
JOIN `TABLE_2` b
ON
a.span_id = b.span_id AND
a.trace = b.trace
LIMIT 100
複数の選択を union で結合
2 つ以上の SELECT
ステートメントの結果を結合して重複行を破棄するには、UNION
演算子を使用します。重複した行を保持するには、UNION ALL
演算子を使用します。
次のクエリは、TABLE_1 から直近の 1 時間のデータを読み取り、その結果を TABLE_2 から直近の 1 時間のデータとマージし、タイムスタンプを昇順で並べ替えて、古い順に 100 個のエントリを表示します。
SELECT
timestamp, log_name, severity, json_payload, resource, labels
FROM(
SELECT * FROM `TABLE_1`
UNION ALL
SELECT * FROM `TABLE_2`
)
ORDER BY timestamp ASC
LIMIT 100
次のステップ
ログエントリを転送して保存する方法については、次のドキュメントをご覧ください。
SQL リファレンス ドキュメントについては、次のドキュメントをご覧ください。