Modello da Pub/Sub a Pub/Sub

Il modello da Pub/Sub a Pub/Sub è una pipeline in modalità flusso che legge da una sottoscrizione Pub/Sub e li scrive in un altro Pub/Sub. per ogni argomento. La pipeline accetta anche una chiave di attributo del messaggio facoltativa e un valore che può essere utilizzato per filtrare i messaggi da scrivere nell'argomento Pub/Sub. Puoi utilizzare questo per copiare i messaggi da una sottoscrizione Pub/Sub a un'altra sottoscrizione Pub/Sub con un filtro dei messaggi facoltativo.

Requisiti della pipeline

  • La sottoscrizione Pub/Sub di origine deve esistere prima dell'esecuzione.
  • La sottoscrizione Pub/Sub di origine deve essere una sottoscrizione pull.
  • L'argomento Pub/Sub di destinazione deve esistere prima dell'esecuzione.

Parametri del modello

Parametri obbligatori

  • inputSubscription : la sottoscrizione Pub/Sub da cui leggere l'input. ad esempio projects/your-project-id/subscriptions/your-subscription-name).
  • outputTopic : l'argomento Pub/Sub in cui scrivere l'output. ad esempio projects/your-project-id/topics/your-topic-name.

Parametri facoltativi

  • filterKey : la chiave dell'attributo da utilizzare per filtrare gli eventi. Nessun filtro applicato se filterKey non è specificato.
  • filterValue : il valore dell'attributo da utilizzare per filtrare gli eventi quando viene fornito un filterKey. Per impostazione predefinita, viene utilizzato un valore filterValue nullo.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub to Pub/Sub template.
  6. Inserisci i valori parametro negli appositi campi.
  7. (Facoltativo) Per passare dall'elaborazione "exactly-once" all'impostazione modalità flusso di dati almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco di tua scelta
  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub nome
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • FILTER_KEY: chiave dell'attributo in base alla quale gli eventi vengono filtrati. Nessun filtro viene applicato se non viene specificata alcuna chiave.
  • FILTER_VALUE: valore dell'attributo di filtro da utilizzare se viene fornita una chiave di filtro eventi. Accetta una stringa Regex Java valida come valore di filtro eventi. Se viene fornita un'espressione regolare, perché il messaggio venga filtrato deve corrispondere all'espressione completa. Corrispondenze parziali (come una sottostringa) non vengono filtrati. Per impostazione predefinita, viene utilizzato un valore del filtro dell'evento nullo.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub nome
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • FILTER_KEY: chiave dell'attributo in base alla quale gli eventi vengono filtrati. Nessun filtro viene applicato se non viene specificata alcuna chiave.
  • FILTER_VALUE: valore dell'attributo di filtro da utilizzare se viene fornita una chiave di filtro eventi. Accetta una stringa Regex Java valida come valore di filtro eventi. Se viene fornita un'espressione regolare, perché il messaggio venga filtrato deve corrispondere all'espressione completa. Corrispondenze parziali (come una sottostringa) non vengono filtrati. Per impostazione predefinita, viene utilizzato un valore del filtro dell'evento nullo.

Passaggi successivi