Data Pipelines の操作

概要

Dataflow Data Pipelines を使用すると、繰り返しジョブのスケジュールを作成して、複数のジョブ実行にリソースが使用されている場所を把握できます。データ更新頻度の目標を定義して管理し、個々のパイプライン ステージまで掘り下げて調査できます。また、パイプラインの問題を修正して最適化できます。

Data Pipelines の機能

  • スケジュールに基づいてバッチジョブを実行する繰り返しバッチ パイプラインを作成する。
  • 最新バージョンの入力データに対してバッチジョブを繰り返し実行する増分バッチ パイプラインを作成する。
  • パイプラインの概要スコアカードを使用して、パイプラインの集計容量使用量とリソース消費量を表示する。
  • ストリーミング パイプラインのデータの更新頻度を表示する。この指標は、時間の経過とともに進化し、更新頻度が特定の目標を下回った場合に通知するアラートに関連付けられます。
  • パイプライン指標グラフを使用して、バッチ パイプライン ジョブを比較して、異常を見つける。

Data Pipelines の使用制限

  • 対応できるリージョン: Dataflow Data Pipelines は App Engine アプリケーションの Cloud Scheduler を使用するため、データ パイプラインは利用可能な App Engine リージョンで使用できます。

  • 割り当て上限:

    • プロジェクトあたりのパイプラインの最大数: 500
    • 組織あたりの最大パイプライン数: 2,500

API リファレンス ドキュメント:

API ドキュメントについては、Data Pipelines リファレンスをご覧ください。

データ パイプラインの種類

Dataflow Data Pipelines には、ストリーミングとバッチの 2 種類があります。どちらのタイプのパイプラインも、Dataflow テンプレートで定義されているジョブを実行します。

ストリーミング データ パイプライン
ストリーミング データ パイプラインは、Dataflow ストリーミング ジョブが作成された直後にそれを実行します。
バッチ データ パイプライン
バッチデータ パイプラインは、ユーザー定義のスケジュールで Dataflow バッチジョブを実行します。バッチ パイプラインの入力ファイル名をパラメータ化することで、増分バッチ パイプライン処理が可能になります。

増分バッチ パイプライン

日時プレースホルダを使用すると、バッチ パイプラインの増分入力ファイルの形式を指定できます。

  • 年、月、日付、時、分、秒のプレースホルダを使用できます。strftime() 形式に従う必要があります。プレースホルダの前には、パーセント記号(%)が付きます。
  • パイプラインの作成中は、パラメータの形式は検証されません。
    • 例: パラメータ化された入力ファイルのパスとして「gs://bucket/Y」を指定すると、「%」が前に付いていない「Y」は strftime() 形式に対応していないため、「gs://bucket/Y」として評価されます。

スケジュール設定されたバッチ パイプラインの実行時刻ごとに、入力ファイルパスのプレースホルダ部分が現在(またはシフト時間)の日時に評価されます(日付はスケジュールされたジョブのタイムゾーンでの現在の日付を使用して評価されます)。評価されたファイルのパスが入力ファイルのパスと一致する場合、ファイルは、スケジュールされた時間にバッチ パイプラインで処理するために取得されます。

  • 例: バッチ パイプラインは、PST の各時間 00 分(1 時間毎)に繰り返すようにスケジュールされます。入力ファイルのパスを gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv としてパラメータ化した場合、2021 年 4 月 15 日午後 6 時(PST)の入力ファイルのパスは、gs://bucket-name/2021-04-15/prefix-18_00.csv となります。

タイムシフト パラメータの使用

波括弧で囲まれた「+」や「-」が付いた分や時間のシフト パラメータ(「{[+|-][0-9]+[m|h]}」形式)を使用して、パイプライン スケジュールの現在の日時から前後にシフトした日時とするように入力ファイルパスの照合を補足できます。バッチ パイプラインはスケジュールされた時刻に繰り返し実行されますが、入力ファイルのパスは、指定した時間オフセットで求められます。

  • 例: バッチ パイプラインは、PST の各時間 00 分(1 時間毎)に繰り返すようにスケジュールされます。入力ファイルのパスを gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h} としてパラメータ化した場合、2021 年 4 月 15 日午後 6 時(PST)の入力ファイルのパスは、gs://bucket-name/2021-04-15/prefix-16_00.csv となります。

データ パイプラインのロール

データ パイプライン操作を正常に実行するには、次のように必要な IAM ロールがユーザーに付与されていなければなりません。

  1. 操作を行うため、ユーザーには次のロールが必要です。

    • Datapipelines.admin: すべてのデータ パイプライン操作を実行できます。
    • Datapipelines.viewer: データ パイプラインとジョブを表示できます。
    • Datapipelines.invoker: データ パイプライン ジョブを実行できます(このロールは API で有効にできます)。
  2. ユーザーは、Cloud Scheduler と Dataflow で、そのサービス アカウントの roles/iam.serviceAccountUser ロールを付与されたサービス アカウントとして処理を実行できる必要があります。ユーザーが Cloud Scheduler と Dataflow のサービス アカウントを選択しない場合は、デフォルトの Compute Engine サービス アカウントが使用されます。

データ パイプラインの作成

データ パイプラインは、次の 2 つの方法で作成できます。

  1. ジョブのインポート
  2. データ パイプラインの作成

Data Pipelines の設定ページ: Cloud Console で、Dataflow パイプライン機能に初めてアクセスすると設定ページが開きます。

  1. 一覧表示された API を有効にします。
  2. Cloud Scheduler がパイプラインのスケジュール設定に使用する App Engine アプリケーションのリージョンを選択します。

ジョブのインポート

クラシックまたは Flex テンプレートに基づく Dataflow のバッチジョブやストリーミング ジョブをインポートして、データ パイプラインにできます。

  1. Cloud Console で、[Dataflow ジョブ] ページに移動し、完了したジョブを選択して、[ジョブの詳細] ページで [+パイプラインとしてインポート] を選択します。

  2. [テンプレートからのパイプラインの作成] ページで、[データ パイプライン] パイプライン オプションが選択されています。その他のパラメータには、インポートしたジョブのオプションが入力されます。

    1. バッチジョブの場合は、[テンプレート パラメータ] の [パイプラインのスケジュール設定] セクションで繰り返しのスケジュールを指定します。バッチ実行のスケジュール設定で使用される Cloud Scheduler のメール アカウント アドレスは指定しなくても構いません。指定しない場合は、デフォルトの Compute Engine サービス アカウントが使用されます。注: ユーザーには、Cloud Scheduler が使用するサービス アカウント(ユーザー指定かデフォルトの Compute Engine のサービス アカウント)の roles/iam.serviceAccountUser ロールが付与されている必要があります(データ パイプラインのロールをご覧ください)。

データ パイプラインの作成

  1. Cloud Console で [Dataflow パイプライン] ページに移動し、[+データ パイプラインの作成] を選択します。

  2. [ジョブの管理] の [テンプレートからのパイプラインの作成] ページで、[データ パイプライン] を選択し、パイプライン名を指定して、他のテンプレートの選択フィールドとパラメータ フィールドに入力します。

    1. バッチジョブの場合は、[テンプレート パラメータ] の [パイプラインのスケジュール設定] セクションで繰り返しのスケジュールを指定します。バッチ実行のスケジュール設定で使用される Cloud Scheduler のメール アカウント アドレスは指定しなくても構いません。指定しない場合は、デフォルトの Compute Engine サービス アカウントが使用されます。注: ユーザーには、Cloud Scheduler が使用するサービス アカウント(ユーザー指定かデフォルトの Compute Engine のサービス アカウント)の roles/iam.serviceAccountUser ロールが付与されている必要があります(データ パイプラインのロールをご覧ください)。

バッチデータ パイプラインの作成

このサンプル バッチデータ パイプラインを作成するには、プロジェクト内の次のリソースにアクセスできる必要があります。

このパイプラインの例では、Cloud Storage から CSV 形式のファイルを読み取り、変換を実行して、値を your-project-id:your-dataset-name.three.column_table に挿入する Cloud Storage Text to BigQuery のバッチ パイプライン テンプレートを使用します。

  1. ローカル ドライブに次のファイルを作成します。
    1. 宛先 BigQuery テーブルの次のスキーマを含む bq_three_column_table.json ファイル。
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
  1. BigQuery に挿入する前に入力データの単純な変換を行う split_csv_3cols.js JavaScript ファイル。
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
  1. BigQuery テーブルに挿入される複数のレコードを含む file01.csv CSV ファイル。
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. 次のように、gsutil を使用してプロジェクトの Cloud Storage バケット内のフォルダにファイルをコピーします。
    1. bq_three_column_table.jsonsplit_csv_3cols.jsgs://your-bucket/text_to_bigquery/ にコピーします。
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
        gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. file01.csvgs://your-bucket/inputs/ にコピーします。
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. Cloud Storage ブラウザyour-bucket に「tmp」フォルダを作成します。フォルダ名を選択して、バケットの詳細ページを開きます。次に、[フォルダを作成] をクリックして、バケットに「tmp」フォルダを作成します。
  4. [Dataflow パイプライン] ページに移動し、[データ パイプラインを作成] を選択します。[テンプレートからのパイプラインの作成] ページで、次の項目を入力または選択します。

    1. ジョブ管理:
      1. [データ パイプライン] を選択します。
      2. パイプライン名: 「text_to_bq_batch_data_pipeline」と入力します。
      3. [続行] をクリックします。
    2. テンプレートの選択:
      1. リージョン エンドポイント: Compute Engine リージョンを選択します。
      2. テンプレート リスト: [Process Data in Bulk (batch)] で、[Text File on Cloud Storage to BigQuery] を選択します。説明: バッチ パイプライン。Cloud Storage に保存されているテキスト ファイルを読み取り、JavaScript ユーザー定義関数(UDF)を使用して変換し、結果を BigQuery に出力します。注: [Process Data Continuously (stream)] で同じ名前のストリーミング パイプラインを選択しないでください。
      3. [続行] をクリックします。
    3. テンプレートのパラメータ:
      1. パイプラインをスケジュールする: 使用しているタイムゾーンで [Hourly at Minute 25] などのスケジュールを選択します。後で説明するように、スケジュールはパイプラインを送信した後に編集できます。
    4. 必須パラメータ:
      1. Cloud Storage 内の JavaScript UDF パス
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. JSON パス:
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. JavaScript UDF 名: 「transform」
      4. BigQuery 出力テーブル(完全修飾テーブル名):
        your_project_id:your_dataset.three_column_table
        
      5. Cloud Storage 入力パス:
        gs://your_bucket/inputs/file*.csv
        
      6. 一時 BigQuery ディレクトリ:
        gs://your_bucket/tmp
        
      7. 一時的なロケーション:
        gs://your_bucket/tmp
        
    5. [送信] をクリックします。
  5. パイプラインとテンプレートの情報を確認し、[パイプラインの詳細] ページで現在の値と履歴を確認します。

Dataflow Pipelines コンソールの [実行] ボタンを使用して、バッチ パイプラインをオンデマンドで実行することもできます。

サンプル ストリーミング データ パイプラインの作成

サンプル ストリーミング データ パイプラインは、サンプルのバッチ パイプラインの手順に沿って作成できます。ただし、次の違いがあります。

  • パイプライン スケジュール。ストリーミング データ パイプラインのスケジュールは指定しません。Dataflow ストリーミング ジョブはすぐに開始されます。

  • テンプレートの選択: [Process Data Continuously (stream)] で [Text Files on Cloud Storage to BigQuery (Stream)] を選択します。説明: Cloud Storage に格納されたテキスト ファイルを読み取り、ユーザー定義の JavaScript 関数を介して変換を実行し、結果を BigQuery にストリーミングできるストリーミング パイプライン。このパイプラインには、BigQuery TableSchema の JavaScript 関数と JSON 表現が必要です。

  • ワーカー マシンタイプ: パイプラインは、gs://<your_bucket>/inputs/file*.csv パターンに一致する複数の初期ファイルと、このパターンに一致し inputs/ フォルダにアップロードする追加のファイルを処理します。CSV ファイルのサイズが数 GB を超える場合は、メモリ不足エラーが発生しないように、デフォルトの n1-standard-4 マシンタイプよりも大容量のメモリを備えたマシンタイプ(n1-highmem-8 など)を選択します。

パイプラインの目標違反の調査

繰り返し実行するバッチ パイプライン

Cloud Console の [パイプラインの詳細] ページで、パイプラインの状態の初期分析に、パイプライン ステータス パネルの [個々のジョブ ステータス] グラフと [ステップあたりのスレッド時間] グラフを使用します。

調査の例:

  1. 1 時間に 1 回(毎 x 時 3 分)に実行される繰り返しバッチ パイプラインがあり、通常、各ジョブは約 9 分実行され、すべてのジョブは 10 分未満で完了するという目標が設定しているとします。

  2. [ジョブ ステータス] グラフは、ジョブが 10 分以上実行されていたことを表しています。

  3. 更新 / 実行履歴のテーブルで、対象の時間に実行されていたジョブをクリックし、Dataflow ジョブの詳細ページに移動します。そのページで実行ステージが長いジョブを探し、ログでエラーを確認して遅延の原因を特定します。

ストリーミング パイプライン

Cloud Console の [パイプラインの詳細] ページの [パイプラインの情報] タブで、パイプライン ステータス パネルのデータの更新頻度グラフを使用して、パイプラインの状態の初期分析を行います。

調査の例:

  1. 通常、ストリーミング パイプラインは、20 秒のデータの更新頻度で 1 回出力を生成します。

  2. データ更新頻度の保証目標を 30 秒に設定します。データの更新頻度グラフを確認すると、午前 9 時から 10 時の間のデータの更新頻度が、ほぼ 40 秒に上昇していることがわかります。

  3. [パイプラインの指標] タブに切り替え、さらに分析するためにスループット、CPU 使用率、メモリ使用率のグラフを表示します。