Mengimpor metadata dari sumber kustom menggunakan Alur Kerja

Dokumen ini menjelaskan cara mengimpor metadata dari sumber pihak ketiga ke Dataplex dengan menjalankan pipeline konektivitas terkelola di Alur Kerja.

Untuk menyiapkan pipeline konektivitas terkelola, Anda harus mem-build konektor untuk sumber data. Kemudian, jalankan pipeline di Alur Kerja. Pipeline ini mengekstrak metadata dari sumber data Anda, lalu mengimpor metadata ke Dataplex. Jika perlu, pipeline juga akan membuat grup entri Katalog Dataplex di project Google Cloud Anda.

Untuk mengetahui informasi selengkapnya tentang konektivitas terkelola, lihat Ringkasan konektivitas terkelola.

Sebelum memulai

Sebelum mengimpor metadata, selesaikan tugas-tugas di bagian ini.

Mem-build konektor

Konektor mengekstrak metadata dari sumber data Anda dan membuat file impor metadata yang dapat diimpor oleh Dataplex. Konektor adalah image Artifact Registry yang dapat dijalankan di Dataproc Serverless.

Mengonfigurasi resource Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Jika tidak berencana menjalankan pipeline sesuai jadwal, Anda tidak perlu mengaktifkan Cloud Scheduler API.

  2. Buat secret di Secret Manager untuk menyimpan kredensial sumber data pihak ketiga Anda.

  3. Konfigurasikan jaringan Virtual Private Cloud (VPC) Anda untuk menjalankan workload Dataproc Serverless untuk Spark.

  4. Buat bucket Cloud Storage untuk menyimpan file impor metadata.

  5. Buat resource Katalog Dataplex berikut:

    1. Buat jenis aspek kustom untuk entri yang ingin Anda impor.

    2. Buat jenis entri kustom untuk entri yang ingin Anda impor.

Peran yang diperlukan

Akun layanan mewakili identitas alur kerja dan menentukan izin yang dimiliki alur kerja dan resource Google Cloud yang dapat diaksesnya. Anda memerlukan akun layanan untuk Workflows (untuk menjalankan pipeline) dan untuk Dataproc Serverless (untuk menjalankan konektor).

Anda dapat menggunakan akun layanan default Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com), atau membuat akun layanan Anda sendiri (atau akun) untuk menjalankan pipeline konektivitas terkelola.

Konsol

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Pilih project tempat Anda ingin mengimpor metadata.

  3. Klik Grant Access, lalu masukkan alamat email akun layanan.

  4. Tetapkan peran berikut ke akun layanan:

    • Logs Writer
    • Pemilik Grup Entri Dataplex
    • Pemilik Tugas Metadata Dataplex
    • Dataplex Catalog Editor
    • Editor Dataproc
    • Dataproc Worker
    • Secret Manager Secret Accessor - pada secret yang menyimpan kredensial untuk sumber data Anda
    • Storage Object User - di bucket Cloud Storage
    • Artifact Registry Reader - di repositori Artifact Registry yang berisi image konektor
    • Service Account User - jika Anda menggunakan akun layanan yang berbeda, berikan peran ini kepada akun layanan yang menjalankan Alur Kerja di akun layanan yang menjalankan tugas batch Dataproc Serverless
    • Workflows Invoker - jika Anda ingin menjadwalkan pipeline
  5. Simpan perubahan Anda.

gcloud

  1. Memberikan peran ke akun layanan. Jalankan perintah berikut:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Google Cloud target tempat metadata akan diimpor.
    • SERVICE_ACCOUNT_ID: akun layanan, seperti my-service-account@my-project.iam.gserviceaccount.com.
  2. Berikan peran berikut ke akun layanan di tingkat resource:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Ganti kode berikut:

    • SECRET_ID: ID secret yang menyimpan kredensial untuk sumber data Anda. Format ini menggunakan format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: nama bucket Cloud Storage.
    • REPOSITORY: repositori Artifact Registry yang berisi image konektor.
    • REPOSITORY_LOCATION: lokasi Google Cloud tempat repositori dihosting.
  3. Berikan peran roles/iam.serviceAccountUser pada akun layanan yang menjalankan Alur Kerja di akun layanan yang menjalankan tugas batch Dataproc Serverless. Anda harus memberikan peran ini meskipun menggunakan akun layanan yang sama untuk Alur Kerja dan Dataproc Serverless.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Jika Anda menggunakan akun layanan yang berbeda, nilai untuk tanda --member adalah akun layanan yang menjalankan tugas batch Dataproc Serverless.

  4. Jika Anda ingin menjadwalkan pipeline, berikan peran berikut kepada akun layanan:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Mengimpor metadata

Untuk mengimpor metadata, buat lalu jalankan alur kerja yang menjalankan pipeline konektivitas terkelola. Secara opsional, Anda juga dapat membuat jadwal untuk menjalankan pipeline.

Konsol

  1. Buat alur kerja. Berikan informasi berikut:

    • Akun layanan: akun layanan yang Anda konfigurasikan di bagian Peran yang diperlukan dalam dokumen ini.
    • Encryption: pilih Google-managed encryption key.

    • Menentukan alur kerja: berikan file definisi berikut:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: set_default_networking
      
          - set_default_networking:
              assign:
                - NETWORK_TYPE: "networkUri"
                - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Untuk menjalankan pipeline sesuai permintaan, jalankan alur kerja.

    Berikan argumen runtime berikut:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Google Cloud target tempat metadata akan diimpor.
    • LOCATION_ID: lokasi Google Cloud target tempat tugas impor metadata dan Dataproc Serverless akan berjalan, dan metadata akan diimpor.
    • ENTRY_GROUP_ID: ID grup entri tempat metadata akan diimpor. ID grup entri dapat berisi huruf kecil, angka, dan tanda hubung.

      Nama resource lengkap grup entri ini adalah projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: jika Anda ingin pipeline membuat grup entri jika belum ada di project, tetapkan nilai ini ke true.
    • BUCKET_ID: nama bucket Cloud Storage untuk menyimpan file impor metadata yang dihasilkan oleh konektor. Setiap eksekusi alur kerja akan membuat folder baru.
    • SERVICE_ACCOUNT_ID: akun layanan yang Anda konfigurasikan di bagian Peran yang diperlukan dalam dokumen ini. Akun layanan menjalankan konektor di Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: daftar argumen tambahan yang akan diteruskan ke konektor. Untuk contohnya, lihat Mengembangkan konektor kustom untuk impor metadata. Sertakan setiap argumen dalam tanda kutip ganda, dan pisahkan argumen dengan koma.
    • CONTAINER_IMAGE: image container kustom konektor yang dihosting di Artifact Registry.
    • ENTRY_TYPES: daftar jenis entri yang termasuk dalam cakupan impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
    • ASPECT_TYPES: daftar jenis aspek yang termasuk dalam cakupan untuk impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
    • Opsional: untuk argumen NETWORK_TAGS, berikan daftar tag jaringan.
    • Opsional: Untuk argumen NETWORK_URI, berikan URI jaringan VPC yang terhubung ke sumber data. Jika Anda menyediakan jaringan, hapus argumen subjaringan.
    • Opsional: Untuk argumen SUBNETWORK_URI, berikan URI subjaringan yang terhubung ke sumber data. Jika Anda memberikan subnet, hapus argumen jaringan.

    Bergantung pada jumlah metadata yang Anda impor, pipeline mungkin memerlukan waktu beberapa menit atau lebih lama untuk dijalankan. Untuk informasi selengkapnya tentang cara melihat progres, lihat Mengakses hasil eksekusi alur kerja.

    Setelah pipeline selesai berjalan, Anda dapat menelusuri metadata yang diimpor di Katalog Dataplex.

  3. Opsional: Jika Anda ingin menjalankan pipeline sesuai jadwal, buat jadwal menggunakan Cloud Scheduler. Berikan informasi berikut:

    • Frequency: ekspresi unix-cron yang menentukan jadwal untuk menjalankan pipeline.
    • Argumen alur kerja: argumen runtime untuk konektor, seperti yang dijelaskan pada langkah sebelumnya.
    • Service account: akun layanan. Akun layanan mengelola penjadwal.

gcloud

  1. Simpan definisi beban kerja berikut sebagai file YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Tentukan variabel Bash, buat alur kerja, dan opsional buat jadwal untuk menjalankan pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Google Cloud target tempat metadata akan diimpor.
    • LOCATION_ID: lokasi Google Cloud target tempat tugas impor metadata dan Dataproc Serverless akan berjalan, dan metadata akan diimpor.
    • SERVICE_ACCOUNT_ID: akun layanan yang Anda konfigurasikan di bagian Peran yang diperlukan dalam dokumen ini.
    • WORKFLOW_DEFINITION_FILE: jalur ke file YAML definisi alur kerja.
    • WORKFLOW_NAME: nama alur kerja.
    • WORKFLOW_ARGUMENTS: argumen runtime yang akan diteruskan ke konektor. Argumennya dalam format JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Untuk Cloud Scheduler, tanda kutip ganda di dalam string yang dikutip di-escape menggunakan garis miring terbalik (\). Misalnya: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Ganti kode berikut:

      • ENTRY_GROUP_ID: ID grup entri tempat metadata akan diimpor. ID grup entri dapat berisi huruf kecil, angka, dan tanda hubung.

        Nama resource lengkap grup entri ini adalah projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: jika Anda ingin pipeline membuat grup entri jika belum ada di project, tetapkan nilai ini ke true.
      • BUCKET_ID: nama bucket Cloud Storage untuk menyimpan file impor metadata yang dihasilkan oleh konektor. Setiap eksekusi alur kerja akan membuat folder baru.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: daftar argumen tambahan yang akan diteruskan ke konektor. Untuk contohnya, lihat Mengembangkan konektor kustom untuk impor metadata.
      • CONTAINER_IMAGE: image container kustom konektor yang dihosting di Artifact Registry.
      • ENTRY_TYPES: daftar jenis entri yang termasuk dalam cakupan impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
      • ASPECT_TYPES: daftar jenis aspek yang termasuk dalam cakupan untuk impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
      • Opsional: untuk argumen NETWORK_TAGS, berikan daftar tag jaringan.
      • Opsional: Untuk argumen NETWORK_URI, berikan URI jaringan VPC yang terhubung ke sumber data. Jika Anda menyediakan jaringan, hapus argumen subjaringan.
      • Opsional: Untuk argumen SUBNETWORK_URI, berikan URI subjaringan yang terhubung ke sumber data. Jika Anda memberikan subnet, hapus argumen jaringan.
    • CRON_SCHEDULE_EXPRESSION: ekspresi cron yang menentukan jadwal untuk menjalankan pipeline. Misalnya, untuk menjalankan jadwal pada tengah malam setiap hari, gunakan ekspresi 0 0 * * *.

  3. Untuk menjalankan pipeline sesuai permintaan, jalankan alur kerja:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Argumen alur kerja dalam format JSON, tetapi tidak di-escape.

    Bergantung pada jumlah metadata yang Anda impor, alur kerja mungkin memerlukan waktu beberapa menit atau lebih lama untuk dijalankan. Untuk mengetahui informasi selengkapnya tentang cara melihat progres, lihat Mengakses hasil eksekusi alur kerja.

    Setelah pipeline selesai berjalan, Anda dapat menelusuri metadata yang diimpor di Katalog Dataplex.

Terraform

  1. Clone repositori cloud-dataplex.

    Repositori ini menyertakan file Terraform berikut:

  2. Edit file .tfvars untuk mengganti placeholder dengan informasi untuk konektor Anda.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Google Cloud target tempat metadata akan diimpor.
    • LOCATION_ID: lokasi Google Cloud target tempat tugas impor metadata dan Dataproc Serverless akan berjalan, dan metadata akan diimpor.
    • SERVICE_ACCOUNT_ID: akun layanan yang Anda konfigurasikan di bagian Peran yang diperlukan dalam dokumen ini.
    • CRON_SCHEDULE_EXPRESSION: ekspresi cron yang menentukan jadwal untuk menjalankan pipeline. Misalnya, untuk menjalankan jadwal pada tengah malam setiap hari, gunakan ekspresi 0 0 * * *.
    • ENTRY_GROUP_ID: ID grup entri tempat metadata akan diimpor. ID grup entri dapat berisi huruf kecil, angka, dan tanda hubung.

      Nama resource lengkap grup entri ini adalah projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: jika Anda ingin pipeline membuat grup entri jika belum ada di project, tetapkan nilai ini ke true.
    • BUCKET_ID: nama bucket Cloud Storage untuk menyimpan file impor metadata yang dihasilkan oleh konektor. Setiap eksekusi alur kerja akan membuat folder baru.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: daftar argumen tambahan yang akan diteruskan ke konektor. Untuk contohnya, lihat Mengembangkan konektor kustom untuk impor metadata. Sertakan setiap argumen dalam tanda kutip ganda, dan pisahkan argumen dengan koma.
    • CONTAINER_IMAGE: image container kustom konektor yang dihosting di Artifact Registry.
    • ENTRY_TYPES: daftar jenis entri yang termasuk dalam cakupan impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
    • ASPECT_TYPES: daftar jenis aspek yang termasuk dalam cakupan untuk impor, dalam format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID harus berupa lokasi Google Cloud yang sama dengan lokasi tempat Anda mengimpor metadata, atau global.
    • Opsional: untuk argumen NETWORK_TAGS, berikan daftar tag jaringan.
    • Opsional: Untuk argumen NETWORK_URI, berikan URI jaringan VPC yang terhubung ke sumber data. Jika Anda menyediakan jaringan, hapus argumen subjaringan.
    • Opsional: Untuk argumen SUBNETWORK_URI, berikan URI subjaringan yang terhubung ke sumber data. Jika Anda memberikan subnet, hapus argumen jaringan.
  3. Lakukan inisialisasi Terraform:

    terraform init
    
  4. Validasi Terraform dengan file .tfvars Anda:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Ganti CONNECTOR_VARIABLES_FILE dengan nama file definisi variabel Anda.

  5. Deploy Terraform dengan file .tfvars Anda:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform membuat alur kerja dan tugas Cloud Scheduler dalam project yang ditentukan. Alur kerja menjalankan pipeline sesuai jadwal yang Anda tentukan.

    Bergantung pada jumlah metadata yang Anda impor, alur kerja mungkin memerlukan waktu beberapa menit atau lebih lama untuk dijalankan. Untuk mengetahui informasi selengkapnya tentang cara melihat progres, lihat Mengakses hasil eksekusi alur kerja.

    Setelah pipeline selesai berjalan, Anda dapat menelusuri metadata yang diimpor di Katalog Dataplex.

Lihat log tugas

Gunakan Cloud Logging untuk melihat log untuk pipeline konektivitas terkelola. Payload log menyertakan link ke log untuk tugas batch Dataproc Serverless dan tugas impor metadata, sesuai kebutuhan. Untuk informasi selengkapnya, lihat Melihat log alur kerja.

Pemecahan masalah

Gunakan saran pemecahan masalah berikut:

Langkah selanjutnya