En este documento, se describe cómo escribir datos de Dataflow en BigQuery mediante el conector de E/S de BigQuery de Apache Beam.
El conector de E/S de BigQuery está disponible en el SDK de Apache Beam. Te recomendamos usar la versión más reciente del SDK. Para obtener más información, consulta SDK de Apache Beam 2.x.
También se encuentra disponible la compatibilidad entre lenguajes para Python.
Descripción general
El conector de E/S de BigQuery admite los siguientes métodos para escribir en BigQuery:
STORAGE_WRITE_API
En este modo, el conector realiza operaciones de escritura directas en el almacenamiento de BigQuery mediante la API de escritura de almacenamiento de BigQuery. La API de Storage Write combina la transferencia de transmisión y la carga por lotes en una sola API de alto rendimiento. Este modo garantiza la semántica de exactamente una vez.STORAGE_API_AT_LEAST_ONCE
Este modo también usa la API de Storage Write, pero proporciona una semántica de al menos una vez. Este modo da como resultado una latencia más baja para la mayoría de las canalizaciones. Sin embargo, es posible realizar operaciones de escritura duplicadas.FILE_LOADS
En este modo, el conector escribe los datos de entrada en archivos de etapa de pruebas en Cloud Storage. Luego, ejecuta un trabajo de carga de BigQuery para cargar los datos en BigQuery. El modo es el predeterminado paraPCollections
delimitadas, que se encuentran con mayor frecuencia en las canalizaciones por lotes.STREAMING_INSERTS
En este modo, el conector usa la API de transmisión heredada. Este modo es el predeterminado paraPCollections
no delimitadas, pero no se recomienda para proyectos nuevos.
Cuando elijas un método de escritura, ten en cuenta los siguientes puntos:
- Para las tareas de transmisión, considera usar
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
, ya que estos modos escriben directamente en el almacenamiento de BigQuery, sin usar archivos almacenados en etapa intermedia. - Si ejecutas la canalización con el modo de transmisión de al menos una vez, establece el modo de escritura en
STORAGE_API_AT_LEAST_ONCE
. Este parámetro de configuración es más eficiente y coincide con la semántica del modo de transmisión de al menos una vez. - Las cargas de archivos y la API de Storage Write tienen diferentes cuotas y límites.
- Los trabajos de carga usan el grupo de ranuras compartido de BigQuery o las ranuras reservadas. Para usar ranuras reservadas, ejecuta el trabajo de carga en un proyecto con una asignación de reserva de tipo
PIPELINE
. Los trabajos de carga son gratuitos si usas el grupo de ranuras compartido de BigQuery. Sin embargo, BigQuery no garantiza la capacidad disponible del grupo compartido. Para obtener más información, consulta Introducción a las reservas.
Paralelismo
Para
FILE_LOADS
ySTORAGE_WRITE_API
en canalizaciones de transmisión, el conector fragmenta los datos en varios archivos o transmisiones. En general, te recomendamos llamar awithAutoSharding
para habilitar la fragmentación automática.Para
FILE_LOADS
en canalizaciones por lotes, el conector escribe datos en archivos particionados, que luego se cargan en BigQuery de forma simultánea.Para
STORAGE_WRITE_API
en canalizaciones por lotes, cada trabajador crea uno o más flujos para escribir en BigQuery, determinado por la cantidad total de fragmentos.Para
STORAGE_API_AT_LEAST_ONCE
, hay un solo flujo de escritura predeterminado. Varios trabajadores se agregan a este flujo.
Rendimiento
En la siguiente tabla, se muestran las métricas de rendimiento de varias opciones de lectura de E/S de BigQuery. Las cargas de trabajo se ejecutaron en un trabajador e2-standard2
, con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.
100 millones de registros | 1 KB | 1 columna | Capacidad de procesamiento (bytes) | Capacidad de procesamiento (elementos) |
---|---|---|
Escritura de almacenamiento | 55 MBps | 54,000 elementos por segundo |
Carga de Avro | 78 MBps | 77,000 elementos por segundo |
Carga de JSON | 54 MBps | 53,000 elementos por segundo |
Estas métricas se basan en canalizaciones por lotes simples. Están diseñadas para comparar el rendimiento entre los conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de la canalización de Dataflow es complejo y es una función del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDK de lenguaje. Para obtener más información, consulta Rendimiento de E/S de Beam.
Prácticas recomendadas
En esta sección, se describen las prácticas recomendadas para escribir en BigQuery desde Dataflow.
Consideraciones generales
La API de Storage Write tiene límites de cuota. El conector controla estos límites para la mayoría de las canalizaciones. Sin embargo, en algunas situaciones, se pueden agotar los flujos disponibles de la API de Storage Write. Por ejemplo, este problema puede ocurrir en una canalización que usa la fragmentación automática y el ajuste de escala automático con una gran cantidad de destinos, especialmente en trabajos de larga duración con cargas de trabajo muy variables. Si ocurre este problema, considera usar
STORAGE_WRITE_API_AT_LEAST_ONCE
, que evita el problema.Usa las métricas de Google Cloud para supervisar el uso de la cuota de la API de Storage Write.
Cuando se usan cargas de archivos, Avro suele tener un mejor rendimiento que JSON. Para usar Avro, llama a
withAvroFormatFunction
.De forma predeterminada, los trabajos de carga se ejecutan en el mismo proyecto que el trabajo de Dataflow. Para especificar un proyecto diferente, llama a
withLoadJobProjectId
.Cuando uses el SDK de Java, considera crear una clase que represente el esquema de la tabla de BigQuery. Luego, llama a
useBeamSchema
en tu canalización para convertir de forma automática entre los tipos deRow
de Apache Beam y BigQueryTableRow
. Para ver un ejemplo de una clase de esquema, consultaExampleModel.java
.Si cargas tablas con esquemas complejos que contienen miles de campos, considera llamar a
withMaxBytesPerPartition
para establecer un tamaño máximo más pequeño para cada trabajo de carga.
Canalizaciones de transmisión
Las siguientes recomendaciones se aplican a las canalizaciones de transmisión.
Para las canalizaciones de transmisión, recomendamos usar la API de Storage Write (
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
).Una canalización de transmisión puede usar cargas de archivos, pero este enfoque tiene desventajas:
- Se requiere una renderización de ventanas para escribir los archivos. No puedes usar la ventana global.
- BigQuery carga archivos de la mejor manera posible cuando se usa el grupo de ranuras compartidas. Puede haber una demora significativa entre el momento en que se escribe un registro y el momento en que está disponible en BigQuery.
- Si falla un trabajo de carga, por ejemplo, debido a datos incorrectos o a una discrepancia de esquema, falla toda la canalización.
Considera usar
STORAGE_WRITE_API_AT_LEAST_ONCE
siempre que sea posible. Puede generar que se escriban registros duplicados en BigQuery, pero es menos costoso y más escalable queSTORAGE_WRITE_API
.En general, evita usar
STREAMING_INSERTS
. Las inserciones de transmisión son más costosas que la API de Storage Write y no tienen un buen rendimiento.La fragmentación de datos puede mejorar el rendimiento en las canalizaciones de transmisión. Para la mayoría de las canalización, la fragmentación automática es un buen punto de partida. Sin embargo, puedes ajustar la fragmentación de la siguiente manera:
- Para
STORAGE_WRITE_API
, llama awithNumStorageWriteApiStreams
para establecer la cantidad de transmisiones de escritura. - Para
FILE_LOADS
, llama awithNumFileShards
para establecer la cantidad de fragmentos de archivos.
- Para
Si usas inserciones de transmisión, te recomendamos que configures
retryTransientErrors
como la política de reintento.
Canalizaciones por lotes
Las siguientes recomendaciones se aplican a las canalizaciones por lotes.
Para la mayoría de las canalizaciones por lotes grandes, te recomendamos que primero pruebes
FILE_LOADS
. Una canalización por lotes puede usarSTORAGE_WRITE_API
, pero es probable que exceda los límites de cuota a gran escala (más de 1,000 CPU virtuales) o si se ejecutan canalizaciones simultáneas. Apache Beam no limita la cantidad máxima de transmisiones de escritura para los trabajos por lotesSTORAGE_WRITE_API
, por lo que el trabajo finalmente alcanza los límites de la API de almacenamiento de BigQuery.Cuando usas
FILE_LOADS
, es posible que agotes el grupo de ranuras compartidas de BigQuery o el grupo de ranuras reservadas. Si te encuentras con este tipo de falla, prueba los siguientes enfoques:- Reduce la cantidad máxima de trabajadores o el tamaño de los trabajadores para el trabajo.
- Compra más ranuras reservadas
- Considera usar
STORAGE_WRITE_API
.
Las canalizaciones pequeñas a medianas (menos de 1,000 CPU virtuales) podrían beneficiarse de usar
STORAGE_WRITE_API
. Para estos trabajos más pequeños, considera usarSTORAGE_WRITE_API
si deseas una cola de mensajes no entregados o cuando el grupo de ranuras compartidasFILE_LOADS
no sea suficiente.Si puedes tolerar datos duplicados, considera usar
STORAGE_WRITE_API_AT_LEAST_ONCE
. Este modo puede provocar que se escriban registros duplicados en BigQuery, pero podría ser menos costoso que la opciónSTORAGE_WRITE_API
.Los diferentes modos de escritura pueden tener un rendimiento diferente según las características de tu canalización. Experimenta para encontrar el mejor modo de escritura para tu carga de trabajo.
Maneja errores a nivel de fila
En esta sección, se describe cómo manejar errores que pueden ocurrir a nivel de fila, por ejemplo, debido a datos de entrada con formato incorrecto o a discrepancias de esquema.
En la API de Storage Write, las filas que no se pueden escribir se colocan en un PCollection
separado. Para obtener esta colección, llama a getFailedStorageApiInserts
en el objeto WriteResult
. Para ver un ejemplo de este enfoque, consulta Cómo transmitir datos a BigQuery.
Se recomienda enviar los errores a una cola o tabla de mensajes no entregados para su procesamiento posterior. Para obtener más información sobre este patrón, consulta Patrón de mensajes no entregados BigQueryIO
.
En FILE_LOADS
, si se produce un error mientras se cargan los datos, el trabajo de carga falla y la canalización genera una excepción de entorno de ejecución. Puedes ver el error en los registros de Dataflow o ver el historial de trabajos de BigQuery.
El conector de E/S no muestra información sobre las filas con errores individuales.
Si deseas obtener más información para solucionar errores, consulta Errores del conector de BigQuery.
Ejemplos
En los siguientes ejemplos, se muestra cómo usar Dataflow para escribir en BigQuery.
Escribir en una tabla existente
En el siguiente ejemplo, se crea una canalización por lotes que escribe una PCollection<MyData>
en BigQuery, en la que MyData
es un tipo de datos personalizado.
El método BigQueryIO.write()
muestra un tipo BigQueryIO.Write<T>
, que se usa para configurar la operación de escritura. Para obtener más información, consulta Escribe en una tabla en la documentación de Apache Beam. En este ejemplo de código, se escribe en una tabla existente (CREATE_NEVER
) y se agregan las filas nuevas a la tabla (WRITE_APPEND
).
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
Escribe en una tabla nueva o existente
En el siguiente ejemplo, se crea una tabla nueva si la tabla de destino no existe mediante la configuración de la disposición de creación como CREATE_IF_NEEDED
. Cuando usas esta opción, debes proporcionar un esquema de tabla. El conector usa este esquema si crea una tabla nueva.
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
Transmite datos a BigQuery
En el siguiente ejemplo, se muestra cómo transmitir datos mediante la semántica de exactamente una vez mediante la configuración del modo de escritura en STORAGE_WRITE_API
:
No todas las canalizaciones de transmisión requieren una semántica de exactamente una vez. Por ejemplo, puedes quitar los duplicados de forma manual de la tabla de destino. Si la posibilidad de tener registros duplicados es aceptable en tu caso, considera usar una semántica de al menos una vez mediante la configuración del método de escritura en STORAGE_API_AT_LEAST_ONCE
. Por lo general, este método es más eficiente y da como resultado una latencia más baja para la mayoría de las canalizaciones.
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
¿Qué sigue?
- Obtén más información sobre el conector de E/S de BigQuery en la documentación de Apache Beam.
- Lee sobre la transmisión de datos a BigQuery mediante la API de Storage Write (entrada de blog).