Depois que o pipeline é construído no programa Apache Beam, é necessário executá-lo. A execução do canal ocorre separadamente da execução do programa. Quando o programa Apache Beam constrói o canal, uma série de etapas é gerada no código que você escreveu para processamento em um executor do canal. O executor do pipeline pode ser o serviço gerenciado do Dataflow no Google Cloud, um serviço de terceiros ou um executor de pipeline local que execute as etapas diretamente no ambiente local.
É possível especificar o executor de pipeline e outras opções de execução usando a classe PipelineOptions
do SDK do Apache Beam. Use PipelineOptions
para configurar como e onde o pipeline será executado e quais recursos ele utilizará.
Na maioria das vezes, você quer que o pipeline seja executado em recursos gerenciados do Google Cloud usando o serviço de execução do Dataflow. Executar o pipeline com o serviço Dataflow cria um job do Dataflow que usa os recursos do Compute Engine e do Cloud Storage no projeto do Google Cloud.
Também é possível executar o canal localmente. Quando você executa o pipeline localmente, as transformações do pipeline são executadas na mesma máquina em que o programa do Dataflow é executado. A execução local será útil para fins de teste e depuração, especialmente se o canal puder usar conjuntos de dados menores na memória.
Como definir PipelineOptions
Transmita PipelineOptions
ao criar o objeto Pipeline
no
programa do Dataflow. Quando o serviço Dataflow executa o pipeline, ele
envia uma cópia do PipelineOptions
para cada instância de worker.
Java: SDK 2.x
Observação: é possível acessar PipelineOptions
em qualquer instância DoFn
de ParDo
usando o método ProcessContext.getPipelineOptions
.
Python
Esse recurso ainda não está disponível no SDK do Apache Beam para Python.
Java: SDK 1.x
Como definir PipelineOptions a partir de argumentos da linha de comando
Embora seja possível configurar o pipeline criando um objeto PipelineOptions
e definindo os campos diretamente, os SDKs do Apache Beam incluem um analisador de linha de comando que pode ser usado para definir campos em PipelineOptions
usando argumentos de linha de comando.
Java: SDK 2.x
Para ler as opções na linha de comando, construa o objeto PipelineOptions
usando o método PipelineOptionsFactory.fromArgs
, conforme mostrado no seguinte código de exemplo:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Observação: anexar o método .withValidation
faz com que o
Dataflow verifique os argumentos obrigatórios da linha de comando e valide os valores dos argumentos.
O uso de PipelineOptionsFactory.fromArgs
interpreta os argumentos de linha de comando que seguem o formato:
--<option>=<value>
Quando você cria PipelineOptions
dessa forma, pode especificar qualquer uma das opções em qualquer subinterface de org.apache.beam.sdk.options.PipelineOptions (em inglês) como um argumento de linha de comando.
Python
Para ler as opções na linha de comando, crie o objeto PipelineOptions
conforme mostrado no seguinte código de exemplo:
from apache_beam.options.pipeline_options import PipelineOptions options = PipelineOptions(flags=argv)
O argumento (flags=argv)
para PipelineOptions
interpreta os argumentos de linha de comando que seguem o formato:
--<option>=<value>
Quando você cria PipelineOptions
dessa maneira, pode especificar qualquer uma das opções como subclasse de PipelineOptions
.
Java: SDK 1.x
Como criar opções personalizadas
Além do PipelineOptions
padrão, é possível adicionar suas próprias opções personalizadas.
Com o analisador de linha de comando do Dataflow, também é possível configurar essas opções usando argumentos de
linha de comando especificados no mesmo formato.
Java: SDK 2.x
Para adicionar suas próprias opções, defina uma interface com os métodos getter e setter para cada uma delas, como no exemplo a seguir:
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Python
Para adicionar suas próprias opções, use o método add_argument()
, que se comporta da mesma forma que o módulo argparse (em inglês) padrão do Python, como no exemplo a seguir:
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input') parser.add_argument('--output')
Java: SDK 1.x
Além de um valor padrão, também é possível especificar uma descrição, que aparece quando um usuário transmite --help
como um argumento de linha de comando.
Java: SDK 2.x
Defina a descrição e o valor padrão usando anotações, da seguinte maneira:
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Recomendamos que você registre sua interface em PipelineOptionsFactory
e a transmita ao criar o objeto PipelineOptions
. Se você fizer o registro da interface em PipelineOptionsFactory
, o --help
poderá encontrar sua interface de opções personalizadas e a adicionar à saída do comando --help
.
PipelineOptionsFactory
também confirmará que as suas opções personalizadas são compatíveis com todas as outras opções registradas.
No código de exemplo a seguir, há uma demonstração de como registrar a interface de opções personalizadas em PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
Agora seu pipeline pode aceitar --myCustomOption=value
como um argumento de linha de comando.
Python
Defina a descrição e o valor padrão da seguinte maneira:
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', help='Input for the pipeline', default='gs://my-bucket/input') parser.add_argument( '--output', help='Output for the pipeline', default='gs://my-bucket/output')
Java: SDK 1.x
Como configurar PipelineOptions para execução no serviço do Cloud Dataflow
Para executar o pipeline usando o serviço gerenciado do Dataflow,
você precisa definir os seguintes campos em
PipelineOptions
:
Java: SDK 2.x
project
: o ID do seu projeto do Google Cloud.runner
: o executor de pipeline que analisará o programa e construirá o pipeline. Para a execução na nuvem, precisa serDataflowRunner
.gcpTempLocation
: um caminho do Cloud Storage para o Dataflow organizar qualquer arquivo temporário. É preciso criar esse bucket antes de executar o pipeline. Se você não especificargcpTempLocation
, especifique a opção de pipelinetempLocation
e, em seguida,gcpTempLocation
será definido como o valortempLocation
. Caso nenhum deles seja especificado, será criado umgcpTempLocation
padrão.stagingLocation
: um bucket do Cloud Storage para o Dataflow preparar os arquivos binários. Caso esta opção não seja definida, o que você especificou paratempLocation
será usado também para o local de preparo.
Se nem ele nem
tempLocation
for especificado, será criado um gcpTempLocation
padrão. Se tempLocation
for especificado, mas gcpTempLocation
não for, tempLocation
precisará ser um caminho do Cloud Storage e gcpTempLocation
será definido como o padrão dele. Se tempLocation
não for especificado, mas gcpTempLocation
for, tempLocation
não será preenchido.
Observação: se você usar o SDK do Apache Beam para Java 2.15.0 ou mais recente, também precisará especificar
region
.
Python
project
: o ID do seu projeto do Google Cloud.runner
: o executor de pipeline que analisará o programa e construirá o pipeline. Para a execução na nuvem, precisa serDataflowRunner
.staging_location
: um caminho do Cloud Storage para o Dataflow preparar os pacotes de códigos necessários para os workers que executam o job.temp_location
: um caminho do Cloud Storage para o Dataflow organizar os arquivos de job temporários criados durante a execução do pipeline.
Observação: se você usar o SDK do Apache Beam para Python 2.15.0 ou mais recente, também precisará especificar
region
.
Java: SDK 1.x
Configure essas opções por programação ou especifique-as usando a linha de comando. O código de exemplo a seguir mostra como criar um pipeline configurando de maneira programática o executor, assim como outras opções necessárias para executar o pipeline usando o serviço gerenciado do Dataflow.
Java: SDK 2.x
// Create and set your PipelineOptions. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); // For Cloud execution, set the Cloud Platform project, staging location, // and specify DataflowRunner. options.setProject("my-project-id"); options.setStagingLocation("gs://my-bucket/binaries"); options.setRunner(DataflowRunner.class); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # Create and set your PipelineOptions. # For Cloud execution, specify DataflowRunner and set the Cloud Platform # project, job name, temporary files location, and region. # For more information about regions, check: # https://cloud.google.com/dataflow/docs/concepts/regional-endpoints options = PipelineOptions( flags=argv, runner='DataflowRunner', project='my-project-id', job_name='unique-job-name', temp_location='gs://my-bucket/temp', region='us-central1') # Create the Pipeline with the specified options. # with beam.Pipeline(options=options) as pipeline: # pass # build your pipeline here.
Java: SDK 1.x
Depois de construir o canal, especifique todas as leituras, transformações e gravações dele e execute-o.
No código a seguir, você tem um exemplo de como criar as opções necessárias para a execução do serviço Dataflow usando a linha de comando:
Java: SDK 2.x
// Create and set your PipelineOptions. MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
# Use Python argparse module to parse custom arguments import argparse import apache_beam as beam parser = argparse.ArgumentParser() parser.add_argument('--input') parser.add_argument('--output') args, beam_args = parser.parse_known_args(argv) # Create the Pipeline with remaining arguments. with beam.Pipeline(argv=beam_args) as pipeline: lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input) lines | 'Write files' >> beam.io.WriteToText(args.output)
Java: SDK 1.x
Depois de construir o canal, especifique todas as leituras, transformações e gravações dele e execute-o.
Java: SDK 2.x
Ao transmitir as opções obrigatórias na linha de comando, use as opções --project
, --runner
, --gcpTempLocation
e, como alternativa, --stagingLocation
.
Python
Ao transmitir as opções obrigatórias na linha de comando, use as opções --project
, --runner
e --staging_location
.
Java: SDK 1.x
Execução assíncrona
Java: SDK 2.x
O uso de DataflowRunner
faz com que o pipeline seja executado de forma assíncrona na nuvem do Google. Durante a execução do canal, monitore o andamento do job, veja detalhes da
execução e receba atualizações sobre os resultados do canal usando a
interface de monitoramento do Dataflow ou
a interface de linha de comando do
Dataflow.
Python
O uso de DataflowRunner
faz com que o pipeline seja executado de forma assíncrona na nuvem do Google. Durante a execução do canal, monitore o andamento do job, veja detalhes da
execução e receba atualizações sobre os resultados do canal usando a
interface de monitoramento do Dataflow ou
a interface de linha de comando do
Dataflow.
Java: SDK 1.x
Execução síncrona
Java: SDK 2.x
Especifique DataflowRunner
como o executor de pipeline e chame pipeline.run().waitUntilFinish()
explicitamente.
Quando você usa DataflowRunner
e chama waitUntilFinish()
no objeto
PipelineResult
retornado de pipeline.run()
, o
pipeline é executado na nuvem, mas o código local aguarda a conclusão do job de nuvem e, em seguida, retorna
o objeto DataflowPipelineJob
final.
Enquanto o job é executado, o serviço Dataflow imprime atualizações de status do job e mensagens do
console.
Se você foi um usuário do Java SDK 1.x e usou --runner
BlockingDataflowPipelineRunner
na linha de comando para induzir interativamente seu programa principal a bloquear até o pipeline ser encerrado, com o Java 2.x esse programa precisa chamar explicitamente waitUntilFinish()
.
Python
Para bloquear até a conclusão do pipeline, use o método wait_until_finish()
do objeto PipelineResult
, retornado do método run()
do executor.
Java: SDK 1.x
Observação: se você digitar Ctrl+C
na linha de comando, o job não será cancelado. O serviço do Dataflow ainda está executando o job no Google Cloud. Para
cancelar o job, você precisará usar a Interface
de monitoramento do Dataflow ou a Interface
de linha de comando do Dataflow.
Execução em streaming
Java: SDK 2.x
Se o pipeline fizer leituras em uma fonte de dados sem limites, como o Cloud Pub/Sub, ele será executado automaticamente no modo de streaming.
Se o pipeline usar coletores e fontes de dados sem limites, será necessário escolher uma estratégia de gestão de janelas para as PCollections ilimitadas antes de usar qualquer agregação, como GroupByKey
.
Python
Se o pipeline usar um coletor ou fonte de dados ilimitados, como o Pub/Sub, você precisará definir a opção
streaming
como "true".
Por padrão, os jobs de streaming usam n1-standard-2
ou superior como tipo de máquina do Compute Engine. Não modifique esse valor porque n1-standard-2
é o tipo de máquina mínimo exigido para executar jobs de streaming.
Se o pipeline usar fontes de dados e coletores ilimitados, será necessário escolher uma estratégia de gestão de janelas para as PCollections ilimitadas antes de usar qualquer agregação, como GroupByKey
.
Java: SDK 1.x
Como definir outras opções de pipeline do Cloud Dataflow
Para executar o pipeline na nuvem, defina de maneira programática os seguintes campos no objeto PipelineOptions
:
Java: SDK 2.x
Campo | Tipo | Descrição | Valor padrão |
---|---|---|---|
runner |
Class (NameOfRunner) |
O PipelineRunner a ser usado. Esse campo permite determinar PipelineRunner no ambiente de execução. |
DirectRunner (modo local) |
streaming |
boolean |
Especifica se o modo de streaming está ativado ou desativado. O valor true significa ativado. | Se o pipeline fizer leituras em uma fonte ilimitada, o valor padrão será true .
Caso contrário, false . |
project |
String |
O código do projeto do Google Cloud. Isso é necessário se você quiser executar o pipeline usando o serviço gerenciado do Dataflow. | Por padrão, se não tiver sido definido, o projeto atualmente configurado no Cloud SDK será usado. |
jobName |
String |
O nome do job do Dataflow que está sendo executado na lista de jobs do Dataflow e os detalhes do job. Também usado ao atualizar um pipeline existente. | O Dataflow gera um nome exclusivo automaticamente. |
gcpTempLocation |
String |
Caminho do Cloud Storage para arquivos temporários. Precisa ser um URL do Cloud Storage válido, começando com gs:// . |
|
stagingLocation |
String |
Caminho do Cloud Storage para preparar arquivos locais. Precisa ser um URL do Cloud Storage válido, começando com gs:// . |
Se não for definido, será usado como padrão o que você especificou para tempLocation . |
autoscalingAlgorithm |
String
| O modo de escalonamento automático do job do Dataflow. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo. Consulte
Recursos de ajuste automático para saber
mais sobre o funcionamento do escalonamento automático no serviço gerenciado do Dataflow. |
O padrão é THROUGHPUT_BASED para todos os jobs em lote do Dataflow e para
jobs de streaming que usam o Streaming Engine.
O padrão é NONE para jobs de streaming que não usam o Streaming Engine. |
numWorkers |
int |
O número inicial de instâncias do Google Compute Engine a serem usadas na execução do canal. Essa opção determina quantos trabalhadores são iniciados no serviço Dataflow quando o job começa. | Se não for especificado, o serviço Dataflow determinará um número adequado de workers. |
maxNumWorkers |
int |
O número máximo de instâncias do Compute Engine a serem disponibilizadas para o canal durante a execução. Observe que esse valor pode ser superior ao número inicial de workers, especificado por numWorkers para permitir que o job seja escalonado automaticamente ou não. |
Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow. |
numberOfWorkerHarnessThreads |
int |
O número de linhas de execução por uso de worker. | Se não for especificado, o serviço do Dataflow determina um número apropriado de linhas de execução por worker. |
region |
String |
Especifica um endpoint regional para implantar os jobs do Dataflow. | Se não for definido, o padrão será us-central1 . |
workerRegion |
String |
Especifica uma região do Compute Engine para iniciar instâncias de worker para executar o pipeline. Essa opção é usada para executar workers em um local diferente do Observação: essa opção não pode ser combinada com |
Se não for definido, usará o valor padrão region . |
workerZone |
String |
Especifica uma zona do Compute Engine para inicialização de instâncias de worker a fim de executar o pipeline. Essa opção é usada para executar workers em um local diferente do Observação: essa opção não pode ser combinada com |
Se você especificar region ou workerRegion , workerZone
usará como padrão uma zona da região correspondente. É possível modificar esse comportamento
especificando uma zona diferente. |
zone |
String |
(Obsoleto) Para o SDK 2.17.0 ou anterior do Apache Beam, isso especifica a zona do Compute Engine para iniciar instâncias de worker para executar o pipeline. | Se você especificar region , zone
usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente. |
dataflowKmsKey |
String |
Especifica a chave de criptografia gerenciada pelo cliente (CMEK, na sigla em inglês) usada para criptografar dados em repouso. É possível controlar a chave de criptografia por meio do Cloud KMS.
Para usar este recurso, você também precisa especificar gcpTempLocation . |
Se não for especificado, o Dataflow usará a criptografia padrão do Google Cloud em vez de um CMEK. |
flexRSGoal |
String |
Especifica o Flexible Resource Scheduling (FlexRS) para jobs em lote com escalonamento automático. Afeta os parâmetros numWorkers , autoscalingAlgorithm , zone , region e workerMachineType . Para mais informações, consulte a seção de opções de pipeline do FlexRS. |
Se não for especificado, será utilizado o padrão SPEED_OPTIMIZED, ou seja, o comportamento será igual à omissão desse sinalizador. Para ativar o FlexRS, especifique o valor COST_OPTIMIZED para permitir que o serviço Dataflow escolha outro recurso disponível de melhor custo-benefício. |
filesToStage |
List<String> |
Uma lista não vazia de arquivos locais, diretórios ou arquivos (como JAR ou zip
) a serem disponibilizados para cada worker. Quando essa opção é definida, apenas o upload dos arquivos especificados é feito, e o classpath do Java é ignorado. Especifique todos os recursos na ordem correta do classpath. Esses recursos não se limitam ao código. Eles também podem incluir arquivos de configuração e outros recursos que precisam ser disponibilizados a todos os workers. Para acessar os recursos listados, use os métodos de procura padrão do Java no código. Atenção: especificar um caminho de diretório está abaixo do ideal, já que o
Dataflow compactará os arquivos antes de fazer o upload, o que envolve um custo de inicialização mais
alto. Além disso, não use essa opção para transferir dados a workers que precisam ser processados
pelo pipeline, pois isso é significativamente mais lento do que o uso de APIs
Cloud Storage/BigQuery nativas em conjunto com a fonte de dados apropriada do
Dataflow.
|
Se filesToStage é omitido, o Dataflow infere os arquivos a serem organizados com base
no classpath do Java. As considerações e precauções mencionadas na coluna da esquerda também se aplicam aqui: tipos de arquivos a serem listados e como acessá-los a partir do código.
|
network |
String |
A rede do Compute Engine para iniciar instâncias desse serviço para a execução do canal. Saiba como especificar a rede. | Se não for definido, o Google Cloud pressupõe que você pretende usar uma rede chamada default . |
subnetwork |
String |
A sub-rede do Compute Engine para iniciar instâncias desse serviço para execução do canal. Saiba como especificar a sub-rede. | O valor padrão é determinado no serviço Dataflow. |
usePublicIps |
boolean |
Especifica se os workers do Cloud Dataflow usarão
endereços IP públicos.
Se o valor estiver definido como false , os workers do Dataflow usarão
endereços IP particulares para todas as comunicações. Nesse caso, se a opção subnetwork for especificada, a opção network será ignorada. Verifique se o network ou o subnetwork especificado está com o Acesso privado do Google ativado. |
Se não estiver definido, o valor padrão será true e os workers do Dataflow usarão
endereços IP públicos. |
enableStreamingEngine |
boolean |
Especifica se o Streaming Engine do Dataflow está ativado ou desativado. Se ativado, "true". A ativação do Streaming Engine permite que você execute as etapas do pipeline de streaming no back-end do serviço Dataflow, economizando CPU, memória e recursos de armazenamento de disco permanente. | O valor padrão é false. Isso significa que as etapas do pipeline de streaming são executadas inteiramente nas VMs de worker. |
createFromSnapshot |
String |
Especifica o ID do snapshot a ser usado ao criar um job de streaming. Os snapshots salvam o estado de um pipeline de streaming e permitem iniciar uma nova versão do job a partir desse estado. Para mais informações sobre snapshots, consulte Como usar snapshots. | Se não for definido, nenhum snapshot será usado para criar um job. |
hotKeyLoggingEnabled |
boolean |
Especifica que, quando uma chave com uso intenso é detectada no pipeline, ela é impressa no projeto do Cloud Logging do usuário. | Se não for definido, somente a presença de uma chave com uso intenso será registrada. |
diskSizeGb |
int |
O tamanho do disco, em gigabytes, a ser usado em cada instância de worker remota do Compute Engine. Se definido, especifique pelo menos 30 GB para incluir a imagem de inicialização e os registros locais do worker. Para jobs em lote que usam o Dataflow Shuffle, essa opção define o tamanho do disco de inicialização de uma VM de worker. Para jobs em lote que não usam o Dataflow Shuffle, essa opção define o tamanho dos discos usados para armazenar dados aleatórios. O tamanho do disco de inicialização não é afetado.
Para jobs de streaming que usam o
Streaming Engine,
essa opção define o tamanho dos discos de inicialização. Para jobs de streaming que não usam o
Streaming Engine, essa opção define o tamanho de cada disco permanente adicional criado pelo
serviço Dataflow. O disco de inicialização não é afetado.
Se um job de streaming não usar o Streaming Engine, será possível definir o tamanho do disco de inicialização com o
sinalizador experimental |
Defina como Se um job em lote usar o Dataflow Shuffle, o padrão será 25 GB. Caso contrário, o padrão será 250 GB. Se um job de streaming usar o Streaming Engine, o padrão será 30 GB. Caso contrário, o padrão será 400 GB. Aviso: reduzir o tamanho do disco reduz a E/S aleatória disponível. Jobs com limite aleatório que não usam o Dataflow Shuffle ou o Streaming Engine podem resultar em aumento do tempo de execução e do custo do job. |
serviceAccount |
String |
Especifica uma conta de serviço do controlador gerenciada pelo usuário, usando o formato my-service-account-name@<project-id>.iam.gserviceaccount.com . Para mais informações, consulte a seção Conta de serviço do controlador, na página de permissões e segurança do Cloud Dataflow.
|
Se não definido, os workers usam a conta de serviço do Compute Engine do seu projeto como a conta de serviço do controlador. |
workerDiskType |
String |
O tipo de disco permanente a ser usado, especificado por um URL completo do recurso de tipo de disco. Por exemplo, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar um disco permanente SSD. Para mais informações, consulte a página de referência da API Compute Engine para diskTypes. |
O valor padrão é determinado no serviço Dataflow. |
workerMachineType |
String |
O tipo de máquina do Compute Engine que o Dataflow usa ao iniciar VMs de worker. Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, assim como usar tipos de máquinas personalizadas. Para melhores resultados, use tipos de máquina O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas. |
No serviço Dataflow, o tipo de máquina é escolhido com base no job, caso você não defina essa opção. |
Consulte a documentação de referência da API para Java da interface PipelineOptions e as respectivas subinterfaces para uma lista completa de opções de configuração de pipeline.
Python
Campo | Tipo | Descrição | Valor padrão |
---|---|---|---|
runner |
str |
O PipelineRunner a ser usado. Este campo pode ser DirectRunner ou DataflowRunner . |
DirectRunner (modo local) |
streaming |
bool |
Especifica se o modo de streaming está ativado ou desativado. O valor true significa ativado. | false |
project |
str |
O código do projeto do Google Cloud. Isso é necessário se você quiser executar o pipeline usando o serviço gerenciado do Dataflow. | Se não está definido, um erro é emitido. |
job_name |
String |
O nome do job do Dataflow que está sendo executado na lista de jobs do Dataflow e os detalhes do job. | O Dataflow gera um nome exclusivo automaticamente. |
temp_location |
str |
Caminho do Cloud Storage para arquivos temporários. Precisa ser um URL do Cloud Storage válido, começando com gs:// . |
Se não for definido, o padrão será staging_location . Para executar o pipeline na nuvem do Google, é preciso especificar pelo menos um temp_location ou staging_location . |
staging_location |
str |
Caminho do Cloud Storage para preparar arquivos locais. Precisa ser um URL do Cloud Storage válido, começando com gs:// . |
Se não for definido, será usado um diretório de organização temporário em temp_location . Para executar o pipeline na nuvem do Google, é preciso especificar pelo menos um temp_location ou staging_location . |
autoscaling_algorithm |
str
| O modo de escalonamento automático do job do Dataflow. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo. Consulte
Recursos de ajuste automático para saber
mais sobre o funcionamento do escalonamento automático no serviço gerenciado do Dataflow. |
O padrão é THROUGHPUT_BASED para todos os jobs em lote do Dataflow e para
jobs de streaming que usam o Streaming Engine.
O padrão é NONE para jobs de streaming que não usam o Streaming Engine. |
num_workers |
int |
O número de instâncias do Compute Engine a serem usadas na execução do pipeline. | Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow. |
max_num_workers |
int |
O número máximo de instâncias do Compute Engine a serem disponibilizadas para o canal durante a execução. Observe que esse valor pode ser superior ao número inicial de workers, especificado por num_workers para permitir que o job seja escalonado automaticamente ou não. |
Se não foi especificado, um número adequado de workers é determinado no serviço Dataflow. |
number_of_worker_harness_threads |
int |
O número de linhas de execução por uso de worker. | Se não for especificado, o serviço do Dataflow determina um número apropriado de linhas de execução por worker. Para usar esse parâmetro, também é necessário usar a sinalização --experiments=use_runner_v2 |
region |
str |
Especifica um endpoint regional para implantar os jobs do Dataflow. | Se não for definido, o padrão será us-central1 . |
worker_region |
String |
Especifica uma região do Compute Engine para iniciar instâncias de worker para executar o pipeline. Essa opção é usada para executar workers em um local diferente do Observação: essa opção não pode ser combinada com |
Se não for definido, usará o valor padrão region . |
worker_zone |
String |
Especifica uma zona do Compute Engine para inicialização de instâncias de worker a fim de executar o pipeline. Essa opção é usada para executar workers em um local diferente do Observação: essa opção não pode ser combinada com |
Se você especificar region ou worker_region , worker_zone
usará como padrão uma zona da região correspondente. É possível modificar esse comportamento
especificando uma zona diferente. |
zone |
str |
(Obsoleto) Para o SDK 2.17.0 ou anterior do Apache Beam, isso especifica a zona do Compute Engine para iniciar instâncias de worker para executar o pipeline. | Se você especificar region , zone
usará como padrão uma zona da região correspondente. É possível modificar esse comportamento especificando uma zona diferente. |
dataflow_kms_key |
str |
Especifica a chave de criptografia gerenciada pelo cliente (CMEK, na sigla em inglês) usada para criptografar dados em repouso. É possível controlar a chave de criptografia por meio do Cloud KMS.
Para usar este recurso, você também precisa especificar temp_location . |
Se não for especificado, o Dataflow usará a criptografia padrão do Google Cloud em vez de um CMEK. |
flexrs_goal |
str |
Especifica o Flexible Resource Scheduling (FlexRS) para jobs em lote com escalonamento automático. Afeta os parâmetros num_workers , autoscaling_algorithm , zone , region e machine_type . Para mais informações, consulte a seção de opções de pipeline do FlexRS. |
Se não for especificado, será utilizado o padrão SPEED_OPTIMIZED, ou seja, o comportamento será igual à omissão desse sinalizador. Para ativar o FlexRS, especifique o valor COST_OPTIMIZED para permitir que o serviço Dataflow escolha outro recurso disponível de melhor custo-benefício. |
network |
str |
A rede do Compute Engine para iniciar instâncias desse serviço para a execução do canal. Saiba como especificar a rede. | Se não for definido, o Google Cloud pressupõe que você pretende usar uma rede chamada default . |
subnetwork |
str |
A sub-rede do Compute Engine para iniciar instâncias desse serviço para execução do canal. Saiba como especificar a sub-rede. | O valor padrão é determinado no serviço Dataflow. |
use_public_ips |
bool |
Especifica que os workers do Dataflow precisam usar
endereços IP públicos.
Se o valor estiver definido como false , os workers do Dataflow usarão
endereços IP particulares para todas as comunicações. Nesse caso, se a opção subnetwork for especificada, a opção network será ignorada. Verifique se o network ou o subnetwork especificado está com o Acesso privado do Google ativado. Essa opção requer o SDK do Beam para Python.
O SDK do Dataflow para Python obsoleto não é compatível. |
Se não forem definidos, os workers do Dataflow usarão endereços IP públicos. |
enable_streaming_engine |
bool |
Especifica se o Streaming Engine do Dataflow está ativado ou desativado. Se ativado, "true". A ativação do Streaming Engine permite que você execute as etapas do pipeline de streaming no back-end do serviço Dataflow, economizando CPU, memória e recursos de armazenamento de disco permanente. | O valor padrão é false. Isso significa que as etapas do pipeline de streaming são executadas inteiramente nas VMs de worker. |
disk_size_gb |
int |
O tamanho do disco, em gigabytes, a ser usado em cada instância de worker remota do Compute Engine. Se definido, especifique pelo menos 30 GB para a imagem de inicialização do worker e os registros locais. Para jobs em lote que usam o Dataflow Shuffle, essa opção define o tamanho do disco de inicialização de uma VM de worker. Para jobs em lote que não usam o Dataflow Shuffle, essa opção define o tamanho dos discos usados para armazenar dados aleatórios. O tamanho do disco de inicialização não é afetado.
Para jobs de streaming que usam o
Streaming Engine,
essa opção define o tamanho dos discos de inicialização. Para jobs de streaming que não usam o
Streaming Engine, essa opção define o tamanho de cada disco permanente adicional criado pelo
serviço Dataflow. O disco de inicialização não é afetado.
Se um job de streaming não usar o Streaming Engine, será possível definir o tamanho do disco de inicialização com o
sinalizador experimental |
Defina como Se um job em lote usar o Dataflow Shuffle, o padrão será 25 GB. Caso contrário, o padrão será 250 GB. Se um job de streaming usar o Streaming Engine, o padrão será 30 GB. Caso contrário, o padrão será 400 GB. Aviso: reduzir o tamanho do disco reduz a E/S aleatória disponível. Jobs com limite aleatório que não usam o Dataflow Shuffle ou o Streaming Engine podem resultar em aumento do tempo de execução e do custo do job. |
service_account_email |
str |
Especifica uma conta de serviço do controlador gerenciada pelo usuário, usando o formato my-service-account-name@<project-id>.iam.gserviceaccount.com . Para mais informações, consulte a seção Conta de serviço do controlador, na página de permissões e segurança do Cloud Dataflow.
|
Se não definido, os workers usam a conta de serviço do Compute Engine do seu projeto como a conta de serviço do controlador. |
worker_disk_type |
str |
O tipo de disco permanente a ser usado, especificado por um URL completo do recurso de tipo de disco. Por exemplo, use compute.googleapis.com/projects//zones//diskTypes/pd-ssd para especificar um disco permanente SSD. Para mais informações, consulte a página de referência da API Compute Engine para diskTypes. |
O valor padrão é determinado no serviço Dataflow. |
machine_type |
str |
O tipo de máquina do Compute Engine que o Dataflow usa ao iniciar VMs de worker. Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, assim como usar tipos de máquinas personalizadas. Para melhores resultados, use tipos de máquina O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas. |
No serviço Dataflow, o tipo de máquina é escolhido com base no job, caso você não defina essa opção. |
Java: SDK 1.x
Como configurar o PipelineOptions para execução local
Em vez de executar o pipeline em recursos de nuvem gerenciados, você tem a opção de fazê-lo localmente. Isso proporciona certas vantagens nos testes, depuração ou execução em pequenos conjuntos de dados. Por exemplo, a execução local remove a dependência do serviço remoto do Dataflow e do Google Cloud Project associado.
Ao usar a execução local, é altamente recomendável executar o pipeline com conjuntos de dados pequenos o suficiente para caber na memória local. É possível criar um conjunto de dados pequeno na memória usando uma transformação Create
ou usar uma transformação Read
para trabalhar com arquivos locais ou remotos pequenos. Essa é uma maneira fácil e rápida de fazer testes e depuração com menos dependências externas, mas é limitada pela memória disponível no ambiente local.
O exemplo de código a seguir mostra como construir um canal que pode ser executado no ambiente local.
Java: SDK 2.x
// Create and set our Pipeline Options. PipelineOptions options = PipelineOptionsFactory.create(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Observação: no modo local, não é necessário definir o executor porque DirectRunner
já é o padrão. No entanto, é preciso incluir explicitamente DirectRunner
como uma dependência ou adicioná-lo a classpath.
Python
# Create and set your Pipeline Options. options = PipelineOptions(flags=argv) my_options = options.view_as(MyOptions) with Pipeline(options=options) as pipeline: pass # build your pipeline here.
Observação: no modo local, não é necessário definir o executor porque DirectRunner
já é o padrão.
Java: SDK 1.x
Depois de construir o canal, execute-o.
Como definir outras opções de pipeline local
Quando o pipeline é executado localmente, os valores padrão das propriedades em PipelineOptions
geralmente são suficientes.
Java: SDK 2.x
Veja os valores padrão de PipelineOptions
do Java na referência da API Java. Consulte as informações completas na listagem da classe PipelineOptions (em inglês).
Se seu pipeline usa o Google Cloud, como BigQuery ou Cloud Storage para E/S, talvez
seja necessário definir algumas opções de credenciais e projetos do Google Cloud. Nesses casos, use GcpOptions.setProject
para definir o ID do projeto do Google Cloud. Talvez também seja necessário definir explicitamente as credenciais. Consulte a classe GcpOptions para os detalhes completos.
Python
Veja os valores padrão de PipelineOptions
do Python na referência da API Python. Consulte as informações completas na listagem da classe PipelineOptions (em inglês).
Se seu pipeline usa serviços do Google Cloud, como BigQuery ou
Cloud Storage para E/S, talvez seja necessário definir algumas opções de credenciais e
projetos do Google Cloud. Nesses casos, use options.view_as(GoogleCloudOptions).project
para definir o ID do projeto do Google Cloud. Talvez também seja necessário definir explicitamente as credenciais. Consulte a classe GoogleCloudOptions para ver os detalhes completos.
Java: SDK 1.x