ターゲット キャンペーン パイプラインを作成する
Cloud Data Fusion を使用して、ターゲット キャンペーンの候補を選択するために顧客データをクリーニング、変換、処理する方法について説明します。
このタスクの手順をガイドに沿って Google Cloud コンソールで直接行う場合は、「ガイドを表示」をクリックしてください。
シナリオ
展開中のキャンペーンの販促用にお得意様向けのマーケティング資料を作成しようとしています。その資料は顧客の自宅のメールボックスに直接配布する予定です。
キャンペーンには次の 2 つの制約があります。
- 所在地: カリフォルニア、ワシントン、オレゴンのお客様だけに配達する。
- 費用: 燃料を節約するために、家まですぐに行き着ける顧客に配達します。配達先は住所に avenue がある顧客に限定します。
このチュートリアルでは、キャンペーン用に顧客住所のリストを生成する方法を説明します。このチュートリアルでは、次のことを行います。
- 顧客データをクリーニングする: カリフォルニア、ワシントン、オレゴンの住所に avenue がある顧客をフィルタします。
次の処理を行うパイプラインを作成します。
- フィルタした顧客データを、州の略称を含む公開データセットと結合します。
- クリーニングと結合を行ったデータを BigQuery テーブルに格納し、BigQuery ウェブ インターフェース を使用したクエリの実行や Looker Studio を使用した分析を実行します。
目標
- Cloud Data Fusion を 2 つのデータソースに接続する
- 基本的な変換を適用する
- 2 つのデータソースを結合する
- 出力データをシンクに書き込む
準備
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud Data Fusion, BigQuery, Cloud Storage, and Dataproc API を有効にします。
-
Cloud Data Fusion インスタンスを作成します。
このチュートリアルは、デフォルトの Compute Engine サービス アカウントを使用していることを前提としています。
権限を管理
必要なカスタムロールと権限を作成して割り当てます。
カスタムロールを作成して権限を追加する
Google Cloud コンソールの [ロール] ページに移動します。
[
ロールを作成] をクリックします。[Title] 項目に「
Custom Role-Tutorial
」と入力します。[
権限を追加] をクリックします。[権限を追加] ウィンドウで次の権限を選択し、[追加] をクリックします。
bigquery.datasets.create
bigquery.jobs.create
storage.buckets.create
[作成] をクリックします。
デフォルトの Compute Engine サービス アカウントにカスタムロールを割り当てる
Cloud Data Fusion の [Instances] ページに移動します。
インスタンスの名前をクリックします。
デフォルトの Dataproc サービス アカウントをメモしておきます。この情報はインスタンスの詳細ページに含まれます。
Dataproc サービス アカウント名の形式は次のとおりです。
CUSTOMER_PROJECT_NUMBER-compute@developer.gserviceaccount.com
。Dataproc サービス アカウントの詳細を確認してください。
[IAM] ページに移動します。
[フィルタ] バーに、デフォルトの Dataproc サービス アカウントの名前を入力します。
デフォルトの Compute Engine サービス アカウントで、[
編集] をクリックします。[
別のロールを追加] をクリックします。[ロールを選択] フィールドで、[カスタムロール チュートリアル] を選択します。
[保存] をクリックします。
サービス アカウントに Cloud Data Fusion 実行者のロールがすでに割り当てられていることを確認します。
顧客データを準備する
このチュートリアルでは、次の 2 つの入力データセットが必要です。どちらも Cloud Data Fusion インスタンスで提供されています。
- 顧客データのサンプル:
customers.csv
という名前の CSV ファイル。 - 州の略称:
state_abbreviations
という名前の BigQuery テーブル。
顧客データを読み込む
Cloud Data Fusion の [Instances] ページに移動します。
使用している Cloud Data Fusion インスタンスで、[View instance] をクリックします。Cloud Data Fusion ウェブ インターフェースが新しいタブで開きます。
[Wrangler] をクリックします。[Wrangler] ページが開きます。
[Connections] ペインで、[GCS] > [Sample Buckets] に移動します。
[campaign-tutorial] をクリックします。
customers.csv をクリックします。
[Parsing options] ウィンドウで、次のように指定します。
- Format:
csv
- 引用符で囲まれた値を有効にする:
False
- 最初の行をヘッダーとして使用する:
False
- ファイル エンコード:
UTF-8
- Format:
[Confirm] をクリックすると、顧客データが Wrangler の新しいタブに読み込まれます。
顧客データをクリーニングする
これには 2 つのサブタスクが含まれます。
- スキーマを設定する
- 顧客データをフィルタリングして、必要なターゲット オーディエンスのみを表示する
スキーマを設定する
適切な名前をテーブルの列に割り当てて、データのスキーマを設定します。body_1
や body_2
などの列にわかりやすい名前を付けるには、次の手順を行います。
- 右ペインで [Columns] タブをクリックします。
- [Column names] プルダウンをクリックし、[Set all] を選択します。
[Bulk set column] ダイアログ ボックスに、カンマ区切りの次の列名を入力します。
Name,StreetAddress,City,State,Country
[適用] をクリックします。
データをフィルタリングする
カリフォルニア、オレゴン、ワシントンに住んでいる顧客のみを表示するようにデータをフィルタリングします。
これらの状態以外の値を含む行をすべて削除します。
- State 列のプルダウンをクリックして、[Filter] を選択します。
フィルタ ウィンドウで次の操作を行います。
- [Keep row] をクリックします。
- [If] プルダウンをクリックして、[value matching regex] を選択します。
次の正規表現を入力します。
^(California|Oregon|Washington)$
[適用] をクリックします。
State 列の値は、California、Oregon、Washington です。
データをフィルタリングして、住所に avenue を含む顧客のみを表示します。文字列 avenue
を含む住所のみを保持します。
- StreetAddress 列のプルダウンをクリックし、[StreetAddress] を選択します。
フィルタ ウィンドウで次の操作を行います。
- [Keep row] をクリックします。
- If プルダウンから [value contains] を選択し、「
Avenue
」と入力します。 - [Ignore case] を選択します。
[適用] をクリックします。
データセット全体に対して並列処理ジョブを実行する前、Wrangler にはデータセットの最初の 1000 個の値しか表示されません。一部のデータをフィルタリングしたため、Wrangler の表示に残った顧客はほんの一部です。
バッチ パイプラインを作成する
ここまで、データのクリーニングとデータの一部の変換を実行しました。これで、データセット全体に対して変換を実行する、バッチ パイプラインを作成できます。
Cloud Data Fusion は、スタジオでビルドしたパイプラインをエフェメラル Dataproc クラスタで変換を並列に実行する Apache Spark プログラムに変換します。このプロセスにより、インフラストラクチャを処理することなく、スケーラブルかつ信頼性の高い方法で、膨大な量のデータに対して複雑な変換を簡単に行うことができます。
- [Wrangler] ページで [Create a pipeline] をクリックします。
- [Batch pipeline] を選択します。[Studio] ページが開きます。
パイプライン タイプとして「Data Pipeline - Batch」が左上に表示されていることを確認します。
[Studio] ページで、GCSFile ソースノードが GCSFile ノードに接続されます。
Wrangler ページで適用した変換が、[Studio] ページの [Wrangler] ノードに表示されます。
適用した変換を表示するには、ポインタを [Wrangler] ノードの上に置き、[Properties] をクリックします。
適用した変換が [Directives] に表示されます。
[検証] をクリックします。
[
閉じる] をクリックします。
たとえば、Country 列は、値が常に「USA」であるため不要です。この列を削除する手順は次のとおりです。
- [Wrangle] をクリックします。
- Country の横にある下向き矢印をクリックし、[Delete Column] を選択します。
- [適用] をクリックします。Wrangler ページが閉じ、Wrangler の [Properties] ウィンドウが Studio ページで開きます。[ディレクティブ] に
drop Country
が表示されます。 - [閉じる] をクリックします。
州名を略称にする
配送車両のナビゲーション システムでは州名が略称で記されている住所のみを認識しますが(California ではなく CA)、顧客データには完全な州名が含まれています。
公開されている BigQuery state_abbreviations
テーブルには 2 つの列があり、1 つは完全な州名、もう 1 つは州名の略称です。このテーブルを使用して、顧客データの州名を更新できます。
BigQuery の州名データを表示する
別のタブで BigQuery Studio ページに移動します。
[Create SQL query] をクリックして、クエリエディタに次のクエリを入力します。
SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
[実行] をクリックします。
州名と略称が表示されます。
BigQuery テーブルにアクセスする
BigQuery state_abbreviations
テーブルにアクセスするソースをパイプラインに追加します。
- Cloud Data Fusion Studio のページに移動し、[Source] メニューを開きます。
[BigQuery] をクリックします。
BigQuery ソースノードが他の 2 つのノードとともにキャンバスに表示されます。
ポインタを BigQuery ソースノードの上に置き、[Properties] をクリックします。
- [データセット ID] フィールドに「
dis-user-guide
」と入力します。 - [Reference Name] フィールドに「
state_abbreviations
」と入力します。 - [Dataset] フィールドに「
campaign_tutorial
」と入力します。 - [Table] フィールドに「
state_abbreviations
」と入力します。
- [データセット ID] フィールドに「
[Get Schema] をクリックして、BigQuery からテーブルのスキーマを読み込みます。
[閉じる] をクリックします。
2 つのデータソースを結合する
州名の略称が記された顧客データを含む出力を生成するには、2 つのデータソース、顧客データ、州の略称を結合します。
- Cloud Data Fusion Studio ページに移動し、[分析] メニューを展開します。
[Joiner] をクリックします。
SQL Join に似たアクションを表す Joiner ノードがキャンバスに表示されます。
ソースノードの右端にある接続矢印を宛先ノードにドラッグ&ドロップして、Wrangler ノードと BigQuery ノードを Joiner ノードに接続します。
ポインタを Joiner ノードの上に置き、[Properties] をクリックします。
[Fields] セクションで [Wrangler] と [BigQuery] を開きます。
- Wrangler の [state] チェックボックスをオフにします。
- 州名は略称のみ必要で完全な州名は不要なため、BigQuery の [name] チェックボックスをオフにします。
BigQuery の [abbreviation] チェックボックスをオンのままにし、エイリアスを
State
に変更します。
[Join Type] フィールドの値は [Outer] のままにします。[Require input] で [Wrangler] チェックボックスをオンにします。
[Join condition] セクションで、Wrangler に対して [State] を選択します。BigQuery の場合は、[
Name
] を選択します。設定した結合によって得られるスキーマを生成します。[Get Schema] をクリックします。
[検証] をクリックします。
[閉じる] をクリックします。
BigQuery に出力を保存する
パイプラインの結果を BigQuery テーブルに格納します。 データを格納する場所をシンクといいます。
- Cloud Data Fusion Studio ページに移動し、[Sink] を開きます。
- [BigQuery] をクリックします。
Joiner ノードを BigQuery ノードに接続します。
ポインタを BigQuery ノードの上に置き、[Properties] をクリックします。
- [Dataset] フィールドに「
dis_user_guide
」と入力します。 - [Table] フィールドで
customer_data_abbreviated_states
を選択します。 - [閉じる] をクリックします。
- [Dataset] フィールドに「
パイプラインをデプロイして実行する
Studio ページで、[Name your pipeline] をクリックし、「
CampaignPipeline
」と入力します。[OK] をクリックします。
右上隅の [Deploy] をクリックします。
デプロイが完了したら、[Run] をクリックします。
パイプラインの実行には数分を要する場合があります。待機している間、パイプラインのステータスが [Provisioning] > [Starting] > [Running] > [Deprovisioning] > [Succeeded] に変わるのを確認できます。
結果を見る
Google Cloud コンソールで [BigQuery] ページに移動します。
[Create SQL query] をクリックします。
customer_data_abbreviated_states
テーブルに対してクエリを実行します。SELECT * FROM dis_user_guide.customer_data_abbreviated_states LIMIT 1000
これで、データ パイプラインが正常に作成されました。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の操作を行います。
BigQuery データセットを削除する
このチュートリアルで作成した BigQuery データセットを削除するには、次のようにします。
- Google Cloud コンソールで [BigQuery] ページに移動します。
dis_user_guide
データセットを選択します。- [deleteDelete dataset] をクリックします。
Cloud Data Fusion インスタンスを削除する
手順に従って Cloud Data Fusion インスタンスを削除します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
次のステップ
- Cloud Data Fusion について学ぶ。