Menggunakan penampung kustom dengan library C++


Dalam tutorial ini, Anda akan membuat pipeline yang menggunakan penampung kustom dengan library C++ untuk menjalankan alur kerja Dataflow HPC yang sangat paralel. Gunakan tutorial ini untuk mempelajari cara menggunakan Dataflow dan Apache Beam untuk menjalankan aplikasi komputasi grid yang memerlukan data untuk didistribusikan ke fungsi yang berjalan di banyak core.

Tutorial ini menunjukkan cara menjalankan pipeline terlebih dahulu dengan menggunakan Direct Runner, lalu menggunakan Dataflow Runner. Dengan menjalankan pipeline secara lokal, Anda dapat menguji pipeline sebelum men-deploynya.

Contoh ini menggunakan binding dan fungsi Cython dari library GMP. Terlepas dari library atau alat binding yang Anda gunakan, Anda dapat menerapkan prinsip yang sama ke pipeline.

Contoh kode ini tersedia di GitHub.

Tujuan

  • Membuat pipeline yang menggunakan penampung kustom dengan library C++.

  • Build image container Docker menggunakan Dockerfile.

  • Kemas kode dan dependensi ke dalam container Docker.

  • Jalankan pipeline secara lokal untuk mengujinya.

  • Jalankan pipeline di lingkungan terdistribusi.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Aktifkan API Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Aktifkan API Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan.

    1. Untuk membuat akun layanan, jalankan gcloud iam service-accounts create perintah:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID." --role=SERVICE_ACCOUNT_ROLE

      Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.

    3. Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID. --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Mendownload contoh kode dan mengubah direktori

Download contoh kode, lalu ubah direktori. Contoh kode di repositori GitHub menyediakan semua kode yang Anda perlukan untuk menjalankan pipeline ini. Jika sudah siap membuat pipeline Anda sendiri, Anda dapat menggunakan kode contoh ini sebagai template.

Clone repositori contoh beam-cpp.

  1. Gunakan perintah git clone untuk meng-clone repositori GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Beralih ke direktori aplikasi:

    cd dataflow-sample-applications/beam-cpp-example
    

Kode pipeline

Anda dapat menyesuaikan kode pipeline dari tutorial ini. Pipeline ini menyelesaikan tugas berikut:

  • Secara dinamis menghasilkan semua bilangan bulat dalam rentang input.
  • Menjalankan bilangan bulat melalui fungsi C++ dan memfilter nilai yang buruk.
  • Menulis nilai yang buruk ke saluran samping.
  • Menghitung kemunculan setiap waktu berhenti dan menormalisasi hasilnya.
  • Mencetak output, memformat, dan menulis hasilnya ke file teks.
  • Membuat PCollection dengan satu elemen.
  • Memproses elemen tunggal dengan fungsi map dan meneruskan frekuensi PCollection sebagai input samping.
  • Memproses PCollection dan menghasilkan satu output.

File awal terlihat seperti berikut:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Menyiapkan lingkungan pengembangan

  1. Gunakan Apache Beam SDK untuk Python.

  2. Instal library GMP:

    apt-get install libgmp3-dev
    
  3. Untuk menginstal dependensi, gunakan file requirements.txt.

    pip install -r requirements.txt
    
  4. Untuk mem-build binding Python, jalankan perintah berikut.

    python setup.py build_ext --inplace
    

Anda dapat menyesuaikan file requirements.txt dari tutorial ini. File awal mencakup dependensi berikut:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Menjalankan pipeline secara lokal

Menjalankan pipeline secara lokal berguna untuk pengujian. Dengan menjalankan pipeline secara lokal, Anda dapat mengonfirmasi bahwa pipeline berjalan dan berperilaku seperti yang diharapkan sebelum men-deploy pipeline ke lingkungan terdistribusi.

Anda dapat menjalankan pipeline secara lokal menggunakan perintah berikut. Perintah ini menghasilkan gambar bernama out.png.

python pipeline.py

Membuat resource Google Cloud

Bagian ini menjelaskan cara membuat resource berikut:

  • Bucket Cloud Storage yang akan digunakan sebagai lokasi penyimpanan sementara dan lokasi output.
  • Penampung Docker untuk memaketkan kode dan dependensi pipeline.

Membuat bucket Cloud Storage

Mulai dengan membuat bucket Cloud Storage menggunakan Google Cloud CLI. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.

Untuk membuat bucket, gunakan perintah gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ganti kode berikut:

  • BUCKET_NAME: nama untuk bucket Cloud Storage Anda yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
  • LOCATION: lokasi untuk bucket.

Membuat dan mem-build image container

Anda dapat menyesuaikan Dockerfile dari tutorial ini. File awal terlihat seperti berikut:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Dockerfile ini berisi perintah FROM, COPY, dan RUN, yang dapat Anda baca di referensi Dockerfile.

  1. Untuk mengupload artefak, buat repositori Artifact Registry. Setiap repositori dapat berisi artefak untuk satu format yang didukung.

    Semua konten repositori dienkripsi menggunakan kunci milik dan dikelola Google atau kunci enkripsi yang dikelola pelanggan. Artifact Registry menggunakan kunci yang dimiliki dan dikelola Google secara default, dan tidak memerlukan konfigurasi untuk opsi ini.

    Anda harus memiliki setidaknya akses Artifact Registry Writer ke repositori.

    Jalankan perintah berikut untuk membuat repositori baru. Perintah ini menggunakan flag --async dan segera ditampilkan, tanpa menunggu operasi yang sedang berlangsung selesai.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Ganti REPOSITORY dengan nama untuk repositori Anda. Untuk setiap lokasi repositori dalam project, nama repositori harus unik.

  2. Buat Dockerfile.

    Agar paket menjadi bagian dari penampung Apache Beam, Anda harus menentukannya sebagai bagian dari file requirements.txt. Pastikan Anda tidak menentukan apache-beam sebagai bagian dari file requirements.txt. Penampung Apache Beam sudah memiliki apache-beam.

  3. Sebelum Anda dapat mengirim atau mengambil image, konfigurasikan Docker untuk mengautentikasi permintaan untuk Artifact Registry. Untuk menyiapkan autentikasi ke repositori Docker, jalankan perintah berikut:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Perintah ini memperbarui konfigurasi Docker Anda. Sekarang Anda dapat terhubung dengan Artifact Registry di project Google Cloud untuk mengirim image.

  4. Build image Docker menggunakan Dockerfile dengan Cloud Build.

    Perbarui jalur dalam perintah berikut agar cocok dengan Dockerfile yang Anda buat. Perintah ini akan mem-build file dan mengirimkannya ke repositori Artifact Registry Anda.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Mengemas kode dan dependensi dalam container Docker

  1. Untuk menjalankan pipeline ini di lingkungan terdistribusi, paketkan kode dan dependensi ke dalam penampung docker.

    docker build . -t cpp_beam_container
    
  2. Setelah memaketkan kode dan dependensi, Anda dapat menjalankan pipeline secara lokal untuk mengujinya.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Perintah ini menulis output di dalam image Docker. Untuk melihat output, jalankan pipeline dengan --output, dan tulis output ke bucket Cloud Storage. Misalnya, jalankan perintah berikut.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Menjalankan pipeline

Sekarang Anda dapat menjalankan pipeline Apache Beam di Dataflow dengan merujuk ke file dengan kode pipeline dan meneruskan parameter yang diperlukan oleh pipeline.

Di shell atau terminal, jalankan pipeline dengan Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Setelah Anda mengeksekusi perintah untuk menjalankan pipeline, Dataflow akan menampilkan ID Tugas dengan status tugas Diantrekan. Mungkin perlu waktu beberapa menit sebelum status tugas mencapai Running dan Anda dapat mengakses grafik tugas.

Melihat hasil

Melihat data yang ditulis ke bucket Cloud Storage Anda. Gunakan perintah gcloud storage ls untuk menampilkan daftar konten di level teratas bucket Anda:

gcloud storage ls gs://BUCKET_NAME

Jika berhasil, perintah menampilkan pesan yang mirip dengan:

gs://BUCKET_NAME/out.png

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika Anda ingin menggunakan kembali project, hapus resource yang Anda buat untuk tutorial.

Menghapus resource project Google Cloud

  1. Hapus repositori Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Hapus bucket Cloud Storage. Bucket ini saja tidak dikenai biaya apa pun.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Mencabut kredensial

  1. Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID. \
      --role=SERVICE_ACCOUNT_ROLE
  2. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  3. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya