La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere
datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery.
Cualquier error que
ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de
Pub/Sub sin procesar.
Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.
Requisitos de la canalización
- La suscripción de entrada de Pub/Sub debe existir.
- El archivo de esquema de los registros proto debe existir en Cloud Storage.
- El tema de Pub/Sub de salida debe existir.
- El conjunto de datos de salida de BigQuery debe existir.
- Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor
createDisposition
.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
protoSchemaPath |
La ubicación de Cloud Storage del archivo de esquema proto autónomo. Por ejemplo, gs://path/to/my/file.pb .
Este archivo se puede generar con la marca --descriptor_set_out del comando protoc .
La marca --include_imports garantiza que el archivo sea autónomo. |
fullMessageName |
El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName , en el que package.name es el valor proporcionado para la declaración package y no la declaración java_package . |
inputSubscription |
Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription> . |
outputTopic |
El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table .
Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el archivo de esquema de entrada. |
preserveProtoFieldNames |
Opcional: true para conservar el nombre del campo Proto original en JSON. false para usar nombres JSON más estándar.
Por ejemplo, false cambiaría field_name a fieldName . (Default: false ) |
bigQueryTableSchemaPath |
Opcional: Ruta de Cloud Storage a la ruta del esquema de BigQuery. Por ejemplo, gs://path/to/my/schema.json . Si no se proporciona, entonces el esquema se infiere a partir del esquema Proto. |
javascriptTextTransformGcsPath |
Opcional:
El URI de Cloud Storage del archivo .js que define la función definida por el usuario
(UDF) de JavaScript que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
Opcional:
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
javascriptTextTransformReloadIntervalMinutes |
Opcional: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow verifica de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la carga de UDF. El valor predeterminado es 0. |
udfOutputTopic |
Opcional: El tema de Pub/Sub que almacena los errores de las UDF. Por ejemplo, projects/<project-id>/topics/<topic-name> . Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic . |
writeDisposition |
Opcional: El elemento WriteDisposition de BigQuery.
Por ejemplo, WRITE_APPEND , WRITE_EMPTY o WRITE_TRUNCATE . Valor predeterminado: WRITE_APPEND . |
createDisposition |
Opcional: El elemento CreateDisposition de BigQuery.
Por ejemplo: CREATE_IF_NEEDED , CREATE_NEVER . Configuración predeterminada: CREATE_IF_NEEDED . |
useStorageWriteApi |
Opcional:
Si es true , la canalización usa la
API de BigQuery Storage Write. El valor predeterminado es false . Para obtener más información, consulta
Usa la API de BigQuery Storage Write.
|
useStorageWriteApiAtLeastOnce |
Opcional:
Cuando usas la API de Storage Write, se especifica la semántica de escritura (opcional). Para usar una
semántica de al menos una vez, establece este parámetro en true . Para usar una semántica de una y solo una vez, configura el parámetro en false . Este parámetro se aplica solo cuando useStorageWriteApi es true . El valor predeterminado es false .
|
numStorageWriteApiStreams |
Opcional: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false , debes configurar este parámetro.
|
storageWriteApiTriggeringFrequencySec |
Opcional: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false , debes configurar este parámetro.
|
Función definida por el usuario
Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La UDF tiene la siguiente especificación:
Ejecuta la plantilla
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Proto to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo,gs://MyBucket/file.pb
).PROTO_MESSAGE_NAME
: Es el nombre del mensaje Proto (por ejemplo,package.name.MessageName
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.UNPROCESSED_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo,gs://MyBucket/file.pb
).PROTO_MESSAGE_NAME
: Es el nombre del mensaje Proto (por ejemplo,package.name.MessageName
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.UNPROCESSED_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.