Esta página mostra-lhe como escrever, implementar e acionar uma execução de pipeline usando uma função do Cloud orientada por eventos com um acionador do Cloud Pub/Sub. Siga estes passos:
Defina um pipeline de ML com o SDK Kubeflow Pipelines (KFP) e compile-o num ficheiro YAML.
Carregue a definição da pipeline compilada para um contentor do Cloud Storage.
Use funções do Cloud Run para criar, configurar e implementar uma função que é acionada por um tópico do Pub/Sub novo ou existente.
Defina e compile um pipeline
Usando o SDK Kubeflow Pipelines, crie um pipeline agendado e compile-o num ficheiro YAML.
Exemplo hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Carregue o YAML do pipeline compilado para o contentor do Cloud Storage
Abra o navegador do Cloud Storage na Google Cloud consola
.Clique no contentor do Cloud Storage que criou quando configurou o projeto.
Usando uma pasta existente ou uma nova pasta, carregue o YAML da pipeline compilado (neste exemplo,
hello_world_scheduled_pipeline.yaml
) para a pasta selecionada.Clique no ficheiro YAML carregado para aceder aos detalhes. Copie o URI gsutil para utilização posterior.
Crie funções do Cloud Run com um acionador do Pub/Sub
Visite a página de funções do Cloud Run na consola.
Clique no botão Criar função.
Na secção Básico, atribua um nome à função (por exemplo,
my-scheduled-pipeline-function
).Na secção Acionador, selecione Cloud Pub/Sub como o tipo de acionador.
Na lista Selecionar um tópico do Cloud Pub/Sub, clique em Criar um tópico.
Na caixa Criar um tópico, atribua um nome ao novo tópico (por exemplo,
my-scheduled-pipeline-topic
) e selecione Criar tópico.Deixe todos os outros campos como predefinição e clique em Guardar para guardar a configuração da secção Acionador.
Deixe todos os outros campos como predefinição e clique em Seguinte para avançar para a secção Código.
Em Tempo de execução, selecione Python 3.7.
Em Ponto de entrada, introduza "subscribe" (o nome da função do ponto de entrada do código de exemplo).
Em Código fonte, selecione Editor inline, se ainda não estiver selecionado.
No ficheiro
main.py
, adicione o seguinte código:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Substitua o seguinte:
- PROJECT_ID: O Google Cloud projeto no qual esta pipeline é executada.
- REGION: a região em que este pipeline é executado.
- PIPELINE_ROOT: especifique um URI do Cloud Storage ao qual a conta de serviço dos seus pipelines pode aceder. Os artefactos das execuções do pipeline são armazenados na raiz do pipeline.
No ficheiro
requirements.txt
, substitua o conteúdo pelos seguintes requisitos do pacote:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Clique em Implementar para implementar a função.
O que se segue?
- Saiba mais sobre o Google Cloud Pub/Sub.
- Visualize e analise os resultados do pipeline.
- Saiba como criar acionadores no Cloud Run a partir de eventos do Pub/Sub.
- Para ver exemplos de código para usar o Pub/Sub, consulte o Google Cloud navegador de exemplos.