Melakukan streaming pesan Pub/Sub Lite menggunakan Dataflow

Sebagai alternatif untuk menulis dan menjalankan program pemrosesan data Anda sendiri, Anda dapat menggunakan Dataflow dengan konektor I/O Pub/Sub Lite untuk Apache Beam. Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real-time) dan batch dengan keandalan dan ekspresi yang sama. API ini menjalankan program yang dikembangkan menggunakan Apache Beam SDK, yang memiliki serangkaian abstraksi pemrosesan stateful yang andal dan konektor I/O ke sistem streaming dan batch lainnya.

Panduan memulai ini menunjukkan cara menulis pipeline Apache Beam yang akan:

  • Membaca pesan dari Pub/Sub Lite
  • Jendela (atau kelompokkan) pesan menurut stempel waktu publikasi
  • Menulis pesan ke Cloud Storage

Bagian ini juga menunjukkan cara:

  • Mengirim pipeline untuk dijalankan di Dataflow
  • Membuat Template Dataflow Flex dari pipeline Anda

Tutorial ini memerlukan Maven, tetapi Anda juga dapat mengonversi project contoh dari Maven ke Gradle. Untuk mempelajari lebih lanjut, lihat Opsional: Mengonversi dari Maven ke Gradle.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Menginstal Google Cloud CLI.
  3. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Aktifkan API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Menyiapkan autentikasi:

    1. Buat akun layanan:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ganti SERVICE_ACCOUNT_NAME dengan nama untuk akun layanan.

    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • ROLE: peran yang akan diberikan
    3. Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • USER_EMAIL: alamat email untuk Akun Google Anda
  8. Menginstal Google Cloud CLI.
  9. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  10. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  11. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  12. Aktifkan API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Menyiapkan autentikasi:

    1. Buat akun layanan:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ganti SERVICE_ACCOUNT_NAME dengan nama untuk akun layanan.

    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • ROLE: peran yang akan diberikan
    3. Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • USER_EMAIL: alamat email untuk Akun Google Anda
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login

Menyiapkan project Pub/Sub Lite

  1. Buat variabel untuk bucket Cloud Storage, project, dan region Dataflow. Nama bucket Cloud Storage harus unik secara global. Region Dataflow harus merupakan region yang valid, tempat Anda dapat menjalankan tugas. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Buat bucket Cloud Storage yang dimiliki project ini:

       gsutil mb gs://$BUCKET
    

Membuat topik dan langganan Pub/Sub Lite zona Lite

Buat topik Pub/Sub Lite zona dan langganan Lite.

Untuk lokasi Lite, pilih lokasi Pub/Sub Lite yang didukung. Anda juga harus menentukan zona untuk region tersebut. Contoh, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Mengalirkan pesan ke Dataflow

Download kode contoh panduan memulai

Clone repositori panduan memulai dan buka direktori kode contoh.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Kode contoh

Kode contoh ini menggunakan Dataflow untuk:

  • Membaca pesan dari langganan Pub/Sub Lite sebagai sumber tidak terbatas.
  • Kelompokkan pesan berdasarkan stempel waktu publikasinya, menggunakan periode waktu tetap dan pemicu default.
  • Menulis pesan yang dikelompokkan ke file di Cloud Storage.

Java

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Memulai pipeline Dataflow

Untuk memulai pipeline di Dataflow, jalankan perintah berikut:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

Perintah sebelumnya meluncurkan tugas Dataflow. Ikuti link di output konsol untuk mengakses tugas di konsol pemantauan Dataflow.

Mengamati progres tugas

Mengamati progres tugas di konsol Dataflow.

Buka konsol Dataflow

Buka tampilan detail pekerjaan untuk melihat:

  • Grafik tugas
  • Detail eksekusi
  • Metrik tugas

Publikasikan beberapa pesan ke topik Lite Anda.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Anda mungkin harus menunggu beberapa menit untuk melihat pesan di Log Pekerja.

Gunakan perintah di bawah ini untuk memeriksa file mana yang telah ditulis ke Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

Outputnya akan terlihat seperti berikut ini:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Gunakan perintah di bawah untuk melihat konten dalam file:

gsutil cat "gs://$BUCKET/samples/your-filename"

Opsional: Membuat template Dataflow

Secara opsional, Anda dapat membuat Template Dataflow Flex kustom berdasarkan pipeline Anda. Dengan template Dataflow, Anda dapat menjalankan tugas dengan berbagai parameter input dari Konsol Google Cloud atau command line tanpa perlu menyiapkan lingkungan pengembangan Java yang lengkap.

  1. Buat JAR lemak yang menyertakan semua dependensi pipeline Anda. Anda akan melihat target/pubsublite-streaming-bundled-1.0.jar setelah perintah berjalan.

    mvn clean package -DskipTests=true
    
  2. Berikan nama dan lokasi untuk file template dan gambar penampung template Anda.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Buat template fleksibel kustom. File metadata.json yang diperlukan, yang berisi spesifikasi yang diperlukan untuk menjalankan tugas, telah diberikan dengan contoh.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Jalankan tugas menggunakan template flex kustom.

Konsol

  1. Membuat tugas dari template.

  2. Masukkan Nama pekerjaan.

  3. Masukkan Region Dataflow Anda.

  4. Pilih Template Kustom.

  5. Masukkan jalur template Anda.

  6. Masukkan parameter yang diperlukan.

  7. Klik Run job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

  1. Di konsol Dataflow, hentikan tugas. Membatalkan pipeline, bukan menghabiskannya.

  2. Hapus topik dan langganan.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Hapus file yang dibuat oleh pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Hapus gambar template dan file template jika ada.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Hapus bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

  6. Hapus akun layanan:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  8. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya