C++ 라이브러리에서 커스텀 컨테이너 사용하기


이 튜토리얼에서는 C++ 라이브러리와 커스텀 컨테이너를 사용하여 Dataflow HPC 높은 병렬 워크플로를 실행하는 파이프라인을 만듭니다. 이 튜토리얼에서는 Dataflow 및 Apache Beam을 사용하여 여러 코어에서 실행되는 함수에 데이터를 배포해야 하는 그리드 컴퓨팅 애플리케이션을 실행하는 방법을 알아봅니다.

이 튜토리얼에서는 먼저 Direct Runner를 사용한 다음 Dataflow Runner를 사용하여 파이프라인을 실행하는 방법을 보여줍니다. 파이프라인을 로컬에서 실행함으로써 파이프라인을 배포하기 전에 테스트할 수 있습니다.

이 예에서는 GMP 라이브러리Cython 바인딩과 함수를 사용합니다. 사용하는 라이브러리 또는 바인딩 도구에 관계없이 파이프라인에 동일한 원칙을 적용할 수 있습니다.

예시 코드는 GitHub에서 제공됩니다.

목표

  • C++ 라이브러리와 함께 커스텀 컨테이너를 사용하는 파이프라인을 만듭니다.

  • Dockerfile을 사용하여 Docker 컨테이너 이미지 빌드하기

  • 코드 및 종속 항목을 Docker 컨테이너에 패키징하기

  • 파이프라인을 로컬에서 실행하여 테스트하기

  • 분산 환경에서 파이프라인 실행하기

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. 새 파이프라인의 사용자 관리형 작업자 서비스 계정을 만들고 서비스 계정에 필요한 역할을 부여합니다.

    1. 서비스 계정을 만들려면 gcloud iam service-accounts create 명령어를 실행합니다.

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE을 각 개별 역할로 바꿉니다.

    3. Google 계정에 서비스 계정에 대해 액세스 토큰을 만들 수 있게 해주는 역할을 부여합니다.

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

코드 샘플 다운로드 및 디렉터리 변경

코드 샘플을 다운로드한 후 디렉터리를 변경합니다. GitHub 저장소의 코드 샘플은 이 파이프라인을 실행하는 데 필요한 모든 코드를 제공합니다. 자체 파이프라인을 빌드할 준비가 되었으면 이 샘플 코드를 템플릿으로 사용할 수 있습니다.

beam-cpp-example 저장소를 클론합니다.

  1. git clone 명령어를 사용하여 GitHub 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 애플리케이션 디렉터리로 전환합니다.

    cd dataflow-sample-applications/beam-cpp-example
    

파이프라인 코드

이 튜토리얼에서 파이프라인 코드를 맞춤설정할 수 있습니다. 이 파이프라인은 다음 작업을 완료합니다.

  • 입력 범위의 모든 정수를 동적으로 생성합니다.
  • C++ 함수를 통해 정수를 실행하고 잘못된 값을 필터링합니다.
  • 잘못된 값을 부채널에 씁니다.
  • 각 중지 시간의 발생 횟수를 세고 결과를 정규화합니다.
  • 출력을 출력하고 결과를 형식 지정하여 텍스트 파일에 씁니다.
  • 단일 요소로 PCollection을 만듭니다.
  • map 함수로 단일 요소를 처리하고 주파수 PCollection을 부차 입력으로 전달합니다.
  • PCollection을 처리하고 단일 출력을 생성합니다.

시작 파일은 다음과 같습니다.

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

개발 환경 설정

  1. Python용 Apache Beam SDK를 사용합니다.

  2. GMP 라이브러리를 설치합니다.

    apt-get install libgmp3-dev
    
  3. 종속 항목을 설치하려면 requirements.txt 파일을 사용하세요.

    pip install -r requirements.txt
    
  4. Python 바인딩을 빌드하려면 다음 명령어를 실행합니다.

    python setup.py build_ext --inplace
    

이 튜토리얼에서 requirements.txt을 맞춤설정할 수 있습니다. 시작 파일에는 다음과 같은 종속 항목이 포함되어 있습니다.

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

로컬에서 파이프라인 실행

파이프라인을 로컬에서 실행하면 테스트에 유용합니다. 파이프라인을 로컬에서 실행하면 파이프라인을 분산 환경에 배포하기 전에 파이프라인이 실행되고 예상대로 작동하는지 확인할 수 있습니다.

다음 명령어를 사용하여 파이프라인을 로컬에서 실행할 수 있습니다. 이 명령어는 out.png라는 이미지를 출력합니다.

python pipeline.py

Google Cloud 리소스 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • 임시 스토리지 위치 및 출력 위치로 사용할 Cloud Storage 버킷
  • 파이프라인 코드 및 종속 항목을 패키징하는 Docker 컨테이너

Cloud Storage 버킷 만들기

Google Cloud CLI를 사용하여 Cloud Storage 버킷을 만드는 것으로 시작합니다. 이 버킷은 Dataflow 파이프라인에서 임시 스토리지 위치로 사용됩니다.

버킷을 만들기 위해 gcloud storage buckets create 명령어를 사용합니다.

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

다음을 바꿉니다.

  • BUCKET_NAME: 버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름입니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
  • LOCATION: 버킷의 위치입니다.

컨테이너 이미지 만들기 및 빌드

이 튜토리얼에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

이 Dockerfile에는 FROM, COPY, RUN 명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.

  1. 아티팩트를 업로드하려면 Artifact Registry 저장소를 만듭니다. 저장소마다 지원되는 단일 형식의 아티팩트가 포함될 수 있습니다.

    모든 저장소 콘텐츠는 Google 소유 및 Google 관리 키 또는 고객 관리 암호화 키를 통해 암호화됩니다. Artifact Registry는 기본적으로 Google 소유 및 Google 관리 키를 사용하며 이 옵션을 구성할 필요가 없습니다.

    저장소에 최소한 Artifact Registry 작성자 액세스 권한이 있어야 합니다.

    다음 명령어를 실행하여 새 저장소를 만듭니다. 이 명령어는 진행 중인 작업이 완료될 때까지 기다리지 않고 --async 플래그를 사용하여 즉시 반환합니다.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    REPOSITORY를 저장소 이름으로 바꿉니다. 프로젝트의 저장소 위치마다 저장소 이름이 고유해야 합니다.

  2. Dockerfile 만들기.

    패키지를 Beam 컨테이너에 포함하기 위해서는 requirements.txt 파일의 일부로 지정해야 합니다. requirements.txt 파일의 일부로 apache-beam을 지정하지 않은 것을 확인하세요. Apache Beam 컨테이너에는 이미 apache-beam이 있습니다.

  3. 이미지를 내보내거나 가져오려면 먼저 Artifact Registry 요청을 인증하도록 Docker를 구성해야 합니다. Docker 저장소에 인증을 설정하려면 다음 명령어를 실행합니다.

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    이 명령어는 Docker 구성을 업데이트합니다. 이제 Google Cloud 프로젝트의 Artifact Registry와 연결하여 이미지를 내보낼 수 있습니다.

  4. Cloud Build에서 Dockerfile을 사용하여 Docker 이미지를 빌드합니다.

    생성한 Dockerfile과 일치하도록 다음 명령어의 경로를 업데이트합니다. 이 명령어는 파일을 빌드하여 Artifact Registry 저장소에 푸시합니다.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

코드 및 종속 항목을 Docker 컨테이너에 패키징

  1. 분산 환경에서 이 파이프라인을 실행하려면 코드와 종속 항목을 Docker 컨테이너에 패키징합니다.

    docker build . -t cpp_beam_container
    
  2. 코드와 종속 항목을 패키징한 후 로컬에서 파이프라인을 실행하여 테스트할 수 있습니다.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    이 명령어는 Docker 이미지 내에 출력을 씁니다. 출력을 확인하려면 --output으로 파이프라인을 실행하고 Cloud Storage 버킷에 출력을 씁니다. 예를 들어 다음 명령어를 실행합니다.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

파이프라인 실행하기

이제 파이프라인 코드가 포함된 파일을 참조하고 파이프라인에 필요한 매개변수를 전달하여 Dataflow에서 Apache Beam 파이프라인을 실행할 수 있습니다.

셸 또는 터미널에서 Dataflow 실행기로 파이프라인을 실행합니다.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

명령어를 실행하여 파이프라인을 실행하면 Dataflow는 작업 상태가 대기 중인 작업 ID를 반환합니다. 작업 상태가 실행 중이 되기까지 몇 분 정도 걸릴 수 있으며 작업 그래프에 액세스할 수 있습니다.

결과 보기

Cloud Storage 버킷에 작성된 데이터를 확인합니다. gcloud storage ls 명령어를 사용하여 버킷의 최상위 수준에 있는 콘텐츠를 나열합니다.

gcloud storage ls gs://BUCKET_NAME

완료하면 다음과 유사한 메시지가 반환됩니다.

gs://BUCKET_NAME/out.png

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 Google Cloud 프로젝트를 삭제하는 것입니다.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

개별 리소스 삭제

프로젝트를 재사용하려면 튜토리얼용으로 만든 리소스를 삭제합니다.

Google Cloud 프로젝트 리소스 삭제

  1. Artifact Registry 저장소를 삭제합니다.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Cloud Storage 버킷을 삭제합니다. 버킷만으로는 비용이 발생하지 않습니다.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

사용자 인증 정보 취소

  1. 사용자 관리 작업자 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

다음 단계