Membuat pipeline Dataflow menggunakan Go

Halaman ini menunjukkan cara menggunakan Apache Beam SDK untuk Go untuk membuat program yang mendefinisikan pipeline. Kemudian, Anda akan menjalankan pipeline secara lokal dan di layanan Dataflow. Untuk pengantar pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Menginstal Google Cloud CLI.
  3. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  8. Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti EMAIL_ADDRESS dengan alamat email Anda.
    • Ganti ROLE dengan setiap peran individual.
  9. Menginstal Google Cloud CLI.
  10. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  11. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  12. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  15. Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti EMAIL_ADDRESS dengan alamat email Anda.
    • Ganti ROLE dengan setiap peran individual.
  16. Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti PROJECT_NUMBER dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Identify projects atau gunakan perintah gcloud projects describe.
    • Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.
  17. Buat bucket Cloud Storage dan konfigurasikan sebagai berikut:
    • Tetapkan kelas penyimpanan ke S (Standar).
    • Tetapkan lokasi penyimpanan sebagai berikut: US (Amerika Serikat).
    • Ganti BUCKET_NAME dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Salin project ID Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam panduan memulai ini.

Menyiapkan lingkungan pengembangan

Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline.

Sebaiknya gunakan Go versi terbaru saat menggunakan Apache Beam SDK untuk Go. Jika Anda belum menginstal Go versi terbaru, gunakan Panduan download dan penginstalan Go untuk mendownload dan menginstal Go untuk sistem operasi spesifik Anda.

Untuk memverifikasi versi Go yang telah Anda instal, jalankan perintah berikut di terminal lokal Anda:

go version

Menjalankan contoh jumlah kata Beam

Apache Beam SDK untuk Go menyertakan contoh pipeline wordcount. Contoh wordcount melakukan hal berikut:

  1. Membaca file teks sebagai input. Secara default, ini akan membaca file teks yang berada di bucket Cloud Storage dengan nama resource gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Mengurai setiap baris menjadi kata.
  3. Melakukan penghitungan frekuensi pada kata yang ditokenkan.

Untuk menjalankan contoh wordcount Beam versi terbaru di mesin lokal Anda, gunakan perintah berikut. Flag input menentukan file yang akan dibaca, dan flag output menentukan nama file untuk output jumlah frekuensi.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

Setelah pipeline selesai, lihat hasil output:

more outputs*

Untuk keluar, tekan q.

Mengubah kode pipeline

Pipeline wordcount Beam membedakan antara kata-kata huruf besar dan huruf kecil. Langkah-langkah berikut menunjukkan cara membuat modul Go Anda sendiri, mengubah pipeline wordcount sehingga pipeline tidak peka huruf besar/kecil, dan menjalankannya di Dataflow.

Membuat modul Go

Untuk membuat perubahan pada kode pipeline, ikuti langkah-langkah berikut.

  1. Buat direktori untuk modul Go di lokasi pilihan Anda:

    mkdir wordcount
    cd wordcount
    
  2. Membuat modul Go. Untuk contoh ini, gunakan example/dataflow sebagai jalur modul.

    go mod init example/dataflow
    
  3. Download salinan terbaru kode wordcount dari repositori GitHub Apache Beam. Masukkan file ini ke direktori wordcount yang Anda buat.

  4. Jika menggunakan sistem operasi non-Linux, Anda harus mendapatkan paket unix Go. Paket ini diperlukan untuk menjalankan pipeline di layanan Dataflow.

    go get -u golang.org/x/sys/unix
    
  5. Pastikan bahwa file go.mod cocok dengan kode sumber modul:

    go mod tidy
    

Menjalankan pipeline yang tidak dimodifikasi

Pastikan pipeline wordcount yang tidak dimodifikasi berjalan secara lokal.

  1. Dari terminal, build dan jalankan pipeline secara lokal:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Lihat hasil output:

     more outputs*
    
  3. Untuk keluar, tekan q.

Mengubah kode pipeline

Untuk mengubah pipeline agar tidak peka huruf besar/kecil, ubah kode untuk menerapkan fungsi strings.ToLower ke semua kata.

  1. Di editor pilihan Anda, buka file wordcount.go.

  2. Periksa blok init (komentar telah dihapus agar lebih jelas):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Tambahkan baris baru untuk mendaftarkan fungsi strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Periksa fungsi CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Untuk huruf kecil pada kata, tambahkan ParDo yang menerapkan strings.ToLower ke setiap kata:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Simpan file.

Menjalankan pipeline yang diperbarui secara lokal

Jalankan pipeline wordcount yang telah diperbarui secara lokal dan pastikan outputnya telah berubah.

  1. Bangun dan jalankan pipeline wordcount yang telah diubah:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. Melihat hasil output dari pipeline yang diubah. Semua kata harus dalam huruf kecil.

     more outputs*
    
  3. Untuk keluar, tekan q.

Menjalankan pipeline di layanan Dataflow

Untuk menjalankan contoh wordcount yang telah diupdate di layanan Dataflow, gunakan perintah berikut:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Ganti kode berikut:

  • BUCKET_NAME: nama bucket Cloud Storage.

  • PROJECT_ID: ID project Google Cloud.

  • DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow. Misalnya, europe-west1. Untuk daftar lokasi yang tersedia, lihat Lokasi Dataflow. Flag --region menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan Anda.

Lihat hasil Anda

Anda dapat melihat daftar tugas Dataflow di Konsol Google Cloud. Di konsol Google Cloud, buka halaman Jobs Dataflow.

Buka Tugas

Halaman Jobs menampilkan detail tugas wordcount Anda, termasuk status Running di awal, lalu Succeeded.

Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya disimpan dalam bucket Cloud Storage. Lihat hasil output menggunakan konsol Google Cloud atau terminal lokal.

Konsol

Untuk melihat hasilnya di konsol Google Cloud, buka halaman Buckets Cloud Storage.

Buka Buckets

Dari daftar bucket dalam project Anda, klik bucket penyimpanan yang Anda buat sebelumnya. File output yang dibuat oleh tugas Anda ditampilkan di direktori results.

Terminal

Lihat hasilnya dari terminal Anda atau dengan menggunakan Cloud Shell.

  1. Untuk menampilkan daftar file output, gunakan perintah gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    Ganti BUCKET_NAME dengan nama bucket Cloud Storage output yang ditentukan.

  2. Untuk melihat hasilnya di file output, gunakan perintah gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

  1. Di konsol Google Cloud, buka halaman Buckets Cloud Storage.

    Buka Buckets

  2. Klik kotak centang untuk bucket yang ingin Anda dihapus.
  3. Untuk menghapus bucket, klik Hapus, lalu ikuti petunjuk.
  4. Jika Anda mempertahankan project Anda, cabut peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  6. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya