Cloud Composer を使用した Dataflow パイプラインの起動

Cloud Composer 1 | Cloud Composer 2

このページでは、DataflowTemplateOperator を使用して Cloud Composer から Dataflow パイプラインを起動する方法について説明します。Cloud Storage Text to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをアップロードし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用してそれらのファイルを変換し、結果を BigQuery に出力するバッチ パイプラインです。

ユーザー定義の関数、入力ファイル、および JSON スキーマが Cloud Storage バケットにアップロードされます。これらのファイルを参照する DAG は Dataflow バッチ パイプラインを起動し、ユーザー定義関数と JSON スキーマファイルを入力ファイルに適用します。その後、このコンテンツが BigQuery テーブルにアップロードされます。

概要

  • ワークフローを開始する前に、次のエンティティを作成します。

    • locationaverage_temperaturemonth、必要に応じて inches_of_rainis_currentlatest_measurement の情報の列を保持する空のデータセットから作成した空の BigQuery テーブル。

    • .txt ファイルのデータを BigQuery テーブルのスキーマの正しい形式に正規化する JSON ファイル。JSON オブジェクトには BigQuery Schema の配列があり、各オブジェクトには列名、入力のタイプ、必須フィールドについての該当の有無が示されます。

    • BigQuery テーブルに一括アップロードされるデータを保持する入力 .txt ファイル。

    • .txt ファイルの各行をテーブルの関連変数に変換する、JavaScript で記述されたユーザー定義関数。

    • これらのファイルの場所を指定する Airflow DAG ファイル。

  • 次に、.txt ファイル、.js UDF ファイル、.json スキーマ ファイルを Cloud Storage バケットにアップロードします。DAG も Cloud Composer 環境にアップロードします。

  • DAG をアップロードすると、Airflow がタスクを実行します。このタスクでは、ユーザー定義関数を .txt ファイルに適用し、JSON スキーマに従ってフォーマットする Dataflow パイプラインを起動します。

  • 最後に、先ほど作成した BigQuery テーブルにデータがアップロードされます。

準備

  • このガイドを読む前に、JavaScript を理解して、ユーザー定義関数を作成できる必要があります。
  • このガイドでは、Cloud Composer 環境がすでに存在することを前提としています。作成するには、環境を作成するをご覧ください。このガイドでは、Cloud Composer の任意のバージョンを使用できます。
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

スキーマ定義を持つ空の BigQuery テーブルを作成する

スキーマ定義を持つ BigQuery テーブルを作成します。このチュートリアルの後の部分で、このスキーマ定義を使用します。この BigQuery テーブルは、バッチ アップロードの結果を保持します。

スキーマ定義を含む空のテーブルを作成するには:

コンソール

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    BigQuery に移動

  2. ナビゲーション パネルの [リソース] セクションで、プロジェクトを展開します。

  3. 詳細パネルで [データセットを作成] をクリックします。

    [データセットを作成] ボタンをクリックする

  4. [データセットを作成] ページの [データセット ID] セクションで、データセットに average_weather という名前を付けます。他のすべてのフィールドはデフォルト状態のままにします。

    データセット ID には、「average_weather」という名前を入力します。

  5. [データセットを作成] をクリックします。

  6. ナビゲーション パネルに戻り、[リソース] セクションでプロジェクトを展開します。次に、average_weather データセットをクリックします。

  7. 詳細パネルで「テーブルを作成」をクリックします。

    [テーブルを作成] をクリックする

  8. [テーブルを作成] ページの [ソース] セクションで、[空のテーブル] を選択します。

  9. [テーブルの作成] ページの [送信先] セクションで、次の操作を行います。

    • [データセット名] で、average_weather データセットを選択します。

      average_weather データセットのデータセット オプションを選択する

    • [テーブル名] フィールドに「average_weather」という名前を入力します。

    • [Table type] が [Native table] に設定されていることを確認します。

  10. [スキーマ] セクションにスキーマ定義を入力します。代わりに、次のいずれかの方法を使用できます。

    • スキーマ情報を手動で入力します。テキストとして編集を有効にして、テーブル スキーマを JSON 配列として入力します。次のフィールドに入力します。

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • [フィールドを追加] を使用して、スキーマを手動で入力します。

      [フィールドを追加] をクリックしてフィールドを入力する

  11. [パーティションとクラスタの設定] はデフォルト値(No partitioning)のままにします。

  12. [詳細オプション] セクションの [暗号化] は、デフォルト値(Google-managed key)のままにします。

  13. [テーブルを作成] をクリックします。

bq

bq mk コマンドを使用して、空のデータセットとこのデータセット内にテーブルを作成します。

次のコマンドを実行して、世界の平均的な天気情報のデータセットを作成します。

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

以下を置き換えます。

次のコマンドを実行して、このデータセットにスキーマ定義を持つ空のテーブルを作成します。

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

テーブルを作成した後、テーブルの有効期限、説明、ラベルを更新できます。スキーマ定義を変更することもできます。

Python

このコードを dataflowtemplateoperator_create_dataset_and_table_helper.py として保存し、プロジェクトとロケーションを反映するようにコードに含まれる変数を更新してから、次のコマンドで実行します。

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Cloud Storage バケットを作成する

ワークフローに必要なすべてのファイルを保持するバケットを作成します。このガイドの後半で作成する DAG は、このストレージ バケットにアップロードするファイルを参照します。新しいストレージ バケットを作成するには:

コンソール

  1. Google Cloud コンソールで Cloud Storage を開きます。

    [Cloud Storage] に移動

  2. [バケットを作成] をクリックして、バケット作成フォームを開きます。

    1. バケット情報を入力し、[続行] をクリックして各ステップを完了します。

      • : 実際のプロジェクトのグローバルに一意の名前。このガイドでは、bucketName を例として使用します。

      • ロケーション タイプには [リージョン] を選択します。次に、バケットデータが永続的に保存される [ロケーション] を選択します。

      • データのデフォルトのストレージ クラスとして [標準] を選択します。

      • [均一] アクセス制御を選択して、オブジェクトにアクセスします。

    2. [完了] をクリックします。

gsutil

gsutil mb コマンドを使用します。

gsutil mb gs://bucketName/

以下を置き換えます。

  • bucketName: このガイドの前半で作成したバケットの名前。

コードサンプル

C#

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

出力テーブル用に JSON 形式の BigQuery スキーマを作成する

前の手順で作成した出力テーブルと一致する JSON 形式の BigQuery スキーマ ファイルを作成します。フィールド名、型、モードは、前の手順において BigQuery テーブル スキーマで定義したものと一致する必要があります。このファイルは、.txt ファイルのデータを BigQuery スキーマと互換性のある形式に正規化します。このファイルに jsonSchema.json という名前を付けます。

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

JavaScript ファイルを作成してデータをフォーマットする

このファイルでは、入力ファイル内のテキストの行を変換するロジックを提供する UDF(ユーザー定義関数)を定義します。なお、この関数は、この関数が入力ファイルの各行に対し 1 度実行されるよう、入力ファイルの各行のテキストを関数自体の引数として取得します。このファイルに transformCSVtoJSON.js という名前を付けます。


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

入力ファイルを作成する

このファイルには、BigQuery テーブルにアップロードする情報が格納されます。このファイルをローカルにコピーし、inputFile.txt という名前を付けます。

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

バケットにファイルをアップロードする

前に作成した Cloud Storage バケットに次のファイルをアップロードします。

  • JSON 形式の BigQuery スキーマ(.json
  • JavaScript ユーザー定義関数(transformCSVtoJSON.js
  • 処理するテキストの入力ファイル(.txt

コンソール

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. バケットのリストで、ご利用のバケットをクリックします。

  3. バケットの [オブジェクト] タブで、次のいずれかを行います。

    • デスクトップまたはファイル マネージャーから目的のファイルを Google Cloud コンソールのメインペインにドラッグ&ドロップします。

    • [ファイルをアップロード] ボタンをクリックして、表示されたダイアログでアップロードするファイルを選択し、[開く] をクリックします。

gsutil

gsutil cp コマンドを実行します。

gsutil cp OBJECT_LOCATION gs://bucketName

以下を置き換えます。

  • bucketName: このガイドの前半で作成したバケットの名前。
  • OBJECT_LOCATION は、オブジェクトへのローカルパスです。例: Desktop/transformCSVtoJSON.js

コードサンプル

Python

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Cloud Composer に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

DataflowTemplateOperator を構成する

DAG を実行する前に、次の Airflow 変数を設定します

Airflow 変数
project_id プロジェクト ID
gce_zone Dataflow クラスタを作成する Compute Engine のゾーン
bucket_path 前の手順で作成した Cloud Storage バケットのロケーション

次に前の手順で作成したファイルを参照して、Dataflow ワークフローを開始する DAG を作成します。この DAG をコピーして、composer-dataflow-dag.py としてローカルに保存します。



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

DAG を Cloud Storage にアップロードする

環境のバケット内の /dags フォルダに DAG をアップロードします。アップロードが正常に完了したら、[Cloud Composer 環境] ページの [DAG フォルダ] リンクをクリックすると確認できます。

環境内の DAG フォルダには DAG が格納される

タスクのステータスを表示する

  1. Airflow ウェブ インターフェースに移動します。
  2. DAG ページで、DAG 名(composerDataflowDAG など)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。

    • Failed: タスクの周囲に赤いボックスが表示されます。 タスクにカーソルを合わせて [State: Failed] を探すこともできます。

    • Success: タスクの周囲に緑色のボックスが表示されます。 タスクにカーソルを合わせて [State: Success] を確認することもできます。

数分後、Dataflow と BigQuery で結果を確認できます。

Dataflow でジョブを表示する

  1. Google Cloud コンソールの [Dataflow] ページに移動します。

    Dataflow に移動

  2. ジョブには dataflow_operator_transform_csv_to_bq という名前が付けられ、名前の末尾に次のようにハイフンが付いた一意の ID が付加されます。

    Dataflow ジョブには一意の ID があります

  3. 名前をクリックすると、ジョブの詳細が表示されます。

    ジョブの詳細をすべて表示する

BigQuery で結果を表示する

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    BigQuery に移動

  2. クエリは、標準 SQL を使用して送信できます。次のクエリを使用して、テーブルに追加された行を確認します。

    SELECT * FROM projectId.average_weather.average_weather