使用 Cloud Storage 文件集

您可以使用 Data Catalog API 创建和管理 Cloud Storage 文件集条目(在本文档的其余部分中称为“文件集”)。

文件集如何与 Data Catalog 配合使用

在查看文件集之前,我们先了解一下主要的 Data Catalog 概念以及文件集如何与 Data Catalog 配合使用。

条目
Data Catalog entry表示 Google Cloud 资源,例如 BigQuery 数据集或表、Pub/Sub 主题或 Cloud Storage 文件集。您可以创建、搜索和管理 Data Catalog 条目,以探索和管理 Google Cloud 数据资源。文件集条目可以包含可选的文件集数据架构。
条目组

条目包含在 entry group 中。条目组包含逻辑相关的条目以及 Cloud Identity and Access Management 政策,用于指定可以在条目组中创建、修改和查看条目的用户。

Data Catalog 会自动为 BigQuery 条目 ("@bigquery") 和 Pub/Sub 主题 ("@pubsub") 创建一个条目组。您可以创建自己的条目组,以包含您的 Cloud Storage 文件集条目以及与这些条目关联的 IAM 政策。与条目一样,您可以搜索条目组。

以下插图描绘了预定义条目组和用户创建的条目组如何与 Data Catalog 数据模型配合使用。

Cloud Storage 文件集

Cloud Storage 文件集是用户创建的条目组中的一个条目。它由一个或多个文件模式定义,用于指定一个或多个 Cloud Storage 文件的集合。

文件模式要求:

创建条目组和文件集

文件集必须放置在用户创建的条目组中。如果您尚未创建条目组,请先创建一个条目组,然后在该条目组中创建文件集。您可以对条目组设置 IAM 政策,以定义谁有权访问该条目组中的文件集和其他条目。

gcloud 命令

1.创建一个条目组

使用 gcloud data-catalog entry-groups create 命令创建具有附加架构和说明的条目组。

例如:

gcloud data-catalog entry-groups create my_entrygroup \
    --location=us-central1

2.在条目组中创建文件集

使用 gcloud data-catalog entry create 命令在条目组中创建文件集。下面的这个 gcloud 命令示例创建了一个文件集条目,其中包含文件集数据的架构。

gcloud data-catalog entries create my_fileset_entry \  
    --location=us-central1 \  
    --entry-group=my_entrygroup \  
    --type=FILESET \  
    --gcs-file-patterns=gs://my-bucket/*.csv \  
    --schema-from-file=path_to_schema_file \  
    --description="Fileset description ..."

标志说明:

  • --gcs-file-patterns:请参阅文件模式要求
  • --schema-from-file:以下示例展示了 --schema-from-file 标志接受的架构文本文件的 JSON 格式。
    [
      {
        "column": "first_name",
        "description": "First name",
        "mode": "REQUIRED",
        "type": "STRING"
      },
      {
        "column": "last_name",
        "description": "Last name",
        "mode": "REQUIRED",
        "type": "STRING"
      },
      {
        "column": "address",
        "description": "Address",
        "mode": "REPEATED",
        "type": "STRING"
      }
    ]
    

控制台

  1. 在 Google Cloud Console 中,从创建下拉列表或 Data Catalog 界面条目组部分中点击创建条目组

  2. 填写创建条目组表单,然后点击创建

  3. 系统会打开条目组详情页面。选中文件集标签页后,然后点击创建

  4. 填写创建文件集表单。

    1. 要附加架构,请点击定义架构以打开架构表单。点击 + 添加字段可逐个添加字段,也可切换表单右上角的以文本形式修改,以 JSON 格式指定字段。
    2. 点击保存以保存架构。
  5. 点击创建以创建文件集。

Python

  1. 安装客户端库
  2. 设置应用默认凭据
  3. 运行代码
    """
    This application demonstrates how to perform core operations with the
    Data Catalog API.
    
    For more information, see the README.md and the official documentation at
    https://cloud.google.com/data-catalog/docs.
    """
    
    # -------------------------------
    # Import required modules.
    # -------------------------------
    from google.api_core.exceptions import NotFound, PermissionDenied
    from google.cloud import datacatalog_v1
    
    # -------------------------------
    # Set your Google Cloud Platform project ID.
    # -------------------------------
    project_id = 'your-project-id'
    
    # -------------------------------
    # Currently, Data Catalog stores metadata in the
    # us-central1 region.
    # -------------------------------
    location = 'us-central1'
    
    # -------------------------------
    # Use Application Default Credentials to create a new
    # Data Catalog client. GOOGLE_APPLICATION_CREDENTIALS
    # environment variable must be set with the location
    # of a service account key file.
    # -------------------------------
    datacatalog = datacatalog_v1.DataCatalogClient()
    
    # -------------------------------
    # 1. Environment cleanup: delete pre-existing data.
    # -------------------------------
    # Delete any pre-existing Entry with the same name
    # that will be used in step 3.
    expected_entry_name = datacatalog_v1.DataCatalogClient\
        .entry_path(project_id, location, 'fileset_entry_group', 'fileset_entry_id')
    
    try:
        datacatalog.delete_entry(name=expected_entry_name)
    except (NotFound, PermissionDenied):
        pass
    
    # Delete any pre-existing Entry Group with the same name
    # that will be used in step 2.
    expected_entry_group_name = datacatalog_v1.DataCatalogClient\
        .entry_group_path(project_id, location, 'fileset_entry_group')
    
    try:
        datacatalog.delete_entry_group(name=expected_entry_group_name)
    except (NotFound, PermissionDenied):
        pass
    
    # -------------------------------
    # 2. Create an Entry Group.
    # -------------------------------
    entry_group_obj = datacatalog_v1.types.EntryGroup()
    entry_group_obj.display_name = 'My Fileset Entry Group'
    entry_group_obj.description = 'This Entry Group consists of ....'
    
    entry_group = datacatalog.create_entry_group(
        parent=datacatalog_v1.DataCatalogClient.location_path(project_id, location),
        entry_group_id='fileset_entry_group',
        entry_group=entry_group_obj)
    print('Created entry group: {}'.format(entry_group.name))
    
    # -------------------------------
    # 3. Create a Fileset Entry.
    # -------------------------------
    entry = datacatalog_v1.types.Entry()
    entry.display_name = 'My Fileset'
    entry.description = 'This fileset consists of ....'
    entry.gcs_fileset_spec.file_patterns.append('gs://my_bucket/*')
    entry.type = datacatalog_v1.enums.EntryType.FILESET
    
    # Create the Schema, for example when you have a csv file.
    columns = []
    columns.append(datacatalog_v1.types.ColumnSchema(
        column='first_name',
        description='First name',
        mode='REQUIRED',
        type='STRING'))
    
    columns.append(datacatalog_v1.types.ColumnSchema(
        column='last_name',
        description='Last name',
        mode='REQUIRED',
        type='STRING'))
    
    # Create sub columns for the addresses parent column
    subcolumns = []
    
    subcolumns.append(datacatalog_v1.types.ColumnSchema(
        column='city',
        description='City',
        mode='NULLABLE',
        type='STRING'))
    
    subcolumns.append(datacatalog_v1.types.ColumnSchema(
        column='state',
        description='State',
        mode='NULLABLE',
        type='STRING'))
    
    columns.append(datacatalog_v1.types.ColumnSchema(
        column='addresses',
        description='Addresses',
        mode='REPEATED',
        subcolumns = subcolumns,
        type='RECORD'))
    
    entry.schema.columns.extend(columns)
    
    entry = datacatalog.create_entry(
        parent=entry_group.name,
        entry_id='fileset_entry_id',
        entry=entry)
    print('Created entry: {}'.format(entry.name))
      

Java

  1. 安装客户端库
  2. 设置应用默认凭据
  3. 运行代码
    /*
    This application demonstrates how to perform core operations with the
    Data Catalog API.
    
    For more information, see the README.md and the official documentation at
    https://cloud.google.com/data-catalog/docs.
    */
    
    package com.example.datacatalog;
    
    import com.google.cloud.datacatalog.v1.ColumnSchema;
    import com.google.cloud.datacatalog.v1.CreateEntryGroupRequest;
    import com.google.cloud.datacatalog.v1.CreateEntryRequest;
    import com.google.cloud.datacatalog.v1.Entry;
    import com.google.cloud.datacatalog.v1.EntryGroup;
    import com.google.cloud.datacatalog.v1.EntryGroupName;
    import com.google.cloud.datacatalog.v1.EntryName;
    import com.google.cloud.datacatalog.v1.EntryType;
    import com.google.cloud.datacatalog.v1.GcsFilesetSpec;
    import com.google.cloud.datacatalog.v1.LocationName;
    import com.google.cloud.datacatalog.v1.Schema;
    
    import com.google.cloud.datacatalog.v1.DataCatalogClient;
    
    public class CreateFilesetEntry {
    
      public static void createEntry() {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "my-project-id";
        String entryGroupId = "fileset_entry_group";
        String entryId = "fileset_entry_id";
        createEntry(projectId, entryGroupId, entryId);
      }
    
      /**
       * Create Fileset Entry
       */
      public static void createEntry(String projectId, String entryGroupId, String entryId) {
    
        // -------------------------------
        // Currently, Data Catalog stores metadata in the
        // us-central1 region.
        // -------------------------------
        String location = "us-central1";
    
        // Initialize client that will be used to send requests. This client only needs to be created
        // once, and can be reused for multiple requests. After completing all of your requests, call
        // the "close" method on the client to safely clean up any remaining background resources.
        try (DataCatalogClient dataCatalogClient = DataCatalogClient.create()) {
    
          // -------------------------------
          // 1. Environment cleanup: delete pre-existing data.
          // -------------------------------
          // Delete any pre-existing Entry with the same name
          // that will be used in step 3.
          try {
            dataCatalogClient.deleteEntry(
                EntryName.of(projectId, location, entryGroupId, entryId).toString());
          } catch (Exception e) {
            System.out.println("Entry does not exist.");
          }
    
          // Delete any pre-existing Entry Group with the same name
          // that will be used in step 2.
          try {
            dataCatalogClient.deleteEntryGroup(
                EntryGroupName.of(projectId, location, entryGroupId).toString());
          } catch (Exception e) {
            System.out.println("Entry Group does not exist.");
          }
    
          // -------------------------------
          // 2. Create an Entry Group.
          // -------------------------------
          // Construct the EntryGroup for the EntryGroup request.
          EntryGroup entryGroup =
              EntryGroup.newBuilder()
                  .setDisplayName("My Fileset Entry Group")
                  .setDescription("This Entry Group consists of ....")
                  .build();
    
          // Construct the EntryGroup request to be sent by the client.
          CreateEntryGroupRequest entryGroupRequest = CreateEntryGroupRequest.newBuilder()
              .setParent(LocationName.of(projectId, location).toString())
              .setEntryGroupId(entryGroupId)
              .setEntryGroup(entryGroup)
              .build();
    
          // Use the client to send the API request.
          EntryGroup entryGroupResponse = dataCatalogClient.createEntryGroup(entryGroupRequest);
    
          System.out.printf("\nEntry Group created with name: %s\n", entryGroupResponse.getName());
    
          // -------------------------------
          // 3. Create a Fileset Entry.
          // -------------------------------
          // Construct the Entry for the Entry request.
          Entry entry =
              Entry.newBuilder()
                  .setDisplayName("My Fileset")
                  .setDescription("This fileset consists of ....")
                  .setGcsFilesetSpec(
                      GcsFilesetSpec.newBuilder().addFilePatterns("gs://my_bucket/*").build())
                  .setSchema(
                      Schema.newBuilder()
                          .addColumns(
                              ColumnSchema.newBuilder()
                                  .setColumn("first_name")
                                  .setDescription("First name")
                                  .setMode("REQUIRED")
                                  .setType("STRING")
                                  .build())
                          .addColumns(
                              ColumnSchema.newBuilder()
                                  .setColumn("last_name")
                                  .setDescription("Last name")
                                  .setMode("REQUIRED")
                                  .setType("STRING")
                                  .build())
                          .addColumns(
                              ColumnSchema.newBuilder()
                                  .setColumn("addresses")
                                  .setDescription("Addresses")
                                  .setMode("REPEATED")
                                  .setType("RECORD")
                                  .addSubcolumns(
                                      ColumnSchema.newBuilder()
                                          .setColumn("city")
                                          .setDescription("City")
                                          .setMode("NULLABLE")
                                          .setType("STRING")
                                          .build())
                                  .addSubcolumns(
                                      ColumnSchema.newBuilder()
                                          .setColumn("state")
                                          .setDescription("State")
                                          .setMode("NULLABLE")
                                          .setType("STRING")
                                          .build())
                                  .build())
                          .build())
                  .setType(EntryType.FILESET)
                  .build();
    
          // Construct the Entry request to be sent by the client.
          CreateEntryRequest entryRequest = CreateEntryRequest.newBuilder()
              .setParent(entryGroupResponse.getName())
              .setEntryId(entryId)
              .setEntry(entry)
              .build();
    
          // Use the client to send the API request.
          Entry entryResponse = dataCatalogClient.createEntry(entryRequest);
    
          System.out.printf("\nEntry created with name: %s\n", entryResponse.getName());
    
        } catch (Exception e) {
          System.out.println("Error in create entry process:\n" + e.toString());
        }
      }
    }
    

Node.js

  1. 安装客户端库
  2. 设置应用默认凭据
  3. 运行代码
    /**
     * This application demonstrates how to create an Entry Group and a fileset
     * Entry with the Cloud Data Catalog API.
    
     * For more information, see the README.md under /datacatalog and the
     * documentation at https://cloud.google.com/data-catalog/docs.
     */
    const main = async (
      projectId = process.env.GCLOUD_PROJECT,
      entryGroupId,
      entryId
    ) => {
        // -------------------------------
      // Import required modules.
      // -------------------------------
      const { DataCatalogClient } = require('@google-cloud/datacatalog').v1;
      const datacatalog = new DataCatalogClient();
    
      // -------------------------------
      // Currently, Data Catalog stores metadata in the
      // us-central1 region.
      // -------------------------------
      const location = "us-central1";
    
      // -------------------------------
      // 1. Environment cleanup: delete pre-existing data.
      // -------------------------------
      // Delete any pre-existing Entry with the same name
      // that will be used in step 3.
      try {
        const formattedName = datacatalog.entryPath(projectId, location, entryGroupId, entryId);
        await datacatalog.deleteEntry({ name: formattedName });
      } catch (err) {
        console.log('Entry does not exist.');
      }
    
      // Delete any pre-existing Entry Group with the same name
      // that will be used in step 2.
      try {
        const formattedName = datacatalog.entryGroupPath(projectId, location, entryGroupId);
        await datacatalog.deleteEntryGroup({ name: formattedName });
      } catch (err) {
        console.log('Entry Group does not exist.');
      }
    
      // -------------------------------
      // 2. Create an Entry Group.
      // -------------------------------
      // Construct the EntryGroup for the EntryGroup request.
      const entryGroup = {
        displayName: 'My Fileset Entry Group',
        description: 'This Entry Group consists of ....'
      }
    
      // Construct the EntryGroup request to be sent by the client.
      const entryGroupRequest = {
        parent: datacatalog.locationPath(projectId, location),
        entryGroupId: entryGroupId,
        entryGroup: entryGroup,
      };
    
      // Use the client to send the API request.
      await datacatalog.createEntryGroup(entryGroupRequest)
    
      // -------------------------------
      // 3. Create a Fileset Entry.
      // -------------------------------
      // Construct the Entry for the Entry request.
      const FILESET_TYPE = 4
    
      const entry = {
        displayName: 'My Fileset',
        description: 'This fileset consists of ....',
        gcsFilesetSpec: {filePatterns: ['gs://my_bucket/*']},
        schema: {
          columns: [
            {
              column: 'city',
              description: 'City',
              mode: 'NULLABLE',
              type: 'STRING',
            },
            {
              column: 'state',
              description: 'State',
              mode: 'NULLABLE',
              type: 'STRING',
            },
            {
              column: 'addresses',
              description: 'Addresses',
              mode: 'REPEATED',
              subcolumns: [
                {
                  column: 'city',
                  description: 'City',
                  mode: 'NULLABLE',
                  type: 'STRING',
                },
                {
                  column: 'state',
                  description: 'State',
                  mode: 'NULLABLE',
                  type: 'STRING',
                },
              ],
              type: 'RECORD',
            },
          ],
        },
        type: FILESET_TYPE,
      };
    
      // Construct the Entry request to be sent by the client.
      const request = {
        parent: datacatalog.entryGroupPath(projectId, location, entryGroupId),
        entryId: entryId,
        entry: entry,
      };
    
      // Use the client to send the API request.
      const [response] = await datacatalog.createEntry(request)
    
      console.log(response);
    };
    // [END datacatalog_create_fileset_quickstart_tag]
    
    // node createFilesetEntry.js   
    main(...process.argv.slice(2));
    

REST 和命令行

如果您无法使用针对您的语言的 Cloud 客户端库或者您想要使用 REST 请求来测试 API,请参阅以下示例并参阅 Data Catalog REST API entryGroups.createentryGroups.entries.create 文档。

1.创建一个条目组

在使用下面的请求数据之前,请先进行以下替换:

  • project-id:您的 GCP 项目 ID
  • entryGroupId:ID 必须以字母或下划线开头,只能包含英文字母、数字和下划线,长度不超过 64 个字符。
  • displayName:条目组的文本名称。

HTTP 方法和网址:

POST https://datacatalog.googleapis.com/v1/projects/project-id/locations/us-central1/entryGroups?entryGroupId=entryGroupId

请求 JSON 正文:

{
  "displayName": "Entry Group display name"
}

如需发送您的请求,请展开以下选项之一:

您应会收到如下所示的 JSON 响应:

{
  "name": "projects/my_projectid/locations/us-central1/entryGroups/my_entry_group",
  "displayName": "Entry Group display name",
  "dataCatalogTimestamps": {
    "createTime": "2019-10-19T16:35:50.135Z",
    "updateTime": "2019-10-19T16:35:50.135Z"
  }
}

2.在条目组中创建文件集

在使用下面的请求数据之前,请先进行以下替换:

  • project_id:您的 GCP 项目 ID
  • entryGroupId:现有 entryGroup 的 ID。此 sntryGroup 中将创建文件集。
  • entryId:新文件集的 entryId。ID 必须以字母或下划线开头,只能包含英文字母、数字和下划线,长度不超过 64 个字符。
  • description:文件集说明。
  • displayName:文件集条目的文本名称。
  • filePatterns:必须以“gs://bucket_name/”开头。请参阅文件模式要求
  • schema:文件集架构。

    JSON 架构示例
    { ...
      "schema": {
        "columns": [
          {
            "column": "first_name",
            "description": "First name",
            "mode": "REQUIRED",
            "type": "STRING"
          },
          {
            "column": "last_name",
            "description": "Last name",
            "mode": "REQUIRED",
            "type": "STRING"
          },
          {
            "column": "address",
            "description": "Address",
            "mode": "REPEATED",
            "subcolumns": [
              {
                "column": "city",
                "description": "City",
                "mode": "NULLABLE",
                "type": "STRING"
              },
              {
                "column": "state",
                "description": "State",
                "mode": "NULLABLE",
                "type": "STRING"
              }
            ],
            "type": "RECORD"
          }
        ]
      }
    ...
    }
    

HTTP 方法和网址:

POST https://datacatalog.googleapis.com/v1/projects/project_id/locations/us-central1/entryGroups/entryGroupId/entries?entryId=entryId

请求 JSON 正文:

{
  "description": "Fileset description.",
  "displayName": "Display name",
  "gcsFilesetSpec": {
    "filePatterns": [
      "gs://bucket_name/file_pattern"
    ]
  },
  "type": "FILESET",
  "schema": { schema }
}

如需发送您的请求,请展开以下选项之一:

您应会收到如下所示的 JSON 响应:

{
  "name": "projects/my_project_id/locations/us-central1/entryGroups/my_entryGroup_id/entries/my_entry_id",
  "type": "FILESET",
  "displayName": "My Fileset",
  "description": "My Fileset description.",
  "schema": {
    "columns": [
      {
        "type": "STRING",
        "description": "First name",
        "mode": "REQUIRED",
        "column": "first_name"
      },
      {
        "type": "STRING",
        "description": "Last name",
        "mode": "REQUIRED",
        "column": "last_name"
      },
      {
        "type": "RECORD",
        "description": "Address",
        "mode": "REPEATED",
        "column": "address",
        "subcolumns": [
          {
            "type": "STRING",
            "description": "City",
            "mode": "NULLABLE",
            "column": "city"
          },
          {
            "type": "STRING",
            "description": "State",
            "mode": "NULLABLE",
            "column": "state"
          }
        ]
      }
    ]
  },
  "gcsFilesetSpec": {
    "filePatterns": [
      "gs://my_bucket_name/chicago_taxi_trips/csv/shard-*.csv"
    ]
  },
  "sourceSystemTimestamps": {
    "createTime": "2019-10-23T23:11:26.326Z",
    "updateTime": "2019-10-23T23:11:26.326Z"
  },
"linkedResource": "//datacatalog.googleapis.com/projects/my_project_id/locations/us-central1/entryGroups/my_entryGroup_id/entries/my_entry_id"
}

IAM 角色、权限和政策

Data Catalog 定义了 entry 和 entryGroup 角色,以帮助管理文件集和其他 Data Catalog 资源的权限。

Entry 角色 说明
dataCatalog.entryOwner 特定条目或条目组的所有者。
  • 权限:
    • datacatalog.entries.(*)
    • datacatalog.entryGroups.get
  • 适用性:
    • 组织、项目和 entryGroup。
dataCatalog.entryViewer 可以查看 entry 和 entryGroup 的详细信息。
  • 权限
    • datacatalog.entries.get
    • datacatalog.entryGroups.get
  • 适用性:
    • 组织、项目和 entryGroup。
entryGroup 角色 说明
dataCatalog.entryGroupOwner 特定 entryGroup 的所有者。
  • 权限:
    • datacatalog.entryGroups.(*)
    • datacatalog entries.(*)
  • 适用性:
    • 组织、项目和 entryGroup 级层。
dataCatalog.entryGroupCreator 可以在项目中创建 entryGroup。entryGroup 的创建者会自动获得 dataCatalog.entryGroupOwner 角色。
  • 权限
    • datacatalog.entryGroups。(get | create)
  • 适用性:
    • 组织和项目级层。

设置 IAM 政策

拥有 datacatalog.<resource>.setIamPolicy 权限的用户可以对 Data Catalog 条目组和其他 Data Catalog 资源设置 IAM 政策(请参阅 Data Catalog 角色)。

gcloud 命令

控制台

导航至 Data Catalog 界面条目组详情页面,然后使用右侧的 IAM 面板授予或撤消权限。

授予条目组角色

示例 1:

如果公司的文件集具有不同的业务环境,则会创建单独的 order-filesuser-files 条目组:

公司将向用户授予 order-files 的 EntryGroup Viewer 角色,这意味着他们只能搜索该条目组中包含的条目。他们的搜索结果不会返回 user-files 条目组中的条目。

示例 2:

公司只能将 EntryGroup Viewer 角色授予 project_entry_group 项目中的用户。用户只能查看该项目中的条目。

搜索文件集

用户可以使用 type 构面限制 Data Catalog 中的搜索范围。type=entry_group 将搜索查询限制为条目组,而 type=fileset 仅搜索文件集。type 构面可与其他构面(如 projectid)结合使用。

gcloud 命令

  • 搜索项目中的条目组:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "projectid=my-project type=entry_group"
    

  • 搜索您可以访问的所有条目组:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=entry_group"
    

  • 搜索项目中的文件集:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=entry.fileset"
    

  • 搜索项目中的文件集 - 简化了语法:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=fileset"