Nesta página, mostramos como escrever, implantar e acionar uma execução de pipeline usando uma função do Cloud orientada a eventos com um gatilho do Cloud Pub/Sub. Siga estas etapas:
Defina um pipeline de ML usando o SDK do Kubeflow Pipelines (KFP) e compile-o em um arquivo YAML.
Faça upload da definição do pipeline compilado para um bucket do Cloud Storage.
Use as funções do Cloud Run para criar, configurar e implantar uma função que é acionada por um tópico do Pub/Sub novo ou existente.
Definir e compilar um pipeline
Com o SDK do Kubeflow Pipelines, crie um pipeline programado e compile-o em um arquivo YAML.
Amostra hello-world-scheduled-pipeline:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Fazer upload do YAML do pipeline compilado no bucket do Cloud Storage
Abra o navegador do Cloud Storage no Google Cloud console.
Clique no bucket do Cloud Storage criado quando você configurou o projeto.
Usando uma pasta atual ou uma nova, faça upload do YAML do pipeline compilado (neste exemplo,
hello_world_scheduled_pipeline.yaml) para a pasta selecionada.Clique no arquivo YAML enviado para acessar os detalhes. Copie o URI da gsutil para uso posterior.
Criar uma função do Cloud Run com um gatilho do Pub/Sub
Acesse a página Cloud Run functions no console:
Clique no botão Criar função.
Na seção Noções básicas, atribua um nome à função (por exemplo,
my-scheduled-pipeline-function).Na seção Gatilho, selecione Cloud Pub/Sub como o tipo de gatilho.

Na lista Selecionar um tópico do Cloud Pub/Sub, clique em Criar um tópico.
Na caixa Criar um tópico, dê um nome ao novo tópico (por exemplo,
my-scheduled-pipeline-topic) e selecione Criar tópico.Deixe todos os outros campos como padrão e clique em Salvar para salvar a configuração da seção gatilho.
Deixe todos os outros campos como padrão e clique em Next para acessar a seção "Code".
Em Ambiente de execução, selecione Python 3.7.
Em Entry, insira "subscribe" (exemplo de nome da função do ponto de entrada do código).
Em Código-fonte, selecione Editor in-line, se essa opção ainda não estiver selecionada.
No arquivo
main.py, adicione o seguinte código:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()Substitua:
- PROJECT_ID: o projeto Google Cloud em que este pipeline é executado.
- REGION: a região em que este pipeline é executado.
- PIPELINE_ROOT: especifique um URI do Cloud Storage que sua conta de serviço de pipelines possa acessar. Os artefatos das execuções de pipeline são armazenados na raiz do pipeline.
No arquivo
requirements.txt, substitua o conteúdo pelos seguintes requisitos de pacote:google-api-python-client>=1.7.8,<2 google-cloud-aiplatformClique em implantar para implantar a função.
A seguir
- Saiba mais sobre o Google Cloud Pub/Sub.
- Visualizar e analisar os resultados do pipeline.
- Saiba como criar gatilhos no Cloud Run com eventos do Pub/Sub.
- Para conferir exemplos de código de uso do Pub/Sub, consulte o navegador de exemplos doGoogle Cloud .