Salesforce(SFDC)との連携
このページでは、Cortex Framework Data Foundation での Salesforce(SFDC)運用ワークロードの統合手順について説明します。Cortex Framework は、Salesforce のデータを Dataflow パイプラインと統合して BigQuery に接続します。Cloud Composer は、これらの Dataflow パイプラインのスケジュール設定とモニタリングを行い、データから分析情報を取得します。
構成ファイル
Cortex Framework Data Foundation リポジトリの config.json
ファイルには、Salesforce などの任意のデータソースからデータを転送するために必要な設定が構成されています。このファイルには、運用中の Salesforce ワークロードの次のパラメータが含まれています。
"SFDC": {
"deployCDC": true,
"createMappingViews": true,
"createPlaceholders": true,
"datasets": {
"cdc": "",
"raw": "",
"reporting": "REPORTING_SFDC"
}
}
次の表に、各 SFDC オペレーション パラメータの値を示します。
パラメータ | 意味 | デフォルト値 | 説明 |
SFDC.deployCDC
|
CDC をデプロイする | true
|
Cloud Composer で DAG として実行する CDC 処理スクリプトを生成します。Salesforce Sales Cloud のさまざまな取り込みオプションについては、ドキュメントをご覧ください。 |
SFDC.createMappingViews
|
マッピング ビューを作成する | true
|
提供されている DAG は、Salesforce API から新しいレコードを取得し、ランディング ページでレコードを更新します。この値を true に設定すると、CDC で処理されたデータセットにビューが生成され、元のデータセットの「最新バージョンの真実」を含むテーブルが公開されます。false で SFDC.deployCDC が true の場合、DAG は SystemModstamp に基づく変更データ キャプチャ(CDC)処理で生成されます。詳しくは、Salesforce の CDC 処理をご覧ください。 |
SFDC.createPlaceholders
|
プレースホルダを作成する | true
|
ダウンストリーム レポートのデプロイがエラーなく実行されるように、取り込みプロセスで生成されない場合に備えて、空のプレースホルダ テーブルを作成します。 |
SFDC.datasets.raw
|
未加工のランディング データセット | - | CDC プロセスで使用され、レプリケーション ツールが Salesforce からデータを配置する場所です。テストデータを使用する場合は、空のデータセットを作成します。 |
SFDC.datasets.cdc
|
CDC 処理済みデータセット | - | レポートビューのソースとして機能し、DAG で処理されるレコードのターゲットとして機能するデータセット。テストデータを使用する場合は、空のデータセットを作成します。 |
SFDC.datasets.reporting
|
レポート データセット SFDC | "REPORTING_SFDC"
|
エンドユーザーがレポート用にアクセスできるデータセットの名前。ビューとユーザー向けテーブルがデプロイされます。 |
SFDC.currencies
|
通貨のフィルタ | [ "USD" ]
|
テストデータを使用しない場合は、ビジネスに関連する単一の通貨([ "USD" ] など)または複数の通貨([ "USD", "CAD" ] など)を入力します。これらの値は、利用可能な場合は分析モデルの SQL のプレースホルダの置換に使用されます。 |
データモデル
このセクションでは、エンティティ関係図(ERD)を使用して Salesforce(SFDC)データモデルについて説明します。
ベースビュー
これらは ERD の青色のオブジェクトであり、CDC テーブルのビューで、一部の列名エイリアス以外の変換はありません。スクリプトは src/SFDC/src/reporting/ddls
にあります。
レポートビュー
これらは ERD の緑色のオブジェクトであり、レポート テーブルで使用される関連するディメンション属性が含まれています。スクリプトは src/SFDC/src/reporting/ddls
にあります。
Salesforce データの要件
このセクションでは、Cortex Framework で使用するために Salesforce データをどのように構造化するかについて説明します。
- テーブル構造:
- 命名: テーブル名には
snake_case
(アンダースコアで区切られた小文字の単語)を使用し、複数形にします。例:some_objects
- データ型: 列は、Salesforce 内で表されるデータ型と同じデータ型を保持します。
- 可読性: レポートレイヤの明確性を高めるために、一部のフィールド名が若干調整される場合があります。
- 命名: テーブル名には
- 空のテーブルとデプロイ: 元のデータセットにない必要なテーブルは、デプロイ プロセス中に空のテーブルとして自動的に作成されます。これにより、CDC デプロイ ステップをスムーズに実行できます。
- CDC の要件:
Id
フィールドとSystemModstamp
フィールドは、CDC スクリプトがデータの変更を追跡するために重要です。これらの名前をそのまま使用することも、別の名前を使用することもできます。提供されている未加工処理スクリプトは、これらのフィールドを API から自動的に取得し、ターゲット レプリケーション テーブルを更新します。Id
: 各レコードの一意の識別子として機能します。SystemModstamp
: このフィールドには、レコードが最後に変更された日時を示すタイムスタンプが格納されます。
- 元データ処理スクリプト:提供される元データ処理スクリプトでは、追加の(CDC)処理は必要ありません。この動作は、デプロイ時にデフォルトで設定されます。
通貨換算のソーステーブル
Salesforce では、次の 2 つの方法で通貨を管理できます。
- 基本: デフォルトです。すべてのデータで単一の通貨が使用されます。
- 詳細: 為替レートに基づいて複数の通貨を変換します(詳細な通貨管理を有効にする必要があります)。
高度な通貨管理を使用する場合、Salesforce は次の 2 つの特別なテーブルを使用します。
- CurrencyTypes: このテーブルには、使用するさまざまな通貨(米ドル、ユーロなど)に関する情報が保存されます。
- DatedConversionRates: この表には、通貨間の為替レートの経時的な変化が示されます。
Cortex Framework では、高度な通貨管理を使用する場合は、これらのテーブルが存在することが前提とされています。高度な通貨管理を使用しない場合は、これらのテーブルに関連するエントリを構成ファイル(src/SFDC/config/ingestion_settings.yaml
)から削除できます。これにより、存在しないテーブルからデータを抽出する不要な試行を防ぐことができます。
SFDC データを BigQuery に読み込む
Cortex Framework は、Apache Airflow と Salesforce Bulk API 2.0 でスケジュール設定された Python スクリプトに基づくレプリケーション ソリューションを提供します。これらの Python スクリプトは、任意のツールで適応してスケジュール設定できます。詳細については、SFDC 抽出モジュールをご覧ください。
Cortex Framework には、データの取得元と管理方法に応じて、データを統合する 3 つの方法があります。
- API 呼び出し: このオプションは、API を介して直接アクセスできるデータに使用します。Cortex Framework は API を呼び出してデータを取得し、BigQuery 内の「未加工」データセットに保存できます。データセットに既存のレコードがある場合、Cortex Framework は新しいデータでレコードを更新できます。
- 構造マッピング ビュー: この方法は、別のツールを使用して BigQuery にデータがすでに読み込まれているが、データ構造が Cortex Framework の要件と一致しない場合などに便利です。Cortex Framework は、「ビュー」(仮想テーブルなど)を使用して、既存のデータ構造を Cortex Framework のレポート機能で想定される形式に変換します。
CDC(変更データ キャプチャ)処理スクリプト: このオプションは、絶えず変化するデータ用に特別に設計されています。CDC スクリプトはこれらの変更を追跡し、それに応じて BigQuery のデータを更新します。これらのスクリプトは、データ内の 2 つの特別なフィールドに依存しています。
Id
: 各レコードの一意の識別子。SystemModstamp
: レコードが変更された日時を示すタイムスタンプ。
データにこれらの名前が正確に含まれていない場合は、スクリプトを調整して、異なる名前で認識するようにできます。このプロセス中にデータスキーマにカスタム フィールドを追加することもできます。たとえば、Account オブジェクトのデータを含むソーステーブルには、元の
Id
フィールドとSystemModstamp
フィールドが必要です。これらのフィールドの名前が異なる場合は、src/SFDC/src/table_schema/accounts.csv
ファイルを更新して、Id
フィールドの名前をAccountId
にマッピングし、システム変更タイムスタンプ フィールドをSystemModstamp
にマッピングする必要があります。詳細については、SystemModStamp のドキュメントをご覧ください。
別のツールでデータをすでに読み込んでいて、そのデータが常に更新されている場合でも、Cortex でそのデータを使用できます。CDC スクリプトには、既存のデータ構造を Cortex Framework が必要とする形式に変換できるマッピング ファイルが付属しています。このプロセス中にデータにカスタム フィールドを追加することもできます。
API 統合と CDC を構成する
Salesforce データを BigQuery に取り込むには、次の方法を使用します。
- API 呼び出し用の Cortex スクリプト: Salesforce または任意のデータ レプリケーション ツールのレプリケーション スクリプトを提供します。重要なのは、取り込まれるデータが Salesforce API から取得されたものと同じように見えることです。
- レプリケーション ツールと常に追加 : レプリケーションにツールを使用している場合、この方法は、新しいデータレコード(_appendalways_pattern)を追加するか、既存のレコードを更新できるツールに適しています。
- レプリケーション ツールと新しいレコードの追加: ツールがレコードを更新せず、変更を新しいレコードとしてターゲット(未加工)テーブルに複製する場合、Cortex Data Foundation には CDC 処理スクリプトを作成するオプションがあります。詳細については、CDC プロセスをご覧ください。
データが Cortex Framework の想定と一致するようにするには、マッピング構成を調整して、レプリケーション ツールまたは既存のスキーマをマッピングします。これにより、Cortex Framework Data Foundation が想定する構造と互換性のあるマッピング ビューが生成されます。
ingestion_settings.yaml
ファイルを使用して、Salesforce API を呼び出してデータを元のデータセットに複製するスクリプトの生成(セクション salesforce_to_raw_tables
)と、元のデータセットと CDC で処理されたデータセットに受信した変更を処理するスクリプトの生成(セクション raw_to_cdc_tables
)を構成します。
デフォルトでは、API から読み取るために提供されたスクリプトは、変更を元のデータセットに更新するため、CDC 処理スクリプトは必要ありません。代わりに、ソーススキーマを想定スキーマに合わせるマッピング ビューが作成されます。
config.json
の SFDC.createMappingViews=true
が空白の場合、CDC 処理スクリプトの生成は実行されません(デフォルトの動作)。CDC スクリプトが必要な場合は、SFDC.createMappingViews=false
を設定します。この 2 番目のステップでは、Cortex Framework Data Foundation の要件に応じて、ソース スキーマと必要なスキーマとの間でマッピングすることもできます。
次の setting.yaml
構成ファイルの例は、option 3
に示すように、レプリケーション ツールがレプリケートされたデータセットにデータを直接更新するときにマッピング ビューが生成される様子を示しています(つまり、CDC は不要で、テーブルとフィールド名の再マッピングのみが必要です)。CDC は不要であるため、config.json ファイルのパラメータ SFDC.createMappingViews
が true
のままである限り、このオプションは実行されます。
salesforce_to_raw_tables:
- base_table: accounts
raw_table: Accounts
api_name: Account
load_frequency: "@daily"
- base_table: cases
raw_table: cases2
api_name: Case
load_frequency: "@daily"
この例では、セクションからベーステーブルの構成またはすべての構成を削除すると、salesforce_to_raw_tables
に示すように、そのベーステーブルまたはセクション全体の DAG の生成がスキップされます。このシナリオでは、CDC 処理スクリプトを生成する必要がないため、パラメータ deployCDC : False
を設定しても同じ効果があります。
データ マッピング
受信データフィールドを Cortex Data Foundation が想定する形式にマッピングする必要があります。たとえば、ソースデータ システムの unicornId
という名前のフィールドは、名前を変更して Cortex Data Foundation 内で AccountId
(文字列データ型)として認識する必要があります。
- ソース フィールド:
unicornId
(ソースシステムで使用される名前) - Cortex フィールド:
AccountId
(Cortex が想定する名前) - データタイプ:
String
(Cortex が想定するデータ型)
ポリモーフィック フィールドのマッピング
Cortex Framework Data Foundation は、名前は異なるが構造は同じであるポリモーフィック フィールドのマッピングをサポートしています。多型フィールド型名(Who.Type
など)は、それぞれのマッピング CSV ファイル(src/SFDC/src/table_schema/tasks.csv
)に [Field Name]_Type
アイテムを追加することで複製できます。たとえば、Task
オブジェクトの Who.Type
フィールドを複製する必要がある場合は、Who_Type,Who_Type,STRING
行を追加します。これにより、自身にマッピングされ(同じ名前を保持)、文字列データ型を持つ Who.Type
という名前の新しいフィールドが定義されます。
DAG テンプレートの変更
Airflow または Cloud Composer のインスタンスに応じて、CDC 用の DAG テンプレートまたは元データ処理用の DAG テンプレートを調整することが必要になる場合があります。詳細については、Cloud Composer の設定の収集をご覧ください。
API 呼び出しからの CDC または元データの生成が不要な場合は、deployCDC=false
を設定します。また、ingestion_settings.yaml
のセクションの内容を削除することもできます。データ構造が Cortex Framework Data Foundation で想定されるものと一致していることがわかっている場合は、SFDC.createMappingViews=false
を設定してマッピング ビューの生成をスキップできます。
抽出モジュールの構成
このセクションでは、Data Foundation が提供する Salesforce to BigQuery 抽出モジュールを使用する手順について説明します。要件とフローはお使いのシステムと既存の構成によって異なる場合があります。または、利用可能な他のツールを使用することもできます。
認証情報と接続アプリを設定する
Salesforce インスタンスに管理者としてログインし、次の操作を行います。
- 次の要件を満たすプロファイルを Salesforce で作成するか、特定します。
Permission for Apex REST Services and API Enabled
は [システム パーミッション] で付与されます。View All
権限は、複製するすべてのオブジェクトに付与されます。たとえば、[アカウント] や [ケース] などです。制限や問題がないかセキュリティ管理者に確認します。- ユーザー インターフェースのログインに関連する権限が付与されていません(Lightning Experience の Salesforce Anywhere、モバイルの Salesforce Anywhere、Lightning Experience ユーザー、Lightning ログイン ユーザーなど)。セキュリティ管理者に制限や問題がないか確認します。
- Salesforce で既存のユーザーを作成または識別します。ユーザーのユーザー名、パスワード、セキュリティ トークンを把握しておく必要があります。次の点を考慮してください。
- 理想的には、このレプリケーションの実行専用のユーザーにする必要があります。
- ユーザーは、ステップ 1 で作成または識別したプロファイルに割り当てる必要があります。
- [ユーザー名] と [パスワード] をここで確認、再設定できます。
- セキュリティ トークンがない場合や、別のプロセスで使用されていない場合は、セキュリティ トークンをリセットできます。
- 接続アプリを作成します。これは、プロファイル、Salesforce API、標準ユーザー認証情報、セキュリティ トークンを使用して、外部から Salesforce への接続を確立する唯一の通信チャネルです。
- 手順に沿って API 統合の OAuth 設定を有効にする。
- [API (Enabled OAuth Settings)] セクションで
Require Secret for Web Server Flow
とRequire Secretfor Refresh Token Flow
が有効になっていることを確認します。 - コンシューマ キー(後でクライアント ID として使用)を取得する方法については、ドキュメントをご覧ください。問題や制限については、セキュリティ管理者にお問い合わせください。
- 作成したプロファイルに接続アプリケーションを割り当てます。
- Salesforce のホーム画面の右上にある [設定] を選択します。
- [クイック検索] ボックスに「
profile
」と入力し、[プロフィール] を選択します。手順 1 で作成したプロファイルを検索します。 - プロファイルを開きます。
- [割り当てられた接続済みアプリ] リンクをクリックします。
- [編集] をクリックします。
- 新しく作成した接続済みアプリを追加します。
- [保存] をクリックします。
Secret Manager を設定する
接続の詳細を保存するように Secret Manager を構成します。Salesforce-to-BigQuery モジュールは、Secret Manager を使用して、Salesforce と BigQuery への接続に必要な認証情報を安全に保存します。この方法では、パスワードなどの機密情報をコードや構成ファイルに直接公開しないため、セキュリティが強化されます。
次の仕様でシークレットを作成します。詳細な手順については、シークレットを作成するをご覧ください。
- シークレット名:
airflow-connections-salesforce-conn
シークレット値:
http://USERNAME:PASSWORD@https%3A%2F%2FINSTANCE_NAME.lightning.force.com?client_id=CLIENT_ID&security_token=SECRET_TOKEN`
次のように置き換えます。
USERNAME
は、ユーザー名に置き換えます。PASSWORD
は、パスワードに置き換えます。INSTANCE_NAME
はインスタンス名に置き換えます。CLIENT_ID
は、クライアント ID に置き換えます。SECRET_TOKEN
は、シークレット トークンに置き換えます。
詳細については、インスタンス名を確認する方法をご覧ください。
レプリケーション用の Cloud Composer ライブラリ
Cortex Framework Data Foundation が提供する DAG で Python スクリプトを実行するには、いくつかの依存関係をインストールする必要があります。Airflow バージョン 1.10 の場合は、Cloud Composer 1 の Python 依存関係をインストールするのドキュメントに沿って、次のパッケージを順番にインストールします。
tableauserverclient==0.17
apache-airflow-backport-providers-salesforce==2021.3.3
Airflow バージョン 2.x の場合は、Cloud Composer 2 の Python 依存関係をインストールするドキュメントで apache-airflow-providers-salesforce~=5.2.0
をインストールします。
次のコマンドを使用して、必要なパッケージをそれぞれインストールします。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--update-pypi-package PACKAGE_NAME EXTRAS_AND_VERSION
次のように置き換えます。
ENVIRONMENT_NAME
は、割り当てられた環境名に置き換えます。LOCATION
は、ロケーションに置き換えます。PACKAGE_NAME
は、選択したパッケージ名に置き換えます。EXTRAS_AND_VERSION
は、追加機能とバージョンの仕様に置き換えます。
次のコマンドは、必要なパッケージのインストールの例です。
gcloud composer environments update my-composer-instance \
--location us-central1 \
--update-pypi-package apache-airflow-backport-providers-salesforce>=2021.3.3
Secret Manager をバックエンドとして有効にする
セキュリティ バックエンドとして Google Secret Manager を有効にします。この手順では、Cloud Composer 環境で使用されるパスワードや API キーなどの機密情報のプライマリ ストレージ ロケーションとして Secret Manager を有効にします。これにより、専用サービスで認証情報を一元化して管理することで、セキュリティが強化されます。詳細については、Secret Manager をご覧ください。
Composer サービス アカウントにシークレットへのアクセスを許可する
この手順により、Cloud Composer に関連付けられたサービス アカウントに、Secret Manager に保存されているシークレットにアクセスするために必要な権限が付与されます。デフォルトでは、Cloud Composer は Compute Engine サービス アカウントを使用します。必要な権限は Secret Manager Secret Accessor
です。この権限により、サービス アカウントは Secret Manager に保存されているシークレットを取得して使用できます。Secret Manager でアクセス制御を構成する包括的なガイドについては、アクセス制御のドキュメントをご覧ください。
Airflow での BigQuery 接続
Cloud Composer の設定の収集に従って、接続 sfdc_cdc_bq
を作成してください。この接続は、Salesforce から BigQuery へのモジュールが BigQuery との通信を確立するために使用される可能性があります。
次のステップ
- その他のデータソースとワークロードの詳細については、データソースとワークロードをご覧ください。
- 本番環境にデプロイする手順の詳細については、Cortex Framework Data Foundation のデプロイの前提条件をご覧ください。