Plantilla de texto de Cloud Storage a BigQuery con UDF de Python

El flujo de procesamiento de texto de Cloud Storage a BigQuery con UDF de Python es un flujo de procesamiento por lotes que lee archivos de texto almacenados en Cloud Storage, los transforma mediante una función definida por el usuario (UDF) de Python y añade el resultado a una tabla de BigQuery.

Requisitos del flujo de procesamiento

  • Crea un archivo JSON que describa tu esquema de BigQuery.

    Asegúrate de que haya una matriz JSON de nivel superior que se llame BigQuery Schema y cuyo contenido siga este patrón: {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    La plantilla de lote de texto de Cloud Storage a BigQuery no admite la importación de datos en campos STRUCT (Record) de la tabla de BigQuery de destino.

    El siguiente JSON describe un ejemplo de esquema de BigQuery:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • Crea un archivo de Python (.py) con una función UDF que ofrezca la lógica necesaria para transformar las líneas de texto. Tu función debe devolver una cadena JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y devuelve una cadena JSON después de transformar los valores.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parámetros de plantilla

Parámetro Descripción
JSONPath La ruta gs:// al archivo JSON que define tu esquema de BigQuery, almacenado en Cloud Storage. Por ejemplo, gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Nombre de la función definida por el usuario (UDF) de Python que quieras usar.
inputFilePattern La ruta gs:// al texto de Cloud Storage que quieras procesar. Por ejemplo, gs://path/to/my/text/data.txt.
outputTable El nombre de la tabla de BigQuery que quieres crear para almacenar los datos procesados. Si reutilizas una tabla de BigQuery, los datos se añadirán a la tabla de destino. Por ejemplo, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory Directorio temporal del proceso de carga de BigQuery. Por ejemplo, gs://my-bucket/my-files/temp_dir.
useStorageWriteApi Opcional: Si true, la canalización usa la API Storage Write de BigQuery. El valor predeterminado es false. Para obtener más información, consulta el artículo Usar la API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: cuando se usa la API Storage Write, especifica la semántica de escritura. Para usar la semántica al menos una vez, asigna el valor true a este parámetro. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true. El valor predeterminado es false.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: una línea de texto de un archivo de entrada de Cloud Storage.
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • PYTHON_FUNCTION: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: la ruta de Cloud Storage a tu conjunto de datos de texto
  • BIGQUERY_TABLE: nombre de la tabla de BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: la ruta de Cloud Storage al directorio temporal

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • PYTHON_FUNCTION: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: la ruta de Cloud Storage al archivo JSON que contiene la definición del esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: la ruta de Cloud Storage a tu conjunto de datos de texto
  • BIGQUERY_TABLE: nombre de la tabla de BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: la ruta de Cloud Storage al directorio temporal

Siguientes pasos