Modelo do Pub/Sub para Java Database Connectivity (JDBC)

O modelo do Pub/Sub para Java Database Connectivity (JDBC) é um pipeline de streaming que ingere dados de uma assinatura preexistente do Cloud Pub/Sub como strings JSON e grava os registros resultantes no JDBC.

Requisitos de pipeline

  • A assinatura do Pub/Sub precisa ter sido criada antes da execução do pipeline.
  • A origem do JDBC precisa ter sido criada antes da execução do pipeline.
  • O tópico de mensagens mortas do Pub/Sub precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
driverClassName O nome da classe do driver do JDBC. Por exemplo, com.mysql.jdbc.Driver.
connectionUrl A string do URL de conexão do JDBC. Por exemplo, jdbc:mysql://some-host:3306/sampledb É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Remova os caracteres de espaço em branco da string codificada em Base64.
driverJars Caminhos do Cloud Storage separados por vírgulas para drivers JDBC. Por exemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username Opcional: o nome do usuário a ser usado para a conexão JDBC. É possível transmitir esse valor criptografado por uma chave do Cloud KMS como uma string codificada em Base64.
password Opcional: a senha a ser usada para a conexão JDBC. É possível transmitir esse valor criptografado por uma chave do Cloud KMS como uma string codificada em Base64.
connectionProperties Opcional: o string de propriedades a ser usada para a conexão JDBC. O formato da string precisa ser [propertyName=property;]*. Por exemplo, unicode=true;characterEncoding=UTF-8.
statement Instrução a ser executada no banco de dados. A instrução precisa especificar os nomes das colunas da tabela em qualquer ordem. Somente os valores dos nomes das colunas especificadas são lidos no JSON e adicionados à instrução. Por exemplo, INSERT INTO tableName (column1, column2) VALUES (?,?).
inputSubscription O tópico de entrada do Cloud Pub/Sub que será lido, no formato de projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic O tópico do Pub/Sub para encaminhar mensagens não entregues. Por exemplo, projects/<project-id>/topics/<topic-name>
KMSEncryptionKey Opcional: a chave de criptografia do Cloud KMS para descriptografar o nome de usuário, senha e string de conexão. Se a chave do Cloud KMS for transmitida, o nome de usuário, senha e string de conexão precisarão ser transmitidos criptografados.
extraFilesToStage Caminhos do Cloud Storage separados ou vírgulas do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos serão salvos no diretório /extra_files de cada worker. Por exemplo, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to JDBC template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SQL_STATEMENT: a instrução SQL a ser executada no banco de dados
  • INPUT_SUBSCRIPTION: a assinatura de entrada do Pub/Sub da qual ler
  • OUTPUT_DEADLETTER_TOPIC: o tópico do Pub/Sub para encaminhar mensagens não entregues
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SQL_STATEMENT: a instrução SQL a ser executada no banco de dados
  • INPUT_SUBSCRIPTION: a assinatura de entrada do Pub/Sub da qual ler
  • OUTPUT_DEADLETTER_TOPIC: o tópico do Pub/Sub para encaminhar mensagens não entregues
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

A seguir