Veja nesta página várias opções de configuração de modelos flexíveis do Dataflow, incluindo:
- Permissões
- Variáveis de ambiente do Dockerfile
- Dependências do pacote
- imagens do Docker
- Opções de pipeline
- Locais temporários e de preparo
Para configurar um modelo flexível de amostra, consulte o tutorial de modelo flexível.
Entender as permissões dos modelos Flex
Ao trabalhar com modelos Flex, você precisa de três conjuntos de permissões:
- Permissões para criar recursos
- Permissões para criar um modelo flexível
- Permissões para executar um modelo flexível
Permissões para criar recursos
Para desenvolver e executar um pipeline de modelo flexível, é necessário criar vários recursos (por exemplo, um bucket de preparo). Para tarefas únicas de criação de recursos, use o papel de proprietário básico.
Permissões para criar um modelo flexível
Como desenvolvedor de um modelo flexível, é necessário criar o modelo para disponibilizá-lo aos usuários. A criação envolve o upload de uma especificação de modelo em um bucket do Cloud Storage e o provisionamento de uma imagem do Docker com o código e as dependências necessárias para executar o pipeline. Para criar um modelo flexível, você precisa ter acesso de leitura e gravação ao Cloud Storage e ao acesso de gravador do Artifact Registry ao seu repositório do Artifact Registry. Para conceder essas permissões, atribua estes papéis:
- Administrador do Storage (
roles/storage.admin
) - Editor do Cloud Build (
roles/cloudbuild.builds.editor
) - Gravador do Artifact Registry (
roles/artifactregistry.writer
)
Permissões para executar um modelo flexível
Quando você executa um modelo flexível, o Dataflow cria um job para você. Para criar o job, a conta de serviço do Dataflow precisa da seguinte permissão:
dataflow.serviceAgent
Quando você usa o Dataflow pela primeira vez, o serviço atribui esse papel a você para que não precise conceder essa permissão.
Por padrão, a conta de serviço do Compute Engine é usada para VMs de inicializador e de worker. A conta de serviço precisa dos seguintes papéis e capacidades:
- Administrador de objetos do Storage (
roles/storage.objectAdmin
) - Visualizador (
roles/viewer
) - Worker do Dataflow (
roles/dataflow.worker
) - Acesso de leitura e gravação ao bucket de preparo
- Acesso de leitura à imagem do modelo flexível
Para conceder acesso de leitura e gravação ao bucket de preparo, use o papel Administrador de objetos do Storage (roles/storage.objectAdmin
). Para mais informações, consulte Papéis do IAM para o Cloud Storage.
Para conceder acesso de leitura à imagem do modelo flexível, use o papel Visualizador de objeto do Storage (roles/storage.objectViewer
). Para mais informações, consulte Como configurar o controle de acesso.
Definir as variáveis de ambiente necessárias do Dockerfile
Se você quiser criar seu próprio Dockerfile para um job de modelo flexível, especifique as seguintes variáveis de ambiente:
Java
Especifique FLEX_TEMPLATE_JAVA_MAIN_CLASS
e FLEX_TEMPLATE_JAVA_CLASSPATH
no Dockerfile.
ENV | Descrição | Obrigatório |
---|---|---|
FLEX_TEMPLATE_JAVA_MAIN_CLASS |
Especifica qual classe Java será executada para iniciar o modelo flexível. | SIM |
FLEX_TEMPLATE_JAVA_CLASSPATH |
Especifica o local dos arquivos de classe. | SIM |
FLEX_TEMPLATE_JAVA_OPTIONS |
Especifica as opções do Java a serem transmitidas ao iniciar o modelo flexível. | NÃO |
Python
Especifique FLEX_TEMPLATE_PYTHON_PY_FILE
no Dockerfile.
Para gerenciar dependências de pipeline, defina variáveis no Dockerfile, como as seguintes:
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
FLEX_TEMPLATE_PYTHON_PY_OPTIONS
FLEX_TEMPLATE_PYTHON_SETUP_FILE
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES
Por exemplo, as variáveis de ambiente a seguir são definidas no tutorial de streaming no modelo Flex do Python (em inglês) no GitHub:
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
ENV | Descrição | Obrigatório |
---|---|---|
FLEX_TEMPLATE_PYTHON_PY_FILE |
Especifica qual arquivo Python executar para iniciar o modelo flexível. | SIM |
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE |
Especifica o arquivo de requisitos com dependências de pipeline. Para mais informações, consulte Dependências PyPI na documentação do Apache Beam. | NÃO |
FLEX_TEMPLATE_PYTHON_SETUP_FILE |
Especifica o caminho para o arquivo "setup.py" do pacote de pipeline. Para mais informações, consulte Dependências de vários arquivos na documentação do Apache Beam. | NÃO |
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES |
Especifica os pacotes que não estão disponíveis publicamente. Para informações sobre como usar pacotes extras, leia Dependências locais ou não PyPI. |
NÃO |
FLEX_TEMPLATE_PYTHON_PY_OPTIONS |
Especifica as opções do Python que serão transmitidas ao iniciar o modelo flexível. | NÃO |
Dependências do pacote
Quando um pipeline em Python do Dataflow usa outras dependências, pode ser necessário configurar o modelo Flex para instalar outras dependências em VMs de worker do Dataflow.
Quando você executa um job do Dataflow em Python que usa modelos Flex em um ambiente que restringe o acesso à Internet, é necessário pré-empacotar as dependências ao criar o modelo.
Use uma das seguintes opções para pré-empacotar as dependências de Python.
Para instruções sobre como gerenciar dependências de pipeline em pipelines Java e Go, consulte Gerenciar dependências de pipeline no Dataflow.
Usar um arquivo de requisitos e pré-empacotar as dependências com o modelo
Se você estiver usando seu próprio Dockerfile para definir a imagem do modelo Flex, siga estas etapas:
Crie um arquivo
requirements.txt
que liste as dependências do pipeline.COPY requirements.txt /template/ ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
Instale as dependências na imagem do modelo Flex.
RUN pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
Faça o download das dependências para o cache de requisitos locais, que é organizado para os workers do Dataflow quando o modelo é iniciado.
RUN pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
Confira a seguir um exemplo de código que faz o download prévio das dependências.
Usar um contêiner personalizado que pré-instala todas as dependências
Essa opção é preferível para pipelines executados em ambientes sem acesso à Internet.
Siga estas etapas para usar um contêiner personalizado:
Criar um contêiner personalizado que pré-instala as dependências necessárias.
Pré-instale as mesmas dependências no Dockerfile do modelo Flex. Não use a opção
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
.Um
dataflow/flex-templates/streaming_beam/Dockerfile
modificado pode ser parecido com este exemplo:FROM gcr.io/dataflow-templates-base/python3-template-launcher-base ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/streaming_beam.py" COPY . /template RUN pip install --no-cache-dir -r /template/requirements.txt
Como alternativa, para reduzir o número de imagens a serem mantidas, use sua imagem de contêiner personalizada como uma imagem de base para o modelo Flex.
Se você usa o SDK do Apache Beam versão 2.49.0 ou anterior, adicione a opção de pipeline
--sdk_location=container
na tela de início do pipeline. Essa opção instrui o pipeline a usar o SDK do contêiner personalizado em vez de Baixar o SDK.options = PipelineOptions(beam_args, save_main_session=True, streaming=True, sdk_location="container")
Defina o parâmetro
sdk_container_image
no comandoflex-template run
. Exemplo:gcloud dataflow flex-template run $JOB_NAME \ --region=$REGION \ --template-file-gcs-location=$TEMPLATE_PATH \ --parameters=sdk_container_image=$CUSTOM_CONTAINER_IMAGE \ --additional-experiments=use_runner_v2
Para mais informações, consulte Usar contêineres personalizados no Dataflow.
Estruturar o pipeline como um pacote
Se você estruturar o pipeline como um pacote,
use a opção FLEX_TEMPLATE_PYTHON_SETUP_FILE
. Para mais informações
sobre como estruturar o pipeline como um pacote, consulte
Várias dependências de arquivos
na documentação do Apache Beam.
Se você usar seu próprio Dockerfile para definir a imagem do modelo Flex, instale o pacote no Dockerfile.
O Dockerfile do modelo Flex pode incluir o seguinte:
COPY setup.py .
COPY main.py .
COPY package_name package_name
RUN pip install -e .
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
Se você usar esse método e usar uma imagem de contêiner personalizada para pré-instalar dependências no ambiente de execução, recomendamos que você instale o pacote de pipeline na imagem de contêiner personalizada quando criá-lo. Não especifique a opção FLEX_TEMPLATE_PYTHON_SETUP_FILE
.
Para conferir um exemplo que segue essa abordagem, consulte o tutorial Modelo Flex de um pipeline com dependências e uma imagem de contêiner personalizada (em inglês) no GitHub.
Escolher uma imagem de base.
É possível usar uma imagem
base fornecida pelo Google para empacotar as imagens
de contêiner do modelo usando o Docker. Escolha a tag mais recente nas
imagens de base dos modelos flexíveis.
É recomendável usar uma tag de imagem específica em vez de latest
.
Especifique a imagem base no seguinte formato:
gcr.io/dataflow-templates-base/IMAGE_NAME:TAG
Substitua:
IMAGE_NAME
: uma imagem base fornecida pelo Google.TAG
: um nome de versão para a imagem de base, encontrado na referência de imagens de base dos modelos Flex.
Usar imagens de contêiner personalizadas
Se o pipeline usar uma imagem de contêiner personalizada, recomendamos usá-la como imagem base para a imagem do Docker de modelo flexível. Para fazer isso, copie o binário do inicializador do modelo Flex da imagem base do modelo fornecido pelo Google para a imagem personalizada.
Um exemplo de Dockerfile
para uma imagem que pode ser
usada como imagem de contêiner do SDK personalizado e como modelo Flex
pode ter a seguinte aparência:
FROM gcr.io/dataflow-templates-base/IMAGE_NAME:TAG as template_launcher
FROM apache/beam_python3.10_sdk:2.56.0
# RUN <...Make image customizations here...>
# See: https://cloud.google.com/dataflow/docs/guides/build-container-image
# Configure the Flex Template here.
COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
COPY my_pipeline.py /template/
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/my_pipeline.py"
Substitua:
IMAGE_NAME
: uma imagem base fornecida pelo Google. Por exemplo,python311-template-launcher-base
.TAG
: uma tag de versão para a imagem base, encontrada na referência de imagens base dos modelos Flex. Para melhor estabilidade e solução de problemas, evite usarlatest
. Em vez disso, fixe uma tag de versão específica.
Para conferir um exemplo que segue essa abordagem, consulte o tutorial Modelo Flex de um pipeline com dependências e uma imagem de contêiner personalizada (em inglês).
Usar uma imagem de um registro particular
É possível criar uma imagem de modelo flexível armazenada em um registro particular do Docker, se o registro particular usar HTTPS e tiver um certificado válido.
Para usar uma imagem de um registro particular, especifique o caminho da imagem e um nome de usuário e senha para o registro. O nome de usuário e a senha precisam ser armazenados no Gerenciador de secrets. É possível fornecer a chave secreta em um dos seguintes formatos:
projects/{project}/secrets/{secret}/versions/{secret_version}
projects/{project}/secrets/{secret}
Se você usar o segundo formato, porque ele não especifica a versão, o Dataflow usará a versão mais recente.
Se o registro usar um certificado autoassinado, você também precisará especificar o caminho para o certificado autoassinado no Cloud Storage.
A tabela a seguir descreve as opções da CLI gcloud que podem ser usadas para configurar um registro particular.
Parâmetro | Descrição |
---|---|
image
|
O endereço do registro. Exemplo:
gcp.repository.example.com:9082/registry/example/image:latest .
|
image-repository-username-secret-id
|
O código do Secret Manager para que o nome de usuário seja autenticado
no registro particular. Exemplo:
projects/example-project/secrets/username-secret .
|
image-repository-password-secret-id
|
O código secreto do Secret Manager da senha para autenticar
no registro particular. Exemplo:
projects/example-project/secrets/password-secret/versions/latest .
|
image-repository-cert-path
|
O URL completo do Cloud Storage para um certificado autoassinado para o
registro particular. Esse valor só é necessário se o registro usar um certificado autoassinado. Exemplo:
gs://example-bucket/self-signed.crt .
|
Veja um exemplo de comando da Google Cloud CLI que cria um modelo flexível usando uma imagem em um registro particular com um certificado autoassinado.
gcloud dataflow flex-template build gs://example-bucket/custom-pipeline-private-repo.json --sdk-language=JAVA --image="gcp.repository.example.com:9082/registry/example/image:latest" --image-repository-username-secret-id="projects/example-project/secrets/username-secret" --image-repository-password-secret-id="projects/example-project/secrets/password-secret/versions/latest" --image-repository-cert-path="gs://example-bucket/self-signed.crt" --metadata-file=metadata.json
Para criar seu próprio modelo flexível, substitua os valores de exemplo e talvez seja necessário especificar opções diferentes ou extras. Para saber mais, leia os seguintes artigos:
Especificar opções de pipeline
Para mais informações sobre as opções de pipeline com suporte direto nos modelos Flex, leia Opções de pipeline.
Use qualquer opção de pipeline do Apache Beam indiretamente. Se você estiver usando um arquivo metadata.json
para o job de modelo flexível, inclua essas opções de pipeline no arquivo. Esse arquivo de metadados precisa seguir o formato em TemplateMetadata
.
Caso contrário, ao iniciar o job de modelo flexível, transmita essas opções de pipeline usando o campo de parâmetros.
API
Inclua opções de pipeline usando o campo parameters
.
gcloud
Inclua opções de pipeline usando a sinalização
parameters
.
Ao transmitir parâmetros do tipo List
ou Map
, talvez seja necessário definir parâmetros em um arquivo YAML e usar o flags-file
.
Para ver um exemplo dessa abordagem, consulte a etapa "Criar um arquivo com parâmetros..." nesta solução.
Ao usar modelos Flex, é possível configurar algumas opções de pipeline durante a inicialização, mas outras opções não podem ser alteradas. Se os argumentos de linha de comando exigidos pelo modelo Flex forem substituídos, o job pode ignorar, substituir ou descartar as opções de pipeline transmitidas pelo inicializador de modelos. A inicialização do job pode falhar ou um job que não usa o modelo Flex pode ser iniciado. Para mais informações, consulte Falha ao ler o arquivo do job.
Durante a inicialização, não altere as seguintes opções de pipeline:
Java
runner
project
jobName
templateLocation
region
Python
runner
project
job_name
template_location
region
Go
runner
project
job_name
template_location
region
Bloquear chaves SSH do projeto de VMs que usam chaves SSH baseadas em metadados
Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, bloqueie as chaves SSH do projeto das VMs. Use a sinalização additional-experiments
com
a opção de serviço block_project_ssh_keys
:
--additional-experiments=block_project_ssh_keys
Para mais informações, consulte Opções de serviço do Dataflow.
Metadados
É possível ampliar o modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo for executado. Se você quiser criar metadados para seu modelo, siga estas etapas:
- Crie um arquivo
metadata.json
usando os parâmetros em Parâmetros de metadados.Para ver um exemplo, consulte Exemplo de arquivo de metadados.
- Armazene o arquivo JSON no Cloud Storage na mesma pasta do modelo.
Parâmetros de metadados
Chave de parâmetro | Obrigatório | Descrição do valor | |
---|---|---|---|
name |
Yes | O nome do seu modelo. | |
description |
No | Um parágrafo curto descrevendo o parâmetro. | |
streaming |
No | Se true , o modelo é compatível com streaming. O valor padrão é
false . |
|
supportsAtLeastOnce |
No | Se true , o modelo é compatível com o processamento "Pelo menos uma vez". O valor padrão é false . Defina esse parâmetro como true se o modelo for projetado para funcionar com o modo de streaming "Pelo menos uma vez".
|
|
supportsExactlyOnce |
No | Se true , o modelo é compatível com o processamento "Exatamente uma vez". O valor padrão é true . |
|
defaultStreamingMode |
No | O modo de streaming padrão, para modelos compatíveis com os modos "pelo menos uma vez" e
"exatamente uma". Use um dos seguintes valores: "AT_LEAST_ONCE" , "EXACTLY_ONCE" . Se não for especificado, o modo de streaming padrão será exatamente uma vez.
|
|
parameters |
No | Uma matriz de parâmetros adicionais que o modelo usa. Uma matriz vazia é usada por padrão. | |
name |
Yes | O nome do parâmetro usado no seu modelo. | |
label |
Yes | Uma string legível que é usada no Console do Google Cloud para rotular o parâmetro. | |
helpText |
Yes | Um parágrafo curto que descreve o parâmetro. | |
isOptional |
No | false se o parâmetro for obrigatório e true se o parâmetro for opcional. A menos que definido com um valor, isOptional assume como padrão false .
Se você não incluir essa chave de parâmetro nos metadados, eles se tornarão um parâmetro
obrigatório. |
|
regexes |
No | Uma matriz de expressões regulares POSIX-egrep em formato de string que será usada para validar o
valor do parâmetro. Por exemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] é uma
expressão regular única que valida que o valor comece com uma letra e tenha um ou
mais caracteres. Uma matriz vazia é usada por padrão. |
Exemplo de arquivo de metadados
Java
{ "name": "Streaming Beam SQL", "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery", "parameters": [ { "name": "inputSubscription", "label": "Pub/Sub input subscription.", "helpText": "Pub/Sub subscription to read from.", "regexes": [ "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "outputTable", "label": "BigQuery output table", "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.", "isOptional": true, "regexes": [ "[^:]+:[^.]+[.].+" ] } ] }
Python
{ "name": "Streaming beam Python flex template", "description": "Streaming beam example for python flex template.", "parameters": [ { "name": "input_subscription", "label": "Input PubSub subscription.", "helpText": "Name of the input PubSub subscription to consume from.", "regexes": [ "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "output_table", "label": "BigQuery output table name.", "helpText": "Name of the BigQuery output table name.", "isOptional": true, "regexes": [ "([^:]+:)?[^.]+[.].+" ] } ] }
É possível fazer o download de arquivos de metadados para os modelos fornecidos pelo Google no diretório de modelos do Dataflow.
Entender o local de preparo e o local temporário
A Google Cloud CLI fornece as opções --staging-location
e --temp-location
quando você executa um modelo flexível.
Da mesma forma, a API REST do Dataflow fornece os campos stagingLocation
e
tempLocation
para
FlexTemplateRuntimeEnvironment.
Nos modelos Flex, o local de preparo é o URL do Cloud Storage em que os arquivos são gravados durante a etapa de preparação do lançamento de um modelo. O Dataflow lê esses arquivos preparados para criar o gráfico de modelo. O local temporário é o URL do Cloud Storage em que os arquivos temporários são gravados durante a etapa de execução.
Atualizar um job de modelo Flex
O exemplo de solicitação a seguir mostra como atualizar um job de streaming de modelo usando o método projects.locations.flexTemplates.launch. Se você quiser usar a gcloud CLI, consulte Atualizar um pipeline que já existe.
Para atualizar um modelo clássico, use projects.locations.templates.launch.
Siga as etapas para criar um job de streaming com base em um modelo Flex. Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
REGION
pela região do Dataflow do job que você está atualizando. - Substitua
JOB_NAME
pelo nome exato do job que você quer atualizar. - Defina
parameters
como sua lista de pares de chave/valor. Os parâmetros listados são específicos desse modelo de exemplo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.- Substitua
SUBSCRIPTION_NAME
pelo nome da sua assinatura do Pub/Sub. - Substitua
DATASET
pelo nome do conjunto de dados do BigQuery. - Substitua
TABLE_NAME
pelo nome da sua tabela do BigQuery.
- Substitua
- Substitua
STORAGE_PATH
pelo local do Cloud Storage do arquivo de modelo. O local precisa começar comgs://
.
- Substitua
Use o parâmetro
environment
para alterar as configurações do ambiente. Para mais informações, consulteFlexTemplateRuntimeEnvironment
.Opcional: para enviar sua solicitação usando curl (Linux, macOS ou Cloud Shell), salve a solicitação em um arquivo JSON e execute o seguinte comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
Substitua FILE_PATH pelo caminho para o arquivo JSON que contém o corpo da solicitação.
Use a interface de monitoramento do Dataflow para verificar se um novo job com o mesmo nome foi criado. Esse job tem o status Atualizado.
Limitações
As seguintes limitações aplicam-se aos jobs de modelos flexíveis:
- Use uma imagem de base fornecida pelo Google para empacotar os contêineres usando o Docker. Para uma lista de imagens aplicáveis, consulte Imagens de base do modelo flexível.
- O programa que constrói o pipeline precisa sair após
run
ser chamado para que o pipeline seja iniciado. waitUntilFinish
(Java) ewait_until_finish
(Python) não são compatíveis.
A seguir
- Para saber mais sobre os modelos clássicos e Flex e os cenários de caso de uso, consulte Modelos do Dataflow.
- Para ver informações sobre a solução de problemas de modelos Flex, consulte Resolver problemas de tempo limite do modelo Flex.
- Para mais arquiteturas de referência, diagramas e práticas recomendadas, confira a Central de arquitetura do Cloud.