Como especificar parâmetros de execução de pipeline

Depois que o pipeline é construído no programa Apache Beam, é necessário executá-lo. A execução do canal ocorre separadamente da execução do programa. Quando o programa Apache Beam constrói o canal, uma série de etapas é gerada no código que você escreveu para processamento em um executor do canal. O executor do pipeline pode ser o serviço gerenciado do Dataflow no Google Cloud, um serviço de terceiros ou um executor de pipeline local que execute as etapas diretamente no ambiente local.

É possível especificar o executor de pipeline e outras opções de execução usando a classe PipelineOptions do SDK do Apache Beam. Use PipelineOptions para configurar como e onde o pipeline será executado e quais recursos ele utilizará.

Na maioria das vezes, você quer que o pipeline seja executado em recursos gerenciados do Google Cloud usando o serviço de execução do Dataflow. Executar o pipeline com o serviço Dataflow cria um job do Dataflow que usa os recursos do Compute Engine e do Cloud Storage no projeto do Google Cloud.

Também é possível executar o canal localmente. Quando você executa o pipeline localmente, as transformações do pipeline são executadas na mesma máquina em que o programa do Dataflow é executado. A execução local será útil para fins de teste e depuração, especialmente se o canal puder usar conjuntos de dados menores na memória.

Como definir PipelineOptions

Transmita PipelineOptions ao criar o objeto Pipeline no programa do Dataflow. Quando o serviço Dataflow executa o pipeline, ele envia uma cópia do PipelineOptions para cada instância de worker.

Java: SDK 2.x

Observação: é possível acessar PipelineOptions em qualquer instância DoFn de ParDo usando o método ProcessContext.getPipelineOptions.

Python

Esse recurso ainda não está disponível no SDK do Apache Beam para Python.

Java: SDK 1.x

Como definir PipelineOptions a partir de argumentos da linha de comando

Embora seja possível configurar o pipeline criando um objeto PipelineOptions e definindo os campos diretamente, os SDKs do Apache Beam incluem um analisador de linha de comando que pode ser usado para definir campos em PipelineOptions usando argumentos de linha de comando.

Java: SDK 2.x

Para ler as opções na linha de comando, construa o objeto PipelineOptions usando o método PipelineOptionsFactory.fromArgs, conforme mostrado no seguinte código de exemplo:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Observação: anexar o método .withValidation faz com que o Dataflow verifique os argumentos obrigatórios da linha de comando e valide os valores dos argumentos.

O uso de PipelineOptionsFactory.fromArgs interpreta os argumentos de linha de comando que seguem o formato:

--<option>=<value>

Quando você cria PipelineOptions dessa forma, pode especificar qualquer uma das opções em qualquer subinterface de org.apache.beam.sdk.options.PipelineOptions (em inglês) como um argumento de linha de comando.

Python

Para ler as opções na linha de comando, crie o objeto PipelineOptions conforme mostrado no seguinte código de exemplo:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

O argumento (flags=argv) para PipelineOptions interpreta os argumentos de linha de comando que seguem o formato:

--<option>=<value>

Quando você cria PipelineOptions dessa maneira, pode especificar qualquer uma das opções como subclasse de PipelineOptions.

Java: SDK 1.x

Como criar opções personalizadas

Além do PipelineOptions padrão, é possível adicionar suas próprias opções personalizadas. Com o analisador de linha de comando do Dataflow, também é possível configurar essas opções usando argumentos de linha de comando especificados no mesmo formato.

Java: SDK 2.x

Para adicionar suas próprias opções, defina uma interface com os métodos getter e setter para cada uma delas, como no exemplo a seguir:

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Para adicionar suas próprias opções, use o método add_argument(), que se comporta da mesma forma que o módulo argparse (em inglês) padrão do Python, como no exemplo a seguir:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

Além de um valor padrão, também é possível especificar uma descrição, que aparece quando um usuário transmite --help como um argumento de linha de comando.

Java: SDK 2.x

Defina a descrição e o valor padrão usando anotações, da seguinte maneira:

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Recomendamos que você registre sua interface em PipelineOptionsFactory e a transmita ao criar o objeto PipelineOptions. Se você fizer o registro da interface em PipelineOptionsFactory, o --help poderá encontrar sua interface de opções personalizadas e a adicionar à saída do comando --help. PipelineOptionsFactory também confirmará que as suas opções personalizadas são compatíveis com todas as outras opções registradas.

No código de exemplo a seguir, há uma demonstração de como registrar a interface de opções personalizadas em PipelineOptionsFactory:

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Agora seu pipeline pode aceitar --myCustomOption=value como um argumento de linha de comando.

Python

Defina a descrição e o valor padrão da seguinte maneira:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java: SDK 1.x

Como configurar PipelineOptions para execução no serviço do Cloud Dataflow

Para executar o pipeline usando o serviço gerenciado do Dataflow, você precisa definir os seguintes campos em PipelineOptions:

Java: SDK 2.x

  • project: o ID do seu projeto do Google Cloud.
  • runner: o executor de pipeline que analisará o programa e construirá o pipeline. Para a execução na nuvem, precisa ser DataflowRunner.
  • gcpTempLocation: um caminho do Cloud Storage para o Dataflow organizar qualquer arquivo temporário. É preciso criar esse bucket antes de executar o pipeline. Se você não especificar gcpTempLocation, especifique a opção de pipeline tempLocation e, em seguida, gcpTempLocation será definido como o valor tempLocation. Caso nenhum deles seja especificado, será criado um gcpTempLocation padrão.
  • stagingLocation: um bucket do Cloud Storage para o Dataflow preparar os arquivos binários. Caso esta opção não seja definida, o que você especificou para tempLocation será usado também para o local de preparo.
  • Se nem ele nem tempLocation for especificado, será criado um gcpTempLocation padrão. Se tempLocation for especificado, mas gcpTempLocation não for, tempLocation precisará ser um caminho do Cloud Storage e gcpTempLocation será definido como o padrão dele. Se tempLocation não for especificado, mas gcpTempLocation for, tempLocation não será preenchido.

Observação: se você usar o SDK do Apache Beam para Java 2.15.0 ou mais recente, também precisará especificar region.

Python

  • project: o ID do seu projeto do Google Cloud.
  • runner: o executor de pipeline que analisará o programa e construirá o pipeline. Para a execução na nuvem, precisa ser DataflowRunner.
  • staging_location: um caminho do Cloud Storage para o Dataflow preparar os pacotes de códigos necessários para os workers que executam o job.
  • temp_location: um caminho do Cloud Storage para o Dataflow organizar os arquivos de job temporários criados durante a execução do pipeline.

Observação: se você usar o SDK do Apache Beam para Python 2.15.0 ou mais recente, também precisará especificar region.

Java: SDK 1.x

Configure essas opções por programação ou especifique-as usando a linha de comando. O código de exemplo a seguir mostra como criar um pipeline configurando de maneira programática o executor, assim como outras opções necessárias para executar o pipeline usando o serviço gerenciado do Dataflow.

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java: SDK 1.x

Depois de construir o canal, especifique todas as leituras, transformações e gravações dele e execute-o.

No código a seguir, você tem um exemplo de como criar as opções necessárias para a execução do serviço Dataflow usando a linha de comando:

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java: SDK 1.x

Depois de construir o canal, especifique todas as leituras, transformações e gravações dele e execute-o.

Java: SDK 2.x

Ao transmitir as opções obrigatórias na linha de comando, use as opções --project, --runner, --gcpTempLocation e, como alternativa, --stagingLocation.

Python

Ao transmitir as opções obrigatórias na linha de comando, use as opções --project, --runner e --staging_location.

Java: SDK 1.x

Execução assíncrona

Java: SDK 2.x

O uso de DataflowRunner faz com que o pipeline seja executado de forma assíncrona na nuvem do Google. Durante a execução do canal, monitore o andamento do job, veja detalhes da execução e receba atualizações sobre os resultados do canal usando a interface de monitoramento do Dataflow ou a interface de linha de comando do Dataflow.

Python

O uso de DataflowRunner faz com que o pipeline seja executado de forma assíncrona na nuvem do Google. Durante a execução do canal, monitore o andamento do job, veja detalhes da execução e receba atualizações sobre os resultados do canal usando a interface de monitoramento do Dataflow ou a interface de linha de comando do Dataflow.

Java: SDK 1.x

Execução síncrona

Java: SDK 2.x

Especifique DataflowRunner como o executor de pipeline e chame pipeline.run().waitUntilFinish() explicitamente.

Quando você usa DataflowRunner e chama waitUntilFinish() no objeto PipelineResult retornado de pipeline.run(), o pipeline é executado na nuvem, mas o código local aguarda a conclusão do job de nuvem e, em seguida, retorna o objeto DataflowPipelineJob final. Enquanto o job é executado, o serviço Dataflow imprime atualizações de status do job e mensagens do console.

Se você foi um usuário do Java SDK 1.x e usou --runner BlockingDataflowPipelineRunner na linha de comando para induzir interativamente seu programa principal a bloquear até o pipeline ser encerrado, com o Java 2.x esse programa precisa chamar explicitamente waitUntilFinish().

Python

Para bloquear até a conclusão do pipeline, use o método wait_until_finish() do objeto PipelineResult, retornado do método run() do executor.

Java: SDK 1.x

Observação: se você digitar Ctrl+C na linha de comando, o job não será cancelado. O serviço do Dataflow ainda está executando o job no Google Cloud. Para cancelar o job, você precisará usar a Interface de monitoramento do Dataflow ou a Interface de linha de comando do Dataflow.

Execução em streaming

Java: SDK 2.x

Se o pipeline fizer leituras em uma fonte de dados sem limites, como o Cloud Pub/Sub, ele será executado automaticamente no modo de streaming.

Se o pipeline usar coletores e fontes de dados sem limites, será necessário escolher uma estratégia de gestão de janelas para as PCollections ilimitadas antes de usar qualquer agregação, como GroupByKey.

Python

Se o pipeline usar um coletor ou fonte de dados ilimitados, como o Pub/Sub, você precisará definir a opção streaming como "true".

Por padrão, os jobs de streaming usam n1-standard-2 ou superior como tipo de máquina do Compute Engine. Não modifique esse valor porque n1-standard-2 é o tipo de máquina mínimo exigido para executar jobs de streaming.

Se o pipeline usar fontes de dados e coletores ilimitados, será necessário escolher uma estratégia de gestão de janelas para as PCollections ilimitadas antes de usar qualquer agregação, como GroupByKey.

Java: SDK 1.x

Como definir outras opções de pipeline do Cloud Dataflow

Para executar o pipeline na nuvem, defina de maneira programática os seguintes campos no objeto PipelineOptions:

Java: SDK 2.x

Campo Tipo Descrição Valor padrão
runner Class (NameOfRunner) O PipelineRunner a ser usado. Esse campo permite determinar PipelineRunner no ambiente de execução. DirectRunner (modo local)
streaming boolean Especifica se o modo de streaming está ativado ou desativado. O valor true significa ativado. Se o pipeline fizer leituras em uma fonte ilimitada, o valor padrão será true. Caso contrário, false.
project String O código do projeto do Google Cloud. Isso é necessário se você quiser executar o pipeline usando o serviço gerenciado do Dataflow. Por padrão, se não tiver sido definido, o projeto atualmente configurado no Cloud SDK será usado.
jobName String O nome do job do Dataflow que está sendo executado na lista de jobs do Dataflow e os detalhes do job. Também usado ao atualizar um pipeline existente. O Dataflow gera um nome exclusivo automaticamente.
gcpTempLocation String Caminho do Cloud Storage para arquivos temporários. Precisa ser um URL do Cloud Storage válido, começando com gs://.
stagingLocation String Caminho do Cloud Storage para preparar arquivos locais. Precisa ser um URL do Cloud Storage válido, começando com gs://. Se não for definido, será usado como padrão o que você especificou para tempLocation.
autoscalingAlgorithm String O modo de escalonamento automático do job do Dataflow. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo. Consulte Recursos de ajuste automático para saber mais sobre o funcionamento do escalonamento automático no serviço gerenciado do Dataflow. O padrão é THROUGHPUT_BASED para todos os jobs em lote do Dataflow e para jobs de streaming que usam o Streaming Engine. O padrão é NONE para jobs de streaming que não usam o Streaming Engine.
numWorkers int O número inicial de instâncias do Google Compute Engine a serem usadas na execução do canal. Essa opção determina quantos trabalhadores são iniciados no serviço Dataflow quando o job começa. Se não for especificado, o serviço Dataflow determinará um número adequado de workers.
maxNumWorkers int O número máximo de instâncias do Compute Engine a serem disponibilizadas para o canal durante a execução. Observe que esse valor pode ser superior ao número inicial de workers, especificado por numWorkers para permitir que o job seja escalonado automaticamente ou não. Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow.
numberOfWorkerHarnessThreads int O número de linhas de execução por uso de worker. Se não for especificado, o serviço do Dataflow determina um número apropriado de linhas de execução por worker.
region String Especifica um endpoint regional para implantar os jobs do Dataflow. Se não for definido, o padrão será us-central1.
workerRegion String

Especifica uma região do Compute Engine para iniciar instâncias de worker para executar o pipeline. Essa opção é usada para executar workers em um local diferente do region usado para implantar, gerenciar e monitorar jobs. A zona de workerRegion é atribuída automaticamente.

Observação: essa opção não pode ser combinada com workerZone ou zone.

Se não for definido, usará o valor padrão region.
workerZone String

Especifica uma zona do Compute Engine para inicialização de instâncias de worker a fim de executar o pipeline. Essa opção é usada para executar workers em um local diferente do region usado para implantar, gerenciar e monitorar jobs.

Observação: essa opção não pode ser combinada com workerRegion ou zone.

Se você especificar region ou workerRegion, workerZone usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente.
zone String (Obsoleto) Para o SDK 2.17.0 ou anterior do Apache Beam, isso especifica a zona do Compute Engine para iniciar instâncias de worker para executar o pipeline. Se você especificar region, zone usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente.
dataflowKmsKey String Especifica a chave de criptografia gerenciada pelo cliente (CMEK, na sigla em inglês) usada para criptografar dados em repouso. É possível controlar a chave de criptografia por meio do Cloud KMS. Para usar este recurso, você também precisa especificar gcpTempLocation. Se não for especificado, o Dataflow usará a criptografia padrão do Google Cloud em vez de um CMEK.
flexRSGoal String Especifica o Flexible Resource Scheduling (FlexRS) para jobs em lote com escalonamento automático. Afeta os parâmetros numWorkers, autoscalingAlgorithm, zone, region e workerMachineType. Para mais informações, consulte a seção de opções de pipeline do FlexRS. Se não for especificado, será utilizado o padrão SPEED_OPTIMIZED, ou seja, o comportamento será igual à omissão desse sinalizador. Para ativar o FlexRS, especifique o valor COST_OPTIMIZED para permitir que o serviço Dataflow escolha outro recurso disponível de melhor custo-benefício.
filesToStage List<String> Uma lista dos arquivos locais, diretórios ou arquivos .jar, .zip que ficam disponíveis para cada worker. Quando essa opção é definida, apenas o upload dos arquivos especificados é feito, e o classpath do Java é ignorado. Especifique todos os recursos na ordem correta do classpath. Esses recursos não se limitam ao código. Eles também podem incluir arquivos de configuração e outros recursos que precisam ser disponibilizados a todos os workers. Para acessar os recursos listados, use os métodos de procura padrão do Java no código. Atenção: especificar um caminho de diretório está abaixo do ideal, já que o Dataflow compactará os arquivos antes de fazer o upload, o que envolve um custo de inicialização mais alto. Além disso, não use essa opção para transferir dados a workers que precisam ser processados pelo pipeline, pois isso é significativamente mais lento do que o uso de APIs Cloud Storage/BigQuery nativas em conjunto com a fonte de dados apropriada do Dataflow. Quando filesToStage está em branco, o Cloud Dataflow infere os arquivos a serem organizados com base no classpath do Java. As considerações e precauções mencionadas na coluna da esquerda também se aplicam aqui: tipos de arquivos a serem listados e como acessá-los a partir do código.
network String A rede do Compute Engine para iniciar instâncias desse serviço para a execução do canal. Saiba como especificar a rede. Se não for definido, o Google Cloud pressupõe que você pretende usar uma rede chamada default.
subnetwork String A sub-rede do Compute Engine para iniciar instâncias desse serviço para execução do canal. Saiba como especificar a sub-rede. O valor padrão é determinado no serviço Dataflow.
usePublicIps boolean Especifica se os workers do Cloud Dataflow usarão endereços IP públicos. Se o valor estiver definido como false, os workers do Dataflow usarão endereços IP particulares para todas as comunicações. Nesse caso, se a opção subnetwork for especificada, a opção network será ignorada. Verifique se o network ou o subnetwork especificado está com o Acesso privado do Google ativado. Se não estiver definido, o valor padrão será true e os workers do Dataflow usarão endereços IP públicos.
enableStreamingEngine boolean Especifica se o Streaming Engine do Dataflow está ativado ou desativado. Se ativado, "true". A ativação do Streaming Engine permite que você execute as etapas do pipeline de streaming no back-end do serviço Dataflow, economizando CPU, memória e recursos de armazenamento de disco permanente. O valor padrão é false. Isso significa que as etapas do pipeline de streaming são executadas inteiramente nas VMs de worker.
createFromSnapshot String Especifica o ID do snapshot a ser usado ao criar um job de streaming. Os snapshots salvam o estado de um pipeline de streaming e permitem iniciar uma nova versão do job a partir desse estado. Para mais informações sobre snapshots, consulte Como usar snapshots. Se não for definido, nenhum snapshot será usado para criar um job.
diskSizeGb int

O tamanho do disco, em gigabytes, a ser usado em cada instância de worker remota do Compute Engine. Se definido, especifique pelo menos 30 GB para a imagem de inicialização do worker e os registros locais.

Para jobs em lote que usam o Dataflow Shuffle, essa opção define o tamanho do disco de inicialização de uma VM de worker. Para jobs em lote que não usam o Dataflow Shuffle, essa opção define o tamanho dos discos usados para armazenar dados aleatórios. O tamanho do disco de inicialização não é afetado.

Para jobs de streaming que usam o Streaming Engine, essa opção define o tamanho dos discos de inicialização. Para jobs de streaming que não usam o Streaming Engine, essa opção define o tamanho de cada disco permanente adicional criado pelo serviço Dataflow. O disco de inicialização não é afetado. Se um job de streaming não usar o Streaming Engine, será possível definir o tamanho do disco de inicialização com o sinalizador experimental streaming_boot_disk_size_gb. Por exemplo, especifique --experiments=streaming_boot_disk_size_gb=80 para criar discos de inicialização de 80 GB.

Defina como 0 para usar o tamanho padrão definido no seu projeto do Cloud Platform.

Se um job em lote usar o Dataflow Shuffle, o padrão será 25 GB. Caso contrário, o padrão será 250 GB.

Se um job de streaming usar o Streaming Engine, o padrão será 30 GB. Caso contrário, o padrão será 400 GB.

Aviso: reduzir o tamanho do disco reduz a E/S aleatória disponível. Jobs com limite aleatório que não usam o Dataflow Shuffle ou o Streaming Engine podem resultar em aumento do tempo de execução e do custo do job.

serviceAccount String Especifica uma conta de serviço do controlador gerenciada pelo usuário, usando o formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para mais informações, consulte a seção Conta de serviço do controlador, na página de permissões e segurança do Cloud Dataflow. Se não definido, os workers usam a conta de serviço do Compute Engine do seu projeto como a conta de serviço do controlador.
workerDiskType String O tipo de disco permanente a ser usado, especificado por um URL completo do recurso de tipo de disco. Por exemplo, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar um disco permanente SSD. Para mais informações, consulte a página de referência da API Compute Engine para diskTypes. O valor padrão é determinado no serviço Dataflow.
workerMachineType String

O tipo de máquina do Compute Engine que o Dataflow usa ao iniciar VMs de worker. Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, assim como usar tipos de máquinas personalizadas.

Para melhores resultados, use tipos de máquina n1. Os tipos de máquina com núcleo compartilhado, como workers da série f1 e g1, não recebem suporte de acordo com o Contrato de nível de serviço do Dataflow.

O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas.

No serviço Dataflow, o tipo de máquina é escolhido com base no job, caso você não defina essa opção.

Consulte a documentação de referência da API para Java da interface PipelineOptions e as respectivas subinterfaces para uma lista completa de opções de configuração de pipeline.

Python

Campo Tipo Descrição Valor padrão
runner str O PipelineRunner a ser usado. Este campo pode ser DirectRunner ou DataflowRunner. DirectRunner (modo local)
streaming bool Especifica se o modo de streaming está ativado ou desativado. O valor true significa ativado. false
project str O código do projeto do Google Cloud. Isso é necessário se você quiser executar o pipeline usando o serviço gerenciado do Dataflow. Se não está definido, um erro é emitido.
job_name String O nome do job do Dataflow que está sendo executado na lista de jobs do Dataflow e os detalhes do job. O Dataflow gera um nome exclusivo automaticamente.
temp_location str Caminho do Cloud Storage para arquivos temporários. Precisa ser um URL do Cloud Storage válido, começando com gs://. Se não for definido, o padrão será staging_location. Para executar o pipeline na nuvem do Google, é preciso especificar pelo menos um temp_location ou staging_location.
staging_location str Caminho do Cloud Storage para preparar arquivos locais. Precisa ser um URL do Cloud Storage válido, começando com gs://. Se não for definido, será usado um diretório de organização temporário em temp_location. Para executar o pipeline na nuvem do Google, é preciso especificar pelo menos um temp_location ou staging_location.
autoscaling_algorithm str O modo de escalonamento automático do job do Dataflow. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo. Consulte Recursos de ajuste automático para saber mais sobre o funcionamento do escalonamento automático no serviço gerenciado do Dataflow. O padrão é THROUGHPUT_BASED para todos os jobs em lote do Dataflow e para jobs de streaming que usam o Streaming Engine. O padrão é NONE para jobs de streaming que não usam o Streaming Engine.
num_workers int O número de instâncias do Compute Engine a serem usadas na execução do pipeline. Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow.
max_num_workers int O número máximo de instâncias do Compute Engine a serem disponibilizadas para o canal durante a execução. Observe que esse valor pode ser superior ao número inicial de workers, especificado por num_workers para permitir que o job seja escalonado automaticamente ou não. Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow.
number_of_worker_harness_threads int O número de linhas de execução por uso de worker. Se não for especificado, o serviço do Dataflow determina um número apropriado de linhas de execução por worker. Para usar esse parâmetro, também é necessário usar a sinalização --experiments=use_runner_v2
region str Especifica um endpoint regional para implantar os jobs do Dataflow. Se não for definido, o padrão será us-central1.
worker_region String

Especifica uma região do Compute Engine para iniciar instâncias de worker para executar o pipeline. Essa opção é usada para executar workers em um local diferente do region usado para implantar, gerenciar e monitorar jobs. A zona de worker_region é atribuída automaticamente.

Observação: essa opção não pode ser combinada com worker_zone ou zone.

Se não for definido, usará o valor padrão region.
worker_zone String

Especifica uma zona do Compute Engine para inicialização de instâncias de worker a fim de executar o pipeline. Essa opção é usada para executar workers em um local diferente do region usado para implantar, gerenciar e monitorar jobs.

Observação: essa opção não pode ser combinada com worker_region ou zone.

Se você especificar region ou worker_region, worker_zone usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente.
zone str (Obsoleto) Para o SDK 2.17.0 ou anterior do Apache Beam, isso especifica a zona do Compute Engine para iniciar instâncias de worker para executar o pipeline. Se você especificar region, zone usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente.
dataflow_kms_key str Especifica a chave de criptografia gerenciada pelo cliente (CMEK, na sigla em inglês) usada para criptografar dados em repouso. É possível controlar a chave de criptografia por meio do Cloud KMS. Para usar este recurso, você também precisa especificar temp_location. Se não for especificado, o Dataflow usará a criptografia padrão do Google Cloud em vez de um CMEK.
flexrs_goal str Especifica o Flexible Resource Scheduling (FlexRS) para jobs em lote com escalonamento automático. Afeta os parâmetros num_workers, autoscaling_algorithm, zone, region e machine_type. Para mais informações, consulte a seção de opções de pipeline do FlexRS. Se não for especificado, será utilizado o padrão SPEED_OPTIMIZED, ou seja, o comportamento será igual à omissão desse sinalizador. Para ativar o FlexRS, especifique o valor COST_OPTIMIZED para permitir que o serviço Dataflow escolha outro recurso disponível de melhor custo-benefício.
network str A rede do Compute Engine para iniciar instâncias desse serviço para a execução do canal. Saiba como especificar a rede. Se não for definido, o Google Cloud pressupõe que você pretende usar uma rede chamada default.
subnetwork str A sub-rede do Compute Engine para iniciar instâncias desse serviço para execução do canal. Saiba como especificar a sub-rede. O valor padrão é determinado no serviço Dataflow.
use_public_ips bool Especifica que os workers do Dataflow precisam usar endereços IP públicos. Se o valor estiver definido como false, os workers do Dataflow usarão endereços IP particulares para todas as comunicações. Nesse caso, se a opção subnetwork for especificada, a opção network será ignorada. Verifique se o network ou o subnetwork especificado está com o Acesso privado do Google ativado. Essa opção requer o SDK do Beam para Python. O SDK do Dataflow para Python obsoleto não é compatível. Se não forem definidos, os workers do Dataflow usarão endereços IP públicos.
enable_streaming_engine bool Especifica se o Streaming Engine do Dataflow está ativado ou desativado. Se ativado, "true". A ativação do Streaming Engine permite que você execute as etapas do pipeline de streaming no back-end do serviço Dataflow, economizando CPU, memória e recursos de armazenamento de disco permanente. O valor padrão é false. Isso significa que as etapas do pipeline de streaming são executadas inteiramente nas VMs de worker.
disk_size_gb int

O tamanho do disco, em gigabytes, a ser usado em cada instância de worker remota do Compute Engine. Se definido, especifique pelo menos 30 GB para a imagem de inicialização do worker e os registros locais.

Para jobs em lote que usam o Dataflow Shuffle, essa opção define o tamanho do disco de inicialização de uma VM de worker. Para jobs em lote que não usam o Dataflow Shuffle, essa opção define o tamanho dos discos usados para armazenar dados aleatórios. O tamanho do disco de inicialização não é afetado.

Para jobs de streaming que usam o Streaming Engine, essa opção define o tamanho dos discos de inicialização. Para jobs de streaming que não usam o Streaming Engine, essa opção define o tamanho de cada disco permanente adicional criado pelo serviço Dataflow. O disco de inicialização não é afetado. Se um job de streaming não usar o Streaming Engine, será possível definir o tamanho do disco de inicialização com o sinalizador experimental streaming_boot_disk_size_gb. Por exemplo, especifique --experiments=streaming_boot_disk_size_gb=80 para criar discos de inicialização de 80 GB.

Defina como 0 para usar o tamanho padrão definido no seu projeto do Cloud Platform.

Se um job em lote usar o Dataflow Shuffle, o padrão será 25 GB. Caso contrário, o padrão será 250 GB.

Se um job de streaming usar o Streaming Engine, o padrão será 30 GB. Caso contrário, o padrão será 400 GB.

Aviso: reduzir o tamanho do disco reduz a E/S aleatória disponível. Jobs com limite aleatório que não usam o Dataflow Shuffle ou o Streaming Engine podem resultar em aumento do tempo de execução e do custo do job.

service_account_email str Especifica uma conta de serviço do controlador gerenciada pelo usuário, usando o formato my-service-account-name@<project-id>.iam.gserviceaccount.com. Para mais informações, consulte a seção Conta de serviço do controlador, na página de permissões e segurança do Cloud Dataflow. Se não definido, os workers usam a conta de serviço do Compute Engine do seu projeto como a conta de serviço do controlador.
worker_disk_type str O tipo de disco permanente a ser usado, especificado por um URL completo do recurso de tipo de disco. Por exemplo, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar um disco permanente SSD. Para mais informações, consulte a página de referência da API Compute Engine para diskTypes. O valor padrão é determinado no serviço Dataflow.
machine_type str

O tipo de máquina do Compute Engine que o Dataflow usa ao iniciar VMs de worker. Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, assim como usar tipos de máquinas personalizadas.

Para melhores resultados, use tipos de máquina n1. Os tipos de máquina com núcleo compartilhado, como workers da série f1 e g1, não recebem suporte de acordo com o Contrato de nível de serviço do Dataflow.

O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas.

No serviço Dataflow, o tipo de máquina é escolhido com base no job, caso você não defina essa opção.

Java: SDK 1.x

Como configurar o PipelineOptions para execução local

Em vez de executar o pipeline em recursos de nuvem gerenciados, você tem a opção de fazê-lo localmente. Isso proporciona certas vantagens nos testes, depuração ou execução em pequenos conjuntos de dados. Por exemplo, a execução local remove a dependência do serviço remoto do Dataflow e do Google Cloud Project associado.

Ao usar a execução local, é altamente recomendável executar o pipeline com conjuntos de dados pequenos o suficiente para caber na memória local. É possível criar um conjunto de dados pequeno na memória usando uma transformação Create ou usar uma transformação Read para trabalhar com arquivos locais ou remotos pequenos. Essa é uma maneira fácil e rápida de fazer testes e depuração com menos dependências externas, mas é limitada pela memória disponível no ambiente local.

O exemplo de código a seguir mostra como construir um canal que pode ser executado no ambiente local.

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Observação: no modo local, não é necessário definir o executor porque DirectRunner já é o padrão. No entanto, é preciso incluir explicitamente DirectRunner como uma dependência ou adicioná-lo a classpath.

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

Observação: no modo local, não é necessário definir o executor porque DirectRunner já é o padrão.

Java: SDK 1.x

Depois de construir o canal, execute-o.

Como definir outras opções de pipeline local

Quando o pipeline é executado localmente, os valores padrão das propriedades em PipelineOptions geralmente são suficientes.

Java: SDK 2.x

Veja os valores padrão de PipelineOptions do Java na referência da API Java. Consulte as informações completas na listagem da classe PipelineOptions (em inglês).

Se seu pipeline usa o Google Cloud, como BigQuery ou Cloud Storage para E/S, talvez seja necessário definir algumas opções de credenciais e projetos do Google Cloud. Nesses casos, use GcpOptions.setProject para definir o ID do projeto do Google Cloud. Talvez também seja necessário definir explicitamente as credenciais. Consulte a classe GcpOptions para os detalhes completos.

Python

Veja os valores padrão de PipelineOptions do Python na referência da API Python. Consulte as informações completas na listagem da classe PipelineOptions (em inglês).

Se seu pipeline usa serviços do Google Cloud, como BigQuery ou Cloud Storage para E/S, talvez seja necessário definir algumas opções de credenciais e projetos do Google Cloud. Nesses casos, use options.view_as(GoogleCloudOptions).project para definir o ID do projeto do Google Cloud. Talvez também seja necessário definir explicitamente as credenciais. Consulte a classe GoogleCloudOptions para ver os detalhes completos.

Java: SDK 1.x