Memantau status pipeline

Halaman ini menjelaskan cara memublikasikan peristiwa pipeline Cloud Data Fusion, seperti status pipeline, ke topik Pub/Sub. Dokumen ini juga menjelaskan cara membuat fungsi Cloud Run yang memproses pesan Pub/Sub dan melakukan tindakan, seperti mengidentifikasi dan mencoba lagi pipeline yang gagal.

Sebelum memulai

  • Buat topik tempat Pub/Sub dapat memublikasikan peristiwa pipeline Cloud Data Fusion.

Peran yang diperlukan

Untuk memastikan Akun Layanan Cloud Data Fusion memiliki izin yang diperlukan untuk memublikasikan peristiwa pipeline ke topik Pub/Sub, minta administrator Anda untuk memberikan peran IAM Pub/Sub Publisher (roles/pubsub.publisher) kepada Akun Layanan Cloud Data Fusion di project tempat Anda membuat topik Pub/Sub.

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Administrator Anda mungkin juga dapat memberikan izin yang diperlukan kepada Akun Layanan Cloud Data Fusion melalui peran khusus atau peran bawaan lainnya.

Mengelola penerbitan peristiwa di instance Cloud Data Fusion

Anda dapat mengelola penerbitan peristiwa di instance Cloud Data Fusion baru dan yang sudah ada menggunakan REST API di versi 6.7.0 dan yang lebih baru.

Memublikasikan peristiwa di instance baru

Buat instance baru dan sertakan kolom EventPublishConfig. Untuk mengetahui informasi selengkapnya tentang kolom wajib diisi untuk instance baru, lihat referensi resource Instance.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Ganti kode berikut:

  • PROJECT_ID: Google Cloud project ID
  • LOCATION: lokasi project Anda
  • INSTANCE_ID: ID instance Cloud Data Fusion Anda
  • VERSION_NUMBER: Versi Cloud Data Fusion tempat Anda membuat instance–misalnya, 6.10.1
  • TOPIC_ID: ID topik Pub/Sub

Mengaktifkan publikasi peristiwa di instance Cloud Data Fusion yang ada

Perbarui kolom EventPublishConfig di instance Cloud Data Fusion yang ada:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Ganti kode berikut:

  • PROJECT_ID: Google Cloud project ID
  • LOCATION: lokasi project Anda
  • INSTANCE_ID: ID instance Cloud Data Fusion Anda
  • TOPIC_ID: ID topik Pub/Sub

Menghapus publikasi acara dari instance

Untuk menghapus publikasi peristiwa dari instance, perbarui nilai enabled publikasi peristiwa menjadi false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Membuat fungsi untuk membaca pesan Pub/Sub

Fungsi Cloud Run dapat membaca pesan Pub/Sub dan bertindak berdasarkan pesan tersebut, seperti mencoba lagi pipeline yang gagal. Untuk membuat fungsi Cloud Run, lakukan hal berikut:

  1. Di Google Cloud konsol, buka halaman Cloud Run functions.

    Buka Cloud Run Functions

  2. Klik Create function.

  3. Masukkan nama fungsi dan region.

  4. Di kolom Trigger type, pilih Cloud Pub/Sub.

  5. Masukkan ID topik Pub/Sub.

  6. Klik Berikutnya.

  7. Tambahkan fungsi untuk membaca pesan Pub/Sub dan melakukan tindakan lainnya. Misalnya, Anda dapat menambahkan fungsi untuk kasus penggunaan berikut:

    • Mengirim pemberitahuan untuk kegagalan pipeline.
    • Mengirimkan pemberitahuan untuk KPI, seperti jumlah data atau informasi proses.
    • Mulai ulang pipeline yang gagal dan belum dijalankan ulang.

    Untuk contoh fungsi Cloud Run, lihat bagian kasus penggunaan.

  8. Klik Deploy. Untuk mengetahui informasi selengkapnya, lihat Men-deploy fungsi Cloud Run.

Kasus penggunaan: Status pipeline dokumen dan coba lagi pipeline yang gagal

Contoh fungsi Cloud Run berikut membaca pesan Pub/Sub tentang status eksekusi pipeline, lalu mencoba lagi pipeline yang gagal di Cloud Data Fusion.

Contoh fungsi ini merujuk ke komponen Google Cloud berikut:

  • ProjectGoogle Cloud : project tempat fungsi Cloud Run dan topik Pub/Sub dibuat
  • Topik Pub/Sub: topik Pub/Sub yang ditautkan ke instance Cloud Data Fusion Anda
  • Instance Cloud Data Fusion: instance Cloud Data Fusion tempat Anda mendesain dan menjalankan pipeline
  • Tabel BigQuery: tabel BigQuery yang mencatat status pipeline serta detail eksekusi dan eksekusi ulang
  • Fungsi Cloud Run: fungsi Cloud Run tempat Anda men-deploy kode yang mencoba ulang pipeline yang gagal
  1. Contoh fungsi Cloud Run berikut membaca pesan Pub/Sub tentang peristiwa status Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. Contoh fungsi berikut membuat dan menyimpan tabel BigQuery, serta mengkueri detail eksekusi pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. Contoh fungsi berikut memeriksa pipeline yang gagal dan apakah pipeline tersebut dijalankan ulang dalam satu jam terakhir.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Jika pipeline yang gagal belum dijalankan baru-baru ini, contoh fungsi berikut akan menjalankan kembali pipeline yang gagal.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Langkah berikutnya