Plantilla de texto de Cloud Storage a BigQuery con UDF de Python

La canalización de Cloud Storage Text a BigQuery con UDF de Python es una canalización por lotes que lee archivos de texto almacenados en Cloud Storage, los transforma con una función definida por el usuario (UDF) de Python y adjunta el resultado a una tabla de BigQuery.

Requisitos de la canalización

  • Crea un archivo JSON que describa tu esquema de BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado BigQuery Schema y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    La plantilla de texto de Cloud Storage a BigQuery no admite la importación de datos a los campos STRUCT (Record) en la tabla BigQuery de destino.

    En el siguiente JSON, se describe un esquema de BigQuery de ejemplo:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • Crea un archivo Python (.py) con tu función UDF que proporciona la lógica para transformar las líneas de texto. La función debe mostrar una string JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parámetros de la plantilla

Parámetro Descripción
JSONPath Ruta gs:// al archivo JSON que define el esquema BigQuery, almacenado en Cloud Storage. Por ejemplo, gs://path/to/my/schema.json
pythonExternalTextTransformGcsPath El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
inputFilePattern Ruta gs:// al texto en Cloud Storage que deseas procesar. Por ejemplo, gs://path/to/my/text/data.txt.
outputTable Nombre de la tabla de BigQuery que quieres crear para almacenar los datos procesados. Si vuelves a usar una tabla de BigQuery existente, los datos se agregan a la tabla de destino. Por ejemplo, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory El directorio temporal para el proceso de carga de BigQuery. Por ejemplo, gs://my-bucket/my-files/temp_dir.
useStorageWriteApi Opcional: Si es true, la canalización usa la API de BigQuery Storage Write. El valor predeterminado es false. Para obtener más información, consulta Usa la API de BigQuery Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Cuando usas la API de Storage Write, se especifica la semántica de escritura (opcional). Para usar una semántica de al menos una vez, establece este parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: una línea de texto de un archivo de entrada de Cloud Storage.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PYTHON_FUNCTION: el nombre de la función definida por el usuario (UDF) de Python que deseas usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PYTHON_FUNCTION: el nombre de la función definida por el usuario (UDF) de Python que deseas usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

¿Qué sigue?