使用文件集条目显示 Cloud Storage 中的文件

您可以使用 Data Catalog API 创建和搜索 Cloud Storage 文件集条目(在本文档的其余部分中称为“文件集”)。

文件集

Cloud Storage 文件集是用户创建的条目组中的一个条目。如需了解详情,请参阅条目和条目组

它由一个或多个文件模式定义,用于指定一个或多个 Cloud Storage 文件的集合。

文件模式要求:

  • 文件模式必须以 gs://bucket_name/ 开头。
  • 存储桶名称必须符合 Cloud Storage 存储桶名称要求
  • 文件模式的文件夹和文件部分允许使用通配符,但存储桶名称中不允许使用通配符。如需查看示例,请参阅:
  • 每个文件集必须至少有一个文件集模式,且不得超过 500 个。

您可以使用 Dataflow SQL 查询 Data Catalog 文件集,但前提是它们已定义架构,并且仅包含没有标题行的 CSV 文件。

创建条目组和文件集

文件集必须放置在用户创建的条目组中。如果您尚未创建条目组,请先创建一个条目组,然后在该条目组中创建文件集。您可以对条目组设置 IAM 政策,以定义谁有权访问该条目组中的文件集和其他条目。

控制台

控制台

  1. 转到 Dataplex > 条目组页面。

    转到 Dataplex 条目组

  2. 点击创建条目组

  3. 填写创建条目组表单,然后点击创建

  4. 系统会打开条目组详情页面。选中条目 标签页后,点击创建

  5. 填写创建文件集表单。

    1. 要附加架构,请点击定义架构以打开架构表单。点击 + 添加字段可逐个添加字段,也可切换表单右上角的以文本形式修改,以 JSON 格式指定字段。
    2. 点击保存以保存架构。
  6. 点击创建以创建文件集。

gcloud

gcloud

1. 创建一个条目组

使用 gcloud data-catalog entry-groups create 命令创建具有附加架构和说明的条目组。

示例:

gcloud data-catalog entry-groups create my_entrygroup \
    --location=us-central1

2. 在条目组中创建文件集

使用 gcloud data-catalog entry create 命令在条目组中创建文件集。下面的这个 gcloud 命令示例创建了一个文件集条目,其中包含文件集数据的架构。

gcloud data-catalog entries create my_fileset_entry \  
    --location=us-central1 \  
    --entry-group=my_entrygroup \  
    --type=FILESET \  
    --gcs-file-patterns=gs://my-bucket/*.csv \  
    --schema-from-file=path_to_schema_file \  
    --description="Fileset description ..."

标志说明:

  • --gcs-file-patterns:请参阅文件模式要求
  • --schema-from-file:以下示例展示了 --schema-from-file 标志接受的架构文本文件的 JSON 格式。
    [
      {
        "column": "first_name",
        "description": "First name",
        "mode": "REQUIRED",
        "type": "STRING"
      },
      {
        "column": "last_name",
        "description": "Last name",
        "mode": "REQUIRED",
        "type": "STRING"
      },
      {
        "column": "address",
        "description": "Address",
        "mode": "REPEATED",
        "type": "STRING"
      }
    ]
    

Java

在试用此示例之前,请按照 Data Catalog 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Data Catalog Java API 参考文档

如需向 Data Catalog 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.datacatalog.v1.ColumnSchema;
import com.google.cloud.datacatalog.v1.CreateEntryRequest;
import com.google.cloud.datacatalog.v1.DataCatalogClient;
import com.google.cloud.datacatalog.v1.Entry;
import com.google.cloud.datacatalog.v1.EntryGroupName;
import com.google.cloud.datacatalog.v1.EntryType;
import com.google.cloud.datacatalog.v1.GcsFilesetSpec;
import com.google.cloud.datacatalog.v1.Schema;
import java.io.IOException;

// Sample to create file set entry
public class CreateFilesetEntry {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "my-project-id";
    String entryGroupId = "fileset_entry_group";
    String entryId = "fileset_entry_id";
    createFilesetEntry(projectId, entryGroupId, entryId);
  }

  // Create Fileset Entry.
  public static void createFilesetEntry(String projectId, String entryGroupId, String entryId)
      throws IOException {
    // Currently, Data Catalog stores metadata in the us-central1 region.
    String location = "us-central1";

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DataCatalogClient dataCatalogClient = DataCatalogClient.create()) {
      // Construct the Entry for the Entry request.
      Entry entry =
          Entry.newBuilder()
              .setDisplayName("My Fileset")
              .setDescription("This fileset consists of ....")
              .setGcsFilesetSpec(
                  GcsFilesetSpec.newBuilder().addFilePatterns("gs://cloud-samples-data/*").build())
              .setSchema(
                  Schema.newBuilder()
                      .addColumns(
                          ColumnSchema.newBuilder()
                              .setColumn("first_name")
                              .setDescription("First name")
                              .setMode("REQUIRED")
                              .setType("STRING")
                              .build())
                      .addColumns(
                          ColumnSchema.newBuilder()
                              .setColumn("last_name")
                              .setDescription("Last name")
                              .setMode("REQUIRED")
                              .setType("STRING")
                              .build())
                      .addColumns(
                          ColumnSchema.newBuilder()
                              .setColumn("addresses")
                              .setDescription("Addresses")
                              .setMode("REPEATED")
                              .setType("RECORD")
                              .addSubcolumns(
                                  ColumnSchema.newBuilder()
                                      .setColumn("city")
                                      .setDescription("City")
                                      .setMode("NULLABLE")
                                      .setType("STRING")
                                      .build())
                              .addSubcolumns(
                                  ColumnSchema.newBuilder()
                                      .setColumn("state")
                                      .setDescription("State")
                                      .setMode("NULLABLE")
                                      .setType("STRING")
                                      .build())
                              .build())
                      .build())
              .setType(EntryType.FILESET)
              .build();

      // Construct the Entry request to be sent by the client.
      CreateEntryRequest entryRequest =
          CreateEntryRequest.newBuilder()
              .setParent(EntryGroupName.of(projectId, location, entryGroupId).toString())
              .setEntryId(entryId)
              .setEntry(entry)
              .build();

      // Use the client to send the API request.
      Entry entryCreated = dataCatalogClient.createEntry(entryRequest);
      System.out.printf("Entry created with name: %s", entryCreated.getName());
    }
  }
}

Node.js

在试用此示例之前,请按照 Data Catalog 快速入门:使用客户端库中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Data Catalog Node.js API 参考文档

如需向 Data Catalog 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

// Import the Google Cloud client library.
const {DataCatalogClient} = require('@google-cloud/datacatalog').v1;
const datacatalog = new DataCatalogClient();

async function createFileset() {
  // Create a fileset within an entry group.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const projectId = 'my_project';
  // const entryGroupId = 'my_entry_group';
  // const entryId = 'my_entry';

  // Currently, Data Catalog stores metadata in the us-central1 region.
  const location = 'us-central1';

  // Delete any pre-existing Entry with the same name that will be used
  // when creating the new Entry.
  try {
    const formattedName = datacatalog.entryPath(
      projectId,
      location,
      entryGroupId,
      entryId
    );
    await datacatalog.deleteEntry({name: formattedName});
  } catch (err) {
    console.log('Entry does not exist.');
  }

  // Delete any pre-existing Entry Group with the same name
  // that will be used to create the new Entry Group.
  try {
    const formattedName = datacatalog.entryGroupPath(
      projectId,
      location,
      entryGroupId
    );
    await datacatalog.deleteEntryGroup({name: formattedName});
  } catch (err) {
    console.log('Entry Group does not exist.');
  }

  // Construct the Entry Group for the Entry Group request.
  const entryGroup = {
    displayName: 'My Fileset Entry Group',
    description: 'This Entry Group consists of ....',
  };

  // Construct the Entry Group request to be sent by the client.
  const entryGroupRequest = {
    parent: datacatalog.locationPath(projectId, location),
    entryGroupId: entryGroupId,
    entryGroup: entryGroup,
  };

  // Use the client to send the API request.
  await datacatalog.createEntryGroup(entryGroupRequest);

  // Construct the Entry for the Entry request.
  const FILESET_TYPE = 4;

  const entry = {
    displayName: 'My Fileset',
    description: 'This fileset consists of ....',
    gcsFilesetSpec: {filePatterns: ['gs://my_bucket/*']},
    schema: {
      columns: [
        {
          column: 'city',
          description: 'City',
          mode: 'NULLABLE',
          type: 'STRING',
        },
        {
          column: 'state',
          description: 'State',
          mode: 'NULLABLE',
          type: 'STRING',
        },
        {
          column: 'addresses',
          description: 'Addresses',
          mode: 'REPEATED',
          subcolumns: [
            {
              column: 'city',
              description: 'City',
              mode: 'NULLABLE',
              type: 'STRING',
            },
            {
              column: 'state',
              description: 'State',
              mode: 'NULLABLE',
              type: 'STRING',
            },
          ],
          type: 'RECORD',
        },
      ],
    },
    type: FILESET_TYPE,
  };

  // Construct the Entry request to be sent by the client.
  const request = {
    parent: datacatalog.entryGroupPath(projectId, location, entryGroupId),
    entryId: entryId,
    entry: entry,
  };

  // Use the client to send the API request.
  const [response] = await datacatalog.createEntry(request);

  console.log(`Name: ${response.name}`);
  console.log(`Display name: ${response.displayName}`);
  console.log(`Type: ${response.type}`);
}
createFileset();

Python

在试用此示例之前,请按照 Data Catalog 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Data Catalog Python API 参考文档

如需向 Data Catalog 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

# Import required modules.
from google.cloud import datacatalog_v1

# TODO: Set these values before running the sample.
project_id = "project_id"
fileset_entry_group_id = "entry_group_id"
fileset_entry_id = "entry_id"

# For all regions available, see:
# https://cloud.google.com/data-catalog/docs/concepts/regions
location = "us-central1"

datacatalog = datacatalog_v1.DataCatalogClient()

# Create an Entry Group.
entry_group_obj = datacatalog_v1.types.EntryGroup()
entry_group_obj.display_name = "My Fileset Entry Group"
entry_group_obj.description = "This Entry Group consists of ...."

entry_group = datacatalog.create_entry_group(
    parent=datacatalog_v1.DataCatalogClient.common_location_path(
        project_id, location
    ),
    entry_group_id=fileset_entry_group_id,
    entry_group=entry_group_obj,
)
print(f"Created entry group: {entry_group.name}")

# Create a Fileset Entry.
entry = datacatalog_v1.types.Entry()
entry.display_name = "My Fileset"
entry.description = "This fileset consists of ...."
entry.gcs_fileset_spec.file_patterns.append("gs://my_bucket/*.csv")
entry.type_ = datacatalog_v1.EntryType.FILESET

# Create the Schema, for example when you have a csv file.
entry.schema.columns.append(
    datacatalog_v1.types.ColumnSchema(
        column="first_name",
        description="First name",
        mode="REQUIRED",
        type_="STRING",
    )
)

entry.schema.columns.append(
    datacatalog_v1.types.ColumnSchema(
        column="last_name", description="Last name", mode="REQUIRED", type_="STRING"
    )
)

# Create the addresses parent column
addresses_column = datacatalog_v1.types.ColumnSchema(
    column="addresses", description="Addresses", mode="REPEATED", type_="RECORD"
)

# Create sub columns for the addresses parent column
addresses_column.subcolumns.append(
    datacatalog_v1.types.ColumnSchema(
        column="city", description="City", mode="NULLABLE", type_="STRING"
    )
)

addresses_column.subcolumns.append(
    datacatalog_v1.types.ColumnSchema(
        column="state", description="State", mode="NULLABLE", type_="STRING"
    )
)

entry.schema.columns.append(addresses_column)

entry = datacatalog.create_entry(
    parent=entry_group.name, entry_id=fileset_entry_id, entry=entry
)
print(f"Created fileset entry: {entry.name}")

REST 和命令行

REST

如果您无法使用针对您的语言的 Cloud 客户端库或者您想要使用 REST 请求来测试 API,请参阅以下示例并参阅 Data Catalog REST API entryGroups.createentryGroups.entries.create 文档。

1. 创建一个条目组

在使用任何请求数据之前,请先进行以下替换:

  • project-id:您的 Google Cloud 项目 ID
  • entryGroupId:ID 必须以字母或下划线开头,只能包含英文字母、数字和下划线,长度不超过 64 个字符。
  • displayName:条目组的文本名称。

HTTP 方法和网址:

POST https://datacatalog.googleapis.com/v1/projects/project-id/locations/region/entryGroups?entryGroupId=entryGroupId

请求 JSON 正文:

{
  "displayName": "Entry Group display name"
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "name": "projects/my_projectid/locations/us-central1/entryGroups/my_entry_group",
  "displayName": "Entry Group display name",
  "dataCatalogTimestamps": {
    "createTime": "2019-10-19T16:35:50.135Z",
    "updateTime": "2019-10-19T16:35:50.135Z"
  }
}

2. 在条目组中创建文件集

在使用任何请求数据之前,请先进行以下替换:

  • project_id:您的 Google Cloud 项目 ID
  • entryGroupId:现有 entryGroup 的 ID。此 sntryGroup 中将创建文件集。
  • entryId:新文件集的 entryId。ID 必须以字母或下划线开头,只能包含英文字母、数字和下划线,长度不超过 64 个字符。
  • description:文件集说明。
  • displayName:文件集条目的文本名称。
  • filePatterns:必须以“gs://bucket_name/”开头。请参阅文件模式要求
  • schema:文件集架构。

    JSON 架构示例
    { ...
      "schema": {
        "columns": [
          {
            "column": "first_name",
            "description": "First name",
            "mode": "REQUIRED",
            "type": "STRING"
          },
          {
            "column": "last_name",
            "description": "Last name",
            "mode": "REQUIRED",
            "type": "STRING"
          },
          {
            "column": "address",
            "description": "Address",
            "mode": "REPEATED",
            "subcolumns": [
              {
                "column": "city",
                "description": "City",
                "mode": "NULLABLE",
                "type": "STRING"
              },
              {
                "column": "state",
                "description": "State",
                "mode": "NULLABLE",
                "type": "STRING"
              }
            ],
            "type": "RECORD"
          }
        ]
      }
    ...
    }
    

HTTP 方法和网址:

POST https://datacatalog.googleapis.com/v1/projects/project_id/locations/region/entryGroups/entryGroupId/entries?entryId=entryId

请求 JSON 正文:

{
  "description": "Fileset description.",
  "displayName": "Display name",
  "gcsFilesetSpec": {
    "filePatterns": [
      "gs://bucket_name/file_pattern"
    ]
  },
  "type": "FILESET",
  "schema": { schema }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "name": "projects/my_project_id/locations/us-central1/entryGroups/my_entryGroup_id/entries/my_entry_id",
  "type": "FILESET",
  "displayName": "My Fileset",
  "description": "My Fileset description.",
  "schema": {
    "columns": [
      {
        "type": "STRING",
        "description": "First name",
        "mode": "REQUIRED",
        "column": "first_name"
      },
      {
        "type": "STRING",
        "description": "Last name",
        "mode": "REQUIRED",
        "column": "last_name"
      },
      {
        "type": "RECORD",
        "description": "Address",
        "mode": "REPEATED",
        "column": "address",
        "subcolumns": [
          {
            "type": "STRING",
            "description": "City",
            "mode": "NULLABLE",
            "column": "city"
          },
          {
            "type": "STRING",
            "description": "State",
            "mode": "NULLABLE",
            "column": "state"
          }
        ]
      }
    ]
  },
  "gcsFilesetSpec": {
    "filePatterns": [
      "gs://my_bucket_name/chicago_taxi_trips/csv/shard-*.csv"
    ]
  },
  "sourceSystemTimestamps": {
    "createTime": "2019-10-23T23:11:26.326Z",
    "updateTime": "2019-10-23T23:11:26.326Z"
  },
"linkedResource": "//datacatalog.googleapis.com/projects/my_project_id/locations/us-central1/entryGroups/my_entryGroup_id/entries/my_entry_id"
}

IAM 角色、权限和政策

Data Catalog 定义了 entry 和 entryGroup 角色,以方便管理文件集和其他 Data Catalog 资源的权限。

Entry 角色 说明
dataCatalog.entryOwner 特定条目或条目组的所有者。
  • 权限:
    • datacatalog.entries.(*)
    • datacatalog.entryGroups.get
  • 适用性:
    • 组织、项目和 entryGroup。
dataCatalog.entryViewer 可以查看 entry 和 entryGroup 的详细信息。
  • 权限
    • datacatalog.entries.get
    • datacatalog.entryGroups.get
  • 适用性:
    • 组织、项目和 entryGroup。
entryGroup 角色 说明
dataCatalog.entryGroupOwner 特定 entryGroup 的所有者。
  • 权限:
    • datacatalog.entryGroups.(*)
    • datacatalog entries.(*)
  • 适用性:
    • 组织、项目和 entryGroup 级层。
dataCatalog.entryGroupCreator 可以在项目中创建 entryGroup。entryGroup 的创建者会自动获得 dataCatalog.entryGroupOwner 角色。
  • 权限
    • datacatalog.entryGroups.(get | create)
  • 适用性:
    • 组织和项目级层。

设置 IAM 政策

拥有 datacatalog.<resource>.setIamPolicy 权限的用户可以对 Data Catalog 条目组和其他 Data Catalog 资源设置 IAM 政策(请参阅 Data Catalog 角色)。

gcloud

控制台

导航至 Data Catalog 界面条目组详情页面,然后使用右侧的 IAM 面板授予或撤消权限。

授予条目组角色

示例 1

如果公司的文件集具有不同的业务环境,则会创建单独的 order-filesuser-files 条目组:

订单文件组的存储桶中存储有已取消和已完成的有序文件,而用户文件组的存储桶中存储有 PII 文件。
图 1. 如何在不同条目组中存储订单数据和用户数据的示例。

公司将向用户授予 order-files 的 EntryGroup Viewer 角色,这意味着他们只能搜索该条目组中包含的条目。他们的搜索结果不会返回 user-files 条目组中的条目。

示例 2

公司仅向 project_entry_group 项目中的用户授予 EntryGroup Viewer 角色。该用户只能查看该项目中的条目。

搜索文件集

用户可以使用 type 构面限制 Data Catalog 中的搜索范围。type=entry_group 将搜索查询限制为条目组,而 type=fileset 仅搜索文件集。type 构面可与其他构面(如 projectid)结合使用。

gcloud

  • 搜索项目中的条目组:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "projectid=my-project type=entry_group"
    

  • 搜索您可以访问的所有条目组:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=entry_group"
    

  • 搜索项目中的文件集:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=entry.fileset"
    

  • 搜索项目中的文件集 - 简化了语法:

    gcloud data-catalog search \  
        --include-project-ids=my-project
        "type=fileset"