Modelos utilitários fornecidos pelo Google

O Google fornece um conjunto de modelos de código aberto (em inglês) do Cloud Dataflow. Para informações gerais sobre modelos, consulte a página Visão geral. Para uma lista de todos os modelos fornecidos pelo Google, consulte esta página.

Esta página documenta modelos de utilitário.

Compactação em massa de arquivos do Cloud Storage

O modelo "Compactação em massa de arquivos do Cloud Storage" é um pipeline em lote que compacta arquivos do Cloud Storage para um local específico. Esse modelo pode ser útil para compactar grandes lotes de arquivos como parte de um processo de arquivamento periódico. Os modos de compactação compatíveis são: BZIP2, DEFLATE e GZIP. A saída de arquivos para o local de destino seguirá o esquema de nomeação do arquivo original anexado à extensão do modo de compactação. As extensões anexadas podem ser dos tipos a seguir: .bzip2, .deflate e .gz.

Qualquer erro ocorrido durante o processo de compactação é enviado para o arquivo de falha em formato CSV com o nome do arquivo e a mensagem de erro. Se nenhuma falha ocorrer durante a execução do pipeline, o arquivo de erro ainda será criado, mas não conterá registros de erro.

Requisitos para este pipeline:

  • A compactação precisa estar em um dos seguintes formatos: BZIP2, DEFLATE e GZIP.
  • O diretório de saída precisa ser criado antes de executar o pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/uncompressed/*.txt
outputDirectory O local de saída da gravação. Por exemplo, gs://bucket-name/compressed/
outputFailureFile O arquivo de saída do registro de erros a ser usado para gravação de falhas ocorridas durante o processo de compactação. Por exemplo, gs://bucket-name/compressed/failed.csv. O arquivo será criado mesmo que não haja falhas, mas estará vazio. O conteúdo do arquivo está no formato CSV (nome do arquivo, erro) e consiste em uma linha para cada arquivo com falha na compactação.
compression O algoritmo de compactação usado para compactar os arquivos correspondentes. Precisa ser um dos seguintes formatos: BZIP2, DEFLATE e GZIP.

Como executar o modelo Compactação em massa de arquivos do Cloud Storage

Console

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione the Bulk Compress Cloud Storage Files template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

gcloud

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • COMPRESSION: opção de algoritmo de compactação

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • COMPRESSION: opção de algoritmo de compactação

Descompactação em massa de arquivos do Cloud Storage

O modelo "Descompactação em massa de arquivos do Cloud Storage" é um pipeline em lote que descompacta arquivos no Cloud Storage para um local específico. Essa funcionalidade é útil quando você quer usar dados compactados para minimizar os custos de largura de banda da rede durante uma migração, mas quer maximizar a velocidade de processamento analítico ao operar em dados não compactados após a migração. O pipeline lida automaticamente com vários modos de compactação durante uma única execução e determina o modo de descompactação a ser usado com base na extensão do arquivo (.bzip2, .deflate, .gz, .zip).

Requisitos para este pipeline:

  • Os arquivos a serem descompactados precisam estar em um dos formatos a seguir: Bzip2, Deflate, Gzip, Zip.
  • O diretório de saída precisa ser criado antes de executar o pipeline.

Parâmetros do modelo

Parâmetro Descrição
inputFilePattern O padrão do arquivo de entrada a ser lido. Por exemplo, gs://bucket-name/compressed/*.gz
outputDirectory O local de saída da gravação. Por exemplo, gs://bucket-name/decompressed
outputFailureFile O arquivo de saída do registro de erros a ser usado para falhas de gravação que ocorrem durante o processo de descompactação. Por exemplo, gs://bucket-name/decompressed/failed.csv. O arquivo será criado mesmo que não haja falhas, mas estará vazio. O conteúdo do arquivo está no formato CSV (nome do arquivo, erro) e consiste em uma linha para cada arquivo que falhou na descompactação.

Como executar o modelo Descompactação em massa de arquivos do Cloud Storage

Console

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione the Bulk Decompress Cloud Storage Files template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

gcloud

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH: sua escolha de caminho para o arquivo que contém as informações de falha

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH: sua escolha de caminho para o arquivo que contém as informações de falha

Exclusão em massa do Datastore

O modelo "Exclusão em massa do Datastore" é um pipeline que lê as entidades do Datastore com uma consulta GQL e exclui todas as entidades correspondentes no projeto de destino selecionado. Como opção, o pipeline pode passar as entidades do Datastore codificadas por JSON para sua UDF do JavaScript. Use-a para filtrar as entidades retornando valores nulos.

Requisitos para este pipeline:

  • O Datastore precisa ser configurado no projeto antes de executar o modelo.
  • Em caso de leitura e exclusão de instâncias do Datastore separadas, a Conta de serviço do controlador do Dataflow precisa ter permissão para ler de uma instância e excluir da outra.

Parâmetros do modelo

Parâmetro Descrição
datastoreReadGqlQuery Consulta GQL que especifica quais entidades devem corresponder para exclusão. Usar uma consulta apenas de chaves pode melhorar o desempenho. Por exemplo: "SELECT __key__ FROM MyKind".
datastoreReadProjectId O ID do projeto do GCP da instância do Datastore a partir da qual você quer ler entidades (usando sua consulta GQL) que são usadas para correspondência.
datastoreDeleteProjectId O ID do projeto do GCP da instância do Datastore a partir da qual as entidades correspondentes serão excluídas. Isso pode ser o mesmo que datastoreReadProjectId se você quiser ler e excluir na mesma instância do Datastore.
datastoreReadNamespace Opcional: namespace das entidades solicitadas. Defina como "" para o namespace padrão.
javascriptTextTransformGcsPath (Opcional) Um caminho do Cloud Storage que contém todo o código do JavaScript. Por exemplo, gs://mybucket/mytransforms/*.js Se você não quiser usar uma UDF, deixe esse campo em branco.
javascriptTextTransformFunctionName [Opcional] Nome da função a ser chamada. Se essa função retornar um valor indefinido ou nulo para uma determinada entidade do Datastore, a entidade não será excluída. Caso você tenha o código JavaScript "function myTransform(inJson) { ...dostuff...}", o nome da função será "myTransform". Se você não quiser usar uma UDF, deixe esse campo em branco.

Como executar o modelo "Exclusão em massa do Datastore"

Console

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione the Datastore Bulk Delete template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

gcloud

Executar na ferramenta de linha de comandogcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, é necessário ter a versão 138.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • GQL_QUERY: a consulta que você usará para corresponder entidades para exclusão
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: o ID do projeto da instância do Datastore. Este exemplo lê e exclui da mesma instância do Datastore.

API

Executar pela API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Substitua:

  • PROJECT_ID: ID do projeto
  • JOB_NAME: um nome de job de sua escolha
  • GQL_QUERY: a consulta que você usará para corresponder entidades para exclusão
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: o ID do projeto da instância do Datastore. Este exemplo lê e exclui da mesma instância do Datastore.

Gerador de dados de streaming para Pub/Sub, BigQuery e Cloud Storage

O modelo Gerador de dados de streaming é usado para gerar um número ilimitado ou fixo de registros sintéticos ou mensagens com base no esquema fornecido pelo usuário na taxa especificada. Os destinos compatíveis incluem tópicos do Pub/Sub, tabelas do BigQuery e buckets do Cloud Storage.

Veja a seguir um conjunto de alguns casos de uso possíveis:

  • Simule a publicação de eventos em tempo real em grande escala em um tópico do Pub/Sub para medir e determinar o número e o tamanho dos consumidores necessários para processar eventos publicados.
  • Gere dados sintéticos para uma tabela do BigQuery ou um bucket do Cloud Storage para avaliar comparativos de mercado de desempenho ou servir como prova de conceito.

Coletores e formatos de codificação compatíveis

A tabela a seguir descreve quais coletores e formatos de codificação são compatíveis com este modelo:
JSON Avro Parquet
Pub/Sub Sim Sim Não
BigQuery Sim Não Não
Cloud Storage Sim Sim Sim

A biblioteca Gerador de dados JSON usada pelo pipeline permite que várias funções faker sejam usadas em cada campo de esquema. Para saber mais sobre as funções faker e o formato de esquema, consulte a documentação do json-data-generator.

Requisitos para este pipeline:

  • Crie um arquivo de esquema de mensagens e armazene-o em um local do Cloud Storage.
  • O destino da saída precisa existir antes da execução. O destino precisa ser um tópico do Pub/Sub, uma tabela do BigQuery ou um bucket do Cloud Storage, dependendo do tipo de coletor.
  • Se a codificação de saída for Avro ou Parquet, crie um arquivo de esquema Avro e armazene-o em um local do Cloud Storage.

Parâmetros do modelo

Parâmetro Descrição
schemaLocation Local do arquivo de esquema. Por exemplo, gs://mybucket/filename.json.
qps Número de mensagens a serem publicadas por segundo. Por exemplo, 100.
sinkType [Opcional] Tipo de coletor de saída. Os valores possíveis são PUBSUB, BIGQUERY, GCS. O padrão é PUBSUB.
outputType [Opcional] Tipo de codificação de saída. Os valores possíveis são JSON, AVRO, PARQUET. O padrão é JSON.
avroSchemaLocation [Opcional] Local do arquivo de esquema AVRO. Obrigatório quando outputType for AVRO ou PARQUET. Por exemplo, gs://mybucket/filename.avsc.
topic [Opcional] Nome do tópico do Pub/Sub em que o pipeline precisa publicar dados. Obrigatório quando sinkType for Pub/Sub. Por exemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec [Opcional] Nome da tabela de saída do BigQuery. Obrigatório quando sinkType for BigQuery. Por exemplo, your-project:your-dataset.your-table-name.
writeDisposition [Opcional] Disposição de gravação do BigQuery. Os valores possíveis são WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. O padrão é WRITE_APPEND.
outputDeadletterTable [Opcional] Nome da tabela de saída do BigQuery para guardar os registros com falha. Se não for fornecido, o pipeline criará a tabela durante a execução com o nome {nome_da_tabela_de_saida}_registros_de_erros. Por exemplo, your-project:your-dataset.your-table-name.
outputDirectory [Opcional] Caminho do local de saída do Cloud Storage. Obrigatório quando sinkType for Cloud Storage. Por exemplo, gs://mybucket/pathprefix/.
outputFilenamePrefix [Opcional] Prefixo do nome do arquivo de saída gravado no Cloud Storage. O padrão é output-.
windowDuration [Opcional] Intervalo de janela em que a saída é gravada no Cloud Storage. O padrão é 1 min (ou seja, 1 minuto).
numShards [Opcional] Número máximo de fragmentos de saída. Obrigatório quando sinkType é Cloud Storage e precisa ser definido como um número maior ou igual a 1.
messagesLimit [Opcional] Número máximo de mensagens de saída. O padrão é 0, que indica ilimitado.
autoscalingAlgorithm [Opcional] Algoritmo usado para escalonamento automático dos workers. Os valores possíveis são THROUGHPUT_BASED para ativar o escalonamento automático ou NONE para desativá-lo.
maxNumWorkers [Opcional] Número máximo de máquinas de worker. Por exemplo, 10.

Como executar o modelo do Gerador de dados de streaming

Console

Executar a partir do Console do Google Cloud
  1. Acesse a página do Dataflow no Console do Cloud.
  2. Acessar a página do Dataflow
  3. Clique em Criar job usando um modelo.
  4. Criar job usando um botão de modelo no Console do Cloud Platform
  5. Selecione the Streaming Data Generator template no menu suspenso Modelo do Dataflow.
  6. Digite o nome de um job no campo Nome do job.
  7. Digite os valores de parâmetro nos campos fornecidos.
  8. Clique em Executar job.

gcloud

Executar a partir da ferramenta de linha de comando gcloud

Observação: para executar modelos usando a ferramenta de linha de comando gcloud, você precisa ter a versão 284.0.0 ou mais recente do SDK do Cloud.

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • PROJECT_ID: ID do projeto
  • REGION_NAME: o nome da região do Dataflow (por exemplo, us-central1)
  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/<project-id>/topics/<topic-name>.

API

Executar a partir da API REST

Ao executar este modelo, é necessário indicar o caminho dele no Cloud Storage:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Para executar esse modelo com uma solicitação de API REST, envie uma solicitação HTTP POST com o ID do projeto. Essa solicitação requer uma autorização.

POST  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}
  

Substitua:

  • PROJECT_ID: ID do projeto
  • LOCATION: o nome da região do Dataflow (por exemplo, us-central1)
  • JOB_NAME: um nome de job de sua escolha
  • SCHEMA_LOCATION: o caminho para o arquivo de esquema no Cloud Storage. Por exemplo, gs://mybucket/filename.json.
  • QPS: o número de mensagens a serem publicadas por segundo
  • PUBSUB_TOPIC: o tópico de saída do Pub/Sub. Por exemplo, projects/<project-id>/topics/<topic-name>.