Import metadata from a custom source using Workflows

This document describes how to run a managed connectivity pipeline in Workflows to import metadata from a third-party source into Dataplex.

To set up a managed connectivity pipeline, you build a connector for your data source. You then run the pipeline in Workflows. The pipeline extracts metadata from your data source and imports the metadata into Dataplex. If necessary, the pipeline also creates Dataplex Catalog entry groups in your Google Cloud project.

For more information about managed connectivity, see Managed connectivity overview.

Before you begin

Before you import metadata, complete the tasks in this section.

Build a connector

A connector extracts metadata from your data source and generates a metadata import file that Dataplex can import. The connector is an Artifact Registry image that can run on Dataproc Serverless.
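For reference, the following is a minimal sketch of publishing a connector image to Artifact Registry, assuming you build the image locally with Docker. The PROJECT_ID, REPOSITORY, and REPOSITORY_LOCATION placeholders match the ones used later in this document; the image name and tag (custom-connector:0.1) are only examples.

    # Authenticate Docker to Artifact Registry (one-time setup per registry host).
    gcloud auth configure-docker REPOSITORY_LOCATION-docker.pkg.dev

    # Build and push the connector image; the image name and tag are examples.
    docker build -t REPOSITORY_LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/custom-connector:0.1 .
    docker push REPOSITORY_LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/custom-connector:0.1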

Configure Google Cloud resources

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    If you don't plan to run the pipeline on a schedule, you don't need to enable the Cloud Scheduler API.

  2. Create a secret in Secret Manager to store the credentials for your third-party data source (example gcloud commands for this and several other setup steps follow this list).

  3. Configure your Virtual Private Cloud (VPC) network to run Dataproc Serverless for Spark workloads.

  4. Create a Cloud Storage bucket to store the metadata import files.

  5. Create the following Dataplex Catalog resources:

    1. Create custom aspect types for the entries that you want to import.

    2. Create custom entry types.
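The following commands are a hedged sketch of steps 1, 2, and 4 from the preceding list, using gcloud; the secret value and the other placeholders are examples that you replace with your own values. Aspect types and entry types (step 5) can be created in the Google Cloud console or through the Dataplex API.

    # Step 1: enable the required APIs (omit cloudscheduler.googleapis.com if you won't schedule the pipeline).
    gcloud services enable workflows.googleapis.com dataproc.googleapis.com \
        storage.googleapis.com dataplex.googleapis.com secretmanager.googleapis.com \
        artifactregistry.googleapis.com cloudscheduler.googleapis.com

    # Step 2: store the data source credentials in Secret Manager (the secret value is read from stdin).
    echo -n "DATA_SOURCE_PASSWORD" | gcloud secrets create SECRET_ID --data-file=-

    # Step 4: create a Cloud Storage bucket for the metadata import files.
    gcloud storage buckets create gs://BUCKET_ID --location=LOCATION_ID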

Required roles

A service account represents the identity of a workflow, and determines what permissions the workflow has and which Google Cloud resources it can access. You need a service account for Workflows (to run the pipeline) and for Dataproc Serverless (to run the connector).

You can use the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), or create your own service account to run the managed connectivity pipeline.
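If you prefer a dedicated service account, the following sketch creates one with gcloud; the account name and display name are placeholders, not values from this document.

    # Create a service account for the pipeline; the name is only an example.
    gcloud iam service-accounts create dataplex-connectivity-pipeline \
        --project=PROJECT_ID \
        --display-name="Managed connectivity pipeline"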

Console

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Select the project that you want to import metadata into.

  3. Click Grant access, and then enter the service account's email address.

  4. Assign the following roles to the service account:

    • Logs Writer
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Dataplex Catalog Editor
    • Dataproc Editor
    • Dataproc Worker
    • Secret Manager Secret Accessor - on the secret that stores the credentials for your data source
    • Storage Object User - on the Cloud Storage bucket
    • Artifact Registry Reader - on the Artifact Registry repository that contains the connector image
    • Service Account User - if you use different service accounts, grant the service account that runs Workflows this role on the service account that runs the Dataproc Serverless batch jobs
    • Workflows Invoker - if you want to schedule the pipeline
  5. Save your changes.

gcloud

  1. Grant roles to the service account. Run the following commands:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataproc.worker
    

    Replace the following:

    • PROJECT_ID: the ID of the target Google Cloud project to import the metadata into.
    • SERVICE_ACCOUNT: the service account, such as my-service-account@my-project.iam.gserviceaccount.com.
  2. Grant the service account the following roles at the resource level:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/secretmanager.secretAccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/storage.objectUser \
        --condition="expression=resource.name.startsWith('projects/_/buckets/BUCKET_ID'),title=bucket-access"
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/artifactregistry.reader
    

    Replace the following:

    • SECRET_ID: the ID of the secret that stores the credentials for your data source. It uses the format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: the name of the Cloud Storage bucket.
    • REPOSITORY: the Artifact Registry repository that contains the connector image.
    • REPOSITORY_LOCATION: the Google Cloud location where the repository is hosted.
  3. Grant the service account that runs Workflows the roles/iam.serviceAccountUser role on the service account that runs the Dataproc Serverless batch jobs. You must grant this role even if you use the same service account for both Workflows and Dataproc Serverless.

    gcloud iam service-accounts add-iam-policy-binding \
        SERVICE_ACCOUNT \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/iam.serviceAccountUser
    

    If you use different service accounts, the value of the --member flag is the service account that runs the Dataproc Serverless batch jobs.

  4. If you want to schedule the pipeline, grant the service account the following role:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/workflows.invoker
    

Import metadata

To import metadata, create and execute a workflow that runs the managed connectivity pipeline. Optionally, you can also create a schedule for running the pipeline.

Console

  1. Create the workflow. Provide the following information:

    • Service account: the service account that you configured in the Required roles section of this document.
    • Encryption: select Google-managed encryption key.

    • Define workflow: provide the following definition file:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
              - NETWORK_TYPE: "networkUri"
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups:
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. To run the pipeline on demand, execute the workflow.

    Provide the following runtime arguments:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
        "ADDITIONAL_CONNECTOR_ARGS": [
            ADDITIONAL_CONNECTOR_ARGUMENTS
        ],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "SPARK_DRIVER_TYPE": "PYSPARK",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [NETWORK_TAGS],
        "NETWORK_URI": "NETWORK_URI",
        "SUBNETWORK_URI": "SUBNETWORK_URI"
    }
    

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
    • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

      The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.

    • BUCKET_ID: the name of the Cloud Storage bucket that stores the metadata import file generated by the connector. Each workflow execution creates a new folder.

    • SERVICE_ACCOUNT: the service account that you configured in the Required roles section of this document. The service account runs the connector in Dataproc Serverless.

    • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.

    • CONTAINER_IMAGE: the custom container image of the connector that is hosted in Artifact Registry.

    • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.

    • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.

    • NETWORK_TAGS (optional): a list of network tags.

    • NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.

    • SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.

    Depending on the amount of metadata that you import, the pipeline might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results, or check an execution from the command line as sketched below.

    After the pipeline runs, you can search for the imported metadata in Dataplex Catalog.
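    As a hedged alternative to the console, you can inspect executions with gcloud; WORKFLOW_NAME, EXECUTION_ID, and LOCATION_ID are placeholders for your workflow name, an execution ID, and the workflow's location.

    # List recent executions of the workflow.
    gcloud workflows executions list WORKFLOW_NAME --location=LOCATION_ID --limit=5

    # Show the state and result of a single execution.
    gcloud workflows executions describe EXECUTION_ID \
        --workflow=WORKFLOW_NAME --location=LOCATION_ID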

  3. Optional: To run the pipeline on a schedule, create a schedule by using Cloud Scheduler. Provide the following information:

    • Frequency: a unix-cron expression that defines the schedule to run the pipeline.
    • Workflow argument: the runtime arguments for the connector, as described in the previous step.
    • Service account: the service account. The service account manages the scheduler.

gcloud

  1. Save the following workflow definition as a YAML file:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
            - NETWORK_TYPE: "networkUri"
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Define the following Bash variables:

    workflow_name="WORKFLOW_NAME"
    project_id="PROJECT_ID"
    location="LOCATION_ID"
    service_account="SERVICE_ACCOUNT"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    

    Replace the following:

    • WORKFLOW_NAME: the name of the workflow.
    • PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
    • SERVICE_ACCOUNT: the service account that you configured in the Required roles section of this document.
    • WORKFLOW_DEFINITION_FILE: the path to the workflow definition YAML file.
  3. Create the workflow:

    gcloud workflows deploy ${workflow_name} \
        --project=${project_id} \
        --location=${location} \
        --source=${workflow_source} \
        --service-account=${service_account}
    
  4. To run the pipeline on demand, execute the workflow:

    gcloud workflows run ${workflow_name} --project=${project_id} --location=${location} --data "$(cat << EOF
    {
    "TARGET_PROJECT_ID": "PROJECT_ID",
    "CLOUD_REGION": "LOCATION_ID",
    "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
    "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
    "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
    "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
    "ADDITIONAL_CONNECTOR_ARGS": [
        ADDITIONAL_CONNECTOR_ARGUMENTS
    ],
    "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
    "SPARK_DRIVER_TYPE": "PYSPARK",
    "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
    "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
    "IMPORT_JOB_LOG_LEVEL": "INFO",
    "NETWORK_TAGS": [NETWORK_TAGS],
    "NETWORK_URI": "NETWORK_URI",
    "SUBNETWORK_URI": "SUBNETWORK_URI"
    }
    EOF
    )"
    

    Replace the following:

    • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

      The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.

    • BUCKET_ID: the name of the Cloud Storage bucket that stores the metadata import file generated by the connector. Each workflow execution creates a new folder.

    • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.

    • CONTAINER_IMAGE: the custom container image of the connector that is hosted in Artifact Registry.

    • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.

    • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.

    • NETWORK_TAGS (optional): a list of network tags.

    • NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.

    • SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.

    Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.

    After the pipeline runs, you can search for the imported metadata in Dataplex Catalog.

  5. Optional: To run the pipeline on a schedule, create a schedule by using Cloud Scheduler:

    gcloud scheduler jobs create http ${workflow_name}-scheduler \
        --project=${project_id} \
        --location=${location} \
        --schedule="SCHEDULE" \
        --time-zone="TIME_ZONE" \
        --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${location}/workflows/${workflow_name}/executions" \
        --http-method="POST" \
        --oauth-service-account-email=${service_account} \
        --headers="Content-Type=application/json" \
        --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}"
    

    Replace the following:

    • SCHEDULE: a cron expression that defines the schedule to run the pipeline.
    • TIME_ZONE: the time zone, for example UTC.
    • DOUBLE_ESCAPED_JSON_STRING: a JSON encoding of the workflow arguments. Double quotation marks inside the quoted string are escaped with backslashes (\). For example: --message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}". A hedged jq sketch that builds this string for you follows this list.
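    Hand-escaping this string is error-prone. The following sketch assumes that jq is installed and that your runtime arguments are saved in a local file named workflow_args.json (the file name is arbitrary); it builds the message body for you.

    # jq -c compacts the arguments file to one line; the outer jq call wraps it
    # as a properly escaped JSON string under the "argument" key.
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
        --project=${project_id} \
        --location=${location} \
        --schedule="SCHEDULE" \
        --time-zone="TIME_ZONE" \
        --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${location}/workflows/${workflow_name}/executions" \
        --http-method="POST" \
        --oauth-service-account-email=${service_account} \
        --headers="Content-Type=application/json" \
        --message-body="$(jq -cn --arg argument "$(jq -c . workflow_args.json)" '{argument: $argument}')"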

Terraform

  1. Save the following files:

    • Save as main.tf:

      module "cloud_workflows" {
          source  = "GoogleCloudPlatform/cloud-workflows/google"
          version = "0.1.1"
          workflow_name             = var.workflow_name
          project_id                = var.project_id
          region                    = var.region
          service_account_email     = var.service_account
          workflow_trigger = {
              cloud_scheduler = {
              name                  = "${var.workflow_name}-scheduler"
              cron                  = "0 0 * * *"
              time_zone             = "UTC"
              service_account_email = var.service_account
              deadline              = "1800s"
              argument              = jsonencode(var.workflow_args)
              }
          }
          workflow_source = var.workflow_source
          }
      
    • Save as variables.tf:

      variable "project_id" {
          default = ""
      }
      
      variable "region" {
          default = ""
      }
      
      variable "service_account" {
          default = ""
      }
      
      variable "workflow_name" {
          default = "managed-orchestration-for-dataplex"
      }
      
      variable "description" {
          default = "Submits a Dataproc Serverless Job and then runs a Dataplex Import Job. Times out after 12 hours."
      }
      
      variable "workflow_args" {
          default = {}
      }
      
      variable "cron_schedule" {
          default = "0 0 * * *"
      }
      
      variable "workflow_source" {
          default = ""
      }
      
  2. Save the following variables definition file as a .tfvars file. Replace the placeholders with the information for your connector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT"
    cron_schedule                   = "SCHEDULE"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" }
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
            - NETWORK_TYPE: "networkUri"
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups:
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF

    Replace the following:

    • PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
    • LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs run, and where the metadata is imported.
    • SERVICE_ACCOUNT: the service account that you configured in the Required roles section of this document.
    • SCHEDULE: a cron expression that defines the schedule to run the pipeline.
    • ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens.

      The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
    • BUCKET_ID: the name of the Cloud Storage bucket that stores the metadata import file generated by the connector. Each workflow execution creates a new folder.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
    • CONTAINER_IMAGE: the custom container image of the connector that is hosted in Artifact Registry.
    • ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
    • NETWORK_TAGS (optional): a list of network tags.
    • NETWORK_URI (optional): the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
    • SUBNETWORK_URI (optional): the URI of the subnetwork that connects to the data source. If you provide a subnetwork, omit the network argument.
  3. Initialize Terraform:

    terraform init
    
  4. Validate Terraform with your .tfvars file:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Replace CONNECTOR_VARIABLES_FILE with the name of your variables definition file.

  5. Deploy Terraform with your .tfvars file:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform creates a workflow and a Cloud Scheduler job in the specified project. Workflows runs the pipeline on the schedule that you specify. To test the pipeline without waiting for the next scheduled run, you can trigger the Cloud Scheduler job manually, as sketched after this step.

    Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.

    After the pipeline runs, you can search for the imported metadata in Dataplex Catalog.
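    The following hedged sketch triggers the scheduler job on demand; the job name follows the "WORKFLOW_NAME-scheduler" pattern from main.tf, and PROJECT_ID and LOCATION_ID are the project and region from your .tfvars file.

    # Trigger the scheduled workflow execution once, without waiting for the next cron run.
    gcloud scheduler jobs run WORKFLOW_NAME-scheduler \
        --project=PROJECT_ID \
        --location=LOCATION_ID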

View job logs

Use Cloud Logging to view logs for the managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job. For more information, see View workflow logs.
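As a starting point, the following gcloud sketch pulls recent workflow logs for the project; the resource.type filter shown is an assumption about how Workflows labels its log entries, so adjust the filter as needed, for example to target the Dataproc Serverless batch or metadata job logs that the payload links to.

    # Read recent Workflows logs from the last day; adjust the filter and limit as needed.
    gcloud logging read 'resource.type="workflows.googleapis.com/Workflow"' \
        --project=PROJECT_ID \
        --limit=20 \
        --freshness=1d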

Troubleshoot

Refer to the following troubleshooting suggestions:

What's next