Como usar modelos flexíveis

Neste tutorial, mostramos como criar e executar um job de modelo flexível do Dataflow com uma imagem do Docker personalizada usando a ferramenta de linha de comando gcloud. Neste tutorial, você também verá um exemplo de pipeline de streaming que lê mensagens codificadas por JSON do Pub/Sub, transforma dados de mensagens com o Beam SQL e grava os resultados em uma tabela do BigQuery.

Para saber mais sobre os modelos flexíveis, consulte Modelos do Dataflow.

Objetivos

  • Criar uma imagem de contêiner do Docker.
  • Criar e executar um modelo flexível do Dataflow.

Custos

Neste tutorial, há componentes faturáveis do Google Cloud, como os seguintes:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Ative as APIs

  5. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  8. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  9. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  10. Ative as APIs Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Ative as APIs

  11. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  12. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  13. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Como limpar.

Como criar o exemplo de origem e coletor

Nesta seção, há explicações sobre como criar o seguinte:

  • Uma fonte de streaming de dados usando o Pub/Sub.
  • Um conjunto de dados para carregar os dados no BigQuery.

Criar um bucket do Cloud Storage

Use o comando gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Criar um tópico Pub/Sub e uma assinatura para esse tópico

Use a ferramenta de linha de comando gcloud:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Criar um job do Cloud Scheduler

Nesta etapa, usamos a ferramenta de linha de comando gcloud para criar e executar um job do Cloud Scheduler que publique "avaliações positivas" e "avaliações negativas".

  1. Crie um job do Cloud Scheduler para este projeto do Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Isso cria e executa um editor para "avaliações positivas" que publica uma mensagem por minuto.
  3. Inicie o job do Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Crie e execute outro editor semelhante para "avaliações negativas" que publica uma mensagem a cada dois minutos.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Crie um conjunto de dados do BigQuery

Use o comando bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Como fazer o download da amostra de código

  1. Faça o download da amostra de código.

    Java

    Clone o repositório java-docs-samples (em inglês) e navegue até a amostra de código para este tutorial.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clone o repositório python-docs-samples (em inglês) e navegue até a amostra de código para este tutorial.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exporte o TEMPLATE_IMAGE para este tutorial.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Como configurar o ambiente de desenvolvimento

Java

  1. Faça o download e instale a versão 11 do Kit de desenvolvimento para Java (JDK, na sigla em inglês). Verifique se a variável de ambiente JAVA_HOME (links em inglês) está definida e aponta para a instalação do JDK.
  2. Faça o download e instale o Apache Maven seguindo o guia de instalação (links em inglês) para seu sistema operacional específico.
  3. (Opcional) Execute o pipeline do Apache Beam localmente para fazer o desenvolvimento.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Crie o projeto Java em um arquivo Uber JAR.
      mvn clean package
  5. (Opcional) Anote o tamanho do arquivo Uber JAR e faça uma comparação com o arquivo original.
      ls -lh target/*.jar
    Esse arquivo Uber JAR tem todas as dependências incorporadas. É possível executar esse arquivo como um aplicativo autônomo sem dependências externas em outras bibliotecas.

Python

Use o SDK do Apache Beam para Python.

Somente Python: como criar uma imagem de contêiner

Nesta seção, você verá as etapas para usuários do Python. Se você estiver usando Java, pule as etapas a seguir.

Se o job não for executado e a mensagem de erro A Timeout in polling error message for exibida, consulte as etapas de solução de problemas.

  1. (Opcional) Ative o uso do cache do Kaniko por padrão.
    gcloud config set builds/use_kaniko True
    
    O Kaniko armazena em cache os artefatos do build de contêiner. Portanto, o uso dessa opção acelera a criação dos builds subsequentes.
  2. (Opcional) Crie o Dockerfile. É possível personalizar o Dockerfile deste tutorial. O arquivo inicial tem a seguinte aparência:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Esse Dockerfile contém os comandos FROM, ENV e COPY. Veja mais informações sobre eles na referência do Dockerfile.

    Imagens começando com gcr.io/PROJECT/ são salvas no Container Registry do seu projeto, em que a imagem pode ser acessada por outros produtos do Google Cloud.
  3. Crie a imagem do Docker usando um Dockerfile com o Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Metadados

É possível ampliar o modelo com metadados adicionais para que os parâmetros personalizados sejam validados quando o modelo for executado. Se você quiser criar metadados para seu modelo, siga estas etapas:

  1. Crie um arquivo metadata.json usando os parâmetros em Parâmetros de metadados.

    Para ver um exemplo, consulte Exemplo de arquivo de metadados.

  2. Armazene o arquivo JSON no Cloud Storage na mesma pasta do modelo.

Parâmetros de metadados

Chave de parâmetro Obrigatório Descrição do valor
name Sim O nome do seu modelo.
description Não Um parágrafo curto descrevendo o parâmetro.
parameters Não Uma matriz de parâmetros adicionais que o modelo usa. Uma matriz vazia é usada por padrão.
name Sim O nome do parâmetro usado no seu modelo.
label Sim Uma string legível que é usada no Console do Cloud para rotular o parâmetro.
helpText Sim Um parágrafo curto que descreve o parâmetro.
isOptional Não false se o parâmetro for obrigatório e true se o parâmetro for opcional. A menos que definido com um valor, isOptional assume como padrão false. Se você não incluir essa chave de parâmetro nos metadados, eles se tornarão um parâmetro obrigatório.
regexes Não Uma matriz de expressões regulares POSIX-egrep em formato de string que será usada para validar o valor do parâmetro. Por exemplo: ["^[a-zA-Z][a-zA-Z0-9]+"] é uma expressão regular única que valida que o valor comece com uma letra e tenha um ou mais caracteres. Uma matriz vazia é usada por padrão.

Exemplo de arquivo de metadados

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

É possível fazer o download de arquivos de metadados para os modelos fornecidos pelo Google no diretório de modelos do Dataflow.

Como criar um modelo flexível

Para executar um modelo, é preciso criar um arquivo de especificação de modelo em um Cloud Storage contendo todas as informações necessárias para executar o job, como informações e metadados do SDK.

Neste tutorial, usamos o arquivo de metadados de exemplo, que contém mais informações do modelo, como os campos name, description e parameters de entrada.

  1. Crie um arquivo de especificação de modelo que contenha todas as informações necessárias para executar o job, como as informações e os metadados do SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Crie o modelo flexível.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

O modelo agora está disponível por meio do arquivo de modelo no local do Cloud Storage que você especificou.

Como executar um pipeline de modelo flexível

Agora é possível executar o pipeline do Apache Beam no Dataflow fazendo referência ao arquivo de modelo e transmitindo os parâmetros do modelo exigidos pelo pipeline.

  1. No shell ou no terminal, execute o modelo:

    Java

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Como alternativa, execute o modelo com uma solicitação API REST:

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. Depois de executar o comando para executar o modelo flexível, o Dataflow retorna um ID de job com o status Na fila. Pode levar alguns minutos até que o status do job atinja Em execução e você possa acessar o gráfico de jobs.
  3. Verifique os resultados no BigQuery executando a seguinte consulta:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Enquanto este pipeline estiver em execução, será possível ver novas linhas anexadas à tabela do BigQuery a cada minuto.

Como limpar

Depois de concluir este tutorial, será possível limpar os recursos criados no Google Cloud para não ser cobrado por eles no futuro. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Limpar os recursos do modelo flexível

  1. Pare o pipeline do Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Exclua o arquivo de especificações de modelo do Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Exclua a imagem do contêiner do modelo flexível do Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Limpar recursos do projeto do Google Cloud

  1. Exclua os jobs do Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Exclua a assinatura e o tópico do Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Exclua a tabela do BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Exclua o conjunto de dados do BigQuery. Isso não gera cobranças.

    O comando a seguir também exclui todas as tabelas no conjunto de dados. Não é possível recuperar as tabelas e os dados.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Exclua o bucket do Cloud Storage. Isso não gera cobranças.

    O comando a seguir também exclui todos os objetos no bucket. Não é possível recuperar esses objetos.

    gsutil rm -r gs://$BUCKET
    

Limitações

As seguintes limitações aplicam-se aos jobs de modelos flexíveis:

  • Use uma imagem de base fornecida pelo Google para empacotar os contêineres usando o Docker. Para uma lista de imagens aplicáveis, consulte Imagens de base do modelo flexível.
  • O programa que constrói o pipeline precisa sair após run ser chamado para que o pipeline seja iniciado.
  • waitUntilFinish (Java) e wait_until_finish (Python) não são compatíveis.

A seguir