El flujo de procesamiento de texto de Cloud Storage a BigQuery con UDF de Python es un flujo de procesamiento por lotes que lee archivos de texto almacenados en Cloud Storage, los transforma mediante una función definida por el usuario (UDF) de Python y añade el resultado a una tabla de BigQuery.
Requisitos del flujo de procesamiento
- Crea un archivo JSON que describa tu esquema de BigQuery.
Asegúrate de que haya una matriz JSON de nivel superior que se llame
BigQuery Schema
y cuyo contenido siga este patrón:{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
.La plantilla de lote de texto de Cloud Storage a BigQuery no admite la importación de datos en campos
STRUCT
(Record) de la tabla de BigQuery de destino.El siguiente JSON describe un ejemplo de esquema de BigQuery:
{ "BigQuery Schema": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" }, ] }
- Crea un archivo de Python (
.py
) con una función UDF que ofrezca la lógica necesaria para transformar las líneas de texto. Tu función debe devolver una cadena JSON.Por ejemplo, esta función divide cada línea de un archivo CSV y devuelve una cadena JSON después de transformar los valores.
import json def process(value): data = value.split(',') obj = { 'name': data[0], 'age': int(data[1]) } return json.dumps(obj)
Parámetros de plantilla
Parámetro | Descripción |
---|---|
JSONPath |
La ruta gs:// al archivo JSON que define tu esquema de BigQuery, almacenado en Cloud Storage. Por ejemplo, gs://path/to/my/schema.json . |
pythonExternalTextTransformGcsPath |
El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py .
|
pythonExternalTextTransformFunctionName |
Nombre de la función definida por el usuario (UDF) de Python que quieras usar. |
inputFilePattern |
La ruta gs:// al texto de Cloud Storage que quieras procesar. Por ejemplo, gs://path/to/my/text/data.txt . |
outputTable |
El nombre de la tabla de BigQuery que quieres crear para almacenar los datos procesados.
Si reutilizas una tabla de BigQuery, los datos se añadirán a la tabla de destino.
Por ejemplo, my-project-name:my-dataset.my-table . |
bigQueryLoadingTemporaryDirectory |
Directorio temporal del proceso de carga de BigQuery.
Por ejemplo, gs://my-bucket/my-files/temp_dir . |
useStorageWriteApi |
Opcional:
Si true , la canalización usa la
API Storage Write de BigQuery. El valor predeterminado es false . Para obtener más información, consulta el artículo
Usar la API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Opcional:
cuando se usa la API Storage Write, especifica la semántica de escritura. Para usar la semántica
al menos una vez, asigna el valor true a este parámetro. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true . El valor predeterminado es false .
|
Función definida por el usuario
También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La función definida por el usuario tiene las siguientes especificaciones:
- Entrada: una línea de texto de un archivo de entrada de Cloud Storage.
- Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \ --region REGION_NAME \ --parameters \ pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
PYTHON_FUNCTION
: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.PATH_TO_PYTHON_UDF_FILE
: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: la ruta de Cloud Storage a tu conjunto de datos de textoBIGQUERY_TABLE
: nombre de la tabla de BigQueryPATH_TO_TEMP_DIR_ON_GCS
: la ruta de Cloud Storage al directorio temporal
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
PYTHON_FUNCTION
: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.PATH_TO_PYTHON_UDF_FILE
: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: la ruta de Cloud Storage a tu conjunto de datos de textoBIGQUERY_TABLE
: nombre de la tabla de BigQueryPATH_TO_TEMP_DIR_ON_GCS
: la ruta de Cloud Storage al directorio temporal
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.