Halaman ini menunjukkan cara menulis, men-deploy, dan memicu proses pipeline menggunakan Cloud Function Berbasis Peristiwa dengan pemicu Cloud Pub/Sub. Ikuti langkah-langkah berikut:
Tentukan pipeline ML menggunakan Kubeflow Pipelines (KFP) SDK dan kompilasi menjadi file YAML.
Upload definisi pipeline yang dikompilasi ke bucket Cloud Storage.
Gunakan fungsi Cloud Run untuk membuat, mengonfigurasi, dan men-deploy fungsi yang dipicu oleh topik Pub/Sub baru atau yang sudah ada.
Menentukan dan mengompilasi pipeline
Dengan menggunakan Kubeflow Pipelines SDK, bangun pipeline terjadwal dan kompilasi menjadi file YAML.
Contoh hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Mengupload YAML pipeline yang dikompilasi ke bucket Cloud Storage
Buka browser Cloud Storage di Google Cloud konsol.
Klik bucket Cloud Storage yang Anda buat saat mengonfigurasi project.
Dengan menggunakan folder yang sudah ada atau folder baru, upload YAML pipeline yang telah dikompilasi (dalam contoh ini
hello_world_scheduled_pipeline.yaml
) ke folder yang dipilih.Klik file YAML yang diupload untuk mengakses detailnya. Salin gsutil URI untuk digunakan nanti.
Membuat fungsi Cloud Run dengan pemicu Pub/Sub
Buka halaman fungsi Cloud Run di konsol.
Klik tombol Buat fungsi.
Di bagian Dasar, beri nama fungsi Anda (misalnya
my-scheduled-pipeline-function
).Di bagian Pemicu, pilih Cloud Pub/Sub sebagai jenis Pemicu.
Pada daftar Pilih topik Cloud Pub/Sub, klik Buat topik.
Di kotak Buat topik, beri nama topik baru Anda (misalnya
my-scheduled-pipeline-topic
), lalu pilih Buat topik.Biarkan semua kolom lain dalam setelan default, lalu klik Simpan untuk menyimpan konfigurasi bagian Pemicu.
Biarkan semua kolom lainnya sesuai setelan default, lalu klik Berikutnya untuk melanjutkan ke bagian Kode.
Di bagian Runtime, pilih Python 3.7.
Di titik Entri, masukkan "subscribe" (contoh nama fungsi titik entri kode).
Pada Kode sumber, pilih Editor Inline jika belum dipilih.
Di file
main.py
, tambahkan kode berikut:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Ganti kode berikut:
- PROJECT_ID: Project Google Cloud tempat pipeline ini berjalan.
- REGION: Region tempat pipeline ini berjalan.
- PIPELINE_ROOT: Tentukan Cloud Storage URI yang dapat diakses oleh akun layanan pipeline Anda. Artefak operasi pipeline Anda disimpan di root pipeline.
Dalam file
requirements.txt
, ganti konten dengan persyaratan paket berikut:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Klik deploy untuk men-deploy Fungsi.
Langkah berikutnya
- Pelajari lebih lanjut Google Cloud Pub/Sub.
- Memvisualisasikan dan menganalisis hasil pipeline.
- Pelajari cara membuat pemicu di Cloud Run dari peristiwa Pub/Sub.
- Untuk melihat contoh kode penggunaan Pub/Sub, lihat browser contohGoogle Cloud .