Esta página explica como definir as opções de pipeline para os seus trabalhos do Dataflow. Estas opções de pipeline configuram como e onde o seu pipeline é executado e que recursos usa.
A execução do pipeline é separada da execução do programa Apache Beam. O programa Apache Beam que escreveu cria um pipeline para execução diferida. Isto significa que o programa gera uma série de passos que qualquer executor do Apache Beam suportado pode executar. Os executores compatíveis incluem o executor do Dataflow no Google Cloud e o executor direto que executa o pipeline diretamente num ambiente local.
Pode transmitir parâmetros para uma tarefa do Dataflow no momento da execução. Para mais informações sobre como definir opções de pipeline em tempo de execução, consulte o artigo Configurar opções de pipeline.
Use opções de pipeline com SDKs do Apache Beam
Pode usar os seguintes SDKs para definir opções de pipeline para tarefas do Dataflow:
- SDK Apache Beam para Python
- SDK Apache Beam para Java
- SDK do Apache Beam para Go
Para usar os SDKs, define o executor do pipeline e outros parâmetros de execução através da classe PipelineOptions
do SDK Apache Beam.
Existem dois métodos para especificar opções de pipeline:
- Defina as opções do pipeline de forma programática, fornecendo uma lista de opções do pipeline.
- Defina as opções do pipeline diretamente na linha de comandos quando executar o código do pipeline.
Defina opções de pipelines de forma programática
Pode definir opções de pipeline de forma programática criando e modificando um objeto PipelineOptions
.
Java
Construa um objeto PipelineOptions
usando o método PipelineOptionsFactory.fromArgs
.
Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.
Python
Crie um objeto
PipelineOptions
.
Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.
Go
A definição de opções de pipeline de forma programática através de PipelineOptions
não é suportada no SDK do Apache Beam para Go. Use argumentos da linha de comandos do Go.
Para ver um exemplo, consulte a secção Inicie no exemplo do Dataflow nesta página.
Defina opções de pipeline na linha de comandos
Pode definir opções de pipeline através de argumentos da linha de comandos.
Java
A sintaxe de exemplo seguinte é do pipeline WordCount
no tutorial de Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Substitua o seguinte:
PROJECT_ID
: o ID do seu Google Cloud projetoBUCKET_NAME
: o nome do seu contentor do Cloud StorageREGION
: a região do Dataflow>,us-central1
Python
A sintaxe do exemplo seguinte é da pipeline WordCount
no tutorial do Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Substitua o seguinte:
DATAFLOW_REGION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,europe-west1
A flag
--region
substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.STORAGE_BUCKET
: o nome do contentor do Cloud StoragePROJECT_ID
: o Google Cloud ID do projeto
Go
A sintaxe de exemplo seguinte é da pipeline WordCount
no tutorial do Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Substitua o seguinte:
BUCKET_NAME
: o nome do contentor do Cloud StoragePROJECT_ID
: o Google Cloud ID do projetoDATAFLOW_REGION
: a região onde quer implementar a tarefa do Dataflow. Por exemplo,europe-west1
. A flag--region
substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.
Defina opções de pipeline experimentais
Nos SDKs Java, Python e Go, a experiments
opção de pipeline
ativa funcionalidades experimentais ou de pré-GA do Dataflow.
Definido programaticamente
Para definir a opção experiments
programaticamente, use a seguinte sintaxe.
Java
No objeto
PipelineOptions
, inclua a opção experiments
através da seguinte sintaxe.
Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.
options.setExperiments("streaming_boot_disk_size_gb=80")
Para ver um exemplo que mostra como criar o objeto PipelineOptions
, consulte a secção
Exemplo de lançamento no Dataflow
nesta página.
Python
No objeto
PipelineOptions
, inclua a opção experiments
através da seguinte sintaxe.
Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Para ver um exemplo que mostra como criar o objeto PipelineOptions
, consulte a secção
Exemplo de lançamento no Dataflow
nesta página.
Go
A definição de opções de pipeline de forma programática através de PipelineOptions
não é suportada no SDK do Apache Beam para Go. Use argumentos da linha de comandos do Go.
Definido na linha de comandos
Para definir a opção experiments
na linha de comandos, use a seguinte sintaxe.
Java
Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.
--experiments=streaming_boot_disk_size_gb=80
Python
Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.
--experiments=streaming_boot_disk_size_gb=80
Go
Este exemplo define o tamanho do disco de arranque para 80 GB com a flag da experiência.
--experiments=streaming_boot_disk_size_gb=80
Definir num modelo
Para ativar uma funcionalidade experimental quando executa um modelo do Dataflow, use a flag --additional-experiments
.
Modelo clássico
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Modelo Flex
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Aceda ao objeto de opções da pipeline
Quando cria o objeto Pipeline
no seu programa Apache Beam, transmita
PipelineOptions
. Quando o serviço Dataflow executa o pipeline, envia uma cópia do PipelineOptions
a cada trabalhador.
Java
Aceda a PipelineOptions
dentro de qualquer instância ParDo
de transformação através do método DoFn
.ProcessContext.getPipelineOptions
Python
Esta funcionalidade não é suportada no SDK Apache Beam para Python.
Go
Aceder às opções de pipeline através de beam.PipelineOptions
.
Inicie no Dataflow
Execute a sua tarefa em recursos Google Cloud geridos através do serviço de execução do Dataflow. A execução do pipeline com o Dataflow cria uma tarefa do Dataflow, que usa recursos do Compute Engine e do Cloud Storage no seu Google Cloudprojeto. Para informações sobre as autorizações do Dataflow, consulte o artigo Segurança e autorizações do Dataflow.
As tarefas do Dataflow usam o Cloud Storage para armazenar ficheiros temporários durante a execução do pipeline. Para evitar a faturação de custos de armazenamento desnecessários, desative a funcionalidade de eliminação temporária em contentores que os seus trabalhos do Dataflow usam para armazenamento temporário. Para mais informações, consulte o artigo Desative a eliminação temporária.
Defina as opções necessárias
Para executar o pipeline através do Dataflow, defina as seguintes opções do pipeline:
Java
project
: o ID do seu projeto do Google Cloud .runner
: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de serDataflowRunner
.gcpTempLocation
: um caminho do Cloud Storage para o Dataflow preparar a maioria dos ficheiros temporários. O contentor especificado já tem de existir.Se não especificar
gcpTempLocation
, o Dataflow usa o valor da opçãotempLocation
. Se não especificar nenhuma destas opções, o Dataflow cria um novo contentor do Cloud Storage.
Python
project
: o ID do seu projeto Google Cloud .region
: a região da sua tarefa do Dataflow.runner
: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de serDataflowRunner
.temp_location
: um caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.
Go
project
: o ID do seu projeto Google Cloud .region
: a região da sua tarefa do Dataflow.runner
: o executor de pipelines que executa o seu pipeline. Para a execução deGoogle Cloud , este valor tem de serdataflow
.staging_location
: um caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.
Defina opções de pipelines de forma programática
O código de exemplo seguinte mostra como criar um pipeline definindo programaticamente o executor e outras opções necessárias para executar o pipeline através do Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
O SDK Apache Beam para Go usa argumentos da linha de comandos do Go. Use
flag.Set()
para definir valores de flags.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Depois de construir o pipeline, especifique todas as leituras, transformações e escritas do pipeline e execute-o.
Use opções de pipeline a partir da linha de comandos
O exemplo seguinte mostra como usar opções de pipeline especificadas na linha de comandos. Este exemplo não define as opções do pipeline de forma programática.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Use o módulo argparse do Python para analisar as opções da linha de comandos.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Use o pacote Go flag
para analisar as opções da linha de comandos. Tem de analisar as opções antes de chamar
beam.Init()
. Neste exemplo, output
é uma opção da linha de comandos.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Depois de construir o pipeline, especifique todas as leituras, transformações e escritas do pipeline e, em seguida, execute-o.
Controle os modos de execução
Quando um programa Apache Beam executa um pipeline num serviço como o Dataflow, o programa pode executar o pipeline de forma assíncrona ou pode bloquear até à conclusão do pipeline. Pode alterar este comportamento seguindo as orientações abaixo.
Java
Quando um programa Java do Apache Beam executa um pipeline num serviço como o Dataflow, é normalmente executado de forma assíncrona. Para executar um pipeline e aguardar até que a tarefa seja concluída, defina DataflowRunner
como o executor de pipelines e chame explicitamente pipeline.run().waitUntilFinish()
.
Quando usa DataflowRunner
e chama waitUntilFinish()
no objeto PipelineResult
devolvido por pipeline.run()
, o pipeline é executado em Google Cloud , mas o código local aguarda a conclusão da tarefa na nuvem e devolve o objeto DataflowPipelineJob
final. Enquanto a tarefa é executada, o serviço Dataflow imprime atualizações do estado da tarefa e mensagens da consola enquanto aguarda.
Python
Quando um programa Python do Apache Beam executa um pipeline num serviço como o Dataflow, é normalmente executado de forma assíncrona. Para bloquear
até à conclusão do pipeline, use o método wait_until_finish()
do objeto PipelineResult
, devolvido pelo método run()
do executor.
Go
Quando um programa Apache Beam Go executa um pipeline no Dataflow, é síncrono por predefinição e bloqueia até à conclusão do pipeline. Se não quiser bloquear, tem duas opções:
Inicie a tarefa numa rotina do Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Use o sinalizador de linha de comandos
--async
, que se encontra no pacotejobopts
.
Para ver detalhes de execução, monitorizar o progresso e verificar o estado de conclusão da tarefa, use a interface de monitorização do Dataflow ou a interface de linha de comandos do Dataflow.
Use origens de streaming
Java
Se o pipeline ler a partir de uma origem de dados ilimitada, como o Pub/Sub, o pipeline é executado automaticamente no modo de streaming.
Python
Se o seu pipeline usar uma origem de dados ilimitada, como o Pub/Sub, tem de definir a opção streaming
como verdadeira.
Go
Se o pipeline ler a partir de uma origem de dados ilimitada, como o Pub/Sub, o pipeline é executado automaticamente no modo de streaming.
As tarefas de streaming usam um tipo de máquina do Compute Engine de n1-standard-2
ou superior por predefinição.
Inicie localmente
Em vez de executar o pipeline em recursos de nuvem geridos, pode optar por executar o pipeline localmente. A execução local tem determinadas vantagens para testar, depurar ou executar o seu pipeline em pequenos conjuntos de dados. Por exemplo, a execução local remove a dependência do serviço Dataflow remoto e do projeto Google Cloud associado.
Quando usa a execução local, tem de executar o pipeline com conjuntos de dados suficientemente pequenos para caberem na memória local. Pode criar um pequeno conjunto de dados na memória usando uma transformação Create
ou usar uma transformação Read
para trabalhar com pequenos ficheiros locais ou remotos. Normalmente, a execução local oferece uma forma mais rápida e fácil de realizar testes e depuração com menos dependências externas, mas está limitada pela memória disponível no seu ambiente local.
O exemplo de código seguinte mostra como criar um pipeline que é executado no seu ambiente local.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Depois de criar o pipeline, execute-o.
Crie opções de pipeline personalizadas
Pode adicionar as suas próprias opções personalizadas, além das opções padrão
PipelineOptions
. A linha de comandos do Apache Beam também pode analisar opções personalizadas através de argumentos de linha de comandos especificados no mesmo formato.
Java
Para adicionar as suas próprias opções, defina uma interface com métodos getter e setter para cada opção, como no exemplo seguinte:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Para adicionar as suas próprias opções, use o método add_argument()
(que se comporta exatamente como o módulo argparse padrão do Python), como no exemplo seguinte:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
Para adicionar as suas próprias opções, use o pacote de flags Go, conforme mostrado no exemplo seguinte:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
Também pode especificar uma descrição, que é apresentada quando um utilizador transmite --help
como um argumento de linha de comandos, e um valor predefinido.
Java
Define a descrição e o valor predefinido através de anotações, da seguinte forma:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Recomendamos que registe a sua interface com PipelineOptionsFactory
e, em seguida, transmita a interface quando criar o objeto PipelineOptions
. Quando
regista a sua interface com o PipelineOptionsFactory
, o --help
pode
encontrar a sua interface de opções personalizadas e adicioná-la ao resultado do comando --help
. PipelineOptionsFactory
valida se as suas opções personalizadas são
compatíveis com todas as outras opções registadas.
O exemplo de código seguinte mostra como registar a sua interface de opções personalizadas
com PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Agora, o seu pipeline pode aceitar --myCustomOption=value
como um argumento de linha de comandos.
Python
Define a descrição e o valor predefinido da seguinte forma:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
Define a descrição e o valor predefinido da seguinte forma:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)