Plantilla de flujos de cambios de Spanner a Pub/Sub

La plantilla de flujos de cambios de Spanner a Pub/Sub es un flujo de procesamiento en streaming que transmite registros de cambios de datos de Spanner y los escribe en temas de Pub/Sub con Dataflow Runner V2.

Para enviar sus datos a un nuevo tema de Pub/Sub, primero debe crear el tema. Después de crear un tema, Pub/Sub genera y adjunta automáticamente una suscripción al tema nuevo. Si intentas enviar datos a un tema de Pub/Sub que no existe, la canalización de flujo de datos genera una excepción y se bloquea, ya que intenta continuamente establecer una conexión.

Si ya existe el tema de Pub/Sub necesario, puedes enviar datos a ese tema.

Para obtener más información, consulta los artículos Acerca de los flujos de cambios, Crear conexiones de flujos de cambios con Dataflow y Prácticas recomendadas para usar flujos de cambios.

Requisitos del flujo de procesamiento

  • La instancia de Spanner debe existir antes de ejecutar el flujo de procesamiento.
  • La base de datos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
  • La instancia de metadatos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
  • La base de datos de metadatos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
  • El flujo de cambios de Spanner debe existir antes de ejecutar el flujo de procesamiento.
  • El tema de Pub/Sub debe existir antes de ejecutar el flujo de procesamiento.

Parámetros de plantilla

Parámetros obligatorios

  • spannerInstanceId la instancia de Spanner de la que se van a leer los flujos de cambios.
  • spannerDatabase la base de datos de Spanner de la que se van a leer los flujos de cambios.
  • spannerMetadataInstanceId: la instancia de Spanner que se va a usar para la tabla de metadatos del conector de flujos de cambios.
  • spannerMetadataDatabase la base de datos de Spanner que se va a usar para la tabla de metadatos del conector de flujos de cambios.
  • spannerChangeStreamName el nombre del flujo de cambios de Spanner del que se van a leer los datos.
  • pubsubTopic el tema de Pub/Sub para la salida de los flujos de cambios.

Parámetros opcionales

  • spannerProjectId el proyecto del que se van a leer los flujos de cambios. En este proyecto también se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado de este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • spannerDatabaseRole el rol de base de datos de Spanner que se usará al ejecutar la plantilla. Este parámetro solo es obligatorio cuando la entidad de IAM que ejecuta la plantilla es un usuario con control de acceso detallado. El rol de la base de datos debe tener el privilegio SELECT en el flujo de cambios y el privilegio EXECUTE en la función de lectura del flujo de cambios. Para obtener más información, consulta Control de acceso granular para secuencias de cambios (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName nombre de la tabla de metadatos del conector de cambios en tiempo real de Spanner que se va a usar. Si no se proporciona, Spanner crea automáticamente la tabla de metadatos del conector de Streams durante el cambio de flujo de la canalización. Debe proporcionar este parámetro al actualizar una canalización. No utilice este parámetro en otros casos.
  • startTimestamp la fecha y hora de inicio (https://tools.ietf.org/html/rfc3339), incluida, que se usará para leer los flujos de cambios. Por ejemplo, ex- 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo en la que se inicia la canalización, es decir, la hora actual.
  • endTimestamp DateTime final (https://tools.ietf.org/html/rfc3339), incluido, que se usará para leer los flujos de cambios. Por ejemplo, ex- 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
  • spannerHost el endpoint de Cloud Spanner al que se llama en la plantilla. Solo se usa para hacer pruebas. Por ejemplo, https://spanner.googleapis.com. El valor predeterminado es https://spanner.googleapis.com.
  • outputDataFormat el formato de la salida. La salida se envuelve en muchos PubsubMessages y se envía a un tema de Pub/Sub. Los formatos permitidos son JSON y AVRO. El valor predeterminado es JSON.
  • pubsubAPI la API de Pub/Sub que se usa para implementar la canalización. Las APIs permitidas son pubsubio y native_client. Si el número de consultas por segundo (CPS) es bajo, native_client tiene menos latencia. Si el número de CPS es elevado, pubsubio ofrece un rendimiento mejor y más estable. El valor predeterminado es pubsubio.
  • pubsubProjectId: proyecto del tema de Pub/Sub. El valor predeterminado de este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • rpcPriority la prioridad de la solicitud para las llamadas de Spanner. Los valores permitidos son HIGH, MEDIUM y LOW. El valor predeterminado es HIGH).
  • includeSpannerSource indica si se debe incluir o no el ID de la base de datos y el ID de la instancia de Spanner para leer el flujo de cambios en los datos del mensaje de salida. Valor predeterminado: false.
  • outputMessageMetadata valor de cadena del campo personalizado outputMessageMetadata en el mensaje de publicación/suscripción de salida. El valor predeterminado es una cadena vacía y el campo outputMessageMetadata solo se rellena si este valor no está vacío. Evite los caracteres especiales al introducir el valor(por ejemplo, las comillas dobles).

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Spanner change streams to Pub/Sub template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Haz los cambios siguientes:

  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • PUBSUB_TOPIC: el tema de Pub/Sub para la salida de los flujos de cambios.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • PUBSUB_TOPIC: el tema de Pub/Sub para la salida de los flujos de cambios.

Siguientes pasos