La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.
Requisitos de la canalización
- La suscripción de entrada de Pub/Sub debe existir.
- El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
- El tema de Pub/Sub sin procesar debe existir. ¿
- El conjunto de datos de salida de BigQuery debe existir.
Parámetros de la plantilla
Parámetros obligatorios
- schemaPath: Es la ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo,
gs://path/to/my/schema.avsc
- inputSubscription: Es la suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
- outputTableSpec: La ubicación de la tabla de BigQuery en la que se escribirá el resultado. Por ejemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. Según elcreateDisposition
especificado, es posible que la tabla de resultados se cree automáticamente con el esquema de Avro proporcionado por el usuario. - outputTopic: Es el tema de Pub/Sub que se usará para los registros no procesados. Por ejemplo,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
Parámetros opcionales
- useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura el parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en
false
. Este parámetro se aplica solo cuandouseStorageWriteApi
estrue
. El valor predeterminado esfalse
. - writeDisposition: El valor de WriteDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. La configuración predeterminada esWRITE_APPEND
. - createDisposition: El objeto CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Por ejemplo,
CREATE_IF_NEEDED
yCREATE_NEVER
. El valor predeterminado esCREATE_IF_NEEDED
. - useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro. La configuración predeterminada es 0. - storageWriteApiTriggeringFrequencySec: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro.
Ejecuta la plantilla
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Avro to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template runJOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME /VERSION /flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH ,\ inputSubscription=SUBSCRIPTION_NAME ,\ outputTableSpec=BIGQUERY_TABLE ,\ outputTopic=DEADLETTER_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID /locations/LOCATION /flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME ", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION /VERSION /flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH ", "inputSubscription": "SUBSCRIPTION_NAME ", "outputTableSpec": "BIGQUERY_TABLE ", "outputTopic": "DEADLETTER_TOPIC " } } }
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
Código fuente de la plantilla
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.