Acione uma execução de pipeline com o Pub/Sub

Esta página mostra-lhe como escrever, implementar e acionar uma execução de pipeline usando uma função do Cloud orientada por eventos com um acionador do Cloud Pub/Sub. Siga estes passos:

  1. Defina um pipeline de ML com o SDK Kubeflow Pipelines (KFP) e compile-o num ficheiro YAML.

  2. Carregue a definição da pipeline compilada para um contentor do Cloud Storage.

  3. Use funções do Cloud Run para criar, configurar e implementar uma função que é acionada por um tópico do Pub/Sub novo ou existente.

Defina e compile um pipeline

Usando o SDK Kubeflow Pipelines, crie um pipeline agendado e compile-o num ficheiro YAML.

Exemplo hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

Carregue o YAML do pipeline compilado para o contentor do Cloud Storage

  1. Abra o navegador do Cloud Storage na Google Cloud consola
    .

    Navegador do Cloud Storage

  2. Clique no contentor do Cloud Storage que criou quando configurou o projeto.

  3. Usando uma pasta existente ou uma nova pasta, carregue o YAML da pipeline compilado (neste exemplo, hello_world_scheduled_pipeline.yaml) para a pasta selecionada.

  4. Clique no ficheiro YAML carregado para aceder aos detalhes. Copie o URI gsutil para utilização posterior.

Crie funções do Cloud Run com um acionador do Pub/Sub

  1. Visite a página de funções do Cloud Run na consola.

    Aceda à página de funções do Cloud Run

  2. Clique no botão Criar função.

  3. Na secção Básico, atribua um nome à função (por exemplo, my-scheduled-pipeline-function).

  4. Na secção Acionador, selecione Cloud Pub/Sub como o tipo de acionador.

    create function configuration choose pubsub as Trigger type image

  5. Na lista Selecionar um tópico do Cloud Pub/Sub, clique em Criar um tópico.

  6. Na caixa Criar um tópico, atribua um nome ao novo tópico (por exemplo, my-scheduled-pipeline-topic) e selecione Criar tópico.

  7. Deixe todos os outros campos como predefinição e clique em Guardar para guardar a configuração da secção Acionador.

  8. Deixe todos os outros campos como predefinição e clique em Seguinte para avançar para a secção Código.

  9. Em Tempo de execução, selecione Python 3.7.

  10. Em Ponto de entrada, introduza "subscribe" (o nome da função do ponto de entrada do código de exemplo).

  11. Em Código fonte, selecione Editor inline, se ainda não estiver selecionado.

  12. No ficheiro main.py, adicione o seguinte código:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    Substitua o seguinte:

    • PROJECT_ID: O Google Cloud projeto no qual esta pipeline é executada.
    • REGION: a região em que este pipeline é executado.
    • PIPELINE_ROOT: especifique um URI do Cloud Storage ao qual a conta de serviço dos seus pipelines pode aceder. Os artefactos das execuções do pipeline são armazenados na raiz do pipeline.
  13. No ficheiro requirements.txt, substitua o conteúdo pelos seguintes requisitos do pacote:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. Clique em Implementar para implementar a função.

O que se segue?