Plantilla de texto de Cloud Storage a BigQuery (transmisión)

La canalización de Cloud Storage Text a BigQuery es una canalización de transmisión que transmite archivos de texto almacenados en Cloud Storage, los transforma con una función definida por el usuario (UDF) de JavaScript que proporcionas y adjunta el resultado a BigQuery.

La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch, que es una DoFn divisible que no admite el desvío.

Requisitos de la canalización

  • Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado fields y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por ejemplo:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
    
  • Crea un archivo JavaScript (.js) con tu función UDF que proporciona la lógica para transformar las líneas de texto. La función debe mostrar una string JSON.

    En el siguiente ejemplo, se divide cada línea de un archivo CSV, se crea un objeto JSON con los valores y se muestra una string JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

Parámetros de la plantilla

Parámetro Descripción
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
javascriptTextTransformReloadIntervalMinutes Opcional: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow verifica de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modificó. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, la carga de UDF está inhabilitada. El valor predeterminado es 0.
outputTable La tabla de BigQuery calificada por completo. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Función definida por el usuario

Esta plantilla requiere una UDF que analice los archivos de entrada, como se describe en Requisitos de canalización. La plantilla llama a la UDF para cada línea de texto en cada archivo de entrada. Para obtener más información sobre cómo crear UDF, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: una sola línea de texto de un archivo de entrada.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Storage Text to BigQuery (Stream) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

¿Qué sigue?