Programar execução de pipeline com o Cloud Scheduler

É possível programar a execução de um pipeline pré-compilado usando o Cloud Scheduler por meio da função de nuvem orientada a eventos com um gatilho HTTP.

Criar e compilar um pipeline simples

Com o SDK do Kubeflow Pipelines, crie um pipeline programado e compile-o em um arquivo JSON.

Amostra hello-world-scheduled-pipeline:

import json
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import component

# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a JSON file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.json')

Fazer upload do JSON do pipeline compilado no bucket do Cloud Storage

  1. Abra o navegador do Cloud Storage no Console do Google Cloud.

    Navegador do Cloud Storage

  2. Clique no bucket do Cloud Storage criado quando você configurou o projeto.

  3. Usando uma pasta atual ou uma nova, faça upload do JSON do pipeline compilado (neste exemplo, hello_world_scheduled_pipeline.json) para a pasta selecionada.

  4. Clique no arquivo JSON enviado para acessar os detalhes. Copie o URI da gsutil para uso posterior.

Criar um Cloud Function com gatilho HTTP

  1. Acesse a página Cloud Functions no console.

    Acessar a página do Cloud Functions

  2. Clique no botão Criar função.

  3. Na seção Noções básicas, atribua um nome à função (por exemplo, hello-world-scheduled-pipeline-function).

  4. Na seção Gatilho, selecione HTTP como o tipo de gatilho.

    criar configuração da função, escolher HTTP como imagem do tipo gatilho

  5. Anote o URL gerado para esse gatilho e salve-o para uso na próxima seção.

  6. Deixe todos os outros campos como padrão e clique em Salvar para salvar a configuração da seção gatilho.

  7. Deixe todos os outros campos como padrão e clique em Next para acessar a seção "Code".

  8. Em Ambiente de execução, selecione Python 3.7.

  9. No ponto de entrada, insira process_request (o nome da função do ponto de entrada do código de exemplo).

  10. Em Código-fonte, selecione Editor in-line, se essa opção ainda não estiver selecionada.

  11. No arquivo main.py, adicione o seguinte código:

    import json
    from google.cloud import aiplatform
    
    PROJECT_ID = 'your-project-id'         # <---CHANGE THIS
    REGION = 'your-region'                 # <---CHANGE THIS
    PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'   # <---CHANGE THIS
    
    def process_request(request):
       """Processes the incoming HTTP request.
    
       Args:
         request (flask.Request): HTTP request object.
    
       Returns:
         The response text or any set of values that can be turned into a Response
         object using `make_response
         <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
       """
    
       # decode http request payload and translate into JSON object
       request_str = request.data.decode('utf-8')
       request_json = json.loads(request_str)
    
       pipeline_spec_uri = request_json['pipeline_spec_uri']
       parameter_values = request_json['parameter_values']
    
       aiplatform.init(
           project=PROJECT_ID,
           location=REGION,
       )
    
       job = aiplatform.PipelineJob(
           display_name=f'hello-world-cloud-function-pipeline',
           template_path=pipeline_spec_uri,
           pipeline_root=PIPELINE_ROOT,
           enable_caching=False,
           parameter_values=parameter_values
       )
    
       job.submit()
       return "Job submitted"
    

    Substitua:

    • PROJECT_ID: o projeto do Google Cloud em que este pipeline é executado.
    • REGION: a região em que este pipeline é executado.
    • PIPELINE_ROOT: especifique um URI do Cloud Storage que sua conta de serviço de pipelines possa acessar. Os artefatos das execuções de pipeline são armazenados na raiz do pipeline.
  12. No arquivo requirements.txt, substitua o conteúdo pelos seguintes requisitos do pacote:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  13. Clique em implantar para implantar a função.

Criar um job do Cloud Scheduler

  1. Acesse a página Cloud Scheduler no Console.

    Acessar a página do Cloud Scheduler

  2. Clique no botão Criar job.

  3. Dê um nome ao job (por exemplo, my-hello-world-schedule) e, se quiser, adicione uma descrição.

  4. Especifique a frequência do job usando o formato unix-cron. Exemplo:

    0 9 * * 1

    Este exemplo de programação do cron é executado toda segunda-feira às 9h. Para mais informações, consulte Configurar programações de cron job.

  5. Selecione seu fuso horário.

  6. Clique em Continuar para configurar a execução.

  7. Selecione HTTP no menu "Tipo de destino".

  8. No campo URL, insira o URL do gatilho que você salvou na seção anterior.

  9. Selecione POST como método HTTP, caso ainda não esteja selecionado.

  10. Na caixa Corpo, insira uma string JSON no formato definido no código. No exemplo da seção anterior:

     {
       "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
       "parameter_values": {
         "greet_name": "<any-greet-string>"
       }
     }
    
    

    Configure a seção &quot;Execuções&quot; para criar uma imagem da caixa de job

  11. Na lista suspensa do cabeçalho Auth, selecione Add OIDC token.

  12. Na lista suspensa, selecione uma conta de serviço que tenha permissão para invocar o Cloud Function. Use a conta de serviço padrão do Compute Engine para este tutorial.

  13. Mantenha os outros campos como padrão e clique em Criar.

Executar seu job manualmente (opcional)

Também é possível executar o job que você acabou de criar para verificar a funcionalidade.

  1. Abra a página do console do Cloud Scheduler.

  2. Clique no botão Executar agora ao lado do nome do job.

    A primeira execução do primeiro job criado em um projeto pode demorar alguns minutos devido à configuração necessária.

  3. Você pode ver o status do job na coluna Resultado.