Escribe desde Dataflow a BigQuery

En este documento, se describe cómo escribir datos de Dataflow en BigQuery mediante el conector de E/S de BigQuery de Apache Beam.

El conector de E/S de BigQuery está disponible en el SDK de Apache Beam. Te recomendamos usar la versión más reciente del SDK. Para obtener más información, consulta SDK de Apache Beam 2.x.

También se encuentra disponible la compatibilidad entre lenguajes para Python.

Descripción general

El conector de E/S de BigQuery admite los siguientes métodos para escribir en BigQuery:

  • STORAGE_WRITE_API En este modo, el conector realiza operaciones de escritura directas en el almacenamiento de BigQuery mediante la API de escritura de almacenamiento de BigQuery. La API de Storage Write combina la transferencia de transmisión y la carga por lotes en una sola API de alto rendimiento. Este modo garantiza la semántica de exactamente una vez.
  • STORAGE_API_AT_LEAST_ONCE Este modo también usa la API de Storage Write, pero proporciona una semántica de al menos una vez. Este modo da como resultado una latencia más baja para la mayoría de las canalizaciones. Sin embargo, es posible realizar operaciones de escritura duplicadas.
  • FILE_LOADS En este modo, el conector escribe los datos de entrada en archivos de etapa de pruebas en Cloud Storage. Luego, ejecuta un trabajo de carga de BigQuery para cargar los datos en BigQuery. El modo es el predeterminado para PCollections delimitada, que se encuentran con mayor frecuencia en las canalizaciones por lotes.
  • STREAMING_INSERTS En este modo, el conector usa la API de transmisión heredada. Este modo es el predeterminado para PCollections no delimitadas, pero no se recomienda para proyectos nuevos.

Cuando elijas un método de escritura, ten en cuenta los siguientes puntos:

  • Para los trabajos de transmisión, considera usar STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE, ya que estos modos escriben directamente en el almacenamiento de BigQuery, sin usar archivos de etapa de pruebas intermedias.
  • Si ejecutas la canalización con el modo de transmisión al menos una vez, configura el modo de escritura en STORAGE_API_AT_LEAST_ONCE. Esta configuración es más eficiente y coincide con la semántica del modo de transmisión al menos una vez.
  • Las cargas de archivos y la API de Storage Write tienen diferentes cuotas y límites.
  • Los trabajos de carga usan el grupo de ranuras compartido de BigQuery o las ranuras reservadas. Para usar ranuras reservadas, ejecuta el trabajo de carga en un proyecto con una asignación de reserva de tipo PIPELINE. Los trabajos de carga son gratuitos si usas el grupo de ranuras compartido de BigQuery. Sin embargo, BigQuery no garantiza la capacidad disponible del grupo compartido. Para obtener más información, consulta Introducción a las reservas.

Paralelismo

  • Para FILE_LOADS y STORAGE_WRITE_API en las canalizaciones de transmisión, el conector fragmenta los datos en un número de archivos o transmisiones. En general, recomendamos llamar a withAutoSharding para habilitar la fragmentación automática.

  • Para FILE_LOADS en canalizaciones por lotes, el conector escribe datos en archivos particionados, que luego se cargan en BigQuery en paralelo.

  • Para STORAGE_WRITE_API en canalizaciones por lotes, cada trabajador crea una o más transmisiones para escribir en BigQuery, determinadas por la cantidad total de fragmentos.

  • Para STORAGE_API_AT_LEAST_ONCE, hay una sola transmisión de escritura predeterminada. Varios trabajadores adjuntan a esta transmisión.

Rendimiento

En la siguiente tabla, se muestran las métricas de rendimiento de varias opciones de lectura de E/S de BigQuery. Las cargas de trabajo se ejecutaron en un trabajador e2-standard2, con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 millones de registros | 1 KB | 1 columna Capacidad de procesamiento (bytes) Capacidad de procesamiento (elementos)
Escritura de almacenamiento 55 MBps 54,000 elementos por segundo
Carga de Avro 78 MBps 77,000 elementos por segundo
Json Load 54 MBps 53,000 elementos por segundo

Estas métricas se basan en canalizaciones por lotes simples. Están diseñadas para comparar el rendimiento entre los conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de la canalización de Dataflow es complejo y es una función del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDK de lenguaje. Para obtener más información, consulta Rendimiento de E/S de Beam.

prácticas recomendadas

En esta sección, se describen las prácticas recomendadas para escribir en BigQuery desde Dataflow.

Consideraciones generales

  • La API de Storage Write tiene límites de cuota. El conector controla estos límites para la mayoría de las canalizaciones. Sin embargo, algunas situaciones pueden agotar las transmisiones de la API de Storage Write disponibles. Por ejemplo, este problema puede ocurrir en una canalización que usa fragmentación automática y ajuste de escala automático con una gran cantidad de destinos, en especial, en trabajos de larga duración con cargas de trabajo altamente variables. Si se produce este problema, considera usar STORAGE_WRITE_API_AT_LEAST_ONCE, que evita el problema.

  • Usa las métricas de Google Cloud para supervisar el uso de cuota de la API de Storage Write.

  • Cuando se usan cargas de archivos, Avro suele superar el JSON. Para usar Avro, llama a withAvroFormatFunction.

  • De forma predeterminada, los trabajos de carga se ejecutan en el mismo proyecto que el trabajo de Dataflow. Para especificar un proyecto diferente, llama a withLoadJobProjectId.

  • Cuando uses el SDK de Java, considera crear una clase que represente el esquema de la tabla de BigQuery. Luego, llama a useBeamSchema en tu canalización para convertir de forma automática entre los tipos de Row de Apache Beam y BigQuery TableRow. Para ver un ejemplo de una clase de esquema, consulta ExampleModel.java.

  • Si cargas tablas con esquemas complejos que contienen miles de campos, considera llamar a withMaxBytesPerPartition para establecer un tamaño máximo más pequeño para cada trabajo de carga.

Canalizaciones de transmisión

Las siguientes recomendaciones se aplican a las canalizaciones de transmisión.

  • Para las canalizaciones de transmisión, recomendamos usar la API de Storage Write (STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE).

  • Una canalización de transmisión puede usar cargas de archivos, pero este enfoque tiene las desventajas siguientes:

    • Se requiere una renderización de ventanas para escribir los archivos. No puedes usar la ventana global.
    • BigQuery carga los archivos según el mejor esfuerzo cuando se usa el grupo de ranuras compartidas. Puede haber una demora significativa entre el momento en que se escribe un registro y el momento en que está disponible en BigQuery.
    • Si un trabajo de carga falla, por ejemplo, debido a datos incorrectos o una falta de coincidencia del esquema, toda la canalización falla.
  • Considera usar STORAGE_WRITE_API_AT_LEAST_ONCE cuando sea posible. Puede hacer que se escriban registros duplicados en BigQuery, pero es menos costoso y más escalable que STORAGE_WRITE_API.

  • En general, evita usar STREAMING_INSERTS. Las inserciones de transmisión son más costosas que la API de Storage Write y no tienen un buen rendimiento.

  • La fragmentación de datos puede mejorar el rendimiento en las canalizaciones de transmisión. Para la mayoría de las canalizaciones, la fragmentación automática es un buen punto de partida. Sin embargo, puedes ajustar la fragmentación de la siguiente manera:

  • Si usas inserciones de transmisión, te recomendamos configurar retryTransientErrors como la política de reintento.

Canalizaciones por lotes

Las siguientes recomendaciones se aplican a las canalizaciones por lotes.

  • Para la mayoría de las canalizaciones por lotes grandes, te recomendamos que primero pruebes FILE_LOADS. Una canalización por lotes puede usar STORAGE_WRITE_API, pero es probable que exceda los límites de cuota a gran escala (más de 1,000 CPU virtuales) o si se ejecutan canalizaciones simultáneas. Apache Beam no limita la cantidad máxima de transmisiones de escritura para los trabajos por lotes STORAGE_WRITE_API, por lo que el trabajo finalmente alcanza los límites de la API de almacenamiento de BigQuery.

  • Cuando usas FILE_LOADS, puedes agotar el grupo de ranuras compartido de BigQuery o el grupo de ranuras reservadas. Si encuentras este tipo de errores, prueba los siguientes enfoques:

    • Reduce la cantidad máxima o el tamaño de los trabajadores para el trabajo.
    • Comprar más ranuras reservadas.
    • Considera usar STORAGE_WRITE_API.
  • Las canalizaciones pequeñas a medianas (menos de 1,000 CPU virtuales) podrían beneficiarse de usar STORAGE_WRITE_API. Para estos trabajos más pequeños, considera usar STORAGE_WRITE_API si deseas una cola de mensajes no entregados o cuando el grupo de ranuras compartido FILE_LOADS no es suficiente.

  • Si puedes tolerar datos duplicados, considera usar STORAGE_WRITE_API_AT_LEAST_ONCE. Este modo puede hacer que se escriban registros duplicados en BigQuery, pero puede ser menos costoso que la opción STORAGE_WRITE_API.

  • Los diferentes modos de escritura pueden tener un rendimiento diferente según las características de tu canalización. Experimenta para encontrar el mejor modo de escritura para tu carga de trabajo.

Maneja errores a nivel de fila

En esta sección, se describe cómo manejar errores que pueden ocurrir a nivel de fila, por ejemplo, debido a datos de entrada con formato incorrecto o a discrepancias de esquema.

En la API de Storage Write, las filas que no se pueden escribir se colocan en un PCollection separado. Para obtener esta colección, llama a getFailedStorageApiInserts en el objeto WriteResult. Para ver un ejemplo de este enfoque, consulta Transmite datos a BigQuery.

Se recomienda enviar los errores a una cola o tabla de mensajes no entregados para su procesamiento posterior. Para obtener más información sobre este patrón, consulta Patrón de mensajes no entregados BigQueryIO.

En FILE_LOADS, si se produce un error mientras se cargan los datos, el trabajo de carga falla y la canalización genera una excepción de entorno de ejecución. Puedes ver el error en los registros de Dataflow o ver el historial de trabajos de BigQuery. El conector de E/S no muestra información sobre las filas con errores individuales.

Si deseas obtener más información para solucionar errores, consulta Errores del conector de BigQuery.

Ejemplos

En los siguientes ejemplos, se muestra cómo usar Dataflow para escribir en BigQuery.

Escribir en una tabla existente

En el siguiente ejemplo, se crea una canalización por lotes que escribe una PCollection<MyData> en BigQuery, en la que MyData es un tipo de datos personalizado.

El método BigQueryIO.write() muestra un tipo BigQueryIO.Write<T>, que se usa para configurar la operación de escritura. Para obtener más información, consulta Escribe en una tabla en la documentación de Apache Beam. En este ejemplo de código, se escribe en una tabla existente (CREATE_NEVER) y se agregan las filas nuevas a la tabla (WRITE_APPEND).

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Escribe en una tabla nueva o existente

En el siguiente ejemplo, se crea una tabla nueva si la tabla de destino no existe mediante la configuración de la disposición de creación como CREATE_IF_NEEDED. Cuando usas esta opción, debes proporcionar un esquema de tabla. El conector usa este esquema si crea una tabla nueva.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Transmite datos a BigQuery

En el siguiente ejemplo, se muestra cómo transmitir datos mediante la semántica de exactamente una vez mediante la configuración del modo de escritura en STORAGE_WRITE_API:

No todas las canalizaciones de transmisión requieren una semántica de exactamente una vez. Por ejemplo, puedes quitar los duplicados de forma manual de la tabla de destino. Si la posibilidad de tener registros duplicados es aceptable en tu caso, considera usar una semántica de al menos una vez mediante la configuración del método de escritura en STORAGE_API_AT_LEAST_ONCE. Por lo general, este método es más eficiente y da como resultado una latencia más baja para la mayoría de las canalizaciones.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

¿Qué sigue?