Metadaten mithilfe von Workflows aus einer benutzerdefinierten Quelle importieren

In diesem Dokument wird beschrieben, wie Sie Metadaten aus einer Drittanbieterquelle in Dataplex importieren, indem Sie eine verwaltete Konnektivitätspipeline in Workflows ausführen.

Wenn Sie eine verwaltete Konnektivitätspipeline einrichten möchten, erstellen Sie einen Connector für Ihre Datenquelle. Anschließend führen Sie die Pipeline in Workflows aus. Die Pipeline extrahiert Metadaten aus Ihrer Datenquelle und importiert sie dann in Dataplex. Bei Bedarf erstellt die Pipeline auch Dataplex Catalog-Eintragsgruppen in Ihrem Google Cloud -Projekt.

Weitere Informationen zur verwalteten Konnektivität finden Sie unter Verwaltete Konnektivität – Übersicht.

Hinweis

Führen Sie die Aufgaben in diesem Abschnitt aus, bevor Sie Metadaten importieren.

Connector erstellen

Ein Connector extrahiert die Metadaten aus Ihrer Datenquelle und generiert eine Metadatenimportdatei, die von Dataplex importiert werden kann. Der Connector ist ein Artifact Registry-Image, das auf Dataproc Serverless ausgeführt werden kann.

Google Cloud -Ressourcen konfigurieren

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Wenn Sie die Pipeline nicht nach einem Zeitplan ausführen möchten, müssen Sie die Cloud Scheduler API nicht aktivieren.

  2. Erstellen Sie Secrets in Secret Manager, um die Anmeldedaten für Ihre Drittanbieterdatenquelle zu speichern.

  3. Konfigurieren Sie Ihr VPC-Netzwerk (Virtual Private Cloud), um Dataproc Serverless für Spark-Arbeitslasten auszuführen.

  4. Erstellen Sie einen Cloud Storage-Bucket zum Speichern der Metadatenimportdateien.

  5. Erstellen Sie die folgenden Dataplex Catalog-Ressourcen:

    1. Erstellen Sie benutzerdefinierte Aspekttypen für die Einträge, die Sie importieren möchten.

    2. Erstellen Sie benutzerdefinierte Eintragstypen für die Einträge, die Sie importieren möchten.

Erforderliche Rollen

Ein Dienstkonto stellt die Identität eines Workflows dar und bestimmt, welche Berechtigungen der Workflow hat und auf welche Google Cloud Ressourcen er zugreifen kann. Sie benötigen ein Dienstkonto für Workflows (zum Ausführen der Pipeline) und für Dataproc Serverless (zum Ausführen des Connectors).

Sie können das Compute Engine-Standarddienstkonto (PROJECT_NUMBER-compute@developer.gserviceaccount.com) verwenden oder eigene Dienstkonten erstellen, um die verwaltete Konnektivitätspipeline auszuführen.

Console

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Wählen Sie das Projekt aus, in das Sie Metadaten importieren möchten.

  3. Klicken Sie auf Zugriff gewähren und geben Sie dann die E-Mail-Adresse des Dienstkontos ein.

  4. Weisen Sie dem Dienstkonto die folgenden Rollen zu:

    • Log-Autor
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Dataplex Catalog Editor
    • Dataproc-Bearbeiter
    • Dataproc-Worker
    • Zugriffsperson für Secret Manager-Secret für das Secret, in dem die Anmeldedaten für Ihre Datenquelle gespeichert sind
    • Storage Object User für den Cloud Storage-Bucket
    • Artifact Registry Reader: Im Artifact Registry-Repository, das das Connector-Image enthält
    • Dienstkontonutzer: Wenn Sie verschiedene Dienstkonten verwenden, weisen Sie dem Dienstkonto, auf dem Workflows ausgeführt werden, diese Rolle für das Dienstkonto zu, auf dem die Dataproc-Batchjobs ohne Server ausgeführt werden.
    • Workflows Invoker: Wenn Sie die Pipeline planen möchten
  5. Speichern Sie die Änderungen.

gcloud

  1. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie folgende Befehle aus:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, z. B. my-service-account@my-project.iam.gserviceaccount.com.
  2. Weisen Sie dem Dienstkonto die folgenden Rollen auf Ressourcenebene zu:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Ersetzen Sie Folgendes:

    • SECRET_ID: die ID des Secrets, in dem die Anmeldedaten für Ihre Datenquelle gespeichert sind. Sie verwendet das Format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: den Namen des Cloud Storage-Buckets.
    • REPOSITORY: das Artifact Registry-Repository, das das Connector-Image enthält.
    • REPOSITORY_LOCATION: den Google CloudStandort, an dem das Repository gehostet wird.
  3. Weisen Sie dem Dienstkonto, auf dem Workflows ausgeführt werden, die Rolle roles/iam.serviceAccountUser für das Dienstkonto zu, auf dem die Dataproc Serverless-Batchjobs ausgeführt werden. Sie müssen diese Rolle auch gewähren, wenn Sie dasselbe Dienstkonto für Workflows und Dataproc Serverless verwenden.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Wenn Sie unterschiedliche Dienstkonten verwenden, ist der Wert für das Flag --member das Dienstkonto, unter dem die Dataproc-Serverless-Batchjobs ausgeführt werden.

  4. Wenn Sie die Pipeline planen möchten, weisen Sie dem Dienstkonto die folgende Rolle zu:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Metadaten importieren

Wenn Sie Metadaten importieren möchten, erstellen Sie einen Workflow, in dem die verwaltete Konnektivitätspipeline ausgeführt wird, und führen Sie ihn aus. Optional können Sie auch einen Zeitplan für die Ausführung der Pipeline erstellen.

Console

  1. Erstellen Sie den Workflow. Geben Sie die folgenden Informationen an:

    • Dienstkonto: Das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • Verschlüsselung: Wählen Sie Google-managed encryption key aus.

    • Workflow definieren: Geben Sie die folgende Definitiondatei an:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Wenn Sie die Pipeline on demand ausführen möchten, führen Sie den Workflow aus.

    Geben Sie die folgenden Laufzeitargumente an:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud speicherort, an dem die Dataproc Serverless- und Metadatenimportjobs ausgeführt und Metadaten importiert werden
    • ENTRY_GROUP_ID: die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eingangsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Einstiegsgruppe erstellen soll, wenn sie in Ihrem Projekt noch nicht vorhanden ist, legen Sie diesen Wert auf true fest.
    • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Metadatenimportdatei gespeichert werden soll. Für jede Workflowausführung wird ein neuer Ordner erstellt.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben. Das Dienstkonto führt den Connector in Dataproc Serverless aus.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln. Setzen Sie jedes Argument in Anführungszeichen und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE: das benutzerdefinierte Container-Image des in Artifact Registry gehosteten Connectors.
    • ENTRY_TYPES: Liste der Eintragstypen, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: Liste der Aspekte, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
    • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.

    Je nach Menge der importierten Metadaten kann die Ausführung der Pipeline einige Minuten oder länger dauern. Weitere Informationen zum Aufrufen des Fortschritts finden Sie unter Auf Ergebnisse der Workflowausführung zugreifen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Catalog nach den importierten Metadaten suchen.

  3. Optional: Wenn Sie die Pipeline nach einem Zeitplan ausführen möchten, erstellen Sie einen Zeitplan mit Cloud Scheduler. Geben Sie die folgenden Informationen an:

    • Häufigkeit: Ein Unix-Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert.
    • Workflow-Argument: Die Laufzeitargumente für den Connector, wie im vorherigen Schritt beschrieben.
    • Dienstkonto: Das Dienstkonto. Das Dienstkonto verwaltet den Scheduler.

gcloud

  1. Speichern Sie die folgende Arbeitslastdefinition als YAML-Datei:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Definieren Sie Bash-Variablen, erstellen Sie den Workflow und erstellen Sie optional einen Zeitplan für die Ausführung der Pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud speicherort, an dem die Dataproc Serverless- und Metadatenimportjobs ausgeführt und Metadaten importiert werden
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • WORKFLOW_DEFINITION_FILE: Pfad zur YAML-Datei mit der Workflowdefinition.
    • WORKFLOW_NAME: Der Name des Workflows.
    • WORKFLOW_ARGUMENTS: die Laufzeitargumente, die an den Connector übergeben werden sollen. Die Argumente müssen im JSON-Format vorliegen:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Bei Cloud Scheduler werden die doppelten Anführungszeichen innerhalb des Strings in Anführungszeichen mit Schrägstrichen (\) maskiert. Beispiel:--message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Ersetzen Sie Folgendes:

      • ENTRY_GROUP_ID: die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eingangsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

        Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Einstiegsgruppe erstellen soll, wenn sie in Ihrem Projekt noch nicht vorhanden ist, legen Sie diesen Wert auf true fest.
      • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Metadatenimportdatei gespeichert werden soll. Für jede Workflowausführung wird ein neuer Ordner erstellt.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln.
      • CONTAINER_IMAGE: das benutzerdefinierte Container-Image des in Artifact Registry gehosteten Connectors.
      • ENTRY_TYPES: Liste der Eintragstypen, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
      • ASPECT_TYPES: Liste der Aspekte, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
      • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
      • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
      • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
    • CRON_SCHEDULE_EXPRESSION: Ein Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert. Wenn Sie den Zeitplan beispielsweise jeden Tag um Mitternacht ausführen möchten, verwenden Sie den Ausdruck 0 0 * * *.

  3. Wenn Sie die Pipeline bei Bedarf ausführen möchten, führen Sie den Workflow aus:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Die Workflow-Argumente sind im JSON-Format, aber nicht maskiert.

    Je nach Menge der importierten Metadaten kann die Ausführung des Workflows einige Minuten oder länger dauern. Weitere Informationen zum Aufrufen des Fortschritts finden Sie unter Auf Ergebnisse der Workflowausführung zugreifen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Catalog nach den importierten Metadaten suchen.

Terraform

  1. Klonen Sie das cloud-dataplex-Repository.

    Das Repository enthält die folgenden Terraform-Dateien:

    • main.tf: Google Cloud Definiert die zu erstellenden Ressourcen.
    • variables.tf: Hier werden die Variablen deklariert.
    • byo-connector.tfvars: Hiermit werden die Variablen für die verwaltete Konnektivitätspipeline definiert.
  2. Bearbeiten Sie die .tfvars-Datei und ersetzen Sie die Platzhalter durch die Informationen für Ihren Connector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Der Name des Zielprojekts Google Cloud, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Ziel Google Cloud speicherort, an dem die Dataproc Serverless- und Metadatenimportjobs ausgeführt und Metadaten importiert werden
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • CRON_SCHEDULE_EXPRESSION: Ein Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert. Wenn Sie den Zeitplan beispielsweise jeden Tag um Mitternacht ausführen möchten, verwenden Sie den Ausdruck 0 0 * * *.
    • ENTRY_GROUP_ID: die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eingangsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Wenn die Pipeline die Einstiegsgruppe erstellen soll, wenn sie in Ihrem Projekt noch nicht vorhanden ist, legen Sie diesen Wert auf true fest.
    • BUCKET_ID: Der Name des Cloud Storage-Buckets, in dem die vom Connector generierte Metadatenimportdatei gespeichert werden soll. Für jede Workflowausführung wird ein neuer Ordner erstellt.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln. Setzen Sie jedes Argument in Anführungszeichen und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE: das benutzerdefinierte Container-Image des in Artifact Registry gehosteten Connectors.
    • ENTRY_TYPES: Liste der Eintragstypen, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: Liste der Aspekte, die importiert werden sollen, im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Die LOCATION_ID muss entweder derselbeGoogle Cloud Speicherort sein, an den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste von Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Netzwerk angeben, lassen Sie das Subnetzargument weg.
    • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
  3. Initialisieren Sie Terraform:

    terraform init
    
  4. Validieren Sie Terraform mit Ihrer .tfvars-Datei:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Ersetzen Sie CONNECTOR_VARIABLES_FILE durch den Namen der Variablendefinitionsdatei.

  5. Terraform mit der Datei .tfvars bereitstellen:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform erstellt einen Workflow und einen Cloud Scheduler-Job im angegebenen Projekt. Mit Workflows wird die Pipeline gemäß dem von Ihnen angegebenen Zeitplan ausgeführt.

    Je nach Menge der importierten Metadaten kann die Ausführung des Workflows einige Minuten oder länger dauern. Weitere Informationen zum Aufrufen des Fortschritts finden Sie unter Auf Ergebnisse der Workflowausführung zugreifen.

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Catalog nach den importierten Metadaten suchen.

Jobprotokolle ansehen

Verwenden Sie Cloud Logging, um Logs für eine verwaltete Konnektivitätspipeline aufzurufen. Die Protokollnutzlast enthält einen Link zu den Protokollen für den Dataproc Serverless-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

Fehlerbehebung

Versuchen Sie, das Problem anhand der folgenden Vorschläge zu beheben:

  • Konfigurieren Sie die Protokollebene des Importjobs für den Metadatenjob so, dass Logging auf Debugebene anstelle von Logging auf Infoebene verwendet wird.
  • Prüfen Sie die Logs für den serverlosen Dataproc-Batchjob (für Connectorausführungen) und den Metadatenimportjob. Weitere Informationen finden Sie unter Dataproc Serverless für Spark-Protokolle abfragen und Metadatenjobprotokolle abfragen.
  • Wenn ein Eintrag nicht mithilfe der Pipeline importiert werden kann und die Fehlermeldung nicht genügend Informationen enthält, erstellen Sie einen benutzerdefinierten Eintrag mit denselben Details in einer Testeintragsgruppe. Weitere Informationen finden Sie unter Benutzerdefinierten Eintrag erstellen.

Nächste Schritte