Plantilla de archivos de texto de Cloud Storage a Pub/Sub (streaming)

Esta plantilla crea un flujo de procesamiento en streaming que sondea continuamente los nuevos archivos de texto subidos a Cloud Storage, lee cada archivo línea por línea y publica cadenas en un tema de Pub/Sub. La plantilla publica los registros de un archivo delimitado por líneas nuevas con registros JSON o los de un archivo CSV en un tema de Pub/Sub para procesarlos en tiempo real. Puedes usar esta plantilla para volver a reproducir datos con Pub/Sub.

La canalización se ejecuta indefinidamente y debe finalizarse manualmente mediante una cancelación, no una purga, debido a que usa la transformación "Watch", que es una "SplittableDoFn" que no admite purgas.

Actualmente, el intervalo de sondeo es fijo y está establecido en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros concretos, así que la hora del evento coincidirá con la de publicación durante la ejecución. No uses este flujo de procesamiento si necesitas que la hora del evento sea exacta para que se procese tu flujo de procesamiento.

Requisitos del flujo de procesamiento

  • Los archivos de entrada deben tener un formato JSON delimitado por líneas nuevas o un formato CSV. Los registros que abarcan varias líneas en los archivos de origen pueden provocar errores en el canal de bajada porque cada línea se publica como un mensaje en Pub/Sub.
  • Debes crear un tema de Pub/Sub antes de poner en marcha la ejecución.
  • El flujo de trabajo se ejecuta indefinidamente y debe finalizarse manualmente.

Parámetros de plantilla

Parámetros obligatorios

  • inputFilePattern el patrón del archivo de entrada del que se va a leer. Por ejemplo, gs://bucket-name/files/*.json.
  • outputTopic el tema de entrada de Pub/Sub en el que se va a escribir. El nombre debe tener el formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Por ejemplo, projects/your-project-id/topics/your-topic-name.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Haz los cambios siguientes:

  • JOB_NAME: un nombre de trabajo único que elijas
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
  • TOPIC_NAME: nombre del tema de Pub/Sub
  • BUCKET_NAME: el nombre de tu segmento de Cloud Storage
  • FILE_PATTERN: el patrón glob del archivo que se va a leer del bucket de Cloud Storage (por ejemplo, path/*.csv)

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación de los archivos locales de almacenamiento provisional (por ejemplo, gs://your-bucket/staging)
  • TOPIC_NAME: nombre del tema de Pub/Sub
  • BUCKET_NAME: el nombre de tu segmento de Cloud Storage
  • FILE_PATTERN: el patrón glob del archivo que se va a leer del bucket de Cloud Storage (por ejemplo, path/*.csv)

Siguientes pasos