En este documento se describe cómo escribir datos de Dataflow en BigQuery.
Información general
En la mayoría de los casos prácticos, te recomendamos que uses E/S gestionada para escribir en BigQuery. La E/S gestionada ofrece funciones como las actualizaciones automáticas y una API de configuración coherente. Al escribir en BigQuery, la E/S gestionada elige automáticamente el mejor método de escritura para las tareas por lotes o de streaming.
Si necesitas una optimización del rendimiento más avanzada, te recomendamos que uses el conector BigQueryIO
. Para obtener más información, consulta la sección Usar el conector BigQueryIO
de este documento.
Rendimiento
En la siguiente tabla se muestran las métricas de rendimiento de varias cargas de trabajo. Estas cargas de trabajo se ejecutaron en un e2-standard2
trabajador con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.
100 M de registros | 1 KB | 1 columna | Rendimiento (bytes) | Rendimiento (elementos) |
---|---|---|
Escritura de almacenamiento | 55 MB/s | 54.000 elementos por segundo |
Carga de Avro | 78 MBps | 77.000 elementos por segundo |
Carga de JSON | 54 MBps | 53.000 elementos por segundo |
Estas métricas se basan en sencillas canalizaciones por lotes. Su objetivo es comparar el rendimiento entre conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de las canalizaciones de Dataflow es complejo y depende del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos, y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDKs de lenguaje. Para obtener más información, consulta Rendimiento de Beam IO.
Usar el conector BigQueryIO
El conector de entrada/salida de BigQuery admite los siguientes métodos para escribir en BigQuery:
STORAGE_WRITE_API
. En este modo, el conector escribe directamente en el almacenamiento de BigQuery mediante la API Storage Write de BigQuery. La API Storage Write combina la ingestión de streaming y la carga por lotes en una sola API de alto rendimiento. Este modo garantiza la semántica de exactamente una vez.STORAGE_API_AT_LEAST_ONCE
. Este modo también usa la API Storage Write, pero proporciona semántica de "al menos una vez". Este modo reduce la latencia de la mayoría de las canalizaciones. Sin embargo, es posible que se produzcan escrituras duplicadas.FILE_LOADS
. En este modo, el conector escribe los datos de entrada en archivos de almacenamiento provisional en Cloud Storage. A continuación, ejecuta un trabajo de carga de BigQuery para cargar los datos en BigQuery. Este modo es el predeterminado para losPCollections
acotados, que se suelen encontrar en las canalizaciones por lotes.STREAMING_INSERTS
. En este modo, el conector usa la API de streaming antigua. Este modo es el predeterminado para losPCollections
sin límites, pero no se recomienda para proyectos nuevos.
Cuando elijas un método de escritura, ten en cuenta lo siguiente:
- En el caso de las tareas de streaming, te recomendamos que uses
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
, ya que estos modos escriben directamente en el almacenamiento de BigQuery sin usar archivos de almacenamiento provisional intermedios. - Si ejecutas la canalización con el modo de streaming al menos una vez, define el modo de escritura como
STORAGE_API_AT_LEAST_ONCE
. Este ajuste es más eficiente y coincide con la semántica del modo de streaming al menos una vez. - Las cargas de archivos y la API Storage Write tienen diferentes cuotas y límites.
- Las tareas de carga usan el grupo de ranuras de BigQuery compartido o las ranuras reservadas. Para usar las ranuras reservadas, ejecuta la tarea de carga en un proyecto con una asignación de reserva de tipo
PIPELINE
. Las tareas de carga son gratuitas si usas el grupo de ranuras de BigQuery compartido. Sin embargo, BigQuery no garantiza la disponibilidad del grupo compartido. Para obtener más información, consulta la introducción a las reservas.
Paralelismo
En
FILE_LOADS
ySTORAGE_WRITE_API
en las canalizaciones de streaming, el conector fragmenta los datos en varios archivos o flujos. En general, recomendamos llamar awithAutoSharding
para habilitar la fragmentación automática.En el caso de
FILE_LOADS
en las canalizaciones por lotes, el conector escribe datos en archivos particionados, que se cargan en BigQuery en paralelo.En el caso de
STORAGE_WRITE_API
en las canalizaciones por lotes, cada trabajador crea uno o varios flujos para escribir en BigQuery, lo que se determina en función del número total de particiones.En el caso de
STORAGE_API_AT_LEAST_ONCE
, hay un solo flujo de escritura predeterminado. Varios trabajadores añaden contenido a este flujo.
Prácticas recomendadas
La API Storage Write tiene límites de cuota. El conector gestiona estos límites en la mayoría de las canalizaciones. Sin embargo, en algunos casos, se pueden agotar los flujos de la API Storage Write disponibles. Por ejemplo, este problema puede producirse en una canalización que utilice el particionado automático y el autoescalado con un gran número de destinos, sobre todo en tareas de larga duración con cargas de trabajo muy variables. Si se produce este problema, puedes usar
STORAGE_WRITE_API_AT_LEAST_ONCE
, que no lo tiene.Usa las métricas de Google Cloud Platform para monitorizar el uso de la cuota de la API Storage Write.
Cuando se usan cargas de archivos, Avro suele superar a JSON. Para usar Avro, llama a
withAvroFormatFunction
.De forma predeterminada, las tareas de carga se ejecutan en el mismo proyecto que la tarea de Dataflow. Para especificar otro proyecto, llama a
withLoadJobProjectId
.Cuando uses el SDK de Java, considera la posibilidad de crear una clase que represente el esquema de la tabla de BigQuery. A continuación, llama a
useBeamSchema
en tu canalización para convertir automáticamente los tipos deRow
de Apache Beam yTableRow
de BigQuery. Para ver un ejemplo de una clase de esquema, consultaExampleModel.java
.Si carga tablas con esquemas complejos que contienen miles de campos, le recomendamos que llame a
withMaxBytesPerPartition
para definir un tamaño máximo más pequeño para cada tarea de carga.De forma predeterminada,
BigQueryIO
usa ajustes de la API Storage Write que son razonables para la mayoría de las canalizaciones. Sin embargo, si detectas problemas de rendimiento, puedes definir opciones de la canalización para ajustar estos ajustes. Para obtener más información, consulta Ajustar la API Storage Write en la documentación de Apache Beam.
Flujos de procesamiento en streaming
Las siguientes recomendaciones se aplican a las canalizaciones de streaming.
En el caso de las canalizaciones de streaming, te recomendamos que uses la API Storage Write (
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
).Una canalización de streaming puede usar cargas de archivos, pero este enfoque tiene desventajas:
- Requiere ventanas para escribir los archivos. No puedes usar la ventana global.
- BigQuery carga archivos de la mejor forma posible cuando se usa el grupo de slots compartido. Puede haber un retraso significativo entre el momento en que se escribe un registro y el momento en que está disponible en BigQuery.
- Si se produce un error en una tarea de carga (por ejemplo, debido a datos incorrectos o a un error de coincidencia de esquemas), se produce un error en toda la canalización.
Te recomendamos que uses
STORAGE_WRITE_API_AT_LEAST_ONCE
siempre que sea posible. Puede provocar que se escriban registros duplicados en BigQuery, pero es menos costoso y más escalable queSTORAGE_WRITE_API
.En general, evita usar
STREAMING_INSERTS
. Las inserciones de streaming son más caras que la API Storage Write y no tienen el mismo rendimiento.El particionado de datos puede mejorar el rendimiento de las canalizaciones de streaming. En la mayoría de las pipelines, el particionamiento automático es un buen punto de partida. Sin embargo, puedes ajustar la fragmentación de la siguiente manera:
- En
STORAGE_WRITE_API
, llama awithNumStorageWriteApiStreams
para definir el número de flujos de escritura. - En
FILE_LOADS
, llama awithNumFileShards
para definir el número de fragmentos de archivo.
- En
Si usas inserciones de streaming, te recomendamos que definas
retryTransientErrors
como política de reintentos.
Flujos de procesamiento por lotes
Las siguientes recomendaciones se aplican a las canalizaciones por lotes.
En la mayoría de las grandes canalizaciones de procesamiento por lotes, te recomendamos que pruebes primero
FILE_LOADS
. Una canalización por lotes puede usarSTORAGE_WRITE_API
, pero es probable que supere los límites de cuota a gran escala (más de 1000 vCPUs) o si se ejecutan canalizaciones simultáneas. Apache Beam no limita el número máximo de flujos de escritura de las tareas por lotes, por lo que la tarea acaba alcanzando los límites de la API Storage de BigQuery.STORAGE_WRITE_API
Cuando se usa
FILE_LOADS
, puede agotar el grupo de ranuras de BigQuery compartido o el grupo de ranuras reservadas. Si te encuentras con este tipo de fallo, prueba los siguientes métodos:- Reduce el número máximo de trabajadores o el tamaño de los trabajadores de la tarea.
- Compra más ranuras reservadas.
- Considera usar
STORAGE_WRITE_API
.
Las canalizaciones pequeñas y medianas (menos de 1000 vCPUs) pueden beneficiarse del uso de
STORAGE_WRITE_API
. En el caso de estas tareas más pequeñas, te recomendamos que usesSTORAGE_WRITE_API
si quieres una cola de mensajes fallidos o cuando el grupo de ranuras compartidoFILE_LOADS
no sea suficiente.Si puedes tolerar los datos duplicados, te recomendamos que uses
STORAGE_WRITE_API_AT_LEAST_ONCE
. Este modo puede provocar que se escriban registros duplicados en BigQuery, pero puede ser menos caro que la opciónSTORAGE_WRITE_API
.Los diferentes modos de escritura pueden funcionar de forma distinta en función de las características de tu canalización. Experimenta para encontrar el mejor modo de escritura para tu carga de trabajo.
Gestionar errores a nivel de fila
En esta sección se describe cómo gestionar los errores que pueden producirse a nivel de fila, por ejemplo, debido a datos de entrada mal formados o a incompatibilidades de esquemas.
En el caso de la API Storage Write, las filas que no se pueden escribir se colocan en un PCollection
independiente. Para obtener esta colección, llama a getFailedStorageApiInserts
en el objeto WriteResult
. Para ver un ejemplo de este enfoque, consulta Transmitir datos a BigQuery.
Te recomendamos que envíes los errores a una cola o tabla de mensajes fallidos para procesarlos más adelante. Para obtener más información sobre este patrón, consulta el patrón de mensajes fallidos.BigQueryIO
En el caso de FILE_LOADS
, si se produce un error al cargar los datos, el trabajo de carga falla y la canalización genera una excepción de tiempo de ejecución. Puede ver el error en los registros de Dataflow o consultar el historial de tareas de BigQuery.
El conector de entrada/salida no devuelve información sobre las filas que no se han podido insertar.
Para obtener más información sobre cómo solucionar errores, consulta Errores del conector de BigQuery.
Ejemplos
En los siguientes ejemplos se muestra cómo usar Dataflow para escribir en BigQuery. En estos ejemplos se usa el conector BigQueryIO
.
Escribir en una tabla
En el siguiente ejemplo se crea una canalización por lotes que escribe un PCollection<MyData>
en BigQuery, donde MyData
es un tipo de datos personalizado.
El método BigQueryIO.write()
devuelve un tipo BigQueryIO.Write<T>
, que se usa para configurar la operación de escritura. Para obtener más información, consulta el artículo Escribir en una tabla de la documentación de Apache Beam. En este ejemplo de código se escribe en una tabla (CREATE_NEVER
) y se añaden las nuevas filas a la tabla (WRITE_APPEND
).
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.
Escribir en una tabla nueva o ya creada
En el siguiente ejemplo se crea una tabla si la tabla de destino no existe. Para ello, se asigna el valor CREATE_IF_NEEDED
a create disposition. Si utiliza esta opción, debe proporcionar un esquema de tabla. El conector usa este esquema si crea una tabla.
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.
Transmitir datos a BigQuery
En el siguiente ejemplo se muestra cómo transmitir datos con semántica de entrega única exacta. Para ello, se define el modo de escritura como STORAGE_WRITE_API
No todos los flujos de procesamiento requieren semántica de exactamente una vez. Por ejemplo, puede eliminar manualmente los duplicados de la tabla de destino. Si en tu caso es aceptable que haya registros duplicados, puedes usar la semántica de al menos una vez configurando el método de escritura en STORAGE_API_AT_LEAST_ONCE
. Este método suele ser más eficiente y da como resultado una latencia menor en la mayoría de las canalizaciones.
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.
Siguientes pasos
- Consulta más información sobre E/S gestionada.
- Consulta las prácticas recomendadas para pasar mensajes de Pub/Sub a BigQuery.