Modello Pub/Sub a Pub/Sub

Il modello da Pub/Sub a Pub/Sub è una pipeline di inserimento flussi che legge i messaggi da una sottoscrizione Pub/Sub e li scrive in un altro argomento Pub/Sub. La pipeline accetta anche una chiave facoltativa di attributo del messaggio e un valore che può essere utilizzato per filtrare i messaggi da scrivere nell'argomento Pub/Sub. Puoi utilizzare questo modello per copiare i messaggi da una sottoscrizione Pub/Sub a un altro argomento Pub/Sub con un filtro dei messaggi facoltativo.

Requisiti della pipeline

  • La sottoscrizione Pub/Sub di origine deve esistere prima dell'esecuzione.
  • La sottoscrizione Pub/Sub di origine deve essere una sottoscrizione pull.
  • L'argomento Pub/Sub di destinazione deve esistere prima dell'esecuzione.

Parametri del modello

Parametro Descrizione
inputSubscription Sottoscrizione Pub/Sub da cui leggere l'input. Ad esempio, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Argomento Cloud Pub/Sub in cui scrivere l'output. Ad esempio, projects/<project-id>/topics/<topic-name>.
filterKey (Facoltativo) Filtra gli eventi in base a una chiave dell'attributo. Nessun filtro applicato se filterKey non è specificato.
filterValue (Facoltativo) Valore dell'attributo di filtro da utilizzare nel caso in cui venga fornita una chiave filterKey. Per impostazione predefinita viene utilizzato un valore filterValue null.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Pub/Sub to Pub/Sub template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità di streaming "at-least-once", seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • TOPIC_NAME: nome dell'argomento Pub/Sub
  • FILTER_KEY: la chiave dell'attributo in base alla quale vengono filtrati gli eventi. Nessun filtro applicato se non viene specificata alcuna chiave.
  • FILTER_VALUE: valore dell'attributo di filtro da utilizzare se viene fornita una chiave di filtro eventi. Accetta una stringa Java Regex valida come valore di filtro eventi. Se viene fornita un'espressione regolare, l'espressione completa deve corrispondere per consentire al messaggio di essere filtrato. Le corrispondenze parziali, ad esempio le sottostringhe, non vengono filtrate. Per impostazione predefinita viene utilizzato un valore di filtro per eventi null.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • TOPIC_NAME: nome dell'argomento Pub/Sub
  • FILTER_KEY: la chiave dell'attributo in base alla quale vengono filtrati gli eventi. Nessun filtro applicato se non viene specificata alcuna chiave.
  • FILTER_VALUE: valore dell'attributo di filtro da utilizzare se viene fornita una chiave di filtro eventi. Accetta una stringa Java Regex valida come valore di filtro eventi. Se viene fornita un'espressione regolare, l'espressione completa deve corrispondere per consentire al messaggio di essere filtrato. Le corrispondenze parziali, ad esempio le sottostringhe, non vengono filtrate. Per impostazione predefinita viene utilizzato un valore di filtro per eventi null.

Passaggi successivi