Leer de BigQuery a Dataflow

En este documento se describe cómo leer datos de BigQuery en Dataflow.

Información general

En la mayoría de los casos prácticos, te recomendamos que uses E/S gestionada para leer datos de BigQuery. La E/S gestionada ofrece funciones como las actualizaciones automáticas y una API de configuración coherente. Al leer datos de BigQuery, la E/gestionada realiza lecturas directas de tablas, lo que ofrece el mejor rendimiento de lectura.

Si necesitas un ajuste de rendimiento más avanzado, te recomendamos que uses el conector BigQueryIO. El conector BigQueryIO admite tanto lecturas directas de tablas como lecturas de tareas de exportación de BigQuery. También ofrece un control más preciso sobre la deserialización de los registros de la tabla. Para obtener más información, consulta la sección Usar el conector BigQueryIO de este documento.

Proyección y filtrado de columnas

Para reducir el volumen de datos que lee tu flujo de procesamiento de BigQuery, puedes usar las siguientes técnicas:

  • La proyección de columnas especifica un subconjunto de columnas que se leerán de la tabla. Utiliza la proyección de columnas cuando tu tabla tenga un gran número de columnas y solo necesites leer un subconjunto de ellas.
  • Filtrado de filas: especifica un predicado que se aplica a la tabla. La operación de lectura de BigQuery solo devuelve las filas que coinciden con el filtro, lo que puede reducir la cantidad total de datos que ingiere la canalización.

En el siguiente ejemplo se leen las columnas "user_name" y "age" de una tabla y se excluyen las filas que no coinciden con el predicado "age > 18". En este ejemplo se usa E/S gestionada.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Leer de un resultado de consulta

En el siguiente ejemplo se usa la E/S gestionada para leer el resultado de una consulta SQL. Ejecuta una consulta en un conjunto de datos público de BigQuery. También puedes usar consultas de SQL para leer datos de una vista o una vista materializada de BigQuery.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Usar el conector BigQueryIO

El conector BigQueryIO admite los siguientes métodos de serialización:

El conector admite dos opciones para leer datos:

  • Exportar trabajo. De forma predeterminada, el conector BigQueryIO ejecuta una tarea de exportación de BigQuery que escribe los datos de la tabla en Cloud Storage. A continuación, el conector lee los datos de Cloud Storage.
  • Lecturas directas de la tabla: Esta opción es más rápida que las tareas de exportación, ya que usa la API Read de BigQuery Storage y se salta el paso de exportación. Para usar lecturas directas de tablas, llama a withMethod(Method.DIRECT_READ) al crear la canalización.

A la hora de elegir qué opción usar, ten en cuenta lo siguiente:

  • Por lo general, recomendamos usar lecturas directas de tablas. La API Storage Read se adapta mejor a las canalizaciones de datos que los trabajos de exportación, ya que no necesita el paso intermedio de exportar datos.

  • Si usas lecturas directas, se te cobrará por el uso de la API Storage Read. Consulta los precios de la extracción de datos en la página de precios de BigQuery.

  • Las tareas de exportación no tienen ningún coste adicional. Sin embargo, los trabajos de exportación tienen límites. Para mover grandes cantidades de datos, cuando la puntualidad es una prioridad y el coste es flexible, se recomienda usar lecturas directas.

  • La API Storage Read tiene límites de cuota. Usa las métricas de Google Cloud Platform para monitorizar el uso de tu cuota.

  • Si usas tareas de exportación, define la --tempLocation opción de canalización para especificar un segmento de Cloud Storage para los archivos exportados.

  • Cuando usas la API Storage Read, es posible que veas errores de vencimiento de la concesión y de tiempo de espera de la sesión en los registros, como los siguientes:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Estos errores pueden producirse cuando una operación tarda más que el tiempo de espera, normalmente en las canalizaciones que se ejecutan durante más de 6 horas. Para mitigar este problema, cambia a las exportaciones de archivos.

  • El grado de paralelismo depende del método de lectura:

    • Lecturas directas: el conector de E/S genera un número dinámico de flujos en función del tamaño de la solicitud de exportación. Lee estas secuencias directamente de BigQuery en paralelo.

    • Tareas de exportación: BigQuery determina cuántos archivos se escriben en Cloud Storage. El número de archivos depende de la consulta y del volumen de datos. El conector de E/S lee los archivos exportados en paralelo.

En la siguiente tabla se muestran las métricas de rendimiento de varias opciones de lectura de E/S de BigQuery. Las cargas de trabajo se ejecutaron en un e2-standard2 trabajador con el SDK de Apache Beam 2.49.0 para Java. No han usado Runner v2.

100 M de registros | 1 KB | 1 columna Rendimiento (bytes) Rendimiento (elementos)
Lectura de almacenamiento 120 MBps 88.000 elementos por segundo
Exportación Avro 105 MBps 78.000 elementos por segundo
Exportación de JSON 110 MB/s 81.000 elementos por segundo

Estas métricas se basan en sencillas canalizaciones por lotes. Su objetivo es comparar el rendimiento entre conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de las canalizaciones de Dataflow es complejo y depende del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos, y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDKs de lenguaje. Para obtener más información, consulta Rendimiento de Beam IO.

Ejemplos

En los siguientes ejemplos de código se usa el conector BigQueryIO con lecturas directas de tablas. Para usar una tarea de exportación, omite la llamada a withMethod.

Leer registros con formato Avro

En este ejemplo se muestra cómo usar el conector BigQueryIO para leer registros con formato Avro.

Para leer datos de BigQuery en registros con formato Avro, usa el método read(SerializableFunction). Este método toma una función definida por la aplicación que analiza objetos SchemaAndRecord y devuelve un tipo de datos personalizado. La salida del conector es un PCollection de tu tipo de datos personalizado.

El siguiente código lee un PCollection<MyData> de una tabla de BigQuery, donde MyData es una clase definida por la aplicación.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

El método read toma una interfaz SerializableFunction<SchemaAndRecord, T>, que define una función para convertir registros de Avro en una clase de datos personalizada. En el ejemplo de código anterior, el método MyData.apply implementa esta función de conversión. La función de ejemplo analiza los campos name y age del registro Avro y devuelve una instancia de MyData.

Para especificar qué tabla de BigQuery se debe leer, llama al método from, como se muestra en el ejemplo anterior. Para obtener más información, consulta la sección Nombres de tablas de la documentación del conector de entrada/salida de BigQuery.

Leer objetos TableRow

En este ejemplo se muestra cómo usar el conector BigQueryIO para leer objetos TableRow.

El método readTableRows lee datos de BigQuery y los introduce en un PCollection de objetos TableRow. Cada TableRow es un mapa de pares clave-valor que contiene una sola fila de datos de la tabla. Especifica la tabla de BigQuery que se va a leer llamando al método from.

El siguiente código lee un PCollection<TableRows> de una tabla de BigQuery.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

En este ejemplo también se muestra cómo acceder a los valores del diccionario TableRow. Los valores enteros se codifican como cadenas para que coincidan con el formato JSON exportado de BigQuery.

Siguientes pasos