E/S de BigQuery

Los SDK de Dataflow tienen transformaciones incorporadas de lectura y escritura que pueden leer datos de una tabla de Google BigQuery y escribir en ella. Puedes leer una tabla entera que especificas por nombre o puedes leer datos parciales con una string de consulta.

Especifica un nombre de tabla de BigQuery

Para leer o escribir desde una tabla de BigQuery, debes proporcionar un nombre completo de tabla de BigQuery. Un nombre completo de tabla de BigQuery consiste en tres partes:

  • Un ID del proyecto: el ID para tu proyecto de Google Cloud (el valor predeterminado proviene de tu objeto de opciones de canalización)
  • Un ID del conjunto de datos: el ID del conjunto de datos de BigQuery, que es único dentro de un determinado proyecto de Cloud
  • Un ID de tabla: un ID de tabla, que es único dentro de un conjunto de datos determinado

Java

Ten en cuenta que puedes usar BigQueryIO sin proporcionar el nombre de un proyecto. Si se omite, BigQueryIO usa el proyecto predeterminado de tu objeto PipelineOptions.

La API de cliente de BigQuery para Java toma un objeto de tipo TableReference para identificar la tabla de BigQuery de destino. El paquete BigQueryIO del SDK de Dataflow para Java contiene un método auxiliar BigQueryIO.parseTableSpec que puedes usar para construir una TableReference a partir de una String que contenga las tres partes del nombre de tu tabla de BigQuery.

La mayoría de las veces, no necesitarás usar un objeto TableReference explícitamente; los métodos estáticos de fábrica para una transformación BigQueryIO toman el nombre de la tabla como String y, luego, usan parseTableSpec internamente para construir un objeto TableReference a partir del String proporcionado.

Formatos de string de nombre de tabla

Puedes especificar la tabla de BigQuery de destino con una string que contenga uno de los siguientes formatos:

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

También puedes omitir project_id. Si omites project_id, Cloud Dataflow usará el ID del proyecto predeterminado de tu objeto de opciones de canalización. En Java, se puede acceder al ID con PipelineOptions.getProject.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

Esquemas y filas de tablas de BigQuery

Java

Las transformaciones de lectura y escritura de BigQueryIO producen y consumen datos como PCollection de objetos TableRow de BigQuery. TableRow es parte de la API de cliente de BigQuery para Java, en el paquete com.google.api.services.bigquery.model.TableRow.

Además, cuando escribas en BigQuery, deberás proporcionar un objeto TableSchema para los campos que deseas escribir en la tabla de destino. Deberás usar las clases TableSchema y TableFieldSchema de BigQuery. Estas clases se definen en los paquetes com.google.api.services.bigquery.model.TableSchema y com.google.api.services.bigquery.model.TableFieldSchema, respectivamente.

Lee de BigQuery

Java

Para leer de una tabla de BigQuery, debes usar la operación apply con una transformación BigQueryIO.Read. BigQueryIO.Read muestra una PCollection de objetos TableRow de BigQuery, en la que cada elemento en la PCollection representa una sola fila en la tabla.

Puedes leer una tabla de BigQuery completa si proporcionas el nombre de la tabla de BigQuery a BigQueryIO.Read con la operación .from. En el siguiente código de ejemplo, se muestra cómo aplicar la transformación BigQueryIO.Read para leer una tabla de BigQuery completa:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

Si no deseas leer toda la tabla, puedes proporcionar una cadena de consulta a BigQueryIO.Read mediante la operación .fromQuery. En el siguiente código de ejemplo, se muestra cómo leer campos específicos de una tabla de BigQuery con una string de consulta:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

Como alternativa, también puedes usar el dialecto de SQL estándar de BigQuery, como se muestra en el siguiente ejemplo:

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

Ten en cuenta que los valores de número entero en los objetos TableRow se codifican como String para coincidir con el formato JSON exportado de BigQuery cuando lees de BigQuery.

Cuando aplicas una transformación BigQueryIO.Read en modo por lotes, Dataflow invoca una solicitud de exportación de BigQuery. Ten en cuenta que el uso de Dataflow de esta API está sujeto a las políticas de cuotas y precios de BigQuery.

Escribe en BigQuery

Java

Para escribir en una tabla de BigQuery, debes realizar una operación apply con una transformación BigQueryIO.Write. La operación apply con la transformación debe dirigirse a PCollection<TableRow>.

En general, deberás usar otra transformación, como ParDo, para dar formato a los datos resultantes en una colección de objetos TableRow de BigQuery.

Cuando construyes una transformación BigQueryIO.Write, deberás proporcionar información adicional basada en la tabla de destino. Además del nombre de tabla, deberás proporcionar los siguientes datos:

  • La CreateDisposition de la tabla de destino. CreateDisposition especifica si la tabla de destino debe existir o si puede crearse con la operación de escritura.
  • La WriteDisposition de la tabla de destino. WriteDisposition especifica si los datos que escribas reemplazarán una tabla existente, le anexarán filas o solo escribirán en una tabla vacía.

Además, si tu operación de escritura crea una tabla nueva de BigQuery, debes proporcionar información del esquema de la tabla de destino. En este caso, deberás incluir un objeto TableSchema con tu operación de escritura.

CreateDisposition

La CreateDisposition determina si tu operación de escritura de BigQuery crea una tabla o no en caso de que la tabla de destino no exista. Para especificar la CreateDisposition cuando construyes tu transformación BigQueryIO.Write, debes invocar el método .withCreateDisposition.

CreateDisposition es una enum con los siguientes valores válidos:

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: Especifica que nunca debe crearse una tabla. Si la tabla de destino no existe, la operación de escritura falla.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: Especifica que, si no existe una tabla, la operación de escritura debe crearla. Si usas este valor, también deberás proporcionar un esquema de tabla con la operación .withSchema. CREATE_IF_NEEDED es el comportamiento predeterminado.

Ten en cuenta que, si especificas CREATE_IF_NEEDED como CreateDisposition y no proporcionas un TableSchema, la transformación puede fallar en el entorno de ejecución con una java.lang.IllegalArgumentException en caso de que la tabla de destino no exista.

WriteDisposition

La WriteDisposition determina cómo se aplica tu operación de escritura de BigQuery a una tabla existente. Para especificar la WriteDisposition cuando construyes tu transformación BigQueryIO.Write, debes invocar el método .withWriteDisposition.

WriteDisposition es una enum con los siguientes valores válidos:

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: Especifica que la operación de escritura debe reemplazar una tabla existente. Se quitan todas las filas existentes en la tabla de destino y se agregan las filas nuevas.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: Especifica que la operación de escritura debe anexar las filas al final de una tabla existente.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: Especifica que la operación de escritura debe fallar en el entorno de ejecución si la tabla de destino no está vacía. WRITE_EMPTY es el comportamiento predeterminado.

Cuando uses WRITE_EMPTY como WriteDisposition, ten en cuenta que el acto de verificar si la tabla de destino está vacía puede ocurrir mucho antes de la operación de escritura. Además, esa verificación no garantiza que tu canalización tenga acceso exclusivo a la tabla. Si dos programas que se ejecutan en forma simultánea intentan escribir en la misma tabla de resultado con WriteDisposition como WRITE_EMPTY, ambos pueden tener éxito.

Crea un TableSchema para escribir en una tabla nueva

Si tu operación de escritura de BigQuery crea una tabla nueva, deberás proporcionar información del esquema. Para proporcionar la información del esquema, debes crear un objeto TableSchema. El TableSchema se pasa con la operación .withSchema cuando construyes tu transformación BigQueryIO.Write.

Un objeto TableSchema contiene información sobre cada campo de la tabla mediante objetos de tipo TableFieldSchema. A fin de construir un TableSchema, primero debes compilar una List de los campos de la tabla. Luego, debes pasar la lista con la operación .setFields cuando construyes el TableSchema.

En el siguiente código de ejemplo, se muestra cómo construir un TableSchema para una tabla con dos campos de tipo String:

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

Aplica una transformación BigQueryIO.Write

En el siguiente código de ejemplo, se muestra cómo realizar una operación apply con una transformación BigQueryIO.Write para escribir un PCollection<TableRow> en una tabla de BigQuery. La operación de escritura crea una tabla si es necesario. Si la tabla ya existe, la reemplazará.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™ es una marca de The Apache Software Foundation o sus afiliados de Estados Unidos y otros países.