Kembangkan notebook Apache Beam dengan runner interaktif

Gunakan runner interaktif Apache Beam dengan notebook JupyterLab untuk menyelesaikan tugas-tugas berikut:

  • Mengembangkan pipeline secara iteratif.
  • Periksa grafik pipeline Anda.
  • Mengurai setiap PCollections dalam alur kerja read-eval-print-loop (REPL).

Notebook Apache Beam ini tersedia melalui notebook yang dikelola pengguna Vertex AI Workbench, sebuah layanan yang menghosting virtual machine notebook yang telah diinstal sebelumnya dengan framework data science dan machine learning terbaru.

Panduan ini berfokus pada fungsi yang diperkenalkan oleh notebook Apache Beam, tetapi tidak menunjukkan cara membuat notebook. Untuk informasi selengkapnya tentang Apache Beam, lihat panduan pemrograman Apache Beam.

Dukungan dan batasan

  • Notebook Apache Beam hanya mendukung Python.
  • Segmen pipeline Apache Beam yang berjalan di notebook ini dijalankan di lingkungan pengujian, dan bukan terhadap runner Apache Beam produksi. Untuk meluncurkan notebook di layanan Dataflow, ekspor pipeline yang dibuat di notebook Apache Beam Anda. Untuk detail selengkapnya, baca artikel Meluncurkan tugas Dataflow dari pipeline yang dibuat di notebook Anda.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  6. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

Sebelum membuat instance notebook Apache Beam, aktifkan API tambahan untuk pipeline yang menggunakan layanan lain, seperti Pub/Sub.

Jika tidak ditentukan, instance notebook akan dijalankan oleh akun layanan Compute Engine default dengan peran editor project IAM. Jika project secara eksplisit membatasi peran akun layanan, pastikan project tersebut masih memiliki otorisasi yang cukup untuk menjalankan notebook Anda. Misalnya, membaca dari topik Pub/Sub secara implisit akan membuat langganan, dan akun layanan Anda memerlukan peran editor Pub/Sub IAM. Sebaliknya, membaca dari langganan Pub/Sub hanya memerlukan peran pelanggan IAM Pub/Sub.

Setelah menyelesaikan panduan ini, hapus resource yang Anda buat untuk menghindari penagihan berkelanjutan. Untuk detail selengkapnya, lihat Pembersihan.

Meluncurkan instance notebook Apache Beam

  1. Di konsol Google Cloud, buka halaman Dataflow Workbench.

    Buka workbench

  2. Pastikan Anda berada di tab Notebook yang dikelola pengguna.

  3. Di toolbar, klik Create new.

  4. Di bagian Environment, untuk Environment, pilih Apache Beam.

  5. Opsional: Jika Anda ingin menjalankan notebook pada GPU, di bagian Machine type, pilih jenis mesin yang mendukung GPU, lalu pilih Install NVIDIA GPU driver automatically for me. Untuk mengetahui informasi selengkapnya, lihat platform GPU.

  6. Di bagian Networking, pilih subnetwork untuk VM notebook.

  7. Opsional: Jika Anda ingin menyiapkan instance notebook kustom, lihat Membuat instance notebook yang dikelola pengguna dengan properti tertentu.

  8. Klik Create. Dataflow Workbench membuat instance notebook Apache Beam baru.

  9. Setelah instance notebook dibuat, link Buka JupyterLab akan aktif. Klik Open JupyterLab.

Opsional: Menginstal dependensi

Notebook Apache Beam sudah dilengkapi dengan dependensi konektor Apache Beam dan Google Cloud yang terinstal. Jika pipeline Anda berisi konektor kustom atau PTransforms kustom yang bergantung pada library pihak ketiga, instal konektor tersebut setelah Anda membuat instance notebook. Untuk informasi selengkapnya, lihat Menginstal dependensi dalam dokumentasi notebook yang dikelola pengguna.

Mulai menggunakan notebook Apache Beam

Setelah membuka instance notebook yang dikelola pengguna, notebook contoh tersedia di folder Examples. Tersedia notebook berikut:

  • Jumlah Kata
  • Jumlah Kata Streaming
  • Streaming Data Perjalanan Taksi NYC
  • Apache Beam SQL di notebook dengan perbandingan dengan pipeline
  • Apache Beam SQL di notebook dengan Dataflow Runner
  • Apache Beam SQL di notebook
  • Jumlah Kata Dataflow
  • Flink Interaktif dalam Skala Besar
  • RunInference
  • Menggunakan GPU dengan Apache Beam
  • Visualisasikan Data

Anda dapat menemukan tutorial tambahan yang menjelaskan dasar-dasar Apache Beam di folder Tutorials. Tutorial berikut tersedia:

  • Pengoperasian Dasar
  • Operasi Element Wise
  • Agregasi
  • Windows
  • Operasi I/O
  • Streaming
  • Latihan Terakhir

Notebook ini menyertakan teks penjelasan dan blok kode yang diberi komentar untuk membantu Anda memahami konsep Apache Beam dan penggunaan API. Tutorial ini juga menyediakan latihan langsung untuk mempraktikkan konsep.

Bagian berikut menggunakan kode contoh dari notebook Jumlah Kata Streaming. Cuplikan kode dalam panduan ini dan apa yang ditemukan di notebook Jumlah Kata Streaming mungkin memiliki perbedaan kecil.

Buat instance notebook

Buka File > New > Notebook, lalu pilih kernel versi Apache Beam 2.22 atau yang lebih baru.

Notebook Apache Beam dibuat berdasarkan cabang master Apache Beam SDK. Artinya, versi terbaru kernel yang ditampilkan di UI notebook mungkin lebih cepat daripada versi SDK yang baru dirilis.

Apache Beam diinstal pada instance notebook Anda, jadi sertakan modul interactive_runner dan interactive_beam di notebook Anda.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Jika notebook Anda menggunakan Google API lainnya, tambahkan pernyataan impor berikut:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Menetapkan opsi interaktivitas

Baris berikut menetapkan jumlah waktu InteractiveRunner merekam data dari sumber yang tidak terikat. Dalam contoh ini, durasi disetel ke 10 menit.

ib.options.recording_duration = '10m'

Anda juga dapat mengubah batas ukuran rekaman (dalam byte) untuk sumber tak terbatas menggunakan properti recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Untuk opsi interaktif tambahan, lihat classinteractive_beam.options.

Membuat pipeline

Lakukan inisialisasi pipeline menggunakan objek InteractiveRunner.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Membaca dan memvisualisasikan data

Contoh berikut menunjukkan pipeline Apache Beam yang membuat langganan ke topik Pub/Sub tertentu dan membaca dari langganan tersebut.

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

Pipeline menghitung kata berdasarkan jendela dari sumber. Ini membuat jendela tetap dengan setiap jendela berdurasi 10 detik.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Setelah data berada dalam jendela, kata dihitung berdasarkan jendela.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Metode show() memvisualisasikan PCollection yang dihasilkan di notebook.

ib.show(windowed_word_counts, include_window_info=True)

Metode acara yang memvisualisasikan PCollection dalam bentuk tabel.

Anda dapat menentukan cakupan hasil yang ditetapkan dari show() dengan menetapkan dua parameter opsional: n dan duration.

  • Setel n untuk membatasi hasil yang disetel agar menampilkan maksimal n jumlah elemen, misalnya 20. Jika n tidak ditetapkan, perilaku default-nya adalah mencantumkan elemen terbaru yang diambil hingga perekaman sumber selesai.
  • Setel duration untuk membatasi kumpulan hasil ke data selama jumlah detik tertentu yang dimulai dari awal perekaman sumber. Jika duration tidak ditetapkan, perilaku default-nya adalah mencantumkan semua elemen hingga perekaman selesai.

Jika kedua parameter opsional ditetapkan, show() akan berhenti setiap kali nilai minimum terpenuhi. Pada contoh berikut, show() menampilkan maksimal 20 elemen yang dihitung berdasarkan data selama 30 detik pertama dari sumber yang direkam.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Untuk menampilkan visualisasi data Anda, teruskan visualize_data=True ke metode show(). Anda dapat menerapkan beberapa filter ke visualisasi. Visualisasi berikut memungkinkan Anda memfilter berdasarkan label dan sumbu:

Metode acara yang memvisualisasikan PCollection sebagai beragam kumpulan elemen UI yang dapat difilter.

Untuk memastikan kemampuan replay saat membuat prototipe pipeline streaming, panggilan metode show() akan menggunakan kembali data yang direkam secara default. Untuk mengubah perilaku ini dan membuat metode show() selalu mengambil data baru, tetapkan interactive_beam.options.enable_capture_replay = False. Selain itu, jika Anda menambahkan sumber tak terbatas kedua ke notebook, data dari sumber tak terbatas sebelumnya akan dihapus.

Visualisasi berguna lainnya di notebook Apache Beam adalah Pandas DataFrame. Contoh berikut pertama-tama mengonversi kata menjadi huruf kecil, kemudian menghitung frekuensi setiap kata.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

Metode collect() memberikan output dalam DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Metode pengumpulan yang mewakili PCollection dalam DataFrame Pandas.

Mengedit dan menjalankan kembali sel adalah praktik umum dalam pengembangan notebook. Saat Anda mengedit dan menjalankan ulang sel di notebook Apache Beam, sel tersebut tidak mengurungkan tindakan kode yang diinginkan di sel aslinya. Misalnya, jika sebuah sel menambahkan PTransform ke pipeline, menjalankan kembali sel tersebut akan menambahkan PTransform tambahan ke pipeline. Jika Anda ingin membersihkan keadaan, mulai ulang {i>kernel<i}, lalu jalankan kembali selnya.

Memvisualisasikan data melalui Interactive Beam inspector

Anda mungkin merasa terganggu saat memasukkan data PCollection dengan terus memanggil show() dan collect(), terutama jika output menggunakan banyak ruang di layar dan menyulitkan navigasi melalui notebook. Anda juga dapat membandingkan beberapa PCollections secara berdampingan untuk memvalidasi apakah transformasi berfungsi sebagaimana mestinya. Misalnya, saat satu PCollection melalui transformasi dan menghasilkan yang lainnya. Untuk kasus penggunaan ini, Interactive Beam inspector adalah solusi yang mudah.

Interactive Beam inspector disediakan sebagai ekstensi JupyterLab apache-beam-jupyterlab-sidepanel yang telah diinstal sebelumnya di notebook Apache Beam. Dengan ekstensi tersebut, Anda dapat secara interaktif memeriksa status pipeline dan data yang terkait dengan setiap PCollection tanpa memanggil show() atau collect() secara eksplisit.

Ada 3 cara untuk membuka pemeriksa:

  • Klik Interactive Beam pada panel menu atas JupyterLab. Di menu dropdown, temukan Open Inspector, lalu klik untuk membuka pemeriksa.

    Membuka pemeriksa melalui menu

  • Menggunakan halaman peluncur. Jika tidak ada halaman peluncur yang terbuka, klik File -> New Launcher untuk membukanya. Di halaman peluncur, temukan Interactive Beam dan klik Open Inspector untuk membuka pemeriksa.

    Buka pemeriksa melalui peluncur

  • Gunakan palet perintah. Di panel menu JupyterLab, klik View > Activate Command Palette. Dalam dialog, telusuri Interactive Beam untuk mencantumkan semua opsi ekstensi. Klik Open Inspector untuk membuka pemeriksa.

    Buka inspector melalui palet perintah

Saat pemeriksa akan membuka:

  • Jika hanya ada satu notebook yang terbuka, pemeriksa akan otomatis terhubung ke sana.

  • Jika tidak ada notebook yang terbuka, akan muncul dialog yang memungkinkan Anda memilih kernel.

  • Jika beberapa notebook terbuka, sebuah dialog akan muncul dan memungkinkan Anda memilih sesi notebook.

    Pilih notebook yang akan dihubungkan

Sebaiknya buka setidaknya satu notebook dan pilih kernel untuk notebook tersebut sebelum membuka inspector. Jika Anda membuka inspector dengan kernel sebelum membuka notebook apa pun, nantinya saat membuka notebook untuk terhubung ke inspector, Anda harus memilih Interactive Beam Inspector Session dari Use Kernel from Preferred Session. Pemeriksa dan notebook terhubung ketika keduanya berbagi sesi yang sama, bukan sesi berbeda yang dibuat dari kernel yang sama. Memilih kernel yang sama dari Start Preferred Kernel akan menghasilkan sesi baru yang terpisah dari sesi notebook atau inspector terbuka yang sudah ada.

Anda dapat membuka beberapa inspector untuk notebook yang terbuka dan mengatur inspector dengan menarik lalu melepas tabnya secara bebas di ruang kerja.

Buka 2 inspector dan atur secara berdampingan

Halaman inspector otomatis dimuat ulang saat Anda menjalankan sel di notebook. Halaman ini mencantumkan pipeline dan PCollections yang ditentukan di notebook yang terhubung. PCollections diatur menurut pipeline-nya, dan Anda dapat menciutkannya dengan mengklik pipeline header.

Untuk item dalam pipeline dan daftar PCollections, saat diklik, pemeriksa akan merender visualisasi yang sesuai di sisi kanan:

  • Jika berupa PCollection, pemeriksa akan merender datanya (secara dinamis jika data masih masuk untuk PCollections yang tidak terbatas) dengan widget tambahan untuk menyesuaikan visualisasi setelah mengklik tombol APPLY.

    Halaman pemeriksa

    Karena inspector dan notebook yang terbuka berbagi sesi kernel yang sama, keduanya saling memblokir agar tidak berjalan. Misalnya, jika notebook sibuk menjalankan kode, inspector tidak akan diupdate hingga notebook menyelesaikan eksekusi tersebut. Sebaliknya, jika ingin menjalankan kode langsung di notebook saat inspector memvisualisasikan PCollection secara dinamis, Anda harus mengklik tombol STOP untuk menghentikan visualisasi dan melepaskan kernel secara preemptive ke notebook.

  • Jika berupa pipeline, pemeriksa akan menampilkan grafik pipeline.

    Halaman pemeriksa

Anda mungkin melihat pipeline anonim. Pipeline tersebut memiliki PCollections yang dapat Anda akses, tetapi tidak lagi direferensikan oleh sesi utama. Contoh:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

Contoh sebelumnya membuat pipeline kosong p dan pipeline anonim yang berisi satu PCollection pcoll. Anda dapat mengakses pipeline anonim menggunakan pcoll.pipeline.

Anda dapat mengalihkan pipeline dan daftar PCollection guna menghemat ruang untuk visualisasi besar. Daftar Toggel kiri

Memahami status perekaman pipeline

Selain visualisasi, Anda juga dapat memeriksa status perekaman untuk satu atau semua pipeline di instance notebook Anda dengan memanggil describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

Metode describe() memberikan detail berikut:

  • Ukuran total (dalam byte) semua rekaman untuk pipeline pada disk
  • Waktu mulai saat tugas perekaman latar belakang dimulai (dalam detik dari epoch Unix)
  • Status pipeline saat ini dari tugas perekaman latar belakang
  • Variabel Python untuk pipeline

Meluncurkan tugas Dataflow dari pipeline yang dibuat di notebook Anda

  1. Opsional: Sebelum menggunakan notebook Anda untuk menjalankan tugas Dataflow, mulai ulang kernel, jalankan kembali semua sel, dan verifikasi hasilnya. Jika Anda melewati langkah ini, status tersembunyi di notebook mungkin memengaruhi grafik tugas di objek pipeline.
  2. Aktifkan Dataflow API.
  3. Tambahkan pernyataan impor berikut:

    from apache_beam.runners import DataflowRunner
    
  4. Teruskan opsi pipeline Anda.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Anda dapat menyesuaikan nilai parameter. Misalnya, Anda dapat mengubah nilai region dari us-central1.

  5. Jalankan pipeline dengan DataflowRunner. Langkah ini menjalankan tugas Anda pada layanan Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p adalah objek pipeline dari Membuat pipeline.

Untuk contoh cara melakukan konversi ini di notebook interaktif, lihat notebook Jumlah Kata Dataflow di instance notebook Anda.

Atau, Anda dapat mengekspor notebook sebagai skrip yang dapat dieksekusi, mengubah file .py yang dihasilkan menggunakan langkah-langkah sebelumnya, lalu men-deploy pipeline ke layanan Dataflow.

Simpan notebook Anda

Notebook yang Anda buat disimpan secara lokal di instance notebook yang sedang berjalan. Jika Anda mereset atau menonaktifkan instance notebook selama pengembangan, notebook baru tersebut akan dipertahankan selama dibuat di direktori /home/jupyter. Namun, jika instance notebook dihapus, notebook tersebut juga akan dihapus.

Agar notebook Anda dapat terus digunakan di lain waktu, download notebook secara lokal ke workstation, simpan ke GitHub, atau ekspor ke format file yang berbeda.

Simpan notebook Anda ke persistent disk tambahan

Jika Anda ingin menyimpan pekerjaan seperti notebook dan skrip di berbagai instance notebook, simpan semuanya di Persistent Disk.

  1. Buat atau pasang Persistent Disk. Ikuti petunjuk untuk menggunakan ssh agar terhubung ke VM dari instance notebook dan mengeluarkan perintah di Cloud Shell yang terbuka.

  2. Perhatikan direktori tempat Persistent Disk dipasang, misalnya, /mnt/myDisk.

  3. Edit detail VM dari instance notebook untuk menambahkan entri ke kunci Custom metadata: - container-custom-params; nilai - -v /mnt/myDisk:/mnt/myDisk. Metadata tambahan yang diperlukan untuk mengikat PD yang terpasang

  4. Klik Simpan.

  5. Untuk memperbarui perubahan ini, reset instance notebook. Mereset instance notebook

  6. Setelah reset, klik Open JupyterLab. Mungkin perlu waktu beberapa saat agar UI JupyterLab tersedia. Setelah UI muncul, buka terminal dan jalankan perintah berikut: ls -al /mnt Direktori /mnt/myDisk seharusnya sudah dicantumkan. Daftar volume yang dibatasi

Sekarang Anda dapat menyimpan pekerjaan ke direktori /mnt/myDisk. Meskipun instance notebook dihapus, Persistent Disk akan tetap ada di project Anda. Selanjutnya, Anda dapat memasang Persistent Disk ini ke instance notebook lainnya.

Pembersihan

Setelah selesai menggunakan instance notebook Apache Beam, bersihkan resource yang Anda buat di Google Cloud dengan menonaktifkan instance notebook.

Langkah selanjutnya