ジョブビルダーを使用してパイプラインを作成する

ジョブビルダーは、コードを記述せずに Google Cloud コンソールで Dataflow パイプラインを構築して実行するためのビジュアル UI です。

次のスクリーンショットは、ジョブビルダー UI の詳細を示しています。ここでは、Pub/Sub から BigQuery に読み込まれるパイプラインを作成しています。

ジョブビルダー UI のスクリーンショット

概要

ジョブビルダーは、次の種類のデータの読み取りと書き込みをサポートしています。

  • Pub/Sub メッセージ
  • BigQuery テーブルデータ
  • Cloud Storage 内の CSV ファイル、JSON ファイル、テキスト ファイル

フィルタ、結合、マッピング、グループ化、展開(配列のフラット化)などのパイプライン変換をサポートしています。

ジョブビルダーは、パイプラインを Apache Beam YAML ファイルとして保存することもできます。この機能を使用すると、ジョブビルダーでパイプラインを設計し、YAML ファイルを Cloud Storage またはソース管理リポジトリに保存して再利用できます。

次のようなユースケースで、ジョブビルダーの利用をご検討ください。

  • コードを記述することなくパイプラインを迅速に構築したい。
  • パイプラインを YAML に保存して再利用する必要がある。
  • サポートされているソース、シンク、変換を使用してパイプラインを表現できる。
  • ユースケースに一致する Google 提供のテンプレートが存在しない。

新しいパイプラインを作成する

ジョブビルダーで新しいパイプラインを作成する手順は次のとおりです。

  1. Google Cloud コンソールの [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [テンプレートからジョブを作成] をクリックします

  3. [ジョブビルダー] をクリックします。

  4. [ジョブ名] に、ジョブの名前を入力します。

  5. [バッチ] または [ストリーミング] を選択します。

  6. [ストリーミング] を選択した場合は、ウィンドウ処理モードを選択します。次のように、ウィンドウの仕様を入力します。

    • 固定ウィンドウ: ウィンドウ サイズ(秒単位)を入力します。
    • スライディング ウィンドウ: ウィンドウ サイズとウィンドウ期間(秒単位)を入力します。
    • セッション ウィンドウ: セッションのギャップ(秒単位)を入力します。

    ウィンドウ処理の詳細については、ウィンドウとウィンドウ処理関数をご覧ください。

次に、以下の各セクションで説明するように、ソース、変換、シンクをパイプラインに追加します。

パイプラインにソースを追加する

パイプラインには少なくとも 1 つのソースが必要です。ジョブビルダーにはもともと、空のソースが 1 つ含まれています。ソースを構成する手順は次のとおりです。

  1. [ソース名] ボックスにソースの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  2. [ソースタイプ] リストで、データソースのタイプを選択します。

  3. ソースタイプに応じて、追加の構成情報を指定します。たとえば、BigQuery を選択した場合は、読み取るテーブルを指定します。

    Pub/Sub を選択した場合は、メッセージ スキーマを指定します。Pub/Sub メッセージから読み取る各フィールドの名前とデータ型を入力します。パイプラインは、スキーマで指定されていないフィールドを削除します。

  4. 省略可: 一部のソースタイプでは、[ソースデータをプレビュー] をクリックして、ソースデータのプレビューを表示できます。

パイプラインに別のソースを追加するには、[ソースを追加] をクリックします。複数のソースのデータを結合するには、Join 変換をパイプラインに追加します。

パイプラインに変換を追加する

必要に応じて、パイプラインに 1 つ以上の変換を追加します。変換を追加する手順は次のとおりです。

  1. [変換を追加] をクリックします。

  2. [変換] 名のボックスに、変換の名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  3. [変換タイプ] リストで、変換のタイプを選択します。

  4. 変換タイプに応じて、追加の構成情報を指定します。たとえば、[フィルタ(Python)] を選択した場合は、フィルタとして使用する Python 式を入力します。

  5. 変換の入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。

パイプラインにシンクを追加する

パイプラインには少なくとも 1 つのシンクが必要です。ジョブビルダーにはもともと、空のシンクが 1 つ含まれています。シンクを構成する手順は次のとおりです。

  1. [シンク名] ボックスにシンクの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  2. [シンクの種類] リストで、シンクの種類を選択します。

  3. シンクの種類に応じて、追加の構成情報を指定します。たとえば、BigQuery シンクを選択した場合は、書き込み先の BigQuery テーブルを選択します。

  4. シンクの入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。

  5. パイプラインにさらにシンクを追加するには、[シンクを追加] をクリックします。

パイプラインを実行する

ジョブビルダーからパイプラインを実行する手順は次のとおりです。

  1. 省略可: Dataflow ジョブ オプションを設定します。[Dataflow オプション] セクションを開くには、展開矢印 をクリックします。

  2. [ジョブを実行] をクリックします。ジョブビルダーは、送信されたジョブのジョブグラフに移動します。ジョブグラフを使用して、ジョブのステータスをモニタリングできます。

パイプラインを保存する

パイプラインを Beam YAML に保存する手順は次のとおりです。

  1. [保存] をクリックして、[YAML の保存] ウィンドウを開きます。

  2. 次のいずれかの操作を行います。

    • YAML をクリップボードにコピーするには、 [コピー] をクリックします。
    • Cloud Storage に保存するには、Cloud Storage パスを入力して [保存] をクリックします。
    • ローカル ファイルをダウンロードするには、[ダウンロード] をクリックします。

パイプラインを読み込む

パイプラインを Beam YAML に保存したら、ジョブビルダーに再度読み込むことができます。その後、ジョブビルダーを使用してパイプラインを変更または実行できます。

Beam YAML は、Cloud Storage またはテキストから読み込むことができます。

Cloud Storage からパイプラインを読み込む

Cloud Storage からパイプラインを読み込む手順は次のとおりです。

  1. [読み込み] をクリックします。
  2. [Cloud Storage からの読み込み] をクリックします。
  3. [YAML ファイルの場所] ボックスに、YAML ファイルの Cloud Storage 内の場所を入力するか、[参照] をクリックしてファイルを選択します。
  4. [読み込み] をクリックします。

テキストからパイプラインを読み込む

テキストからパイプラインを読み込む手順は次のとおりです。

  1. [読み込み] をクリックします。
  2. [テキストからの読み込み] をクリックします。
  3. YAML をウィンドウに貼り付けます。
  4. [読み込み] をクリックします。

次のステップ