Cloud Composer を使用した Dataflow パイプラインの構築

このページでは、DataflowTemplateOperator を使用して Cloud Composer から Dataflow パイプラインを起動する方法について説明します。Cloud Storage Text to BigQuery パイプラインは、Cloud Storage に保存されているテキスト ファイルをアップロードし、ユーザーが指定する JavaScript ユーザー定義関数(UDF)を使用してそれらのファイルを変換し、結果を BigQuery に出力するバッチ パイプラインです。

ユーザー定義関数、入力ファイル、json スキーマが Cloud Storage バケットにアップロードされます。これらのファイルを参照する DAG は Dataflow バッチ パイプラインを起動します。これにより、ユーザー定義関数と json スキーマ ファイルが入力ファイルに適用されます。その後、このコンテンツは BigQuery テーブルにアップロードされます。

  • ワークフローを開始する前に、次のエンティティを作成する必要があります。

    • locationaverage_temperaturemonth、必要に応じて inches_of_rainis_currentlatest_measurement の情報の列を保持する空のデータセットから作成した空の BigQuery テーブル。

    • .txt ファイルのデータを BigQuery テーブルのスキーマの正しい形式に正規化する JSON ファイル。JSON オブジェクトには BigQuery Schema の配列があり、各オブジェクトには列名、入力のタイプ、必須フィールドについての該当の有無が示されます。

    • BigQuery テーブルに一括アップロードするデータを保持する入力 .txt ファイル。

    • .txt ファイルの各行をテーブルの関連変数に変換する、JavaScript で記述されたユーザー定義関数。

    • 上記のファイルの場所を指す有向非巡回グラフ(DAG)ファイル。

  • 次に、.txt ファイル、.js UDF ファイル、.json スキーマ ファイルを Storage バケットにアップロードします。DAG も Cloud Composer 環境にアップロードします。

  • DAG をアップロードすると、Airflow タスクが開始されます。タスクは、ユーザー定義関数を .txt ファイルに適用し、JSON スキーマに従ってフォーマットする Cloud Dataflow パイプラインを開始します。

  • 最後に、作成しておいた BigQuery テーブルにデータがアップロードされます。

費用

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Cloud Composer
  • Dataflow
  • Cloud Storage
  • BigQuery

要件

  • Cloud Composer 環境が作成済みであることを確認します
  • 1.9.0 バージョン以降の Cloud Composer が必要です。イメージのバージョンを確認するには、環境の詳細をご覧ください。
  • このチュートリアルでは、ユーザー定義関数を記述するために JavaScript に精通している必要があります。
  • Cloud Composer, Dataflow, Cloud Storage, BigQuery API を有効にします。

    API を有効にする

環境設定

スキーマ定義を持つ空の BigQuery テーブルを作成する

まず、スキーマ定義を持つ BigQuery テーブルを作成します。このチュートリアルの後の部分で、このスキーマ定義を使用します。この BigQuery テーブルは、バッチ アップロードの結果を保持します。

スキーマ定義を含む空のテーブルを作成するには:

Console

  1. Cloud Console で BigQuery ウェブ UI を開きます。
    BigQuery ウェブ UI に移動

  2. ナビゲーション パネルの [リソース] セクションで、プロジェクトを展開します。

  3. ウィンドウの右側の詳細パネルで、[データセットを作成] をクリックします。

ウィンドウの右側にある [データセットを作成] ボタンをクリックします。

  1. [データセットを作成] ページの [データセット ID] セクションで、データセットに average_weather という名前を付けます。他のフィールドはすべてデフォルトの状態のままにします。

データセット ID には、「average_weather」という名前を入力します。

  1. [データセットを作成] をクリックします。

  2. ナビゲーション パネルに戻り、[リソース] セクションでプロジェクトを展開します。次に、average_weather データセットをクリックします。

  3. ウィンドウの右側の詳細パネルで、[テーブルを作成] をクリックします。

ウィンドウの右側にある [テーブルを作成] をクリックします。

  1. [テーブルの作成] ページの [ソース] セクションで、[空のテーブル] を選択します。

  2. [テーブルの作成] ページの [送信先] セクションで、次の操作を行います。

    • [データセット名] で、average_weather データセットを選択します。

      average_weather データセットのデータセット オプションを選択する

    • [テーブル名] フィールドに「average_weather」という名前を入力します。

    • [テーブルタイプ] が [ネイティブ テーブル] に設定されていることを確認します。

  3. [スキーマ] セクションでスキーマ定義を入力します。

    • スキーマ情報を手動で入力します。

      • [テキストとして編集] を有効にし、テーブル スキーマを JSON 配列として入力します。 このオプションの次のフィールドに入力します。

        [
        {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
        },
        {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
        },
        {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
        },
        {
        "name": "inches_of_rain",
        "type": "NUMERIC"
        },
        {
        "name": "is_current",
        "type": "BOOLEAN"
        },
        {
        "name": "latest_measurement",
        "type": "DATE"
        }
        ]
        

      • [フィールドを追加] を使用して、スキーマを手動で入力します。

      画面の下部にある [フィールドを追加] をクリックして、フィールドを入力します。

  4. [パーティションとクラスタの設定] はデフォルト値(No partitioning)のままにします。

  5. [詳細オプション] セクションの [暗号化] の値は、デフォルト(Google-managed key)のままにします。Compute Engine はデフォルトで、お客様の保存コンテンツを暗号化します

  6. [テーブルを作成] をクリックします。

CLI

空のデータセットを作成するには、--location フラグを指定して bq mk コマンドを使用します。 PROJECT_ID はプロジェクト ID に、LOCATION は任意のロケーションに置き換えます。レイテンシを最小限に抑えるため、Composer 環境が配置されているのと同じリージョンを選択することをおすすめします。

次のコマンドをコピーして、世界の平均的な天気情報のデータセットを作成します。

bq --location=LOCATION mk \
--dataset \
PROJECT_ID:average_weather

このデータセットにスキーマ定義を持つ空のテーブルを作成するには、以下のコマンドで PROJECT_ID をプロジェクト ID に置き換えて、ターミナルに入力します。

bq mk \
--table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

テーブルを作成した後、テーブルの有効期限、説明、ラベルを更新できます。スキーマ定義を変更することもできます。

Python

サンプルを実行する前に、次のコマンドを実行してお使いの環境にライブラリをインストールします。

pip install google.cloud.bigquery

このコードを dataflowtemplateoperator_create_dataset_and_table_helper.py として保存し、プロジェクトとロケーションを反映するようにコードに含まれる変数を更新してから、次のコマンドで実行します。

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

このサンプルを試す前に、Compute Engine クイックスタート: クライアント ライブラリの使用に記載されている Python の設定手順に従ってください。 詳細については、Compute Engine Python API のリファレンス ドキュメントをご覧ください。


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = 'your-project'  # Your GCP Project
location = 'US'  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = 'average_weather'

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Storage バケットの作成

次に、ワークフローに必要なすべてのファイルを保持するストレージ バケットを作成する必要があります。今後作成する DAG は、このストレージ バケットにアップロードしたファイルを参照します。新しいストレージ バケットを作成するには:

コンソール

  1. Cloud Console で Cloud Storage を開きます。

    Cloud Storage を開く

  2. [バケットを作成] をクリックして、バケット作成フォームを開きます。

  3. バケット情報を入力し、[続行] をクリックして各手順を完了します。

    • グローバルに一意の名前をバケットに指定します(チュートリアルの残りの部分では bucketName として参照します)。

    • ロケーション タイプには [リージョン] を選択します。次に、バケットデータが永続的に保存されるロケーションを選択します。

    • データのデフォルトのストレージ クラスとして [標準] を選択します。

    • [均一] アクセス制御を選択して、オブジェクトにアクセスします。

  4. [完了] をクリックします

gsutil

  1. gsutil mb コマンドを使用します。
    gsutil mb gs://bucketName/
    

Python

Python

このサンプルを試す前に、Compute Engine クイックスタート: クライアント ライブラリの使用に記載されている Python の設定手順に従ってください。 詳細については、Compute Engine Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print("Bucket {} created".format(bucket.name))

出力テーブル用に JSON 形式の BigQuery スキーマを作成する

前の手順で作成した出力テーブルと一致する JSON 形式の BigQuery スキーマ ファイルを作成します。フィールド名、型、モードは、前の手順において BigQuery テーブル スキーマで定義したものと一致する必要があります。このファイルは、.txt ファイルのデータを BigQuery スキーマと互換性のある形式に正規化します。このファイルに jsonSchema.json という名前を付けます。

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

JavaScript(.js)ファイルを作成してデータをフォーマットする

このファイルでは、入力ファイル内のテキスト行の変換ロジックを提供する UDF(ユーザー定義関数)を定義します。この関数は、入力ファイル内のテキストの各行を独自の引数として受け取り、入力ファイルの行ごとに 1 回実行されることに注意してください。このファイルに transformCSVtoJSON.js という名前を付けます。

Python


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  const weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  var jsonString = JSON.stringify(weatherInCity);
  return jsonString;
}

入力ファイルを作成する

このファイルは、BigQuery テーブルにアップロードする情報を保持します。 このファイルをローカルにコピーし、inputFile.txt という名前を付けます。

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

ファイルを Storage バケットにアップロードしてステージング フォルダを作成する

前の手順で作成したストレージ バケットに次のファイルをアップロードします。

  • JSON 形式の BigQuery スキーマ(.json
  • JavaScript ユーザー定義関数(transformCSVtoJSON.js
  • 処理するテキストの入力ファイル(.txt

コンソール

  1. Google Cloud Console で Cloud Storage ブラウザを開きます。
    Cloud Storage ブラウザを開く
  2. バケットのリストで、バケット bucketName をクリックします。

  3. バケットの [オブジェクト] タブで次のいずれかを行います。

    • デスクトップまたはファイル マネージャーから目的のファイルを Cloud Console のメインペインにドラッグ&ドロップします。

    • [ファイルをアップロード] ボタンをクリックし、表示されたダイアログでアップロードするファイルを選択し、[開く] をクリックします。

gsutil

[gsutil cp] コマンドを使用します。

gsutil cp [OBJECT_LOCATION] gs://bucketName

ここで

  • [OBJECT_LOCATION] は、オブジェクトへのローカルパスです。例: Desktop/dog.png

  • [bucketName] は、前の手順で作成したグローバルに一意のバケット名です。

成功した場合は、次の例のようなレスポンスになります。

Operation completed over 1 objects/58.8 KiB.

Python

Python

このサンプルを試す前に、Compute Engine クイックスタート: クライアント ライブラリの使用に記載されている Python の設定手順に従ってください。 詳細については、Compute Engine Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # bucket_name = "your-bucket-name"
    # source_file_name = "local/path/to/file"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

DataflowTemplateOperator の構成

サンプルを実行する前に、適切な環境変数が設定されていることを確認してください。これを行うには、gcloud または Airflow UI を使用します。

gcloud

次のコマンドを入力します。

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set project_id PROJECT_ID

ここで

  • ENVIRONMENT は、Cloud Composer 環境の名前です。
  • LOCATION は、Cloud Composer 環境が配置されているリージョンです。
  • PROJECT_ID はユーザーの Google Cloud プロジェクト ID です。
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_region GCE_REGION

ここで

  • GCE_REGION は Compute Engine リージョンのリージョンです。
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_zone GCE_ZONE

ここで

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set bucket_path BUCKET_PATH

ここで

  • BUCKET_PATH は、前の手順で作成した Cloud Storage バケットの場所です。

Airflow UI

  1. ツールバーで、[Admin] > [Variables] をクリックします。

  2. [作成] をクリックします。

  3. 次の情報を入力します。

    • Key:project_id
    • Val: PROJECT_ID(Google Cloud プロジェクト ID)
  4. [Save and Add Another] 左下隅にある [Save and Add Another] オプションを選択します。 をクリックします。

  5. 次の情報を入力します。

    • Key:bucket_path
    • Val: BUCKET_PATH。Cloud Storage バケットの場所です(例: gs://my-bucket)。
  6. [Save and Add Another] をクリックします。

  7. 次の情報を入力します。

    • Key:gce_region
    • 値: GCE_REGION(Compute Engine リージョンのリージョン)。
  8. [Save and Add Another] をクリックします。

  9. 次の情報を入力します。

  10. [保存] をクリックします。

次に前の手順で作成したファイルを参照して、Dataflow ワークフローを開始する DAG を作成します。この DAG をコピーして、composer-dataflow-dag.py としてローカルに保存します。

Python

このサンプルを試す前に、Compute Engine クイックスタート: クライアント ライブラリの使用に記載されている Python の設定手順に従ってください。 詳細については、Compute Engine Python API のリファレンス ドキュメントをご覧ください。



"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

DAG の Cloud Storage へのアップロード

DAG を環境フォルダにアップロードします。アップロードが正常に完了したら、[Cloud Composer 環境] ページの [DAG フォルダ] リンクをクリックすると表示されるはずです。

環境内の DAG フォルダには DAG が格納されます。

タスクのステータスの表示

  1. Airflow ウェブ インターフェースに移動します。
  2. DAG ページで、DAG 名(composerDataflowDAG など)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。

    • Failed: タスクの周囲に赤いボックスが表示されます。 タスクにカーソルを合わせて [State: Failed] を探すこともできます。タスクの周囲に失敗したことを示す赤いボックスが表示されている

数分後、結果が Dataflow と BigQuery に表示されます。

Dataflow でジョブを表示する

  1. Dataflow ウェブ UI に移動に移動します。 Dataflow ウェブ UI に移動

  2. ジョブには dataflow_operator_transform_csv_to_bq という名前が付けられ、名前の末尾に次のようにハイフンが付いた一意の ID が付加されます。Dataflow ジョブには一意の ID があります

  3. 名前をクリックして、ジョブの詳細を表示します。Dataflow ジョブの詳細について学習する。 以下のすべてのジョブの詳細を表示する

BigQuery で結果を表示する

  1. BigQuery ウェブ UI に移動します。 BigQuery ウェブ UI に移動

  2. クエリは、標準 SQL を使用して送信できます。 次のクエリを使用して、テーブルに追加された行を確認します。

    SELECT * FROM projectId.average_weather
    

クリーンアップ

Google Cloud Platform アカウントに課金されないようにするには、このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します
  2. Cloud Composer 環境の Cloud Storage バケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
  3. Dataflow ジョブを停止します
  4. BigQuery テーブルBigQuery データセットを削除します。