Melakukan streaming pesan Pub/Sub Lite menggunakan Dataflow

Sebagai alternatif untuk menulis dan menjalankan program pemrosesan data Anda sendiri, Anda dapat menggunakan Dataflow dengan konektor I/O Pub/Sub Lite untuk Apache Beam. Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real-time) dan batch dengan keandalan dan kualitas yang sama. SDK ini menjalankan program yang dikembangkan menggunakan Apache Beam SDK dengan andal, yang memiliki kumpulan abstraksi pemrosesan stateful yang kuat dan dapat diperluas, serta konektor I/O ke sistem streaming dan batch lainnya.

Panduan memulai ini menunjukkan cara menulis pipeline Apache Beam yang akan:

  • Membaca pesan dari Pub/Sub Lite
  • Memisahkan (atau mengelompokkan) pesan menurut stempel waktu publikasi
  • Menulis pesan ke Cloud Storage

Video ini juga menunjukkan cara:

  • Mengirimkan pipeline untuk dijalankan di Dataflow
  • Membuat Template Flex Dataflow dari pipeline Anda

Tutorial ini memerlukan Maven, tetapi Anda juga dapat mengonversi project contoh dari Maven ke Gradle. Untuk mempelajari lebih lanjut, lihat Opsional: Mengonversi dari Maven ke Gradle.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login

Menyiapkan project Pub/Sub Lite

  1. Buat variabel untuk bucket Cloud Storage, project, dan region Dataflow Anda. Nama bucket Cloud Storage harus unik secara global. Region Dataflow harus berupa region yang valid tempat Anda dapat menjalankan tugas. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
  2. Buat bucket Cloud Storage yang dimiliki oleh project ini:

       gcloud storage buckets create gs://$BUCKET

Membuat topik dan langganan zonal Pub/Sub Lite

Buat topik Pub/Sub Lite zonal dan langganan Lite.

Untuk lokasi Lite, pilih lokasi Pub/Sub Lite yang didukung. Anda juga harus menentukan zona untuk region. Contoh, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Melakukan streaming pesan ke Dataflow

Mendownload kode contoh quickstart

Clone repositori memulai cepat dan buka direktori kode sampel.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Kode contoh

Kode contoh ini menggunakan Dataflow untuk:

  • Membaca pesan dari langganan Pub/Sub Lite sebagai sumber yang tidak terbatas.
  • Mengelompokkan pesan berdasarkan stempel waktu publikasinya, menggunakan periode waktu tetap dan pemicu default.
  • Tulis pesan yang dikelompokkan ke file di Cloud Storage.

Java

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Memulai pipeline Dataflow

Untuk memulai pipeline di Dataflow, jalankan perintah berikut:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

Perintah sebelumnya meluncurkan tugas Dataflow. Ikuti link di output konsol untuk mengakses tugas di konsol pemantauan Dataflow.

Mengamati progres tugas

Amati progres tugas di konsol Dataflow.

Buka konsol Dataflow

Buka tampilan detail tugas untuk melihat:

  • Grafik tugas
  • Detail eksekusi
  • Metrik tugas

Publikasikan beberapa pesan ke topik Lite Anda.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Anda mungkin harus menunggu beberapa menit untuk melihat pesan di Log Pekerja.

Gunakan perintah di bawah untuk memeriksa file mana yang telah ditulis ke Cloud Storage.

gcloud storage ls "gs://$BUCKET/samples/"

Outputnya akan terlihat seperti berikut ini:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Gunakan perintah di bawah untuk melihat konten dalam file:

gcloud storage cat "gs://$BUCKET/samples/your-filename"

Opsional: Membuat template Dataflow

Secara opsional, Anda dapat membuat Template Flex Dataflow kustom berdasarkan pipeline. Dengan template Dataflow, Anda dapat menjalankan tugas dengan parameter input yang berbeda dari konsol Google Cloud atau command line tanpa perlu menyiapkan lingkungan pengembangan Java lengkap.

  1. Buat fat JAR yang menyertakan semua dependensi pipeline Anda. Anda akan melihat target/pubsublite-streaming-bundled-1.0.jar setelah perintah berjalan.

    mvn clean package -DskipTests=true
  2. Berikan nama dan lokasi untuk file template dan gambar penampung template.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. Buat template flex kustom. File metadata.json yang diperlukan, yang berisi spesifikasi yang diperlukan untuk menjalankan tugas, telah disediakan dengan contoh.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. Jalankan tugas menggunakan template flex kustom.

Konsol

  1. Buat tugas dari template.

  2. Masukkan Nama tugas.

  3. Masukkan region Dataflow.

  4. Pilih Template Kustom Anda.

  5. Masukkan jalur template.

  6. Masukkan parameter yang diperlukan.

  7. Klik Run job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \ 
     --serviceAccount=$SERVICE_ACCOUNT

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

  1. Di konsol Dataflow, hentikan tugas. Batalkan pipeline, bukan menghabiskannya.

  2. Hapus topik dan langganan.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
  3. Hapus file yang dibuat oleh pipeline.

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. Hapus gambar template dan file template jika ada.

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. Hapus bucket Cloud Storage.

    gcloud storage rm gs://$BUCKET --recursive

  6. Hapus akun layanan:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya