Melakukan streaming pesan Pub/Sub Lite menggunakan Dataflow
Sebagai alternatif untuk menulis dan menjalankan program pemrosesan data Anda sendiri, Anda dapat menggunakan Dataflow dengan konektor I/O Pub/Sub Lite untuk Apache Beam. Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan mempersempit data dalam mode streaming (real-time) dan batch dengan keandalan dan kualitas yang sama. SDK ini menjalankan program yang dikembangkan menggunakan Apache Beam SDK dengan andal, yang memiliki kumpulan abstraksi pemrosesan stateful yang kuat dan dapat diperluas, serta konektor I/O ke sistem streaming dan batch lainnya.
Panduan memulai ini menunjukkan cara menulis pipeline Apache Beam yang akan:
- Membaca pesan dari Pub/Sub Lite
- Memisahkan (atau mengelompokkan) pesan menurut stempel waktu publikasi
- Menulis pesan ke Cloud Storage
Video ini juga menunjukkan cara:
- Mengirimkan pipeline untuk dijalankan di Dataflow
- Membuat Template Flex Dataflow dari pipeline Anda
Tutorial ini memerlukan Maven, tetapi Anda juga dapat mengonversi project contoh dari Maven ke Gradle. Untuk mempelajari lebih lanjut, lihat Opsional: Mengonversi dari Maven ke Gradle.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Menyiapkan autentikasi:
-
Buat akun layanan:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ganti
SERVICE_ACCOUNT_NAME
dengan nama untuk akun layanan. -
Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananROLE
: peran yang akan diberikan
-
Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananUSER_EMAIL
: alamat email untuk Akun Google Anda
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Menyiapkan autentikasi:
-
Buat akun layanan:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ganti
SERVICE_ACCOUNT_NAME
dengan nama untuk akun layanan. -
Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananROLE
: peran yang akan diberikan
-
Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananUSER_EMAIL
: alamat email untuk Akun Google Anda
-
-
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
Menyiapkan project Pub/Sub Lite
Buat variabel untuk bucket Cloud Storage, project, dan region Dataflow Anda. Nama bucket Cloud Storage harus unik secara global. Region Dataflow harus berupa region yang valid tempat Anda dapat menjalankan tugas. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Buat bucket Cloud Storage yang dimiliki oleh project ini:
gcloud storage buckets create gs://$BUCKET
Membuat topik dan langganan zonal Pub/Sub Lite
Buat topik Pub/Sub Lite zonal dan langganan Lite.
Untuk lokasi Lite, pilih
lokasi Pub/Sub Lite yang didukung. Anda juga harus
menentukan zona untuk region. Contoh, us-central1-a
.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Melakukan streaming pesan ke Dataflow
Mendownload kode contoh quickstart
Clone repositori memulai cepat dan buka direktori kode sampel.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Kode contoh
Kode contoh ini menggunakan Dataflow untuk:
- Membaca pesan dari langganan Pub/Sub Lite sebagai sumber yang tidak terbatas.
- Mengelompokkan pesan berdasarkan stempel waktu publikasinya, menggunakan periode waktu tetap dan pemicu default.
Tulis pesan yang dikelompokkan ke file di Cloud Storage.
Java
Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.
Memulai pipeline Dataflow
Untuk memulai pipeline di Dataflow, jalankan perintah berikut:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
Perintah sebelumnya meluncurkan tugas Dataflow. Ikuti link di output konsol untuk mengakses tugas di konsol pemantauan Dataflow.
Mengamati progres tugas
Amati progres tugas di konsol Dataflow.
Buka tampilan detail tugas untuk melihat:
- Grafik tugas
- Detail eksekusi
- Metrik tugas
Publikasikan beberapa pesan ke topik Lite Anda.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Anda mungkin harus menunggu beberapa menit untuk melihat pesan di Log Pekerja.
Gunakan perintah di bawah untuk memeriksa file mana yang telah ditulis ke Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
Outputnya akan terlihat seperti berikut ini:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Gunakan perintah di bawah untuk melihat konten dalam file:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Opsional: Membuat template Dataflow
Secara opsional, Anda dapat membuat Template Flex Dataflow kustom berdasarkan pipeline. Dengan template Dataflow, Anda dapat menjalankan tugas dengan parameter input yang berbeda dari konsol Google Cloud atau command line tanpa perlu menyiapkan lingkungan pengembangan Java lengkap.
Buat JAR tebal yang menyertakan semua dependensi pipeline Anda. Anda akan melihat
target/pubsublite-streaming-bundled-1.0.jar
setelah perintah berjalan.mvn clean package -DskipTests=true
Berikan nama dan lokasi untuk file template dan gambar penampung template.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Buat template flex kustom. File
metadata.json
yang diperlukan, yang berisi spesifikasi yang diperlukan untuk menjalankan tugas, telah diberikan dengan contoh.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Jalankan tugas menggunakan template flex kustom.
Konsol
Masukkan Nama tugas.
Masukkan region Dataflow.
Pilih Template Kustom Anda.
Masukkan jalur template.
Masukkan parameter yang diperlukan.
Klik Run job.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
Di konsol Dataflow, hentikan tugas. Batalkan pipeline, bukan menghabiskannya.
Hapus topik dan langganan.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Hapus file yang dibuat oleh pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Hapus gambar template dan file template jika ada.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Hapus bucket Cloud Storage.
gcloud storage rm gs://$BUCKET --recursive
-
Hapus akun layanan:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.
gcloud auth application-default revoke
-
Opsional: Cabut kredensial dari gcloud CLI.
gcloud auth revoke
Langkah selanjutnya
Baca selengkapnya tentang Mengonfigurasi Template Flex Dataflow.
Memahami pipeline streaming Dataflow.