Como usar modelos flexíveis

Neste tutorial, mostramos como criar e executar um job de modelo flexível do Dataflow com uma imagem do Docker personalizada usando a ferramenta de linha de comando gcloud. Neste tutorial, você também verá um exemplo de pipeline de streaming que lê mensagens codificadas por JSON do Pub/Sub, transforma dados de mensagens com o Beam SQL e grava os resultados em uma tabela do BigQuery.

Objetivos

  • Criar uma imagem de contêiner do Docker.
  • Criar e executar um modelo flexível do Dataflow.

Custos

Neste tutorial, há componentes faturáveis do Google Cloud, como os seguintes:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto.

Antes de começar

  1. Faça login na sua conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar a página do seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Ative as APIs

  5. Configurar a autenticação:
    1. No Console do Cloud, acesse a página Criar chave da conta de serviço.

      Acessar página "Criar chave da conta de serviço"
    2. Na lista Conta de serviço, selecione Nova conta de serviço.
    3. No campo Nome da conta de serviço, insira um nome.
    4. Na lista Papel, selecione Projeto > Proprietário.

    5. Clique em Criar. O download de um arquivo JSON que contém sua chave é feito no seu computador.
  6. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Como limpar.

Como criar o exemplo de origem e coletor

Nesta seção, há explicações sobre como criar o seguinte:

  • Uma fonte de streaming de dados usando o Pub/Sub.
  • Um conjunto de dados para carregar os dados no BigQuery.

Criar um bucket do Cloud Storage

Use o comando gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Criar um tópico Pub/Sub e uma assinatura para esse tópico

Use a ferramenta de linha de comando gcloud:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Criar um job do Cloud Scheduler

Nesta etapa, usamos a ferramenta de linha de comando gcloud para criar e executar um job do Cloud Scheduler que publique "avaliações positivas" e "avaliações negativas".

  1. Crie um job do Cloud Scheduler para este projeto do Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Isso cria e executa um editor para "avaliações positivas" que publica uma mensagem por minuto.
  3. Inicie o job do Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Crie e execute outro editor semelhante para "avaliações negativas" que publica uma mensagem a cada dois minutos.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Crie um conjunto de dados do BigQuery

Use o comando bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Como fazer o download da amostra de código

  1. Faça o download da amostra de código.

    Java

    Clone o repositório java-docs-samples (em inglês) e navegue até a amostra de código para este tutorial.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clone o repositório python-docs-samples (em inglês) e navegue até a amostra de código para este tutorial.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exporte o TEMPLATE_IMAGE para este tutorial.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Como configurar o ambiente de desenvolvimento

Java

  1. Faça o download e instale a versão 11 do Kit de desenvolvimento para Java (JDK, na sigla em inglês). Verifique se a variável de ambiente JAVA_HOME (links em inglês) está definida e aponta para a instalação do JDK.
  2. Faça o download e instale o Apache Maven seguindo o guia de instalação (links em inglês) para seu sistema operacional específico.
  3. (Opcional) Execute o pipeline do Apache Beam localmente para fazer o desenvolvimento.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Crie o projeto Java em um arquivo Uber JAR.
      mvn clean package
  5. (Opcional) Anote o tamanho do arquivo Uber JAR e faça uma comparação com o arquivo original.
      ls -lh target/*.jar
    Esse arquivo Uber JAR tem todas as dependências incorporadas. É possível executar esse arquivo como um aplicativo autônomo sem dependências externas em outras bibliotecas.

Python

Use o SDK do Apache Beam para Python com pip e as versão 2.7, 3.5, 3.6 ou 3.7 do Python. Para verificar se você tem uma instalação do Python e pip em funcionamento, execute:

    python --version
    python -m pip --version

Caso você não tenha o Python, as etapas de instalação para seu sistema operacional podem ser encontradas na página de instalação do Python.

Somente Python: como criar uma imagem de contêiner

Nesta seção, você verá as etapas para usuários do Python. Se você estiver usando Java, pule as etapas a seguir.

  1. (Opcional) Ative o uso do cache do Kaniko por padrão.
    gcloud config set builds/use_kaniko True
    
    O Kaniko armazena em cache os artefatos do build de contêiner. Portanto, o uso dessa opção acelera a criação dos builds subsequentes.
  2. (Opcional) Crie o Dockerfile. É possível personalizar o Dockerfile deste tutorial. O arquivo inicial tem a seguinte aparência:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Esse Dockerfile contém os comandos FROM, ENV e COPY. Veja mais informações sobre eles na referência do Dockerfile.

    Imagens começando com gcr.io/PROJECT/ são salvas no Container Registry do seu projeto, em que a imagem pode ser acessada por outros produtos do Google Cloud.
  3. Crie a imagem do Docker usando um Dockerfile com o Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Como criar um modelo flexível

Para executar um modelo, é preciso criar um arquivo de especificação de modelo em um Cloud Storage contendo todas as informações necessárias para executar o job, como informações e metadados do SDK.

O arquivo metadata.json (em inglês) neste exemplo contém mais informações do modelo, como os campos name, description e parameters de entrada.

  1. Crie um arquivo de especificação de modelo que contenha todas as informações necessárias para executar o job, como as informações e os metadados do SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Crie o modelo flexível.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

O modelo agora está disponível por meio do arquivo de modelo no local do Cloud Storage que você especificou.

Como executar um pipeline de modelo flexível

Agora é possível executar o pipeline do Apache Beam no Dataflow fazendo referência ao arquivo de modelo e transmitindo os parâmetros do modelo exigidos pelo pipeline.

  1. Execute o modelo.

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Como alternativa, execute o modelo com uma solicitação da API REST.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. Depois de executar o comando para executar o modelo flexível, o Dataflow retorna um ID de job com o status Na fila. Pode levar alguns minutos até que o status do job atinja Em execução e você possa acessar o gráfico de jobs.
  3. Verifique os resultados no BigQuery executando a seguinte consulta:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Enquanto este pipeline estiver em execução, será possível ver novas linhas anexadas à tabela do BigQuery a cada minuto.

Como limpar

Depois de concluir este tutorial, será possível limpar os recursos criados no Google Cloud para não ser cobrado por eles no futuro. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Limpar os recursos do modelo flexível

  1. Pare o pipeline do Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Exclua o arquivo de especificações de modelo do Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Exclua a imagem do contêiner do modelo flexível do Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Limpar recursos do projeto do Google Cloud

  1. Exclua os jobs do Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Exclua a assinatura e o tópico do Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Exclua a tabela do BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Exclua o conjunto de dados do BigQuery. Isso não gera cobranças.

    O comando a seguir também exclui todas as tabelas no conjunto de dados. Não é possível recuperar as tabelas e os dados.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Exclua o bucket do Cloud Storage. Isso não gera cobranças.

    O comando a seguir também exclui todos os objetos no bucket. Não é possível recuperar esses objetos.

    gsutil rm -r gs://$BUCKET
    

Limitações

As seguintes limitações aplicam-se aos jobs de modelos flexíveis:

  • Use uma imagem de base fornecida pelo Google para empacotar os contêineres usando o Docker.
  • Não é possível atualizar jobs de streaming.
  • Não há suporte para o uso do FlexRS.
  • waitUntilFinish (Java) e wait_until_finish (Python) não são compatíveis.

A seguir