Cloud Storage 批处理来源

本页面提供了有关如何在 Cloud Data Fusion 中配置 Cloud Storage 批量来源插件的指南。

借助 Cloud Storage 批处理来源插件,您可以从 Cloud Storage 存储分区中读取数据并将其导入 Cloud Data Fusion,以便进一步处理和转换。它可让您从多种文件格式加载数据,包括:

  • 结构化:CSV、Avro、Parquet、ORC
  • 半结构化:JSON、XML
  • 其他:文字、二进制

准备工作

Cloud Data Fusion 通常有两个服务帐号:

在使用 Cloud Storage 批处理来源插件之前,请向每个服务帐号授予以下角色或权限。

Cloud Data Fusion API Service Agent

此服务帐号已具有所有必需的权限,您无需添加其他权限。

Compute Engine 服务账号

在 Google Cloud 项目中,向 Compute Engine 服务帐号授予以下 IAM 角色或权限:

  • Storage Legacy Bucket Reader (roles/storage.legacyBucketReader)。此预定义角色包含所需的 storage.buckets.get 权限。
  • Storage Object Viewer (roles/storage.legacyBucketReader)。此预定义角色包含以下所需权限:

    • storage.objects.get
    • storage.objects.list

配置插件

  1. 转到 Cloud Data Fusion 网页界面,然后点击 Studio
  2. 检查是否已选择 Data Pipeline - Batch(而非实时)。
  3. 来源菜单中,点击 GCS。Cloud Storage 节点会显示在您的流水线中。
  4. 如需配置来源,请转到 Cloud Storage 节点,然后点击属性
  5. 输入以下属性。如需查看完整列表,请参阅属性

    1. 为 Cloud Storage 节点输入标签,例如 Cloud Storage tables
    2. 输入连接详情。您可以设置新的一次性连接,也可以设置可重复使用的现有连接。

      新增关联项

      如需添加与 Cloud Storage 的一次性连接,请按以下步骤操作:

      1. 使使用连接保持关闭状态。
      2. 项目 ID 字段中,将值保留为“自动检测”。
      3. 服务帐号类型字段中,将值保留为文件路径,将服务帐号文件路径保留为自动检测。

      可重复使用的连接

      要重复使用现有连接,请按以下步骤操作:

      1. 开启使用网络连接
      2. 点击浏览连接
      3. 点击连接名称,例如 Cloud Storage Default

      4. 可选:如果连接不存在,并且您想要创建新的可重复使用的连接,请点击添加连接,然后参考本页面上新建连接标签页中的步骤。

    3. 引用名称字段中,输入要用于沿袭的名称,例如 data-fusion-gcs-campaign

    4. 路径字段中,输入读取路径,例如 gs://BUCKET_PATH

    5. 格式字段中,为要读取的数据选择以下文件格式之一:

      • avro
      • blob(blob 格式需要架构,包含名为字节体类型的字段)
      • csv
      • 分隔
      • json
      • parquet
      • text(文本格式所需的架构需包含名为“string”类型的“body”字段)
      • tsv
      • 您在环境中部署的任何格式插件的名称
    6. 可选:如需测试连接,请点击获取架构

    7. 可选:在样本大小字段中,输入要检查所选数据类型的最大行数,例如 1000

    8. 可选:在替换字段中,输入要跳过的列名称及其各自的数据类型。

    9. 可选:输入高级属性,例如最小拆分大小或正则表达式路径过滤条件(请参阅属性)。

    10. 可选:在临时存储桶名称字段中,输入 Cloud Storage 存储桶的名称。

  6. 可选:点击验证并解决发现的所有错误。

  7. 点击关闭。属性已保存,您可以继续在 Cloud Data Fusion Studio 中构建数据流水线。

属性

属性 已启用宏 必需属性 说明
标签 数据流水线中节点的名称。
使用连接 浏览以查找可重复使用的来源连接。如需详细了解如何添加、导入和修改连接时显示的连接,请参阅管理连接
连接 如果 Use connection 处于开启状态,您选择的可重复使用的连接的名称将显示在此字段中。
项目 ID 仅在使用连接处于关闭状态时使用。项目的全局唯一标识符。
默认值为 auto-detect
服务帐号类型 从下列选项中选择一项:
  • 文件路径:服务帐号所在的路径。
  • JSON:服务账号的 JSON 内容。
服务账号文件路径 仅在服务账号类型值为文件路径时使用。用于授权的服务帐号密钥的本地文件系统上的路径。如果作业在 Dataproc 集群上运行,请将该值设置为自动检测。如果作业在其他类型的集群上运行,则该文件必须存在于集群中的每个节点上。
默认值为 auto-detect
服务帐号 JSON 仅在服务帐号类型值为 JSON 时使用。 服务帐号的 JSON 文件内容。
参考编号 名称,用于为其他服务(例如沿袭和注释元数据)唯一标识此来源。
路径 要读取的文件的路径。如果指定了目录,请使用反斜杠 (/) 终止路径,例如 gs://bucket/path/to/directory/。如需匹配文件名模式,您可以使用星号 (*) 作为通配符。如果找不到或找不到文件,流水线将失败。
格式 要读取的数据的格式。格式必须为以下之一:
  • avro
  • blob(blob 格式需要架构,包含名为字节类型的正文的字段)
  • csv
  • 分隔
  • json
  • parquet
  • text(文本格式所需的架构需包含名为“正文”类型为字符串的字段)
  • tsv
  • 您在环境中部署的任何格式插件的名称
  • 如果格式为宏,则只能使用预封装的格式
样本大小 为自动检测数据类型而调查的行数上限。默认值为 1000
替换 包含相应数据(跳过了自动数据类型检测)的列的列表。
分隔符 采用分隔格式时使用的分隔符。对于其他格式,系统会忽略此属性。
启用带引号的值 是否将引号之间的内容视为值。此属性仅用于 csvtsv分隔格式。例如,如果此属性设置为 true,则以下转换会输出两个字段:1, "a, b, c"。 第一个字段值为 1。第二个代码具有 a, b, c。引号字符会被删除。换行符不能用引号括起来。
插件假定引号已正确括起来,例如 "a, b, c"。不加引号 ("a,b,c,) 会导致错误。
默认值为 False
将第一行用作标题 是否使用每个文件的第一行作为列标题。支持的格式有文本csvtsv分隔
默认值为 False
最小拆分大小 每个输入分区的最小大小(以字节为单位)。较小的分区可以提高并行级别,但需要更多的资源和开销。
如果格式值为 blob,则无法拆分数据。
最大拆分大小 每个输入分区的大小上限(以字节为单位)。较小的分区可以提高并行级别,但需要更多的资源和开销。
如果格式值为 blob,则无法拆分数据。
默认值为 128 MB
正则表达式路径过滤条件 文件路径必须匹配才能包含在输入中的正则表达式。系统会比较完整路径,而不只是文件名。如果未指定任何文件,则不会执行文件过滤。如需详细了解正则表达式语法,请参阅模式
路径字段 输出字段,用于放置从中读取记录的文件的路径。如果未指定,则输出记录中不包含该路径。如果已指定,则该字段必须以字符串形式存在于输出架构中。
仅路径文件名 如果设置了路径字段属性,请仅使用文件名,不要使用路径的 URI。
默认值为 False
以递归方式读取文件 是否从路径以递归方式读取文件。
默认值为 False
允许空输入内容 是否允许输入路径中不含任何数据。如果设置为 False,则插件会在没有要读取的数据时发生错误。如果设置为 True,则不会抛出任何错误,并且不会读取任何记录。
默认值为 False
数据文件已加密 文件是否经过加密。如需了解详情,请参阅数据文件加密
默认值为 False
加密元数据文件后缀 加密元数据文件的文件名后缀。
默认值为 metadata
文件系统属性 读取数据时要与 InputFormat 一起使用的其他属性。
文件编码 要读取的文件的字符编码。
默认值为 UTF-8
输出架构 如果设置了路径字段属性,则该属性必须以字符串形式存在于架构中。

数据文件加密

本部分介绍了数据文件加密属性。如果将此属性设置为 true,系统会使用 Tink 库提供的流式 AEAD 解密文件。每个数据文件必须附带一个包含加密信息的元数据文件。例如,位于 gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc 的加密数据文件必须在位于 gs://BUCKET/ PATH_TO_DIRECTORY/file1.csv.enc.metadata 的位置具有元数据文件。该元数据文件包含一个具有以下属性的 JSON 对象:

属性 说明
kms 用于加密数据加密密钥的 Cloud Key Management Service URI。
aad 加密中使用的 Base64 编码的额外身份验证数据。
key set 一个 JSON 对象,表示来自 Tink 库的序列化密钥集信息。

示例

    /* Counting example */
    {

      "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey",

      "aad": "73iT4SUJBM24umXecCCf3A==",

      "keyset": {

          "keysetInfo": {

              "primaryKeyId": 602257784,

              "keyInfo": [{

                  "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey",

                  "outputPrefixType": "RAW",

                  "keyId": 602257784,

                  "status": "ENABLED"

              }]

          },

          "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn"

      }

    }
    

版本说明

后续步骤