This document describes how to run a managed connectivity pipeline in Workflows to import metadata from a third-party source into Dataplex.
To set up a managed connectivity pipeline, you build a connector for your data source and then run the pipeline in Workflows. The pipeline extracts metadata from your data source and imports the metadata into Dataplex. If necessary, the pipeline also creates Dataplex Catalog entry groups in your Google Cloud project.
For more information about managed connectivity, see Managed connectivity overview.
Before you begin
Before you import metadata, complete the tasks in this section.
Build a connector
A connector extracts metadata from your data source and generates a metadata import file that Dataplex can import. The connector is an Artifact Registry image that can run on Dataproc Serverless.
Build a custom connector that extracts metadata from your third-party source.
For an example connector that you can use as a reference when building your own, see Develop a custom connector for metadata import.
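After the connector is containerized, push its image to Artifact Registry so that Dataproc Serverless can pull it. The following is a minimal sketch, assuming a Docker-based build; the repository name (connectors), image name (my-connector), tag, and region (us-central1) are placeholders, not values required by the pipeline:

# Build the connector image from your connector project's Dockerfile.
docker build -t us-central1-docker.pkg.dev/PROJECT_ID/connectors/my-connector:0.1 .

# Authenticate Docker with Artifact Registry for the target region, then push the image.
gcloud auth configure-docker us-central1-docker.pkg.dev
docker push us-central1-docker.pkg.dev/PROJECT_ID/connectors/my-connector:0.1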
Configure Google Cloud resources
- Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
  If you don't plan to run the pipeline on a schedule, you don't need to enable the Cloud Scheduler API.
- In Secret Manager, create a secret to store the credentials for your third-party data source (see the example commands after this list).
- Configure your Virtual Private Cloud (VPC) network to run Dataproc Serverless for Spark workloads.
- Create a Cloud Storage bucket to store the metadata import files (see the example commands after this list).
- Create the following Dataplex Catalog resources:
  - An entry group for the entries that you want to import.
  - Custom entry types for the entries that you want to import.
  - Custom aspect types for the entries that you want to import.
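The secret and the bucket from the preceding list can be created from the command line. This is a minimal sketch; the secret ID (my-datasource-credentials) and the local credentials file name (credentials.json) are placeholders:

# Store the data source credentials in Secret Manager, reading the value from a local file.
gcloud secrets create my-datasource-credentials \
    --project=PROJECT_ID \
    --data-file=credentials.json

# Create the Cloud Storage bucket that will hold the metadata import files.
gcloud storage buckets create gs://BUCKET_ID \
    --project=PROJECT_ID \
    --location=LOCATION_ID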
Required roles
A service account represents the identity of the workflow and determines what permissions the workflow has and which Google Cloud resources it can access. You need a service account for Workflows (to run the pipeline) and for Dataproc Serverless (to run the connector).
You can use the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), or create your own service account to run the managed connectivity pipeline.
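If you decide to create your own service account, the following sketch shows one way to do it; the account ID (dataplex-connectivity) and display name are placeholders:

# Create a dedicated service account for the managed connectivity pipeline.
gcloud iam service-accounts create dataplex-connectivity \
    --project=PROJECT_ID \
    --display-name="Managed connectivity pipeline"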
Console
In the Google Cloud console, go to the IAM page.
Select the project that you want to import metadata into.
Click Grant access, and then enter the service account's email address. Assign the following roles to the service account:
- Logs Writer
- Dataplex Entry Group Owner
- Dataplex Metadata Job Owner
- Dataplex Catalog Editor
- Dataproc Editor
- Dataproc Worker
- Secret Manager Secret Accessor - on the secret that stores the credentials for your data source
- Storage Object User - on the Cloud Storage bucket
- Artifact Registry Reader - on the Artifact Registry repository that contains the connector image
- Service Account User - if you use different service accounts, grant the service account that runs Workflows this role on the service account that runs the Dataproc Serverless batch jobs
- Workflows Invoker - if you want to schedule the pipeline
Save your changes.
gcloud
Grant roles to the service account. Run the following commands:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/logging.logWriter

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/dataplex.entryGroupOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/dataplex.metadataJobOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/dataplex.catalogEditor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/dataproc.editor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/dataproc.worker
Replace the following:
- PROJECT_ID: the ID of the target Google Cloud project to import the metadata into.
- SERVICE_ACCOUNT: the service account, such as my-service-account@my-project.iam.gserviceaccount.com.
Grant the service account the following roles at the resource level:
gcloud secrets add-iam-policy-binding SECRET_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/secretmanager.secretAccessor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/storage.objectUser \
    --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')

gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
    --location=REPOSITORY_LOCATION \
    --member="serviceAccount:SERVICE_ACCOUNT" \
    --role=roles/artifactregistry.reader
Replace the following:
- SECRET_ID: the ID of the secret that stores the credentials for your data source. It uses the format projects/PROJECT_ID/secrets/SECRET_ID.
- BUCKET_ID: the name of the Cloud Storage bucket.
- REPOSITORY: the Artifact Registry repository that contains the connector image.
- REPOSITORY_LOCATION: the Google Cloud location where the repository is hosted.
Grant the service account that runs Workflows the roles/iam.serviceAccountUser role on the service account that runs the Dataproc Serverless batch jobs. You must grant this role even if you use the same service account for both Workflows and Dataproc Serverless.

gcloud iam service-accounts add-iam-policy-binding \
    serviceAccount:SERVICE_ACCOUNT \
    --member='SERVICE_ACCOUNT' \
    --role='roles/iam.serviceAccountUser'
If you use different service accounts, the value of the --member flag is the service account that runs the Dataproc Serverless batch jobs.
If you want to schedule the pipeline, grant the service account the following role:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="SERVICE_ACCOUNT" \
    --role=roles/workflows.invoker
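To confirm which project-level roles the service account now holds, you can inspect the project's IAM policy. This is a quick check using standard gcloud filtering; the filter value is the full member string:

gcloud projects get-iam-policy PROJECT_ID \
    --flatten="bindings[].members" \
    --filter="bindings.members:serviceAccount:SERVICE_ACCOUNT" \
    --format="table(bindings.role)"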
Import metadata
To import metadata, create and execute a workflow that runs the managed connectivity pipeline. Optionally, you can also create a schedule for running the pipeline.
Console
Create the workflow. Provide the following information:
- Service account: the service account that you configured in the Required roles section of this document.
- Encryption: select Google-managed encryption key.
- Define workflow: provide the following definition file:
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: 
${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: 
KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
To run the pipeline on demand, execute the workflow.
Provide the following runtime arguments:
{
  "TARGET_PROJECT_ID": "PROJECT_ID",
  "CLOUD_REGION": "LOCATION_ID",
  "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
  "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
  "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
  "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
  "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
  "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
  "SPARK_DRIVER_TYPE": "PYSPARK",
  "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
  "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
  "IMPORT_JOB_LOG_LEVEL": "INFO",
  "NETWORK_TAGS": [NETWORK_TAGS],
  "NETWORK_URI": "NETWORK_URI",
  "SUBNETWORK_URI": "SUBNETWORK_URI"
}
Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- SERVICE_ACCOUNT: the service account. The service account runs the connector in Dataproc Serverless.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- NETWORK_TAGS (optional): a list of network tags.
- NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
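For example, a filled-in set of runtime arguments might look like the following. All values here are hypothetical placeholders, shown only to illustrate how the lists, the boolean, and the optional networking arguments are quoted:

{
  "TARGET_PROJECT_ID": "my-project",
  "CLOUD_REGION": "us-central1",
  "TARGET_ENTRY_GROUP_ID": "my-entry-group",
  "CREATE_TARGET_ENTRY_GROUP": true,
  "CLOUD_STORAGE_BUCKET_ID": "my-metadata-bucket",
  "SERVICE_ACCOUNT": "my-service-account@my-project.iam.gserviceaccount.com",
  "ADDITIONAL_CONNECTOR_ARGS": ["--host=10.0.0.5", "--port=1521"],
  "CUSTOM_CONTAINER_IMAGE": "us-central1-docker.pkg.dev/my-project/connectors/my-connector:0.1",
  "SPARK_DRIVER_TYPE": "PYSPARK",
  "IMPORT_JOB_SCOPE_ENTRY_TYPES": ["projects/my-project/locations/us-central1/entryTypes/my-entry-type"],
  "IMPORT_JOB_SCOPE_ASPECT_TYPES": ["projects/my-project/locations/us-central1/aspectTypes/my-aspect-type"],
  "IMPORT_JOB_LOG_LEVEL": "INFO",
  "NETWORK_TAGS": [],
  "NETWORK_URI": "projects/my-project/global/networks/default",
  "SUBNETWORK_URI": ""
}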
Depending on the amount of metadata that you import, the pipeline might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
Optional: if you want to run the pipeline on a schedule, create a schedule by using Cloud Scheduler. Provide the following information:
- Frequency: a unix-cron expression that defines the schedule to run the pipeline.
- Workflow argument: the runtime arguments for the connector, as described in the previous step.
- Service account: the service account. The service account manages the scheduler.
gcloud
Save the following workflow definition as a YAML file:
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: 
${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: 
KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
Define the following Bash variables:
workflow_name="WORKFLOW_NAME"
project_id="PROJECT_ID"
location="LOCATION_ID"
service_account="SERVICE_ACCOUNT"
workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
Replace the following:
- WORKFLOW_NAME: the name of the workflow.
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
- SERVICE_ACCOUNT: the service account that you configured in the Required roles section of this document.
- WORKFLOW_DEFINITION_FILE: the path to the workflow definition YAML file.
Create the workflow:

gcloud workflows deploy ${workflow_name} \
    --project=${project_id} \
    --location=${location} \
    --source=${workflow_source} \
    --service-account=${service_account}
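Optionally, confirm that the deployment succeeded by describing the workflow. This is a quick sanity check with standard gcloud commands, not a required pipeline step:

gcloud workflows describe ${workflow_name} \
    --project=${project_id} \
    --location=${location}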
To run the pipeline on demand, execute the workflow:
gcloud workflows run ${workflow_name} --project=${project_id} --location=${location} --data "$(cat << EOF
{
  "TARGET_PROJECT_ID": "PROJECT_ID",
  "CLOUD_REGION": "LOCATION_ID",
  "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
  "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
  "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
  "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
  "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
  "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
  "SPARK_DRIVER_TYPE": "PYSPARK",
  "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
  "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
  "IMPORT_JOB_LOG_LEVEL": "INFO",
  "NETWORK_TAGS": [NETWORK_TAGS],
  "NETWORK_URI": "NETWORK_URI",
  "SUBNETWORK_URI": "SUBNETWORK_URI"
}
EOF
)"
Replace the following:
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- NETWORK_TAGS (optional): a list of network tags.
- NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
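For example, you can list recent executions of the workflow and then describe a specific one to see its state and result. The execution ID comes from the list output:

gcloud workflows executions list ${workflow_name} \
    --project=${project_id} \
    --location=${location} \
    --limit=5

gcloud workflows executions describe EXECUTION_ID \
    --workflow=${workflow_name} \
    --project=${project_id} \
    --location=${location}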
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
Optional: if you want to run the pipeline on a schedule, create a schedule by using Cloud Scheduler:
gcloud scheduler jobs create http ${workflow_name}-scheduler \
    --project=${project_id} \
    --location=${location} \
    --schedule="SCHEDULE" \
    --time-zone="TIME_ZONE" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${location}/workflows/${workflow_name}/executions" \
    --http-method="POST" \
    --oauth-service-account-email=${service_account} \
    --headers="Content-Type=application/json" \
    --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}"
Replace the following:
- SCHEDULE: a cron expression that defines the schedule to run the pipeline.
- TIME_ZONE: the time zone, such as UTC.
- DOUBLE_ESCAPED_JSON_STRING: a JSON encoding of the workflow arguments. The double quotation marks inside the quoted string are escaped with backslashes (\). For example:
  --message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}"
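One way to avoid writing the escapes by hand is to build the message body with jq. This is a sketch, assuming the workflow runtime arguments are saved in a local file named args.json:

# The inner jq call compacts args.json into a single-line JSON string; the outer
# call wraps that string as the value of "argument" and escapes the nested
# quotation marks automatically.
message_body="$(jq -n --arg wf "$(jq -c . args.json)" '{argument: $wf}')"

# Inspect the result, then pass it to the scheduler job with
# --message-body="${message_body}" in place of the hand-escaped string.
echo "${message_body}"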
Terraform
Save the following files:
Save as main.tf:

module "cloud_workflows" {
  source                = "GoogleCloudPlatform/cloud-workflows/google"
  version               = "0.1.1"
  workflow_name         = var.workflow_name
  project_id            = var.project_id
  region                = var.region
  service_account_email = var.service_account
  workflow_trigger = {
    cloud_scheduler = {
      name                  = "${var.workflow_name}-scheduler"
      cron                  = "0 0 * * *"
      time_zone             = "UTC"
      service_account_email = var.service_account
      deadline              = "1800s"
      argument              = jsonencode(var.workflow_args)
    }
  }
  workflow_source = var.workflow_source
}
Save as variables.tf:

variable "project_id" {
  default = ""
}
variable "region" {
  default = ""
}
variable "service_account" {
  default = ""
}
variable "workflow_name" {
  default = "managed-orchestration-for-dataplex"
}
variable "description" {
  default = "Submits a Dataproc Serverless Job and then runs a Dataplex Import Job. Times out after 12 hours."
}
variable "workflow_args" {
  default = {}
}
variable "cron_schedule" {
  default = "0 0 * * *"
}
variable "workflow_source" {
  default = ""
}
Save the following variable definitions file as a .tfvars file. Replace the placeholders with the information for your connector.
文件。将占位符替换为连接器的信息。project_id = "PROJECT_ID" region = "LOCATION_ID" service_account = "SERVICE_ACCOUNT" cron_schedule = "SCHEDULE" workflow_args = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" } workflow_source = <<EOF main: params: [args] steps: - init: assign: - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: $${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: $${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: $${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: $${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: $${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - $${"--target_project_id=" + args.TARGET_PROJECT_ID} - $${"--target_location_id=" + args.CLOUD_REGION} - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - $${"--output_folder=" + WORKFLOW_ID} - $${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: $${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: $${args.SERVICE_ACCOUNT} stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID} $${NETWORK_TYPE}: $${NETWORKING} networkTags: $${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: $${"https://dataproc.googleapis.com/v1/projects/" + 
args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: $${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: $${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: $${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: $${IMPORT_JOB_STATUS} - condition: $${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: 
OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: $${KILLED_IMPORT_JOB_STATUS} - the_end: return: $${IMPORT_JOB_STATUS} EOF
Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
- SERVICE_ACCOUNT: the service account that you configured in the Required roles section of this document.
- SCHEDULE: a cron expression that defines the schedule to run the pipeline.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- NETWORK_TAGS (optional): a list of network tags.
- NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
Initialize Terraform:
terraform init
Validate Terraform with your .tfvars file:

terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Replace CONNECTOR_VARIABLES_FILE with the name of your variable definitions file.

Deploy Terraform with your .tfvars file:

terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform creates the resources in the specified project. Workflows runs according to the schedule that you specify.
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
View job logs
Use Cloud Logging to view logs for the managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job. For more information, see View workflow logs.
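For example, you can read recent pipeline logs from the command line. This is a sketch that assumes the standard Cloud Logging resource type for Workflows executions:

gcloud logging read 'resource.type="workflows.googleapis.com/Workflow"' \
    --project=PROJECT_ID \
    --limit=20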
Troubleshooting
Use the following troubleshooting suggestions:
- Configure the import job log level for the metadata job to use debug-level logging instead of info-level logging (see the snippet after this list).
- Review the logs for the Dataproc Serverless batch job (for connector runs) and the metadata import job. For more information, see Query Dataproc Serverless for Spark logs and Query metadata job logs.
- If an entry can't be imported by using the pipeline, and the error message doesn't provide enough information, try creating a custom entry with the same details in a test entry group. For more information, see Create a custom entry.
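For the first suggestion, the import job log level is controlled by the IMPORT_JOB_LOG_LEVEL runtime argument described earlier in this document; set it to DEBUG when you execute the workflow:

"IMPORT_JOB_LOG_LEVEL": "DEBUG"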