Esegui il flusso di messaggi Pub/Sub Lite utilizzando Dataflow

In alternativa alla scrittura e all'esecuzione di programmi di elaborazione dati, puoi utilizzare Dataflow con il connettore I/O Pub/Sub Lite per Apache Beam. Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con gli stessi livelli di affidabilità ed espressività. Esegue in modo affidabile i programmi sviluppati utilizzando l'SDK Apache Beam, che dispone di un set estensibile di potenti astrazioni di elaborazione stateful, e connettori di I/O ad altri sistemi di elaborazione in modalità flusso e batch.

Questa guida rapida mostra come scrivere una pipeline Apache Beam che:

  • Leggi i messaggi da Pub/Sub Lite
  • Finestra (o raggruppa) i messaggi in base al timestamp di pubblicazione
  • Scrivi i messaggi in Cloud Storage

Ti mostra anche come:

  • Invia la pipeline da eseguire su Dataflow
  • Crea un modello flessibile Dataflow dalla tua pipeline

Questo tutorial richiede Maven, ma è anche possibile convertire il progetto di esempio da Maven a Gradle. Per scoprire di più, consulta (Facoltativo) Convertire da Maven a Gradle.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Abilita le API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging.

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Configura l'autenticazione:

    1. Crea l'account di servizio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Sostituisci SERVICE_ACCOUNT_NAME con un nome per l'account di servizio.

    2. Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome dell'account di servizio.
      • PROJECT_ID: l'ID progetto in cui hai creato l'account di servizio
      • ROLE: il ruolo da concedere
    3. Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome dell'account di servizio.
      • PROJECT_ID: l'ID progetto in cui hai creato l'account di servizio
      • USER_EMAIL: l'indirizzo email del tuo Account Google
  8. Installa Google Cloud CLI.
  9. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  10. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  11. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  12. Abilita le API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging.

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Configura l'autenticazione:

    1. Crea l'account di servizio:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Sostituisci SERVICE_ACCOUNT_NAME con un nome per l'account di servizio.

    2. Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome dell'account di servizio.
      • PROJECT_ID: l'ID progetto in cui hai creato l'account di servizio
      • ROLE: il ruolo da concedere
    3. Concedi al tuo Account Google un ruolo che ti consenta di utilizzare i ruoli dell'account di servizio e collegarlo ad altre risorse:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Sostituisci quanto segue:

      • SERVICE_ACCOUNT_NAME: il nome dell'account di servizio.
      • PROJECT_ID: l'ID progetto in cui hai creato l'account di servizio
      • USER_EMAIL: l'indirizzo email del tuo Account Google
  14. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login

Configura il progetto Pub/Sub Lite

  1. Creare variabili per il bucket Cloud Storage, il progetto e la regione Dataflow. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. La regione Dataflow deve essere una regione valida in cui puoi eseguire il job. Per ulteriori informazioni su regioni e località, consulta Località Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Crea un bucket Cloud Storage di proprietà di questo progetto:

       gsutil mb gs://$BUCKET
    

crea un argomento e una sottoscrizione Lite a livello di zona Pub/Sub Lite

Creare un argomento Lite Pub/Sub Lite a livello di zona e una sottoscrizione Lite.

Per la località Lite, scegli una località Pub/Sub Lite supportata. Devi anche specificare una zona per la regione. Ad esempio, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Trasmetti messaggi in Dataflow

Scarica il codice campione della guida rapida

Clona il repository della guida rapida e passa alla directory del codice campione.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Codice di esempio

Questo codice campione utilizza Dataflow per:

  • Leggi i messaggi da una sottoscrizione Pub/Sub Lite come origine illimitata.
  • Raggruppa i messaggi in base ai relativi timestamp di pubblicazione, utilizzando finestre temporali fisse e l'attivatore predefinito.
  • Scrivi i messaggi raggruppati in file su Cloud Storage.

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione Java in Librerie client di Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Avvia la pipeline Dataflow

Per avviare la pipeline in Dataflow, esegui questo comando:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

Il comando precedente avvia un job Dataflow. Segui il link nell'output della console per accedere al job nella console di monitoraggio di Dataflow.

Osserva i progressi del lavoro

Osserva l'avanzamento del job nella console Dataflow.

Vai alla console Dataflow

Apri la visualizzazione dei dettagli del job per vedere:

  • Grafico job
  • Dettagli esecuzione
  • Metriche job

Pubblica alcuni messaggi nell'argomento Lite.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Potrebbe essere necessario attendere qualche minuto per visualizzare i messaggi nei log di worker.

Utilizza il comando seguente per controllare quali file sono stati scritti in Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

L'output dovrebbe essere simile al seguente:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Utilizza il comando seguente per esaminare i contenuti di un file:

gsutil cat "gs://$BUCKET/samples/your-filename"

(Facoltativo) Crea un modello Dataflow

Facoltativamente, puoi creare un modello Dataflow Flex personalizzato in base alla tua pipeline. I modelli Dataflow consentono di eseguire job con parametri di input diversi dalla console Google Cloud o dalla riga di comando, senza la necessità di configurare un ambiente di sviluppo Java completo.

  1. Crea un JAR fat che includa tutte le dipendenze della tua pipeline. Dovresti vedere target/pubsublite-streaming-bundled-1.0.jar dopo l'esecuzione del comando.

    mvn clean package -DskipTests=true
    
  2. Fornisci i nomi e le posizioni del file modello e dell'immagine container del modello.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Crea un modello flessibile personalizzato. Con l'esempio è stato fornito un file metadata.json obbligatorio contenente le specifiche necessarie per eseguire il job.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Esegui un job utilizzando il modello flessibile personalizzato.

Console

  1. Crea job da modello:

  2. Inserisci un Nome job.

  3. Inserisci la regione Dataflow.

  4. Scegli il tuo modello personalizzato.

  5. Inserisci il percorso del modello.

  6. Inserisci i parametri richiesti.

  7. Fai clic su Esegui job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.

  1. Nella console Dataflow, interrompi il job. Annulla la pipeline anziché svuotarla.

  2. Elimina l'argomento e la sottoscrizione.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Eliminare i file creati dalla pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Elimina l'immagine modello e il file del modello, se esistenti.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Rimuovi il bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

  6. Elimina l'account di servizio:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  8. Facoltativo: revoca le credenziali dallgcloud CLI.

    gcloud auth revoke

Passaggi successivi