Mengalirkan pesan dari Pub/Sub menggunakan Dataflow
Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real-time) dan batch dengan keandalan dan ekspresi yang sama. Platform ini menyediakan lingkungan pengembangan pipeline yang disederhanakan menggunakan Apache Beam SDK, yang memiliki beragam rangkaian dasar windowing dan analisis sesi, serta ekosistem konektor sumber dan sink. Panduan memulai ini menunjukkan cara menggunakan Dataflow untuk:
- Membaca pesan yang dipublikasikan ke topik Pub/Sub
- Jendela (atau kelompokkan) pesan menurut stempel waktu
- Menulis pesan ke Cloud Storage
Panduan memulai ini memperkenalkan penggunaan Dataflow dalam Java dan Python. SQL juga didukung. Panduan memulai ini juga ditawarkan sebagai tutorial Google Cloud Skills Boost yang menawarkan kredensial sementara untuk membantu Anda memulai.
Anda juga dapat memulai dengan menggunakan template Dataflow berbasis UI jika tidak ingin melakukan pemrosesan data kustom.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Menyiapkan autentikasi:
-
Buat akun layanan:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ganti
SERVICE_ACCOUNT_NAME
dengan nama untuk akun layanan. -
Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananROLE
: peran yang akan diberikan
-
Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananUSER_EMAIL
: alamat email untuk Akun Google Anda
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Menyiapkan autentikasi:
-
Buat akun layanan:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ganti
SERVICE_ACCOUNT_NAME
dengan nama untuk akun layanan. -
Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananROLE
: peran yang akan diberikan
-
Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Ganti kode berikut:
SERVICE_ACCOUNT_NAME
: nama dari akun layanan.PROJECT_ID
: project ID dimana Anda membuat akun layananUSER_EMAIL
: alamat email untuk Akun Google Anda
-
-
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
Menyiapkan project Pub/Sub
-
Buat variabel untuk bucket, project, dan region Anda. Nama bucket Cloud Storage harus unik secara global. Pilih region Dataflow yang dekat dengan tempat Anda menjalankan perintah di panduan memulai ini. Nilai variabel
REGION
harus berupa nama wilayah yang valid. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Buat bucket Cloud Storage yang dimiliki project ini:
gsutil mb gs://$BUCKET_NAME
-
Buat topik Pub/Sub dalam project ini:
gcloud pubsub topics create $TOPIC_ID
-
Buat tugas Cloud Scheduler di project ini. Tugas ini memublikasikan pesan ke topik Pub/Sub pada interval satu menit.
Jika aplikasi App Engine tidak ada untuk project, langkah ini akan membuat aplikasi.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Mulai tugas.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Gunakan perintah berikut untuk meng-clone repositori panduan memulai dan membuka direktori kode contoh:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Mengalirkan pesan dari Pub/Sub ke Cloud Storage
Contoh kode
Kode contoh ini menggunakan Dataflow untuk:
- Membaca pesan Pub/Sub.
- Jendela (atau kelompokkan) pesan ke dalam interval ukuran tetap dengan memublikasikan stempel waktu.
Tulis pesan di setiap jendela ke file di Cloud Storage.
Java
Python
Memulai pipeline
Untuk memulai pipeline, jalankan perintah berikut:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Perintah sebelumnya berjalan secara lokal dan meluncurkan tugas Dataflow yang berjalan di cloud. Saat perintah menampilkan JOB_MESSAGE_DETAILED: Workers
have started successfully
, keluar dari program lokal menggunakan Ctrl+C
.
Mengamati progres tugas dan pipeline
Anda dapat mengamati progres tugas di konsol Dataflow.
Buka tampilan detail pekerjaan untuk melihat:
- Struktur pekerjaan
- Log tugas
- Metrik stage
Anda mungkin perlu menunggu beberapa menit untuk melihat file output di Cloud Storage.
Atau, gunakan command line di bawah untuk memeriksa file mana yang telah ditulis.
gsutil ls gs://${BUCKET_NAME}/samples/
Outputnya akan terlihat seperti berikut ini:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
Hapus tugas Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Di konsol Dataflow, hentikan tugas. Membatalkan pipeline tanpa menghabiskannya.
Hapus topik.
gcloud pubsub topics delete $TOPIC_ID
Hapus file yang dibuat oleh pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Hapus bucket Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
-
Hapus akun layanan:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.
gcloud auth application-default revoke
-
Opsional: Cabut kredensial dari gcloud CLI.
gcloud auth revoke
Langkah selanjutnya
Jika ingin membuat jendela pesan Pub/Sub berdasarkan stempel waktu kustom, Anda dapat menentukan stempel waktu sebagai atribut dalam pesan Pub/Sub, lalu gunakan stempel waktu kustom dengan
withTimestampAttribute
PubsubIO.Lihat template Dataflow open source Google yang dirancang untuk streaming.
Baca selengkapnya tentang cara Dataflow terintegrasi dengan Pub/Sub.
Lihat tutorial ini yang membaca dari Pub/Sub dan menulis ke BigQuery menggunakan template Dataflow Flex.
Untuk informasi selengkapnya tentang windowing, lihat contoh Apache Beam Mobile Gaming Pipeline.