La canalización de texto de Cloud Storage a BigQuery es una canalización de transmisión que transmite archivos de texto almacenados en Cloud Storage, los transforma con una función definida por el usuario (UDF) de JavaScript que proporcionas y adjunta el resultado a BigQuery.
La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch
, que es una DoFn
divisible que no admite el desvío.
Requisitos de la canalización
- Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.
Asegúrate de que haya un array JSON de nivel superior titulado
fields
y que su contenido siga el patrón{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Por ejemplo:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Crea un archivo JavaScript (
.js
) con tu función UDF que proporciona la lógica para transformar las líneas de texto. La función debe mostrar una string JSON.En el siguiente ejemplo, se divide cada línea de un archivo CSV, se crea un objeto JSON con los valores y se muestra una string JSON:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Parámetros de la plantilla
Parámetros obligatorios
- inputFilePattern: La ruta de acceso gs:// al texto en Cloud Storage que deseas procesar. Por ejemplo,
gs://your-bucket/your-file.txt
- JSONPath: Es la ruta de acceso gs:// al archivo JSON que define tu esquema de BigQuery, almacenado en Cloud Storage. Por ejemplo,
gs://your-bucket/your-schema.json
- outputTable: La ubicación de la tabla de BigQuery que se usará para almacenar los datos procesados. Si vuelves a usar una tabla existente, se reemplaza. Por ejemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
- javascriptTextTransformGcsPath: El URI de Cloud Storage del archivo
.js
que define la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo,gs://your-bucket/your-transforms/*.js
. - javascriptTextTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por ejemplo,transform_udf1
. - bigQueryLoadingTemporaryDirectory: Es el directorio temporal para el proceso de carga de BigQuery. Por ejemplo,
gs://your-bucket/your-files/temp-dir
Parámetros opcionales
- outputDeadletterTable: Es la tabla de mensajes que no llegaron a la tabla de resultados. Si una tabla no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa
<outputTableSpec>_error_records
. Por ejemplo,<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - useStorageWriteApiAtLeastOnce: Este parámetro solo se aplica si
Use BigQuery Storage Write API
está habilitado. Si se habilita, se usará la semántica de “al menos una vez” para la API de Storage Write; de lo contrario, se usará la semántica de “exactamente una vez”. La configuración predeterminada es "false". - useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro. La configuración predeterminada es 0. - storageWriteApiTriggeringFrequencySec: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debes configurar este parámetro. - pythonExternalTextTransformGcsPath: El patrón de ruta de acceso de Cloud Storage para el código de Python que contiene las funciones definidas por el usuario. Por ejemplo,
gs://your-bucket/your-function.py
- javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es
0
, se inhabilita la carga de UDF. El valor predeterminado es0
.
Función definida por el usuario
Esta plantilla requiere una UDF que analice los archivos de entrada, como se describe en Requisitos de canalización. La plantilla llama a la UDF para cada línea de texto en cada archivo de entrada. Para obtener más información sobre cómo crear UDF, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La UDF tiene la siguiente especificación:
- Entrada: una sola línea de texto de un archivo de entrada.
- Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Storage Text to BigQuery (Stream) template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.BIGQUERY_UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery para los mensajes no procesados.PATH_TO_TEMP_DIR_ON_GCS
: Es la ruta de acceso de Cloud Storage al directorio temporal.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.BIGQUERY_UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery para los mensajes no procesados.PATH_TO_TEMP_DIR_ON_GCS
: Es la ruta de acceso de Cloud Storage al directorio temporal.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.