C++ ライブラリでカスタム コンテナを使用する


このチュートリアルでは、C++ ライブラリでカスタム コンテナを使用して、Dataflow HPC の高度な並列ワークフローを実行するパイプラインを作成します。このチュートリアルでは、Dataflow と Apache Beam を使用して、データを多くのコアで実行される関数に分散させるグリッド コンピューティング アプリケーションを実行する方法について説明します。

このチュートリアルでは、まず、Direct Runner を使用してパイプラインを実行する方法を説明します。次に、Dataflow Runner を使用してパイプラインを実行する方法を説明します。パイプラインをローカルで実行することで、デプロイする前にパイプラインをテストできます。

この例では、Cython のバインディングと、GMP ライブラリの関数を使用しています。使用するライブラリやバインディング ツールに関係なく、同じ原則をパイプラインに適用できます。

サンプルコードは GitHub で入手できます。

目標

  • C++ ライブラリでカスタム コンテナを使用するパイプラインを作成する。

  • Dockerfile を使用して Docker コンテナ イメージをビルドする。

  • コードと依存関係を Docker コンテナにパッケージ化する。

  • パイプラインをローカルで実行してテストする。

  • 分散環境でパイプラインを実行する。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. 新しいパイプライン用にユーザー管理のワーカー サービス アカウントを作成し、このサービス アカウントに必要なロールを付与します。

    1. サービス アカウントを作成するには、gcloud iam service-accounts create コマンドを実行します。

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。

    3. Google アカウントに、サービス アカウントのアクセス トークンを作成できるロールを付与します。

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

コードサンプルをダウンロードしてディレクトリを変更する

コードサンプルをダウンロードして、ディレクトリを変更します。GitHub リポジトリのコードサンプルには、このパイプラインの実行に必要なすべてのコードが含まれています。独自のパイプラインを構築する準備ができたら、このサンプルコードをテンプレートとして使用できます。

beam-cpp-example リポジトリのクローンを作成します。

  1. git clone コマンドを使用して GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. アプリケーション ディレクトリに切り替えます。

    cd dataflow-sample-applications/beam-cpp-example
    

パイプライン コード

このチュートリアルのパイプライン コードをカスタマイズできます。このパイプラインは次のタスクを完了します。

  • 入力範囲内のすべての整数を動的に生成します。
  • C++ 関数で整数を処理し、不正な値をフィルタリングします。
  • 無効な値をサイドチャネルに書き込みます。
  • 停止時間が発生するごとにカウントし、結果を正規化します。
  • 出力を生成します。書式設定を行い、結果をテキスト ファイルに書き込みます。
  • 単一の要素で PCollection を作成します。
  • map 関数で単一の要素を処理し、副入力として頻度 PCollection を渡します。
  • PCollection を処理し、単一の出力を生成します。

スターター ファイルは、次のようになります。

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

開発環境を設定する

  1. Python の場合は Apache Beam SDK を使用します。

  2. GMP ライブラリをインストールします。

    apt-get install libgmp3-dev
    
  3. 依存関係をインストールするには、requirements.txt ファイルを使用します。

    pip install -r requirements.txt
    
  4. Python バインディングをビルドするには、次のコマンドを実行します。

    python setup.py build_ext --inplace
    

このチュートリアルの requirements.txt ファイルをカスタマイズできます。スターター ファイルには、次の依存関係が含まれています。

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

パイプラインをローカルで実行する

パイプラインをローカルで実行すると、テストが容易になります。パイプラインをローカルで実行すると、パイプラインを分散環境にデプロイする前に、パイプラインが想定どおりに動作していることを確認できます。

パイプラインをローカルで実行するには、次のコマンドを使用します。このコマンドは、out.png という名前のイメージを出力します。

python pipeline.py

Google Cloud リソースを作成する

このセクションでは、次のリソースを作成します。

  • 一時的な保存場所と出力場所として使用する Cloud Storage バケット。
  • パイプライン コードと依存関係をパッケージ化する Docker コンテナ。

Cloud Storage バケットを作成する

まず、Google Cloud CLI を使用して Cloud Storage バケットを作成します。このバケットは、Dataflow パイプラインによって一時ストレージの場所として使用されます。

バケットを作成するには、gcloud storage buckets create コマンドを使用します。

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

次のように置き換えます。

  • BUCKET_NAME: バケットの命名要件を満たす Cloud Storage バケットの名前。Cloud Storage のバケット名は、グローバルに一意である必要があります。
  • LOCATION: バケットのロケーション

コンテナ イメージを作成してビルドする

このチュートリアルの Dockerfile をカスタマイズできます。スターター ファイルは、次のようになります。

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

この Dockerfile には、FROMCOPYRUN の各コマンドが含まれています。これらのコマンドについては、Dockerfile リファレンスをご覧ください。

  1. アーティファクトをアップロードするには、Artifact Registry リポジトリを作成します。各リポジトリには、サポートされている単一の形式のアーティファクトを含めることができます。

    リポジトリのコンテンツはすべて、Google が所有し Google が管理する鍵か、顧客管理の暗号鍵を使用して暗号化されます。Artifact Registry では Google が所有し Google が管理する鍵がデフォルトで使用されるため、このオプションの構成は不要です。

    ユーザーには少なくとも、リポジトリに対する Artifact Registry 書き込みアクセス権が必要です。

    次のコマンドを実行して新しいリポジトリを作成します。このコマンドは --async フラグを使用します。実行中のオペレーションの完了を待たずにすぐに戻ります。

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    REPOSITORY は、リポジトリの名前に置き換えます。プロジェクト内のリポジトリのロケーションごとに、リポジトリ名は一意でなければなりません。

  2. Dockerfile を作成します。

    パッケージを Apache Beam コンテナに含めるには、パッケージを requirements.txt ファイルの一部として指定する必要があります。requirements.txt ファイルの一部として apache-beam を指定しないでください。Apache Beam コンテナには、すでに apache-beam があります。

  3. イメージを push または pull する前に、Artifact Registry に対するリクエストを認証するように Docker を構成します。Docker リポジトリの認証を設定するには、次のコマンドを実行します。

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    このコマンドにより、Docker 構成が更新されます。これで、Google Cloud プロジェクトの Artifact Registry に接続して、イメージを push できるようになりました。

  4. Cloud Build で Dockerfile を使用して、Docker イメージをビルドします。

    作成した Dockerfile と一致するように、次のコマンドのパスを更新します。このコマンドは、ファイルをビルドして Artifact Registry リポジトリに push します。

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

コードと依存関係を Docker コンテナにパッケージ化する

  1. このパイプラインを分散環境で実行するには、コードと依存関係を Docker コンテナにパッケージ化します。

    docker build . -t cpp_beam_container
    
  2. コードと依存関係をパッケージ化したら、パイプラインをローカルで実行してテストできます。

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    このコマンドは、Docker イメージ内に出力を書き込みます。出力を表示するには、--output を使用してパイプラインを実行し、出力を Cloud Storage バケットに書き込みます。たとえば、次のコマンドを実行します。

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

パイプラインを実行する

次に、Dataflow で Apache Beam パイプラインを実行します。それには、パイプライン コードを含むファイルを参照して、パイプラインに必要なパラメータを渡します。

シェルまたはターミナルで、Dataflow ランナーを使用してパイプラインを実行します。

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

コマンドでパイプラインを実行すると、Dataflow はジョブ ID とジョブ ステータス(Queued)を返します。ジョブのステータスが「Running」に変わり、ジョブグラフにアクセス可能になるまでに数分かかる場合があります。

結果を表示する

Cloud Storage バケットに書き込まれたデータを表示します。gcloud storage ls コマンドを使用して、トップレベルのバケットの内容を一覧表示します。

gcloud storage ls gs://BUCKET_NAME

成功した場合、次のようなメッセージが返されます。

gs://BUCKET_NAME/out.png

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

個々のリソースを削除する

プロジェクトを再利用する場合は、チュートリアル用に作成したリソースを削除します。

Google Cloud プロジェクトのリソースをクリーンアップする

  1. Artifact Registry リポジトリを削除します。

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Cloud Storage バケットを削除します。このバケットだけでは料金は発生しません。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

認証情報を取り消す

  1. ユーザー管理のワーカー サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ