Mengembangkan notebook Apache Beam dengan runner interaktif

Gunakan runner interaktif Apache Beam dengan notebook JupyterLab untuk menyelesaikan tugas berikut:

  • Mengembangkan pipeline secara iteratif.
  • Periksa grafik pipeline Anda.
  • Mengurai PCollections individual dalam alur kerja read-eval-print-loop (REPL).

Notebook Apache Beam ini tersedia melalui Vertex AI Workbench, layanan yang menghosting virtual machine notebook yang telah diinstal sebelumnya dengan framework data science dan machine learning terbaru. Dataflow hanya mendukung instance Workbench yang menggunakan penampung Apache Beam.

Panduan ini berfokus pada fitur yang diperkenalkan oleh notebook Apache Beam, tetapi tidak menunjukkan cara membuat notebook. Untuk mengetahui informasi selengkapnya tentang Apache Beam, lihat panduan pemrograman Apache Beam.

Dukungan dan batasan

  • Notebook Apache Beam hanya mendukung Python.
  • Segmen pipeline Apache Beam yang berjalan di notebook ini dijalankan di lingkungan pengujian, dan bukan terhadap runner Apache Beam produksi. Untuk meluncurkan notebook di layanan Dataflow, ekspor pipeline yang dibuat di notebook Apache Beam Anda. Untuk mengetahui detail selengkapnya, lihat Meluncurkan tugas Dataflow dari pipeline yang dibuat di notebook Anda.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  8. Sebelum membuat instance notebook Apache Beam, aktifkan API tambahan untuk pipeline yang menggunakan layanan lain, seperti Pub/Sub.

    Jika tidak ditentukan, instance notebook akan dieksekusi oleh akun layanan Compute Engine default dengan peran editor project IAM. Jika project secara eksplisit membatasi peran akun layanan, pastikan akun layanan tersebut masih memiliki otorisasi yang memadai untuk menjalankan notebook Anda. Misalnya, membaca dari topik Pub/Sub secara implisit membuat langganan, dan akun layanan Anda memerlukan peran editor Pub/Sub IAM. Sebaliknya, membaca dari langganan Pub/Sub hanya memerlukan peran IAM Pub/Sub Subscriber.

    Setelah menyelesaikan panduan ini, untuk menghindari penagihan berkelanjutan, hapus resource yang Anda buat. Untuk detail selengkapnya, lihat Pembersihan.

    Meluncurkan instance notebook Apache Beam

    1. Di konsol Google Cloud , buka halaman Workbench Dataflow.

      Buka workbench

    2. Pastikan Anda berada di tab INSTANCES.

    3. Di toolbar, klik Buat baru.

    4. Di bagian Environment, untuk Environment, Container harus berupa Apache Beam. Hanya JupyterLab 3.x yang didukung untuk notebook Apache Beam.

    5. Opsional: Jika Anda ingin menjalankan notebook di GPU, di bagian Jenis mesin, pilih jenis mesin yang mendukung GPU. Untuk mengetahui informasi selengkapnya, lihat Platform GPU.

    6. Di bagian Networking, pilih subnetwork untuk VM notebook.

    7. Opsional: Jika Anda ingin menyiapkan instance notebook kustom, lihat Membuat instance menggunakan container kustom.

    8. Klik Buat. Dataflow Workbench membuat instance notebook Apache Beam baru.

    9. Setelah instance notebook dibuat, link Open JupyterLab akan aktif. Klik Open JupyterLab.

    Opsional: Menginstal dependensi

    Notebook Apache Beam sudah dilengkapi dengan dependensi konektor Apache Beam danGoogle Cloud yang terinstal. Jika pipeline Anda berisi konektor kustom atau PTransforms kustom yang bergantung pada library pihak ketiga, instal library tersebut setelah Anda membuat instance notebook.

    Contoh notebook Apache Beam

    Setelah membuat instance notebook, buka di JupyterLab. Di tab Files di sidebar JupyterLab, folder Examples berisi notebook contoh. Untuk mengetahui informasi selengkapnya tentang cara menggunakan file JupyterLab, lihat Bekerja dengan file dalam panduan pengguna JupyterLab.

    Notebook berikut tersedia:

    • Jumlah Kata
    • Streaming Jumlah Kata
    • Streaming Data Perjalanan Taksi NYC
    • Apache Beam SQL di notebook dengan perbandingan terhadap pipeline
    • Apache Beam SQL di notebook dengan Dataflow Runner
    • Apache Beam SQL di notebook
    • Jumlah Kata Dataflow
    • Flink Interaktif dalam Skala Besar
    • RunInference
    • Menggunakan GPU dengan Apache Beam
    • Visualisasikan Data

    Folder Tutorials berisi tutorial tambahan yang menjelaskan dasar-dasar Apache Beam. Tutorial berikut tersedia:

    • Operasi Dasar
    • Operasi Per Elemen
    • Agregasi
    • Windows
    • Operasi I/O
    • Streaming
    • Latihan Akhir

    Notebook ini mencakup teks penjelasan dan blok kode yang diberi komentar untuk membantu Anda memahami konsep Apache Beam dan penggunaan API. Tutorial ini juga menyediakan latihan untuk mempraktikkan konsep.

    Bagian berikut menggunakan contoh kode dari notebook Streaming Word Count. Cuplikan kode dalam panduan ini dan yang ditemukan di notebook Streaming Word Count mungkin memiliki sedikit perbedaan.

    Buat instance notebook

    Buka File > New > Notebook, lalu pilih kernel yang merupakan Apache Beam 2.22 atau yang lebih baru.

    Notebook Apache Beam dibuat berdasarkan cabang utama Apache Beam SDK. Artinya, versi kernel terbaru yang ditampilkan di UI notebook mungkin lebih baru daripada versi SDK yang baru dirilis.

    Apache Beam diinstal di instance notebook Anda, jadi sertakan modul interactive_runner dan interactive_beam di notebook Anda.

    import apache_beam as beam
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    import apache_beam.runners.interactive.interactive_beam as ib
    

    Jika notebook Anda menggunakan Google API lain, tambahkan pernyataan impor berikut:

    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    import google.auth
    

    Menetapkan opsi interaktivitas

    Baris berikut menetapkan jumlah waktu InteractiveRunner merekam data dari sumber yang tidak terikat. Dalam contoh ini, durasi ditetapkan ke 10 menit.

    ib.options.recording_duration = '10m'
    

    Anda juga dapat mengubah batas ukuran perekaman (dalam byte) untuk sumber yang tidak terikat dengan menggunakan properti recording_size_limit.

    # Set the recording size limit to 1 GB.
    ib.options.recording_size_limit = 1e9
    

    Untuk opsi interaktif tambahan, lihat class interactive_beam.options.

    Membuat pipeline

    Lakukan inisialisasi pipeline menggunakan objek InteractiveRunner.

    options = pipeline_options.PipelineOptions(flags={})
    
    # Set the pipeline mode to stream the data from Pub/Sub.
    options.view_as(pipeline_options.StandardOptions).streaming = True
    
    # Set the project to the default project in your current Google Cloud environment.
    # The project is used to create a subscription to the Pub/Sub topic.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    p = beam.Pipeline(InteractiveRunner(), options=options)
    

    Membaca dan memvisualisasikan data

    Contoh berikut menunjukkan pipeline Apache Beam yang membuat langganan ke topik Pub/Sub tertentu dan membaca dari langganan tersebut.

    words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")
    

    Pipeline menghitung kata-kata berdasarkan jendela dari sumber. Operator ini membuat windowing tetap dengan setiap jendela berdurasi 10 detik.

    windowed_words = (words
       | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
    

    Setelah data di-window, kata-kata dihitung berdasarkan window.

    windowed_word_counts = (windowed_words
       | "count" >> beam.combiners.Count.PerElement())
    

    Metode show() memvisualisasikan PCollection yang dihasilkan di notebook.

    ib.show(windowed_word_counts, include_window_info=True)
    

    Metode show yang memvisualisasikan PCollection dalam bentuk tabel.

    Anda dapat membatasi cakupan kumpulan hasil kembali dari show() dengan menetapkan dua parameter opsional: n dan duration.

    • Tetapkan n untuk membatasi kumpulan hasil agar menampilkan paling banyak n jumlah elemen, seperti 20. Jika n tidak disetel, perilaku default-nya adalah mencantumkan elemen terbaru yang direkam hingga perekaman sumber selesai.
    • Tetapkan duration untuk membatasi kumpulan hasil ke data yang bernilai sejumlah detik tertentu yang dimulai dari awal rekaman sumber. Jika duration tidak ditetapkan, perilaku defaultnya adalah mencantumkan semua elemen hingga perekaman berakhir.

    Jika kedua parameter opsional disetel, show() akan berhenti setiap kali salah satu nilai minimum tercapai. Dalam contoh berikut, show() menampilkan maksimal 20 elemen yang dihitung berdasarkan data 30 detik pertama dari sumber yang direkam.

    ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)
    

    Untuk menampilkan visualisasi data Anda, teruskan visualize_data=True ke dalam metode show(). Anda dapat menerapkan beberapa filter ke visualisasi. Visualisasi berikut memungkinkan Anda memfilter menurut label dan sumbu:

    Metode show memvisualisasikan PCollection sebagai kumpulan elemen UI yang dapat difilter.

    Untuk memastikan kemampuan pemutaran ulang saat membuat prototipe pipeline streaming, panggilan metode show() menggunakan kembali data yang diambil secara default. Untuk mengubah perilaku ini dan membuat metode show() selalu mengambil data baru, tetapkan interactive_beam.options.enable_capture_replay = False. Selain itu, jika Anda menambahkan sumber tidak terbatas kedua ke notebook, data dari sumber tidak terbatas sebelumnya akan dihapus.

    Visualisasi lain yang berguna di notebook Apache Beam adalah Pandas DataFrame. Contoh berikut pertama-tama mengonversi kata menjadi huruf kecil, lalu menghitung frekuensi setiap kata.

    windowed_lower_word_counts = (windowed_words
       | beam.Map(lambda word: word.lower())
       | "count" >> beam.combiners.Count.PerElement())
    

    Metode collect() memberikan output dalam DataFrame Pandas.

    ib.collect(windowed_lower_word_counts, include_window_info=True)
    

    Metode pengumpulan yang merepresentasikan PCollection dalam Pandas DataFrame.

    Mengedit dan menjalankan ulang sel adalah praktik umum dalam pengembangan notebook. Saat Anda mengedit dan menjalankan ulang sel di notebook Apache Beam, sel tidak mengurungkan tindakan yang dimaksudkan dari kode di sel asli. Misalnya, jika sel menambahkan PTransform ke pipeline, menjalankan kembali sel tersebut akan menambahkan PTransform lain ke pipeline. Jika Anda ingin menghapus status, mulai ulang kernel, lalu jalankan kembali sel.

    Memvisualisasikan data melalui pemeriksa Interactive Beam

    Anda mungkin merasa terganggu saat memeriksa data PCollection dengan terus-menerus memanggil show() dan collect(), terutama saat output menggunakan banyak ruang di layar dan menyulitkan Anda untuk menavigasi notebook. Anda mungkin juga ingin membandingkan beberapa PCollections secara berdampingan untuk memvalidasi apakah transformasi berfungsi seperti yang diharapkan. Misalnya, saat satu PCollection mengalami transformasi dan menghasilkan PCollection lainnya. Untuk kasus penggunaan ini, pemeriksa Interactive Beam adalah solusi yang praktis.

    Pemeriksa Beam interaktif disediakan sebagai ekstensi JupyterLab apache-beam-jupyterlab-sidepanel yang sudah diinstal sebelumnya di notebook Apache Beam. Dengan ekstensi ini, Anda dapat memeriksa status pipeline dan data yang terkait dengan setiap PCollection secara interaktif tanpa memanggil show() atau collect() secara eksplisit.

    Ada 3 cara untuk membuka pemeriksa:

    • Klik Interactive Beam di panel menu atas JupyterLab. Di dropdown, temukan Open Inspector, lalu klik untuk membuka pemeriksa.

      Membuka pemeriksa melalui menu

    • Gunakan halaman peluncur. Jika tidak ada halaman peluncur yang terbuka, klik File -> New Launcher untuk membukanya. Di halaman peluncur, temukan Interactive Beam lalu klik Open Inspector untuk membuka pemeriksa.

      Membuka inspector melalui peluncur

    • Gunakan palet perintah. Di panel menu JupyterLab, klik View > Activate Command Palette. Dalam dialog, telusuri Interactive Beam untuk mencantumkan semua opsi ekstensi. Klik Open Inspector untuk membuka pemeriksa.

      Membuka pemeriksa melalui palet perintah

    Saat pemeriksa akan terbuka:

    • Jika hanya ada satu notebook yang terbuka, pemeriksa akan otomatis terhubung ke notebook tersebut.

    • Jika tidak ada notebook yang terbuka, dialog akan muncul yang memungkinkan Anda memilih kernel.

    • Jika beberapa notebook terbuka, dialog akan muncul yang memungkinkan Anda memilih sesi notebook.

      Pilih notebook yang akan dihubungkan

    Sebaiknya buka setidaknya satu notebook dan pilih kernel untuk notebook tersebut sebelum membuka pemeriksa. Jika Anda membuka pemeriksa dengan kernel sebelum membuka notebook, nanti saat Anda membuka notebook untuk terhubung ke pemeriksa, Anda harus memilih Interactive Beam Inspector Session dari Use Kernel from Preferred Session. Pemeriksa dan notebook terhubung saat keduanya berbagi sesi yang sama, bukan sesi berbeda yang dibuat dari kernel yang sama. Memilih kernel yang sama dari Start Preferred Kernel akan membuat sesi baru yang terpisah dari sesi notebook atau pemeriksa yang sudah dibuka.

    Anda dapat membuka beberapa pemeriksa untuk notebook yang terbuka dan mengatur pemeriksa dengan menarik lalu melepas tabnya secara bebas di ruang kerja.

    Buka 2 pemeriksa dan susun berdampingan

    Halaman pemeriksa akan otomatis dimuat ulang saat Anda menjalankan sel di notebook. Halaman ini mencantumkan pipeline dan PCollections yang ditentukan dalam notebook yang terhubung. PCollections diatur berdasarkan pipeline tempatnya berada, dan Anda dapat menciutkannya dengan mengklik pipeline header.

    Untuk item dalam daftar PCollections dan alur, saat diklik, pemeriksa akan merender visualisasi yang sesuai di sisi kanan:

    • Jika berupa PCollection, pemeriksa akan merender datanya (secara dinamis jika data masih masuk untuk PCollections yang tidak terikat) dengan widget tambahan untuk menyesuaikan visualisasi setelah mengklik tombol APPLY.

      Halaman pemeriksa

      Karena inspektur dan notebook yang dibuka berbagi sesi kernel yang sama, keduanya saling memblokir agar tidak berjalan. Misalnya, jika notebook sedang menjalankan kode, inspektur tidak akan diperbarui hingga notebook menyelesaikan eksekusi tersebut. Sebaliknya, jika Anda ingin menjalankan kode secara langsung di notebook saat pemeriksa memvisualisasikan PCollection secara dinamis, Anda harus mengklik tombol STOP untuk menghentikan visualisasi dan melepaskan kernel ke notebook secara lebih awal.

    • Jika berupa pipeline, pemeriksa akan menampilkan grafik pipeline.

      Halaman pemeriksa

    Anda mungkin melihat pipeline anonim. Pipeline tersebut memiliki PCollections yang dapat Anda akses, tetapi tidak lagi dirujuk oleh sesi utama. Contoh:

    p = beam.Pipeline()
    pcoll = p | beam.Create([1, 2, 3])
    
    p = beam.Pipeline()
    

    Contoh sebelumnya membuat pipeline p kosong dan pipeline anonim yang berisi satu PCollection pcoll. Anda dapat mengakses pipeline anonim menggunakan pcoll.pipeline.

    Anda dapat mengganti pipeline dan daftar PCollection untuk menghemat ruang bagi visualisasi besar. Mengaktifkan/menonaktifkan daftar kiri

    Memahami status perekaman pipeline

    Selain visualisasi, Anda juga dapat memeriksa status perekaman untuk satu atau semua pipeline di instance notebook dengan memanggil describe.

    # Return the recording status of a specific pipeline. Leave the parameter list empty to return
    # the recording status of all pipelines.
    ib.recordings.describe(p)
    

    Metode describe() memberikan detail berikut:

    • Ukuran total (dalam byte) semua rekaman untuk pipeline di disk
    • Waktu mulai saat tugas perekaman latar belakang dimulai (dalam detik dari epoch Unix)
    • Status pipeline saat ini dari tugas perekaman di latar belakang
    • Variabel Python untuk pipeline

    Meluncurkan tugas Dataflow dari pipeline yang dibuat di notebook Anda

    1. Opsional: Sebelum menggunakan notebook untuk menjalankan tugas Dataflow, mulai ulang kernel, jalankan ulang semua sel, dan verifikasi output. Jika Anda melewati langkah ini, status tersembunyi di notebook dapat memengaruhi grafik tugas di objek pipeline.
    2. Aktifkan Dataflow API.
    3. Tambahkan pernyataan impor berikut:

      from apache_beam.runners import DataflowRunner
      
    4. Teruskan opsi pipeline Anda.

      # Set up Apache Beam pipeline options.
      options = pipeline_options.PipelineOptions()
      
      # Set the project to the default project in your current Google Cloud
      # environment.
      _, options.view_as(GoogleCloudOptions).project = google.auth.default()
      
      # Set the Google Cloud region to run Dataflow.
      options.view_as(GoogleCloudOptions).region = 'us-central1'
      
      # Choose a Cloud Storage location.
      dataflow_gcs_location = 'gs://<change me>/dataflow'
      
      # Set the staging location. This location is used to stage the
      # Dataflow pipeline and SDK binary.
      options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
      
      # Set the temporary location. This location is used to store temporary files
      # or intermediate results before outputting to the sink.
      options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
      
      # If and only if you are using Apache Beam SDK built from source code, set
      # the SDK location. This is used by Dataflow to locate the SDK
      # needed to run the pipeline.
      options.view_as(pipeline_options.SetupOptions).sdk_location = (
          '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
          beam.version.__version__)
      

      Anda dapat menyesuaikan nilai parameter. Misalnya, Anda dapat mengubah nilai region dari us-central1.

    5. Jalankan pipeline dengan DataflowRunner. Langkah ini menjalankan tugas Anda di layanan Dataflow.

      runner = DataflowRunner()
      runner.run_pipeline(p, options=options)
      

      p adalah objek pipeline dari Membuat pipeline.

    Untuk contoh tentang cara melakukan konversi ini di notebook interaktif, lihat notebook Penghitung Kata Dataflow di instance notebook Anda.

    Atau, Anda dapat mengekspor notebook sebagai skrip yang dapat dieksekusi, mengubah file .py yang dihasilkan menggunakan langkah-langkah sebelumnya, lalu men-deploy pipeline ke layanan Dataflow.

    Menyimpan notebook Anda

    Notebook yang Anda buat disimpan secara lokal di instance notebook yang sedang berjalan. Jika Anda mereset atau mematikan instance notebook selama pengembangan, notebook baru tersebut akan dipertahankan selama dibuat di direktori /home/jupyter. Namun, jika instance notebook dihapus, notebook tersebut juga akan dihapus.

    Untuk menyimpan notebook Anda untuk digunakan di masa mendatang, download notebook secara lokal ke workstation Anda, simpan ke GitHub, atau ekspor ke format file lain.

    Menyimpan notebook Anda ke persistent disk tambahan

    Jika Anda ingin menyimpan pekerjaan Anda seperti notebook dan skrip di berbagai instance notebook, simpan di Persistent Disk.

    1. Buat atau pasang Persistent Disk. Ikuti petunjuk untuk menggunakan ssh guna terhubung ke VM instance notebook dan mengeluarkan perintah di Cloud Shell yang terbuka.

    2. Catat direktori tempat Persistent Disk dipasang, misalnya, /mnt/myDisk.

    3. Edit detail VM instance notebook untuk menambahkan entri ke Custom metadata: key - container-custom-params; value - -v /mnt/myDisk:/mnt/myDisk. Metadata tambahan yang diperlukan untuk mengikat PD yang ter-mount

    4. Klik Simpan.

    5. Untuk memperbarui perubahan ini, reset instance notebook. Mereset instance notebook

    6. Setelah direset, klik Open JupyterLab. UI JupyterLab mungkin memerlukan waktu untuk tersedia. Setelah UI muncul, buka terminal dan jalankan perintah berikut: ls -al /mnt Direktori /mnt/myDisk akan tercantum. Mencantumkan volume terikat

    Sekarang Anda dapat menyimpan pekerjaan Anda ke direktori /mnt/myDisk. Meskipun instance notebook dihapus, Persistent Disk tetap ada di project Anda. Selanjutnya, Anda dapat memasang Persistent Disk ini ke instance notebook lain.

    Pembersihan

    Setelah selesai menggunakan instance notebook Apache Beam, bersihkan resource yang Anda buat di Google Cloud dengan menonaktifkan instance notebook.

    Langkah berikutnya