Escribir datos de Dataflow en BigQuery

En este documento se describe cómo escribir datos de Dataflow en BigQuery.

Información general

En la mayoría de los casos prácticos, te recomendamos que uses E/S gestionada para escribir en BigQuery. La E/S gestionada ofrece funciones como las actualizaciones automáticas y una API de configuración coherente. Al escribir en BigQuery, la E/S gestionada elige automáticamente el mejor método de escritura para las tareas por lotes o de streaming.

Si necesitas una optimización del rendimiento más avanzada, te recomendamos que uses el conector BigQueryIO. Para obtener más información, consulta la sección Usar el conector BigQueryIO de este documento.

Rendimiento

En la siguiente tabla se muestran las métricas de rendimiento de varias cargas de trabajo. Estas cargas de trabajo se ejecutaron en un e2-standard2 trabajador con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 M de registros | 1 KB | 1 columna Rendimiento (bytes) Rendimiento (elementos)
Escritura de almacenamiento 55 MB/s 54.000 elementos por segundo
Carga de Avro 78 MBps 77.000 elementos por segundo
Carga de JSON 54 MBps 53.000 elementos por segundo

Estas métricas se basan en sencillas canalizaciones por lotes. Su objetivo es comparar el rendimiento entre conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de las canalizaciones de Dataflow es complejo y depende del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos, y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDKs de lenguaje. Para obtener más información, consulta Rendimiento de Beam IO.

Usar el conector BigQueryIO

El conector de entrada/salida de BigQuery admite los siguientes métodos para escribir en BigQuery:

  • STORAGE_WRITE_API. En este modo, el conector escribe directamente en el almacenamiento de BigQuery mediante la API Storage Write de BigQuery. La API Storage Write combina la ingestión de streaming y la carga por lotes en una sola API de alto rendimiento. Este modo garantiza la semántica de exactamente una vez.
  • STORAGE_API_AT_LEAST_ONCE. Este modo también usa la API Storage Write, pero proporciona semántica de "al menos una vez". Este modo reduce la latencia de la mayoría de las canalizaciones. Sin embargo, es posible que se produzcan escrituras duplicadas.
  • FILE_LOADS. En este modo, el conector escribe los datos de entrada en archivos de almacenamiento provisional en Cloud Storage. A continuación, ejecuta un trabajo de carga de BigQuery para cargar los datos en BigQuery. Este modo es el predeterminado para los PCollections acotados, que se suelen encontrar en las canalizaciones por lotes.
  • STREAMING_INSERTS. En este modo, el conector usa la API de streaming antigua. Este modo es el predeterminado para los PCollections sin límites, pero no se recomienda para proyectos nuevos.

Cuando elijas un método de escritura, ten en cuenta lo siguiente:

  • En el caso de las tareas de streaming, te recomendamos que uses STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE, ya que estos modos escriben directamente en el almacenamiento de BigQuery sin usar archivos de almacenamiento provisional intermedios.
  • Si ejecutas la canalización con el modo de streaming al menos una vez, define el modo de escritura como STORAGE_API_AT_LEAST_ONCE. Este ajuste es más eficiente y coincide con la semántica del modo de streaming al menos una vez.
  • Las cargas de archivos y la API Storage Write tienen diferentes cuotas y límites.
  • Las tareas de carga usan el grupo de ranuras de BigQuery compartido o las ranuras reservadas. Para usar las ranuras reservadas, ejecuta la tarea de carga en un proyecto con una asignación de reserva de tipo PIPELINE. Las tareas de carga son gratuitas si usas el grupo de ranuras de BigQuery compartido. Sin embargo, BigQuery no garantiza la disponibilidad del grupo compartido. Para obtener más información, consulta la introducción a las reservas.

Paralelismo

  • En FILE_LOADS y STORAGE_WRITE_API en las canalizaciones de streaming, el conector fragmenta los datos en varios archivos o flujos. En general, recomendamos llamar a withAutoSharding para habilitar la fragmentación automática.

  • En el caso de FILE_LOADS en las canalizaciones por lotes, el conector escribe datos en archivos particionados, que se cargan en BigQuery en paralelo.

  • En el caso de STORAGE_WRITE_API en las canalizaciones por lotes, cada trabajador crea uno o varios flujos para escribir en BigQuery, lo que se determina en función del número total de particiones.

  • En el caso de STORAGE_API_AT_LEAST_ONCE, hay un solo flujo de escritura predeterminado. Varios trabajadores añaden contenido a este flujo.

Prácticas recomendadas

  • La API Storage Write tiene límites de cuota. El conector gestiona estos límites en la mayoría de las canalizaciones. Sin embargo, en algunos casos, se pueden agotar los flujos de la API Storage Write disponibles. Por ejemplo, este problema puede producirse en una canalización que utilice el particionado automático y el autoescalado con un gran número de destinos, sobre todo en tareas de larga duración con cargas de trabajo muy variables. Si se produce este problema, puedes usar STORAGE_WRITE_API_AT_LEAST_ONCE, que no lo tiene.

  • Usa las métricas de Google Cloud Platform para monitorizar el uso de la cuota de la API Storage Write.

  • Cuando se usan cargas de archivos, Avro suele superar a JSON. Para usar Avro, llama a withAvroFormatFunction.

  • De forma predeterminada, las tareas de carga se ejecutan en el mismo proyecto que la tarea de Dataflow. Para especificar otro proyecto, llama a withLoadJobProjectId.

  • Cuando uses el SDK de Java, considera la posibilidad de crear una clase que represente el esquema de la tabla de BigQuery. A continuación, llama a useBeamSchema en tu canalización para convertir automáticamente los tipos de Row de Apache Beam y TableRow de BigQuery. Para ver un ejemplo de una clase de esquema, consulta ExampleModel.java.

  • Si carga tablas con esquemas complejos que contienen miles de campos, le recomendamos que llame a withMaxBytesPerPartition para definir un tamaño máximo más pequeño para cada tarea de carga.

  • De forma predeterminada, BigQueryIO usa ajustes de la API Storage Write que son razonables para la mayoría de las canalizaciones. Sin embargo, si detectas problemas de rendimiento, puedes definir opciones de la canalización para ajustar estos ajustes. Para obtener más información, consulta Ajustar la API Storage Write en la documentación de Apache Beam.

Flujos de procesamiento en streaming

Las siguientes recomendaciones se aplican a las canalizaciones de streaming.

  • En el caso de las canalizaciones de streaming, te recomendamos que uses la API Storage Write (STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE).

  • Una canalización de streaming puede usar cargas de archivos, pero este enfoque tiene desventajas:

    • Requiere ventanas para escribir los archivos. No puedes usar la ventana global.
    • BigQuery carga archivos de la mejor forma posible cuando se usa el grupo de slots compartido. Puede haber un retraso significativo entre el momento en que se escribe un registro y el momento en que está disponible en BigQuery.
    • Si se produce un error en una tarea de carga (por ejemplo, debido a datos incorrectos o a un error de coincidencia de esquemas), se produce un error en toda la canalización.
  • Te recomendamos que uses STORAGE_WRITE_API_AT_LEAST_ONCE siempre que sea posible. Puede provocar que se escriban registros duplicados en BigQuery, pero es menos costoso y más escalable que STORAGE_WRITE_API.

  • En general, evita usar STREAMING_INSERTS. Las inserciones de streaming son más caras que la API Storage Write y no tienen el mismo rendimiento.

  • El particionado de datos puede mejorar el rendimiento de las canalizaciones de streaming. En la mayoría de las pipelines, el particionamiento automático es un buen punto de partida. Sin embargo, puedes ajustar la fragmentación de la siguiente manera:

  • Si usas inserciones de streaming, te recomendamos que definas retryTransientErrors como política de reintentos.

Flujos de procesamiento por lotes

Las siguientes recomendaciones se aplican a las canalizaciones por lotes.

  • En la mayoría de las grandes canalizaciones de procesamiento por lotes, te recomendamos que pruebes primero FILE_LOADS. Una canalización por lotes puede usar STORAGE_WRITE_API, pero es probable que supere los límites de cuota a gran escala (más de 1000 vCPUs) o si se ejecutan canalizaciones simultáneas. Apache Beam no limita el número máximo de flujos de escritura de las tareas por lotes, por lo que la tarea acaba alcanzando los límites de la API Storage de BigQuery.STORAGE_WRITE_API

  • Cuando se usa FILE_LOADS, puede agotar el grupo de ranuras de BigQuery compartido o el grupo de ranuras reservadas. Si te encuentras con este tipo de fallo, prueba los siguientes métodos:

    • Reduce el número máximo de trabajadores o el tamaño de los trabajadores de la tarea.
    • Compra más ranuras reservadas.
    • Considera usar STORAGE_WRITE_API.
  • Las canalizaciones pequeñas y medianas (menos de 1000 vCPUs) pueden beneficiarse del uso de STORAGE_WRITE_API. En el caso de estas tareas más pequeñas, te recomendamos que uses STORAGE_WRITE_API si quieres una cola de mensajes fallidos o cuando el grupo de ranuras compartido FILE_LOADS no sea suficiente.

  • Si puedes tolerar los datos duplicados, te recomendamos que uses STORAGE_WRITE_API_AT_LEAST_ONCE. Este modo puede provocar que se escriban registros duplicados en BigQuery, pero puede ser menos caro que la opción STORAGE_WRITE_API.

  • Los diferentes modos de escritura pueden funcionar de forma distinta en función de las características de tu canalización. Experimenta para encontrar el mejor modo de escritura para tu carga de trabajo.

Gestionar errores a nivel de fila

En esta sección se describe cómo gestionar los errores que pueden producirse a nivel de fila, por ejemplo, debido a datos de entrada mal formados o a incompatibilidades de esquemas.

En el caso de la API Storage Write, las filas que no se pueden escribir se colocan en un PCollection independiente. Para obtener esta colección, llama a getFailedStorageApiInserts en el objeto WriteResult. Para ver un ejemplo de este enfoque, consulta Transmitir datos a BigQuery.

Te recomendamos que envíes los errores a una cola o tabla de mensajes fallidos para procesarlos más adelante. Para obtener más información sobre este patrón, consulta el patrón de mensajes fallidos.BigQueryIO

En el caso de FILE_LOADS, si se produce un error al cargar los datos, el trabajo de carga falla y la canalización genera una excepción de tiempo de ejecución. Puede ver el error en los registros de Dataflow o consultar el historial de tareas de BigQuery. El conector de entrada/salida no devuelve información sobre las filas que no se han podido insertar.

Para obtener más información sobre cómo solucionar errores, consulta Errores del conector de BigQuery.

Ejemplos

En los siguientes ejemplos se muestra cómo usar Dataflow para escribir en BigQuery. En estos ejemplos se usa el conector BigQueryIO.

Escribir en una tabla

En el siguiente ejemplo se crea una canalización por lotes que escribe un PCollection<MyData> en BigQuery, donde MyData es un tipo de datos personalizado.

El método BigQueryIO.write() devuelve un tipo BigQueryIO.Write<T>, que se usa para configurar la operación de escritura. Para obtener más información, consulta el artículo Escribir en una tabla de la documentación de Apache Beam. En este ejemplo de código se escribe en una tabla (CREATE_NEVER) y se añaden las nuevas filas a la tabla (WRITE_APPEND).

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Escribir en una tabla nueva o ya creada

En el siguiente ejemplo se crea una tabla si la tabla de destino no existe. Para ello, se asigna el valor CREATE_IF_NEEDED a create disposition. Si utiliza esta opción, debe proporcionar un esquema de tabla. El conector usa este esquema si crea una tabla.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Transmitir datos a BigQuery

En el siguiente ejemplo se muestra cómo transmitir datos con semántica de entrega única exacta. Para ello, se define el modo de escritura como STORAGE_WRITE_API

No todos los flujos de procesamiento requieren semántica de exactamente una vez. Por ejemplo, puede eliminar manualmente los duplicados de la tabla de destino. Si en tu caso es aceptable que haya registros duplicados, puedes usar la semántica de al menos una vez configurando el método de escritura en STORAGE_API_AT_LEAST_ONCE. Este método suele ser más eficiente y da como resultado una latencia menor en la mayoría de las canalizaciones.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Siguientes pasos