Memicu operasi pipeline dengan Cloud Pub/Sub

Contoh kode berikut menunjukkan cara menulis, men-deploy, dan memicu pipeline menggunakan Cloud Function Berbasis Peristiwa dengan pemicu Cloud Pub/Sub.

Membuat dan mengompilasi Pipeline sederhana

Dengan menggunakan Kubeflow Pipelines SDK, bangun pipeline terjadwal dan kompilasi menjadi file YAML.

Contoh hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Mengupload YAML pipeline yang dikompilasi ke bucket Cloud Storage

  1. Buka browser Cloud Storage di konsol Google Cloud.

    Browser Cloud Storage

  2. Klik bucket Cloud Storage yang Anda buat saat mengonfigurasi project.

  3. Dengan menggunakan folder yang sudah ada atau folder baru, upload YAML pipeline yang telah dikompilasi (dalam contoh ini hello_world_scheduled_pipeline.yaml) ke folder yang dipilih.

  4. Klik file YAML yang diupload untuk mengakses detailnya. Salin gsutil URI untuk digunakan nanti.

Membuat Cloud Function dengan Pemicu Pub/Sub

  1. Buka halaman fungsi Cloud Run di konsol.

    Buka halaman fungsi Cloud Run

  2. Klik tombol Buat fungsi.

  3. Di bagian Dasar, beri nama fungsi Anda (misalnya my-scheduled-pipeline-function).

  4. Di bagian Pemicu, pilih Cloud Pub/Sub sebagai jenis Pemicu.

    Buat konfigurasi fungsi, pilih pubsub sebagai gambar jenis Pemicu

  5. Pada dropdown Pilih topik Cloud Pub/Sub, klik Buat topik.

  6. Di kotak Buat topik, beri nama topik baru Anda (misalnya my-scheduled-pipeline-topic), lalu pilih Buat topik.

  7. Biarkan semua kolom lain dalam setelan default, lalu klik Simpan untuk menyimpan konfigurasi bagian Pemicu.

  8. Biarkan semua kolom lainnya sesuai setelan default, lalu klik Berikutnya untuk melanjutkan ke bagian Kode.

  9. Di bagian Runtime, pilih Python 3.7.

  10. Di titik Entri, masukkan "subscribe" (contoh nama fungsi titik entri kode).

  11. Pada Kode sumber, pilih Editor Inline jika belum dipilih.

  12. Di file main.py, tambahkan kode berikut:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Ganti kode berikut:

    • PROJECT_ID: Project Google Cloud tempat pipeline ini berjalan.
    • REGION: Region tempat pipeline ini berjalan.
    • PIPELINE_ROOT: Tentukan Cloud Storage URI yang dapat diakses oleh akun layanan pipeline Anda. Artefak operasi pipeline Anda disimpan di root pipeline.
  13. Dalam file requirements.txt, ganti konten dengan persyaratan paket berikut:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Klik deploy untuk men-deploy Fungsi.