Dataflow SQL を使用する

このページでは、Dataflow SQL の使用方法と Dataflow SQL ジョブの作成方法について説明します。

Dataflow SQL ジョブを作成するには、Dataflow SQL クエリを作成して実行する必要があります。

Dataflow SQL エディタを使用する

Dataflow SQL エディタは、Dataflow SQL ジョブを作成するためのクエリを作成して実行する Google Cloud Console のページです。

Dataflow SQL エディタにアクセスするには、次の手順を行います。

次の手順で、Dataflow モニタリング インターフェースから Dataflow SQL エディタにアクセスすることもできます。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. Dataflow のメニューで、[SQL ワークスペース] をクリックします。

Dataflow SQL クエリを作成する

Dataflow SQL クエリは Dataflow SQL クエリ構文を使用します。Dataflow SQL クエリ構文は BigQuery の標準 SQLに似ています。

Dataflow SQL ストリーミング拡張機能を使用すると、Pub/Sub などの Dataflow ソースを継続的に更新し、データを集約できます。

たとえば、次のクエリは、タクシー乗車データの Pub/Sub ストリームで乗客数を 1 分ごとカウントします。

SELECT
  DATETIME(tr.window_start) AS starttime,
  SUM(tr.passenger_count) AS pickup_count
FROM TUMBLE ((SELECT * FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`),
DESCRIPTOR(event_timestamp), 'INTERVAL 1 MINUTE') AS tr
WHERE
  tr.ride_status = "pickup"
GROUP BY DATETIME(tr.window_start)

Dataflow SQL クエリを実行する

Dataflow SQL クエリを実行すると、Dataflow はクエリを Apache Beam パイプラインに変換し、パイプラインを実行します。

Dataflow SQL クエリを実行するには、Google Cloud コンソールまたは Google Cloud CLI を使用します。

コンソール

Dataflow SQL クエリを実行するには、Dataflow SQL エディタを使用します。

  1. Dataflow SQL エディタページに移動します。

    Dataflow SQL エディタに移動

  2. クエリエディタに Dataflow SQL クエリを入力します。

  3. [ジョブを作成] をクリックしてジョブ オプションのパネルを開きます。

  4. (省略可)[ジョブ名] に、固有のジョブ名を入力します。

  5. [リージョン エンドポイント] で、メニューから値を選択します。

  6. (省略可)[オプション パラメータを表示] をクリックして、提供された Dataflow パイプライン オプションの値を入力します。

  7. [送信先] で [出力タイプ] を選択し、表示されたフィールドに値を入力します。

  8. (省略可)[SQL クエリ パラメータ] セクションで、パラメータを追加し、表示されたフィールドに値を入力します。

  9. [作成] をクリックします。

gcloud

Dataflow SQL クエリを実行するには、gcloud dataflow sql query コマンドを実行します。作成する SQL クエリの例を以下に示します。

gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --bigquery-table=BIGQUERY_TABLE \
  --bigquery-dataset=BIGQUERY_DATASET \
  --bigquery-project=BIGQUERY_PROJECT \
'SQL_QUERY'

以下を置き換えます。

  • JOB_NAME: 実際の Dataflow SQL ジョブの名前
  • REGION: Dataflow ジョブをデプロイする Dataflow のロケーション
  • BIGQUERY_TABLE: 出力を書き込む BigQuery テーブルの名前
  • BIGQUERY_DATASET: 出力テーブルを含む BigQuery データセット ID
  • BIGQUERY_PROJECT: 出力 BigQuery テーブルを含む Google Cloud プロジェクト ID
  • SQL_QUERY: Dataflow SQL クエリ

パイプライン オプションを設定する

Dataflow SQL ジョブに Dataflow パイプライン オプションを設定できます。Dataflow パイプライン オプションは、Dataflow SQL クエリの実行方法と場所を構成する実行パラメータです。

Dataflow SQL ジョブに Dataflow パイプライン オプションを設定するには、Dataflow SQL クエリの実行時に次のパラメータを指定します。

コンソール

パラメータ 説明 デフォルト値
リージョン エンドポイント String クエリを実行するリージョン。Dataflow SQL クエリは、Dataflow のロケーションのあるリージョンで実行できます。 設定されていない場合、デフォルトの us-central1. になります。
最大ワーカー数 int 実行中にパイプラインで使用できる Compute Engine インスタンスの最大数。 指定しない場合、Dataflow は適切なワーカー数を自動的に設定します。
ワーカー リージョン String ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョン。Compute Engine ワーカー リージョンは、Dataflow ジョブ リージョンと異なるリージョンに存在できます。 設定されていない場合、指定した Dataflow リージョンがデフォルトで使用されます。
ワーカーゾーン String ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーン。Compute Engine ゾーンは、Dataflow ジョブ リージョンと異なるリージョンに存在できます。

設定されていない場合、ワーカー リージョンのゾーンがデフォルトになります。

ワーカー リージョンが設定されていない場合、指定した Dataflow リージョンのゾーンがデフォルトになります。

サービス アカウントのメールアドレス String パイプラインの実行に使用するワーカー サービス アカウントのメールアドレス。メールアドレスは my-service-account-name@<project-id>.iam.gserviceaccount.com の形式にする必要があります。 設定しない場合、Dataflow ワーカーは、現在のプロジェクトの Compute Engine サービス アカウントをワーカー サービス アカウントとして使用します。
マシンタイプ String

Dataflow がワーカーの起動時に使用する Compute Engine のマシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。

n1 マシンタイプを使用すると最適な結果が得られます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Dataflow のサービスレベル契約ではサポートされません。

ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。

設定しない場合、Dataflow は自動的にマシンタイプを選択します。
追加テスト String 有効にするテスト。テストには、enable_streaming_engine などの値や、shuffle_mode=service などの Key-Value ペアを指定できます。テストはカンマ区切りのリストで行う必要があります。 指定しない場合、テストは有効になりません。
ワーカー IP アドレスの構成 String

Dataflow ワーカーでパブリック IP アドレスを使用するかどうかを指定します。

この値を Private に設定すると、Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。指定する Network または Subnetwork限定公開の Google アクセスが有効になっている必要があります。

値を Private に設定し、Subnetwork オプションを指定した場合、Network オプションは無視されます。

設定しない場合、Public がデフォルトになります。
ネットワーク String ワーカーが割り当てられている Compute Engine ネットワーク 設定しない場合、ネットワーク default がデフォルトになります。
サブネットワーク String ワーカーが割り当てられている Compute Engine サブネットワーク。サブネットワークは regions/region/subnetworks/subnetwork の形式にする必要があります。 設定しない場合、Dataflow は自動的にサブネットワークを決定します。

gcloud

フラグ 説明 デフォルト値
‑‑region String クエリを実行するリージョン。Dataflow SQL クエリは、Dataflow のロケーションのあるリージョンで実行できます。 設定されていない場合は、エラーがスローされます。
‑‑max‑workers int 実行中にパイプラインで使用できる Compute Engine インスタンスの最大数。 指定しない場合、Dataflow は適切なワーカー数を自動的に設定します。
‑‑num‑workers int パイプラインの実行時に使用する Compute Engine インスタンスの初期の数。このパラメータにより、ジョブの開始時に Dataflow が起動するワーカー数が決まります。 指定しない場合、Dataflow は適切なワーカー数を自動的に設定します。
‑‑worker‑region String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョン。Compute Engine ワーカー リージョンは、Dataflow ジョブ リージョンと異なるリージョンに存在できます。

‑‑worker‑region または ‑‑worker‑zone のいずれかを指定できます。

設定されていない場合、指定した Dataflow リージョンがデフォルトで使用されます。
‑‑worker‑zone String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーン。Compute Engine ゾーンは、Dataflow ジョブ リージョンと異なるリージョンに存在できます。

‑‑worker‑region または ‑‑worker‑zone のいずれかを指定できます。

設定されていない場合、指定した Dataflow リージョンのゾーンがデフォルトになります。
‑‑worker‑machine‑type String

Dataflow がワーカーの起動時に使用する Compute Engine のマシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。

n1 マシンタイプを使用すると最適な結果が得られます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Dataflow のサービスレベル契約ではサポートされません。

ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。

設定しない場合、Dataflow は自動的にマシンタイプを選択します。
‑‑service‑account‑email String パイプラインの実行に使用するワーカー サービス アカウントのメールアドレス。メールアドレスは my-service-account-name@<project-id>.iam.gserviceaccount.com の形式にする必要があります。 設定しない場合、Dataflow ワーカーは、現在のプロジェクトの Compute Engine サービス アカウントをワーカー サービス アカウントとして使用します。
‑‑disable‑public‑ips boolean

Dataflow ワーカーでパブリック IP アドレスを使用するかどうかを指定します。

設定した場合、Dataflow ワーカーはすべての通信にプライベート IP アドレスを使用します。

設定されていない場合、Dataflow ワーカーでパブリック IP アドレスが使用されます。
‑‑network String ワーカーが割り当てられている Compute Engine ネットワーク 設定しない場合、ネットワーク default がデフォルトになります。
‑‑subnetwork String ワーカーが割り当てられている Compute Engine サブネットワーク。サブネットワークは regions/region/subnetworks/subnetwork の形式にする必要があります。 設定しない場合、Dataflow は自動的にサブネットワークを決定します。
‑‑dataflow‑kms‑key String 保存データの暗号化に使用する顧客管理の暗号鍵(CMEK)。暗号鍵は Cloud KMS で制御できます。鍵はジョブと同じ場所に置く必要があります。 指定しない場合、CMEK の代わりにデフォルトの Google Cloud 暗号化が使用されます。

詳細については、gcloud dataflow sql query コマンド リファレンスをご覧ください。

Dataflow SQL ジョブを停止する

Dataflow SQL ジョブを停止するには、キャンセルする必要があります。drain オプションを使用して Dataflow SQL ジョブを停止することはできません。

料金

Dataflow SQL では標準の Dataflow 料金を使用します。別の料金体系はありません。SQL ステートメントに基づいて作成した Dataflow ジョブが使用するリソースは課金対象となります。vCPU、メモリ、Persistent Disk、Streaming Engine、Dataflow Shuffle などのリソースに対して Dataflow の標準料金が適用されます。

Dataflow SQL ジョブで Cloud Pub/Sub や BigQuery などの追加リソースが使用される可能性があり、その場合はそれぞれ固有の料金で課金されます。

Dataflow の料金の詳細については、Dataflow の料金をご覧ください。

次のステップ