Modelos utilitários fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow. Para informações gerais sobre modelos, consulte a página Visão geral. Para uma lista com todos os modelos fornecidos pelo Google, consulte a página Primeiros passos com os modelos fornecidos pelo Google.

Esta página documenta modelos de utilitário:

Compactação em massa de arquivos do Cloud Storage

O modelo "Compactação em massa de arquivos do Cloud Storage" é um pipeline em lote que compacta arquivos do Cloud Storage para um local específico. Esse modelo pode ser útil para compactar grandes lotes de arquivos como parte de um processo de arquivamento periódico. Os modos de compactação compatíveis são: BZIP2, DEFLATE e GZIP. A saída de arquivos para o local de destino seguirá o esquema de nomeação do arquivo original anexado à extensão do modo de compactação. As extensões anexadas podem ser dos tipos a seguir: .bzip2, .deflate e .gz.

Qualquer erro ocorrido durante o processo de compactação é enviado para o arquivo de falha em formato CSV com o nome do arquivo e a mensagem de erro. Se nenhuma falha ocorrer durante a execução do pipeline, o arquivo de erro ainda será criado, mas não conterá registros de erro.

Requisitos para este pipeline:

  • A compactação precisa estar em um dos seguintes formatos: BZIP2, DEFLATE e GZIP.
  • O diretório de saída precisa ser criado antes de executar o pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/uncompressed/*.txt
outputDirectory O local de saída da gravação. Por exemplo, gs://bucket-name/compressed/
outputFailureFile O arquivo de saída do registro de erros a ser usado para gravação de falhas ocorridas durante o processo de compactação. Por exemplo, gs://bucket-name/compressed/failed.csv. O arquivo será criado mesmo que não haja falhas, mas estará vazio. O conteúdo do arquivo está no formato CSV (nome do arquivo, erro) e consiste em uma linha para cada arquivo com falha na compactação.
compression O algoritmo de compactação usado para compactar os arquivos correspondentes. Precisa ser um dos seguintes formatos: BZIP2, DEFLATE e GZIP.

Como executar o modelo "Compactação em massa de arquivos do Cloud Storage"

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Bulk Compress Cloud Storage Files template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, você precisa ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Substitua COMPRESSION pela opção de algoritmo de compactação.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Substitua COMPRESSION pela opção de algoritmo de compactação.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/compressed",
       "outputFailureFile": "gs://YOUR_BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Descompactação em massa de arquivos do Cloud Storage

O modelo "Descompactação em massa de arquivos do Cloud Storage" é um pipeline em lote que descompacta arquivos no Cloud Storage para um local específico. Essa funcionalidade é útil quando você quer usar dados compactados para minimizar os custos de largura de banda da rede durante uma migração, mas quer maximizar a velocidade de processamento analítico ao operar em dados não compactados após a migração. O pipeline lida automaticamente com vários modos de compactação durante uma única execução e determina o modo de descompactação a ser usado com base na extensão do arquivo (.bzip2, .deflate, .gz, .zip).

Requisitos para este pipeline:

  • Os arquivos a serem descompactados precisam estar em um dos formatos a seguir: Bzip2, Deflate, Gzip, Zip.
  • O diretório de saída precisa ser criado antes de executar o pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/compressed/*.gz
outputDirectory O local de saída da gravação. Por exemplo, gs://bucket-name/decompressed
outputFailureFile O arquivo de saída do registro de erros a ser usado para falhas de gravação que ocorrem durante o processo de descompactação. Por exemplo, gs://bucket-name/decompressed/failed.csv. O arquivo será criado mesmo que não haja falhas, mas estará vazio. O conteúdo do arquivo está no formato CSV (nome do arquivo, erro) e consiste em uma linha para cada arquivo que falhou na descompactação.

Como executar o modelo Descompactação em massa de arquivos do Cloud Storage

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Bulk Decompress Cloud Storage Files template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Substitua OUTPUT_FAILURE_FILE_PATH pela sua opção de caminho para o arquivo que contém as informações de falha.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua YOUR_BUCKET_NAME pelo nome do bucket no Cloud Storage.
  • Substitua OUTPUT_FAILURE_FILE_PATH pela sua opção de caminho para o arquivo que contém as informações de falha.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Exclusão em massa do Datastore

O modelo "Exclusão em massa do Datastore" é um pipeline que lê as entidades do Datastore com uma consulta GQL e exclui todas as entidades correspondentes no projeto de destino selecionado. Como opção, o pipeline pode passar as entidades do Datastore codificadas por JSON para sua UDF do JavaScript. Use-a para filtrar as entidades retornando valores nulos.

Requisitos para este pipeline:

  • O Datastore precisa ser configurado no projeto antes de executar o modelo.
  • Em caso de leitura e exclusão de instâncias do Datastore separadas, a Conta de serviço do controlador do Dataflow precisa ter permissão para ler de uma instância e excluir da outra.

Parâmetros do modelo

Parâmetro Descrição
datastoreReadGqlQuery Consulta GQL que especifica quais entidades precisam corresponder para exclusão. Por exemplo, "SELECT * FROM MyKind".
datastoreReadProjectId O ID do projeto do GCP da instância do Datastore a partir da qual você quer ler entidades (usando sua consulta GQL) que são usadas para correspondência.
datastoreDeleteProjectId O ID do projeto do GCP da instância do Datastore a partir da qual as entidades correspondentes serão excluídas. Isso pode ser o mesmo que datastoreReadProjectId se você quiser ler e excluir na mesma instância do Datastore.
datastoreReadNamespace Opcional: namespace das entidades solicitadas. Defina como "" para o namespace padrão.
javascriptTextTransformGcsPath [Opcional] Caminho do Cloud Storage que contém todo o código JavaScript. Por exemplo, "gs://mybucket/mytransforms/*.js". Se você não quiser usar uma UDF, deixe esse campo em branco.
javascriptTextTransformFunctionName [Opcional] Nome da função a ser chamada. Se essa função retornar um valor indefinido ou nulo para uma determinada entidade do Datastore, a entidade não será excluída. Caso você tenha o código JavaScript "function myTransform(inJson) { ...dostuff...}", o nome da função será "myTransform". Se você não quiser usar uma UDF, deixe esse campo em branco.

Como executar o modelo "Exclusão em massa do Datastore"

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Datastore Bulk Delete template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Substitua os valores a seguir neste exemplo:

  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua GQL_QUERY pela consulta que você usará para corresponder entidades para exclusão.
  • Substitua DATASTORE_READ_AND_DELETE_PROJECT_ID pelo código do projeto referente à instância do Datastore. Este exemplo será lido e excluído da mesma instância do Datastore.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua os valores a seguir neste exemplo:

  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua GQL_QUERY pela consulta que você usará para corresponder entidades para exclusão.
  • Substitua DATASTORE_READ_AND_DELETE_PROJECT_ID pelo código do projeto referente à instância do Datastore. Este exemplo será lido e excluído da mesma instância do Datastore.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "READ_PROJECT_ID",
       "datastoreDeleteProjectId": "DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Gerador de dados de streaming para o Pub/Sub

Durante o desenvolvimento de pipelines do Dataflow, um dos requisitos comuns é executar uma comparação de desempenho em consultas específicas por segundo (QPS, na sigla em inglês). Este modelo pode ser usado para gerar mensagens JSON falsas com base no esquema fornecido pelo usuário na taxa especificada para um tópico do Pub/Sub.

A biblioteca Gerador de dados JSON usada pelo pipeline permite que várias funções de falsificação sejam usadas para cada campo de esquema. Consulte os documentos para mais informações sobre as funções e o formato do esquema fictícios.

Requisitos para este pipeline:

  • Crie um arquivo de esquema e armazene-o no local do Cloud Storage.
  • O tópico de saída do Pub/Sub precisa existir antes da execução.

Parâmetros do modelo

Parâmetro Descrição
schemaLocation Local do arquivo de esquema. Por exemplo, gs://mybucket/filename.json.
topic Nome do tópico do Pub/Sub no qual o pipeline deve publicar os dados. Por exemplo: projects/<project-id>/topics/<topic-name>
qps Número de mensagens a serem publicadas por segundo. Por exemplo, 100.
autoscalingAlgorithm [Opcional] Algoritmo usado para escalonamento automático dos workers. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo.
maxNumWorkers [Opcional] Número máximo de máquinas de worker. Por exemplo, 10.

Como executar o modelo do Gerador de dados de streaming

CONSOLE

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job a partir de um botão de modelo no Console do Cloud Platform
  5. Selecione the Streaming Data Generator template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

GCLOUD

Executar a partir da ferramenta de linha de comando gcloud

Observação: para usar modelos com a ferramenta de linha de comando gcloud, você precisa ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua REGION_NAME pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua SCHEMA_LOCATION pelo caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • Substitua PUBSUB_TOPIC pelo tópico do Pub/Sub de saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
  • Substitua QPS pelo número de mensagens a serem publicadas por segundo.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
topic=PUBSUB_TOPIC,\
qps=QPS
  

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua LOCATION pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua SCHEMA_LOCATION pelo caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • Substitua PUBSUB_TOPIC pelo tópico do Pub/Sub de saída. Por exemplo, projects/<project-id>/topics/<topic-name>.
  • Substitua QPS pelo número de mensagens a serem publicadas por segundo.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "topic": "PUBSUB_TOPIC",
          "qps": "QPS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}