Memantau status pipeline

Halaman ini menjelaskan cara memublikasikan peristiwa pipeline Cloud Data Fusion, seperti status pipeline, ke topik Pub/Sub. Dokumen ini juga menjelaskan cara membuat Fungsi Cloud Run yang memproses pesan Pub/Sub dan mengambil tindakan, seperti mengidentifikasi dan mencoba kembali pipeline yang gagal.

Sebelum memulai

  • Buat topik tempat Pub/Sub dapat memublikasikan peristiwa pipeline Cloud Data Fusion.

Peran yang diperlukan

Untuk memastikan Akun Layanan Cloud Data Fusion memiliki izin untuk memublikasikan peristiwa pipeline ke topik Pub/Sub, minta administrator untuk memberi Akun Layanan Cloud Data Fusion Peran IAM Pub/Sub Publisher (roles/pubsub.publisher) di project tempat Anda membuat topik Pub/Sub. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Administrator Anda mungkin juga dapat memberikan Akun Layanan Cloud Data Fusion izin yang diperlukan melalui perintah peran atau setelan standar lainnya peran tertentu.

Mengelola publikasi peristiwa di instance Cloud Data Fusion

Anda dapat mengelola publikasi peristiwa di Cloud Data Fusion baru dan yang sudah ada instance yang menggunakan REST API di versi 6.7.0 dan yang lebih baru.

Memublikasikan peristiwa dalam instance baru

Buat instance baru dan sertakan kolom EventPublishConfig. Untuk selengkapnya informasi tentang kolom yang wajib diisi untuk instance baru. Lihat Resource instance alamat IP internal.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Ganti kode berikut:

  • PROJECT_ID: project ID Google Cloud
  • LOCATION: lokasi project Anda
  • INSTANCE_ID: ID Cloud Data Fusion Anda salinan digital
  • VERSION_NUMBER: Versi Cloud Data Fusion tempat Anda membuat instance, misalnya, 6.10.1
  • TOPIC_ID: ID topik Pub/Sub

Mengaktifkan publikasi peristiwa di instance Cloud Data Fusion yang ada

Ubah EventPublishConfig di instance Cloud Data Fusion yang ada:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Ganti kode berikut:

  • PROJECT_ID: project ID Google Cloud
  • LOCATION: lokasi project Anda
  • INSTANCE_ID: ID Cloud Data Fusion Anda salinan digital
  • TOPIC_ID: ID topik Pub/Sub

Menghapus publikasi acara dari instance

Untuk menghapus publikasi acara dari instance, perbarui peristiwa memublikasikan nilai enabled ke false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Membuat fungsi untuk membaca pesan Pub/Sub

Fungsi Cloud Run dapat membaca pesan Pub/Sub dan menindaklanjuti seperti mencoba lagi pipeline yang gagal. Untuk membuat fungsi Cloud Run, lakukan hal berikut:

  1. Di konsol Google Cloud, buka halaman Cloud Run functions.

    Buka fungsi Cloud Run

  2. Klik Create function.

  3. Masukkan region dan nama fungsi.

  4. Di kolom Trigger type, pilih Cloud Pub/Sub.

  5. Masukkan ID topik Pub/Sub.

  6. Klik Berikutnya.

  7. Tambahkan fungsi untuk membaca pesan Pub/Sub dan mengambil tindakan. Misalnya, Anda dapat menambahkan fungsi untuk kasus penggunaan berikut:

    • Kirim pemberitahuan untuk kegagalan pipeline.
    • Mengirim pemberitahuan untuk KPI, seperti jumlah kumpulan data atau informasi pengoperasian.
    • Memulai ulang pipeline yang gagal dan belum dijalankan ulang.

    Untuk contoh fungsi Cloud Run, lihat kasus penggunaan.

  8. Klik Deploy. Untuk mengetahui informasi selengkapnya, lihat Men-deploy fungsi Cloud Run.

Kasus penggunaan: Mendokumentasikan status pipeline dan mencoba ulang pipeline yang gagal

Contoh fungsi Cloud Run berikut telah dibaca pesan Pub/Sub tentang status operasi pipeline, lalu coba lagi pipeline yang gagal di Cloud Data Fusion.

Contoh fungsi ini merujuk pada komponen Google Cloud berikut:

  • Project Google Cloud: project tempat Fungsi Cloud Run dan topik Pub/Sub dibuat
  • Topik Pub/Sub: topik Pub/Sub yang ditautkan ke instance Cloud Data Fusion Anda
  • Instance Cloud Data Fusion: Cloud Data Fusion instance tempat Anda mendesain dan menjalankan pipeline
  • Tabel BigQuery: tabel BigQuery yang menangkap status pipeline dan detail operasi dan operasi ulang
  • Fungsi Cloud Run: fungsi Cloud Run tempat Anda men-deploy kode yang mencoba kembali pipeline yang gagal
  1. Contoh fungsi Cloud Run berikut membaca Pesan Pub/Sub tentang status Cloud Data Fusion peristiwa.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. Fungsi contoh berikut membuat dan menyimpan sebuah tabel BigQuery, dan membuat kueri detail operasi pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. Contoh fungsi berikut memeriksa pipeline yang telah gagal dan apakah iklan dijalankan kembali dalam satu jam terakhir.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Jika pipeline yang gagal belum berjalan baru-baru ini, contoh fungsi berikut menjalankan ulang pipeline yang gagal.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Langkah selanjutnya