Os exemplos de código a seguir mostram como gravar, implantar e acionar um pipeline usando um gatilho baseado em eventos no Cloud Functions com um gatilho do Cloud Pub/Sub.
Criar e compilar um pipeline simples
Com o SDK do Kubeflow Pipelines, crie um pipeline programado e compile-o em um arquivo YAML.
Amostra hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
Fazer upload do YAML do pipeline compilado no bucket do Cloud Storage
Abra o navegador do Cloud Storage no Console do Google Cloud.
Clique no bucket do Cloud Storage criado quando você configurou o projeto.
Usando uma pasta atual ou uma nova, faça upload do YAML do pipeline compilado (neste exemplo,
hello_world_scheduled_pipeline.yaml
) para a pasta selecionada.Clique no arquivo YAML enviado para acessar os detalhes. Copie o URI da gsutil para uso posterior.
Criar um Cloud Function com gatilho do Pub/Sub
Acesse a página Cloud Run functions no console:
Clique no botão Criar função.
Na seção Noções básicas, atribua um nome à função (por exemplo,
my-scheduled-pipeline-function
).Na seção Gatilho, selecione Cloud Pub/Sub como o tipo de gatilho.
Na lista suspensa Selecionar um tópico do Cloud Pub/Sub, clique em Criar um tópico.
Na caixa Criar um tópico, dê um nome ao novo tópico (por exemplo,
my-scheduled-pipeline-topic
) e selecione Criar tópico.Deixe todos os outros campos como padrão e clique em Salvar para salvar a configuração da seção gatilho.
Deixe todos os outros campos como padrão e clique em Next para acessar a seção "Code".
Em Ambiente de execução, selecione Python 3.7.
Em Entry, insira "subscribe" (exemplo de nome da função do ponto de entrada do código).
Em Código-fonte, selecione Editor in-line, se essa opção ainda não estiver selecionada.
No arquivo
main.py
, adicione o seguinte código:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
Substitua:
- PROJECT_ID: o projeto do Google Cloud em que este pipeline é executado.
- REGION: a região em que este pipeline é executado.
- PIPELINE_ROOT: especifique um URI do Cloud Storage que sua conta de serviço de pipelines possa acessar. Os artefatos das execuções de pipeline são armazenados na raiz do pipeline.
No arquivo
requirements.txt
, substitua o conteúdo pelos seguintes requisitos de pacote:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
Clique em implantar para implantar a função.