Plantilla de Pub/Sub a BigQuery con UDF de Python

La plantilla de Pub/Sub a BigQuery con UDF de Python es un flujo de procesamiento en streaming que lee mensajes con formato JSON de Pub/Sub y los escribe en una tabla de BigQuery. De forma opcional, puedes proporcionar una función definida por el usuario (UDF) escrita en Python para procesar los mensajes entrantes.

Requisitos del flujo de procesamiento

  • La tabla de BigQuery debe existir y tener un esquema.
  • Los datos de los mensajes de Pub/Sub deben estar en formato JSON o debes proporcionar una UDF que convierta los datos de los mensajes a JSON. Los datos JSON deben coincidir con el esquema de la tabla de BigQuery. Por ejemplo, si las cargas útiles JSON tienen el formato {"k1":"v1", "k2":"v2"}, la tabla de BigQuery debe tener dos columnas de cadena llamadas k1 y k2.
  • Especifica el parámetro inputSubscription o inputTopic, pero no ambos.

Parámetros de plantilla

Parámetros obligatorios

  • outputTableSpec la tabla de BigQuery en la que se escribirá, con el formato PROJECT_ID:DATASET_NAME.TABLE_NAME.

Parámetros opcionales

  • inputTopic el tema de Pub/Sub del que se va a leer, con el formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription la suscripción de Pub/Sub desde la que se va a leer, con el formato projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable la tabla de BigQuery que se va a usar para los mensajes que no se hayan podido enviar a la tabla de salida, con el formato PROJECT_ID:DATASET_NAME.TABLE_NAME. Si la tabla no existe, se crea cuando se ejecuta la canalización. Si no se especifica este parámetro, se usará el valor OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce cuando se usa la API Storage Write, especifica la semántica de escritura. Para usar la semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), asigna el valor "true" a este parámetro. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true. El valor predeterminado es false.
  • useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro. El valor predeterminado es 0.
  • storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro.
  • pythonExternalTextTransformGcsPath patrón de ruta de Cloud Storage del código de Python que contiene las funciones definidas por el usuario. Por ejemplo, gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName nombre de la función que se va a llamar desde el archivo Python. Usa solo letras, dígitos y guiones bajos. Por ejemplo, 'transform' or 'transform_udf1'.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON.
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecutar la plantilla

    Consola

    1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
    2. Ir a Crear tarea a partir de plantilla
    3. En el campo Nombre de la tarea, introduce un nombre único.
    4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

      Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub to BigQuery with Python UDF template.
    6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
    7. Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Haz los cambios siguientes:

    • JOB_NAME: un nombre de trabajo único que elijas
    • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
    • TOPIC_NAME: nombre del tema de Pub/Sub
    • DATASET: tu conjunto de datos de BigQuery
    • TABLE_NAME: nombre de la tabla de BigQuery

    API

    Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
    • TOPIC_NAME: nombre del tema de Pub/Sub
    • DATASET: tu conjunto de datos de BigQuery
    • TABLE_NAME: nombre de la tabla de BigQuery

    Siguientes pasos