BigQuery E/S

Los SDK de Dataflow tienen incorporadas transformaciones de Lectura y Escritura que pueden leer y escribir datos de una tabla de Google BigQuery. Puedes leer una tabla entera que especificas por nombre o puedes leer datos parciales con una string de consulta.

Especifica un nombre de tabla de BigQuery

Para leer o escribir desde una tabla de BigQuery, debes proporcionar un nombre completo de tabla de BigQuery. Un nombre completo de tabla de BigQuery consiste de tres partes:

  • Un ID del proyecto: el ID para tu proyecto de Google Cloud. El valor predeterminado proviene de tu objeto de opciones de canalización.
  • Un ID del conjunto de datos: el ID del conjunto de datos de BigQuery, que es único dentro de un determinado proyecto de Cloud.
  • Un ID de tabla: un ID de tabla, que es único dentro de un conjunto de datos determinado.

Java

Ten en cuenta que puedes usar BigQueryIO sin proporcionar un nombre de proyecto; si se omite, BigQueryIO usa el nombre de proyecto predeterminado de tu objeto PipelineOptions.

La API del cliente de BigQuery para Java toma un objeto del tipo TableReference a fin de identificar la tabla de BigQuery de destino. El paquete de BigQueryIO en el SDK de Dataflow para Java contiene un método de ayuda, BigQueryIO.parseTableSpec, que puedes usar a fin de construir una TableReference de una String que contiene tres partes de tu nombre de tabla de BigQuery.

En general, no necesitarás usar un objeto TableReference de forma explícita. Los métodos de fábrica estáticos para una transformación BigQueryIO toman el nombre de tabla como una String. Luego, usan parseTableSpec de manera interna a fin de construir un objeto TableReference para la String proporcionada.

Formatos de string de nombre de tabla

Puedes especificar la tabla de BigQuery de destino con una string que contenga uno de los siguientes formatos:

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

También puedes omitir project_id. Si omites project_id, Cloud Dataflow usará el ID del proyecto predeterminado de tu objeto de opciones de canalización. En Java, se puede acceder al ID con PipelineOptions.getProject.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

Esquemas y filas de tablas de BigQuery

Java

Las transformaciones de lectura y escritura de BigQueryIO producen y consumen datos como PCollection de objetos TableRow de BigQuery. TableRow es parte de la API del cliente de BigQuery para Java, en el paquete com.google.api.services.bigquery.model.TableRow.

Además, cuando escribas a BigQuery, deberás proporcionar un objeto TableSchema para los campos que quieres escribir en la tabla de destino. Deberás usar las clases TableSchema y TableFieldSchema de BigQuery. Estas clases se definen en los paquetes com.google.api.services.bigquery.model.TableSchema y com.google.api.services.bigquery.model.TableFieldSchema, respectivamente.

Lee desde BigQuery

Java

Para leer desde una tabla de BigQuery, apply una transformación BigQueryIO.Read. BigQueryIO.Read muestra una PCollection de objetos TableRow de BigQuery, en el que cada elemento en la PCollection representa una sola fila en la tabla.

Puedes leer una tabla de BigQuery entera si proporcionas el nombre de la tabla de BigQuery a BigQueryIO.Read con la operación .from. El siguiente código de ejemplo muestra cómo aplicar la transformación BigQueryIO.Read para leer una tabla de BigQuery entera:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

Si no quieres leer la tabla entera, puedes proporcionar una string de consulta a BigQueryIO.Read con la operación .fromQuery. El siguiente código de ejemplo muestra cómo leer campos específicos de una tabla de BigQuery con una string de consulta:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

De manera alternativa, puedes usar el dialecto de SQL estándar de BigQuery, como se muestra en el siguiente ejemplo:

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

Ten en cuenta que, cuando lees desde BigQuery, los valores enteros en los objetos TableRow están codificados como String para coincidir con el formato de JSON exportado de BigQuery.

Cuando aplicas una transformación BigQueryIO.Read en modo por lotes, Dataflow invoca una solicitud de BigQuery Export. Ten en cuenta que el uso de Dataflow de esta API está sujeto a las políticas de Cuota y Precios de BigQuery.

Escribe a BigQuery

Java

A fin de escribirle a una tabla de BigQuery, apply una transformación BigQueryIO.Write. Deberás apply la transformación a PCollection<TableRow>.

En general, deberás usar otra transformación, como ParDo, para dar formato a tus datos de resultado en una colección de objetos TableRow de BigQuery.

Cuando construyas una transformación BigQueryIO.Write, deberás proporcionar información adicional basada en la tabla de destino. Además del nombre de tabla, deberás proporcionar los siguientes datos:

  • La CreateDisposition de la tabla de destino. CreateDisposition especifica si la tabla de destino debe existir o si puede crearse con la operación de escritura.
  • La WriteDisposition de la tabla de destino. WriteDisposition especifica si los datos que escribas reemplazarán una tabla existente, anexarán filas a una tabla existente o solo escribirán a una tabla vacía.

Además, si tu operación de escritura crea una tabla de BigQuery nueva, debes proporcionar información del esquema de la tabla de destino. En este caso, deberás incluir un objeto TableSchema con tu operación de escritura.

CreateDisposition

La CreateDisposition controla si tu operación de escritura de BigQuery crea una tabla si la tabla de destino no existe. Especificas la CreateDisposition cuando construyes tu transformación BigQueryIO.Write con la invocación del método .withCreateDisposition.

CreateDisposition es una enum con los siguientes valores válidos:

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: especifica que una tabla nunca debe crearse. Si la tabla de destino no existe, la operación de escritura falla.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: especifica que la operación de escritura debe crear una tabla nueva si no existe una. Si usas este valor, también debes proporcionar un esquema de tabla con la operación .withSchema. CREATE_IF_NEEDED es el comportamiento predeterminado.

Ten en cuenta que si especificas CREATE_IF_NEEDED como la CreateDisposition y no proporcionas un TableSchema, la transformación puede fallar en el entorno de ejecución con una java.lang.IllegalArgumentException si la tabla de destino no existe.

WriteDisposition

La WriteDisposition controla cómo se aplica tu operación de escritura de BigQuery a una tabla existente. Especificas la WriteDisposition cuando construyes tu transformación BigQueryIO.Write con la invocación del método .withWriteDisposition.

WriteDisposition es una enum con los siguientes valores válidos:

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: especifica que la operación de escritura debe reemplazar una tabla existente. Se quitan todas las filas existentes en la tabla de destino y las filas nuevas se agregan.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: especifica que la operación de escritura debe anexar las filas al final de una tabla existente.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: especifica que la operación de escritura debe fallar en el entorno de ejecución si la tabla de destino no está vacía. WRITE_EMPTY es el comportamiento predeterminado.

Cuando uses WRITE_EMPTY para la WriteDisposition, ten en cuenta que la verificación a fin de comprobar si la tabla de destino está vacía puede ocurrir mucho antes de la operación de escritura. Además, esa verificación no garantiza que tu canalización tenga acceso exclusivo a la tabla. Si dos programas ejecutados con simultaneidad intentan escribir en la misma tabla de resultado con una WriteDisposition de WRITE_EMPTY, ambas pueden tener éxito.

Crea un TableSchema para escribir a una tabla nueva

Si tu operación de escritura de BigQuery crea una tabla nueva, deberás proporcionar información del esquema. Proporcionas la información del esquema con la creación de un objeto TableSchema. Apruebas el TableSchema con la operación .withSchema cuando construyes tu transformación BigQueryIO.Write.

Un objeto TableSchema contiene información sobre cada campo en la tabla, con objetos del tipo TableFieldSchema. A fin de construir un TableSchema, primero compilas una List de los campos en la tabla. Luego, apruebas la lista con la operación .setFields cuando construyes el TableSchema.

El siguiente código de ejemplo muestra cómo construir un TableSchema para una tabla con dos campos del tipo String:

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

Aplica una transformación BigQueryIO.Write

El siguiente código de ejemplo muestra cómo apply una transformación BigQueryIO.Write para escribir una PCollection<TableRow> a una tabla de BigQuery. La operación de escritura crea una tabla si es necesario. Si la tabla ya existe, la reemplazará.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™ es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.