Pub/Sub to Java Database Connectivity(JDBC)テンプレート

Pub/Sub to Java Database Connectivity(JDBC)テンプレートは、既存の Pub/Sub サブスクリプションから JSON 文字列としてデータを取り込み、結果のレコードを JDBC に書き込むストリーミング パイプラインです。

パイプラインの要件

  • パイプラインを実行する前に Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に JDBC ソースが存在している必要があります。
  • パイプラインを実行する前に Pub/Sub 出力デッドレター トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
driverClassName JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver
connectionUrl JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledbこの値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。
driverJars JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username 省略可: JDBC 接続に使用するユーザー名。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
password 省略可: JDBC 接続に使用するパスワード。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
connectionProperties 省略可: JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8
statement データベースに対して実行するステートメント。このステートメントには、テーブルの列名を任意の順序で指定する必要があります。指定した列名の値のみが JSON から読み取られ、ステートメントに追加されます。例: INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
KMSEncryptionKey 省略可: ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。
extraFilesToStage ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to JDBC template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

次のステップ