Membuat pipeline Dataflow menggunakan Python
Dalam panduan memulai ini, Anda akan mempelajari cara menggunakan Apache Beam SDK untuk Python guna membangun program yang mendefinisikan pipeline. Kemudian, Anda akan menjalankan pipeline menggunakan runner lokal langsung atau runner berbasis cloud seperti Dataflow. Untuk pengantar pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.
Jika ingin mengikuti panduan langkah demi langkah untuk tugas ini langsung di Konsol Google Cloud, klik Pandu saya:
Sebelum memulai
- Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
- Menginstal Google Cloud CLI.
-
Untuk initialize gcloud CLI, jalankan perintah berikut:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.
-
Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON Google Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Datastore, dan Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
-
Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Ganti
PROJECT_ID
dengan project ID Anda. - Ganti
EMAIL_ADDRESS
dengan alamat email Anda. - Ganti
ROLE
dengan setiap peran individual.
- Ganti
- Menginstal Google Cloud CLI.
-
Untuk initialize gcloud CLI, jalankan perintah berikut:
gcloud init
-
Buat atau pilih project Google Cloud.
-
Membuat project Google Cloud:
gcloud projects create PROJECT_ID
Ganti
PROJECT_ID
dengan nama untuk project Google Cloud yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_ID
dengan nama project Google Cloud Anda.
-
-
Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.
-
Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON Google Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Datastore, dan Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Buat kredensial autentikasi lokal untuk Akun Google Anda:
gcloud auth application-default login
-
Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Ganti
PROJECT_ID
dengan project ID Anda. - Ganti
EMAIL_ADDRESS
dengan alamat email Anda. - Ganti
ROLE
dengan setiap peran individual.
- Ganti
Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ganti
PROJECT_ID
dengan project ID Anda. - Ganti
PROJECT_NUMBER
dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Identify projects atau gunakan perintahgcloud projects describe
. - Ganti
SERVICE_ACCOUNT_ROLE
dengan setiap peran individual.
-
Buat bucket Cloud Storage dan konfigurasikan sebagai berikut:
-
Tetapkan kelas penyimpanan ke
S
(Standar). -
Tetapkan lokasi penyimpanan sebagai berikut:
US
(Amerika Serikat). -
Ganti
BUCKET_NAME
dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Tetapkan kelas penyimpanan ke
- Salin project ID Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai-nilai ini nanti dalam dokumen ini.
Menyiapkan lingkungan Anda
Di bagian ini, gunakan command prompt untuk menyiapkan lingkungan virtual Python yang terisolasi guna menjalankan project pipeline Anda menggunakan venv. Proses ini memungkinkan Anda mengisolasi dependensi suatu project dari dependensi project lainnya.
Jika belum memiliki command prompt yang tersedia, Anda dapat menggunakan Cloud Shell. Cloud Shell telah menginstal pengelola paket untuk Python 3, sehingga Anda dapat langsung membuat lingkungan virtual.
Untuk menginstal Python, lalu membuat lingkungan virtual, ikuti langkah-langkah berikut:
- Pastikan Anda memiliki Python 3 dan
pip
yang berjalan di sistem:python --version python -m pip --version
- Jika perlu, instal Python 3, lalu siapkan lingkungan virtual Python: ikuti petunjuk yang diberikan di bagian Menginstal Python dan Menyiapkan venv pada halaman Menyiapkan lingkungan pengembangan Python. Jika menggunakan Python 3.10 atau yang lebih baru, Anda juga harus mengaktifkan Dataflow Runner v2. Untuk menggunakan Runner v1, gunakan Python 3.9 atau yang lebih lama.
Setelah menyelesaikan panduan memulai, Anda dapat menonaktifkan lingkungan virtual dengan menjalankan deactivate
.
Mendapatkan Apache Beam SDK
Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline.
Untuk mendownload dan menginstal Apache Beam SDK, ikuti langkah-langkah berikut:
- Pastikan bahwa Anda berada di lingkungan virtual Python yang Anda buat di bagian sebelumnya.
Pastikan prompt dimulai dengan
<env_name>
, denganenv_name
adalah nama lingkungan virtual. - Instal standar pengemasan roda Python:
pip install wheel
- Instal versi terbaru Apache Beam SDK untuk Python:
pip install 'apache-beam[gcp]'
Di Microsoft Windows, gunakan perintah berikut:
pip install apache-beam[gcp]
Bergantung pada koneksi, penginstalan Anda mungkin memerlukan waktu beberapa saat.
Menjalankan pipeline secara lokal
Untuk melihat bagaimana pipeline berjalan secara lokal, gunakan modul Python siap pakai untuk contoh wordcount
yang disertakan dengan paket apache_beam
.
Contoh pipeline wordcount
melakukan hal berikut:
Mengambil file teks sebagai input.
File teks ini berada di bucket Cloud Storage dengan nama resource
gs://dataflow-samples/shakespeare/kinglear.txt
.- Mengurai setiap baris menjadi kata.
- Melakukan penghitungan frekuensi pada kata yang ditokenkan.
Untuk melakukan staging pipeline wordcount
secara lokal, ikuti langkah-langkah berikut:
- Dari terminal lokal, jalankan contoh
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Lihat output pipeline:
more outputs*
- Untuk keluar, tekan q.
wordcount.py
di Apache Beam GitHub.
Menjalankan pipeline di layanan Dataflow
Di bagian ini, jalankan pipeline contohwordcount
dari paket apache_beam
di layanan Dataflow. Contoh
ini menentukan DataflowRunner
sebagai parameter untuk
--runner
.
- Menjalankan pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,europe-west1
Flag
--region
menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan Anda.BUCKET_NAME
: nama bucket Cloud Storage yang Anda salin sebelumnyaPROJECT_ID
: project ID Google Cloud yang Anda salin sebelumnya
Lihat hasil Anda
Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya disimpan dalam bucket Cloud Storage. Di bagian ini, pastikan bahwa pipeline berjalan menggunakan konsol Google Cloud atau terminal lokal.
Konsol Google Cloud
Untuk melihat hasil Anda di Konsol Google Cloud, ikuti langkah-langkah berikut:
- Di konsol Google Cloud, buka halaman Jobs Dataflow.
Halaman Jobs menampilkan detail tugas
wordcount
Anda, termasuk status Running di awal, lalu Succeeded. - Buka halaman Buckets Cloud Storage.
Dari daftar bucket di project Anda, klik bucket penyimpanan yang Anda buat sebelumnya.
Dalam direktori
wordcount
, file output yang dibuat oleh tugas Anda akan ditampilkan.
Terminal lokal
Lihat hasilnya dari terminal Anda atau dengan menggunakan Cloud Shell.
- Untuk menampilkan daftar file output, gunakan perintah
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Untuk melihat hasilnya di file output, gunakan perintah
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Ganti BUCKET_NAME
dengan nama bucket Cloud Storage yang digunakan dalam program pipeline.
Mengubah kode pipeline
Pipelinewordcount
pada contoh sebelumnya membedakan antara kata-kata huruf besar dan huruf kecil.
Langkah-langkah berikut menunjukkan cara memodifikasi pipeline sehingga pipeline wordcount
tidak peka huruf besar/kecil.
- Di komputer lokal Anda, download salinan terbaru kode
wordcount
dari repositori GitHub Apache Beam. - Dari terminal lokal, jalankan pipeline:
python wordcount.py --output outputs
- Lihat hasil:
more outputs*
- Untuk keluar, tekan q.
- Di editor pilihan Anda, buka file
wordcount.py
. - Di dalam fungsi
run
, periksa langkah-langkah pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Setelah
split
, baris dibagi menjadi kata sebagai string. - Untuk membuat string menjadi huruf kecil, ubah baris setelah
split
:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Modifikasi ini memetakan fungsistr.lower
ke setiap kata. Baris ini setara denganbeam.Map(lambda word: str.lower(word))
. - Simpan file dan jalankan tugas
wordcount
yang telah diubah:python wordcount.py --output outputs
- Lihat hasil pipeline yang diubah:
more outputs*
- Untuk keluar, tekan q.
- Jalankan pipeline yang telah diubah di layanan Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION
: region tempat Anda ingin men-deploy tugas DataflowBUCKET_NAME
: nama bucket Cloud Storage AndaPROJECT_ID
: ID project Google Cloud Anda
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
- Di konsol Google Cloud, buka halaman Buckets Cloud Storage.
- Klik kotak centang untuk bucket yang ingin Anda dihapus.
- Untuk menghapus bucket, klik Hapus, lalu ikuti petunjuk.
Jika Anda mempertahankan project Anda, cabut peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.
gcloud auth application-default revoke
-
Opsional: Cabut kredensial dari gcloud CLI.
gcloud auth revoke