Contoh kode berikut menunjukkan cara menulis, men-deploy, dan memicu pipeline menggunakan Cloud Function Berbasis Peristiwa dengan pemicu Cloud Pub/Sub.
Membuat dan mengompilasi Pipeline sederhana
Dengan menggunakan Kubeflow Pipelines SDK, bangun pipeline terjadwal dan kompilasi menjadi file YAML.
Contoh hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Mengupload YAML pipeline yang dikompilasi ke bucket Cloud Storage
Buka browser Cloud Storage di konsol Google Cloud.
Klik bucket Cloud Storage yang Anda buat saat mengonfigurasi project.
Dengan menggunakan folder yang sudah ada atau folder baru, upload YAML pipeline yang telah dikompilasi (dalam contoh ini
hello_world_scheduled_pipeline.yaml
) ke folder yang dipilih.Klik file YAML yang diupload untuk mengakses detailnya. Salin gsutil URI untuk digunakan nanti.
Membuat Cloud Function dengan Pemicu Pub/Sub
Buka halaman fungsi Cloud Run di konsol.
Klik tombol Buat fungsi.
Di bagian Dasar, beri nama fungsi Anda (misalnya
my-scheduled-pipeline-function
).Di bagian Pemicu, pilih Cloud Pub/Sub sebagai jenis Pemicu.
Pada dropdown Pilih topik Cloud Pub/Sub, klik Buat topik.
Di kotak Buat topik, beri nama topik baru Anda (misalnya
my-scheduled-pipeline-topic
), lalu pilih Buat topik.Biarkan semua kolom lain dalam setelan default, lalu klik Simpan untuk menyimpan konfigurasi bagian Pemicu.
Biarkan semua kolom lainnya sesuai setelan default, lalu klik Berikutnya untuk melanjutkan ke bagian Kode.
Di bagian Runtime, pilih Python 3.7.
Di titik Entri, masukkan "subscribe" (contoh nama fungsi titik entri kode).
Pada Kode sumber, pilih Editor Inline jika belum dipilih.
Di file
main.py
, tambahkan kode berikut:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Ganti kode berikut:
- PROJECT_ID: Project Google Cloud tempat pipeline ini berjalan.
- REGION: Region tempat pipeline ini berjalan.
- PIPELINE_ROOT: Tentukan Cloud Storage URI yang dapat diakses oleh akun layanan pipeline Anda. Artefak operasi pipeline Anda disimpan di root pipeline.
Dalam file
requirements.txt
, ganti konten dengan persyaratan paket berikut:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Klik deploy untuk men-deploy Fungsi.