Cargar datos de Cloud Storage en BigQuery con Workflows

Last reviewed 2021-05-12 UTC

En este tutorial se muestra cómo ejecutar flujos de trabajo sin servidor de forma fiable mediante Workflows, Cloud Run functions y Firestore para cargar datos sin procesar, como registros de eventos, desde Cloud Storage hasta BigQuery. Las plataformas de analíticas suelen tener una herramienta de orquestación para cargar periódicamente datos en BigQuery mediante trabajos de BigQuery y, a continuación, transformar los datos para proporcionar métricas empresariales mediante instrucciones SQL, incluidas las instrucciones del lenguaje de procedimiento de BigQuery. Este tutorial está dirigido a desarrolladores y arquitectos que quieran crear flujos de procesamiento de datos basados en eventos sin servidor. En este tutorial se da por supuesto que tienes conocimientos de YAML, SQL y Python.

Arquitectura

En el siguiente diagrama se muestra la arquitectura general de una canalización de extracción, carga y transformación (ELT) sin servidor que usa Workflows.

Proceso de extracción, carga y transformación.

En el diagrama anterior, se muestra una plataforma de comercio que recoge periódicamente eventos de ventas como archivos de varias tiendas y, a continuación, escribe los archivos en un segmento de Cloud Storage. Los eventos se usan para proporcionar métricas empresariales importándolos y procesándolos en BigQuery. Esta arquitectura proporciona un sistema de orquestación fiable y sin servidor para importar tus archivos a BigQuery, y se divide en los dos módulos siguientes:

  • Lista de archivos: mantiene la lista de archivos sin procesar añadidos a un segmento de Cloud Storage en una colección de Firestore. Este módulo funciona a través de una función de Cloud Run que se activa mediante un evento de almacenamiento Object Finalize, que se genera cuando se añade un archivo nuevo al segmento de Cloud Storage. El nombre de archivo se añade a la files matriz de la colección llamada new en Firestore.
  • Flujo de trabajo: ejecuta los flujos de trabajo programados. Cloud Scheduler activa un flujo de trabajo que ejecuta una serie de pasos según una sintaxis basada en YAML para orquestar la carga y, a continuación, transformar los datos en BigQuery llamando a funciones de Cloud Run. Los pasos del flujo de trabajo llaman a funciones de Cloud Run para ejecutar las siguientes tareas:

    • Crea e inicia una tarea de carga de BigQuery.
    • Sondea el estado de la tarea de carga.
    • Crea e inicia la tarea de consulta de transformación.
    • Sondea el estado del trabajo de transformación.

Usar transacciones para mantener la lista de archivos nuevos en Firestore ayuda a asegurar que no se pierda ningún archivo cuando un flujo de trabajo los importe a BigQuery. Las ejecuciones independientes del flujo de trabajo se hacen idempotentes almacenando los metadatos y el estado de los trabajos en Firestore.

Objetivos

  • Crea una base de datos de Firestore.
  • Configura un activador de función de Cloud Run para monitorizar los archivos añadidos al segmento de Cloud Storage en Firestore.
  • Despliega funciones de Cloud Run para ejecutar y monitorizar tareas de BigQuery.
  • Implementa y ejecuta un flujo de trabajo para automatizar el proceso.

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Ve a la página Bienvenido y anota el ID del proyecto para usarlo en un paso posterior.

    Ir a la página de bienvenida

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    Prepara tu entorno

    Para preparar tu entorno, crea una base de datos de Firestore, clona las muestras de código del repositorio de GitHub, crea recursos con Terraform, edita el archivo YAML de Workflows e instala los requisitos del generador de archivos.

    1. Para crear una base de datos de Firestore, sigue estos pasos:

      1. En la Google Cloud consola, ve a la página de Firestore.

        Ir a Firestore

      2. Haz clic en Seleccionar modo nativo.

      3. En el menú Selecciona una ubicación, elige la región en la que quieras alojar la base de datos de Firestore. Te recomendamos que elijas una región que esté cerca de tu ubicación física.

      4. Haz clic en Crear base de datos.

    2. En Cloud Shell, clona el repositorio de origen:

      cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
      cd workflows-demos/workflows-bigquery-load
      
    3. En Cloud Shell, crea los siguientes recursos con Terraform:

      terraform init
      terraform apply \
          -var project_id=PROJECT_ID \
          -var region=REGION \
          -var zone=ZONE \
          --auto-approve
      

      Haz los cambios siguientes:

      • PROJECT_ID: tu ID de proyecto Google Cloud
      • REGION: una ubicación geográfica Google Cloud específica para alojar tus recursos, como us-central1
      • ZONE: una ubicación dentro de una región para alojar tus recursos, por ejemplo, us-central1-b

      Debería ver un mensaje similar al siguiente: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

      Terraform puede ayudarte a crear, cambiar y actualizar la infraestructura a gran escala de forma segura y predecible. En tu proyecto se crearán los siguientes recursos:

      • Cuentas de servicio con los privilegios necesarios para asegurar el acceso a tus recursos.
      • Un conjunto de datos de BigQuery llamado serverless_elt_dataset y una tabla llamada word_count para cargar los archivos entrantes.
      • Un segmento de Cloud Storage llamado ${project_id}-ordersbucket para los archivos de entrada de staging.
      • Las cinco funciones de Cloud Run siguientes:
        • file_add_handler añade el nombre de los archivos que se han añadido al segmento de Cloud Storage a la colección de Firestore.
        • create_job crea un trabajo de carga de BigQuery y asocia los archivos de la colección de Firebase con el trabajo.
        • create_query crea una tarea de consulta de BigQuery.
        • poll_bigquery_job obtiene el estado de una tarea de BigQuery.
        • run_bigquery_job inicia una tarea de BigQuery.
    4. Obtén las URLs de las funciones de create_job, create_query, poll_job y run_bigquery_job de Cloud Run que has implementado en el paso anterior.

      gcloud functions describe create_job | grep url
      gcloud functions describe poll_bigquery_job | grep url
      gcloud functions describe run_bigquery_job | grep url
      gcloud functions describe create_query | grep url
      

      El resultado debería ser similar al siguiente:

      url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
      url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
      

      Anota estas URLs, ya que las necesitarás cuando implementes tu flujo de trabajo.

    Crear y desplegar un flujo de trabajo

    1. En Cloud Shell, abre el archivo de origen del flujo de trabajo, workflow.yaml:

      main:
        steps:
          - constants:
              assign:
                - create_job_url: CREATE_JOB_URL
                - poll_job_url: POLL_BIGQUERY_JOB_URL
                - run_job_url: RUN_BIGQUERY_JOB_URL
                - create_query_url: CREATE_QUERY_URL
                - region: BQ_REGION
                - table_name: BQ_DATASET_TABLE_NAME
              next: createJob
      
          - createJob:
              call: http.get
              args:
                url: ${create_job_url}
                auth:
                    type: OIDC
                query:
                    region: ${region}
                    table_name: ${table_name}
              result: job
              next: setJobId
      
          - setJobId:
              assign:
                - job_id: ${job.body.job_id}
              next: jobCreateCheck
      
          - jobCreateCheck:
              switch:
                - condition: ${job_id == Null}
                  next: noOpJob
              next: runLoadJob
      
          - runLoadJob:
              call: runBigQueryJob
              args:
                  job_id: ${job_id}
                  run_job_url: ${run_job_url}
                  poll_job_url: ${poll_job_url}
              result: jobStatus
              next: loadRunCheck
      
          - loadRunCheck:
              switch:
                - condition: ${jobStatus == 2}
                  next: createQueryJob
              next: failedLoadJob
      
          - createQueryJob:
              call: http.get
              args:
                url: ${create_query_url}
                query:
                    qs: "select count(*) from serverless_elt_dataset.word_count"
                    region: "US"
                auth:
                    type: OIDC
              result: queryjob
              next: setQueryJobId
      
          - setQueryJobId:
              assign:
                - qid: ${queryjob.body.job_id}
              next: queryCreateCheck
      
          - queryCreateCheck:
              switch:
                - condition: ${qid == Null}
                  next: failedQueryJob
              next: runQueryJob
      
          - runQueryJob:
              call: runBigQueryJob
              args:
                job_id: ${qid}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
              result: queryJobState
              next: runQueryCheck
      
          - runQueryCheck:
              switch:
                - condition: ${queryJobState == 2}
                  next: allDone
              next: failedQueryJob
      
          - noOpJob:
              return: "No files to import"
              next: end
      
          - allDone:
              return: "All done!"
              next: end
      
          - failedQueryJob:
              return: "Query job failed"
              next: end
      
          - failedLoadJob:
              return: "Load job failed"
              next: end
      
      
      runBigQueryJob:
        params: [job_id, run_job_url, poll_job_url]
        steps:
          - startBigQueryJob:
              try:
                call: http.get
                args:
                    url: ${run_job_url}
                    query:
                      job_id: ${job_id}
                    auth:
                      type: OIDC
                    timeout: 600
                result: submitJobState
              retry: ${http.default_retry}
              next: validateSubmit
      
          - validateSubmit:
              switch:
                - condition: ${submitJobState.body.status == 1}
                  next: sleepAndPollLoad
              next: returnState
      
          - returnState:
              return: ${submitJobState.body.status}
      
          - sleepAndPollLoad:
              call: sys.sleep
              args:
                seconds: 5
              next: pollJob
      
          - pollJob:
              try:
                call: http.get
                args:
                  url: ${poll_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
                result: pollJobState
              retry:
                predicate: ${http.default_retry_predicate}
                max_retries: 10
                backoff:
                  initial_delay: 1
                  max_delay: 60
                  multiplier: 2
              next: stateCheck
      
          - stateCheck:
              switch:
                - condition: ${pollJobState.body.status == 2}
                  return: ${pollJobState.body.status}
                - condition: ${pollJobState.body.status == 3}
                  return: ${pollJobState.body.status}
              next: sleepAndPollLoad

      Haz los cambios siguientes:

      • CREATE_JOB_URL: URL de la función para crear un nuevo trabajo.
      • POLL_BIGQUERY_JOB_URL: la URL de la función para sondear el estado de un trabajo en curso
      • RUN_BIGQUERY_JOB_URL: la URL de la función para iniciar una tarea de carga de BigQuery
      • CREATE_QUERY_URL: la URL de la función para iniciar un trabajo de consulta de BigQuery
      • BQ_REGION: la región de BigQuery en la que se almacenan los datos (por ejemplo, US
      • BQ_DATASET_TABLE_NAME: nombre de la tabla del conjunto de datos de BigQuery en formato PROJECT_ID.serverless_elt_dataset.word_count
    2. Implementa el archivo workflow:

      gcloud workflows deploy WORKFLOW_NAME \
          --location=WORKFLOW_REGION \
          --description='WORKFLOW_DESCRIPTION' \
          --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
          --source=workflow.yaml
      

      Haz los cambios siguientes:

      • WORKFLOW_NAME: nombre único del flujo de trabajo
      • WORKFLOW_REGION: la región en la que se implementa el flujo de trabajo (por ejemplo, us-central1)
      • WORKFLOW_DESCRIPTION: la descripción del flujo de trabajo
    3. Crea un entorno virtual de Python 3 e instala los requisitos del generador de archivos:

      sudo apt-get install -y python3-venv
      python3 -m venv env
      . env/bin/activate
      cd generator
      pip install -r requirements.txt
      

    Generar archivos para importar

    La secuencia de comandos de gen.py Python genera contenido aleatorio en formato Avro. El esquema es el mismo que el de la tabla word_count de BigQuery. Estos archivos Avro se copian en el segmento de Cloud Storage especificado.

    En Cloud Shell, genera los archivos:

    python gen.py -p PROJECT_ID \
        -o PROJECT_ID-ordersbucket \
        -n RECORDS_PER_FILE \
        -f NUM_FILES \
        -x FILE_PREFIX
    

    Haz los cambios siguientes:

    • RECORDS_PER_FILE: número de registros de un solo archivo
    • NUM_FILES: número total de archivos que se van a subir
    • FILE_PREFIX: el prefijo de los nombres de los archivos generados.

    Ver entradas de archivos en Firestore

    Cuando los archivos se copian en Cloud Storage, se activa la función handle_new_fileCloud Run. Esta función añade la lista de archivos al array de la lista de archivos del documento new de la colección jobs de Firestore.

    Para ver la lista de archivos, en la Google Cloud consola, ve a la página Datos de Firestore.

    Ir a Datos

    Lista de archivos añadidos a la colección.

    Activar el flujo de trabajo

    Workflows vincula una serie de tareas sin servidor deGoogle Cloud y servicios de API. Los pasos individuales de este flujo de trabajo se ejecutan como funciones de Cloud Run y el estado se almacena en Firestore. Todas las llamadas a las funciones de Cloud Run se autentican mediante la cuenta de servicio del flujo de trabajo.

    En Cloud Shell, ejecuta el flujo de trabajo:

    gcloud workflows execute WORKFLOW_NAME
    

    En el siguiente diagrama se muestran los pasos que se siguen en el flujo de trabajo:

    Pasos utilizados en el flujo de trabajo principal y en el secundario.

    El flujo de trabajo se divide en dos partes: el flujo de trabajo principal y el subflujo de trabajo. El flujo de trabajo principal gestiona la creación de tareas y la ejecución condicional, mientras que el subflujo de trabajo ejecuta una tarea de BigQuery. El flujo de trabajo realiza las siguientes operaciones:

    • La función create_job Cloud Run crea un objeto de trabajo, obtiene la lista de archivos añadidos a Cloud Storage desde el documento de Firestore y asocia los archivos con el trabajo de carga. Si no hay archivos que cargar, la función no crea ningún trabajo.
    • La función create_query de Cloud Run toma la consulta que se debe ejecutar junto con la región de BigQuery en la que se debe ejecutar la consulta. La función crea el trabajo en Firestore y devuelve el ID del trabajo.
    • La función run_bigquery_job Cloud Run obtiene el ID del trabajo que se debe ejecutar y, a continuación, llama a la API de BigQuery para enviar el trabajo.
    • En lugar de esperar a que se complete el trabajo en la función de Cloud Run, puedes sondear periódicamente el estado del trabajo.
      • La función poll_bigquery_job Cloud Run proporciona el estado del trabajo. Se llama repetidamente hasta que se completa el trabajo.
      • Para añadir un retraso entre las llamadas a la función de poll_bigquery_job Cloud Run, se llama a una sleep rutina desde Workflows.

    Ver el estado del trabajo

    Puedes ver la lista de archivos y el estado del trabajo.

    1. En la consolaGoogle Cloud , ve a la página Datos de Firestore.

      Ir a Datos

    2. Se genera un identificador único (UUID) para cada trabajo. Para ver el job_type y el status, haz clic en el ID de la tarea. Cada trabajo puede tener uno de los siguientes tipos y estados:

      • job_type: el tipo de trabajo que ejecuta el flujo de trabajo con uno de los siguientes valores:

        • 0: Carga datos en BigQuery.
        • 1: Ejecuta una consulta en BigQuery.
      • status: el estado actual del trabajo, con uno de los siguientes valores:

        • 0: La tarea se ha creado, pero no se ha iniciado.
        • 1: La tarea se está ejecutando.
        • 2: El trabajo se ha ejecutado correctamente.
        • 3: Se ha producido un error y el trabajo no se ha completado correctamente.

      El objeto de trabajo también contiene atributos de metadatos, como la región del conjunto de datos de BigQuery, el nombre de la tabla de BigQuery y, si se trata de un trabajo de consulta, la cadena de consulta que se está ejecutando.

    Lista de archivos con el estado de la tarea resaltado.

    Ver datos en BigQuery

    Para confirmar que el trabajo de ELT se ha realizado correctamente, compruebe que los datos aparecen en la tabla.

    1. En la Google Cloud consola, ve a la página Editor de BigQuery.

      Ir al editor

    2. Haz clic en la tabla serverless_elt_dataset.word_count.

    3. Haga clic en la pestaña Vista previa.

      Pestaña de vista previa que muestra los datos en una tabla.

    Programar el flujo de trabajo

    Para ejecutar el flujo de trabajo periódicamente según una programación, puedes usar Cloud Scheduler.

    Limpieza

    La forma más fácil de evitar que te cobren es eliminar el Google Cloud proyecto que has creado para el tutorial. También puedes eliminar los recursos de forma individual.

    Eliminar los recursos concretos

    1. En Cloud Shell, elimina todos los recursos creados con Terraform:

      cd $HOME/bigquery-workflows-load
      terraform destroy \
      -var project_id=PROJECT_ID \
      -var region=REGION \
      -var zone=ZONE \
      --auto-approve
      
    2. En la Google Cloud consola, ve a la página Datos de Firestore.

      Ir a Datos

    3. Junto a Empleos, haz clic en Menú y selecciona Eliminar.

      Ruta del menú para eliminar una colección.

    Eliminar el proyecto

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.

    Siguientes pasos