Menggunakan container kustom dengan library C++


Dalam tutorial ini, Anda akan membuat pipeline yang menggunakan container kustom dengan library C++ untuk menjalankan alur kerja HPC Dataflow yang sangat paralel. Gunakan tutorial ini untuk mempelajari cara menggunakan Dataflow dan Apache Beam untuk menjalankan aplikasi komputasi petak yang memerlukan data untuk didistribusikan ke fungsi yang berjalan di banyak core.

Tutorial ini menunjukkan cara menjalankan pipeline terlebih dahulu menggunakan Direct Runner, lalu menggunakan Dataflow Runner. Dengan menjalankan pipeline secara lokal, Anda dapat menguji pipeline sebelum men-deploy-nya.

Contoh ini menggunakan binding dan fungsi Cython dari library GMP. Terlepas dari library atau alat binding yang digunakan, Anda dapat menerapkan prinsip yang sama ke pipeline Anda.

Kode contoh tersedia di GitHub.

Tujuan

  • Membuat pipeline yang menggunakan container kustom dengan library C++.

  • Bangun image container Docker menggunakan Dockerfile.

  • Kemas kode dan dependensi ke dalam container Docker.

  • Jalankan pipeline secara lokal untuk mengujinya.

  • Jalankan pipeline di lingkungan terdistribusi.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Menginstal Google Cloud CLI.
  3. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  8. Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti EMAIL_ADDRESS dengan alamat email Anda.
    • Ganti ROLE dengan setiap peran individual.
  9. Menginstal Google Cloud CLI.
  10. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  11. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  12. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  15. Berikan peran ke Akun Google Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti EMAIL_ADDRESS dengan alamat email Anda.
    • Ganti ROLE dengan setiap peran individual.
  16. Buat akun layanan pekerja yang dikelola pengguna untuk pipeline baru Anda dan berikan peran yang diperlukan ke akun layanan tersebut.

    1. Untuk membuat akun layanan, jalankan perintah gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Memberikan peran ke akun layanan. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.

    3. Memberi Akun Google Anda peran yang memungkinkan Anda membuat token akses untuk akun layanan:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Mendownload contoh kode dan mengubah direktori

Download contoh kode, lalu ubah direktori. Contoh kode di repositori GitHub menyediakan semua kode yang Anda perlukan untuk menjalankan pipeline ini. Jika sudah siap untuk membangun pipeline sendiri, Anda dapat menggunakan kode contoh ini sebagai template.

Clone repositori contoh beam-cpp.

  1. Gunakan perintah git clone untuk meng-clone repositori GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Beralih ke direktori aplikasi:

    cd dataflow-sample-applications/beam-cpp-example
    

Kode pipeline

Anda dapat menyesuaikan kode pipeline dari tutorial ini. Pipeline ini menyelesaikan tugas-tugas berikut:

  • Secara dinamis menghasilkan semua bilangan bulat dalam rentang input.
  • Menjalankan bilangan bulat melalui fungsi C++ dan memfilter nilai buruk.
  • Menuliskan nilai buruk ke saluran samping.
  • Menghitung kemunculan setiap waktu penghentian dan menormalisasi hasilnya.
  • Mencetak output, memformat, dan menulis hasilnya ke file teks.
  • Membuat PCollection dengan satu elemen.
  • Memproses satu elemen dengan fungsi map dan meneruskan frekuensi PCollection sebagai input samping.
  • Memproses PCollection dan menghasilkan satu output.

File awal terlihat seperti berikut:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import sys

def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Menyiapkan lingkungan pengembangan

  1. Gunakan Apache Beam SDK untuk Python.

  2. Instal library GMP:

    apt-get install libgmp3-dev
    
  3. Untuk menginstal dependensi, gunakan file requirements.txt.

    pip install -r requirements.txt
    
  4. Untuk mem-build binding Python, jalankan perintah berikut.

    python setup.py build_ext --inplace
    

Anda dapat menyesuaikan file requirements.txt dari tutorial ini. File awal menyertakan dependensi berikut:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Menjalankan pipeline secara lokal

Menjalankan pipeline secara lokal berguna untuk pengujian. Dengan menjalankan pipeline secara lokal, Anda dapat mengonfirmasi bahwa pipeline berjalan dan berperilaku seperti yang diharapkan sebelum Anda men-deploy pipeline ke lingkungan terdistribusi.

Anda dapat menjalankan pipeline secara lokal menggunakan perintah berikut. Perintah ini menghasilkan image bernama out.png.

python pipeline.py

Membuat resource Google Cloud

Bagian ini menjelaskan cara membuat referensi berikut:

  • Bucket Cloud Storage yang akan digunakan sebagai lokasi penyimpanan sementara dan lokasi output.
  • Container Docker untuk mengemas dependensi dan kode pipeline.

Membuat bucket Cloud Storage

Mulailah dengan membuat bucket Cloud Storage menggunakan Google Cloud CLI. Bucket ini digunakan sebagai lokasi penyimpanan sementara oleh pipeline Dataflow.

Untuk membuat bucket, gunakan perintah gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ganti kode berikut:

  • BUCKET_NAME: nama untuk bucket Cloud Storage yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.
  • LOCATION: lokasi bucket.

Membuat dan membangun image container

Anda dapat menyesuaikan Dockerfile dari tutorial ini. File awal terlihat seperti berikut:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Dockerfile ini berisi perintah FROM, COPY, dan RUN, yang dapat Anda baca di referensi Dockerfile.

  1. Untuk mengupload artefak, buat repositori Artifact Registry. Setiap repositori dapat berisi artefak untuk satu format yang didukung.

    Semua konten repositori dienkripsi menggunakan kunci enkripsi yang dikelola Google atau yang dikelola pelanggan. Artifact Registry menggunakan kunci enkripsi yang dikelola Google secara default dan tidak diperlukan konfigurasi untuk opsi ini.

    Anda harus memiliki setidaknya akses Artifact Registry Writer ke repositori.

    Jalankan perintah berikut untuk membuat repositori baru. Perintah ini menggunakan flag --async dan segera ditampilkan, tanpa menunggu operasi selesai.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Ganti REPOSITORY dengan nama untuk repositori Anda. Untuk setiap lokasi repositori dalam sebuah project, nama repositori harus unik.

  2. Buat Dockerfile.

    Agar paket menjadi bagian dari container Apache Beam, Anda harus menentukannya sebagai bagian dari file requirements.txt. Pastikan Anda tidak menentukan apache-beam sebagai bagian dari file requirements.txt. Penampung Apache Beam sudah memiliki apache-beam.

  3. Sebelum Anda dapat mengirim atau mengambil image, konfigurasikan Docker untuk mengautentikasi permintaan Artifact Registry. Untuk menyiapkan autentikasi ke repositori Docker, jalankan perintah berikut:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Perintah ini memperbarui konfigurasi Docker Anda. Sekarang Anda dapat terhubung dengan Artifact Registry di project Google Cloud untuk mengirim image.

  4. Bangun image Docker menggunakan Dockerfile dengan Cloud Build.

    Perbarui jalur di perintah berikut agar cocok dengan Dockerfile yang Anda buat. Perintah ini membangun file dan mengirimkannya ke repositori Artifact Registry Anda.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Mengemas kode dan dependensi dalam container Docker

  1. Untuk menjalankan pipeline ini di lingkungan terdistribusi, kemas kode dan dependensi ke dalam container docker.

    docker build . -t cpp_beam_container
    
  2. Setelah mengemas kode dan dependensi, Anda dapat menjalankan pipeline secara lokal untuk mengujinya.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Perintah ini menulis output di dalam image Docker. Untuk melihat output, jalankan pipeline dengan --output, lalu tulis output ke bucket Cloud Storage. Misalnya, jalankan perintah berikut.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Menjalankan pipeline

Sekarang Anda dapat menjalankan pipeline Apache Beam di Dataflow dengan merujuk pada file yang berisi kode pipeline dan meneruskan parameter yang diperlukan oleh pipeline.

Di shell atau terminal, jalankan pipeline dengan Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Setelah Anda menjalankan perintah untuk menjalankan pipeline, Dataflow akan menampilkan ID Tugas dengan status tugas Queued. Mungkin perlu waktu beberapa menit sebelum status tugas mencapai Running dan Anda dapat mengakses grafik tugas.

Lihat hasil Anda

Melihat data yang ditulis ke bucket Cloud Storage Anda. Gunakan perintah gcloud storage ls untuk menampilkan konten di tingkat teratas bucket Anda:

gcloud storage ls gs://BUCKET_NAME

Jika berhasil, perintah menampilkan pesan yang mirip dengan:

gs://BUCKET_NAME/out.png

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project Google Cloud yang Anda buat untuk tutorial.

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika Anda ingin menggunakan kembali project tersebut, hapus resource yang Anda buat untuk tutorial.

Membersihkan resource project Google Cloud

  1. Hapus repositori Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Hapus bucket Cloud Storage. Bucket ini saja tidak dikenai biaya apa pun.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Mencabut kredensial

  1. Cabut peran yang Anda berikan ke akun layanan pekerja yang dikelola pengguna. Jalankan perintah berikut sekali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  3. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya