Halaman ini menjelaskan cara memublikasikan peristiwa pipeline Cloud Data Fusion, seperti status pipeline, ke topik Pub/Sub. Panduan ini juga menjelaskan cara membuat fungsi Cloud Run yang memproses pesan Pub/Sub dan mengambil tindakan, seperti mengidentifikasi dan mencoba kembali pipeline yang gagal.
Sebelum memulai
- Buat topik tempat Pub/Sub dapat memublikasikan peristiwa pipeline Cloud Data Fusion.
Peran yang diperlukan
Untuk memastikan bahwa Akun Layanan Cloud Data Fusion memiliki izin yang diperlukan untuk memublikasikan peristiwa pipeline ke topik Pub/Sub, minta administrator Anda untuk memberikan peran IAM Pub/Sub Publisher (roles/pubsub.publisher
) kepada Akun Layanan Cloud Data Fusion di project tempat Anda membuat topik Pub/Sub.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Administrator Anda mungkin juga dapat memberikan izin yang diperlukan kepada Akun Layanan Cloud Data Fusion melalui peran khusus atau peran bawaan lainnya.
Mengelola publikasi peristiwa di instance Cloud Data Fusion
Anda dapat mengelola publikasi peristiwa di instance Cloud Data Fusion yang baru dan yang sudah ada menggunakan REST API dalam versi 6.7.0 dan yang lebih baru.
Memublikasikan peristiwa di instance baru
Buat instance baru dan sertakan kolom EventPublishConfig
. Untuk mengetahui informasi selengkapnya tentang kolom yang diperlukan untuk instance baru, lihat referensi Resource instance.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ganti kode berikut:
PROJECT_ID
: Google Cloud project IDLOCATION
: lokasi project AndaINSTANCE_ID
: ID instance Cloud Data Fusion AndaVERSION_NUMBER
: Versi Cloud Data Fusion tempat Anda membuat instance, misalnya,6.10.1
TOPIC_ID
: ID topik Pub/Sub
Mengaktifkan publikasi peristiwa di instance Cloud Data Fusion yang ada
Perbarui kolom EventPublishConfig
di instance Cloud Data Fusion yang ada:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ganti kode berikut:
PROJECT_ID
: Google Cloud project IDLOCATION
: lokasi project AndaINSTANCE_ID
: ID instance Cloud Data Fusion AndaTOPIC_ID
: ID topik Pub/Sub
Menghapus publikasi peristiwa dari instance
Untuk menghapus publikasi peristiwa dari instance, perbarui nilai enabled
publikasi peristiwa menjadi false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Membuat fungsi untuk membaca pesan Pub/Sub
Fungsi Cloud Run dapat membaca pesan Pub/Sub dan menindaklanjutinya, seperti mencoba ulang pipeline yang gagal. Untuk membuat fungsi Cloud Run, lakukan hal berikut:
Di konsol Google Cloud, buka halaman Cloud Run functions.
Klik Create function.
Masukkan nama dan wilayah fungsi.
Di kolom Trigger type, pilih Cloud Pub/Sub.
Masukkan ID topik Pub/Sub.
Klik Berikutnya.
Tambahkan fungsi untuk membaca pesan Pub/Sub dan melakukan tindakan lain. Misalnya, Anda dapat menambahkan fungsi untuk kasus penggunaan berikut:
- Mengirim pemberitahuan untuk kegagalan pipeline.
- Mengirim pemberitahuan untuk KPI, seperti jumlah data atau informasi operasi.
- Mulai ulang pipeline yang gagal dan belum dijalankan ulang.
Untuk contoh fungsi Cloud Run, lihat bagian kasus penggunaan.
Klik Deploy. Untuk informasi selengkapnya, lihat Men-deploy fungsi Cloud Run.
Kasus penggunaan: Mendokumentasikan status pipeline dan mencoba kembali pipeline yang gagal
Contoh fungsi Cloud Run berikut membaca pesan Pub/Sub tentang status pengoperasian pipeline, lalu mencoba ulang pipeline yang gagal di Cloud Data Fusion.
Contoh fungsi ini merujuk ke komponen Google Cloud berikut:
- Google Cloud project: project tempat fungsi Cloud Run dan topik Pub/Sub dibuat
- Topik Pub/Sub: topik Pub/Sub yang ditautkan ke instance Cloud Data Fusion Anda
- Instance Cloud Data Fusion: instance Cloud Data Fusion tempat Anda mendesain dan menjalankan pipeline
- Tabel BigQuery: tabel BigQuery yang menangkap status pipeline serta detail pengoperasian dan pengoperasian ulang
- Cloud Run function: Cloud Run function tempat Anda men-deploy kode yang mencoba ulang pipeline yang gagal
Contoh fungsi Cloud Run berikut membaca pesan Pub/Sub tentang peristiwa status Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
Contoh fungsi berikut membuat dan menyimpan tabel BigQuery, serta membuat kueri detail operasi pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
Contoh fungsi berikut memeriksa pipeline yang telah gagal dan apakah pipeline tersebut dijalankan ulang dalam satu jam terakhir.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Jika pipeline yang gagal belum berjalan baru-baru ini, contoh fungsi berikut akan menjalankan kembali pipeline yang gagal.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Langkah selanjutnya
- Pelajari cara menulis fungsi Cloud Run.