本文档介绍了如何在 Workflows 中运行托管式连接流水线,以便将元数据从第三方来源导入 Dataplex。
若要设置托管式连接流水线,您需要为数据源构建连接器。然后,您可以在 Workflows 中运行该流水线。该流水线会从数据源中提取元数据,然后将元数据导入 Dataplex。如有必要,该流水线还会在您的 Google Cloud 项目中创建 Dataplex 目录条目组。
如需详细了解托管式连接,请参阅托管式连接概览。
准备工作
在导入元数据之前,请完成本部分中的任务。
构建连接器
连接器会从数据源中提取元数据,并生成可供 Dataplex 导入的元数据导入文件。该连接器是一个可在 Dataproc Serverless 上运行的 Artifact Registry 映像。
构建用于从第三方来源提取元数据的自定义连接器。
如需查看可用作参考模板来构建您自己的连接器的示例连接器,请参阅开发用于元数据导入的自定义连接器。
配置 Google Cloud 资源
-
Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
如果您不打算按计划运行流水线,则无需启用 Cloud Scheduler API。
在 Secret Manager 中创建 Secret,以存储第三方数据源的凭据。
配置 Virtual Private Cloud (VPC) 网络,以运行 Dataproc Serverless for Spark 工作负载。
创建 Cloud Storage 存储桶以存储元数据导入文件。
创建以下 Dataplex Catalog 资源:
所需的角色
服务账号代表工作流的身份,并决定工作流拥有哪些权限以及可以访问哪些 Google Cloud 资源。您需要为 Workflows(用于运行流水线)和 Dataproc Serverless(用于运行连接器)创建服务账号。
您可以使用 Compute Engine 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com
),也可以创建自己的服务账号(或账号)来运行托管的连接流水线。
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
选择要将元数据导入到的项目。
点击
授予访问权限,然后输入服务账号的电子邮件地址。向服务账号分配以下角色:
- Logs Writer
- Dataplex Entry Group Owner
- Dataplex Metadata Job Owner
- Dataplex Catalog Editor
- Dataproc Editor
- Dataproc Worker
- Secret Manager Secret Accessor - 用于存储数据源凭据的 Secret
- Storage Object User - Cloud Storage 存储桶
- Artifact Registry Reader - 在包含连接器映像的 Artifact Registry 仓库上
- Service Account User(服务账号用户)- 如果您使用不同的服务账号,请向运行 Dataproc Serverless 批处理作业的服务账号授予运行 Workflows 服务账号的此角色
- Workflows Invoker - 如果您想安排流水线
保存更改。
gcloud
向服务账号授予角色。运行以下命令:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/logging.logWriter gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.entryGroupOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.metadataJobOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataplex.catalogEditor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataproc.editor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/dataproc.worker
替换以下内容:
-
PROJECT_ID
:要将元数据导入到的目标 Google Cloud项目的名称。 SERVICE_ACCOUNT_ID
:服务账号,例如my-service-account@my-project.iam.gserviceaccount.com
。
-
在资源级别向服务账号授予以下角色:
gcloud secrets add-iam-policy-binding SECRET_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/secretmanager.secretaccessor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID" \ --role=roles/storage.objectUser \ --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID') gcloud artifacts repositories add-iam-policy-binding REPOSITORY \ --location=REPOSITORY_LOCATION \ --member=SERVICE_ACCOUNT_ID} \ --role=roles/artifactregistry.reader
替换以下内容:
SECRET_ID
:用于存储数据源凭据的 Secret 的 ID。其格式为projects/PROJECT_ID/secrets/SECRET_ID
。BUCKET_ID
:Cloud Storage 存储桶的名称。REPOSITORY
:包含连接器映像的 Artifact Registry 制品库。REPOSITORY_LOCATION
:代码库托管的 Google Cloud位置。
向运行 Dataproc Serverless 批处理作业的服务账号授予运行 Workflows 服务账号的
roles/iam.serviceAccountUser
角色。即使您为 Workflows 和 Dataproc Serverless 使用相同的服务账号,也必须授予此角色。gcloud iam service-accounts add-iam-policy-binding \ serviceAccount:SERVICE_ACCOUNT_ID \ --member='SERVICE_ACCOUNT_ID' \ --role='roles/iam.serviceAccountUser'
如果您使用不同的服务账号,
--member
标志的值为运行 Dataproc Serverless 批处理作业的服务账号。如果您想安排流水线的运行时间,请向服务账号授予以下角色:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="SERVICE_ACCOUNT_ID" \ --role=roles/workflows.invoker
导入元数据
如需导入元数据,请创建并执行用于运行托管式连接流水线的工作流。(可选)您还可以创建运行流水线的时间表。
控制台
创建工作流。 提供以下信息:
- 服务账号:您在此文档的所需角色部分中配置的服务账号。
加密:选择 的 Google 管理 加密密钥。
定义工作流:提供以下定义文件:
如需按需运行流水线,请执行工作流。
提供以下运行时参数:
替换以下内容:
-
PROJECT_ID
:要将元数据导入到的目标 Google Cloud项目的名称。 -
LOCATION_ID
:目标 Google Cloud 位置,Dataproc Serverless 和元数据导入作业将在其中运行,并且元数据将导入其中。 -
ENTRY_GROUP_ID
:要将元数据导入到的条目组的 ID。条目组 ID 可以包含小写字母、数字和连字符。此条目组的完整资源名称为
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
。 -
CREATE_ENTRY_GROUP_BOOLEAN
:如果您希望流水线在项目中不存在条目组时创建该条目组,请将此值设置为true
。 -
BUCKET_ID
:用于存储连接器生成的元数据导入文件的 Cloud Storage 存储桶的名称。每次执行工作流都会创建一个新文件夹。 -
SERVICE_ACCOUNT_ID
:您在本文档的所需角色部分中配置的服务账号。 该服务账号会在 Dataproc Serverless 中运行连接器。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
:要传递给连接器的其他参数列表。如需查看示例,请参阅为元数据导入开发自定义连接器。请用双引号括起每个参数,并用英文逗号分隔参数。 -
CONTAINER_IMAGE
:托管在 Artifact Registry 中的连接器的自定义容器映像。 -
ENTRY_TYPES
:可导入的条目类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
。LOCATION_ID
必须是您要将元数据导入到的Google Cloud 位置,或者global
。 -
ASPECT_TYPES
:可导入的方面类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
。LOCATION_ID
必须是您要将元数据导入到的Google Cloud 位置,或者global
。 -
可选:为
NETWORK_TAGS
参数提供网络标记列表。 -
可选:对于
NETWORK_URI
参数,请提供连接到数据源的 VPC 网络的 URI。如果您指定了网络,请省略子网参数。 -
可选:对于
SUBNETWORK_URI
参数,请提供连接到数据源的子网的 URI。如果您提供子网,请省略网络参数。
流水线的运行时间可能需要几分钟或更长时间,具体取决于您导入的元数据量。如需详细了解如何查看进度,请参阅访问工作流执行结果。
流水线运行完毕后,您可以在 Dataplex Catalog 中搜索导入的元数据。
-
可选:如果您想按计划运行流水线,请使用 Cloud Scheduler 创建时间表。提供以下信息:
- 频率:一个 unix-cron 表达式,用于定义运行流水线的时间表。
- 工作流参数:连接器的运行时参数,如上一步所述。
- 服务账号:服务账号。服务账号会管理调度程序。
gcloud
将以下工作负载定义保存为 YAML 文件:
定义 Bash 变量、创建工作流,并根据需要创建时间表来运行流水线:
替换以下内容:
-
PROJECT_ID
:要将元数据导入到的目标 Google Cloud项目的名称。 -
LOCATION_ID
:目标 Google Cloud 位置,Dataproc Serverless 和元数据导入作业将在其中运行,并且元数据将导入其中。 -
SERVICE_ACCOUNT_ID
:您在本文档的所需角色部分中配置的服务账号。 WORKFLOW_DEFINITION_FILE
:工作流定义 YAML 文件的路径。WORKFLOW_NAME
:工作流的名称。WORKFLOW_ARGUMENTS
:要传递给连接器的运行时参数。参数采用 JSON 格式:对于 Cloud Scheduler,带英文引号的字符串中的英文双引号使用反斜杠 (\) 进行转义。例如:
--message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}"
。替换以下内容:
-
ENTRY_GROUP_ID
:要将元数据导入到的条目组的 ID。条目组 ID 可以包含小写字母、数字和连字符。此条目组的完整资源名称为
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
。 -
CREATE_ENTRY_GROUP_BOOLEAN
:如果您希望流水线在项目中不存在条目组时创建该条目组,请将此值设置为true
。 -
BUCKET_ID
:用于存储连接器生成的元数据导入文件的 Cloud Storage 存储桶的名称。每次执行工作流都会创建一个新文件夹。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
:要传递给连接器的其他参数列表。如需查看示例,请参阅为元数据导入开发自定义连接器。 -
CONTAINER_IMAGE
:托管在 Artifact Registry 中的连接器的自定义容器映像。 -
ENTRY_TYPES
:可导入的条目类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
。LOCATION_ID
必须是您要将元数据导入到的Google Cloud 位置,或者global
。 -
ASPECT_TYPES
:可导入的方面类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
。LOCATION_ID
必须是您要将元数据导入到的Google Cloud 位置,或者global
。 -
可选:为
NETWORK_TAGS
参数提供网络标记列表。 -
可选:对于
NETWORK_URI
参数,请提供连接到数据源的 VPC 网络的 URI。如果您指定了网络,请省略子网参数。 -
可选:对于
SUBNETWORK_URI
参数,请提供连接到数据源的子网的 URI。如果您提供子网,请省略网络参数。
-
CRON_SCHEDULE_EXPRESSION
:一个 Cron 表达式,用于定义运行流水线的时间表。例如,如需每天午夜运行时间表,请使用表达式0 0 * * *
。
-
如需按需运行流水线,请执行工作流:
工作流参数采用 JSON 格式,但未转义。
工作流的运行时间可能需要几分钟或更长时间,具体取决于您导入的元数据量。如需详细了解如何查看进度,请参阅访问工作流执行结果。
流水线运行完毕后,您可以在 Dataplex Catalog 中搜索导入的元数据。
Terraform
-
该代码库包含以下 Terraform 文件:
main.tf
:定义要创建的 Google Cloud 资源。variables.tf
:声明变量。byo-connector.tfvars
:为托管的连接性流水线定义变量。
修改
.tfvars
文件,将占位符替换为连接器的信息。替换以下内容:
-
PROJECT_ID
:要将元数据导入到的目标 Google Cloud项目的名称。 -
LOCATION_ID
:目标 Google Cloud 位置,Dataproc Serverless 和元数据导入作业将在其中运行,并且元数据将导入其中。 -
SERVICE_ACCOUNT_ID
:您在本文档的所需角色部分中配置的服务账号。 -
CRON_SCHEDULE_EXPRESSION
:一个 Cron 表达式,用于定义运行流水线的时间表。例如,如需每天午夜运行时间表,请使用表达式0 0 * * *
。 -
ENTRY_GROUP_ID
:要将元数据导入到的条目组的 ID。条目组 ID 可以包含小写字母、数字和连字符。此条目组的完整资源名称为
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
。 -
CREATE_ENTRY_GROUP_BOOLEAN
:如果您希望流水线在项目中不存在条目组时创建该条目组,请将此值设置为true
。 -
BUCKET_ID
:用于存储连接器生成的元数据导入文件的 Cloud Storage 存储桶的名称。每次执行工作流都会创建一个新文件夹。 -
ADDITIONAL_CONNECTOR_ARGUMENTS
:要传递给连接器的其他参数列表。如需查看示例,请参阅为元数据导入开发自定义连接器。请用双引号括起每个参数,并用英文逗号分隔参数。 -
CONTAINER_IMAGE
:托管在 Artifact Registry 中的连接器的自定义容器映像。 -
ENTRY_TYPES
:可导入的条目类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
。LOCATION_ID
必须是您要导入元数据的Google Cloud 位置,或者global
。 -
ASPECT_TYPES
:可导入的方面类型的列表,格式为projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
。LOCATION_ID
必须是您要将元数据导入到的Google Cloud 位置,或者global
。 -
可选:为
NETWORK_TAGS
参数提供网络标记列表。 -
可选:对于
NETWORK_URI
参数,请提供连接到数据源的 VPC 网络的 URI。如果您指定了网络,请省略子网参数。 -
可选:对于
SUBNETWORK_URI
参数,请提供连接到数据源的子网的 URI。如果您提供子网,请省略网络参数。
-
初始化 Terraform:
terraform init
使用
.tfvars
文件验证 Terraform:terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
将
CONNECTOR_VARIABLES_FILE
替换为变量定义文件的名称。使用
.tfvars
文件部署 Terraform:terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform 会在指定项目中创建一个工作流和一个 Cloud Scheduler 作业。Workflows 会按照您指定的时间表运行流水线。
工作流的运行时间可能需要几分钟或更长时间,具体取决于您导入的元数据量。如需详细了解如何查看进度,请参阅访问工作流执行结果。
流水线运行完毕后,您可以在 Dataplex Catalog 中搜索导入的元数据。
查看作业日志
使用 Cloud Logging 查看托管式连接流水线的日志。日志载荷包含指向 Dataproc Serverless 批量作业和元数据导入作业的日志的链接(如适用)。如需了解详情,请参阅查看工作流日志。
问题排查
请参考以下问题排查建议:
- 为元数据作业配置导入作业日志级别,以使用调试级日志记录,而不是信息级日志记录。
- 查看 Dataproc Serverless 批处理批量作业(适用于连接器运行)和元数据导入作业的日志。如需了解详情,请参阅查询 Dataproc Serverless for Spark 日志和查询元数据作业日志。
- 如果无法使用流水线导入条目,并且错误消息未提供足够的信息,请尝试在测试条目组中创建具有相同详细信息的自定义条目。如需了解详情,请参阅创建自定义条目。