このチュートリアルでは、Cloud Data Fusion を使用して、ターゲット キャンペーンの候補を選択するために顧客データをクリーニング、変換、処理する方法を説明します。
シナリオ
展開中のキャンペーンの販促用にお得意様向けのマーケティング資料を作成しようとしています。その資料は顧客の自宅のメールボックスに直接配布する予定です。
キャンペーンには次の 2 つの制約があります。
- 所在地: カリフォルニア、ワシントン、オレゴンのお客様だけに配達する。
- 費用: 燃料を節約するために、家まで簡単に行き着ける顧客に配達します。配達先は住所に avenue がある顧客に限定します。
このチュートリアルでは、キャンペーン用に顧客住所のリストを生成する方法を説明します。このチュートリアルでは、次のことを行います。
- 顧客データをクリーニングする: カリフォルニア、ワシントン、オレゴンの住所に avenue がある顧客をフィルタします。
- 次の処理を行うパイプラインを作成します。
- フィルタした顧客データを、州の略称を含む公開データセットと結合します。
- クリーニングと結合を行ったデータを BigQuery テーブルに格納し、BigQuery ウェブ インターフェース を使用したクエリの実行や Looker Studio を使用した分析を実行します。
目標
- Cloud Data Fusion を 2 つのデータソースに接続する
- 基本的な変換を適用する
- 2 つのデータソースを結合する
- 出力データをシンクに書き込む
費用
このドキュメントでは、課金対象である次の Google Cloud コンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このチュートリアルを実施するたびに、3 個のノード(1 個のマスター、2 個のワーカー)からなる Dataproc クラスタが実行され、約 6 分間で約 6 MB のデータが BigQuery に書き込まれます。BigQuery に格納するデータは少量であるため、この見積もりでは BigQuery の費用を無視できます。これらの数値に基づき、このパイプラインを 1 回実行する費用は概算で次のようになります。
総費用 = Cloud Data Fusion の費用 + Dataproc の費用
この数式の各要素は、次のように細かく分けることができます。
Cloud Data Fusion の費用 = (時間 * Cloud Data Fusion のレート)
Dataproc の費用 = (時間 * VM の数 * (Compute Engine のレート + Dataproc のレート))
たとえば、us-west1 リージョンに 3 つの n1-standard4 VM がある Cloud Data Fusion のデフォルト Compute Engine プロファイルを使用した場合の、6 分間のジョブの費用の見積もりを考えてみます。
- 時間 = 0.1 時間
- Cloud Data Fusion のレート = 1 時間あたり $1.8
- VM の数 = 3
- Compute Engine のレート = VM ごとに 1 時間あたり $0.19
- Dataproc のレート = VM ごとに 1 時間あたり $0.01
費用計算式にこれらの値を入れて計算すると、この例の場合の合計費用は次のようになります。
(0.1 * 1.8) + (0.1 * 3 * (0.1900 + 0.01)) = $0.24(24 セント)
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Google Cloud プロジェクトの課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Google Cloud プロジェクトの課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Cloud Data Fusion, BigQuery, Cloud Storage, and Dataproc API を有効にします。
- Cloud Data Fusion インスタンスを作成します。
顧客データを準備する
このチュートリアルでは、次の 2 つの入力データセットが必要です。どちらも Cloud Data Fusion インスタンスで提供されています。
- 顧客データのサンプル:
customers.csv
という名前の CSV ファイル。 - 州の略称:
state_abbreviations
という名前の BigQuery テーブル。
顧客データを読み込む
- Cloud Data Fusion ウェブ インターフェースに移動します。
- ウェブ UI の [Wrangler] ページに移動します。
- 左側のパネルで、Cloud Storage のサンプル バケットをクリックします。
- [campaign-tutorial] をクリックします。
- customers.csv をクリックします。顧客データが表示されます。
顧客データをクリーニングする
顧客データを解析してテーブル形式にし、スキーマを設定し、顧客データをフィルタリングして、必要なターゲット ユーザーのみを表示します。
データを解析する
- body 列のプルダウンをクリックします。
- [Parse > CSV] をクリックします。
- 区切り文字をカンマにして、[Apply] をクリックします。
- データは複数の列に分割されるため、元の body 列は不要になります。body 列のプルダウンをクリックし、[Delete Column] を選択します。
スキーマを設定する
適切な名前をテーブルの列に割り当てて、データのスキーマを設定します。列の名前を bodyXX
から、それぞれの列に含まれる情報を表す名前に変更します。
- 右側の [Columns] タブで [Column names] プルダウンをクリックし、[Set all] を選択します。
- [Bulk set column names] ダイアログ ボックスに、カンマ区切りテキスト
Name,StreetAddress,City,State,Country
を入力します。 - [Apply](適用)をクリックします。
データをフィルタリングする
カリフォルニア、オレゴン、ワシントンに住んでいる顧客のみを表示するようにデータをフィルタリングします。
上記以外の値を含む行をすべて削除します。
- State 列のプルダウンをクリックします。
- [Filter] を選択します。
フィルタ ウィンドウで次の操作を行います。
- [Keep row] をクリックします。
- [If] プルダウンで、[value matches regex] を選択します。
- 次の正規表現を入力します。
^(California|Oregon|Washington)$
- [Apply](適用)をクリックします。
State 列の値は、California、Oregon、Washington です。
データをフィルタリングして、住所に avenue を含む顧客のみを表示します。文字列「avenue」を含むアドレスのみを保持します。
- StreetAddress 列の左側にある、下矢印をクリックして [Filter] を選択します。
- If プルダウンから [value contains] を選択し、「
Avenue
」と入力します。 - [Ignore case] を選択します。
データセット全体に対して並列処理ジョブを実行する前、Wrangler にはデータセットの最初の 1000 個の値しか表示されません。一部のデータをフィルタリングしたため、Wrangler の表示に残った顧客はほんの一部です。
パイプラインを作成する
ここまで、データのクリーニングとデータの一部の変換を実行しました。これで、データセット全体に対して変換を実行する、バッチ パイプラインを作成できます。
Cloud Data Fusion は、Studio でビルドしたパイプラインを、エフェメラル Dataproc クラスタで変換を並列に実行する Apache Spark または MapReduce プログラムに変換します。このプロセスにより、インフラストラクチャを処理することなく、膨大な量のデータに対して複雑な変換をスケーラブルで信頼性の高い方法で実行できます。
- Wrangler ページで [Create pipeline] をクリックします。
- [Batch pipeline] を選択します。Studio ページが開きます。
- 左上に Data Pipeline - Batch がパイプライン タイプとして表示されていることを確認します。
Studio ページで、GCSFile
ソースノードが Wrangler
ノードに接続されます。

Wrangler ページに適用した変換が Studio ページの Wrangler
ノードに表示されます。適用した変換を確認するには、ポインタを Wrangler
ノードの上に置き、[プロパティ] をクリックします。適用した変換はディレクティブに表示されます。
<figure id="wrangler-config">
<img src="/data-fusion/docs/images/tutorials/targeting-campaign-pipeline/wrangler-config.png"
class="screenshot" width="550">
</figure>
[Wrangle] をクリックすると、[Wrangler] ページに戻ります。追加した変換が Studio ページに表示されます。
たとえば、Country 列は、値が常に「USA」であるため不要です。この列を削除する手順は次のとおりです。
- [Wrangle] をクリックします。
- Country の横にある下向き矢印をクリックし、[Delete Column] を選択します。
- [適用] をクリックします。Wrangler ページが閉じ、Wrangler の [Properties] ウィンドウが Studio ページで開きます。[ディレクティブ] に
drop Country
が表示されます。 - [閉じる] をクリックします。
州名を略称にする
配送車両のナビゲーション システムでは州名が略称で記されている住所のみを認識しますが(California ではなく CA)、現在の顧客データには完全な州名が含まれています。
公開されている BigQuery state_abbreviations
テーブルには 2 つの列があり、1 つは完全な州名、もう 1 つは州名の略称です。このテーブルを使用して、顧客データの州名を更新できます。
BigQuery の州名データを表示する
別のタブで、Google Cloud コンソールの BigQuery ページに移動します。
クエリ エディタに次のクエリを入力し、[実行] をクリックします。
SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
BigQuery テーブルにアクセスする
この BigQuery state_abbreviations
テーブルにアクセスするソースをパイプラインに追加します。
- Cloud Data Fusion Studio ページに移動し、[ソース] メニューを展開します。
[BigQuery] をクリックします。
BigQuery ソースノードが他の 2 つのノードとともにキャンバスに表示されます。
ポインタを BigQuery ソースノードの上に置き、[Properties] をクリックします。
- [データセット ID] フィールドに「
dis-user-guide
」と入力します。 - [Reference Name] フィールドに「
state_abbreviations
」と入力します。 - [Dataset] フィールドに「
campaign_tutorial
」と入力します。 - [テーブル] フィールドに「
state_abbreviations
」と入力します。
- [データセット ID] フィールドに「
[Get Schema] をクリックして、BigQuery からテーブルのスキーマを読み込みます。
[閉じる] をクリックします。
2 つのデータソースを結合する
州名の略称が記された顧客データを含む出力を生成するには、2 つのデータソース、顧客データ、州の略称を結合します。
- Cloud Data Fusion Studio ページに移動し、[分析] メニューを展開します。
[Joiner] をクリックします。
SQL Join に似たアクションを表す
Joiner
ノードがキャンバスに表示されます。Wrangler ノードと BigQuery ノードを
Joiner
ノードに接続します。ソースノードの右端にある接続矢印をドラッグして宛先ノードにドロップします。ポインタを
Joiner
ノードの上に置き、[Properties] をクリックします。- [Wrangler] フィールドと [BigQuery] フィールドを展開します。
- BigQuery の [name] チェックボックスは、完全な州名ではなく略称のみを含めるため、クリアします。
- BigQuery の略語チェックボックスをオンにして、エイリアスを
State
に変更します。
- [Join Type] フィールドの値は、[Outer] のままにします。[Require Inputs] には Wrangler チェックボックスをオンにします。
- [Wrangler] 結合条件のフィールドに「
State
」と入力します。[BigQuery] 結合条件のフィールドに「name
」と入力します。 - 設定した結合によって得られるスキーマを生成します。[Get Schema] をクリックします。
- [閉じる] をクリックします。
- [Wrangler] フィールドと [BigQuery] フィールドを展開します。
BigQuery に出力を保存する
パイプラインの結果を BigQuery テーブルに格納します。 データを格納する場所をシンクといいます。
- Cloud Data Fusion Studio のページに移動し、[シンク] メニューを展開します。
- [BigQuery] をクリックします。
Joiner
ノードをBigQuery
ノードに接続します。- ポインタを
BigQuery
ノードの上に置き、[Properties] をクリックします。- [Reference Name] フィールドに「
customer_data_abbreviated_states
」と入力します。 - [Dataset] フィールドに「
dis_user_guide
」と入力します。 - [Table] フィールドで [
customer_data_abbreviated_states
] を選択します。 - [閉じる] をクリックします。
- [Reference Name] フィールドに「
パイプラインをデプロイして実行する
- Studio ページで、[パイプラインに名前を付ける] をクリックして、「
CampaignPipeline
」と入力します。 - 右上隅の [Deploy] をクリックします。
- デプロイが完了したら、[Run] をクリックします。パイプラインの実行には数分を要する場合があります。待機している間に、パイプラインの [ステータス] はプロビジョニング、開始中、実行中、プロビジョニング解除中、成功と推移します。
結果を見る
- BigQuery ページで、
campaign_targets
テーブルに対してクエリを実行します。 次のクエリのプロジェクト名を、実際のプロジェクト名に更新します。
SELECT * FROM `titan.PROJECT_NAME.campaign_targets` LIMIT 1000
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
Cloud Data Fusion インスタンスを削除する
手順に従って Cloud Data Fusion インスタンスを削除します。
BigQuery データセットの削除
このチュートリアルで作成した BigQuery データセットを削除するには、次のようにします。
- Google Cloud コンソールで [BigQuery] ページに移動します。
dis_user_guide
データセットを選択します。- [deleteDelete dataset] をクリックします。
次のステップ
- Cloud Data Fusion について学ぶ。
- 入門ガイドを読む。