El flujo de procesamiento de texto de Cloud Storage a BigQuery es un flujo de procesamiento en streaming que transmite archivos de texto almacenados en Cloud Storage, los transforma mediante una función de Python definida por el usuario (UDF) que proporcionas y añade el resultado a BigQuery.
La canalización se ejecuta indefinidamente y debe finalizarse manualmente mediante una cancelación y no un vaciado, debido a que usa la transformación Watch
, que es un DoFn
divisible que no admite el vaciado.
Requisitos del flujo de procesamiento
- Crea un archivo JSON que describa el esquema de la tabla de salida en BigQuery.
Asegúrate de que haya una matriz JSON de nivel superior que se llame
fields
y cuyo contenido siga este patrón:{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Por ejemplo:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Crea un archivo de Python (
.py
) con una función UDF que ofrezca la lógica necesaria para transformar las líneas de texto. Tu función debe devolver una cadena JSON.En el siguiente ejemplo, se divide cada línea de un archivo CSV, se crea un objeto JSON con los valores y se devuelve una cadena JSON:
import json def process(value): data = value.split(',') obj = { 'name': data[0], 'age': int(data[1]) } return json.dumps(obj)
Parámetros de plantilla
Parámetros obligatorios
- inputFilePattern: la ruta gs:// del texto de Cloud Storage que quieras procesar. Por ejemplo,
gs://your-bucket/your-file.txt
. - JSONPath la ruta gs:// al archivo JSON que define tu esquema de BigQuery, almacenado en Cloud Storage. Por ejemplo,
gs://your-bucket/your-schema.json
. - outputTable la ubicación de la tabla de BigQuery que se usará para almacenar los datos procesados. Si reutilizas una tabla, se sobrescribirá. Por ejemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - bigQueryLoadingTemporaryDirectory directorio temporal para el proceso de carga de BigQuery. Por ejemplo,
gs://your-bucket/your-files/temp-dir
.
Parámetros opcionales
- outputDeadletterTable tabla de mensajes que no se han podido enviar a la tabla de salida. Si una tabla no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa
<outputTableSpec>_error_records
. Por ejemplo,<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - useStorageWriteApiAtLeastOnce este parámetro solo tiene efecto si
Use BigQuery Storage Write API
está habilitado. Si está habilitada, se usará la semántica "al menos una vez" para la API Storage Write. De lo contrario, se usará la semántica "exactamente una vez". Valor predeterminado: false. - useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro. El valor predeterminado es 0. - storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro. - pythonExternalTextTransformGcsPath patrón de ruta de Cloud Storage del código de Python que contiene las funciones definidas por el usuario. Por ejemplo,
gs://your-bucket/your-function.py
. - pythonExternalTextTransformFunctionName nombre de la función que se va a llamar desde el archivo Python. Usa solo letras, dígitos y guiones bajos. Por ejemplo,
'transform' or 'transform_udf1'
.
Función definida por el usuario
Esta plantilla requiere una función definida por el usuario (UDF) que analice los archivos de entrada, tal como se describe en la sección Requisitos de la canalización. La plantilla llama a la función definida por el usuario para cada línea de texto de cada archivo de entrada. Para obtener más información sobre cómo crear funciones definidas por el usuario, consulta el artículo Crear funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La función definida por el usuario tiene las siguientes especificaciones:
- Entrada: una sola línea de texto de un archivo de entrada.
- Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\ pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo,gs://your-bucket/staging
)PYTHON_FUNCTION
: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.PATH_TO_PYTHON_UDF_FILE
: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: la ruta de Cloud Storage a tu conjunto de datos de textoBIGQUERY_TABLE
: nombre de la tabla de BigQueryBIGQUERY_UNPROCESSED_TABLE
: el nombre de la tabla de BigQuery de mensajes sin procesarPATH_TO_TEMP_DIR_ON_GCS
: la ruta de Cloud Storage al directorio temporal
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo,gs://your-bucket/staging
)PYTHON_FUNCTION
: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.PATH_TO_PYTHON_UDF_FILE
: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: la ruta de Cloud Storage a tu conjunto de datos de textoBIGQUERY_TABLE
: nombre de la tabla de BigQueryBIGQUERY_UNPROCESSED_TABLE
: el nombre de la tabla de BigQuery de mensajes sin procesarPATH_TO_TEMP_DIR_ON_GCS
: la ruta de Cloud Storage al directorio temporal
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.