将发现结果流式传输到 BigQuery 进行分析

本页面介绍如何使用适用于 BigQuery 的 Security Command Center 导出功能将新的和更新后的发现结果流式传输到 BigQuery 数据集。现有发现结果不会更新到 BigQuery,除非对其进行更新。

BigQuery 是 Google Cloud 的全代管式 PB 级经济实惠的分析数据仓库,可让您近乎实时地分析大量数据。您可以使用 BigQuery 来运行 针对新的发现结果和更新后的发现结果进行查询、过滤数据以找到所需内容 生成自定义报告如需详细了解 BigQuery,请参阅 BigQuery 文档

概览

启用此功能后,写入到 Security Command Center 的新发现结果将近乎实时地导出到 BigQuery 表。然后,您可以将这些数据集成到现有工作流中并创建自定义分析。您可以在组织、文件夹和项目级层启用此功能,以根据您的需要导出发现结果。

此功能是将 Security Command Center 发现结果导出到 BigQuery 的推荐方法,因为它是全代管式的,不需要执行手动操作或编写自定义代码。

数据集结构

此功能会将每个新发现结果及其后续更新添加为 findings 表中的新行,该表按 source_idfinding_idevent_time 聚簇。

发现结果更新时,此功能会使用 source_idfinding_id 值相同,但 event_time 值不同 值。此数据集结构可让您查看每个发现结果的状态变化 。

请注意,您的数据集中可能存在重复条目。如需对其进行解析,您可以使用 DISTINCT 子句,如第一个示例查询所示。

每个数据集包含一个 findings 表,其中包含以下字段:

字段 说明
source_id

Security Command Center 分配给发现结果来源的唯一标识符。 例如,来自 Cloud Anomaly Detection 来源的所有发现结果都具有相同的 source_id 值。

示例:1234567890

finding_id 表示发现结果的唯一标识符。它在 组织来源由字母和数字组成,长度小于 或 等于 32 个字符。
event_time

事件发生的时间或发现结果的更新时间。例如,如果发现结果代表一个未解决的 防火墙,则 event_time 会捕获检测器的时间 认为防火墙已打开。如果发现结果在之后得以解决,那么该时间会反映解决发现结果的时间。

示例:2019-09-26 12:48:00.985000 UTC

finding

安全、风险、健康或隐私等评估数据的记录; 提取到 Security Command Center 进行演示时 通知、分析、政策测试和实施。例如: App Engine 中的跨站脚本攻击 (XSS) 漏洞 就是一个发现结果。

如需详细了解嵌套字段,请参阅 API 参考文档 的 Finding 对象。

资源

与此发现结果关联的 Google Cloud 资源的相关信息。

如需详细了解嵌套字段,请参阅 Resource 对象的 API 参考文档。

费用

您需要支付与此功能相关的 BigQuery 费用。如需了解详情,请参阅 BigQuery 价格

准备工作

您必须先完成以下步骤,然后才能启用此功能。

设置权限

如需完成本指南,您必须拥有以下 Identity and Access Management (IAM) 角色:

创建 BigQuery 数据集

创建 BigQuery 数据集。如需了解详情,请参阅创建数据集

规划数据驻留

如果为 Security Command Center 启用了数据驻留,则用于定义将数据流式导出到 BigQuery 的配置(BigQueryExport 资源)会受到数据驻留控制,并存储在您选择的 Security Command Center 位置中。

如需将 Security Command Center 位置中的发现结果导出到 BigQuery,请执行以下操作: 您必须在同一测试中 Security Command Center 位置作为发现结果。

由于 BigQuery Export 中使用的过滤条件可以包含 受驻留控制措施约束的数据,请确保您指定正确的 然后再创建这些文件Security Command Center 不会限制 创建导出作业的位置

BigQuery Export 仅存储在其创建的位置,无法在其他位置查看或修改。

创建 BigQuery Export 后,您便无法更改其 位置。如需更改位置,您需要删除 BigQuery Export 文件,然后在新位置重新创建该文件。

如需使用 API 调用检索 BigQuery Export,您需要 在 bigQueryExport 的完整资源名称中指定位置。 例如:

GET https://securitycenter.googleapis.com/v2/{name=organizations/123/locations/eu/bigQueryExports/my-export-01}

同样,如需使用 gcloud CLI 检索 BigQuery 导出内容,您需要在配置的完整资源名称中指定位置,或使用 --locations 标志指定位置。例如:

gcloud scc scc bqexports get myBigQueryExport organizations/123 \
    --location=locations/us

将发现结果从 Security Command Center 导出到 BigQuery

如需导出发现结果,请先启用 Security Command Center API。

启用 Security Command Center API

要启用 Security Command Center API,请执行以下操作:

  1. 转到 Google Cloud 控制台中的“API 库”页面。

    转到 API 库

  2. 选择要启用 Security Command Center API 的项目。

  3. 搜索框中,输入 Security Command Center,然后点击搜索结果中的 Security Command Center。

  4. 在随即显示的 API 页面上,点击启用

您的项目已启用 Security Command Center API。接下来,您将使用 gcloud CLI 创建到 BigQuery 的新导出配置。

在 VPC Service Controls 中授予边界访问权限

如果您使用 VPC Service Controls 并且您的 BigQuery 数据集是服务边界内项目的一部分, 必须授予对项目的访问权限才能导出发现结果。

如需授予对项目的访问权限,请为导出发现结果的主账号和项目创建入站和出站规则。这些规则允许访问受保护的资源,并可让 BigQuery 验证用户是否具有对 BigQuery 数据集的 setIamPolicy 权限。

设置到 BigQuery 的新导出之前的准备工作

  1. 转到 Google Cloud 控制台中的 VPC Service Controls 页面。

    前往 VPC Service Controls

  2. 如有必要,请选择您的组织。

  3. 点击要更改的服务边界的名称。

    如需查找您需要修改的服务边界,您可以查看日志中是否存在显示 RESOURCES_NOT_IN_SAME_SERVICE_PERIMETER 违规行为的条目。在这些条目中,检查 servicePerimeterName 字段:accessPolicies/ACCESS_POLICY_ID/servicePerimeters/SERVICE_PERIMETER_NAME

  4. 点击修改边界

  5. 在导航菜单中,点击入站流量政策

  6. 如需为用户或服务账号配置入站流量规则,请使用以下参数:

    • API 客户端的 FROM 特性
      • 身份菜单中,选择选定的身份
      • 来源菜单中,选择所有来源
      • 点击选择,然后输入用于调用 Security Command Center API 的主账号。
    • Google Cloud 服务/资源的 TO 属性
      • 项目菜单中,选择所选项目
      • 点击选择,然后输入 BigQuery 数据集所属的项目。
      • 服务菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法菜单中,选择所有操作
  7. 点击保存

  8. 在导航菜单中,点击出站流量政策

  9. 点击添加规则 (Add Rule)。

  10. 如需为用户或服务账号配置出站流量规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 身份菜单中,选择选定的身份
      • 点击选择,然后输入用于调用 Security Command Center API。
    • Google Cloud 服务/资源的 TO 属性
      • 项目菜单中,选择所有项目
      • 服务菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法菜单中,选择所有操作
  11. 点击保存

设置到 BigQuery 的新导出

在此步骤中,您将创建导出配置,将发现结果导出到 BigQuery 实例。您可以在项目、文件夹或组织级层创建导出配置。例如,如果您要将发现结果从项目导出到 BigQuery 数据集,则可以在项目级层创建导出配置,以仅导出与该项目相关的发现结果。(可选)您可以指定过滤条件,以便仅导出特定发现结果。

请务必在适当的级层创建导出配置。对于 例如,如果您在项目 B 中创建导出配置来导出发现结果 并且您可以定义过滤条件 resource.project_display_name: project-a-id,配置不会导出 任何发现结果。

您最多可以创建 500 个导出配置到 BigQuery 。您可以使用同一数据集进行多次导出 配置。如果使用同一数据集,则所有更新都将应用于 相同的发现结果表

首次创建导出配置时,服务账号 由系统自动为您创建创建或 更新数据集内的发现结果表,并将发现结果导出到该表。 其形式为 service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gservicaccount.com和 被授予 BigQuery Data Editor (roles/bigquery.dataEditor) 角色, BigQuery 数据集级别。

gcloud

  1. 转到 Google Cloud 控制台。

    前往 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需创建新的导出配置,请运行以下命令:

    gcloud scc bqexports create BIG_QUERY_EXPORT \
        --dataset=DATASET_NAME \
        --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
        --location=LOCATION \
        [--description=DESCRIPTION] \
        [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出配置的名称。
    • DATASET_NAME 替换为 BigQuery 数据集,例如 projects/PROJECT_ID/datasets/DATASET_ID
    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和 组织的名称是文件夹 ID 或组织 ID。对于 项目,该名称就是项目编号或项目 ID。
    • LOCATION:如果已启用数据驻留,请指定 Security Command Center 位置 将在其中创建 BigQuery ExportBigQuery Export 配置存储在此位置。只有来自此位置的发现结果才会包含在导出范围内。

      如果未启用数据驻留,指定 --location 标志会使用 Security Command Center API v2 创建 BigQuery 导出,并且该标志的唯一有效值为 global

    • DESCRIPTION 替换为直观易懂的导出配置说明。此变量为可选项。

    • FILTER 替换为一个定义要处理的发现结果的表达式 导出的数据例如,如果要过滤 XSS_SCRIPTING 类别,请输入 "category=\"XSS_SCRIPTING\"。此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.CreateBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;
import java.util.UUID;

public class CreateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryDatasetId: The BigQuery dataset to write findings' updates to.
    String bigQueryDatasetId = "your-bigquery-dataset-id";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "default-" + UUID.randomUUID().toString().split("-")[0];

    createBigQueryExport(parent, filter, bigQueryDatasetId, bigQueryExportId);
  }

  // Create export configuration to export findings from a project to a BigQuery dataset.
  // Optionally specify filter to export certain findings only.
  public static void createBigQueryExport(
      String parent, String filter, String bigQueryDatasetId, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      // Create the BigQuery export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setDescription(
                  "Export low and medium findings if the compute resource "
                      + "has an IAM anomalous grant")
              .setFilter(filter)
              .setDataset(String.format("%s/datasets/%s", parent, bigQueryDatasetId))
              .build();

      CreateBigQueryExportRequest bigQueryExportRequest =
          CreateBigQueryExportRequest.newBuilder()
              .setParent(parent)
              .setBigQueryExport(bigQueryExport)
              .setBigQueryExportId(bigQueryExportId)
              .build();

      // Create the export request.
      BigQueryExport response = client.createBigQueryExport(bigQueryExportRequest);

      System.out.printf("BigQuery export request created successfully: %s\n", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。



def create_bigquery_export(
    parent: str, export_filter: str, bigquery_dataset_id: str, bigquery_export_id: str
):
    from google.cloud import securitycenter

    """
    Create export configuration to export findings from a project to a BigQuery dataset.
    Optionally specify filter to export certain findings only.

    Args:
        parent: Use any one of the following resource paths:
             - organizations/{organization_id}
             - folders/{folder_id}
             - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_dataset_id: The BigQuery dataset to write findings' updates to.
        bigquery_export_id: Unique identifier provided by the client.
             - example id: f"default-{str(uuid.uuid4()).split('-')[0]}"
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    client = securitycenter.SecurityCenterClient()

    # Create the BigQuery export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.description = "Export low and medium findings if the compute resource has an IAM anomalous grant"
    bigquery_export.filter = export_filter
    bigquery_export.dataset = f"{parent}/datasets/{bigquery_dataset_id}"

    request = securitycenter.CreateBigQueryExportRequest()
    request.parent = parent
    request.big_query_export = bigquery_export
    request.big_query_export_id = bigquery_export_id

    # Create the export request.
    response = client.create_big_query_export(request)

    print(f"BigQuery export request created successfully: {response.name}\n")

创建导出配置后,您应该会在大约 15 分钟内在 BigQuery 数据集中看到发现结果。创建 BigQuery 表后,与过滤条件和范围匹配的所有新发现结果和更新后的发现结果都将近乎实时地显示在该表中。

如需查看您的发现结果,请参阅查看发现结果

为到 BigQuery 的新导出创建入站规则

如果您使用 VPC Service Controls 并且您的 BigQuery 数据集是服务边界内项目的一部分, 必须为新导出到 BigQuery 创建入站规则。

  1. 从以下位置重新打开服务边界: 设置新的 BigQuery 导出

    前往 VPC Service Controls

  2. 点击入站流量政策

  3. 点击添加规则 (Add Rule)。

  4. 如需为导出配置设置入站规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 来源下拉菜单中,选择所有来源
      • 身份下拉菜单中,选择选定的身份
      • 点击选择,然后输入 BigQuery 的名称 导出配置服务账号: service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gserviceaccount.com
    • GCP 服务/资源的 TO 特性
      • 项目下拉菜单中,选择选定的项目
      • 点击选择,然后选择 BigQuery 数据集所属的项目。
      • 服务下拉菜单中,选择所选服务,然后 然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  5. 在导航菜单中,点击保存

选定的项目、用户和服务账号现在可以访问受保护的资源并导出发现结果。

如果您已按照本指南中的所有步骤操作,并且导出正常工作,您现在可以删除以下内容:

  • 主账号的入站规则
  • 主账号的出站规则

这些规则仅在配置导出配置时才需要。但是,对于 继续工作,您必须保留 让 Security Command Center 将发现结果导出到您的 位于服务边界内的 BigQuery 数据集。

查看导出配置的详细信息

gcloud

  1. 转到 Google Cloud 控制台。

    前往 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需验证导出配置的详细信息,请运行以下命令 命令:

    gcloud scc bqexports get BIG_QUERY_EXPORT \
        --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
        --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出作业的名称 配置。
    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于 项目,该名称就是项目编号或项目 ID。
    • LOCATION:如果启用了任一数据驻留,则为必填项 或者 BigQueryExport 资源是使用 v2 API 创建的。

      如果已启用数据驻留,请指定 Security Command Center 位置 导出的数据的对象。

      如果未启用数据驻留,请添加 /locations/LOCATION 仅在 BigQueryExport 时 是使用 Security Command Center API v2 创建的,其中 在这种情况下,唯一的有效位置是 global

      例如,如需从组织 ID 设置为 123 的组织中获取名为 my-bq-export 的导出配置,请运行以下命令:

      gcloud scc bqexports get my-bq-export \
          --organization=123
      

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.GetBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class GetBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    getBigQueryExport(parent, bigQueryExportId);
  }

  // Retrieve an existing BigQuery export.
  public static void getBigQueryExport(String parent, String bigQueryExportId) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      GetBigQueryExportRequest bigQueryExportRequest =
          GetBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      BigQueryExport response = client.getBigQueryExport(bigQueryExportRequest);
      System.out.printf("Retrieved the BigQuery export: %s", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。

def get_bigquery_export(parent: str, bigquery_export_id: str):
    from google.cloud import securitycenter

    """
    Retrieve an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.GetBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    response = client.get_big_query_export(request)
    print(f"Retrieved the BigQuery export: {response.name}")

更新导出配置

必要时,您可以修改 现有导出配置。您无法更改导出配置的名称。

gcloud

  1. 转到 Google Cloud 控制台。

    前往 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需更新导出配置,请运行以下命令:

    gcloud scc bqexports update BIG_QUERY_EXPORT \
        --dataset=DATASET_NAME \
        --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
        --location=LOCATION \
        [--description=DESCRIPTION] \
        [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为您要更新的导出配置的名称。
    • DATASET_NAME 替换为 BigQuery 数据集,例如 projects/PROJECT_ID/datasets/DATASET_ID
    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。
    • LOCATION:如果数据驻留为任一数据驻留,则必须提供 或者 BigQueryExport 资源是使用 v2 API 创建的。

      如果已启用数据驻留,请指定 Security Command Center 位置 导出的数据的对象。

      如果未启用数据驻留,请添加 /locations/LOCATION,或指定 --location 标记,但仅当 BigQueryExport 资源是由 安全命令中心 API v2,在这种情况下, 地点为global

    • DESCRIPTION 替换为 导出配置。此变量为可选项。

    • FILTER 替换为一个定义哪些发现结果的表达式 要包含在导出范围内的例如,如果要过滤 XSS_SCRIPTING 类别,请输入 "category=\"XSS_SCRIPTING\"。此变量 是可选属性。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.UpdateBigQueryExportRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;

public class UpdateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "big-query-export-id";

    updateBigQueryExport(parent, filter, bigQueryExportId);
  }

  // Updates an existing BigQuery export.
  public static void updateBigQueryExport(String parent, String filter, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      //  Set the new values for export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .setFilter(filter)
              .build();

      UpdateBigQueryExportRequest request =
          UpdateBigQueryExportRequest.newBuilder()
              .setBigQueryExport(bigQueryExport)
              // Set the update mask to specify which properties should be updated.
              // If empty, all mutable fields will be updated.
              // For more info on constructing field mask path, see the proto or:
              // https://cloud.google.com/java/docs/reference/protobuf/latest/com.google.protobuf.FieldMask
              .setUpdateMask(FieldMask.newBuilder().addPaths("filter").build())
              .build();

      BigQueryExport response = client.updateBigQueryExport(request);
      if (!response.getFilter().equalsIgnoreCase(filter)) {
        System.out.println("Failed to update BigQueryExport!");
        return;
      }
      System.out.println("BigQueryExport updated successfully!");
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。

def update_bigquery_export(parent: str, export_filter: str, bigquery_export_id: str):
    """
    Updates an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_export_id: Unique identifier provided by the client.
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    from google.cloud import securitycenter
    from google.protobuf import field_mask_pb2

    client = securitycenter.SecurityCenterClient()

    # Set the new values for export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.name = f"{parent}/bigQueryExports/{bigquery_export_id}"
    bigquery_export.filter = export_filter

    # Field mask to only update the export filter.
    # Set the update mask to specify which properties should be updated.
    # If empty, all mutable fields will be updated.
    # For more info on constructing field mask path, see the proto or:
    # https://googleapis.dev/python/protobuf/latest/google/protobuf/field_mask_pb2.html
    field_mask = field_mask_pb2.FieldMask(paths=["filter"])

    request = securitycenter.UpdateBigQueryExportRequest()
    request.big_query_export = bigquery_export
    request.update_mask = field_mask

    response = client.update_big_query_export(request)

    if response.filter != export_filter:
        print("Failed to update BigQueryExport!")
        return
    print("BigQueryExport updated successfully!")

查看所有导出配置

您可以查看组织、文件夹或项目中的所有导出配置。

gcloud

  1. 转到 Google Cloud 控制台。

    前往 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需列出导出配置,请运行以下命令:

    gcloud scc bqexports list \
        --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
        --location=LOCATION \
        [--limit=LIMIT] \
        [--page-size=PAGE_SIZE]
    

    替换以下内容:

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于 项目,该名称就是项目编号或项目 ID。

      如果您指定了组织 ID,则此列表会包含所有导出内容 在该组织中定义的配置,包括文件夹中的配置 和项目级别。如果您指定了文件夹 ID,则此列表会包含 导出在文件夹级别和项目中定义的配置 所有项目。如果您指定了项目编号或项目 ID,该列表仅包含该项目的所有导出配置。

    • LOCATION:如果启用了任一数据驻留,则为必填项 或者 BigQueryExport 资源是使用 v2 API 创建的。

      如果已启用数据驻留,请指定 Security Command Center 位置 导出的数据的存储位置。

      如果未启用数据驻留(包括 --location 标志列表),则会发生此错误 仅使用BigQueryExport Security Command Center API v2,唯一的有效位置是 global

    • LIMIT 替换为您的导出配置的数量 想要看到的效果此变量为可选项。

    • PAGE_SIZE 以及 页面大小值。 此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.ListBigQueryExportsRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.SecurityCenterClient.ListBigQueryExportsPagedResponse;
import java.io.IOException;

public class ListBigQueryExports {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: The parent, which owns the collection of BigQuery exports.
    //         Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    listBigQueryExports(parent);
  }

  // List BigQuery exports in the given parent.
  public static void listBigQueryExports(String parent) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      ListBigQueryExportsRequest request =
          ListBigQueryExportsRequest.newBuilder().setParent(parent).build();

      ListBigQueryExportsPagedResponse response = client.listBigQueryExports(request);

      System.out.println("Listing BigQuery exports:");
      for (BigQueryExport bigQueryExport : response.iterateAll()) {
        System.out.println(bigQueryExport.getName());
      }
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。

def list_bigquery_exports(parent: str):
    from google.cloud import securitycenter

    """
    List BigQuery exports in the given parent.
    Args:
         parent: The parent which owns the collection of BigQuery exports.
             Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.ListBigQueryExportsRequest()
    request.parent = parent

    response = client.list_big_query_exports(request)

    print("Listing BigQuery exports:")
    for bigquery_export in response:
        print(bigquery_export.name)

删除导出配置

如果您不再需要导出配置,可以将其删除。

gcloud

  1. 转到 Google Cloud 控制台。

    前往 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需删除导出配置,请运行以下命令:

    gcloud scc bqexports delete BIG_QUERY_EXPORT \
        --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
        --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为导出作业的名称 要删除的配置。
    • FOLDER_IDORGANIZATION_ID 或 将 PROJECT_ID 替换为您的文件夹、组织的名称 或项目。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于 项目,该名称就是项目编号或项目 ID。
    • LOCATION:如果启用了任一数据驻留,则为必填项 或者 BigQueryExport 资源是使用 v2 API 创建的。

      如果已启用数据驻留,请指定 Security Command Center 位置 导出的数据的对象。

      如果未启用数据驻留,请添加 /locations/LOCATION 仅在 BigQueryExport 时 是使用 Security Command Center API v2 创建的,其中 在这种情况下,唯一的有效位置是 global

      例如,如需从组织 ID 设置为 123 的组织中删除名为 my-bq-export 的导出配置,请运行以下命令:

      gcloud scc bqexports delete my-bq-export \
          --organization=123
      

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。


import com.google.cloud.securitycenter.v1.DeleteBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class DeleteBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    deleteBigQueryExport(parent, bigQueryExportId);
  }

  // Delete an existing BigQuery export.
  public static void deleteBigQueryExport(String parent, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      DeleteBigQueryExportRequest bigQueryExportRequest =
          DeleteBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      client.deleteBigQueryExport(bigQueryExportRequest);
      System.out.printf("BigQuery export request deleted successfully: %s", bigQueryExportId);
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后面添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后面添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务 的 ID。

def delete_bigquery_export(parent: str, bigquery_export_id: str):
    """
    Delete an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """
    from google.cloud import securitycenter

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.DeleteBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    client.delete_big_query_export(request)
    print(f"BigQuery export request deleted successfully: {bigquery_export_id}")

删除导出配置后,您可以从 Looker 数据洞察中移除数据。如需了解详情,请参阅移除、删除和恢复数据源

在 BigQuery 中查看发现结果

创建导出配置后,新发现结果会导出到 您指定的项目中的 BigQuery 数据集。

如需在 BigQuery 中查看发现结果,请执行以下操作:

  1. 转到 BigQuery 中的项目。

    转到 BigQuery

  2. 如果您不在正确的项目中,请执行以下步骤:

    1. 在工具栏上,点击 项目选择器。
    2. “请选择:”旁边,选择您的组织。
    3. 在项目列表中,选择您的项目。
  3. 探索器窗格中,展开项目的节点。

  4. 展开您的数据集。

  5. 点击发现结果表。

  6. 在打开的标签页上,点击预览。此时会显示一组数据示例。

实用的查询

本部分提供了用于分析发现结果数据的示例查询。在以下示例中,请将 将 DATASET 替换为分配给数据集的名称; 将 PROJECT_ID 替换为您的数据集的项目名称。

如需排查遇到的任何错误,请参阅错误消息

每天创建和更新的新发现结果的数量

SELECT
    FORMAT_DATETIME("%Y-%m-%d", event_time) AS date,
    count(DISTINCT finding_id)
FROM `PROJECT_ID.DATASET.findings`
GROUP BY date
ORDER BY date DESC

每个发现结果的最新发现结果记录

SELECT
    * EXCEPT(row)
FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY finding_id
        ORDER BY event_time DESC, finding.mute_update_time DESC
    ) AS row
    FROM `PROJECT_ID.DATASET.findings`
)
WHERE row = 1

按时间排序的当前活跃发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
ORDER BY event_time DESC

项目中的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
WHERE resource.project_display_name = 'PROJECT'

PROJECT 替换为项目名称。

文件夹中的当前发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
CROSS JOIN UNNEST(resource.folders) AS folder
WHERE folder.resource_folder_display_name = 'FOLDER'

FOLDER 替换为文件夹名称。

扫描程序 Logging Scanner 的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
CROSS JOIN UNNEST(finding.source_properties) AS source_property
WHERE source_property.key = "ScannerName"
  AND source_property.value = "LOGGING_SCANNER"

类型为 Persistence: IAM Anomalous Grant 的当前活跃发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
  AND finding.category = "Persistence: IAM Anomalous Grant"

将给定类型的活跃发现结果与 Cloud Audit Logs 关联

此示例查询有助于调查异常的 IAM 授权发现结果 从使用 Cloud Audit Logs 的 Event Threat Detection 中运行。 此时间段之前和之后的管理员活动操作 异常 IAM 授权操作。以下查询将发现结果时间戳之前 1 小时到之后 1 小时的管理员活动日志进行关联。

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT
  finding_id,
  ANY_VALUE(event_time) as event_time,
  ANY_VALUE(finding.access.principal_email) as grantor,
  JSON_VALUE_ARRAY(ANY_VALUE(finding.source_properties_json), '$.properties.sensitiveRoleGrant.members') as grantees,
  ARRAY_AGG(
    STRUCT(
      timestamp,
      IF(timestamp < event_time, 'before', 'after') as timeline,
      protopayload_auditlog.methodName,
      protopayload_auditlog.resourceName,
      protopayload_auditlog.serviceName
    )
    ORDER BY timestamp ASC
  ) AS recent_activity
FROM (
  SELECT
    f.*,
    a.*,
  FROM latestFindings AS f
  LEFT JOIN `PROJECT_ID.DATASET.cloudaudit_googleapis_com_activity` AS a
  ON a.protopayload_auditlog.authenticationInfo.principalEmail = f.finding.access.principal_email
  WHERE f.finding.state = "ACTIVE"
    AND f.finding.category = "Persistence: IAM Anomalous Grant"
    AND a.timestamp >= TIMESTAMP_SUB(f.event_time, INTERVAL 1 HOUR)
    AND a.timestamp <= TIMESTAMP_ADD(f.event_time, INTERVAL 1 HOUR)
  )
GROUP BY
  finding_id
ORDER BY
  event_time DESC

输出类似于以下内容:

查询结果的屏幕截图,其中显示了发现结果及其关联的审核日志

在 Looker 数据洞察中创建图表

Looker Studio 可以 创建交互式报告和信息中心

一般来说,通过 Looker 数据洞察访问 BigQuery 时会产生 BigQuery 使用费。如需了解详情,请参阅使用 Looker 数据洞察直观呈现 BigQuery 数据

如需创建按严重性和类别直观呈现发现结果数据的图表,请执行以下操作:

  1. 打开 Looker 数据洞察并登录。
  2. 如果系统提示,请提供更多信息并设置其他偏好设置。 阅读服务条款,如果满意,则继续。
  3. 点击空白报告
  4. 连接到数据标签页上,点击 BigQuery 卡。
  5. 如果出现提示,请授权 Looker Studio 访问 BigQuery 项目。
  6. 关联到您的发现结果数据:

    1. 项目部分,为数据集选择该项目。或者,在我的项目标签页中,输入您的项目 ID 以进行搜索。
    2. 对于数据集,点击数据集的名称。
    3. 对于表格,点击发现结果
    4. 点击添加
    5. 在对话框中,点击添加到报告
  7. 添加报告后,点击添加图表

  8. 点击堆叠柱形图,然后点击区域 放置它。

    图表选择界面的屏幕截图
  9. 图表 > 条形图窗格的数据标签页上,设置以下字段:

    1. 维度字段中,选择 finding.severity
    2. 细分维度字段中,选择 finding.category
    按严重性分类并按类别进一步分类的发现结果图表的屏幕截图

报告已更新为显示多个列,其中包含按严重性划分的发现结果 和类别。

后续步骤

了解如何在 BigQuery 中运行查询