Carregue dados do Cloud Storage para o BigQuery através dos Workflows

Last reviewed 2021-05-12 UTC

Este tutorial mostra como executar fluxos de trabalho sem servidor de forma fiável usando o Workflows, funções do Cloud Run> e Firestore para carregar dados não processados, como registos de eventos, do Cloud Storage para o BigQuery. As plataformas de análise têm normalmente uma ferramenta de orquestração para carregar periodicamente dados no BigQuery através de tarefas do BigQuery e, em seguida, transformar os dados para fornecer métricas empresariais através de declarações SQL, incluindo declarações de linguagem processual do BigQuery. Este tutorial destina-se a programadores e arquitetos que querem criar pipelines de processamento de dados sem servidor orientados por eventos. O tutorial pressupõe que tem conhecimentos de YAML, SQL e Python.

Arquitetura

O diagrama seguinte mostra a arquitetura de alto nível de um pipeline de extração, carregamento e transformação (ELT) sem servidor através dos Workflows.

Pipeline de extração, carregamento e transformação.

No diagrama anterior, considere uma plataforma de retalho que recolhe periodicamente eventos de vendas como ficheiros de várias lojas e, em seguida, escreve os ficheiros num contentor do Cloud Storage. Os eventos são usados para fornecer métricas empresariais através da importação e do processamento no BigQuery. Esta arquitetura oferece um sistema de orquestração fiável e sem servidor para importar os seus ficheiros para o BigQuery e está dividida nos dois módulos seguintes:

  • Lista de ficheiros: mantém a lista de ficheiros não processados adicionados a um contentor do Cloud Storage numa coleção do Firestore. Este módulo funciona através de uma função do Cloud Run acionada por um evento de armazenamento Object Finalize, que é gerado quando um novo ficheiro é adicionado ao contentor do Cloud Storage. O nome do ficheiro é anexado à matriz da coleção denominada new no Firestore.files
  • Fluxo de trabalho: executa os fluxos de trabalho agendados. O Cloud Scheduler aciona um fluxo de trabalho que executa uma série de passos de acordo com uma sintaxe baseada em YAML para orquestrar o carregamento e, em seguida, transformar os dados no BigQuery chamando funções do Cloud Run. Os passos no fluxo de trabalho chamam funções do Cloud Run para executar as seguintes tarefas:

    • Crie e inicie uma tarefa de carregamento do BigQuery.
    • Verifique o estado da tarefa de carregamento.
    • Crie e inicie a tarefa de consulta de transformação.
    • Verifique o estado da tarefa de transformação.

A utilização de transações para manter a lista de novos ficheiros no Firestore ajuda a garantir que nenhum ficheiro é ignorado quando um fluxo de trabalho os importa para o BigQuery. As execuções separadas do fluxo de trabalho são tornadas idempotentes através do armazenamento do estado e dos metadados das tarefas no Firestore.

Objetivos

  • Crie uma base de dados do Firestore.
  • Configure um acionador de função do Cloud Run para acompanhar os ficheiros adicionados ao contentor do Cloud Storage no Firestore.
  • Implemente funções do Cloud Run para executar e monitorizar tarefas do BigQuery.
  • Implemente e execute um fluxo de trabalho para automatizar o processo.

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Aceda à página Boas-vindas e tome nota do ID do projeto para usar num passo posterior.

    Aceder à página de boas-vindas

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    Prepare o seu ambiente

    Para preparar o seu ambiente, crie uma base de dados do Firestore, clone os exemplos de código do repositório do GitHub, crie recursos com o Terraform, edite o ficheiro YAML dos fluxos de trabalho e instale os requisitos para o gerador de ficheiros.

    1. Para criar uma base de dados do Firestore, faça o seguinte:

      1. Na Google Cloud consola, aceda à página do Firestore.

        Aceder ao Firestore

      2. Clique em Selecionar modo nativo.

      3. No menu Selecionar uma localização, selecione a região onde quer alojar a base de dados do Firestore. Recomendamos que escolha uma região próxima da sua localização física.

      4. Clique em Criar base de dados.

    2. No Cloud Shell, clone o repositório de origem:

      cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
      cd workflows-demos/workflows-bigquery-load
      
    3. No Cloud Shell, crie os seguintes recursos com o Terraform:

      terraform init
      terraform apply \
          -var project_id=PROJECT_ID \
          -var region=REGION \
          -var zone=ZONE \
          --auto-approve
      

      Substitua o seguinte:

      • PROJECT_ID: o ID do seu Google Cloud projeto
      • REGION: uma localização geográfica Google Cloud específica para alojar os seus recursos, por exemplo, us-central1
      • ZONE: uma localização numa região para alojar os seus recursos, por exemplo, us-central1-b

      Deverá ver uma mensagem semelhante à seguinte: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

      O Terraform pode ajudar a criar, alterar e atualizar a infraestrutura em grande escala de forma segura e previsível. Os seguintes recursos são criados no seu projeto:

      • Contas de serviço com os privilégios necessários para garantir o acesso seguro aos seus recursos.
      • Um conjunto de dados do BigQuery denominado serverless_elt_dataset e uma tabela denominada word_count para carregar os ficheiros recebidos.
      • Um contentor do Cloud Storage com o nome ${project_id}-ordersbucket para preparar ficheiros de entrada.
      • As cinco funções do Cloud Run seguintes:
        • O file_add_handler adiciona o nome dos ficheiros que são adicionados ao contentor do Cloud Storage à coleção do Firestore.
        • create_job cria uma nova tarefa de carregamento do BigQuery e associa ficheiros na coleção do Firebase à tarefa.
        • create_query cria uma nova tarefa de consulta do BigQuery.
        • poll_bigquery_job recebe o estado de uma tarefa do BigQuery.
        • run_bigquery_job inicia uma tarefa do BigQuery.
    4. Obtenha os URLs das funções do create_job, create_query, poll_job e run_bigquery_job do Cloud Run que implementou no passo anterior.

      gcloud functions describe create_job | grep url
      gcloud functions describe poll_bigquery_job | grep url
      gcloud functions describe run_bigquery_job | grep url
      gcloud functions describe create_query | grep url
      

      O resultado é semelhante ao seguinte:

      url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
      

      Tome nota destes URLs, uma vez que são necessários quando implementar o fluxo de trabalho.

    Crie e implemente um fluxo de trabalho

    1. No Cloud Shell, abra o ficheiro de origem do fluxo de trabalho, workflow.yaml:

      main:
        steps:
          - constants:
              assign:
                - create_job_url: CREATE_JOB_URL
                - poll_job_url: POLL_BIGQUERY_JOB_URL
                - run_job_url: RUN_BIGQUERY_JOB_URL
                - create_query_url: CREATE_QUERY_URL
                - region: BQ_REGION
                - table_name: BQ_DATASET_TABLE_NAME
              next: createJob
      
          - createJob:
              call: http.get
              args:
                url: ${create_job_url}
                auth:
                    type: OIDC
                query:
                    region: ${region}
                    table_name: ${table_name}
              result: job
              next: setJobId
      
          - setJobId:
              assign:
                - job_id: ${job.body.job_id}
              next: jobCreateCheck
      
          - jobCreateCheck:
              switch:
                - condition: ${job_id == Null}
                  next: noOpJob
              next: runLoadJob
      
          - runLoadJob:
              call: runBigQueryJob
              args:
                  job_id: ${job_id}
                  run_job_url: ${run_job_url}
                  poll_job_url: ${poll_job_url}
              result: jobStatus
              next: loadRunCheck
      
          - loadRunCheck:
              switch:
                - condition: ${jobStatus == 2}
                  next: createQueryJob
              next: failedLoadJob
      
          - createQueryJob:
              call: http.get
              args:
                url: ${create_query_url}
                query:
                    qs: "select count(*) from serverless_elt_dataset.word_count"
                    region: "US"
                auth:
                    type: OIDC
              result: queryjob
              next: setQueryJobId
      
          - setQueryJobId:
              assign:
                - qid: ${queryjob.body.job_id}
              next: queryCreateCheck
      
          - queryCreateCheck:
              switch:
                - condition: ${qid == Null}
                  next: failedQueryJob
              next: runQueryJob
      
          - runQueryJob:
              call: runBigQueryJob
              args:
                job_id: ${qid}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
              result: queryJobState
              next: runQueryCheck
      
          - runQueryCheck:
              switch:
                - condition: ${queryJobState == 2}
                  next: allDone
              next: failedQueryJob
      
          - noOpJob:
              return: "No files to import"
              next: end
      
          - allDone:
              return: "All done!"
              next: end
      
          - failedQueryJob:
              return: "Query job failed"
              next: end
      
          - failedLoadJob:
              return: "Load job failed"
              next: end
      
      
      runBigQueryJob:
        params: [job_id, run_job_url, poll_job_url]
        steps:
          - startBigQueryJob:
              try:
                call: http.get
                args:
                    url: ${run_job_url}
                    query:
                      job_id: ${job_id}
                    auth:
                      type: OIDC
                    timeout: 600
                result: submitJobState
              retry: ${http.default_retry}
              next: validateSubmit
      
          - validateSubmit:
              switch:
                - condition: ${submitJobState.body.status == 1}
                  next: sleepAndPollLoad
              next: returnState
      
          - returnState:
              return: ${submitJobState.body.status}
      
          - sleepAndPollLoad:
              call: sys.sleep
              args:
                seconds: 5
              next: pollJob
      
          - pollJob:
              try:
                call: http.get
                args:
                  url: ${poll_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
                result: pollJobState
              retry:
                predicate: ${http.default_retry_predicate}
                max_retries: 10
                backoff:
                  initial_delay: 1
                  max_delay: 60
                  multiplier: 2
              next: stateCheck
      
          - stateCheck:
              switch:
                - condition: ${pollJobState.body.status == 2}
                  return: ${pollJobState.body.status}
                - condition: ${pollJobState.body.status == 3}
                  return: ${pollJobState.body.status}
              next: sleepAndPollLoad

      Substitua o seguinte:

      • CREATE_JOB_URL: o URL da função para criar uma nova tarefa
      • POLL_BIGQUERY_JOB_URL: o URL da função para sondar o estado de uma tarefa em execução
      • RUN_BIGQUERY_JOB_URL: o URL da função para iniciar uma tarefa de carregamento do BigQuery
      • CREATE_QUERY_URL: o URL da função para iniciar uma tarefa de consulta do BigQuery
      • BQ_REGION: a região do BigQuery onde os dados são armazenados, por exemplo, US
      • BQ_DATASET_TABLE_NAME: o nome da tabela do conjunto de dados do BigQuery no formato PROJECT_ID.serverless_elt_dataset.word_count
    2. Implemente o ficheiro workflow:

      gcloud workflows deploy WORKFLOW_NAME \
          --location=WORKFLOW_REGION \
          --description='WORKFLOW_DESCRIPTION' \
          --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
          --source=workflow.yaml
      

      Substitua o seguinte:

      • WORKFLOW_NAME: o nome exclusivo do fluxo de trabalho
      • WORKFLOW_REGION: a região na qual o fluxo de trabalho é implementado, por exemplo, us-central1
      • WORKFLOW_DESCRIPTION: a descrição do fluxo de trabalho
    3. Crie um ambiente virtual do Python 3 e instale os requisitos para o gerador de ficheiros:

      sudo apt-get install -y python3-venv
      python3 -m venv env
      . env/bin/activate
      cd generator
      pip install -r requirements.txt
      

    Gere ficheiros para importar

    O script gen.pyPython gera conteúdo aleatório no formato Avro. O esquema é igual ao da tabela word_count do BigQuery. Estes ficheiros Avro são copiados para o contentor do Cloud Storage especificado.

    No Cloud Shell, gere os ficheiros:

    python gen.py -p PROJECT_ID \
        -o PROJECT_ID-ordersbucket \
        -n RECORDS_PER_FILE \
        -f NUM_FILES \
        -x FILE_PREFIX
    

    Substitua o seguinte:

    • RECORDS_PER_FILE: o número de registos num único ficheiro
    • NUM_FILES: o número total de ficheiros a carregar
    • FILE_PREFIX: o prefixo dos nomes dos ficheiros gerados

    Veja entradas de ficheiros no Firestore

    Quando os ficheiros são copiados para o Cloud Storage, a função do Cloud Run é acionada.handle_new_file Esta função adiciona a lista de ficheiros à matriz de listas de ficheiros no documento new na coleção jobs do Firestore.

    Para ver a lista de ficheiros, na Google Cloud consola, aceda à página Dados do Firestore.

    Aceda a Dados

    Lista de ficheiros adicionados à coleção.

    Acione o fluxo de trabalho

    Os fluxos de trabalho associam uma série de tarefas sem servidor a partir dos serviços Google Cloud e API. Os passos individuais neste fluxo de trabalho são executados como funções do Cloud Run e o estado é armazenado no Firestore. Todas as chamadas para as funções do Cloud Run são autenticadas através da conta de serviço do fluxo de trabalho.

    No Cloud Shell, execute o fluxo de trabalho:

    gcloud workflows execute WORKFLOW_NAME
    

    O diagrama seguinte mostra os passos usados no fluxo de trabalho:

    Passos usados no fluxo de trabalho principal e secundário.

    O fluxo de trabalho está dividido em duas partes: o fluxo de trabalho principal e o fluxo de trabalho secundário. O fluxo de trabalho principal processa a criação de tarefas e a execução condicional, enquanto o subfluxo de trabalho executa uma tarefa do BigQuery. O fluxo de trabalho realiza as seguintes operações:

    • A função do create_jobCloud Run cria um novo objeto de tarefa, obtém a lista de ficheiros adicionados ao Cloud Storage a partir do documento do Firestore e associa os ficheiros à tarefa de carregamento. Se não existirem ficheiros para carregar, a função não cria uma nova tarefa.
    • A função do create_query Cloud Run recebe a consulta que tem de ser executada, juntamente com a região do BigQuery na qual a consulta deve ser executada. A função cria a tarefa no Firestore e devolve o ID da tarefa.
    • A função do run_bigquery_jobCloud Run recebe o ID da tarefa que tem de ser executada e, em seguida, chama a API BigQuery para enviar a tarefa.
    • Em vez de aguardar a conclusão da tarefa na função do Cloud Run, pode sondar periodicamente o estado da tarefa.
      • A função poll_bigquery_jobCloud Run fornece o estado da tarefa. É chamado repetidamente até que a tarefa seja concluída.
      • Para adicionar um atraso entre as chamadas à função do Cloud Run, é chamada uma sleeprotina a partir dos fluxos de trabalho.poll_bigquery_job

    Veja o estado da tarefa

    Pode ver a lista de ficheiros e o estado da tarefa.

    1. NaGoogle Cloud consola, aceda à página Dados do Firestore.

      Aceda a Dados

    2. É gerado um identificador exclusivo (UUID) para cada tarefa. Para ver o job_type e o status, clique no ID da tarefa. Cada tarefa pode ter um dos seguintes tipos e estados:

      • job_type: o tipo de tarefa que está a ser executada pelo fluxo de trabalho com um dos seguintes valores:

        • 0: Carregue dados para o BigQuery.
        • 1: execute uma consulta no BigQuery.
      • status: o estado atual da tarefa com um dos seguintes valores:

        • 0: a tarefa foi criada, mas não iniciada.
        • 1: O trabalho está em execução.
        • 2: O trabalho concluiu a execução com êxito.
        • 3: Ocorreu um erro e a tarefa não foi concluída com êxito.

      O objeto de tarefa também contém atributos de metadados, como a região do conjunto de dados do BigQuery, o nome da tabela do BigQuery e, se for uma tarefa de consulta, a string de consulta que está a ser executada.

    Lista de ficheiros com o estado da tarefa realçado.

    Ver dados no BigQuery

    Para confirmar que a tarefa de ELT foi bem-sucedida, verifique se os dados aparecem na tabela.

    1. Na Google Cloud consola, aceda à página do editor do BigQuery.

      Aceder ao Editor

    2. Clique na tabela serverless_elt_dataset.word_count.

    3. Clique no separador Pré-visualizar.

      Separador de pré-visualização que mostra os dados na tabela.

    Agende o fluxo de trabalho

    Para executar periodicamente o fluxo de trabalho de acordo com uma programação, pode usar o Cloud Scheduler.

    Limpar

    A forma mais fácil de eliminar a faturação é eliminar o Google Cloud projeto que criou para o tutorial. Em alternativa, pode eliminar os recursos individuais.

    Elimine os recursos individuais

    1. No Cloud Shell, remova todos os recursos criados com o Terraform:

      cd $HOME/bigquery-workflows-load
      terraform destroy \
      -var project_id=PROJECT_ID \
      -var region=REGION \
      -var zone=ZONE \
      --auto-approve
      
    2. Na Google Cloud consola, aceda à página Dados do Firestore.

      Aceda a Dados

    3. Junto a Empregos, clique em Menu e selecione Eliminar.

      Caminho do menu para eliminar uma coleção.

    Elimine o projeto

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.

    O que se segue?