Modifiche in tempo reale di Spanner al modello Pub/Sub

Le modifiche in tempo reale di Spanner al modello Pub/Sub è una pipeline di inserimento flussi che trasmette i flussi di record delle modifiche dei dati di Spanner e li scrive in argomenti Pub/Sub utilizzando Dataflow Runner V2.

Per inviare i dati a un nuovo argomento Pub/Sub, devi prima creare l'argomento. Dopo la creazione, Pub/Sub genera e collega automaticamente una sottoscrizione al nuovo argomento. Se provi a inviare dati a un argomento Pub/Sub che non esiste, la pipeline Dataflow genera un'eccezione e la pipeline si blocca mentre tenta continuamente di stabilire una connessione.

Se l'argomento Pub/Sub necessario esiste già, puoi inviare i dati a quell'argomento.

Per ulteriori informazioni, consulta Informazioni sulle modifiche in tempo reale, Creare connessioni per le modifiche in tempo reale con Dataflow e Best practice per i flussi di modifiche.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima di eseguire la pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Le modifiche in tempo reale di Spanner devono esistere prima dell'esecuzione della pipeline.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId : l'istanza Spanner da cui leggere le modifiche in tempo reale.
  • spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
  • pubsubTopic : l'argomento Pub/Sub per l'output delle modifiche in tempo reale.

Parametri facoltativi

  • spannerProjectId : il progetto da cui leggere modifiche in tempo reale. In questo progetto viene anche creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT per la modifica in tempo reale e il privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per le modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene specificato, Spanner crea automaticamente la tabella dei metadati del connettore dei flussi durante la modifica del flusso della pipeline. Devi fornire questo parametro quando aggiorni una pipeline esistente. Non usare questo parametro in altri casi.
  • startTimestamp : il valore DateTime iniziale (https://tools.ietf.org/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ad esempio, ad esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp di avvio della pipeline, ovvero l'ora attuale.
  • endTimestamp : il valore DateTime finale (https://tools.ietf.org/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ad esempio, ad esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://spanner.googleapis.com. Il valore predefinito è: https://spanner.googleapis.com.
  • outputDataFormat : il formato dell'output. L'output viene aggregato in molti messaggi PubsubMessages e inviato a un argomento Pub/Sub. I formati consentiti sono JSON e AVRO. Il valore predefinito è JSON.
  • pubsubAPI : l'API Pub/Sub utilizzata per implementare la pipeline. Le API consentite sono pubsubio e native_client. Per un numero ridotto di query al secondo (QPS), native_client ha meno latenza. Per un numero elevato di QPS, pubsubio offre prestazioni migliori e più stabili. Il valore predefinito è pubsubio.
  • pubsubProjectId : progetto dell'argomento Pub/Sub. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • rpcPriority : la priorità delle richieste per le chiamate Spanner. I valori consentiti sono HIGH, MEDIUM e LOW. Il valore predefinito è: HIGH).

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Spanner change streams to Pub/Sub template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • PUBSUB_TOPIC: l'argomento Pub/Sub per l'output delle modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • PUBSUB_TOPIC: l'argomento Pub/Sub per l'output delle modifiche in tempo reale

Passaggi successivi