MySQL または PostgreSQL への Datastream(Stream)テンプレート

Datastream to SQL テンプレートは、Datastream データを読み取り、そのデータを MySQL データベースまたは PostgreSQL データベースに複製するストリーミング パイプラインです。このテンプレートは、Pub/Sub 通知を使用して Cloud Storage からデータを読み取り、このデータを SQL レプリカ テーブルに複製します。

このテンプレートはデータ定義言語(DDL)をサポートしていないため、すべてのテーブルがすでにデータベースに存在することを想定しています。レプリケーションでは、Dataflow ステートフル変換を使用して最新でないデータをフィルタし、順不同データの整合性を確保します。たとえば、ある行のより新しいバージョンの読み取りがすでに行われている場合、その行の遅れて到着したバージョンは無視されます。データ操作言語(DML)を実行することは、ソースデータとターゲット データを完全に複製するための最善の方法です。実行される DML ステートメントは、次のルールに従います。

  • 主キーが存在する場合、insert と update のオペレーションでは upsert 構文を使用します(例: INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 主キーが存在する場合、削除 DML として delete が複製されます。
  • 主キーが存在しない場合、insert と update の両方のオペレーションがテーブルに挿入されます。
  • 主キーが存在しない場合、delete は無視されます。

Oracle から Postgres へのユーティリティを使用する場合で、主キーが存在しない場合は、SQL に ROWID を主キーとして追加します。

パイプラインの要件

  • データを複製する準備ができている、またはすでに複製している Datastream ストリーム。
  • Datastream データに対して Cloud Storage の Pub/Sub 通知が有効になっている。
  • PostgreSQL データベースが必要なスキーマでシードされている。
  • Dataflow ワーカーと PostgreSQL 間のネットワーク アクセスが設定されている。

テンプレートのパラメータ

必須パラメータ

  • inputFilePattern: 複製する Cloud Storage 内の Datastream ファイルの場所。通常、このファイルの場所はストリームのルートパスです。
  • databaseHost: 接続先の SQL ホスト。
  • databaseUser: レプリケーションのすべてのテーブルへの書き込みに必要なすべての権限を持つ SQL ユーザー。
  • databasePassword: SQL ユーザーのパスワード。

オプション パラメータ

  • gcsPubSubSubscription: Datastream ファイル通知を含む Pub/Sub サブスクリプション。たとえば、projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID> のようにします。
  • inputFileFormat: Datastream によって生成された出力ファイルの形式。たとえば、avrojson です。デフォルトは avro です。
  • streamName: スキーマ情報をポーリングするストリームの名前またはテンプレート。デフォルト値は {_metadata_stream} です。
  • rfcStartDateTime: Cloud Storage(https://tools.ietf.org/html/rfc3339)からのフェッチに使用される開始日時。デフォルトは 1970-01-01T00:00:00.00Z です。
  • dataStreamRootUrl: Datastream API のルート URL。デフォルトは https://datastream.googleapis.com/ です。
  • databaseType: 書き込み先のデータベースの種類(Postgres など)。デフォルトは postgres です。
  • databasePort: 接続する SQL データベース ポート。デフォルト値は 5432 です。
  • databaseName: 接続する SQL データベースの名前。デフォルト値は postgres です。
  • schemaMap: スキーマ名の変更を指示するために使用される Key-Value のマップ(例: old_name:new_name、CaseError:case_error)。デフォルトは空です。
  • customConnectionString: デフォルトのデータベース文字列の代わりに使用される接続文字列(省略可)。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Datastream to SQL template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: SQL ホスト IP。
  • DATABASE_USER: SQL ユーザー。
  • DATABASE_PASSWORD: SQL パスワード。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: Datastream データへの Cloud Storage パス。例: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: 変更されたファイルを読み取る Pub/Sub サブスクリプション。例: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: SQL ホスト IP。
  • DATABASE_USER: SQL ユーザー。
  • DATABASE_PASSWORD: SQL パスワード。

次のステップ