再利用可能なパイプラインの作成

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このチュートリアルでは、Cloud Storage からデータを読み取り、データ品質検査を実行して、Cloud Storage に書き込む再利用可能なパイプラインを構築する方法を説明します。

再利用可能なパイプラインには通常のパイプライン構造がありますが、各パイプライン ノードの構成は HTTP サーバーによって提供される構成に基づいて変更できます。たとえば、静的パイプラインでは、Cloud Storage からデータを読み取り、変換を適用して、BigQuery 出力テーブルに書き込みを行います。代わりに、変換と BigQuery 出力テーブルを、パイプラインが読み取る Cloud Storage ファイルに基づいて変更する場合は、再利用可能なパイプラインを作成します。

目標

  • Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる入力を読み取ることができるようにします。
  • Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる品質検査を実行できるようにします。
  • 実行ごとの出力データを Cloud Storage に書き込みます。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Cloud Data Fusion
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  6. Cloud Data Fusion, Cloud Storage, and Cloud Dataproc API を有効にします。

    API を有効にする

  7. Cloud Data Fusion インスタンスを作成します

Cloud Data Fusion を使用する場合は、Google Cloud Console と個別の Cloud Data Fusion UI の両方を使用します。Google Cloud Console で Google Cloud Console プロジェクトを作成し、Cloud Data Fusion インスタンスを作成、削除できます。Cloud Data Fusion UI では、Pipeline StudioWrangler などのさまざまなページを使用して Cloud Data Fusion の機能を使用できます。

  1. Google Cloud Console で、[インスタンス] ページを開きます。

    [インスタンス] ページを開く

  2. インスタンスの [操作] 列で、[インスタンスの表示] リンクをクリックします。新しいブラウザタブで Cloud Data Fusion UI が開きます。

Argument Setter プラグインをデプロイする

  1. Cloud Data Fusion ウェブ UI で、右上にある [Hub] をクリックします。

  2. [Augument Setter action plugin] をクリックして、[Deploy] をクリックします。

  3. 開いた [Deploy] ウィンドウで、[Finish] をクリックします。

  4. [Create a pipeline] をクリックします。[Pipeline Studio] ページが開きます。

Cloud Storage からの読み取り

  1. Pipeline Studio の左パネルで、[Source] プルダウン メニュー から [Google Cloud Storage] を選択します。
  2. Cloud Storage のソースカードにカーソルを合わせ、表示される [プロパティ] ボタンをクリックします。
  3. [Reference name] フィールドに名前を入力します。
  4. [Path] フィールドに「${input.path}」と入力します。このマクロは、異なるパイプラインの実行で使用する Cloud Storage 入力パスを制御します。
  5. 右側の [出力スキーマ] パネルで、オフセット フィールド行のゴミ箱アイコンをクリックして、出力スキーマから offset フィールドを削除します。
  6. [検証] をクリックして、エラーがないことを確認します。
  7. [X] ボタンをクリックして [プロパティ] ダイアログ ボックスを閉じます。

データを変換する

  1. [Pipeline Studio] ページの左側のパネルで、[Transform] プルダウン メニュー から [Wrangler] を選択します。
  2. Pipeline Studio のキャンバスで、Cloud Storage カードから Wrangler カードに矢印をドラッグします。
  3. Wrangler カードにカーソルを合わせて、表示される [Properties] ボタンをクリックします。
  4. [Input field name] に「body」と入力します。
  5. [Recipe] フィールドに「${directives}」と入力します。このマクロは、異なるパイプラインの実行で使用する変換ロジックを制御します。
  6. [検証] をクリックして、エラーがないことを確認します。
  7. [X] ボタンをクリックして [プロパティ] ダイアログ ボックスを閉じます。

Cloud Storage への書き込み

  1. [Pipeline Studio] ページの左側のパネルで、[Sink] プルダウン メニュー を使用して、[Cloud Storage] を選択します。
  2. Pipeline Studio のキャンバスで、Wrangler カードから先ほど追加した Cloud Storage カードに矢印をドラッグします。
  3. Cloud Storage Sink カードにカーソルを合わせ、表示される [Properties] ボタンをクリックします。
  4. [Reference name] フィールドに名前を入力します。
  5. [Path] フィールドに、パイプラインが出力ファイルを書き込むことができる、プロジェクトの Cloud Storage バケットのパスを入力します。Cloud Storage バケットがない場合は、1 つ作成します
  6. [検証] をクリックして、エラーがないことを確認します。
  7. [X] ボタンをクリックして、[Properties] メニューを閉じます。

マクロ引数を設定する

  1. [Pipeline Studio] ページの左側のパネルで、[Conditions and Actions] プルダウン メニュー を使用して Argument Setter プラグインを選択します。
  2. Pipeline Studio のキャンバスで、引数セッター カードから Cloud Storage のソースカードに矢印をドラッグします。
  3. Argument Setter カードにカーソルを合わせて、表示される [プロパティ] ボタンをクリックします。
  4. [URL] フィールドに、次のように入力します。

    https://storage.googleapis.com/reusable-pipeline-tutorial/args.json
    

    この URL は、Cloud Storage 内の一般公開オブジェクトを示しており、次のコンテンツを含みます。

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }
    

    2 つある引数のうち最初の引数は input.path の値です。パス gs://reusable-pipeline-tutorial/user-emails.txt は、Cloud Storage の一般公開オブジェクトで、これには次のテストデータが含まれています。

    alice@example.com
    bob@example.com
    craig@invalid@example.com
    

    2 番目の引数は directives の値です。値 send-to-error !dq:isEmail(body) は、有効なメールアドレスではない行を除外するように Wrangler を設定します。たとえば、craig@invalid@example.com は除外されます。

  5. [検証] をクリックして、エラーがないことを確認します。

  6. [X] ボタンをクリックして、[Properties] メニューを閉じます。

パイプラインをデプロイして実行する

  1. [Pipeline Studio] ページの上部バーから [Name your pipeline] をクリックします。 パイプラインに名前を付け、[保存] をクリックします。
  2. [デプロイ] をクリックします。
  3. 次に [実行] から、プルダウン メニュー をクリックして、[ランタイム引数] を開き、input.path および directives 引数マクロ(runtime)を表示します。値フィールドを空白のままにすると、パイプライン内の Argument Setter ノードがランタイム中にこれらの引数の値を設定することを Cloud Data Fusion に通知します。
  4. [実行] をクリックします。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

チュートリアルを完了したら、Google Cloud で作成したリソースをクリーンアップし、以後は課金されないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

Cloud Data Fusion インスタンスを削除する

手順に従って Cloud Data Fusion インスタンスを削除します

プロジェクトを削除する

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ