Texto de Cloud Storage a BigQuery (transmisión) con plantilla UDF de Python

La canalización de Cloud Storage Text a BigQuery es una canalización de transmisión que transmite archivos de texto almacenados en Cloud Storage, los transforma mediante una función definida por el usuario (UDF) de Python que proporcionas y adjunta el resultado a BigQuery.

La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch, que es una DoFn divisible que no admite el desvío.

Requisitos de la canalización

  • Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado fields y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por ejemplo:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Crea un archivo Python (.py) con tu función UDF que proporciona la lógica para transformar las líneas de texto. La función debe mostrar una string JSON.

    En el siguiente ejemplo, se divide cada línea de un archivo CSV, se crea un objeto JSON con los valores y se muestra una string JSON:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parámetros de la plantilla

Parámetro Descripción
pythonExternalTextTransformGcsPath El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
outputTable La tabla de BigQuery completamente calificada. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Función definida por el usuario

Esta plantilla requiere una UDF que analice los archivos de entrada, como se describe en Requisitos de canalización. La plantilla llama a la UDF para cada línea de texto en cada archivo de entrada. Para obtener más información sobre cómo crear UDF, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: una sola línea de texto de un archivo de entrada.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • PYTHON_FUNCTION: El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • PYTHON_FUNCTION: El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_PYTHON_UDF_FILE: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

¿Qué sigue?