ターゲット キャンペーン パイプラインを作成する
Cloud Data Fusion を使用して、ターゲット キャンペーンの候補を選択するために顧客データをクリーニング、変換、処理する方法について説明します。
このタスクの手順をガイドに沿って Google Cloud コンソールで直接行う場合は、「ガイドを表示」をクリックしてください。
シナリオ
展開中のキャンペーンのプロモーション用のカスタム マーケティング資料を作成し、その資料を顧客の自宅の郵便受けに直接配達してもらいます。
キャンペーンには次の 2 つの制約があります。
- 所在地: カリフォルニア、ワシントン、オレゴンのお客様だけに配達する。
- 費用: 燃料を節約するために、家まですぐに行き着ける顧客に配達します。配達先は住所に avenue がある顧客に限定します。
このチュートリアルでは、キャンペーン用に顧客住所のリストを生成する方法を説明します。このチュートリアルでは、次のことを行います。
- 顧客データをクリーニングする: カリフォルニア、ワシントン、オレゴンの住所に avenue がある顧客をフィルタします。
次の処理を行うパイプラインを作成します。
- フィルタした顧客データを、州の略称を含む公開データセットと結合します。
- クリーニングと結合を行ったデータを BigQuery テーブルに格納し、BigQuery ウェブ インターフェース を使用したクエリの実行や Looker Studio を使用した分析を実行します。
目標
- Cloud Data Fusion を 2 つのデータソースに接続する
- 基本的な変換を適用する
- 2 つのデータソースを結合する
- 出力データをシンクに書き込む
準備
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Data Fusion, BigQuery, Cloud Storage, and Dataproc APIs.
-
Cloud Data Fusion インスタンスを作成します。
このチュートリアルは、デフォルトの Compute Engine サービス アカウントを使用していることを前提としています。
権限を管理
必要なカスタムロールと権限を作成して割り当てます。
カスタムロールを作成して権限を追加する
Google Cloud コンソールの [ロール] ページに移動します。
[
ロールを作成] をクリックします。[Title] 項目に「
Custom Role-Tutorial
」と入力します。[
権限を追加] をクリックします。[権限を追加] ウィンドウで次の権限を選択し、[追加] をクリックします。
bigquery.datasets.create
bigquery.jobs.create
storage.buckets.create
[作成] をクリックします。
デフォルトの Compute Engine サービス アカウントにカスタムロールを割り当てる
Cloud Data Fusion の [Instances] ページに移動します。
インスタンスの名前をクリックします。
デフォルトの Dataproc サービス アカウントをメモします。この情報はインスタンスの詳細ページに含まれます。
Dataproc サービス アカウント名の形式は次のとおりです。
CUSTOMER_PROJECT_NUMBER-compute@developer.gserviceaccount.com
Dataproc サービス アカウントの詳細を確認してください。
[IAM] ページに移動します。
[フィルタ] バーに、デフォルトの Dataproc サービス アカウントの名前を入力します。
デフォルトの Compute Engine サービス アカウントで、[
編集] をクリックします。[
別のロールを追加] をクリックします。[ロールを選択] フィールドで、[カスタムロール チュートリアル] を選択します。
[保存] をクリックします。
サービス アカウントに Cloud Data Fusion 実行者のロールがすでに割り当てられていることを確認します。
顧客データを準備する
このチュートリアルでは、次の 2 つの入力データセットが必要です。どちらも Cloud Data Fusion インスタンスで提供されています。
- 顧客データのサンプル:
customers.csv
という名前の CSV ファイル。 - 州の略称:
state_abbreviations
という名前の BigQuery テーブル。
顧客データを読み込む
Cloud Data Fusion の [Instances] ページに移動します。
使用している Cloud Data Fusion インスタンスで、[View instance] をクリックします。Cloud Data Fusion のウェブ インターフェースが新しいタブで開きます。
[Wrangler] をクリックします。[Wrangler] ページが開きます。
[Connections] ペインで、[GCS] > [Sample Buckets] に移動します。
[campaign-tutorial] をクリックします。
customers.csv をクリックします。
[Parsing options] ウィンドウで、次のように指定します。
- Format:
csv
- 引用符付きの値を有効にする:
False
- 引用符付きの値を有効にする:
False
- ファイル エンコード:
UTF-8
- Format:
[Confirm] をクリックすると、顧客データが Wrangler の新しいタブに読み込まれます。
顧客データをクリーニングする
これには 2 つのサブタスクが含まれます。
- スキーマを設定する
- 目的のターゲット オーディエンスのみを表示するように顧客データをフィルタする
スキーマを設定する
適切な名前をテーブルの列に割り当てて、データのスキーマを設定します。body_1
や body_2
などの列にわかりやすい名前を付ける手順は次のとおりです。
- 右ペインで [Columns] タブをクリックします。
- [Column names] プルダウンをクリックし、[Set all] を選択します。
[Bulk set column] ダイアログ ボックスに、カンマ区切りの次の列名を入力します。
Name,StreetAddress,City,State,Country
[適用] をクリックします。
データをフィルタリングする
カリフォルニア、オレゴン、ワシントンに住んでいる顧客のみを表示するようにデータをフィルタリングします。
これらの州以外の値を含むすべての行を削除します。
- State 列のプルダウンをクリックして、[Filter] を選択します。
フィルタ ウィンドウで次の操作を行います。
- [Keep row] をクリックします。
- [If] プルダウンをクリックして、[value matching regex] を選択します。
次の正規表現を入力します。
^(California|Oregon|Washington)$
[適用] をクリックします。
[State] 列の値は、California、Oregon、Washington です。
データをフィルタリングして、住所に avenue を含む顧客のみを表示します。文字列 Avenue
を含む住所のみを保持します。
- StreetAddress 列のプルダウンをクリックして、[StreetAddress] を選択します。
- フィルタ ウィンドウで次の操作を行います。
- [Keep row] をクリックします。
- If プルダウンから [value contains] を選択し、「
Avenue
」と入力します。 - [Ignore case] を選択します。
- [適用] をクリックします。
データセット全体に対して並列処理ジョブを実行する前、Wrangler にはデータセットの最初の 1000 個の値しか表示されません。一部のデータをフィルタリングしたため、Wrangler の表示に残った顧客はほんの一部です。
バッチ パイプラインを作成する
ここまで、データのクリーニングとデータの一部の変換を実行しました。これで、データセット全体に対して変換を実行する、バッチ パイプラインを作成できます。
Cloud Data Fusion は、スタジオで構築したパイプラインを Apache Spark プログラムに変換します。プログラムは、エフェメラル Dataproc クラスタで変換を並列に実行します。このプロセスにより、インフラストラクチャを処理することなく、スケーラブルかつ信頼性の高い方法で、膨大な量のデータに対して複雑な変換を簡単に行うことができます。
- [Wrangler] ページで [Create a pipeline] をクリックします。
- [Batch pipeline] を選択します。[Studio] ページが開きます。
Studio のページで、GCSFile ソースノードが GCSFile ノードに接続されています。
Wrangler ページで適用した変換は、Studio ページの [Wrangler] ノードに表示されます。
適用した変換を表示するには、Wrangler ノードの上にポインタを置いて [Properties] をクリックします。
適用した変換が [Directives] に表示されます。
[検証] をクリックします。
[
閉じる] をクリックします。
たとえば、Country 列は、値が常に USA
であるため不要です。この列を削除する手順は次のとおりです。
- [Wrangle] をクリックします。
- Country の横にある下向き矢印をクリックし、[Delete Column] を選択します。
- [適用] をクリックします。[Wrangler] ページが閉じ、[Studio] ページで [Wrangler Properties] ウィンドウが開きます。[Directives] に「
drop Country
」が表示されます。 - [閉じる] をクリックします。
州名を略称にする
配達用車両のナビゲーション システムは、州名の略称(カリフォルニア州ではなく CA)を含む住所のみを認識します。そして、顧客データには完全な州名が含まれています。
公開されている BigQuery state_abbreviations
テーブルには 2 つの列があり、1 つは完全な州名、もう 1 つは州名の略称です。このテーブルを使用して、顧客データの州名を更新できます。
BigQuery の州名データを表示する
別のタブで BigQuery Studio ページに移動します。
[Create SQL query] をクリックして、クエリエディタに次のクエリを入力します。
SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
[実行] をクリックします。
BigQuery に州名と略称のリストが表示されます。
BigQuery テーブルにアクセスする
BigQuery state_abbreviations
テーブルにアクセスするソースをパイプラインに追加します。
- Cloud Data Fusion Studio のページに移動し、[Source] メニューを開きます。
[BigQuery] をクリックします。
BigQuery ソースノードが他の 2 つのノードとともにキャンバスに表示されます。
ポインタを BigQuery ソースノードの上に置き、[Properties] をクリックします。
- [データセット ID] フィールドに「
dis-user-guide
」と入力します。 - [Reference Name] フィールドに「
state_abbreviations
」と入力します。 - [Dataset] フィールドに「
campaign_tutorial
」と入力します。 - [Table] フィールドに「
state_abbreviations
」と入力します。
- [データセット ID] フィールドに「
[Get Schema] をクリックして、BigQuery からテーブルのスキーマを読み込みます。
[閉じる] をクリックします。
2 つのデータソースを結合する
州名の略称を持つ顧客データを含む出力を生成するには、顧客データと州の略称の 2 つのデータソースを結合します。
- Cloud Data Fusion Studio のページに移動し、[分析] メニューを開きます。
[Joiner] をクリックします。
SQL Join に似たアクションを表す Joiner ノードがキャンバスに表示されます。
ソースノードの右端にある接続矢印を宛先ノードにドラッグ&ドロップして、Wrangler ノードと BigQuery ノードを Joiner ノードに接続します。
ポインタを Joiner ノードの上に置き、[Properties] をクリックします。
[Fields] セクションで [Wrangler] と [BigQuery] を開きます。
- Wrangler の [state] チェックボックスをオフにします。
- 州名は略称のみ必要で完全な州名は不要なため、BigQuery の [name] チェックボックスをオフにします。
BigQuery の [abbreviation] のチェックボックスをオンのままにして、エイリアスを
State
に変更します。
[Join Type] フィールドの値は [Outer] のままにします。[Required inputs] で、[Wrangler] チェックボックスをオンにします。
[Join condition] セクションで、Wrangler に対して [State] を選択します。BigQuery の場合は、[名前] を選択します。
設定した結合によって得られるスキーマを生成します。[Get Schema] をクリックします。
[検証] をクリックします。
[閉じる] をクリックします。
BigQuery に出力を保存する
パイプラインの結果を BigQuery テーブルに格納します。 データを格納する場所をシンクといいます。
- Cloud Data Fusion Studio ページに移動し、[Sink] を開きます。
- [BigQuery] をクリックします。
Joiner ノードを BigQuery ノードに接続します。
ポインタを BigQuery ノードの上に置き、[Properties] をクリックします。
- [Dataset] フィールドに「
dis_user_guide
」と入力します。 - [Table] フィールドで
customer_data_abbreviated_states
を選択します。 - [閉じる] をクリックします。
- [Dataset] フィールドに「
パイプラインをデプロイして実行する
- Studio ページで、[Name your pipeline] をクリックし、「
CampaignPipeline
」と入力します。 - [保存] をクリックします。
- 右上隅の [Deploy] をクリックします。
- デプロイが完了したら、[Run] をクリックします。
パイプラインの実行には数分を要する場合があります。待機している間、パイプラインのステータスが [Provisioning] > [Starting] > [Running] > [Deprovisioning] > [Succeeded] に変わるのを確認できます。
結果を見る
Google Cloud コンソールで [BigQuery] ページに移動します。
[Create SQL query] をクリックします。
customer_data_abbreviated_states
テーブルに対してクエリを実行します。SELECT * FROM dis_user_guide.customer_data_abbreviated_states LIMIT 1000
これで、データ パイプラインが正常に作成されました。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の操作を行います。
BigQuery データセットを削除する
このチュートリアルで作成した BigQuery データセットを削除するには、次のようにします。
- Google Cloud コンソールで [BigQuery] ページに移動します。
dis_user_guide
データセットを選択します。- [deleteDelete dataset] をクリックします。
Cloud Data Fusion インスタンスを削除する
手順に従って Cloud Data Fusion インスタンスを削除します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
次のステップ
- Cloud Data Fusion について学ぶ。