Usar contêineres personalizados com bibliotecas C++


Neste tutorial, você cria um pipeline que usa contêineres personalizados com bibliotecas C++ para executar um fluxo de trabalho altamente paralelo do HPC do Dataflow. Use este tutorial para aprender a usar o Dataflow e o Apache Beam para executar aplicativos de computação em grade que exigem a distribuição de dados para funções em execução em vários núcleos.

Neste tutorial, mostramos como executar o pipeline usando o executor direto e depois o executor do Dataflow. Ao executar o pipeline localmente, é possível testá-lo antes de implantá-lo.

Este exemplo usa vinculações e funções do Cython da biblioteca do GMP (links em inglês). Independentemente da biblioteca ou da ferramenta de vinculação usada, é possível aplicar os mesmos princípios ao pipeline.

O código está disponível no GitHub.

Objetivos

  • Crie um pipeline que use contêineres personalizados com bibliotecas C++.

  • Crie uma imagem de contêiner do Docker usando um Dockerfile.

  • Empacote o código e as dependências em um contêiner do Docker.

  • Execute o pipeline localmente para testá-lo.

  • Execute o pipeline em um ambiente distribuído.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Criar uma conta de serviço do worker gerenciada pelo usuário para o novo pipeline e conceder os papéis necessários à conta de serviço.

    1. Para criar a conta de serviço, execute o comando gcloud iam service-accounts create.

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.

    3. Conceda à sua Conta do Google uma função que permita criar tokens de acesso para a conta de serviço:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Fazer o download do exemplo de código e mudar os diretórios.

Faça o download do exemplo de código e mude os diretórios. As amostras de código no repositório do GitHub fornecem todo o código necessário para executar esse pipeline. Quando você estiver pronto para criar seu próprio pipeline, poderá usar esse código de amostra como modelo.

Clone o repositório beam-cpp-example.

  1. Use o comando git clone para clonar o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Alterne para o diretório do aplicativo:

    cd dataflow-sample-applications/beam-cpp-example
    

Código do pipeline:

É possível personalizar o código do pipeline deste tutorial. Esse pipeline conclui as seguintes tarefas:

  • Produz dinamicamente todos os números inteiros em um intervalo de entrada.
  • Executa os números inteiros por meio de uma função C++ e filtra valores inválidos.
  • Grava os valores inválidos em um canal lateral.
  • Conta a ocorrência de cada horário de parada e normaliza os resultados.
  • Imprime a saída, formatando e gravando os resultados em um arquivo de texto.
  • Cria um PCollection com um único elemento.
  • Processa o único elemento com uma função map e transmite a frequência PCollection como uma entrada secundária.
  • Processa PCollection e produz uma única saída.

O arquivo inicial tem a seguinte aparência:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configurar o ambiente de desenvolvimento

  1. Use o SDK do Apache Beam para Python.

  2. Instale a biblioteca do GMP:

    apt-get install libgmp3-dev
    
  3. Para instalar as dependências, use o arquivo requirements.txt.

    pip install -r requirements.txt
    
  4. Para criar as vinculações do Python, execute o seguinte comando.

    python setup.py build_ext --inplace
    

É possível personalizar o arquivo requirements.txt deste tutorial. O arquivo inicial inclui as seguintes dependências:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Execute o pipeline localmente

Executar o pipeline localmente é útil para testes. Ao executar o pipeline localmente, é possível confirmar se ele é executado e se comporta conforme o esperado antes de implantá-lo em um ambiente distribuído.

É possível executar o pipeline localmente usando o comando a seguir. Esse comando gera uma imagem chamada out.png.

python pipeline.py

Criar os recursos do Google Cloud

Esta seção explica como criar os seguintes recursos:

  • Um bucket do Cloud Storage a ser usado como um local de armazenamento temporário e um local de saída.
  • Um contêiner do Docker para empacotar o código e as dependências do pipeline.

Crie um bucket do Cloud Storage

Comece criando um bucket do Cloud Storage usando a Google Cloud CLI. Esse bucket é usado como um local de armazenamento temporário pelo pipeline do Dataflow.

Para criar o bucket, use o comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Substitua:

  • BUCKET_NAME: um nome para o bucket do Cloud Storage que atende aos requisitos de nomenclatura de bucket. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.
  • LOCATION: o local do bucket.

Criar e fazer o build de uma imagem de contêiner

É possível personalizar o Dockerfile deste tutorial. O arquivo inicial tem a seguinte aparência:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Esse Dockerfile contém os comandos FROM, COPY e RUN. Veja mais informações sobre eles na referência do Dockerfile.

  1. Para fazer upload de artefatos, crie um repositório do Artifact Registry. Cada repositório pode conter artefatos para um único formato compatível.

    Todo o conteúdo do repositório é criptografado usando chaves de propriedade do Google e gerenciadas pelo Google ou chaves de criptografia gerenciadas pelo cliente. O Artifact Registry usa chaves de propriedade e gerenciadas pelo Google por padrão, e nenhuma configuração é necessária para essa opção.

    É preciso ter pelo menos o acesso "Gravador do Artifact Registry" no repositório.

    Execute o comando abaixo para criar um novo repositório. O comando usa a sinalização --async e retorna imediatamente, sem aguardar a conclusão da operação.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Substitua REPOSITORY por um nome para o repositório. Para cada local de repositório em um projeto, os nomes dos repositórios precisam ser exclusivos.

  2. Criar o Dockerfile

    Para que os pacotes façam parte do contêiner do Beam, especifique-os como parte do arquivo requirements.txt. Não especifique apache-beam como parte do arquivo requirements.txt. O contêiner do Apache Beam já tem apache-beam.

  3. Antes de enviar ou extrair imagens, configure o Docker para autenticar solicitações para o Artifact Registry. Para configurar a autenticação nos repositórios do Docker, execute o seguinte comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    O comando atualiza a configuração do Docker. Agora é possível se conectar ao Artifact Registry no seu projeto do Google Cloud para enviar imagens.

  4. Crie a imagem do Docker usando um Dockerfile com o Cloud Build.

    Atualize o caminho no comando a seguir para corresponder ao Dockerfile que você criou. Esse comando cria o arquivo e o envia para o repositório do Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Empacotar o código e as dependências em um contêiner do Docker

  1. Para executar esse pipeline em um ambiente distribuído, empacote o código e as dependências em um contêiner do Docker.

    docker build . -t cpp_beam_container
    
  2. Depois de empacotar o código e as dependências, será possível executar o pipeline localmente para testá-lo.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Esse comando grava a saída na imagem do Docker. Para ver a saída, execute o pipeline com --output e grave a saída em um bucket do Cloud Storage. Por exemplo, execute o seguinte comando.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

execute o pipeline

Agora é possível executar o pipeline do Apache Beam no Dataflow consultando o arquivo com o código do pipeline e transmitindo os parâmetros exigidos pelo pipeline.

No shell ou terminal, execute o pipeline com o executor do Dataflow.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Depois de executar o comando para executar o modelo flexível, o Dataflow retorna um ID de job com o status Na fila. Pode levar alguns minutos até que o status do job atinja Em execução e você possa acessar o gráfico de jobs.

Ver os resultados

Veja os dados gravados no bucket do Cloud Storage. Use o comando gcloud storage ls para listar o conteúdo no nível superior do bucket:

gcloud storage ls gs://BUCKET_NAME

Se o comando estiver correto, ele retornará uma mensagem como esta:

gs://BUCKET_NAME/out.png

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Excluir o projeto

A maneira mais fácil de eliminar o faturamento é excluir o projeto do Google Cloud que você criou para o tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Excluir recursos individuais

Se você quiser reutilizar o projeto, exclua os recursos criados para o tutorial.

Limpar recursos do projeto do Google Cloud

  1. Exclua o repositório do Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Exclua o bucket do Cloud Storage. O bucket sozinho não gera cobranças.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogar credenciais

  1. Revogue os papéis concedidos à conta de serviço do worker gerenciada pelo usuário. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

A seguir