Metadaten mit Workflows aus einer benutzerdefinierten Quelle importieren

In diesem Dokument wird beschrieben, wie Sie Metadaten aus einer Drittanbieterquelle in Dataplex importieren, indem Sie eine verwaltete Konnektivitätspipeline in Workflows ausführen.

Wenn Sie eine verwaltete Konnektivitätspipeline einrichten möchten, erstellen Sie einen Connector für Ihre Datenquelle. Anschließend führen Sie die Pipeline in Workflows aus. Die Pipeline extrahiert Metadaten aus Ihrer Datenquelle und importiert dann die Metadaten in Dataplex an. Bei Bedarf erstellt die Pipeline auch Dataplex-Katalog-Eintragsgruppen in Ihrem Google Cloud-Projekt.

Weitere Informationen zu verwalteten Verbindungen finden Sie unter Verwaltete Verbindung – Übersicht

Hinweis

Führen Sie die Aufgaben in diesem Abschnitt aus, bevor Sie Metadaten importieren.

Connector erstellen

Ein Connector extrahiert die Metadaten aus Ihren Daten und generiert eine Metadaten-Importdatei, die von Dataplex. Der Connector ist ein Artifact Registry-Image, das auf Dataproc Serverless

Google Cloud-Ressourcen konfigurieren

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Wenn Sie die Pipeline nicht nach einem Zeitplan ausführen möchten, Aktivieren Sie die Cloud Scheduler API.

  2. Erstellen Sie Secrets in Secret Manager, um die Anmeldedaten für Ihre Drittanbieterdatenquelle zu speichern.

  3. VPC-Netzwerk (Virtual Private Cloud) konfigurieren um Dataproc Serverless für Spark-Arbeitslasten auszuführen.

  4. Cloud Storage-Bucket erstellen, um Metadaten-Importdateien speichern.

  5. Erstellen Sie die folgenden Dataplex Catalog-Ressourcen:

    1. Benutzerdefinierte Aspekttypen erstellen für die Einträge, die Sie importieren möchten.

    2. Benutzerdefinierte Eintragstypen erstellen für die Einträge, die Sie importieren möchten.

Erforderliche Rollen

Ein Dienstkonto stellt die Identität eines Workflows dar und bestimmt, welche Berechtigungen der Workflow hat und auf welche Google Cloud-Ressourcen er zugreifen kann. Sie benötigen ein Dienstkonto für Workflows (zum Ausführen der Pipeline) und für Dataproc Serverless (zum Ausführen des Connectors).

Sie können das Compute Engine-Standarddienstkonto (PROJECT_NUMBER-compute@developer.gserviceaccount.com) oder erstellen Sie Ihr eigenes Dienstkonto (oder Konten), um die Pipeline für die verwaltete Verbindung auszuführen.

Console

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Wählen Sie das Projekt aus, in das Sie Metadaten importieren möchten.

  3. Klicken Sie auf  Zugriff gewähren und geben Sie dann die E-Mail-Adresse des Dienstkontos ein.

  4. Weisen Sie dem Dienstkonto die folgenden Rollen zu:

    • Logautor
    • Inhaber von Dataplex-Eintragsgruppen
    • Dataplex Metadata Job Owner
    • Dataplex Catalog Editor
    • Dataproc-Bearbeiter
    • Dataproc-Worker
    • Zugriffsperson für Secret Manager-Secret für das Secret, in dem die Anmeldedaten für Ihre Datenquelle gespeichert sind
    • Storage Object User – im Cloud Storage-Bucket
    • Artifact Registry Reader: Im Artifact Registry-Repository, das das Connector-Image enthält
    • Service Account User – Wenn Sie verschiedene Dienstkonten verwenden, Dem Dienstkonto, das Workflows ausführt, diese Rolle gewähren für das Dienstkonto, auf dem Dataproc Serverless ausgeführt wird Batchjobs
    • Workflows Invoker: Wenn Sie die Pipeline planen möchten
  5. Speichern Sie die Änderungen.

gcloud

  1. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie folgende Befehle aus:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: der Name des Ziel-Google Cloud-Projekts, in das die Metadaten importiert werden sollen.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, z. B. my-service-account@my-project.iam.gserviceaccount.com.
  2. Weisen Sie dem Dienstkonto die folgenden Rollen auf Ressourcenebene zu:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Ersetzen Sie Folgendes:

    • SECRET_ID: die ID des gespeicherten Secrets die Anmeldedaten für Ihre Datenquelle. Sie verwendet das Format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: der Name von Cloud Storage Bucket.
    • REPOSITORY: das Artifact Registry-Repository das das Connector-Image enthält.
    • REPOSITORY_LOCATION: Der Google Cloud-Standort, an dem das Repository gehostet wird.
  3. Gewähren Sie dem Dienstkonto, das Workflows ausführt, die Rolle roles/iam.serviceAccountUser für das Dienstkonto Ausführen der serverlosen Batchjobs von Dataproc Serverless Sie müssen diese Rolle auch gewähren, wenn Sie dasselbe Dienstkonto für Workflows und Dataproc Serverless verwenden.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Wenn Sie verschiedene Dienstkonten verwenden, ist der Wert für das Flag --member ist das Dienstkonto, auf dem Dataproc Serverless ausgeführt wird Batchjobs

  4. Wenn Sie die Pipeline planen möchten, weisen Sie dem Dienstkonto die folgende Rolle zu:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Metadaten importieren

Wenn Sie Metadaten importieren möchten, erstellen Sie einen Workflow, in dem die verwaltete Konnektivitätspipeline ausgeführt wird, und führen Sie ihn aus. Optional können Sie auch einen Zeitplan für die Ausführung der Pipeline erstellen.

Console

  1. Erstellen Sie den Workflow. Geben Sie die folgenden Informationen an:

    • Dienstkonto: das Dienstkonto, das Sie unter Erforderliche Rollen konfiguriert haben dieses Dokuments.
    • Verschlüsselung: Wählen Sie Von Google verwalteter Verschlüsselungsschlüssel aus.

    • Workflow definieren: Stellen Sie die folgende Definitionsdatei bereit:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: set_default_networking
      
          - set_default_networking:
              assign:
                - NETWORK_TYPE: "networkUri"
                - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Wenn Sie die Pipeline bei Bedarf ausführen möchten, führen Sie den Workflow aus.

    Geben Sie die folgenden Laufzeitargumente an:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: der Name des Ziel-Google Cloud-Projekts, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: Der Google Cloud-Speicherort, an dem die Dataproc Serverless- und Metadatenimportjobs ausgeführt und Metadaten importiert werden.
    • ENTRY_GROUP_ID: die ID der zu importierenden Eintragsgruppe in die Metadaten. Die ID der Eintragsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID

    • CREATE_ENTRY_GROUP_BOOLEAN: wenn die Pipeline Erstellen Sie die Eintragsgruppe, falls diese noch nicht in Ihrem Projekt vorhanden ist, legen Sie Folgendes fest: Wert auf true.
    • BUCKET_ID: der Name von Cloud Storage Bucket zum Speichern der vom Connector generierten Metadatenimportdatei. Bei jeder Workflowausführung wird ein neuer Ordner erstellt.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben. Das Dienstkonto führt den Connector in Dataproc Serverless aus.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: eine Liste mit zusätzlichen die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport. Setzen Sie jedes Argument in doppelte und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE ist das benutzerdefinierte Container-Image des in Artifact Registry gehosteter Connector.
    • ENTRY_TYPES: eine Liste der Eintragstypen, die einbezogen werden für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbe Google Cloud-Speicherort sein, in den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: eine Liste der Aspekttypen, die in den Umfang fallen für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Das Feld LOCATION_ID muss entweder gleich Google Cloud-Standort, in den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste mit Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Netzwerk angeben, lassen Sie Subnetzwerk-Arguments.
    • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das eine Verbindung zur Datenquelle herstellt. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.

    Je nach Menge der importierten Metadaten kann die Ausführung der Pipeline einige Minuten oder länger dauern. Weitere Informationen zu wie Sie den Fortschritt sehen, Auf Ergebnisse der Workflowausführung zugreifen

    Nachdem die Pipeline ausgeführt wurde, können Sie in Dataplex Catalog nach den importierten Metadaten suchen.

  3. Optional: Wenn Sie die Pipeline nach einem Zeitplan ausführen möchten, einen Zeitplan erstellen, indem Sie Cloud Scheduler Geben Sie die folgenden Informationen an:

    • Frequency (Häufigkeit): ein Unix-Cron-Ausdruck, der den Zeitplan für führen Sie die Pipeline aus.
    • Workflow-Argument: Die Laufzeitargumente für den Connector, wie im vorherigen Schritt beschrieben.
    • Dienstkonto: Das Dienstkonto. Das Dienstkonto verwaltet den Scheduler.

gcloud

  1. Speichern Sie die folgende Arbeitslastdefinition als YAML-Datei:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Definieren Sie Bash-Variablen, erstellen Sie den Workflow und erstellen Sie optional einen Zeitplan für die Ausführung der Pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: der Name des Ziel-Google Cloud-Projekts, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Google Cloud-Zielstandort in dem die serverlosen Dataproc- und Metadaten-Importjobs ausgeführt werden. und Metadaten importiert werden.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie unter Erforderliche Rollen konfiguriert haben dieses Dokuments.
    • WORKFLOW_DEFINITION_FILE: der Pfad zum YAML-Datei für die Workflow-Definition.
    • WORKFLOW_NAME: Der Name des Workflows.
    • WORKFLOW_ARGUMENTS: die Laufzeitargumente, die an den Connector übergeben werden sollen. Die Argumente liegen im JSON-Format vor:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Bei Cloud Scheduler werden die doppelten Anführungszeichen innerhalb des Strings in Anführungszeichen mit Schrägstrichen (\) maskiert. Beispiel:--message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Ersetzen Sie Folgendes:

      • ENTRY_GROUP_ID: die ID der zu importierenden Eintragsgruppe in die Metadaten. Die Eintragsgruppen-ID kann Kleinbuchstaben, Ziffern und Bindestriche verwenden.

        Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID

      • CREATE_ENTRY_GROUP_BOOLEAN: wenn die Pipeline Erstellen Sie die Eintragsgruppe, falls diese noch nicht in Ihrem Projekt vorhanden ist, legen Sie Folgendes fest: Wert auf true.
      • BUCKET_ID: der Name von Cloud Storage Bucket zum Speichern der vom Connector generierten Metadatenimportdatei. Für jede Workflowausführung wird ein neuer Ordner erstellt.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln.
      • CONTAINER_IMAGE ist das benutzerdefinierte Container-Image des in Artifact Registry gehosteter Connector.
      • ENTRY_TYPES: eine Liste der Eintragstypen, die einbezogen werden für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbe Google Cloud-Speicherort sein, in den Sie Metadaten importieren, oder global.
      • ASPECT_TYPES: eine Liste der Aspekttypen, die in den Umfang fallen für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Das Feld LOCATION_ID muss entweder gleich Google Cloud-Standort, in den Sie Metadaten importieren, oder global.
      • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste mit Netzwerk-Tags an.
      • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Netzwerk angeben, lassen Sie Subnetzwerk-Arguments.
      • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
    • CRON_SCHEDULE_EXPRESSION: ein Cron-Ausdruck, der definiert den Zeitplan für die Ausführung der Pipeline. Um den Zeitplan beispielsweise jeden Tag um Mitternacht den Ausdruck 0 0 * * *.

  3. Um die Pipeline on demand auszuführen, Workflow ausführen:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Die Workflowargumente liegen im JSON-Format vor, werden aber nicht maskiert.

    Je nach Menge der importierten Metadaten kann der Workflow mehrere Minuten oder länger dauern. Weitere Informationen zum Aufrufen des Fortschritts finden Sie unter Auf Ergebnisse der Workflowausführung zugreifen.

    Nachdem die Pipeline ausgeführt wurde, können Sie im Dataplex Catalog nach den importierten Metadaten suchen.

Terraform

  1. Klonen Sie das cloud-dataplex-Repository.

    Das Repository enthält die folgenden Terraform-Dateien:

    • main.tf: definiert die zu erstellenden Google Cloud-Ressourcen.
    • variables.tf: Hier werden die Variablen deklariert.
    • byo-connector.tfvars: definiert die Variablen für die verwaltete Verbindungspipeline.
  2. Ersetzen Sie in der Datei .tfvars die Platzhalter durch die Informationen. für den Connector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: der Name des Ziel-Google Cloud-Projekts, in das die Metadaten importiert werden sollen.
    • LOCATION_ID: der Google Cloud-Zielstandort in dem die serverlosen Dataproc- und Metadaten-Importjobs ausgeführt werden. und Metadaten importiert werden.
    • SERVICE_ACCOUNT_ID: das Dienstkonto, das Sie im Abschnitt Erforderliche Rollen dieses Dokuments konfiguriert haben.
    • CRON_SCHEDULE_EXPRESSION: Ein Cron-Ausdruck, der den Zeitplan für die Ausführung der Pipeline definiert. Um den Zeitplan beispielsweise jeden Tag um Mitternacht den Ausdruck 0 0 * * *.
    • ENTRY_GROUP_ID: die ID der Eintragsgruppe, in die Metadaten importiert werden sollen. Die ID der Eingangsgruppe kann Kleinbuchstaben, Ziffern und Bindestriche enthalten.

      Der vollständige Ressourcenname dieser Eintragsgruppe lautet projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID

    • CREATE_ENTRY_GROUP_BOOLEAN: wenn die Pipeline Erstellen Sie die Eintragsgruppe, falls diese noch nicht in Ihrem Projekt vorhanden ist, legen Sie Folgendes fest: Wert auf true.
    • BUCKET_ID: der Name von Cloud Storage Bucket zum Speichern der vom Connector generierten Metadatenimportdatei. Für jede Workflowausführung wird ein neuer Ordner erstellt.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: eine Liste mit zusätzlichen Argumenten, die an den Connector übergeben werden sollen. Beispiele finden Sie unter Benutzerdefinierten Connector für den Metadatenimport entwickeln. Setzen Sie jedes Argument in doppelte und trennen Sie die Argumente durch Kommas.
    • CONTAINER_IMAGE ist das benutzerdefinierte Container-Image des in Artifact Registry gehosteter Connector.
    • ENTRY_TYPES: eine Liste der Eintragstypen, die einbezogen werden für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID muss entweder derselbe Google Cloud-Speicherort sein, in den Sie Metadaten importieren, oder global.
    • ASPECT_TYPES: eine Liste der Aspekttypen, die in den Umfang fallen für den Import im Format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Das Feld LOCATION_ID muss entweder gleich Google Cloud-Standort, in den Sie Metadaten importieren, oder global.
    • Optional: Geben Sie für das Argument NETWORK_TAGS eine Liste mit Netzwerk-Tags an.
    • Optional: Geben Sie für das Argument NETWORK_URI den URI des VPC-Netzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Netzwerk angeben, lassen Sie Subnetzwerk-Arguments.
    • Optional: Geben Sie für das Argument SUBNETWORK_URI den URI des Unternetzwerks an, das mit der Datenquelle verbunden ist. Wenn Sie ein Subnetz angeben, lassen Sie das Netzwerkargument weg.
  3. Initialisieren Sie Terraform:

    terraform init
    
  4. Validieren Sie Terraform mit Ihrer .tfvars-Datei:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Ersetzen Sie CONNECTOR_VARIABLES_FILE durch den Namen. Ihrer Variablendefinitionsdatei.

  5. Terraform mit der Datei .tfvars bereitstellen:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform erstellt einen Workflow und einen Cloud Scheduler-Job im angegebenen Projekt. Workflows führt die Pipeline an der den angegebenen Zeitplan.

    Je nach Menge der importierten Metadaten kann der Workflow mehrere Minuten oder länger dauern. Weitere Informationen zur den Fortschritt anzeigen, sehen Sie, Auf Ergebnisse der Workflowausführung zugreifen

    Nachdem die Pipeline ausgeführt wurde, können Sie im Dataplex Catalog nach den importierten Metadaten suchen.

Joblogs ansehen

Verwenden Sie Cloud Logging, um Logs für eine verwaltete Verbindungspipeline anzusehen. Das Protokoll enthält einen Link zu den Logs für Dataproc Serverless. Batchjob und Metadatenimportjob nach Bedarf. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

Fehlerbehebung

Gehen Sie zur Fehlerbehebung folgendermaßen vor:

  • Konfigurieren Sie die Protokollebene des Importjobs für den Metadatenjob so, dass Logging auf Debugebene anstelle von Logging auf Infoebene verwendet wird.
  • Prüfen Sie die Logs für den serverlosen Dataproc-Batchjob (für Connectorausführungen) und den Metadatenimportjob. Weitere Informationen finden Sie unter Dataproc Serverless für Spark-Logs abfragen und Logs von Abfragemetadatenjobs.
  • Wenn ein Eintrag nicht mit der Pipeline importiert werden kann und die Fehlermeldung sollten Sie einen benutzerdefinierten Eintrag mit denselben Details erstellen, in einer Testeintragsgruppe. Weitere Informationen finden Sie unter Benutzerdefinierten Eintrag erstellen.

Nächste Schritte