Memicu proses pipeline dengan Pub/Sub

Halaman ini menunjukkan cara menulis, men-deploy, dan memicu proses pipeline menggunakan Cloud Function Berbasis Peristiwa dengan pemicu Cloud Pub/Sub. Ikuti langkah-langkah berikut:

  1. Tentukan pipeline ML menggunakan Kubeflow Pipelines (KFP) SDK dan kompilasi menjadi file YAML.

  2. Upload definisi pipeline yang dikompilasi ke bucket Cloud Storage.

  3. Gunakan fungsi Cloud Run untuk membuat, mengonfigurasi, dan men-deploy fungsi yang dipicu oleh topik Pub/Sub baru atau yang sudah ada.

Menentukan dan mengompilasi pipeline

Dengan menggunakan Kubeflow Pipelines SDK, bangun pipeline terjadwal dan kompilasi menjadi file YAML.

Contoh hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Mengupload YAML pipeline yang dikompilasi ke bucket Cloud Storage

  1. Buka browser Cloud Storage di Google Cloud konsol.

    Browser Cloud Storage

  2. Klik bucket Cloud Storage yang Anda buat saat mengonfigurasi project.

  3. Dengan menggunakan folder yang sudah ada atau folder baru, upload YAML pipeline yang telah dikompilasi (dalam contoh ini hello_world_scheduled_pipeline.yaml) ke folder yang dipilih.

  4. Klik file YAML yang diupload untuk mengakses detailnya. Salin gsutil URI untuk digunakan nanti.

Membuat fungsi Cloud Run dengan pemicu Pub/Sub

  1. Buka halaman fungsi Cloud Run di konsol.

    Buka halaman fungsi Cloud Run

  2. Klik tombol Buat fungsi.

  3. Di bagian Dasar, beri nama fungsi Anda (misalnya my-scheduled-pipeline-function).

  4. Di bagian Pemicu, pilih Cloud Pub/Sub sebagai jenis Pemicu.

    Buat konfigurasi fungsi, pilih pubsub sebagai gambar jenis Pemicu

  5. Pada daftar Pilih topik Cloud Pub/Sub, klik Buat topik.

  6. Di kotak Buat topik, beri nama topik baru Anda (misalnya my-scheduled-pipeline-topic), lalu pilih Buat topik.

  7. Biarkan semua kolom lain dalam setelan default, lalu klik Simpan untuk menyimpan konfigurasi bagian Pemicu.

  8. Biarkan semua kolom lainnya sesuai setelan default, lalu klik Berikutnya untuk melanjutkan ke bagian Kode.

  9. Di bagian Runtime, pilih Python 3.7.

  10. Di titik Entri, masukkan "subscribe" (contoh nama fungsi titik entri kode).

  11. Pada Kode sumber, pilih Editor Inline jika belum dipilih.

  12. Di file main.py, tambahkan kode berikut:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Ganti kode berikut:

    • PROJECT_ID: Project Google Cloud tempat pipeline ini berjalan.
    • REGION: Region tempat pipeline ini berjalan.
    • PIPELINE_ROOT: Tentukan Cloud Storage URI yang dapat diakses oleh akun layanan pipeline Anda. Artefak operasi pipeline Anda disimpan di root pipeline.
  13. Dalam file requirements.txt, ganti konten dengan persyaratan paket berikut:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Klik deploy untuk men-deploy Fungsi.

Langkah berikutnya