Plantilla de Pub/Sub a BigQuery

La plantilla de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de Pub/Sub y los escribe en una tabla de BigQuery. De forma opcional, puedes proporcionar una función definida por el usuario (UDF) escrita en JavaScript para procesar los mensajes entrantes.

Requisitos de la canalización

  • La tabla de BigQuery debe existir y tener un esquema.
  • Los datos del mensaje de Pub/Sub deben usar el formato JSON, o bien debes proporcionar una UDF que convierta los datos del mensaje en JSON. Los datos JSON deben coincidir con el esquema de la tabla de BigQuery. Por ejemplo, si las cargas útiles de JSON tienen el formato {"k1":"v1", "k2":"v2"}, la tabla de BigQuery debe tener dos columnas de cadenas llamadas k1 y k2.
  • Especifica el parámetro inputSubscription o inputTopic, pero no ambos.

Parámetros de la plantilla

Parámetros obligatorios

  • outputTableSpec: La tabla de BigQuery en la que se escribirá, con el formato "PROJECT_ID:DATASET_NAME.TABLE_NAME".

Parámetros opcionales

  • inputTopic: El tema de Pub/Sub desde el que se va a leer, con el formato "projects/<PROJECT_ID>/topics/<TOPIC_NAME>".
  • inputSubscription: La suscripción a Pub/Sub desde la que se va a leer, con el formato "projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>".
  • outputDeadletterTable: La tabla de BigQuery que se usará para los mensajes que no llegaron a la tabla de salida, con el formato "PROJECT_ID:DATASET_NAME.TABLE_NAME". Si la tabla no existe, se crea cuando se ejecuta la canalización. Si no se especifica este parámetro, se usa el valor "OUTPUT_TABLE_SPEC_error_records" en su lugar.
  • useStorageWriteApiAtLeastOnce: cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura el parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
  • useStorageWriteApi : si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.
  • javascriptTextTransformGcsPath: el URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. (Ejemplo: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName: el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la carga de UDF. El valor predeterminado es 0.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    Console

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Reemplaza lo siguiente:

    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
    • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
    • DATASET: Es el conjunto de datos de BigQuery.
    • TABLE_NAME: Es el nombre de la tabla de BigQuery.

    API

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: Es la versión de la plantilla que deseas usar.

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
    • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
    • DATASET: Es el conjunto de datos de BigQuery.
    • TABLE_NAME: Es el nombre de la tabla de BigQuery.

    ¿Qué sigue?