Transmite registros de Avro a BigQuery con Dataflow

En este instructivo, se describe cómo almacenar objetos SpecificRecord de Avro en BigQuery mediante Dataflow por medio de la generación automática del esquema de tabla y la transformación de los elementos de entrada. También se muestra el uso de las clases generadas por Avro para materializar o transmitir datos intermedios entre los trabajadores de tu canalización de Dataflow.

Apache Avro es un sistema de serialización que se basa en esquemas para estructurar datos. Debido a que el esquema siempre está presente cuando se leen o escriben datos de Avro, la serialización es rápida y breve. Los beneficios de rendimiento lo convierten en una opción popular para transmitir mensajes entre sistemas, como una app que envía eventos a un sistema de estadísticas por medio de un agente de mensajes. Puedes usar el esquema de Avro para administrar tu esquema de almacén de datos de BigQuery. Para convertir el esquema de Avro en la estructura de tabla de BigQuery, se requiere un código personalizado, como se muestra en este instructivo.

Este instructivo está dirigido a los desarrolladores y arquitectos interesados en usar el esquema de Avro para administrar el esquema de almacén de datos de BigQuery. En él, se supone que estás familiarizado con Avro y Java.

En el siguiente diagrama, se ilustra la arquitectura de alto nivel de este instructivo.

Se muestra la arquitectura de un esquema de Avro que administra tu esquema de almacén de datos de BigQuery.

En este instructivo, se usa un sistema de procesamiento de pedidos simple con los siguientes pasos para demostrar este patrón arquitectónico:

  • Una aplicación en línea genera eventos cuando el cliente realiza una compra.
  • Un objeto de pedido contiene un identificador único, la lista de artículos comprados y una marca de tiempo.
  • Una canalización de Dataflow lee los mensajes de Avro OrderDetails SpecificRecord desde un tema de Pub/Sub.
  • La canalización de Dataflow escribe los registros en Cloud Storage como archivos de Avro.
  • La clase OrderDetails genera de forma automática el esquema de BigQuery correspondiente.
  • Los objetos OrderDetails se escriben en BigQuery mediante una función de transformación genérica.

Objetivos

  • Transferir strings JSON desde una transmisión de datos de Pub/Sub mediante Dataflow
  • Transformar los objetos JSON en objetos de clases generadas por Avro
  • Generar el esquema de tabla de BigQuery a partir del esquema Avro
  • Escribir los registros de Avro en un archivo de Cloud Storage
  • Escribir los registros de Avro en BigQuery

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud sean aptos para obtener una prueba gratuita.

Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta cómo hacer una limpieza.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Habilita las API de BigQuery, Cloud Storage, and Dataflow.

    Habilita las API

  5. En Cloud Console, activa Cloud Shell.

    Activar Cloud Shell

    En la parte inferior de Cloud Console, se inicia una sesión de Cloud Shell en la que se muestra una ventana de línea de comandos. Cloud Shell es un entorno de shell que tiene el SDK de Cloud preinstalado, incluida la herramienta de línea de comandos de gcloud, y valores ya establecidos para el proyecto actual. La inicialización de la sesión puede tomar unos minutos.

Configura tu entorno

  1. En Cloud Shell, clona el repositorio de código fuente:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Genera clases Avro:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    Este comando usa el archivo orderdetails.avsc para generar la clase OrderDetails y la clase OrderItems. La clase OrderDetails tiene un identificador único, una marca de tiempo y una lista de OrderItems. La clase OrderItems tiene un identificador único, un nombre y un precio. El esquema de Avro se propaga a la tabla de BigQuery, en la que una fila que contiene un pedido tiene un arreglo de registros del tipo OrderItem. Para obtener más información, consulta Especifica columnas anidadas y repetidas.

  3. Abre el archivo env.sh:

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. El archivo env.sh contiene ajustes predeterminados que puedes usar en este instructivo, pero puedes modificarlos para tu entorno.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    Reemplaza los siguientes valores:

    • avro-records: Es el nombre de tu tema de Pub/Sub.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: Es el nombre de tu depósito de Cloud Storage que genera el ID del proyecto de Cloud.
    • $MY_BUCKET/""out/": Es la ruta del depósito de Cloud Storage que contiene tu resultado de Avro.
    • us-central1: Es la región que usas para Pub/Sub y Dataflow. Para obtener más información acerca de las regiones, consulta Geografía y regiones.
    • US: Es la región para BigQuery. Para obtener más información acerca de las ubicaciones, consulta Ubicaciones de conjuntos de datos.
    • sales: Es el nombre del conjunto de datos de BigQuery.
    • orders: el nombre de la tabla de BigQuery.
    • 1: la cantidad máxima de trabajadores de Dataflow.
  5. Configura las variables de entorno:

     . ./env.sh
    

Crea recursos

  1. En Cloud Shell, crea un tema de Pub/Sub:

    gcloud pubsub topics create $MY_TOPIC
    
  2. Crea un bucket de Cloud Storage:

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    El depósito de Cloud Storage crea una copia de seguridad de los eventos sin procesar generados por la app. También puede servir como una fuente alternativa para la validación y las estadísticas sin conexión mediante el uso de trabajos de Spark y Hadoop ejecutados en Dataproc.

  3. Crea un conjunto de datos de BigQuery:

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    Un conjunto de datos de BigQuery contiene tablas y vistas en una sola región o una geografía que contiene varias regiones. Para obtener más información, consulta Crea conjuntos de datos.

Inicia la app de Beam Dataflow

  1. En Cloud Shell, implementa y ejecuta la canalización en el ejecutor de Dataflow:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    El resultado contiene el ID de tu app. Anótalo porque lo necesitarás más adelante en el instructivo.

  2. En Cloud Console, ve a Dataflow.

    Ir a Dataflow

  3. Para ver el estado de la canalización, haz clic en el ID de tu app. El estado de la canalización se muestra como un gráfico.

    Gráfico del estado de la canalización.

Revisa el código

En el archivo AvroToBigQuery.java, las opciones de canalización con los parámetros requeridos pasan por los parámetros de la línea de comandos. La opción de modo de transmisión también está habilitada. El esquema de tabla de BigQuery se genera a partir del esquema de clase Avro y se usa más adelante en la clase BigQuery IO:

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

La clase AvroUtils itera en los campos del objeto de esquema de Avro y genera los objetos TableFieldSchema correspondientes de forma recursiva. Los objetos se unen en un objeto TableSchema y se muestran.

En el caso del formato de entrada de Avro, los objetos se leen desde Pub/Sub. Si el formato de entrada es JSON, los eventos se leen y se transforman en objetos de Avro.

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

La canalización se bifurca. La transformación Write to Cloud Storage es una transformación compuesta, que recopila los registros en el período durante 10 segundos y, luego, los escribe en un archivo Avro en Cloud Storage mediante el escritor AvroIO:

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

La transformación Write to BigQuery escribe los registros en la tabla de BigQuery:

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

La transformación BigQueryIO escribe los objetos de Avro en BigQuery y los transforma en objetos TableRow mediante el método TABLE_ROW_PARSER. El analizador llama al método convertSpecificRecordToTableRow en la clase BigQueryAvroUtils, que se compila en una clase de prueba en el proyecto de Apache Beam. El method analiza los campos de Avro de forma recursiva y los agrega a un objeto TableRow.

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

En la siguiente tabla, se muestra la asignación entre los tipos de datos de BigQuery y los tipos de datos de Avro. Presta atención a tipos como Date y Timestamp, que se identifican mediante el tipo lógico del campo.

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

Ve los resultados en BigQuery

Para probar la canalización, inicia la secuencia de comandos gen.py. Esta secuencia de comandos simula la generación de eventos de pedido y los envía al tema de Pub/Sub.

  1. En Cloud Shell, cambia al directorio de secuencia de comandos del generador de eventos de muestra y ejecuta la secuencia de comandos:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. En Cloud Console, ve a BigQuery.

    Ir a BigQuery

  3. Para ver el esquema de tabla, haz clic en el conjunto de datos sales y, luego, selecciona la tabla orders. Si modificaste las variables de entorno predeterminadas en env.sh, los nombres del conjunto de datos y de la tabla pueden ser diferentes.

    Esquema de tabla de la tabla “orders” (pedidos).

  4. Para ver algunos datos de muestra, ejecuta una consulta en el Editor de consultas:

    SELECT * FROM sales.orders LIMIT 5
    

    Resultado de la consulta de los datos de muestra.

    El esquema de tabla de BigQuery se genera de forma automática a partir de los registros de Avro, y los datos se convierten de forma automática en la estructura de tabla de BigQuery.

Limpia

Borra el proyecto

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra los recursos individuales

  1. Sigue estas instrucciones para detener el trabajo de Dataflow.

  2. Borra el bucket de Cloud Storage:

    gsutil rm -r gs://$MY_BUCKET
    

¿Qué sigue?