再利用可能なパイプラインを設計して作成する


このチュートリアルでは、Cloud Storage からデータを読み取り、データ品質検査を実行して、Cloud Storage に書き込む再利用可能なパイプラインを構築する方法を説明します。

再利用可能なパイプラインには正規のパイプライン構造がありますが、各パイプライン ノードの構成は、HTTP サーバーによって提供される構成に基づいて変更できます。たとえば、静的パイプラインはデータを Cloud Storage から読み取り、変換を適用して、BigQuery 出力テーブルに書き込みます。パイプラインが読み取る Cloud Storage ファイルに基づいて変換と BigQuery 出力テーブルを変更する場合は、再利用可能なパイプラインを作成します。

目標

  • Cloud Storage Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる入力を読み取ることができるようにします。
  • Cloud Storage Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる品質検査を実行できるようにします。
  • 実行ごとの出力データを Cloud Storage に書き込みます。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Cloud Data Fusion
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Cloud Data Fusion, Cloud Storage, BigQuery, and Dataproc API を有効にします。

    API を有効にする

  7. Cloud Data Fusion インスタンスを作成します

Cloud Data Fusion を使用する際は、Google Cloud コンソールと個別の Cloud Data Fusion ウェブ インターフェース の両方を使用します。 Google Cloud コンソールでは、Google Cloud コンソール プロジェクトを作成し、Cloud Data Fusion インスタンスを作成および削除できます。Cloud Data Fusion ウェブ インターフェースでは、[Pipeline Studio] や [Wrangler] などのさまざまなページで Cloud Data Fusion の機能を使用できます。

  1. Google Cloud コンソールで [インスタンス] ページを開きます。

    [インスタンス] ページを開く

  2. インスタンスの [操作] 列で、[インスタンスの表示] リンクをクリックします。Cloud Data Fusion ウェブ インターフェースが新しいブラウザタブで開きます。

Cloud Storage Argument Setter プラグインをデプロイする

  1. Cloud Data Fusion ウェブ インターフェースで、[Studio] ページに移動します。

  2. [Actions] メニューで、[GCS Argument Setter] をクリックします。

Cloud Storage からの読み取り

  1. Cloud Data Fusion ウェブ インターフェースで、[Studio] ページに移動します。
  2. [Source] をクリックし、[Cloud Storage] を選択します。Cloud Storage ソースのノードがパイプラインに表示されます。
  3. [Cloud Storage] ノードで、[プロパティ] をクリックします。

  4. [Reference name] フィールドに名前を入力します。

  5. [Path] フィールドに「${input.path}」と入力します。このマクロは、異なるパイプラインの実行で使用する Cloud Storage 入力パスを制御します。

  6. 右側の [Output Schema] パネルで、オフセット フィールド行のゴミ箱アイコンをクリックして、出力スキーマの [offset] フィールドを削除します。

  7. [Validate] をクリックして、エラーに対処します。

  8. [] をクリックして、[Properties] ダイアログを終了します。

データを変換する

  1. Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
  2. [Transform] プルダウン メニュー で、[Wrangler] を選択します。
  3. Pipeline Studio キャンバスで、Cloud Storage ノードから Wrangler ノードに矢印をドラッグします。
  4. パイプラインの Wrangler ノードに移動し、[Properties] をクリックします。
  5. [Input field name] に「body」と入力します。
  6. [Recipe] フィールドに「${directives}」と入力します。このマクロは、異なるパイプラインの実行で使用する変換ロジックを制御します。
  7. [Validate] をクリックして、エラーに対処します。
  8. [] をクリックして、[Properties] ダイアログを終了します。

Cloud Storage への書き込み

  1. Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
  2. [Sink] プルダウン メニュー で、[Cloud Storage] を選択します。
  3. Pipeline Studio キャンバスで、Wrangler ノードから先ほど追加した Cloud Storage ノードに矢印をドラッグします。
  4. パイプラインの Cloud Storage シンクノードに移動し、[Properties] をクリックします。
  5. [Reference name] フィールドに名前を入力します。
  6. [Path] フィールドに、パイプラインが出力ファイルを書き込むことができる、プロジェクトの Cloud Storage バケットのパスを入力します。Cloud Storage バケットがない場合は、1 つ作成します
  7. [Validate] をクリックして、エラーに対処します。
  8. [] をクリックして、[Properties] ダイアログを終了します。

マクロ引数を設定する

  1. Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
  2. [Conditions and Actions] プルダウン メニューで [GCS Argument Setter] をクリックします。
  3. Pipeline Studio キャンバスで、Cloud Storage Argument Setter ノードから Cloud Storage ソースノードに矢印をドラッグします。
  4. パイプラインの Cloud Storage Argument Setter ノードに移動し、[Properties] をクリックします。
  5. [URL] フィールドに、次のように入力します。

    gs://reusable-pipeline-tutorial/args.json
    

    この URL は、Cloud Storage 内の一般公開オブジェクトを示しており、次のコンテンツを含みます。

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }
    

    2 つある引数のうち最初の引数は input.path の値です。パス gs://reusable-pipeline-tutorial/user-emails.txt は、Cloud Storage の一般公開オブジェクトで、これには次のテストデータが含まれています。

    alice@example.com
    bob@example.com
    craig@invalid@example.com
    

    2 番目の引数は directives の値です。値 send-to-error !dq:isEmail(body) は、有効なメールアドレスではない行を除外するように Wrangler を設定します。たとえば、craig@invalid@example.com は除外されます。

  6. [検証] をクリックして、エラーがないことを確認します。

  7. [] をクリックして、[Properties] ダイアログを終了します。

パイプラインをデプロイして実行する

  1. [Pipeline Studio] ページの上部バーから [Name your pipeline] をクリックします。 パイプラインに名前を付け、[保存] をクリックします。

  2. [Deploy] をクリックします。

  3. ランタイム引数マクロ(ランタイム)を表示するinput.pathおよびdirectives引数を開くには、の横にあるプルダウン実行をクリックします。

    値フィールドを空白のままにすると、パイプライン内の Cloud Storage Argument Sette ノードがランタイム中にこれらの引数の値を設定することを Cloud Data Fusion に通知します。

  4. [実行] をクリックします。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

チュートリアルを完了したら、Google Cloud で作成したリソースをクリーンアップし、以後は課金されないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

Cloud Data Fusion インスタンスを削除する

Cloud Data Fusion インスタンスを削除する手順に従います。

プロジェクトを削除する

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ