Modelo do Pub/Sub para o MongoDB

O modelo Pub/Sub para MongoDB é um pipeline de streaming que lê mensagens codificadas em JSON de uma subscrição do Pub/Sub e escreve-as no MongoDB como documentos. Se necessário, este pipeline suporta transformações adicionais que podem ser incluídas através de uma função definida pelo utilizador (UDF) em JavaScript.

Se ocorrerem erros durante o processamento de registos, o modelo escreve-os numa tabela do BigQuery, juntamente com a mensagem de entrada. Por exemplo, podem ocorrer erros devido a uma incompatibilidade de esquemas, a um JSON com formato incorreto ou durante a execução de transformações. Especifique o nome da tabela no parâmetro deadletterTable. Se a tabela não existir, o pipeline cria-a automaticamente.

Requisitos do pipeline

  • A subscrição do Pub/Sub tem de existir e as mensagens têm de estar codificadas num formato JSON válido.
  • O cluster do MongoDB tem de existir e deve estar acessível a partir das máquinas de trabalho do Dataflow.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputSubscription: nome da subscrição do Pub/Sub. Por exemplo, projects/your-project-id/subscriptions/your-subscription-name.
  • mongoDBUri: lista separada por vírgulas de servidores MongoDB. Por exemplo, host1:port,host2:port,host3:port.
  • base de dados: base de dados no MongoDB para armazenar a coleção. Por exemplo, my-db.
  • collection: nome da coleção na base de dados MongoDB. Por exemplo, my-collection.
  • deadletterTable: a tabela do BigQuery que armazena mensagens causadas por falhas, como esquemas incompatíveis, JSON com formato incorreto, etc. Por exemplo, your-project-id:your-dataset.your-table-name.

Parâmetros opcionais

  • batchSize: tamanho do lote usado para a inserção em lote de documentos no MongoDB. A predefinição é: 1000.
  • batchSizeBytes: tamanho do lote em bytes. A predefinição é: 5242880.
  • maxConnectionIdleTime: tempo de inatividade máximo permitido em segundos antes de ocorrer o limite de tempo da ligação. A predefinição é: 60000.
  • sslEnabled: valor booleano que indica se a ligação ao MongoDB tem o SSL ativado. A predefinição é: true.
  • ignoreSSLCertificate: valor booleano que indica se o certificado SSL deve ser ignorado. A predefinição é: true.
  • withOrdered: valor Booleano que permite inserções em massa ordenadas no MongoDB. A predefinição é: true.
  • withSSLInvalidHostNameAllowed: valor booleano que indica se é permitido um nome do anfitrião inválido para a ligação SSL. A predefinição é: true.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro UDF no Cloud Storage e recarrega a UDF se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for 0, o recarregamento das FDU está desativado. O valor predefinido é 0.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: uma única linha de um ficheiro CSV de entrada.
  • Saída: um documento JSON convertido em string para inserir no MongoDB.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub to MongoDB template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_SUBSCRIPTION: a subscrição do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços do servidor MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome da base de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_SUBSCRIPTION: a subscrição do Pub/Sub (por exemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: os endereços do servidor MongoDB (por exemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: o nome da base de dados do MongoDB (por exemplo, users)
  • COLLECTION: o nome da coleção do MongoDB (por exemplo, profiles)
  • UNPROCESSED_TABLE: o nome da tabela do BigQuery (por exemplo, your-project:your-dataset.your-table-name)

O que se segue?