Halaman ini menjelaskan cara memublikasikan peristiwa pipeline Cloud Data Fusion, seperti status pipeline, ke topik Pub/Sub. Dokumen ini juga menjelaskan cara membuat Fungsi Cloud Run yang memproses pesan Pub/Sub dan mengambil tindakan, seperti mengidentifikasi dan mencoba kembali pipeline yang gagal.
Sebelum memulai
- Buat topik tempat Pub/Sub dapat memublikasikan peristiwa pipeline Cloud Data Fusion.
Peran yang diperlukan
Untuk memastikan Akun Layanan Cloud Data Fusion memiliki
izin untuk memublikasikan peristiwa pipeline ke topik Pub/Sub,
minta administrator untuk memberi Akun Layanan Cloud Data Fusion
Peran IAM Pub/Sub Publisher (roles/pubsub.publisher
) di project tempat Anda membuat topik Pub/Sub.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Administrator Anda mungkin juga dapat memberikan Akun Layanan Cloud Data Fusion izin yang diperlukan melalui perintah peran atau setelan standar lainnya peran tertentu.
Mengelola publikasi peristiwa di instance Cloud Data Fusion
Anda dapat mengelola publikasi peristiwa di Cloud Data Fusion baru dan yang sudah ada instance yang menggunakan REST API di versi 6.7.0 dan yang lebih baru.
Memublikasikan peristiwa dalam instance baru
Buat instance baru dan sertakan kolom EventPublishConfig
. Untuk selengkapnya
informasi tentang kolom yang wajib diisi untuk instance baru. Lihat
Resource instance
alamat IP internal.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ganti kode berikut:
PROJECT_ID
: project ID Google CloudLOCATION
: lokasi project AndaINSTANCE_ID
: ID Cloud Data Fusion Anda salinan digitalVERSION_NUMBER
: Versi Cloud Data Fusion tempat Anda membuat instance, misalnya,6.10.1
TOPIC_ID
: ID topik Pub/Sub
Mengaktifkan publikasi peristiwa di instance Cloud Data Fusion yang ada
Ubah
EventPublishConfig
di instance Cloud Data Fusion yang ada:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ganti kode berikut:
PROJECT_ID
: project ID Google CloudLOCATION
: lokasi project AndaINSTANCE_ID
: ID Cloud Data Fusion Anda salinan digitalTOPIC_ID
: ID topik Pub/Sub
Menghapus publikasi acara dari instance
Untuk menghapus publikasi acara dari instance, perbarui
peristiwa memublikasikan nilai enabled
ke false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Membuat fungsi untuk membaca pesan Pub/Sub
Fungsi Cloud Run dapat membaca pesan Pub/Sub dan menindaklanjuti seperti mencoba lagi pipeline yang gagal. Untuk membuat fungsi Cloud Run, lakukan hal berikut:
Di konsol Google Cloud, buka halaman Cloud Run functions.
Klik Create function.
Masukkan region dan nama fungsi.
Di kolom Trigger type, pilih Cloud Pub/Sub.
Masukkan ID topik Pub/Sub.
Klik Berikutnya.
Tambahkan fungsi untuk membaca pesan Pub/Sub dan mengambil tindakan. Misalnya, Anda dapat menambahkan fungsi untuk kasus penggunaan berikut:
- Kirim pemberitahuan untuk kegagalan pipeline.
- Mengirim pemberitahuan untuk KPI, seperti jumlah kumpulan data atau informasi pengoperasian.
- Memulai ulang pipeline yang gagal dan belum dijalankan ulang.
Untuk contoh fungsi Cloud Run, lihat kasus penggunaan.
Klik Deploy. Untuk mengetahui informasi selengkapnya, lihat Men-deploy fungsi Cloud Run.
Kasus penggunaan: Mendokumentasikan status pipeline dan mencoba ulang pipeline yang gagal
Contoh fungsi Cloud Run berikut telah dibaca pesan Pub/Sub tentang status operasi pipeline, lalu coba lagi pipeline yang gagal di Cloud Data Fusion.
Contoh fungsi ini merujuk pada komponen Google Cloud berikut:
- Project Google Cloud: project tempat Fungsi Cloud Run dan topik Pub/Sub dibuat
- Topik Pub/Sub: topik Pub/Sub yang ditautkan ke instance Cloud Data Fusion Anda
- Instance Cloud Data Fusion: Cloud Data Fusion instance tempat Anda mendesain dan menjalankan pipeline
- Tabel BigQuery: tabel BigQuery yang menangkap status pipeline dan detail operasi dan operasi ulang
- Fungsi Cloud Run: fungsi Cloud Run tempat Anda men-deploy kode yang mencoba kembali pipeline yang gagal
Contoh fungsi Cloud Run berikut membaca Pesan Pub/Sub tentang status Cloud Data Fusion peristiwa.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
Fungsi contoh berikut membuat dan menyimpan sebuah tabel BigQuery, dan membuat kueri detail operasi pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
Contoh fungsi berikut memeriksa pipeline yang telah gagal dan apakah iklan dijalankan kembali dalam satu jam terakhir.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Jika pipeline yang gagal belum berjalan baru-baru ini, contoh fungsi berikut menjalankan ulang pipeline yang gagal.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Langkah selanjutnya
- Pelajari cara menulis fungsi Cloud Run.