Membuat pipeline Dataflow menggunakan Go
Halaman ini menunjukkan cara menggunakan Apache Beam SDK untuk Go guna mem-build program yang menentukan pipeline. Kemudian, Anda menjalankan pipeline secara lokal dan di layanan Dataflow. Untuk pengantar pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ganti
PROJECT_ID
dengan project ID Anda. - Ganti
PROJECT_NUMBER
dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Mengidentifikasi project atau gunakan perintahgcloud projects describe
. - Ganti
SERVICE_ACCOUNT_ROLE
dengan setiap peran individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standar). -
Tetapkan lokasi penyimpanan sebagai berikut:
US
(Amerika Serikat). -
Ganti
BUCKET_NAME
dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik. - Salin ID project Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam panduan memulai ini.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
Menyiapkan lingkungan pengembangan
Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.
Sebaiknya gunakan Go versi terbaru saat menggunakan Apache Beam SDK untuk Go. Jika Anda belum menginstal Go versi terbaru, gunakan Panduan download dan penginstalan Go untuk mendownload dan menginstal Go untuk sistem operasi tertentu.
Untuk memverifikasi versi Go yang telah Anda instal, jalankan perintah berikut di terminal lokal:
go version
Menjalankan contoh wordcount Beam
Apache Beam SDK untuk Go menyertakan
contoh pipeline wordcount
.
Contoh wordcount
melakukan hal berikut:
- Membaca file teks sebagai input. Secara default, fungsi ini membaca file teks yang berada di bucket Cloud Storage dengan nama resource
gs://dataflow-samples/shakespeare/kinglear.txt
. - Mengurai setiap baris menjadi kata.
- Melakukan penghitungan frekuensi pada kata yang ditokenisasi.
Untuk menjalankan contoh Beam wordcount
versi terbaru di komputer lokal,
gunakan perintah berikut. Flag input
menentukan file yang akan dibaca,
dan flag output
menentukan nama file untuk output jumlah frekuensi.
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output outputs
Setelah pipeline selesai, lihat hasil output:
more outputs*
Untuk keluar, tekan q.
Mengubah kode pipeline
Pipeline wordcount
Beam membedakan antara kata
besar dan kecil. Langkah-langkah berikut menunjukkan cara membuat modul Go Anda sendiri, mengubah
pipeline wordcount
sehingga pipeline tidak peka huruf besar/kecil, dan menjalankannya di
Dataflow.
Membuat modul Go
Untuk membuat perubahan pada kode pipeline, ikuti langkah-langkah berikut.
Buat direktori untuk modul Go di lokasi pilihan Anda:
mkdir wordcount
cd wordcount
Buat modul Go. Untuk contoh ini, gunakan
example/dataflow
sebagai jalur modul.go mod init example/dataflow
Download salinan terbaru kode
wordcount
dari repositori GitHub Apache Beam. Masukkan file ini ke dalam direktoriwordcount
yang Anda buat.Jika menggunakan sistem operasi non-Linux, Anda harus mendapatkan paket Go
unix
. Paket ini diperlukan untuk menjalankan pipeline di layanan Dataflow.go get -u golang.org/x/sys/unix
Pastikan file
go.mod
cocok dengan kode sumber modul:go mod tidy
Menjalankan pipeline yang tidak diubah
Pastikan pipeline wordcount
yang tidak dimodifikasi berjalan secara lokal.
Dari terminal, build dan jalankan pipeline secara lokal:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Lihat hasil output:
more outputs*
Untuk keluar, tekan q.
Mengubah kode pipeline
Untuk mengubah pipeline agar tidak peka huruf besar/kecil, ubah kode untuk menerapkan fungsi strings.ToLower
ke semua kata.
Di editor pilihan Anda, buka file
wordcount.go
.Periksa blok
init
(komentar telah dihapus agar lebih jelas):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Tambahkan baris baru untuk mendaftarkan fungsi
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Periksa fungsi
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Untuk mengubah kata menjadi huruf kecil, tambahkan ParDo yang menerapkan
strings.ToLower
ke setiap kata:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Simpan file.
Menjalankan pipeline yang diperbarui secara lokal
Jalankan pipeline wordcount
yang telah diperbarui secara lokal dan pastikan output telah berubah.
Build dan jalankan pipeline
wordcount
yang dimodifikasi:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Lihat hasil output pipeline yang diubah. Semua kata harus dalam huruf kecil.
more outputs*
Untuk keluar, tekan q.
Menjalankan pipeline di layanan Dataflow
Untuk menjalankan contoh wordcount
yang diperbarui di layanan Dataflow,
gunakan perintah berikut:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Ganti kode berikut:
BUCKET_NAME
: nama bucket Cloud Storage.PROJECT_ID
: project ID Google Cloud.DATAFLOW_REGION
: Region tempat Anda ingin men-deploy tugas Dataflow. Misalnya,europe-west1
. Untuk mengetahui daftar lokasi yang tersedia, lihat Lokasi dataflow. Flag--region
akan menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.
Melihat hasil
Anda dapat melihat daftar tugas Dataflow di konsol Google Cloud. Di konsol Google Cloud, buka halaman Jobs Dataflow.
Halaman Tugas menampilkan detail tugas wordcount
Anda, termasuk status
Running pada awalnya, lalu Succeeded.
Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya akan disimpan di bucket Cloud Storage. Lihat hasil output menggunakan konsol Google Cloud atau terminal lokal.
Konsol
Untuk melihat hasil di konsol Google Cloud, buka halaman Bucket Cloud Storage.
Dari daftar bucket di project Anda, klik bucket penyimpanan yang Anda buat sebelumnya. File output yang dibuat oleh tugas Anda ditampilkan di direktori results
.
Terminal
Lihat hasilnya dari terminal atau menggunakan Cloud Shell.
Untuk mencantumkan file output, gunakan perintah
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Ganti
BUCKET_NAME
dengan nama bucket Cloud Storage output yang ditentukan.Untuk melihat hasilnya dalam file output, gunakan perintah
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Jika Anda mempertahankan project, cabut peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke