Carregar dados do Cloud Storage para o BigQuery usando o Workflows

Last reviewed 2021-05-12 UTC

Neste tutorial, mostramos como executar fluxos de trabalho sem servidor de maneira confiável usando Fluxos de trabalho, Funções do Cloud Run, e Firestore para carregar dados brutos, como logs de eventos, Cloud Storage para BigQuery. As plataformas de análise geralmente têm uma ferramenta de orquestração para carregar dados periodicamente o BigQuery usando Jobs do BigQuery e, em seguida, transformar os dados para fornecer métricas de negócios usando SQL declarações, incluindo Instruções de linguagem procedural do BigQuery. Este tutorial destina-se a desenvolvedores e arquitetos que querem criar pipelines de processamento de dados baseados em eventos sem servidor. Neste tutorial, presumimos que você esteja familiarizado com YAML, SQL e Python.

Arquitetura

O diagrama a seguir mostra a arquitetura de alto nível de um pipeline extrair, carregar e transformar (ELT) sem servidor usando o Workflows.

Extrair, carregar e transformar o pipeline.

No diagrama anterior, pense em uma plataforma de varejo que coleta periodicamente eventos de vendas como arquivos de várias lojas e, em seguida, grava os arquivos em um bucket do Cloud Storage. Os eventos são usados para fornecer métricas de negócios por meio da importação e do processamento no BigQuery. Essa arquitetura oferece um sistema de orquestração confiável e sem servidor para importar arquivos para o BigQuery e é dividida nos dois módulos a seguir:

  • Lista de arquivos: mantém a lista de arquivos não processados adicionados a um bucket do Cloud Storage em uma coleção do Firestore. Esse módulo funciona por meio de uma função do Cloud Run acionada por um evento de armazenamento Object Finalize, que é gerado quando um novo arquivo é adicionado ao bucket do Cloud Storage. O nome do arquivo é anexado à matriz files da coleção chamada new no Firestore.
  • Workflows: executa os fluxos de trabalhos programados. O Cloud Scheduler aciona um fluxo de trabalho que executa uma série de etapas de acordo com uma sintaxe baseada em YAML para orquestrar o carregamento e transformar os dados no BigQuery chamando as funções do Cloud Run. As etapas no fluxo de trabalho chamam as funções do Cloud Run para executar o as seguintes tarefas:

    • Crie e inicie um job de carregamento do BigQuery.
    • Pesquise o status do job de carregamento.
    • Crie e inicie o job de consulta de transformação.
    • Pesquise o status do job de transformação.

O uso de transações para manter a lista de novos arquivos no Firestore ajuda a garantir que nenhum arquivo seja perdido quando um fluxo de trabalho os importa para o BigQuery. As execuções separadas do fluxo de trabalho se tornam idempotentes armazenando os metadados e o status do job no Firestore.

Objetivos

  • Crie um banco de dados do Firestore.
  • Configure um acionador de função do Cloud Run para rastrear arquivos adicionados ao bucket do Cloud Storage no Firestore.
  • Implante funções do Cloud Run para executar e monitorar jobs do BigQuery.
  • Implante e execute um fluxo de trabalho para automatizar o processo.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Acesse a página Welcome e anote o ID do projeto para usar em uma etapa posterior.

    Acessar a página de recepção

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

Prepare o ambiente

Para preparar seu ambiente, crie um banco de dados do Firestore, clone os exemplos de código do repositório do GitHub, crie recursos usando o Terraform, edite o arquivo YAML dos fluxos de trabalho e instale os requisitos para o gerador de arquivos.

  1. Para criar um banco de dados do Firestore, faça o seguinte:

    1. No Console do Google Cloud, acesse a página Firestore.

      Vá para o Firestore

    2. Clique em Selecionar modo nativo.

    3. No menu Selecionar um local, selecione a região em que você quer hospedar o banco de dados do Firestore. Recomendamos escolher uma região próxima à sua localização física.

    4. Clique em Create database.

  2. No Cloud Shell, clone o repositório de origem:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. No Cloud Shell, crie os seguintes recursos usando o Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Substitua:

    • PROJECT_ID: é o ID do projeto do Google Cloud.
    • REGION: um local geográfico específico do Google Cloud para hospedar seus recursos, por exemplo, us-central1
    • ZONE: um local dentro de uma região para hospedar seus recursos, por exemplo, us-central1-b

    Você verá uma mensagem semelhante a esta: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    O Terraform pode ajudar você a criar, alterar e fazer upgrade da infraestrutura em escala com segurança e previsibilidade. Os seguintes recursos são criados no seu projeto:

    • Contas de serviço com os privilégios necessários para garantir o acesso seguro aos recursos.
    • Um conjunto de dados do BigQuery chamado serverless_elt_dataset e uma tabela chamada word_count para carregar os arquivos de entrada.
    • Um bucket do Cloud Storage chamado ${project_id}-ordersbucket para preparar arquivos de entrada.
    • As cinco funções do Cloud Run a seguir:
      • file_add_handler adiciona o nome dos arquivos adicionados ao bucket do Cloud Storage à coleção do Firestore.
      • create_job cria um novo job de carregamento do BigQuery e associa arquivos na coleção do Firebase ao job.
      • create_query cria um novo job de consulta do BigQuery.
      • poll_bigquery_job recebe o status de um job do BigQuery.
      • run_bigquery_job inicia um job do BigQuery
  4. Encontre os URLs das funções do Cloud Run create_job, create_query, poll_job e run_bigquery_job que você implantou na etapa anterior.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    O resultado será assim:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Anote esses URLs, porque eles serão necessários ao implantar seu fluxo de trabalho.

Criar e implantar um fluxo de trabalho

  1. No Cloud Shell, abra o arquivo de origem do fluxo de trabalho workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Substitua:

    • CREATE_JOB_URL: o URL da função para criar um novo job
    • POLL_BIGQUERY_JOB_URL: o URL da função para pesquisar o status de um job em execução
    • RUN_BIGQUERY_JOB_URL: o URL da função para iniciar um job de carga do BigQuery.
    • CREATE_QUERY_URL: o URL da função para iniciar um job de consulta do BigQuery.
    • BQ_REGION: a região do BigQuery em que os dados são armazenados, por exemplo, US
    • BQ_DATASET_TABLE_NAME: o BigQuery nome da tabela do conjunto de dados no formato PROJECT_ID.serverless_elt_dataset.word_count
  2. Implante o arquivo workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Substitua:

    • WORKFLOW_NAME: o nome exclusivo do fluxo de trabalho.
    • WORKFLOW_REGION: a região em que o fluxo de trabalho é implantado. Por exemplo, us-central1
    • WORKFLOW_DESCRIPTION: a descrição do fluxo de trabalho
  3. Crie um ambiente virtual em Python 3 e requisitos de instalação para o gerador de arquivos:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Gerar arquivos para importar

O script Python gen.py gera conteúdo aleatório em Formato Avro. O esquema é igual à tabela word_count do BigQuery. Esses arquivos Avro são copiados no bucket especificado do Cloud Storage.

No Cloud Shell, gere os arquivos:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Substitua:

  • RECORDS_PER_FILE: o número de registros em um único arquivo
  • NUM_FILES: o número total de arquivos a serem enviados
  • FILE_PREFIX: o prefixo dos nomes dos arquivos gerados.

Ver entradas de arquivo no Firestore

Quando os arquivos são copiados para o Cloud Storage, a função do Cloud Run handle_new_file é acionada. Essa função adiciona a lista de arquivos à matriz da lista de arquivos no documento new na coleção jobs do Firestore.

Para conferir a lista de arquivos, acesse o console do Google Cloud Página Dados do Firestore.

Ir para Dados

Lista de arquivos adicionados à coleção.

Acione o fluxo de trabalho

Os Workflows vinculam uma série de tarefas sem servidor a partir dos serviços do Google Cloud e da API. As etapas individuais neste fluxo de trabalho são executadas como funções do Cloud Run, e o estado é armazenado no Firestore. Todas as chamadas para as funções do Cloud Run são autenticadas usando a conta de serviço do fluxo de trabalho.

No Cloud Shell, execute o fluxo de trabalho:

gcloud workflows execute WORKFLOW_NAME

No diagrama a seguir, mostramos as etapas usadas no fluxo de trabalho:

Etapas usadas no fluxo de trabalho principal e no subfluxo.

O fluxo de trabalho é dividido em duas partes: o principal e o sub-fluxo. O fluxo de trabalho principal processa a criação de jobs e a execução condicional enquanto o subfluxo de trabalho executa um job do BigQuery. O fluxo de trabalho executa as seguintes operações:

  • A função create_job do Cloud Run cria um novo objeto de job. recebe a lista de arquivos adicionados ao Cloud Storage pelo documento do Firestore e associa os arquivos à carga trabalho. Se não houver arquivos para carregar, a função não criará um novo job.
  • A função create_query do Cloud Run usa a consulta que precisa ser executada com a região do BigQuery em que a consulta será executada. A função cria o job no Firestore e retorna o ID dele.
  • A função run_bigquery_job do Cloud Run recebe o ID que precisa ser executado e, em seguida, chama a API BigQuery para e enviar o job.
  • Em vez de esperar que o job seja concluído na função do Cloud Run, pesquise periodicamente o status do job.
    • A função poll_bigquery_job do Cloud Run fornece a status do trabalho. É chamado repetidamente até que o job seja concluído.
    • Para adicionar um atraso entre as chamadas para o poll_bigquery_job função do Cloud Run, Rotina sleep é chamado no Workflows.

Ver o status do job

É possível ver a lista de arquivos e o status do job.

  1. No console do Google Cloud, acesse a página Dados do Firestore.

    Ir para Dados

  2. Um identificador exclusivo (UUID) é gerado para cada job. Para visualizar job_type e status, clique no ID do job. Cada job pode ter um dos seguintes tipos e status:

    • job_type: o tipo de job que está sendo executado pelo fluxo de trabalho com um dos seguintes valores:

      • 0: carregar dados no BigQuery.
      • 1: execute uma consulta no BigQuery.
    • status: o estado atual do job com um dos seguintes valores:

      • 0: o job foi criado, mas não foi iniciado.
      • 1: o job está em execução.
      • 2: o job concluiu a execução com êxito.
      • 3: ocorreu um erro, e o job não foi concluído.

    O objeto do trabalho também contém atributos de metadados, como a região do conjunto de dados do BigQuery, e, no caso de um job de consulta, a string de consulta em execução.

Lista de arquivos com status de trabalho destacados.

Ver dados no BigQuery

Para confirmar se o job de ELT foi bem-sucedido, verifique se os dados aparecem no tabela.

  1. No console do Google Cloud, acesse a página Editor do BigQuery.

    Acessar o Editor

  2. Clique na tabela serverless_elt_dataset.word_count.

  3. Clique na guia Visualizar.

    Guia de visualização mostrando dados na tabela.

Programe o fluxo de trabalho

Para executar periodicamente o fluxo de trabalho em uma programação, use o Cloud Scheduler.

Limpar

A maneira mais fácil de eliminar o faturamento é excluir o projeto do Google Cloud que você criou para o tutorial. A outra opção é excluir os recursos individuais.

Excluir recursos individuais

  1. No Cloud Shell, remova todos os recursos criados com o Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. No console do Google Cloud, acesse a página Dados do Firestore.

    Ir para Dados

  3. Ao lado de Jobs, clique em Menu e selecione Excluir.

    Caminho do menu para excluir uma coleção.

Exclua o projeto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

A seguir