Defina as opções da pipeline do Dataflow

Esta página explica como definir as opções de pipeline para os seus trabalhos do Dataflow. Estas opções de pipeline configuram como e onde o seu pipeline é executado e que recursos usa.

A execução do pipeline é separada da execução do programa Apache Beam. O programa Apache Beam que escreveu cria um pipeline para execução diferida. Isto significa que o programa gera uma série de passos que qualquer executor do Apache Beam suportado pode executar. Os executores compatíveis incluem o executor do Dataflow no Google Cloud e o executor direto que executa o pipeline diretamente num ambiente local.

Pode transmitir parâmetros para uma tarefa do Dataflow no momento da execução. Para mais informações sobre como definir opções de pipeline em tempo de execução, consulte o artigo Configurar opções de pipeline.

Use opções de pipeline com SDKs do Apache Beam

Pode usar os seguintes SDKs para definir opções de pipeline para tarefas do Dataflow:

  • SDK Apache Beam para Python
  • SDK Apache Beam para Java
  • SDK do Apache Beam para Go

Para usar os SDKs, define o executor do pipeline e outros parâmetros de execução através da classe PipelineOptions do SDK Apache Beam.

Existem dois métodos para especificar opções de pipeline:

  • Defina as opções do pipeline de forma programática, fornecendo uma lista de opções do pipeline.
  • Defina as opções do pipeline diretamente na linha de comandos quando executar o código do pipeline.

Defina opções de pipelines de forma programática

Pode definir opções de pipeline de forma programática criando e modificando um objeto PipelineOptions.

Java

Construa um objeto PipelineOptions usando o método PipelineOptionsFactory.fromArgs.

Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.

Python

Crie um objeto PipelineOptions.

Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.

Go

A definição de opções de pipeline de forma programática através de PipelineOptions não é suportada no SDK do Apache Beam para Go. Use argumentos da linha de comandos do Go.

Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.

Defina opções de pipeline na linha de comandos

Pode definir opções de pipeline através de argumentos da linha de comandos.

Java

A sintaxe de exemplo seguinte é do pipeline WordCount no tutorial de Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Substitua o seguinte:

  • PROJECT_ID: o ID do seu Google Cloud projeto
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • REGION: a região do Dataflow>, us-central1

Python

A sintaxe do exemplo seguinte é da pipeline WordCount no tutorial do Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Substitua o seguinte:

  • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow, por exemplo, europe-west1

    A flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

  • STORAGE_BUCKET: o nome do contentor do Cloud Storage

  • PROJECT_ID: o Google Cloud ID do projeto

Go

A sintaxe de exemplo seguinte é da pipeline WordCount no tutorial do Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Substitua o seguinte:

  • BUCKET_NAME: o nome do contentor do Cloud Storage

  • PROJECT_ID: o Google Cloud ID do projeto

  • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow. Por exemplo, europe-west1. A flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Defina opções de pipeline experimentais

Nos SDKs Java, Python e Go, a experiments opção de pipeline ativa funcionalidades experimentais ou de pré-GA do Dataflow.

Definido programaticamente

Para definir a opção experiments programaticamente, use a seguinte sintaxe.

Java

No objeto PipelineOptions, inclua a opção experiments através da seguinte sintaxe. Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.

options.setExperiments("streaming_boot_disk_size_gb=80")

Para ver um exemplo que mostra como criar o objeto PipelineOptions, consulte a secção Exemplo de lançamento no Dataflow nesta página.

Python

No objeto PipelineOptions, inclua a opção experiments através da seguinte sintaxe. Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Para ver um exemplo que mostra como criar o objeto PipelineOptions, consulte a secção Exemplo de lançamento no Dataflow nesta página.

Go

A definição de opções de pipeline de forma programática através de PipelineOptions não é suportada no SDK do Apache Beam para Go. Use argumentos da linha de comandos do Go.

Definido na linha de comandos

Para definir a opção experiments na linha de comandos, use a seguinte sintaxe.

Java

Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.

--experiments=streaming_boot_disk_size_gb=80

Python

Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.

--experiments=streaming_boot_disk_size_gb=80

Go

Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.

--experiments=streaming_boot_disk_size_gb=80

Definir num modelo

Para ativar uma funcionalidade experimental quando executa um modelo do Dataflow, use a flag --additional-experiments.

Modelo clássico

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Modelo Flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Aceda ao objeto de opções da pipeline

Quando cria o objeto Pipeline no seu programa Apache Beam, transmita PipelineOptions. Quando o serviço Dataflow executa o pipeline, envia uma cópia do PipelineOptions a cada trabalhador.

Java

Aceda a PipelineOptions dentro de qualquer instância ParDo de transformação através do método DoFn.ProcessContext.getPipelineOptions

Python

Esta funcionalidade não é suportada no SDK Apache Beam para Python.

Go

Aceder às opções de pipeline através de beam.PipelineOptions.

Inicie no Dataflow

Execute a sua tarefa em recursos Google Cloud geridos através do serviço de execução do Dataflow. A execução do pipeline com o Dataflow cria uma tarefa do Dataflow, que usa recursos do Compute Engine e do Cloud Storage no seu Google Cloudprojeto. Para informações sobre as autorizações do Dataflow, consulte o artigo Segurança e autorizações do Dataflow.

As tarefas do Dataflow usam o Cloud Storage para armazenar ficheiros temporários durante a execução do pipeline. Para evitar a faturação de custos de armazenamento desnecessários, desative a funcionalidade de eliminação temporária em contentores que os seus trabalhos do Dataflow usam para armazenamento temporário. Para mais informações, consulte o artigo Desative a eliminação temporária.

Defina as opções necessárias

Para executar o pipeline através do Dataflow, defina as seguintes opções do pipeline:

Java

  • project: o ID do seu projeto do Google Cloud .
  • runner: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de ser DataflowRunner.
  • gcpTempLocation: um caminho do Cloud Storage para o Dataflow preparar a maioria dos ficheiros temporários. O contentor especificado já tem de existir.

    Se não especificar gcpTempLocation, o Dataflow usa o valor da opção tempLocation. Se não especificar nenhuma destas opções, o Dataflow cria um novo contentor do Cloud Storage.

Python

  • project: o ID do seu projeto Google Cloud .
  • region: a região da sua tarefa do Dataflow.
  • runner: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de ser DataflowRunner.
  • temp_location: um caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.

Go

  • project: o ID do seu projeto Google Cloud .
  • region: a região da sua tarefa do Dataflow.
  • runner: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de ser dataflow.
  • staging_location: um caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.

Defina opções de pipelines de forma programática

O código de exemplo seguinte mostra como criar um pipeline definindo programaticamente o executor e outras opções necessárias para executar o pipeline através do Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

O SDK Apache Beam para Go usa argumentos da linha de comandos do Go. Use flag.Set() para definir valores de flags.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Depois de construir o pipeline, especifique todas as leituras, transformações e escritas do pipeline e execute-o.

Use opções de pipeline a partir da linha de comandos

O exemplo seguinte mostra como usar opções de pipeline especificadas na linha de comandos. Este exemplo não define as opções do pipeline de forma programática.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Use o módulo argparse do Python para analisar as opções da linha de comandos.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Use o pacote Go flag para analisar as opções da linha de comandos. Tem de analisar as opções antes de chamar beam.Init(). Neste exemplo, output é uma opção da linha de comandos.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Depois de construir o pipeline, especifique todas as leituras, transformações e escritas do pipeline e, em seguida, execute-o.

Controle os modos de execução

Quando um programa Apache Beam executa um pipeline num serviço como o Dataflow, o programa pode executar o pipeline de forma assíncrona ou pode bloquear até à conclusão do pipeline. Pode alterar este comportamento seguindo as orientações abaixo.

Java

Quando um programa Java do Apache Beam executa um pipeline num serviço como o Dataflow, é normalmente executado de forma assíncrona. Para executar um pipeline e aguardar até que a tarefa seja concluída, defina DataflowRunner como o executor de pipelines e chame explicitamente pipeline.run().waitUntilFinish().

Quando usa DataflowRunner e chama waitUntilFinish() no objeto PipelineResult devolvido por pipeline.run(), o pipeline é executado em Google Cloud , mas o código local aguarda a conclusão da tarefa na nuvem e devolve o objeto DataflowPipelineJob final. Enquanto a tarefa é executada, o serviço Dataflow imprime atualizações do estado da tarefa e mensagens da consola enquanto aguarda.

Python

Quando um programa Python do Apache Beam executa um pipeline num serviço como o Dataflow, é normalmente executado de forma assíncrona. Para bloquear até à conclusão do pipeline, use o método wait_until_finish() do objeto PipelineResult, devolvido pelo método run() do executor.

Go

Quando um programa Apache Beam Go executa um pipeline no Dataflow, é síncrono por predefinição e bloqueia até à conclusão do pipeline. Se não quiser bloquear, tem duas opções:

  1. Inicie a tarefa numa rotina do Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Use o sinalizador de linha de comandos --async, que se encontra no pacote jobopts.

Para ver detalhes de execução, monitorizar o progresso e verificar o estado de conclusão da tarefa, use a interface de monitorização do Dataflow ou a interface de linha de comandos do Dataflow.

Use origens de streaming

Java

Se o pipeline ler a partir de uma origem de dados ilimitada, como o Pub/Sub, o pipeline é executado automaticamente no modo de streaming.

Python

Se o seu pipeline usar uma origem de dados ilimitada, como o Pub/Sub, tem de definir a opção streaming como verdadeira.

Go

Se o pipeline ler a partir de uma origem de dados ilimitada, como o Pub/Sub, o pipeline é executado automaticamente no modo de streaming.

As tarefas de streaming usam um tipo de máquina do Compute Engine de n1-standard-2 ou superior por predefinição.

Inicie localmente

Em vez de executar o pipeline em recursos de nuvem geridos, pode optar por executar o pipeline localmente. A execução local tem determinadas vantagens para testar, depurar ou executar o seu pipeline em pequenos conjuntos de dados. Por exemplo, a execução local remove a dependência do serviço Dataflow remoto e do projeto Google Cloud associado.

Quando usa a execução local, tem de executar o pipeline com conjuntos de dados suficientemente pequenos para caberem na memória local. Pode criar um pequeno conjunto de dados na memória usando uma transformação Create ou usar uma transformação Read para trabalhar com pequenos ficheiros locais ou remotos. Normalmente, a execução local oferece uma forma mais rápida e fácil de realizar testes e depuração com menos dependências externas, mas está limitada pela memória disponível no seu ambiente local.

O exemplo de código seguinte mostra como criar um pipeline que é executado no seu ambiente local.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Depois de criar o pipeline, execute-o.

Crie opções de pipeline personalizadas

Pode adicionar as suas próprias opções personalizadas, além das opções padrão PipelineOptions. A linha de comandos do Apache Beam também pode analisar opções personalizadas através de argumentos de linha de comandos especificados no mesmo formato.

Java

Para adicionar as suas próprias opções, defina uma interface com métodos getter e setter para cada opção, como no exemplo seguinte:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Para adicionar as suas próprias opções, use o método add_argument() (que se comporta exatamente como o módulo argparse padrão do Python), como no exemplo seguinte:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Para adicionar as suas próprias opções, use o pacote de flags Go, conforme mostrado no exemplo seguinte:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Também pode especificar uma descrição, que é apresentada quando um utilizador transmite --help como um argumento de linha de comandos, e um valor predefinido.

Java

Define a descrição e o valor predefinido através de anotações, da seguinte forma:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Recomendamos que registe a sua interface com PipelineOptionsFactory e, em seguida, transmita a interface quando criar o objeto PipelineOptions. Quando regista a sua interface com o PipelineOptionsFactory, o --help pode encontrar a sua interface de opções personalizadas e adicioná-la ao resultado do comando --help. PipelineOptionsFactory valida se as suas opções personalizadas são compatíveis com todas as outras opções registadas.

O exemplo de código seguinte mostra como registar a sua interface de opções personalizadas com PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Agora, o seu pipeline pode aceitar --myCustomOption=value como um argumento de linha de comandos.

Python

Define a descrição e o valor predefinido da seguinte forma:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Define a descrição e o valor predefinido da seguinte forma:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)