Membuat pipeline Dataflow menggunakan Python
Dokumen ini menunjukkan cara menggunakan Apache Beam SDK untuk Python guna membuat program yang menentukan pipeline. Kemudian, Anda menjalankan pipeline menggunakan runner lokal langsung atau runner berbasis cloud seperti Dataflow. Untuk pengantar tentang pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.
Untuk mengikuti panduan langkah demi langkah tugas ini langsung di Google Cloud konsol, klik Pandu saya:
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Beri peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ganti
PROJECT_ID
dengan project ID Anda. - Ganti
PROJECT_NUMBER
dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Mengidentifikasi project atau gunakan perintahgcloud projects describe
. - Ganti
SERVICE_ACCOUNT_ROLE
dengan setiap peran individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standar). -
Tetapkan lokasi penyimpanan sebagai berikut:
US
(Amerika Serikat). -
Ganti
BUCKET_NAME
dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik. - Salin ID project Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam dokumen ini.
- Pastikan Anda telah menjalankan Python 3 dan
pip
di sistem Anda:python --version python -m pip --version
- Jika diperlukan, instal Python 3, lalu siapkan lingkungan virtual Python: ikuti petunjuk yang diberikan di bagian Menginstal Python dan Menyiapkan venv di halaman Menyiapkan lingkungan pengembangan Python.
- Pastikan Anda berada di lingkungan virtual Python yang Anda buat di bagian sebelumnya.
Pastikan perintah dimulai dengan
<env_name>
, denganenv_name
adalah nama lingkungan virtual. - Instal versi terbaru Apache Beam SDK untuk Python:
Menggunakan file teks sebagai input.
File teks ini berada di bucket Cloud Storage dengan nama resource
gs://dataflow-samples/shakespeare/kinglear.txt
.- Mengurai setiap baris menjadi kata-kata.
- Melakukan penghitungan frekuensi pada kata yang di-tokenisasi.
- Dari terminal lokal, jalankan contoh
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Lihat output pipeline:
more outputs*
- Untuk keluar, tekan q.
- Menjalankan pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION
: the region tempat Anda ingin men-deploy tugas Dataflow—misalnya,europe-west1
Flag
--region
menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.BUCKET_NAME
: nama bucket Cloud Storage yang Anda salin sebelumnyaPROJECT_ID
: Google Cloud project ID yang Anda salin sebelumnya
- Di konsol Google Cloud , buka halaman Jobs Dataflow.
Halaman Jobs menampilkan detail tugas
wordcount
Anda, termasuk status Running pada awalnya, lalu Succeeded. - Buka halaman Bucket Cloud Storage.
Dari daftar bucket di project Anda, klik bucket penyimpanan yang Anda buat sebelumnya.
Di direktori
wordcount
, file output yang dibuat oleh tugas Anda akan ditampilkan.- Untuk mencantumkan file output, gunakan perintah
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Untuk melihat hasil dalam file output, gunakan perintah
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
- Di komputer lokal, download salinan terbaru
kode
wordcount
dari repositori GitHub Apache Beam. - Dari terminal lokal, jalankan pipeline:
python wordcount.py --output outputs
- Lihat hasilnya:
more outputs*
- Untuk keluar, tekan q.
- Di editor pilihan Anda, buka file
wordcount.py
. - Di dalam fungsi
run
, periksa langkah-langkah pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Setelah
split
, baris dibagi menjadi kata-kata sebagai string. - Untuk mengubah string menjadi huruf kecil, ubah baris setelah
split
: Modifikasi ini memetakan fungsicounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
ke setiap kata. Baris ini setara denganbeam.Map(lambda word: str.lower(word))
. - Simpan file dan jalankan tugas
wordcount
yang telah diubah:python wordcount.py --output outputs
- Lihat hasil pipeline yang diubah:
more outputs*
- Untuk keluar, tekan q.
- Jalankan pipeline yang diubah pada layanan Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION
: region tempat Anda ingin men-deploy tugas DataflowBUCKET_NAME
: nama bucket Cloud Storage AndaPROJECT_ID
: project ID Anda Google Cloud
-
Hapus bucket:
gcloud storage buckets delete BUCKET_NAME
Jika Anda mempertahankan project, batalkan peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Menyiapkan lingkungan Anda
Di bagian ini, gunakan command prompt untuk menyiapkan lingkungan virtual Python yang terisolasi guna menjalankan project pipeline Anda dengan menggunakan venv. Proses ini memungkinkan Anda mengisolasi dependensi satu project dari dependensi project lainnya.
Jika Anda belum memiliki command prompt yang tersedia, Anda dapat menggunakan Cloud Shell. Cloud Shell telah menginstal pengelola paket untuk Python 3, sehingga Anda dapat langsung membuat lingkungan virtual.
Untuk menginstal Python, lalu membuat lingkungan virtual, ikuti langkah-langkah berikut:
Setelah menyelesaikan panduan memulai, Anda dapat menonaktifkan lingkungan virtual dengan menjalankan
deactivate
.Mendapatkan Apache Beam SDK
Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.
Untuk mendownload dan menginstal Apache Beam SDK, ikuti langkah-langkah berikut:
pip install apache-beam[gcp]
Menjalankan pipeline secara lokal
Untuk melihat cara menjalankan pipeline secara lokal, gunakan modul Python siap pakai untuk contoh
wordcount
yang disertakan dengan paketapache_beam
.Contoh pipeline
wordcount
melakukan hal berikut:Untuk menyiapkan pipeline
wordcount
secara lokal, ikuti langkah-langkah berikut:wordcount.py
di Apache Beam GitHub.Menjalankan pipeline di layanan Dataflow
Di bagian ini, jalankan contoh pipelinewordcount
dari paketapache_beam
di layanan Dataflow. Contoh ini menentukanDataflowRunner
sebagai parameter untuk--runner
.Melihat hasil Anda
Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya akan disimpan di bucket Cloud Storage. Di bagian ini, pastikan pipeline berjalan menggunakan konsol Google Cloud atau terminal lokal.
Google Cloud console
Untuk melihat hasil di konsol Google Cloud , ikuti langkah-langkah berikut:
Terminal lokal
Lihat hasil dari terminal Anda atau menggunakan Cloud Shell.
Ganti
BUCKET_NAME
dengan nama bucket Cloud Storage yang digunakan dalam program pipeline.Ubah kode pipeline
Pipelinewordcount
dalam contoh sebelumnya membedakan antara kata huruf besar dan huruf kecil. Langkah-langkah berikut menunjukkan cara mengubah pipeline sehingga pipelinewordcount
tidak peka huruf besar/kecil.Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan di halaman ini, hapus project Google Cloud yang berisi resource tersebut.
Langkah berikutnya
-
Set the storage class to