Pipeline mit Cloud Pub/Sub auslösen

Die folgenden Codebeispiele zeigen, wie Sie eine Pipeline mit einer ereignisgesteuerten Cloud Functions-Funktion und einem Cloud Pub/Sub-Trigger schreiben, bereitstellen und auslösen.

Einfache Pipeline erstellen und kompilieren

Mit dem Kubeflow Pipelines SDK eine geplante Pipeline erstellen und in eine YAML-Datei kompilieren.

Beispiel hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Kompilierte YAML-Pipeline in den Cloud Storage-Bucket hochladen

  1. Öffnen Sie den Cloud Storage-Browser in der Google Cloud Console.

    Cloud Storage-Browser

  2. Klicken Sie auf den Cloud Storage-Bucket, den Sie bei der Konfiguration Ihres Projekts erstellt haben.

  3. Laden Sie mithilfe eines vorhandenen Ordners oder eines neuen Ordners Ihre kompilierte YAML-Pipeline (in diesem Beispiel hello_world_scheduled_pipeline.yaml) in den ausgewählten Ordner hoch.

  4. Klicken Sie auf die hochgeladene YAML-Datei, um die Details aufzurufen. Kopieren Sie den gsutil-URI zur späteren Verwendung.

Cloud Functions-Funktion mit Pub/Sub-Trigger erstellen

  1. Gehen Sie in der Konsole zur Seite Cloud Run-Funktionen:

    Zur Übersicht "Cloud Run-Funktionen"

  2. Klicken Sie auf Funktion erstellen.

  3. Geben Sie der Funktion im Abschnitt Grundlagen einen Namen (z. B. my-scheduled-pipeline-function).

  4. Wählen Sie im Abschnitt Trigger Cloud Pub/Sub als Trigger-Typ aus.

    Bild: Funktionskonfiguration – pubsub als Trigger-Typ-Image auswählen

  5. Klicken Sie im Drop-down-Menü Cloud Pub/Sub-Thema auswählen auf Thema erstellen.

  6. Geben Sie im Feld Thema erstellen einen Namen für das neue Thema ein (z. B. my-scheduled-pipeline-topic) und wählen Sie Thema erstellen aus.

  7. Übernehmen Sie für alle anderen Felder die Standardeinstellungen und klicken Sie auf Speichern, um die Konfiguration des Triggerabschnitts zu speichern.

  8. Übernehmen Sie für alle anderen Felder die Standardwerte und klicken Sie auf Weiter, um zum Abschnitt „Code“ zu gelangen.

  9. Wählen Sie unter Laufzeit die Option Python 3.7 aus.

  10. Geben Sie unter Einstiegspunkt „subscribe“ ein. Dies ist der Name der Beispielfunktion für den Codepunkt des Eintrags.

  11. Wählen Sie unter Quellcode Inline-Editor aus, falls er noch nicht ausgewählt ist.

  12. Fügen Sie in der Datei main.py den folgenden Code hinzu:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Dabei gilt:

    • PROJECT_ID: Das Google Cloud-Projekt, in dem diese Pipeline ausgeführt wird.
    • REGION: Die Region, in der diese Pipeline ausgeführt wird.
    • PIPELINE_ROOT: Geben Sie einen Cloud Storage-URI an, auf den das Pipelines-Dienstkonto zugreifen kann. Die Artefakte Ihrer Pipelineausführungen werden im Pipeline-Stammverzeichnis gespeichert.
  13. Ersetzen Sie in der Datei requirements.txt den Inhalt durch die folgenden Paketanforderungen:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Klicken Sie auf Bereitstellen, um die Funktion bereitzustellen.