Carregar dados do Cloud Storage para o BigQuery usando o Workflows

Last reviewed 2021-05-12 UTC

Neste tutorial, mostramos como executar fluxos de trabalho sem servidor de maneira confiável usando fluxos de trabalho, Cloud Functions e Firestore para carregar dados brutos, como registros de eventos, do Cloud Storage para o BigQuery. As plataformas de análise geralmente têm uma ferramenta de orquestração para carregar dados periodicamente no BigQuery usando jobs do BigQuery e transformar esses dados para fornecer métricas de negócios usando instruções SQL, incluindo instruções de linguagem processual do BigQuery. Este tutorial destina-se a desenvolvedores e arquitetos que querem criar pipelines de processamento de dados baseados em eventos sem servidor. Neste tutorial, presumimos que você esteja familiarizado com YAML, SQL e Python.

Arquitetura

O diagrama a seguir mostra a arquitetura de alto nível de um pipeline extrair, carregar e transformar (ELT) sem servidor usando o Workflows.

Extrair, carregar e transformar o pipeline.

No diagrama anterior, pense em uma plataforma de varejo que coleta periodicamente eventos de vendas como arquivos de várias lojas e, em seguida, grava os arquivos em um bucket do Cloud Storage. Os eventos são usados para fornecer métricas de negócios por meio da importação e do processamento no BigQuery. Essa arquitetura oferece um sistema de orquestração confiável e sem servidor para importar arquivos para o BigQuery e é dividida nos dois módulos a seguir:

  • Lista de arquivos: mantém a lista de arquivos não processados adicionados a um bucket do Cloud Storage em uma coleção do Firestore. Esse módulo funciona por meio de uma Função do Cloud acionada por um evento de armazenamento Object Finalize, que é gerado quando um novo arquivo é adicionado ao bucket do Cloud Storage. O nome do arquivo é anexado à matriz files da coleção chamada new no Firestore.
  • Workflows: executa os fluxos de trabalhos programados. O Cloud Scheduler aciona um fluxo de trabalho que executa uma série de etapas de acordo com uma sintaxe baseada em YAML para orquestrar o carregamento e transformar os dados no BigQuery chamando o Cloud Functions. As etapas no fluxo de trabalho chamam o Cloud Functions para executar as seguintes tarefas:

    • Crie e inicie um job de carregamento do BigQuery.
    • Pesquise o status do job de carregamento.
    • Crie e inicie o job de consulta de transformação.
    • Pesquise o status do job de transformação.

O uso de transações para manter a lista de novos arquivos no Firestore ajuda a garantir que nenhum arquivo seja perdido quando um fluxo de trabalho os importa para o BigQuery. As execuções separadas do fluxo de trabalho se tornam idempotentes armazenando os metadados e o status do job no Firestore.

Objetivos

  • Crie um banco de dados do Firestore.
  • Configure um gatilho do Cloud Functions para rastrear arquivos adicionados ao bucket do Cloud Storage no Firestore.
  • Implantar o Cloud Functions para executar e monitorar jobs do BigQuery.
  • Implante e execute um fluxo de trabalho para automatizar o processo.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

  1. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  2. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  3. Ative as APIs Cloud Build, Cloud Functions, Identity and Access Management, Resource Manager, and Workflows.

    Ative as APIs

  4. Acesse a página de Boas-vindas e anote o ID do projeto para usar em uma etapa posterior.

    Acessar a Página de recepção

  5. No Console do Google Cloud, ative o Cloud Shell.

    Ativar o Cloud Shell

Prepare o ambiente

Para preparar o ambiente, crie um banco de dados do Firestore, clone os exemplos de código do repositório do GitHub, crie recursos usando o Terraform, edite o arquivo YAML do Workflows e instale os requisitos para o gerador de arquivos.

  1. Para criar um banco de dados do Firestore, faça o seguinte:

    1. No Console do Google Cloud, acesse a página Firestore.

      Acessar o Firestore

    2. Clique em Selecionar modo nativo.

    3. No menu Selecionar um local, selecione a região em que você quer hospedar o banco de dados do Firestore. Recomendamos escolher uma região próxima à sua localização física.

    4. Clique em Create database.

  2. No Cloud Shell, clone o repositório de origem:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. No Cloud Shell, crie os seguintes recursos usando o Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Substitua:

    • PROJECT_ID: é o ID do projeto do Google Cloud.
    • REGION: um local geográfico específico do Google Cloud para hospedar seus recursos, por exemplo, us-central1
    • ZONE: um local dentro de uma região para hospedar seus recursos, por exemplo, us-central1-b

    Você verá uma mensagem semelhante a esta: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    O Terraform pode ajudar você a criar, alterar e fazer upgrade da infraestrutura em escala com segurança e previsibilidade. Os seguintes recursos são criados no seu projeto:

    • Contas de serviço com os privilégios necessários para garantir o acesso seguro aos seus recursos.
    • Um conjunto de dados do BigQuery chamado serverless_elt_dataset e uma tabela chamada word_count para carregar os arquivos de entrada.
    • Um bucket do Cloud Storage chamado ${project_id}-ordersbucket para preparar arquivos de entrada.
    • As cinco Cloud Functions a seguir:
      • file_add_handler adiciona o nome dos arquivos adicionados ao bucket do Cloud Storage à coleção do Firestore.
      • create_job cria um novo job de carregamento do BigQuery e associa arquivos na coleção do Firebase ao job.
      • create_query cria um novo job de consulta do BigQuery.
      • poll_bigquery_job recebe o status de um job do BigQuery.
      • run_bigquery_job inicia um job do BigQuery
  4. Encontre os URLs do Cloud Functions create_job, create_query, poll_job e run_bigquery_job que você implantou na etapa anterior.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    O resultado será assim:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Anote esses URLs, porque eles serão necessários ao implantar o fluxo de trabalho.

Criar e implantar um fluxo de trabalho

  1. No Cloud Shell, abra o arquivo de origem do fluxo de trabalho, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Substitua:

    • CREATE_JOB_URL: o URL da função para criar um novo job
    • POLL_BIGQUERY_JOB_URL: o URL da função para pesquisar o status de um job em execução
    • RUN_BIGQUERY_JOB_URL: o URL da função para iniciar um job de carga do BigQuery.
    • CREATE_QUERY_URL: o URL da função para iniciar um job de consulta do BigQuery.
    • BQ_REGION: a região do BigQuery em que os dados são armazenados, por exemplo, US
    • BQ_DATASET_TABLE_NAME: o nome da tabela do conjunto de dados do BigQuery no formato PROJECT_ID.serverless_elt_dataset.word_count
  2. Implante o arquivo workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Substitua:

    • WORKFLOW_NAME: o nome exclusivo do fluxo de trabalho.
    • WORKFLOW_REGION: a região em que o fluxo de trabalho é implantado, por exemplo, us-central1
    • WORKFLOW_DESCRIPTION: a descrição do fluxo de trabalho
  3. Crie um ambiente virtual em Python 3 e requisitos de instalação para o gerador de arquivos:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Gerar arquivos para importar

O script gen.py do Python gera conteúdo aleatório no formato Avro. O esquema é o mesmo da tabela word_count do BigQuery. Esses arquivos Avro são copiados para o bucket especificado do Cloud Storage.

No Cloud Shell, gere os arquivos:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Substitua:

  • RECORDS_PER_FILE: o número de registros em um único arquivo
  • NUM_FILES: o número total de arquivos a serem enviados
  • FILE_PREFIX: o prefixo dos nomes dos arquivos gerados.

Ver entradas de arquivo no Firestore

Quando os arquivos são copiados para o Cloud Storage, a função do Cloud handle_new_file é acionada. Essa função adiciona a lista de arquivos à matriz da lista de arquivos no documento new na coleção jobs do Firestore.

Para visualizar a lista de arquivos, acesse a página Dados do Firestore no console do Google Cloud.

Ir para Dados

Lista de arquivos adicionados à coleção.

Acione o fluxo de trabalho

Os Workflows vinculam uma série de tarefas sem servidor a partir dos serviços do Google Cloud e da API. As etapas individuais neste fluxo de trabalho são executadas como Cloud Functions e o estado é armazenado no Firestore. Todas as chamadas para o Cloud Functions são autenticadas usando a conta de serviço do fluxo de trabalho.

No Cloud Shell, execute o fluxo de trabalho:

gcloud workflows execute WORKFLOW_NAME

No diagrama a seguir, mostramos as etapas usadas no fluxo de trabalho:

Etapas usadas no fluxo de trabalho principal e no subfluxo.

O fluxo de trabalho é dividido em duas partes: o principal e o sub-fluxo. O fluxo de trabalho principal processa a criação de jobs e a execução condicional enquanto o subfluxo de trabalho executa um job do BigQuery. O fluxo de trabalho executa as seguintes operações:

  • A função do Cloud create_job cria um novo objeto de job, recebe a lista de arquivos adicionados ao Cloud Storage do documento do Firestore e associa os arquivos ao job de carregamento. Se não houver arquivos para carregar, a função não criará um novo job.
  • A Função do Cloud create_query usa a consulta que precisa ser executada com a região do BigQuery em que a consulta será executada. A função cria o job no Firestore e retorna o ID dele.
  • A Função do Cloud run_bigquery_job recebe o ID do job que precisa ser executado e, em seguida, chama a API do BigQuery para enviar o job.
  • Em vez de esperar que o job seja concluído na Função do Cloud, pesquise periodicamente o status do job.
    • A Função do Cloud poll_bigquery_job fornece o status do job. É chamado repetidamente até que o job seja concluído.
    • Para adicionar um atraso entre as chamadas para a Função do Cloud poll_bigquery_job, uma rotina sleep é chamada no Workflows.

Ver o status do job

É possível conferir a lista de arquivos e o status do job.

  1. No console do Google Cloud, acesse a página Dados do Firestore.

    Ir para Dados

  2. Um identificador exclusivo (UUID) é gerado para cada job. Para visualizar job_type e status, clique no ID do job. Cada job pode ter um dos seguintes tipos e status:

    • job_type: o tipo de job que está sendo executado pelo fluxo de trabalho com um dos seguintes valores:

      • 0: carregar dados no BigQuery.
      • 1: execute uma consulta no BigQuery.
    • status: o estado atual do job com um dos seguintes valores:

      • 0: o job foi criado, mas não foi iniciado.
      • 1: o job está em execução.
      • 2: o job concluiu a execução com êxito.
      • 3: ocorreu um erro e o job não foi concluído com êxito.

    O objeto do job também contém atributos de metadados, como a região do conjunto de dados do BigQuery, o nome da tabela do BigQuery e, se for um job de consulta, a string de consulta em execução.

Lista de arquivos com status de trabalho destacados.

Visualizar dados no BigQuery

Para confirmar que o job de ELT foi bem-sucedido, verifique se os dados aparecem na tabela.

  1. No console do Google Cloud, acesse a página Editor do BigQuery.

    Acessar o Editor

  2. Clique na tabela serverless_elt_dataset.word_count.

  3. Clique na guia Visualizar.

    Guia de visualização mostrando dados na tabela.

Programe o fluxo de trabalho

Para executar periodicamente o fluxo de trabalho em uma programação, use o Cloud Scheduler.

Limpar

A maneira mais fácil de eliminar o faturamento é excluir o projeto do Google Cloud que você criou para o tutorial. A outra opção é excluir os recursos individuais.

Excluir recursos individuais

  1. No Cloud Shell, remova todos os recursos criados com o Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. No console do Google Cloud, acesse a página Dados do Firestore.

    Ir para Dados

  3. Ao lado de Jobs, clique em Menu e selecione Excluir.

    Caminho do menu para excluir uma coleção.

Exclua o projeto

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

A seguir