将发现结果流式传输到 BigQuery 进行分析

本页面介绍如何使用适用于 BigQuery 的 Security Command Center 导出功能将新的和更新后的发现结果流式传输到 BigQuery 数据集。现有发现结果不会更新到 BigQuery,除非对其进行更新。

BigQuery 是 Google Cloud 的全代管式 PB 级经济实惠的分析数据仓库,可让您近乎实时地分析大量数据。您可以使用 BigQuery 对新发现和更新后的发现结果运行查询,过滤数据以查找所需内容,并生成自定义报告。如需详细了解 BigQuery,请参阅 BigQuery 文档

概览

启用此功能后,写入到 Security Command Center 的新发现结果将近乎实时地导出到 BigQuery 表。然后,您可以将数据集成到现有工作流中并创建自定义分析。您可以在组织、文件夹和项目级别启用此功能,以根据您的要求导出发现结果。

此功能是将 Security Command Center 发现结果导出到 BigQuery 的推荐方法,因为它是全代管式的,不需要执行手动操作或编写自定义代码。

数据集结构

此功能会将每个新发现结果及其后续更新添加为 findings 表中的新行,该表按 source_idfinding_idevent_time 聚簇。

发现结果更新后,此功能会创建具有相同 source_idfinding_id 值但具有不同 event_time 值的多个发现结果记录。通过此数据集结构,您可以查看每个发现结果的状态随时间的变化。

请注意,您的数据集中可能存在重复条目。如需对其进行解析,您可以使用 DISTINCT 子句,如第一个示例查询所示。

每个数据集包含一个 findings 表,其中包含以下字段:

字段 说明
source_id Security Command Center 分配给发现结果来源的唯一标识符。例如,来自 Cloud Anomaly Detection 来源的所有发现结果都具有相同的 source_id 值。

示例:1234567890
finding_id 表示发现结果的唯一标识符。该标识符在组织来源范围内是唯一的。它由字母数字构成,并且少于或等于 32 个字符。
event_time 事件的发生时间或发现结果的更新时间。例如,如果发现结果表明防火墙打开,则“event_time”会捕获检测器认为防火墙处于打开状态的时间。如果发现结果在之后得以解决,那么该时间会反映解决发现结果的时间。

示例:2019-09-26 12:48:00.985000 UTC
finding 被提取到 Security Command Center 以进行展示、通知、分析、政策测试和实施的评估数据(如安全、风险、运行状况或隐私)记录。例如,App Engine 应用中的跨站脚本攻击 (XSS) 漏洞是一个发现结果。

如需详细了解嵌套字段,请参阅 Finding 对象的 API 参考文档。
resource 与此发现结果关联的 Google Cloud 资源的相关信息。

如需详细了解嵌套字段,请参阅 Resource 对象的 API 参考文档。

费用

您需要支付与此功能相关的 BigQuery 费用。如需了解详情,请参阅 BigQuery 价格

准备工作

您必须先完成以下步骤,然后才能启用此功能。

设置权限

如需完成本指南,您必须拥有以下 Identity and Access Management (IAM) 角色:

创建 BigQuery 数据集

创建 BigQuery 数据集。如需了解详情,请参阅创建数据集

数据驻留规划

如果为 Security Command Center 启用了数据驻留,则定义将数据流式导出到 BigQuery 的配置(BigQueryExport 资源)受数据驻留控制,并存储在您选择的 Security Command Center 位置中。

如需将某个 Security Command Center 位置中的发现结果导出到 BigQuery,您必须在发现结果所在的 Security Command Center 位置配置 BigQuery 导出。

由于 BigQuery Export 中使用的过滤条件可能包含受驻留控制措施约束的数据,因此请确保在创建这些过滤条件之前指定正确的位置。Security Command Center 不限制您在哪个位置创建导出作业。

BigQuery Export 仅存储在创建它们的位置,无法在其他位置查看或修改。

创建 BigQuery 导出后,便无法更改其位置。如需更改此位置,您需要删除 BigQuery 导出内容,然后在新位置重新创建。

如需使用 API 调用检索 BigQuery 导出,您需要在 bigQueryExport 的完整资源名称中指定位置。例如:

GET https://securitycenter.googleapis.com/v2/{name=organizations/123/locations/eu/bigQueryExports/my-export-01}

同样,如需使用 gcloud CLI 检索 BigQuery 导出数据,您需要在配置的完整资源名称中或使用 --locations 标志指定位置。例如:

gcloud scc scc bqexports get myBigQueryExport organizations/123 \
    --location=locations/us

将发现结果从 Security Command Center 导出到 BigQuery

如需导出发现结果,请先启用 Security Command Center API。

启用 Security Command Center API

要启用 Security Command Center API,请执行以下操作:

  1. 转到 Google Cloud 控制台中的“API 库”页面。

    转到 API 库

  2. 选择要启用 Security Command Center API 的项目。

  3. 搜索框中,输入 Security Command Center,然后点击搜索结果中的 Security Command Center。

  4. 在随即显示的 API 页面上,点击启用

您的项目已启用 Security Command Center API。接下来,您将使用 gcloud CLI 创建到 BigQuery 的新导出配置。

在 VPC Service Controls 中授予边界访问权限

如果您使用 VPC Service Controls,并且 BigQuery 数据集属于服务边界内的项目,则必须授予对项目的访问权限,以便导出发现结果。

如需授予对项目的访问权限,请为导出发现结果的主账号和项目创建入站和出站规则。这些规则允许访问受保护的资源,并可让 BigQuery 验证用户是否具有对 BigQuery 数据集的 setIamPolicy 权限。

设置到 BigQuery 的新导出之前的准备工作

  1. 转到 Google Cloud 控制台中的 VPC Service Controls 页面。

    转到 VPC Service Controls

  2. 如有必要,请选择您的组织。

  3. 点击要更改的服务边界的名称。

    如需查找您需要修改的服务边界,您可以查看日志中是否存在显示 RESOURCES_NOT_IN_SAME_SERVICE_PERIMETER 违规行为的条目。在这些条目中,检查 servicePerimeterName 字段:accessPolicies/ACCESS_POLICY_ID/servicePerimeters/SERVICE_PERIMETER_NAME

  4. 点击修改边界

  5. 在导航菜单中,点击入站流量政策

  6. 如需为用户或服务账号配置入站流量规则,请使用以下参数:

    • API 客户端的 FROM 特性
      • 身份下拉菜单中,选择选定的身份
      • 来源下拉菜单中,选择所有来源
      • 点击选择,然后输入用于调用 Security Command Center API 的主账号。
    • Google Cloud 服务/资源的 TO 属性
      • 项目下拉菜单中,选择选定的项目
      • 点击选择,然后输入 BigQuery 数据集所属的项目。
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  7. 点击保存

  8. 在导航菜单中,点击出站流量政策

  9. 点击添加规则 (Add Rule)。

  10. 如需为用户或服务账号配置出站流量规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 身份下拉菜单中,选择选定的身份
      • 点击选择,然后输入用于调用 Security Command Center API 的主账号。
    • Google Cloud 服务/资源的 TO 属性
      • 项目下拉菜单中,选择所有项目
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  11. 点击保存

设置到 BigQuery 的新导出

在此步骤中,您将创建一个导出配置,以将发现结果导出到 BigQuery 实例。您可以在项目、文件夹或组织级层创建导出配置。例如,如果您要将发现结果从项目导出到 BigQuery 数据集,则可以在项目级层创建导出配置,以仅导出与该项目相关的发现结果。(可选)您可以指定过滤条件,以便仅导出特定发现结果。

请务必在适当的级层创建导出配置。例如,如果您在项目 B 中创建导出配置,但从项目 A 中导出发现结果,并且定义了 resource.project_display_name: project-a-id 等过滤条件,则该配置不会导出任何发现结果。

您最多可以为组织创建 500 项到 BigQuery 的导出配置。您可以将同一数据集用于多项导出配置。如果您使用同一数据集,则系统会对同一发现结果表执行所有更新。

创建第一个导出配置时,系统会自动为您创建服务账号。必须使用此服务账号才能在数据集中创建或更新发现结果表,以及将发现结果导出到表中。该账号格式为 service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gservicaccount.com,并且在 BigQuery 数据集级层被授予 BigQuery Data Editor (roles/bigquery.dataEditor) 角色。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需创建新的导出配置,请运行以下命令:

    gcloud scc bqexports create BIG_QUERY_EXPORT \
      --dataset=DATASET_NAME \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--description=DESCRIPTION] \
      [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出配置的名称。

    • DATASET_NAME 替换为 BigQuery 数据集的名称,例如 projects/<PROJECT_ID>/datasets/<DATASET_ID>

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果启用了数据驻留,请指定要在其中创建 BigQuery 导出数据的 Security Command Center 位置。BigQuery 导出配置存储在此位置。导出结果中仅包含来自此位置的发现结果。

      如果未启用数据驻留,则指定 --location 标志会使用 Security Command Center API v2 创建 BigQuery 导出,并且该标志的唯一有效值为 global

    • DESCRIPTION 替换为直观易懂的导出配置说明。此变量为可选项。

    • FILTER,以及定义要包含在导出中的结果的表达式。例如,如果要按 XSS_SCRIPTING 类别进行过滤,请输入 "category=\"XSS_SCRIPTING\"。此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.CreateBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;
import java.util.UUID;

public class CreateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryDatasetId: The BigQuery dataset to write findings' updates to.
    String bigQueryDatasetId = "your-bigquery-dataset-id";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "default-" + UUID.randomUUID().toString().split("-")[0];

    createBigQueryExport(parent, filter, bigQueryDatasetId, bigQueryExportId);
  }

  // Create export configuration to export findings from a project to a BigQuery dataset.
  // Optionally specify filter to export certain findings only.
  public static void createBigQueryExport(
      String parent, String filter, String bigQueryDatasetId, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      // Create the BigQuery export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setDescription(
                  "Export low and medium findings if the compute resource "
                      + "has an IAM anomalous grant")
              .setFilter(filter)
              .setDataset(String.format("%s/datasets/%s", parent, bigQueryDatasetId))
              .build();

      CreateBigQueryExportRequest bigQueryExportRequest =
          CreateBigQueryExportRequest.newBuilder()
              .setParent(parent)
              .setBigQueryExport(bigQueryExport)
              .setBigQueryExportId(bigQueryExportId)
              .build();

      // Create the export request.
      BigQueryExport response = client.createBigQueryExport(bigQueryExportRequest);

      System.out.printf("BigQuery export request created successfully: %s\n", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。



def create_bigquery_export(
    parent: str, export_filter: str, bigquery_dataset_id: str, bigquery_export_id: str
):
    from google.cloud import securitycenter

    """
    Create export configuration to export findings from a project to a BigQuery dataset.
    Optionally specify filter to export certain findings only.

    Args:
        parent: Use any one of the following resource paths:
             - organizations/{organization_id}
             - folders/{folder_id}
             - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_dataset_id: The BigQuery dataset to write findings' updates to.
        bigquery_export_id: Unique identifier provided by the client.
             - example id: f"default-{str(uuid.uuid4()).split('-')[0]}"
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    client = securitycenter.SecurityCenterClient()

    # Create the BigQuery export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.description = "Export low and medium findings if the compute resource has an IAM anomalous grant"
    bigquery_export.filter = export_filter
    bigquery_export.dataset = f"{parent}/datasets/{bigquery_dataset_id}"

    request = securitycenter.CreateBigQueryExportRequest()
    request.parent = parent
    request.big_query_export = bigquery_export
    request.big_query_export_id = bigquery_export_id

    # Create the export request.
    response = client.create_big_query_export(request)

    print(f"BigQuery export request created successfully: {response.name}\n")

创建导出配置后,您应该会在大约 15 分钟内在 BigQuery 数据集中看到发现结果。创建 BigQuery 表后,与过滤条件和范围匹配的所有新发现结果和更新后的发现结果都将近乎实时地显示在该表中。

如需查看您的发现结果,请参阅查看发现结果

为到 BigQuery 的新导出创建入站规则

如果您使用 VPC Service Controls,并且 BigQuery 数据集属于服务边界内的项目,则必须为到 BigQuery 的新导出创建入站规则。

  1. 重新打开设置到 BigQuery 的新导出中的服务边界。

    转到 VPC Service Controls

  2. 点击入站流量政策

  3. 点击添加规则 (Add Rule)。

  4. 如需为导出配置设置入站规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 来源下拉菜单中,选择所有来源
      • 身份下拉菜单中,选择选定的身份
      • 点击选择,然后输入 BigQuery Export 配置服务账号的名称:service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gserviceaccount.com
    • GCP 服务/资源的 TO 特性
      • 项目下拉菜单中,选择选定的项目
      • 点击选择,然后选择 BigQuery 数据集所属的项目。
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  5. 在导航菜单中,点击保存

选定的项目、用户和服务账号现在可以访问受保护的资源并导出发现结果。

如果您已按照本指南中的所有步骤操作,并且导出正常工作,您现在可以删除以下内容:

  • 主账号的入站规则
  • 主账号的出站规则

这些规则仅在配置导出配置时才需要。但是,为了让导出配置继续正常工作,您必须保留上面创建的入站规则,该规则允许 Security Command Center 将发现结果导出到服务边界后面的 BigQuery 数据集。

查看导出配置的详细信息

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需验证导出配置的详细信息,请运行以下命令:

    gcloud scc bqexports get BIG_QUERY_EXPORT \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出配置的名称。

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果启用了数据驻留或者 BigQueryExport 资源是使用 v2 API 创建的,则需要。

      如果启用了数据驻留,请指定存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,请仅在 BigQueryExport 资源是使用 Security Command Center API v2 创建时添加 /locations/LOCATION(在这种情况下,唯一有效的位置是 global)。

    例如,如需从组织 ID 设置为 123 的组织中获取名为 my-bq-export 的导出配置,请运行以下命令:

    gcloud scc bqexports get my-bq-export --organization=123
    

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.GetBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class GetBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    getBigQueryExport(parent, bigQueryExportId);
  }

  // Retrieve an existing BigQuery export.
  public static void getBigQueryExport(String parent, String bigQueryExportId) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      GetBigQueryExportRequest bigQueryExportRequest =
          GetBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      BigQueryExport response = client.getBigQueryExport(bigQueryExportRequest);
      System.out.printf("Retrieved the BigQuery export: %s", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。

def get_bigquery_export(parent: str, bigquery_export_id: str):
    from google.cloud import securitycenter

    """
    Retrieve an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.GetBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    response = client.get_big_query_export(request)
    print(f"Retrieved the BigQuery export: {response.name}")

更新导出配置

如有必要,您可以修改现有导出配置的过滤条件、数据集和说明。您无法更改导出配置的名称。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需更新导出配置,请运行以下命令:

    gcloud scc bqexports update BIG_QUERY_EXPORT \
      --dataset=DATASET_NAME \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--description=DESCRIPTION] \
      [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为您要更新的导出配置的名称。

    • DATASET_NAME 替换为 BigQuery 数据集的名称,例如 projects/<PROJECT_ID>/datasets/<DATASET_ID>

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果启用了数据驻留或者 BigQueryExport 资源是使用 v2 API 创建的,则需要。

      如果启用了数据驻留,请指定存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,则仅当 BigQueryExport 资源是使用 Security Command Center API v2 创建时,才在全名中包含 /locations/LOCATION 或指定 --location 标志(在这种情况下,唯一有效的位置是 global)。

    • DESCRIPTION 替换为直观易懂的导出配置说明。此变量为可选项。

    • FILTER,以及定义要包含在导出中的结果的表达式。例如,如果要按 XSS_SCRIPTING 类别进行过滤,请输入 "category=\"XSS_SCRIPTING\"。此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.UpdateBigQueryExportRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;

public class UpdateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "big-query-export-id";

    updateBigQueryExport(parent, filter, bigQueryExportId);
  }

  // Updates an existing BigQuery export.
  public static void updateBigQueryExport(String parent, String filter, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      //  Set the new values for export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .setFilter(filter)
              .build();

      UpdateBigQueryExportRequest request =
          UpdateBigQueryExportRequest.newBuilder()
              .setBigQueryExport(bigQueryExport)
              // Set the update mask to specify which properties should be updated.
              // If empty, all mutable fields will be updated.
              // For more info on constructing field mask path, see the proto or:
              // https://cloud.google.com/java/docs/reference/protobuf/latest/com.google.protobuf.FieldMask
              .setUpdateMask(FieldMask.newBuilder().addPaths("filter").build())
              .build();

      BigQueryExport response = client.updateBigQueryExport(request);
      if (!response.getFilter().equalsIgnoreCase(filter)) {
        System.out.println("Failed to update BigQueryExport!");
        return;
      }
      System.out.println("BigQueryExport updated successfully!");
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。

def update_bigquery_export(parent: str, export_filter: str, bigquery_export_id: str):
    """
    Updates an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_export_id: Unique identifier provided by the client.
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    from google.cloud import securitycenter
    from google.protobuf import field_mask_pb2

    client = securitycenter.SecurityCenterClient()

    # Set the new values for export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.name = f"{parent}/bigQueryExports/{bigquery_export_id}"
    bigquery_export.filter = export_filter

    # Field mask to only update the export filter.
    # Set the update mask to specify which properties should be updated.
    # If empty, all mutable fields will be updated.
    # For more info on constructing field mask path, see the proto or:
    # https://googleapis.dev/python/protobuf/latest/google/protobuf/field_mask_pb2.html
    field_mask = field_mask_pb2.FieldMask(paths=["filter"])

    request = securitycenter.UpdateBigQueryExportRequest()
    request.big_query_export = bigquery_export
    request.update_mask = field_mask

    response = client.update_big_query_export(request)

    if response.filter != export_filter:
        print("Failed to update BigQueryExport!")
        return
    print("BigQueryExport updated successfully!")

查看所有导出配置

您可以查看组织、文件夹或项目中的所有导出配置。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需列出导出配置,请运行以下命令:

    gcloud scc bqexports list \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--limit=LIMIT] \
      [--page-size=PAGE_SIZE]
    

    替换以下内容:

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

      如果您指定了组织 ID,该列表会包含该组织中定义的所有导出配置,包括在文件夹和项目级层定义的配置。如果您指定了文件夹 ID,该列表会包含在文件夹级层以及在该文件夹内的项目中定义的所有导出配置。如果您指定了项目编号或项目 ID,该列表仅包含该项目的所有导出配置。

    • LOCATION:如果启用了数据驻留或者 BigQueryExport 资源是使用 v2 API 创建的,则需要。

      如果启用了数据驻留,请指定存储导出内容的 Security Command Center 位置

      如果未启用数据驻留,则在添加 --location 标志时,系统只会列出使用 Security Command Center API v2 创建的 BigQueryExport 资源,并且唯一的有效位置是 global

    • LIMIT 替换为您要查看的导出配置的数量。此变量为可选项。

    • PAGE_SIZE 替换为页面大小值。此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.ListBigQueryExportsRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.SecurityCenterClient.ListBigQueryExportsPagedResponse;
import java.io.IOException;

public class ListBigQueryExports {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: The parent, which owns the collection of BigQuery exports.
    //         Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    listBigQueryExports(parent);
  }

  // List BigQuery exports in the given parent.
  public static void listBigQueryExports(String parent) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      ListBigQueryExportsRequest request =
          ListBigQueryExportsRequest.newBuilder().setParent(parent).build();

      ListBigQueryExportsPagedResponse response = client.listBigQueryExports(request);

      System.out.println("Listing BigQuery exports:");
      for (BigQueryExport bigQueryExport : response.iterateAll()) {
        System.out.println(bigQueryExport.getName());
      }
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。

def list_bigquery_exports(parent: str):
    from google.cloud import securitycenter

    """
    List BigQuery exports in the given parent.
    Args:
         parent: The parent which owns the collection of BigQuery exports.
             Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.ListBigQueryExportsRequest()
    request.parent = parent

    response = client.list_big_query_exports(request)

    print("Listing BigQuery exports:")
    for bigquery_export in response:
        print(bigquery_export.name)

删除导出配置

如果您不再需要导出配置,可以将其删除。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需删除导出配置,请运行以下命令:

    gcloud scc bqexports delete BIG_QUERY_EXPORT \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为您要删除的导出配置的名称。

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果启用了数据驻留或者 BigQueryExport 资源是使用 v2 API 创建的,则需要。

      如果启用了数据驻留,请指定存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,请仅在 BigQueryExport 资源是使用 Security Command Center API v2 创建时添加 /locations/LOCATION(在这种情况下,唯一有效的位置是 global)。

    例如,如需从组织 ID 设置为 123 的组织中删除名为 my-bq-export 的导出配置,请运行以下命令:

    gcloud scc bqexports delete my-bq-export --organization=123
    

Java

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。


import com.google.cloud.securitycenter.v1.DeleteBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class DeleteBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    deleteBigQueryExport(parent, bigQueryExportId);
  }

  // Delete an existing BigQuery export.
  public static void deleteBigQueryExport(String parent, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      DeleteBigQueryExportRequest bigQueryExportRequest =
          DeleteBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      client.deleteBigQueryExport(bigQueryExportRequest);
      System.out.printf("BigQuery export request deleted successfully: %s", bigQueryExportId);
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

以下示例使用 v1 API。如需修改 v2 的示例,请将 v1 替换为 v2,并将 /locations/LOCATION 添加到资源名称中。

对于大多数资源,请在资源名称的 /PARENT/PARENT_ID 后添加 /locations/LOCATION,其中 PARENTorganizationsfoldersprojects

对于发现结果,请在资源名称的 /sources/SOURCE_ID 后添加 /locations/LOCATION,其中 SOURCE_ID 是发出发现结果的 Security Command Center 服务的 ID。

def delete_bigquery_export(parent: str, bigquery_export_id: str):
    """
    Delete an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """
    from google.cloud import securitycenter

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.DeleteBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    client.delete_big_query_export(request)
    print(f"BigQuery export request deleted successfully: {bigquery_export_id}")

删除导出配置后,您可以从 Looker 数据洞察中移除数据。如需了解详情,请参阅移除、删除和恢复数据源

在 BigQuery 中查看发现结果

创建导出配置后,系统会将新的发现结果导出到您指定的项目中的 BigQuery 数据集。

如需在 BigQuery 中查看发现结果,请执行以下操作:

  1. 转到 BigQuery 中的项目。

    转至 BigQuery

  2. 如果您不在正确的项目中,请执行以下步骤:

    1. 在工具栏上,点击 项目选择器。
    2. “请选择:”旁边,选择您的组织。
    3. 在项目列表中,选择您的项目。
  3. 探索器窗格中,展开项目的节点。

  4. 展开您的数据集。

  5. 点击发现结果表。

  6. 在打开的标签页上,点击预览。此时会显示一组数据示例。

实用的查询

本部分提供了用于分析发现结果数据的示例查询。在以下示例中,将 DATASET 替换为分配给数据集的名称,将 PROJECT_ID 替换为数据集的项目名称。

如需排查遇到的任何错误,请参阅错误消息

每天创建和更新的新发现结果的数量

SELECT
    FORMAT_DATETIME("%Y-%m-%d", event_time) AS date,
    count(DISTINCT finding_id)
FROM `PROJECT_ID.DATASET.findings`
GROUP BY date
ORDER BY date DESC

每个发现结果的最新发现结果记录

SELECT
    * EXCEPT(row)
FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY finding_id
        ORDER BY event_time DESC, finding.mute_update_time DESC
    ) AS row
    FROM `PROJECT_ID.DATASET.findings`
)
WHERE row = 1

按时间排序的当前活跃发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
ORDER BY event_time DESC

项目中的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
WHERE resource.project_display_name = 'PROJECT'

PROJECT 替换为项目名称。

文件夹中的当前发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
CROSS JOIN UNNEST(resource.folders) AS folder
WHERE folder.resource_folder_display_name = 'FOLDER'

FOLDER 替换为文件夹名称。

扫描程序 Logging Scanner 的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
CROSS JOIN UNNEST(finding.source_properties) AS source_property
WHERE source_property.key = "ScannerName"
  AND source_property.value = "LOGGING_SCANNER"

类型为 Persistence: IAM Anomalous Grant 的当前活跃发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
  AND finding.category = "Persistence: IAM Anomalous Grant"

将给定类型的活跃发现结果与 Cloud Audit Logs 关联

此示例查询会显示授权者在异常 IAM 授权操作之前和之后的时段内所执行的管理员活动操作序列,以便使用 Cloud Audit Logs 中的 Event Threat Detection 功能来调查异常 IAM 授权的发现结果。以下查询将发现结果时间戳之前 1 小时到之后 1 小时的管理员活动日志进行关联。

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT
  finding_id,
  ANY_VALUE(event_time) as event_time,
  ANY_VALUE(finding.access.principal_email) as grantor,
  JSON_VALUE_ARRAY(ANY_VALUE(finding.source_properties_json), '$.properties.sensitiveRoleGrant.members') as grantees,
  ARRAY_AGG(
    STRUCT(
      timestamp,
      IF(timestamp < event_time, 'before', 'after') as timeline,
      protopayload_auditlog.methodName,
      protopayload_auditlog.resourceName,
      protopayload_auditlog.serviceName
    )
    ORDER BY timestamp ASC
  ) AS recent_activity
FROM (
  SELECT
    f.*,
    a.*,
  FROM latestFindings AS f
  LEFT JOIN `PROJECT_ID.DATASET.cloudaudit_googleapis_com_activity` AS a
  ON a.protopayload_auditlog.authenticationInfo.principalEmail = f.finding.access.principal_email
  WHERE f.finding.state = "ACTIVE"
    AND f.finding.category = "Persistence: IAM Anomalous Grant"
    AND a.timestamp >= TIMESTAMP_SUB(f.event_time, INTERVAL 1 HOUR)
    AND a.timestamp <= TIMESTAMP_ADD(f.event_time, INTERVAL 1 HOUR)
  )
GROUP BY
  finding_id
ORDER BY
  event_time DESC

输出类似于以下内容:

查询结果的屏幕截图,其中显示了发现结果及其关联的审核日志

在 Looker 数据洞察中创建图表

利用 Looker 数据洞察,您可以创建交互式报告和信息中心。

一般来说,通过 Looker 数据洞察访问 BigQuery 时会产生 BigQuery 使用费。如需了解详情,请参阅使用 Looker 数据洞察直观呈现 BigQuery 数据

要创建按严重性和类别直观呈现发现结果数据的图表,请执行以下操作:

  1. 打开 Looker 数据洞察并登录。
  2. 如果出现提示,请提供其他信息并设置其他偏好设置。阅读服务条款,如果满意,则继续。
  3. 点击空白报告
  4. 连接到数据标签页上,点击 BigQuery 卡。
  5. 如果出现提示,请授权 Looker 数据洞察访问 BigQuery 项目。
  6. 连接到您的发现结果数据:
    1. 项目部分,为数据集选择该项目。或者,在我的项目标签页中,输入您的项目 ID 以进行搜索。
    2. 对于数据集,点击数据集的名称。
    3. 对于表格,点击发现结果
    4. 点击添加
    5. 在对话框中,点击添加到报告
  7. 添加报告后,点击添加图表
  8. 点击堆叠柱形图,然后点击要放置的区域。

    图表选择界面的屏幕截图

  9. 图表 > 条形图窗格的数据标签页上,设置以下字段:

    1. 维度字段中,选择 finding.severity
    2. 细分维度字段中,选择 finding.category
    按严重性分类并按类别进一步分类的发现结果图表的屏幕截图

报告已更新,以显示按严重程度和类别拆分的结果的多个列。

后续步骤

了解如何在 BigQuery 中运行查询