Plantilla de Pub/Sub Proto a BigQuery con UDF de Python

La plantilla de Pub/Sub proto a BigQuery es un flujo de procesamiento en streaming que ingiere datos proto desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Los errores que se producen al escribir en la tabla de BigQuery se envían a un tema de Pub/Sub sin procesar.

Se puede proporcionar una función definida por el usuario (UDF) de Python para transformar los datos. Los errores que se produzcan al ejecutar la UDF se pueden enviar a un tema de Pub/Sub independiente o al mismo tema sin procesar que los errores de BigQuery.

Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.

Requisitos del flujo de procesamiento

  • Se debe crear la suscripción de Pub/Sub de entrada.
  • Se debe crear el archivo de esquema de los registros de Proto en Cloud Storage.
  • Se debe crear el tema de Pub/Sub de salida.
  • Se debe crear el conjunto de datos de BigQuery de salida.
  • Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos de proto, independientemente del valor de createDisposition.

Parámetros de plantilla

Parámetro Descripción
protoSchemaPath Ubicación en Cloud Storage del archivo de esquema proto independiente. Por ejemplo, gs://path/to/my/file.pb. Este archivo se puede generar con la marca --descriptor_set_out del comando protoc. La marca --include_imports garantiza que el archivo sea independiente.
fullMessageName El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName, donde package.name es el valor proporcionado para la instrucción package y no para la instrucción java_package.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic El tema de Pub/Sub que se va a usar para los registros sin procesar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table. En función del valor de createDisposition especificado, la tabla de salida se puede crear automáticamente con el archivo de esquema de entrada.
preserveProtoFieldNames Opcional: true para conservar el nombre del campo Proto original en JSON. false para usar nombres de JSON más estándar. Por ejemplo, false cambiaría field_name por fieldName. Valor predeterminado: false
bigQueryTableSchemaPath Opcional: ruta de Cloud Storage a la ruta del esquema de BigQuery. Por ejemplo, gs://path/to/my/schema.json. Si no se proporciona, el esquema se deduce del esquema Proto.
pythonExternalTextTransformGcsPath Opcional: URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opcional: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.
udfOutputTopic Opcional: el tema de Pub/Sub que almacena los errores de la función definida por el usuario. Por ejemplo, projects/<project-id>/topics/<topic-name>. Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic.
writeDisposition Opcional: el WriteDispositionde BigQuery. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Valor predeterminado: WRITE_APPEND.
createDisposition Opcional: el CreateDispositionde BigQuery. Por ejemplo, CREATE_IF_NEEDED, CREATE_NEVER. Valor predeterminado: CREATE_IF_NEEDED.
useStorageWriteApi Opcional: Si true, la canalización usa la API Storage Write de BigQuery. El valor predeterminado es false. Para obtener más información, consulta el artículo Usar la API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: cuando se usa la API Storage Write, especifica la semántica de escritura. Para usar la semántica al menos una vez, asigna el valor true a este parámetro. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true. El valor predeterminado es false.
numStorageWriteApiStreams Opcional: Cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro.
storageWriteApiTriggeringFrequencySec Opcional: Cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una cadena JSON.
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecutar la plantilla

    Consola

    1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
    2. Ir a Crear tarea a partir de plantilla
    3. En el campo Nombre de la tarea, introduce un nombre único.
    4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

      Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub Proto to BigQuery with Python UDF template.
    6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
    7. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Haz los cambios siguientes:

    • JOB_NAME: un nombre de trabajo único que elijas
    • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: la ruta de Cloud Storage al archivo de esquema Proto (por ejemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: el nombre del mensaje Proto (por ejemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: el nombre de la suscripción de entrada de Pub/Sub
    • BIGQUERY_TABLE: nombre de la tabla de salida de BigQuery
    • UNPROCESSED_TOPIC: el tema de Pub/Sub que se va a usar para la cola sin procesar.

    API

    Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: la ruta de Cloud Storage al archivo de esquema Proto (por ejemplo, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: el nombre del mensaje Proto (por ejemplo, package.name.MessageName)
    • SUBSCRIPTION_NAME: el nombre de la suscripción de entrada de Pub/Sub
    • BIGQUERY_TABLE: nombre de la tabla de salida de BigQuery
    • UNPROCESSED_TOPIC: el tema de Pub/Sub que se va a usar para la cola sin procesar.

    Siguientes pasos