La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.
Requisitos de la canalización
- La suscripción de entrada de Pub/Sub debe existir.
- El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
- El tema de Pub/Sub sin procesar debe existir. ¿
- El conjunto de datos de salida de BigQuery debe existir.
Parámetros de la plantilla
Parámetros obligatorios
- schemaPath: Es la ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo,
gs://path/to/my/schema.avsc
- inputSubscription: Es la suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
- outputTableSpec: La ubicación de la tabla de BigQuery en la que se escribirá el resultado. Por ejemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. Según elcreateDisposition
especificado, es posible que la tabla de resultados se cree automáticamente con el esquema de Avro proporcionado por el usuario. - outputTopic: Es el tema de Pub/Sub que se usará para los registros no procesados. Por ejemplo,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
Parámetros opcionales
- useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura el parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en
false
. Este parámetro se aplica solo cuandouseStorageWriteApi
estrue
. El valor predeterminado esfalse
. - writeDisposition: El valor de WriteDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. La configuración predeterminada esWRITE_APPEND
. - createDisposition: Es el objeto CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
CREATE_IF_NEEDED
yCREATE_NEVER
. El valor predeterminado esCREATE_IF_NEEDED
. - useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro. La configuración predeterminada es 0. - storageWriteApiTriggeringFrequencySec: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Avro to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.