Membuat pipeline Dataflow menggunakan Python

Dalam panduan memulai ini, Anda akan mempelajari cara menggunakan Apache Beam SDK untuk Python guna mem-build program yang menentukan pipeline. Kemudian, Anda menjalankan pipeline menggunakan runner lokal langsung atau runner berbasis cloud seperti Dataflow. Untuk pengantar pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.


Jika ingin mengikuti panduan langkah demi langkah untuk tugas ini langsung di Konsol Google Cloud, klik Pandu saya:

Pandu saya


Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti PROJECT_NUMBER dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Mengidentifikasi project atau gunakan perintah gcloud projects describe.
    • Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standar).
    • Tetapkan lokasi penyimpanan sebagai berikut: US (Amerika Serikat).
    • Ganti BUCKET_NAME dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Salin ID project Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam dokumen ini.

Menyiapkan lingkungan Anda

Di bagian ini, gunakan command prompt untuk menyiapkan lingkungan virtual Python yang terisolasi guna menjalankan project pipeline Anda menggunakan venv. Proses ini memungkinkan Anda mengisolasi dependensi satu project dari dependensi project lain.

Jika tidak memiliki command prompt yang tersedia, Anda dapat menggunakan Cloud Shell. Cloud Shell telah menginstal pengelola paket untuk Python 3, sehingga Anda dapat langsung membuat lingkungan virtual.

Untuk menginstal Python, lalu membuat lingkungan virtual, ikuti langkah-langkah berikut:

  1. Pastikan Anda memiliki Python 3 dan pip yang berjalan di sistem Anda:
    python --version
    python -m pip --version
  2. Jika diperlukan, instal Python 3, lalu siapkan lingkungan virtual Python: ikuti petunjuk yang diberikan di bagian Menginstal Python dan Menyiapkan venv di halaman Menyiapkan lingkungan pengembangan Python. Jika menggunakan Python 3.10 atau yang lebih baru, Anda juga harus mengaktifkan Runner Dataflow v2. Untuk menggunakan Runner v1, gunakan Python 3.9 atau yang lebih lama.

Setelah menyelesaikan panduan memulai, Anda dapat menonaktifkan lingkungan virtual dengan menjalankan deactivate.

Mendapatkan Apache Beam SDK

Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.

Untuk mendownload dan menginstal Apache Beam SDK, ikuti langkah-langkah berikut:

  1. Pastikan Anda berada di lingkungan virtual Python yang Anda buat di bagian sebelumnya. Pastikan perintah dimulai dengan <env_name>, dengan env_name adalah nama lingkungan virtual.
  2. Instal standar pengemasan Python wheel:
    pip install wheel
  3. Instal Apache Beam SDK versi terbaru untuk Python:
  4. pip install 'apache-beam[gcp]'

    Di Microsoft Windows, gunakan perintah berikut:

    pip install apache-beam[gcp]

    Bergantung pada koneksi, penginstalan Anda mungkin memerlukan waktu beberapa saat.

Menjalankan pipeline secara lokal

Untuk melihat cara pipeline berjalan secara lokal, gunakan modul Python siap pakai untuk contoh wordcount yang disertakan dengan paket apache_beam.

Contoh pipeline wordcount melakukan hal berikut:

  1. Menggunakan file teks sebagai input.

    File teks ini terletak di bucket Cloud Storage dengan nama resource gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Mengurai setiap baris menjadi kata.
  3. Melakukan penghitungan frekuensi pada kata yang ditokenisasi.

Untuk melakukan staging pipeline wordcount secara lokal, ikuti langkah-langkah berikut:

  1. Dari terminal lokal, jalankan contoh wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Lihat output pipeline:
    more outputs*
  3. Untuk keluar, tekan q.
Dengan menjalankan pipeline secara lokal, Anda dapat menguji dan men-debug program Apache Beam. Anda dapat melihat kode sumber wordcount.py di GitHub Apache Beam.

Menjalankan pipeline di layanan Dataflow

Di bagian ini, jalankan pipeline contoh wordcount dari paket apache_beam di layanan Dataflow. Contoh ini menentukan DataflowRunner sebagai parameter untuk --runner.
  • Menjalankan pipeline:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Ganti kode berikut:

    • DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, europe-west1

      Flag --region akan mengganti region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.

    • BUCKET_NAME: nama bucket Cloud Storage yang Anda salin sebelumnya
    • PROJECT_ID: project ID Google Cloud yang Anda salin sebelumnya

Melihat hasil

Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya akan disimpan di bucket Cloud Storage. Di bagian ini, pastikan pipeline berjalan menggunakan Konsol Google Cloud atau terminal lokal.

Konsol Google Cloud

Untuk melihat hasil di konsol Google Cloud, ikuti langkah-langkah berikut:

  1. Di konsol Google Cloud, buka halaman Jobs Dataflow.

    Buka Tugas

    Halaman Tugas menampilkan detail tugas wordcount Anda, termasuk status Running pada awalnya, lalu Succeeded.

  2. Buka halaman Bucket Cloud Storage.

    Buka Buckets

  3. Dari daftar bucket dalam project, klik bucket penyimpanan yang Anda buat sebelumnya.

    Di direktori wordcount, file output yang dibuat oleh tugas Anda akan ditampilkan.

Terminal lokal

Lihat hasilnya dari terminal atau menggunakan Cloud Shell.

  1. Untuk mencantumkan file output, gunakan perintah gcloud storage ls:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Ganti BUCKET_NAME dengan nama bucket Cloud Storage yang digunakan dalam program pipeline.

  3. Untuk melihat hasilnya dalam file output, gunakan perintah gcloud storage cat:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Mengubah kode pipeline

Pipeline wordcount dalam contoh sebelumnya membedakan antara kata berhuruf besar dan kecil. Langkah-langkah berikut menunjukkan cara memodifikasi pipeline sehingga pipeline wordcount tidak peka huruf besar/kecil.
  1. Di komputer lokal, download salinan terbaru kode wordcount dari repositori GitHub Apache Beam.
  2. Dari terminal lokal, jalankan pipeline:
    python wordcount.py --output outputs
  3. Lihat hasilnya:
    more outputs*
  4. Untuk keluar, tekan q.
  5. Di editor pilihan Anda, buka file wordcount.py.
  6. Di dalam fungsi run, periksa langkah-langkah pipeline:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Setelah split, baris akan dibagi menjadi kata-kata sebagai string.

  7. Untuk mengubah string menjadi huruf kecil, ubah baris setelah split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Modifikasi ini memetakan fungsi str.lower ke setiap kata. Baris ini setara dengan beam.Map(lambda word: str.lower(word)).
  8. Simpan file dan jalankan tugas wordcount yang dimodifikasi:
    python wordcount.py --output outputs
  9. Lihat hasil pipeline yang diubah:
    more outputs*
  10. Untuk keluar, tekan q.
  11. Jalankan pipeline yang diubah di layanan Dataflow:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Ganti kode berikut:

    • DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow
    • BUCKET_NAME: nama bucket Cloud Storage Anda
    • PROJECT_ID: project ID Google Cloud Anda

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Jika Anda mempertahankan project, cabut peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya