借助 Cloud Storage 批处理来源插件,您可以从 Cloud Storage 存储分区,并将其导入 Cloud Data Fusion 进一步处理和转换。它支持从多个文件加载数据 格式,其中包括:
- 结构化:CSV、Avro、Parquet、ORC
- 半结构化:JSON、XML
- 其他:文本、二进制
准备工作
Cloud Data Fusion 通常有两个服务账号:
- 设计时服务账号: Cloud Data Fusion API 服务代理
- 执行时服务账号:Compute Engine 服务账号
在使用 Cloud Storage 批处理来源插件之前, 以下角色或权限。
Cloud Data Fusion API Service Agent
此服务账号已拥有所有必要权限,您无需添加其他权限。
Compute Engine 服务账号
在您的 Google Cloud 项目中,授予以下 IAM 角色或 对 Compute Engine 服务账号的权限:
- Storage Legacy Bucket Reader (
roles/storage.legacyBucketReader
)。这个 预定义角色包含所需的storage.buckets.get
权限。 Storage Object Viewer (
roles/storage.legacyBucketReader
)。这个 预定义角色包含以下所需权限:storage.objects.get
storage.objects.list
配置插件
- 转到 Cloud Data Fusion 网页界面 然后点击 Studio。
- 检查是否已选择 Data Pipeline - Batch(而非 Realtime)。
- 在来源菜单中,点击 GCS。Cloud Storage 节点 。
- 如需配置该来源,请前往 Cloud Storage 节点,然后点击属性。
输入以下属性。有关完整列表,请参阅 属性。
- 输入 Cloud Storage 节点的标签
示例
Cloud Storage tables
。 输入连接详情。您可以设置新的一次性连接 或可重复使用的现有连接。
新增关联项
如需添加与 Cloud Storage 的一次性连接,请按照下列步骤操作 步骤:
- 让使用网络连接保持关闭状态。
- 在项目 ID 字段中,将值保留为“自动检测”。
在服务账号类型字段中,将值保留为文件路径,并将服务账号文件路径设为“自动检测”。
可重复使用的连接
如需重复使用现有连接,请按以下步骤操作:
- 开启使用网络连接。
- 点击浏览连接。
点击连接名称,例如 Cloud Storage 默认值。
可选:如果连接不存在,并且您想要创建一个 新的可重复使用的连接,请点击添加连接并参阅 本页面上新建连接标签页中介绍的步骤。
在“参考名称”字段中,输入要使用的名称 沿袭 - 例如
data-fusion-gcs-campaign
。在 Path 字段中,输入要从中读取数据的路径,例如
gs://BUCKET_PATH
。在格式字段中,为 要读取的数据:
- avro
- blob(blob 格式需要包含字段的架构 字节类型的命名正文)
- csv
- 分隔
- json
- 镶木地板
- text(文本格式需要包含字段的架构 (字符串类型的命名正文)
- tsv
- 您在 Google Cloud 控制台中部署的任何格式插件的名称, 环境
可选:如需测试连接性,请点击获取架构。
可选:在样本大小字段中,输入要检查的行数上限 (针对所选数据类型,例如
1000
)。可选:在替换字段中,输入要跳过的列名称及其各自的数据类型。
可选:输入高级属性,例如最小拆分大小或 正则表达式路径过滤器(请参阅属性)。
可选:在临时存储桶名称字段中,输入一个名称 指定 Cloud Storage 存储桶的存储位置。
- 输入 Cloud Storage 节点的标签
示例
可选:点击验证,并解决发现的所有错误。
点击关闭。 已保存属性,您可以继续构建 Cloud Data Fusion Studio 中的数据流水线。
属性
属性 | 宏已启用 | 必需属性 | 说明 |
---|---|---|---|
标签 | 否 | 是 | 数据流水线中相应节点的名称。 |
使用连接 | 否 | 否 | 浏览到来源的可重复使用连接。如需详细了解如何添加、导入和修改浏览连接时显示的连接,请参阅管理连接。 |
连接 | 是 | 是 | 如果使用连接处于开启状态,系统会在 您选择的可重用连接会显示在此字段中。 |
项目 ID | 是 | 否 | 仅在使用连接处于关闭状态时使用。全球
项目的唯一标识符。 默认值为 auto-detect 。 |
服务账号类型 | 是 | 否 | 从下列选项中选择一项:
|
服务账号文件路径 | 是 | 否 | 仅当服务账号类型值为文件路径时使用。服务账号密钥的本地文件系统上的路径
用于授权。如果作业在 Dataproc 集群上运行,
将值设置为自动检测。如果作业在其他类型的集群上运行,
文件。 默认值为 auto-detect 。 |
服务账号 JSON | 是 | 否 | 仅在服务账号类型值为 JSON 时使用。 服务账号的 JSON 文件内容。 |
参考名称 | 否 | 是 | 用于唯一标识此来源以便其他服务(例如沿袭和注释元数据)使用。 |
路径 | 是 | 是 | 要读取的文件的路径。如果指定了目录,
包含反斜杠 (/ ) 的路径。例如:
gs://bucket/path/to/directory/ 。如需匹配文件名模式,
您可以使用星号 (* ) 作为通配符。如果没有任何文件
流水线将失败。 |
格式 | 否 | 是 | 要读取的数据的格式。格式必须是下列其中一项:
|
样本规模 | 是 | 否 | 调查自动数据的最大行数 类型检测。默认值为 1000。 |
覆盖 | 是 | 否 | 一列列表,其中包含自动 会跳过数据类型检测。 |
分隔符 | 是 | 否 | 格式为分隔时使用的分隔符。这个 属性。 |
启用带英文引号的值 | 是 | 否 | 是否将引号之间的内容视为值。此属性仅适用于 csv、tsv 或分隔格式。例如,如果此属性设置为
true,则以下代码会输出两个字段:1, "a, b, c" 。
第一个字段的值为 1 。第二个具有 a, b, c 。引号字符已被删减。通过
换行符分隔符不能用引号引起来。插件假定引号已正确括起来,例如: "a, b, c" 。未闭合引号 ("a,b,c, ) 的原因
出错。默认值为 False。 |
将第一行用作标题 | 是 | 否 | 是否将每个文件的第一行用作列
标头。支持的格式有 text、csv、
tsv 和 bound。 的默认值为 False。 |
拆分大小下限 | 是 | 否 | 每个输入分区的最小大小(以字节为单位)。较小的分区
会增加并行级别,但需要更多资源和开销。
如果格式值为 blob ,则无法拆分
数据。 |
分屏大小上限 | 是 | 否 | 每个输入分区的大小上限(以字节为单位)。较小的分区
会增加并行级别,但需要更多资源和开销。
如果格式值为 blob ,则无法拆分
数据。默认值为 128 MB。 |
正则表达式路径过滤条件 | 是 | 否 | 文件路径必须与正则表达式匹配才能包含在输入中。系统会比较完整路径,而不仅仅是文件名。如果未指定任何文件,则不会进行文件过滤。如需详细了解常规广告 表达式语法,请参阅 图案。 |
路径字段 | 是 | 否 | 输出字段,用于放置读取记录的文件的路径 。如果未指定,则输出记录中不包含该路径。如果 则该字段必须以字符串形式存在于输出架构中。 |
仅路径文件名 | 是 | 否 | 如果设置了路径字段属性,请仅使用文件名
而不是路径的 URI。 默认值为 False。 |
以递归方式读取文件 | 是 | 否 | 是否以递归方式从路径读取文件。 默认值为 False。 |
允许输入空值 | 是 | 否 | 是否允许输入路径不包含数据。设置为
False,则插件会在没有
阅读。如果设置为 True,则不会抛出任何错误并且为零
读取记录。 默认值为 False。 |
数据文件已加密 | 是 | 否 | 文件是否经过加密。如需了解详情,请参阅
数据文件加密。 默认值为 False。 |
加密元数据文件后缀 | 是 | 否 | 加密元数据文件的文件名后缀。 默认值为元数据。 |
文件系统属性 | 是 | 否 | 读取 数据。 |
文件编码 | 是 | 否 | 要读取的文件的字符编码。 默认值为 UTF-8。 |
输出架构 | 是 | 否 | 如果设置了路径字段属性,则该属性必须位于 以字符串形式表示架构 |
数据文件加密
本部分介绍了数据文件加密,
属性。如果将其设置为 true,则系统会对文件
使用由
Tink 库。每个数据文件
还必须附带包含密码的元数据文件
信息。例如,Google 服务器上的
gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc
必须具有位于 gs://BUCKET/
PATH_TO_DIRECTORY/file1.csv.enc.metadata
的元数据文件。元数据文件
包含具有以下属性的 JSON 对象:
属性 | 说明 |
---|---|
kms |
用于加密数据加密的 Cloud Key Management Service URI 密钥。 |
aad |
通过 Base64 编码的附加身份验证数据, 加密。 |
key set |
一个 JSON 对象,表示来自 Tink 库的序列化密钥集信息。 |
示例
/* Counting example */ { "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey", "aad": "73iT4SUJBM24umXecCCf3A==", "keyset": { "keysetInfo": { "primaryKeyId": 602257784, "keyInfo": [{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey", "outputPrefixType": "RAW", "keyId": 602257784, "status": "ENABLED" }] }, "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn" } }
版本说明
后续步骤
- 详细了解 Cloud Data Fusion 中的插件。