Menggunakan fitur lanjutan notebook Apache Beam

Menggunakan runner interaktif Apache Beam dengan notebook JupyterLab memungkinkan Anda secara iteratif mengembangkan pipeline, memeriksa grafik pipeline, dan mengurai PCollections individual dalam alur kerja read-eval-print-loop (REPL). Untuk tutorial yang menunjukkan cara menggunakan runner interaktif Apache Beam dengan notebook JupyterLab, lihat Mengembangkan dengan notebook Apache Beam.

Halaman ini menyediakan detail tentang fitur lanjutan yang dapat digunakan dengan notebook Apache Beam Anda.

FlinkRunner interaktif di cluster yang dikelola notebook

Untuk menangani data berukuran produksi secara interaktif dari notebook, Anda dapat menggunakan FlinkRunner dengan beberapa opsi pipeline umum untuk memberi tahu sesi notebook agar mengelola cluster Dataproc yang tahan lama dan menjalankan pipeline Apache Beam secara terdistribusi.

Prasyarat

Untuk menggunakan fitur ini:

  • Mengaktifkan Dataproc API.
  • Berikan peran admin atau editor ke akun layanan yang menjalankan instance notebook untuk Dataproc.
  • Gunakan kernel notebook dengan Apache Beam SDK versi 2.40.0 atau yang lebih baru.

Konfigurasi

Setidaknya, Anda memerlukan penyiapan berikut:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<BUCKET_NAME>/flink'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = 'PROJECT_ID'

Ketentuan eksplisit (opsional)

Anda dapat menambahkan opsi berikut.

# Change this if the pipeline needs to run in a different region
# than the default, 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers=40
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n1-highmem-8'

# When working with non-official Apache Beam releases, such as Apache Beam built from source
# code, configure the environment to use a compatible released SDK container.
# If needed, build a custom container and use it. For more information, see:
# https://beam.apache.org/documentation/runtime/environments/
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0 or LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'

Penggunaan

# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10

p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to run
# your pipelines with the FlinkRunner.
ib.show(word_counts)

# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
    p_bq
    | 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)

# Describe the cluster running the pipelines.
# You can access the Flink dashboard from the printed link.
ib.clusters.describe()

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)

Cluster yang dikelola notebook

  • Secara default, jika Anda tidak memberikan opsi pipeline apa pun, Interactive Apache Beam selalu menggunakan kembali cluster yang terakhir digunakan untuk menjalankan pipeline dengan FlinkRunner.
    • Untuk menghindari perilaku ini, misalnya, untuk menjalankan pipeline lain di sesi notebook yang sama dengan FlinkRunner yang tidak dihosting oleh notebook, jalankan ib.clusters.set_default_cluster(None).
  • Saat membuat instance pipeline baru yang menggunakan konfigurasi project, region, dan penyediaan yang dipetakan ke cluster Dataproc yang ada, Dataflow juga akan menggunakan kembali cluster tersebut, meskipun mungkin tidak menggunakan cluster yang terakhir digunakan.
  • Namun, setiap kali perubahan penyediaan diberikan, misalnya saat mengubah ukuran cluster, cluster baru akan dibuat untuk menjalankan perubahan yang diinginkan. Jika Anda ingin mengubah ukuran cluster, agar resource cloud tidak habis, bersihkan cluster yang tidak perlu menggunakan ib.clusters.cleanup(pipeline).
  • Saat master_url Flink ditentukan, jika cluster tersebut termasuk dalam cluster yang dikelola oleh sesi notebook, Dataflow akan menggunakan kembali cluster yang dikelola.
    • Jika master_url tidak diketahui oleh sesi notebook, berarti FlinkRunner yang dihosting sendiri oleh pengguna diperlukan. {i>Notebook<i} tidak melakukan apa pun secara implisit.

Pemecahan masalah

Bagian ini memberikan informasi untuk membantu Anda memecahkan masalah dan men-debug FlinkRunner Interaktif pada cluster yang dikelola notebook.

Agar lebih mudah, konfigurasi buffering jaringan Flink tidak akan diekspos untuk konfigurasi.

Jika grafik tugas terlalu rumit atau paralelisme ditetapkan terlalu tinggi, kardinalitas langkah yang dikalikan dengan paralelisme mungkin terlalu besar, menyebabkan terlalu banyak tugas yang dijadwalkan secara paralel, dan gagal dalam eksekusi.

Gunakan tips berikut untuk meningkatkan kecepatan lari interaktif:

  • Hanya tetapkan PCollection yang ingin Anda periksa ke variabel.
  • Periksa PCollections satu per satu.
  • Gunakan reshuffle setelah transformasi fanout tinggi.
  • Menyesuaikan paralelisme berdasarkan ukuran data. Terkadang lebih kecil itu lebih cepat.

Membutuhkan waktu terlalu lama untuk memeriksa data

Periksa dasbor Flink untuk tugas yang sedang berjalan. Anda mungkin melihat langkah dengan ratusan tugas telah selesai dan hanya satu yang tersisa, karena data yang sedang berlangsung berada di satu mesin dan tidak diacak.

Selalu gunakan perombakan setelah transformasi fanout yang tinggi, seperti saat:

  • Membaca baris dari file
  • Membaca baris dari tabel BigQuery

Tanpa perombakan, data fanout selalu dijalankan pada pekerja yang sama, dan Anda tidak dapat memanfaatkan paralelisme.

Berapa banyak pekerja yang saya butuhkan?

Prinsipnya, cluster Flink memiliki jumlah vCPU dikalikan dengan jumlah slot pekerja. Misalnya, jika Anda memiliki 40 pekerja n1-highmem-8, cluster Flink memiliki maksimal 320 slot, atau 8 dikalikan dengan 40.

Idealnya, pekerja dapat mengelola tugas yang membaca, memetakan, dan menggabungkan dengan paralelisme yang ditetapkan dalam ratusan, yang menjadwalkan ribuan tugas secara paralel.

Apakah fitur ini dapat digunakan untuk streaming?

Pipeline streaming saat ini tidak kompatibel dengan Flink interaktif pada fitur cluster yang dikelola notebook.

Beam SQL dan beam_sql keajaiban

Beam SQL memungkinkan Anda membuat kueri PCollections yang terikat dan tidak terbatas dengan pernyataan SQL. Jika menggunakan notebook Apache Beam, Anda dapat menggunakan keajaiban kustom beam_sql IPython untuk mempercepat pengembangan pipeline.

Anda dapat memeriksa penggunaan ajaib beam_sql dengan opsi -h atau --help:

Memeriksa bantuan beam_sql

Anda dapat membuat PCollection dari nilai konstanta:

Membuat PCollection dari nilai konstanta

Anda dapat bergabung ke beberapa PCollections:

Gabung dengan beberapa PCollections

Anda dapat meluncurkan tugas Dataflow dengan opsi -r DataflowRunner atau --runner DataflowRunner:

Meluncurkan tugas Dataflow dengan Apache Beam SQL

Untuk mempelajari lebih lanjut, lihat contoh notebook Apache Beam SQL di notebook.

Mempercepat penggunaan JIT compiler dan GPU

Anda dapat menggunakan library seperti numba dan GPU untuk mempercepat kode Python dan pipeline Apache Beam. Di instance notebook Apache Beam yang dibuat dengan GPU nvidia-tesla-t4, untuk berjalan pada GPU, kompilasi kode Python Anda dengan numba.cuda.jit. Secara opsional, untuk mempercepat eksekusi di CPU, kompilasi kode Python ke dalam kode mesin dengan numba.jit atau numba.njit.

Contoh berikut membuat DoFn yang memproses pada GPU:

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

Gambar berikut menunjukkan notebook yang berjalan di GPU:

Jalankan DoFn di GPU

Detail selengkapnya dapat ditemukan di notebook contoh Menggunakan GPU dengan Apache Beam.

Membangun container kustom

Pada umumnya, jika pipeline Anda tidak memerlukan dependensi Python tambahan atau file yang dapat dieksekusi, Apache Beam dapat otomatis menggunakan image container resminya untuk menjalankan kode yang ditentukan pengguna. Gambar ini dilengkapi dengan banyak modul Python umum, dan Anda tidak perlu mem-build atau menentukannya secara eksplisit.

Dalam beberapa kasus, Anda mungkin memiliki dependensi Python tambahan atau bahkan dependensi non-Python. Dalam skenario ini, Anda dapat mem-build container kustom dan menyediakannya untuk dijalankan di cluster Flink. Daftar berikut memberikan keuntungan menggunakan penampung kustom:

  • Waktu penyiapan yang lebih cepat untuk eksekusi berturut-turut dan interaktif
  • Konfigurasi dan dependensi yang stabil
  • Lebih fleksibel: Anda dapat menyiapkan lebih banyak dari dependensi Python

Proses build container mungkin membosankan, tetapi Anda dapat melakukan semuanya di notebook menggunakan pola penggunaan berikut.

Membuat ruang kerja lokal

Pertama, buat direktori kerja lokal di bawah direktori home Jupyter.

!mkdir -p /home/jupyter/.flink

Menyiapkan dependensi Python

Selanjutnya, instal semua dependensi Python tambahan yang mungkin Anda gunakan, lalu ekspor ke file persyaratan.

%pip install dep_a
%pip install dep_b
...

Anda dapat secara eksplisit membuat file persyaratan menggunakan keajaiban notebook %%writefile.

%%writefile /home/jupyter/.flink/requirements.txt
dep_a
dep_b
...

Atau, Anda dapat membekukan semua dependensi lokal ke dalam file persyaratan. Opsi ini mungkin menimbulkan dependensi yang tidak diinginkan.

%pip freeze > /home/jupyter/.flink/requirements.txt

Menyiapkan dependensi non-Python

Salin semua dependensi non-Python ke ruang kerja. Jika Anda tidak memiliki dependensi non-Python, lewati langkah ini.

!cp /path/to/your-dep /home/jupyter/.flink/your-dep
...

Membuat Dockerfile

Buat Dockerfile dengan keajaiban notebook %%writefile. Contoh:

%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY  requirements.txt /tmp/requirements.txt
COPY  your_dep /tmp/your_dep
...

RUN python -m pip install -r /tmp/requirements.txt

Penampung contoh menggunakan gambar Apache Beam SDK versi 2.40.0 dengan Python 3.7 sebagai dasarnya, menambahkan file your_dep, dan menginstal dependensi Python tambahan. Gunakan Dockerfile ini sebagai template, lalu edit untuk kasus penggunaan Anda.

Di pipeline Apache Beam, saat merujuk ke dependensi non-Python, gunakan tujuan COPY-nya. Misalnya, /tmp/your_dep adalah jalur file dari file your_dep.

Bangun image container di Artifact Registry menggunakan Cloud Build

  1. Aktifkan layanan Cloud Build dan Artifact Registry, jika belum diaktifkan.

    !gcloud services enable cloudbuild.googleapis.com
    !gcloud services enable artifactregistry.googleapis.com
    
  2. Buat repositori Artifact Registry agar Anda dapat mengupload artefak. Setiap repositori dapat berisi artefak untuk satu format yang didukung.

    Semua konten repositori dienkripsi menggunakan kunci enkripsi yang dikelola Google atau yang dikelola pelanggan. Artifact Registry menggunakan kunci enkripsi yang dikelola Google secara default dan tidak ada konfigurasi yang diperlukan untuk opsi ini.

    Anda harus memiliki setidaknya akses Artifact Registry Writer ke repositori.

    Jalankan perintah berikut untuk membuat repositori baru. Perintah ini menggunakan flag --async dan segera ditampilkan, tanpa menunggu operasi selesai.

    gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=LOCATION \
    --async
    

    Ganti nilai berikut:

    • REPOSITORY: nama untuk repositori Anda. Untuk setiap lokasi repositori dalam sebuah project, nama repositori harus unik.
    • LOCATION: lokasi untuk repositori Anda.
  3. Sebelum Anda dapat mengirim atau mengambil image, konfigurasikan Docker guna mengautentikasi permintaan untuk Artifact Registry. Untuk menyiapkan autentikasi ke repositori Docker, jalankan perintah berikut:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Perintah ini memperbarui konfigurasi Docker Anda. Sekarang Anda dapat terhubung dengan Artifact Registry di project Google Cloud untuk mengirim image.

  4. Gunakan Cloud Build untuk mem-build image container, lalu menyimpannya ke Artifact Registry.

    !cd /home/jupyter/.flink \
    && gcloud builds submit \
     --tag LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest \
     --timeout=20m
    

    Ganti PROJECT_ID dengan project ID project Anda.

Menggunakan container kustom

Bergantung pada runner-nya, Anda dapat menggunakan container kustom untuk berbagai tujuan.

Untuk penggunaan container Apache Beam secara umum, lihat:

Untuk penggunaan container Dataflow, lihat:

Nonaktifkan alamat IP eksternal

Saat membuat instance notebook Apache Beam, nonaktifkan alamat IP eksternal untuk meningkatkan keamanan. Karena instance notebook perlu mendownload beberapa resource internet publik, seperti Artifact Registry, Anda harus membuat jaringan VPC baru tanpa alamat IP eksternal terlebih dahulu. Kemudian, buat gateway Cloud NAT untuk jaringan VPC ini. Untuk mengetahui informasi selengkapnya tentang Cloud NAT, lihat dokumentasi Cloud NAT. Gunakan jaringan VPC dan gateway Cloud NAT untuk mengakses resource internet publik yang diperlukan tanpa mengaktifkan alamat IP eksternal.