Definir opções de pipeline do Dataflow

Nesta página, explicamos como definir opções de pipeline para jobs do Dataflow. Essas opções de pipeline configuram como e onde o pipeline é executado e quais recursos são usados.

A execução do pipeline é separada da execução do programa no Apache Beam. O programa Apache Beam que você escreveu cria um pipeline para execução adiada. Isso significa que o programa gera uma série de etapas que qualquer executor do Apache Beam compatível pode executar. Os executores compatíveis incluem o executor do Dataflow no Google Cloud e o executor direto que executa o pipeline diretamente em um ambiente local.

É possível transmitir parâmetros para um job do Dataflow no ambiente de execução. Para mais informações sobre como definir opções de pipeline no ambiente de execução, consulte Como configurar opções de pipeline.

Usar as opções de pipeline com os SDKs do Apache Beam

É possível usar os SDKs a seguir para definir opções de pipeline para jobs do Dataflow:

  • SDK do Apache Beam para Python
  • SDK do Apache Beam para Java
  • SDK do Apache Beam para Go

Para usar os SDKs, defina o executor de pipeline e outros parâmetros de execução usando a classe PipelineOptions do SDK do Apache Beam.

Há dois métodos para especificar as opções de pipeline:

  • Defina as opções de pipeline de maneira programática fornecendo uma lista de opções de pipeline.
  • Defina as opções do pipeline diretamente na linha de comando ao executar o código do pipeline.

Definir opções do pipeline de maneira programática

É possível definir opções de pipeline de maneira programática criando e modificando um objeto PipelineOptions.

Java

Crie um objeto PipelineOptions usando o método PipelineOptionsFactory.fromArgs.

Para ver um exemplo, consulte a seção Iniciar no exemplo do Dataflow nesta página.

Python

Criar um objeto PipelineOptions.

Para ver um exemplo, consulte a seção Iniciar no exemplo do Dataflow nesta página.

Go

A configuração programática de opções de pipeline usando PipelineOptions não é compatível com o SDK do Apache Beam para Go. Use argumentos de linha de comando do Go.

Para ver um exemplo, consulte a seção Iniciar no exemplo do Dataflow nesta página.

Definir opções de pipeline na linha de comando

É possível definir opções de pipeline usando argumentos da linha de comando.

Java

A sintaxe de exemplo a seguir é do pipeline WordCount no Guia de início rápido do Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • REGION: uma região do Dataflow, us-central1

Python

A sintaxe de exemplo a seguir é do pipeline WordCount no Guia de início rápido do Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Substitua:

  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow, por exemplo, europe-west1

    A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

  • STORAGE_BUCKET: o nome do bucket do Cloud Storage

  • PROJECT_ID: o ID do projeto do Google Cloud

Go

A sintaxe de exemplo a seguir é do pipeline WordCount no Guia de início rápido do Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Substitua:

  • BUCKET_NAME: o nome do bucket do Cloud Storage

  • PROJECT_ID: o ID do projeto do Google Cloud

  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow. Por exemplo, europe-west1. A flag --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Definir opções experimentais de pipeline

Nos SDKs do Java, Python e Go, a opção de pipeline experiments ativa recursos experimentais ou pré-GA do Dataflow.

Definir programaticamente

Para definir a opção experiments de maneira programática, use a seguinte sintaxe.

Java

No objeto PipelineOptions, inclua a opção experiments usando a seguinte sintaxe. Este exemplo define o tamanho do disco de inicialização como 80 GB com a sinalização do experimento.

options.setExperiments("streaming_boot_disk_size_gb=80")

Para ver um exemplo de como criar o objeto PipelineOptions, consulte a seção Lançar no exemplo do Dataflow nesta página.

Python

No objeto PipelineOptions, inclua a opção experiments usando a seguinte sintaxe. Este exemplo define o tamanho do disco de inicialização como 80 GB com a sinalização do experimento.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Para ver um exemplo de como criar o objeto PipelineOptions, consulte a seção Lançar no exemplo do Dataflow nesta página.

Go

A configuração programática de opções de pipeline usando PipelineOptions não é compatível com o SDK do Apache Beam para Go. Use argumentos de linha de comando do Go.

Definir na linha de comando

Para definir a opção experiments na linha de comando, use a seguinte sintaxe:

Java

Este exemplo define o tamanho do disco de inicialização como 80 GB com a sinalização do experimento.

--experiments=streaming_boot_disk_size_gb=80

Python

Este exemplo define o tamanho do disco de inicialização como 80 GB com a sinalização do experimento.

--experiments=streaming_boot_disk_size_gb=80

Go

Este exemplo define o tamanho do disco de inicialização como 80 GB com a sinalização do experimento.

--experiments=streaming_boot_disk_size_gb=80

Definido em um modelo

Para ativar um recurso experimental ao executar um modelo do Dataflow, use a sinalização --additional-experiments.

Modelo clássico

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

modelo Flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Acessar o objeto de opções de pipeline

Ao criar o objeto Pipeline no programa Apache Beam, transmita PipelineOptions. Quando o serviço Dataflow executa o pipeline, ele envia uma cópia do PipelineOptions para cada worker.

Java

Acesse PipelineOptions dentro de qualquer instância DoFn da transformação ParDo usando o método ProcessContext.getPipelineOptions.

Python

Esse recurso não está disponível no SDK do Apache Beam para Python.

Go

Acesse as opções do pipeline usando beam.PipelineOptions.

Iniciar no Dataflow

Execute seu job em recursos gerenciados do Google Cloud usando o serviço de execução do Dataflow. Executar o pipeline com o Dataflow cria um job do Dataflow que usa os recursos do Compute Engine e do Cloud Storage no projeto do Google Cloud. Para saber mais sobre as permissões do Dataflow, consulte Segurança e permissões do Dataflow.

Os jobs do Dataflow usam o Cloud Storage para armazenar arquivos temporários durante a execução do pipeline. Para evitar cobranças por custos desnecessários de armazenamento, desative o recurso de exclusão reversível nos buckets que os jobs do Dataflow usam para armazenamento temporário. Para mais informações, consulte Remover uma política de exclusão reversível de um bucket.

Definir as opções necessárias

Para executar o pipeline usando o Dataflow, defina as opções de pipeline a seguir:

Java

  • project: o ID do seu projeto do Google Cloud.
  • runner: o executor que executa o pipeline. Para a execução no Google Cloud, precisa ser DataflowRunner.
  • gcpTempLocation: um caminho do Cloud Storage para o Dataflow preparar a maioria dos arquivos temporários. Para especificar um bucket, crie o bucket com antecedência. Se você não definir gcpTempLocation, defina a opção de pipeline tempLocation e, em seguida, gcpTempLocation é definido como o valor de tempLocation. Se nenhum deles for especificado, um gcpTempLocation padrão será criado.
  • stagingLocation: um caminho do Cloud Storage para o Dataflow preparar os arquivos binários. Se você estiver usando o SDK 2.28 do Apache Beam ou uma versão mais recente, não defina essa opção. No SDK 2.28 do Apache Beam ou em versões anteriores, se você não definir essa opção, o que você especificou para tempLocation será usado no local de preparação.

    Será criado um gcpTempLocation padrão, se nem ele nem tempLocation for especificado. Se tempLocation for especificado e gcpTempLocation não for, tempLocation precisará ser um caminho do Cloud Storage e gcpTempLocation será o padrão. Se tempLocation não for especificado e gcpTempLocation for, tempLocation não será preenchido.

Python

  • project: é o ID do projeto do Google Cloud.
  • region: a região do job do Dataflow.
  • runner: o executor que executa o pipeline. Para a execução no Google Cloud, precisa ser DataflowRunner.
  • temp_location: um caminho do Cloud Storage para o Dataflow organizar os arquivos de job temporários criados durante a execução do pipeline.

Go

  • project: é o ID do projeto do Google Cloud.
  • region: a região do job do Dataflow.
  • runner: o executor que executa o pipeline. Para a execução no Google Cloud, precisa ser dataflow.
  • staging_location: um caminho do Cloud Storage para o Dataflow organizar os arquivos de job temporários criados durante a execução do pipeline.

Definir opções do pipeline de maneira programática

O código de exemplo a seguir mostra como construir um pipeline por meio da configuração programática do executor e outras opções necessárias para executar o pipeline usando o Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

O SDK do Apache Beam para Go usa argumentos de linha de comando do Go. Use flag.Set() para definir valores de sinalização.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Depois de construir o pipeline, especifique todas as leituras, transformações e gravações dele e execute-o.

Usar opções de pipeline na linha de comando

O exemplo a seguir mostra como usar as opções de pipeline especificadas na linha de comando. Este exemplo não define as opções do pipeline de maneira programática:

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Use o módulo argparse do Python para analisar opções de linha de comando.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Use o pacote flag do Go para analisar opções de linha de comando. Você precisa analisar as opções antes de chamar beam.Init(). Neste exemplo, output é uma opção de linha de comando.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Depois de construir o pipeline, especifique todas as leituras, transformações e gravações dele. Em seguida, execute o pipeline.

Controlar modos de execução

Quando um programa do Apache Beam executa um pipeline em um serviço como o Dataflow, ele pode executá-lo de maneira assíncrona ou bloquear até a conclusão do pipeline. É possível alterar esse comportamento usando as orientações a seguir.

Java

Quando um programa em Java do Apache Beam executa um pipeline em um serviço como o Dataflow, ele normalmente é executado de maneira assíncrona. Para executar um pipeline e aguardar até que o job seja concluído, defina DataflowRunner como o executor de pipeline e chame pipeline.run().waitUntilFinish() explicitamente.

Quando você usa DataflowRunner e chama waitUntilFinish() no objeto PipelineResult retornado de pipeline.run(), o pipeline é executado no Google Cloud, mas o código local aguarda o job para concluir e retornar o objeto DataflowPipelineJob final. Durante a execução do job, as atualizações de status dele e as mensagens no console são impressas no serviço do Cloud Dataflow.

Python

Quando um programa em Python do Apache Beam executa um pipeline em um serviço como o Dataflow, ele normalmente é executado de maneira assíncrona. Para bloquear até a conclusão do pipeline, use o método wait_until_finish() do objeto PipelineResult, retornado do método run() do executor.

Go

Quando um programa em Go do Apache Beam executa um pipeline no Dataflow, ele é síncrono por padrão e bloqueia até a conclusão do pipeline. Se você não quiser bloquear, há duas opções:

  1. Inicie o job em uma rotina do Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Use a sinalização de linha de comando --async, que está no pacote jobopts.

Para visualizar os detalhes de execução, monitorar o progresso e verificar o status de conclusão do job, use a interface de monitoramento do Dataflow ou a interface de linha de comando do Dataflow.

Usar origens de streaming

Java

Se o pipeline fizer leituras em uma fonte de dados sem limites, como o Pub/Sub, ele será executado automaticamente no modo de streaming.

Python

Se o pipeline usar uma fonte de dados ilimitada, como o Pub/Sub, você precisará definir a opção streaming como verdadeira.

Go

Se o pipeline fizer leituras em uma fonte de dados sem limites, como o Pub/Sub, ele será executado automaticamente no modo de streaming.

Por padrão, os jobs de streaming usam n1-standard-2 ou superior como tipo de máquina do Compute Engine.

Iniciar localmente

Em vez de executar o pipeline em recursos de nuvem gerenciados, você tem a opção de fazê-lo localmente. Isso proporciona certas vantagens nos testes, na depuração ou execução do pipeline em pequenos conjuntos de dados. Por exemplo, a execução local remove a dependência do serviço remoto do Dataflow e do projeto do Google Cloud associado.

Ao usar a execução local, você precisa executar o pipeline com conjuntos de dados pequenos o suficiente para caber na memória local. É possível criar um conjunto de dados pequeno na memória usando uma transformação Create ou usar uma transformação Read para trabalhar com arquivos pequenos locais ou remotos. Normalmente, a execução local oferece uma maneira mais rápida e fácil de realizar testes e depuração com menos dependências externas, mas é limitada pela memória disponível no ambiente local.

No exemplo de código a seguir, há uma demonstração de como construir um pipeline que pode ser executado no ambiente local.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Depois de construir o canal, execute-o.

Criar opções de pipeline personalizadas

Além do PipelineOptions padrão, é possível adicionar suas próprias opções personalizadas. A linha de comando do Apache Beam também pode analisar opções personalizadas usando argumentos de linha de comando especificados no mesmo formato.

Java

Para adicionar as suas opções, defina uma interface com os métodos getter e setter para cada uma delas, como no exemplo a seguir:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Para adicionar suas próprias opções, use o método add_argument(), que se comporta da mesma forma que o módulo argparse padrão do Python, como no exemplo a seguir:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Para adicionar suas próprias opções, use o pacote de sinalização Go, conforme mostrado no exemplo a seguir.

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Além de um valor padrão, também é possível especificar uma descrição, que aparece quando um usuário transmite --help como um argumento de linha de comando.

Java

Defina a descrição e o valor padrão usando anotações, da seguinte maneira:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Recomendamos que você registre sua interface em PipelineOptionsFactory e a transmita ao criar o objeto PipelineOptions. Se você fizer o registro da interface em PipelineOptionsFactory, o --help poderá encontrar sua interface de opções personalizadas e a adicionar à saída do comando --help. PipelineOptionsFactory valida se suas opções personalizadas são compatíveis com todas as outras opções registradas.

No código de exemplo a seguir, há uma demonstração de como registrar a interface de opções personalizadas com PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Agora seu pipeline pode aceitar --myCustomOption=value como um argumento de linha de comando.

Python

Defina a descrição e o valor padrão da seguinte maneira:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Defina a descrição e o valor padrão da seguinte maneira:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)