Salesforce(SFDC)との連携

このページでは、Cortex Framework Data Foundation での Salesforce(SFDC)運用ワークロードの統合手順について説明します。Cortex Framework は、Salesforce のデータを Dataflow パイプラインと統合して BigQuery に接続します。Cloud Composer は、これらの Dataflow パイプラインのスケジュール設定とモニタリングを行い、データから分析情報を取得します。

構成ファイル

Cortex Framework Data Foundation リポジトリconfig.json ファイルには、Salesforce などの任意のデータソースからデータを転送するために必要な設定が構成されています。このファイルには、運用中の Salesforce ワークロードの次のパラメータが含まれています。

    "SFDC": {
        "deployCDC": true,
        "createMappingViews": true,
        "createPlaceholders": true,
        "datasets": {
            "cdc": "",
            "raw": "",
            "reporting": "REPORTING_SFDC"
        }
    }

次の表に、各 SFDC オペレーション パラメータの値を示します。

パラメータ 意味 デフォルト値 説明
SFDC.deployCDC CDC をデプロイする true Cloud Composer で DAG として実行する CDC 処理スクリプトを生成します。Salesforce Sales Cloud のさまざまな取り込みオプションについては、ドキュメントをご覧ください。
SFDC.createMappingViews マッピング ビューを作成する true 提供されている DAG は、Salesforce API から新しいレコードを取得し、ランディング ページでレコードを更新します。この値を true に設定すると、CDC で処理されたデータセットにビューが生成され、元のデータセットの「最新バージョンの真実」を含むテーブルが公開されます。false で SFDC.deployCDCtrue の場合、DAG は SystemModstamp に基づく変更データ キャプチャ(CDC)処理で生成されます。詳しくは、Salesforce の CDC 処理をご覧ください。
SFDC.createPlaceholders プレースホルダを作成する true ダウンストリーム レポートのデプロイがエラーなく実行されるように、取り込みプロセスで生成されない場合に備えて、空のプレースホルダ テーブルを作成します。
SFDC.datasets.raw 未加工のランディング データセット - CDC プロセスで使用され、レプリケーション ツールが Salesforce からデータを配置する場所です。テストデータを使用する場合は、空のデータセットを作成します。
SFDC.datasets.cdc CDC 処理済みデータセット - レポートビューのソースとして機能し、DAG で処理されるレコードのターゲットとして機能するデータセット。テストデータを使用する場合は、空のデータセットを作成します。
SFDC.datasets.reporting レポート データセット SFDC "REPORTING_SFDC" エンドユーザーがレポート用にアクセスできるデータセットの名前。ビューとユーザー向けテーブルがデプロイされます。
SFDC.currencies 通貨のフィルタ [ "USD" ] テストデータを使用しない場合は、ビジネスに関連する単一の通貨([ "USD" ] など)または複数の通貨([ "USD", "CAD" ] など)を入力します。これらの値は、利用可能な場合は分析モデルの SQL のプレースホルダの置換に使用されます。

データモデル

このセクションでは、エンティティ関係図(ERD)を使用して Salesforce(SFDC)データモデルについて説明します。

SFDC のエンティティ関係図

図 2Salesforce(SFDC): エンティティ リレーションシップ図。

ベースビュー

これらは ERD の青色のオブジェクトであり、CDC テーブルのビューで、一部の列名エイリアス以外の変換はありません。スクリプトは src/SFDC/src/reporting/ddls にあります。

レポートビュー

これらは ERD の緑色のオブジェクトであり、レポート テーブルで使用される関連するディメンション属性が含まれています。スクリプトは src/SFDC/src/reporting/ddls にあります。

Salesforce データの要件

このセクションでは、Cortex Framework で使用するために Salesforce データをどのように構造化するかについて説明します。

  • テーブル構造:
    • 命名: テーブル名には snake_case(アンダースコアで区切られた小文字の単語)を使用し、複数形にします。例: some_objects
    • データ型: 列は、Salesforce 内で表されるデータ型と同じデータ型を保持します。
    • 可読性: レポートレイヤの明確性を高めるために、一部のフィールド名が若干調整される場合があります。
  • 空のテーブルとデプロイ: 元のデータセットにない必要なテーブルは、デプロイ プロセス中に空のテーブルとして自動的に作成されます。これにより、CDC デプロイ ステップをスムーズに実行できます。
  • CDC の要件: Id フィールドと SystemModstamp フィールドは、CDC スクリプトがデータの変更を追跡するために重要です。これらの名前をそのまま使用することも、別の名前を使用することもできます。提供されている未加工処理スクリプトは、これらのフィールドを API から自動的に取得し、ターゲット レプリケーション テーブルを更新します。
    • Id: 各レコードの一意の識別子として機能します。
    • SystemModstamp: このフィールドには、レコードが最後に変更された日時を示すタイムスタンプが格納されます。
  • 元データ処理スクリプト:提供される元データ処理スクリプトでは、追加の(CDC)処理は必要ありません。この動作は、デプロイ時にデフォルトで設定されます。

通貨換算のソーステーブル

Salesforce では、次の 2 つの方法で通貨を管理できます。

  • 基本: デフォルトです。すべてのデータで単一の通貨が使用されます。
  • 詳細: 為替レートに基づいて複数の通貨を変換します(詳細な通貨管理を有効にする必要があります)。

高度な通貨管理を使用する場合、Salesforce は次の 2 つの特別なテーブルを使用します。

  • CurrencyTypes: このテーブルには、使用するさまざまな通貨(米ドル、ユーロなど)に関する情報が保存されます。
  • DatedConversionRates: この表には、通貨間の為替レートの経時的な変化が示されます。

Cortex Framework では、高度な通貨管理を使用する場合は、これらのテーブルが存在することが前提とされています。高度な通貨管理を使用しない場合は、これらのテーブルに関連するエントリを構成ファイル(src/SFDC/config/ingestion_settings.yaml)から削除できます。これにより、存在しないテーブルからデータを抽出する不要な試行を防ぐことができます。

SFDC データを BigQuery に読み込む

Cortex Framework は、Apache AirflowSalesforce Bulk API 2.0 でスケジュール設定された Python スクリプトに基づくレプリケーション ソリューションを提供します。これらの Python スクリプトは、任意のツールで適応してスケジュール設定できます。詳細については、SFDC 抽出モジュールをご覧ください。

Cortex Framework には、データの取得元と管理方法に応じて、データを統合する 3 つの方法があります。

  1. API 呼び出し: このオプションは、API を介して直接アクセスできるデータに使用します。Cortex Framework は API を呼び出してデータを取得し、BigQuery 内の「未加工」データセットに保存できます。データセットに既存のレコードがある場合、Cortex Framework は新しいデータでレコードを更新できます。
  2. 構造マッピング ビュー: この方法は、別のツールを使用して BigQuery にデータがすでに読み込まれているが、データ構造が Cortex Framework の要件と一致しない場合などに便利です。Cortex Framework は、「ビュー」(仮想テーブルなど)を使用して、既存のデータ構造を Cortex Framework のレポート機能で想定される形式に変換します。
  3. CDC(変更データ キャプチャ)処理スクリプト: このオプションは、絶えず変化するデータ用に特別に設計されています。CDC スクリプトはこれらの変更を追跡し、それに応じて BigQuery のデータを更新します。これらのスクリプトは、データ内の 2 つの特別なフィールドに依存しています。

    • Id: 各レコードの一意の識別子。
    • SystemModstamp: レコードが変更された日時を示すタイムスタンプ。

    データにこれらの名前が正確に含まれていない場合は、スクリプトを調整して、異なる名前で認識するようにできます。このプロセス中にデータスキーマにカスタム フィールドを追加することもできます。たとえば、Account オブジェクトのデータを含むソーステーブルには、元の Id フィールドと SystemModstamp フィールドが必要です。これらのフィールドの名前が異なる場合は、src/SFDC/src/table_schema/accounts.csv ファイルを更新して、Id フィールドの名前を AccountId にマッピングし、システム変更タイムスタンプ フィールドを SystemModstamp にマッピングする必要があります。詳細については、SystemModStamp のドキュメントをご覧ください。

別のツールでデータをすでに読み込んでいて、そのデータが常に更新されている場合でも、Cortex でそのデータを使用できます。CDC スクリプトには、既存のデータ構造を Cortex Framework が必要とする形式に変換できるマッピング ファイルが付属しています。このプロセス中にデータにカスタム フィールドを追加することもできます。

API 統合と CDC を構成する

Salesforce データを BigQuery に取り込むには、次の方法を使用します。

  1. API 呼び出し用の Cortex スクリプト: Salesforce または任意のデータ レプリケーション ツールのレプリケーション スクリプトを提供します。重要なのは、取り込まれるデータが Salesforce API から取得されたものと同じように見えることです。
  2. レプリケーション ツールと常に追加 : レプリケーションにツールを使用している場合、この方法は、新しいデータレコード(_appendalways_pattern)を追加するか、既存のレコードを更新できるツールに適しています。
  3. レプリケーション ツールと新しいレコードの追加: ツールがレコードを更新せず、変更を新しいレコードとしてターゲット(未加工)テーブルに複製する場合、Cortex Data Foundation には CDC 処理スクリプトを作成するオプションがあります。詳細については、CDC プロセスをご覧ください。

Salesforce ワークロード: データ インテグレーション オプション

図 1Salesforce ワークロード: データ インテグレーション オプション。

データが Cortex Framework の想定と一致するようにするには、マッピング構成を調整して、レプリケーション ツールまたは既存のスキーマをマッピングします。これにより、Cortex Framework Data Foundation が想定する構造と互換性のあるマッピング ビューが生成されます。

ingestion_settings.yaml ファイルを使用して、Salesforce API を呼び出してデータを元のデータセットに複製するスクリプトの生成(セクション salesforce_to_raw_tables)と、元のデータセットと CDC で処理されたデータセットに受信した変更を処理するスクリプトの生成(セクション raw_to_cdc_tables)を構成します。

デフォルトでは、API から読み取るために提供されたスクリプトは、変更を元のデータセットに更新するため、CDC 処理スクリプトは必要ありません。代わりに、ソーススキーマを想定スキーマに合わせるマッピング ビューが作成されます。

config.jsonSFDC.createMappingViews=true が空白の場合、CDC 処理スクリプトの生成は実行されません(デフォルトの動作)。CDC スクリプトが必要な場合は、SFDC.createMappingViews=false を設定します。この 2 番目のステップでは、Cortex Framework Data Foundation の要件に応じて、ソース スキーマと必要なスキーマとの間でマッピングすることもできます。

次の setting.yaml 構成ファイルの例は、option 3 に示すように、レプリケーション ツールがレプリケートされたデータセットにデータを直接更新するときにマッピング ビューが生成される様子を示しています(つまり、CDC は不要で、テーブルとフィールド名の再マッピングのみが必要です)。CDC は不要であるため、config.json ファイルのパラメータ SFDC.createMappingViewstrue のままである限り、このオプションは実行されます。

  salesforce_to_raw_tables:
  - base_table: accounts
    raw_table: Accounts
    api_name: Account
      load_frequency: "@daily"
  - base_table: cases
    raw_table: cases2
    api_name: Case
    load_frequency: "@daily"

この例では、セクションからベーステーブルの構成またはすべての構成を削除すると、salesforce_to_raw_tables に示すように、そのベーステーブルまたはセクション全体の DAG の生成がスキップされます。このシナリオでは、CDC 処理スクリプトを生成する必要がないため、パラメータ deployCDC : False を設定しても同じ効果があります。

データ マッピング

受信データフィールドを Cortex Data Foundation が想定する形式にマッピングする必要があります。たとえば、ソースデータ システムの unicornId という名前のフィールドは、名前を変更して Cortex Data Foundation 内で AccountId(文字列データ型)として認識する必要があります。

  • ソース フィールド: unicornId(ソースシステムで使用される名前)
  • Cortex フィールド: AccountId(Cortex が想定する名前)
  • データタイプ: String(Cortex が想定するデータ型)

ポリモーフィック フィールドのマッピング

Cortex Framework Data Foundation は、名前は異なるが構造は同じであるポリモーフィック フィールドのマッピングをサポートしています。多型フィールド型名(Who.Type など)は、それぞれのマッピング CSV ファイル(src/SFDC/src/table_schema/tasks.csv)に [Field Name]_Type アイテムを追加することで複製できます。たとえば、Task オブジェクトの Who.Type フィールドを複製する必要がある場合は、Who_Type,Who_Type,STRING 行を追加します。これにより、自身にマッピングされ(同じ名前を保持)、文字列データ型を持つ Who.Type という名前の新しいフィールドが定義されます。

DAG テンプレートの変更

Airflow または Cloud Composer のインスタンスに応じて、CDC 用の DAG テンプレートまたは元データ処理用の DAG テンプレートを調整することが必要になる場合があります。詳細については、Cloud Composer の設定の収集をご覧ください。

API 呼び出しからの CDC または元データの生成が不要な場合は、deployCDC=false を設定します。また、ingestion_settings.yaml のセクションの内容を削除することもできます。データ構造が Cortex Framework Data Foundation で想定されるものと一致していることがわかっている場合は、SFDC.createMappingViews=false を設定してマッピング ビューの生成をスキップできます。

抽出モジュールの構成

このセクションでは、Data Foundation が提供する Salesforce to BigQuery 抽出モジュールを使用する手順について説明します。要件とフローはお使いのシステムと既存の構成によって異なる場合があります。または、利用可能な他のツールを使用することもできます。

認証情報と接続アプリを設定する

Salesforce インスタンスに管理者としてログインし、次の操作を行います。

  1. 次の要件を満たすプロファイルを Salesforce で作成するか、特定します。
    1. Permission for Apex REST Services and API Enabled は [システム パーミッション] で付与されます。
    2. View All 権限は、複製するすべてのオブジェクトに付与されます。たとえば、[アカウント] や [ケース] などです。制限や問題がないかセキュリティ管理者に確認します。
    3. ユーザー インターフェースのログインに関連する権限が付与されていません(Lightning Experience の Salesforce Anywhere、モバイルの Salesforce Anywhere、Lightning Experience ユーザー、Lightning ログイン ユーザーなど)。セキュリティ管理者に制限や問題がないか確認します。
  2. Salesforce で既存のユーザーを作成または識別します。ユーザーのユーザー名パスワードセキュリティ トークンを把握しておく必要があります。次の点を考慮してください。
    • 理想的には、このレプリケーションの実行専用のユーザーにする必要があります。
    • ユーザーは、ステップ 1 で作成または識別したプロファイルに割り当てる必要があります。
    • [ユーザー名] と [パスワード] をここで確認、再設定できます。
    • セキュリティ トークンがない場合や、別のプロセスで使用されていない場合は、セキュリティ トークンをリセットできます。
  3. 接続アプリを作成します。これは、プロファイル、Salesforce API、標準ユーザー認証情報、セキュリティ トークンを使用して、外部から Salesforce への接続を確立する唯一の通信チャネルです。
    1. 手順に沿って API 統合の OAuth 設定を有効にする
    2. [API (Enabled OAuth Settings)] セクションで Require Secret for Web Server FlowRequire Secretfor Refresh Token Flow が有効になっていることを確認します。
    3. コンシューマ キー(後でクライアント ID として使用)を取得する方法については、ドキュメントをご覧ください。問題や制限については、セキュリティ管理者にお問い合わせください。
  4. 作成したプロファイルに接続アプリケーションを割り当てます。
    1. Salesforce のホーム画面の右上にある [設定] を選択します。
    2. [クイック検索] ボックスに「profile」と入力し、[プロフィール] を選択します。手順 1 で作成したプロファイルを検索します。
    3. プロファイルを開きます。
    4. [割り当てられた接続済みアプリ] リンクをクリックします。
    5. [編集] をクリックします。
    6. 新しく作成した接続済みアプリを追加します。
    7. [保存] をクリックします。

Secret Manager を設定する

接続の詳細を保存するように Secret Manager を構成します。Salesforce-to-BigQuery モジュールは、Secret Manager を使用して、Salesforce と BigQuery への接続に必要な認証情報を安全に保存します。この方法では、パスワードなどの機密情報をコードや構成ファイルに直接公開しないため、セキュリティが強化されます。

次の仕様でシークレットを作成します。詳細な手順については、シークレットを作成するをご覧ください。

  • シークレット名: airflow-connections-salesforce-conn
  • シークレット値:

    http://USERNAME:PASSWORD@https%3A%2F%2FINSTANCE_NAME.lightning.force.com?client_id=CLIENT_ID&security_token=SECRET_TOKEN`
    

    次のように置き換えます。

    • USERNAME は、ユーザー名に置き換えます。
    • PASSWORD は、パスワードに置き換えます。
    • INSTANCE_NAME はインスタンス名に置き換えます。
    • CLIENT_ID は、クライアント ID に置き換えます。
    • SECRET_TOKEN は、シークレット トークンに置き換えます。

詳細については、インスタンス名を確認する方法をご覧ください。

レプリケーション用の Cloud Composer ライブラリ

Cortex Framework Data Foundation が提供する DAG で Python スクリプトを実行するには、いくつかの依存関係をインストールする必要があります。Airflow バージョン 1.10 の場合は、Cloud Composer 1 の Python 依存関係をインストールするのドキュメントに沿って、次のパッケージを順番にインストールします。

tableauserverclient==0.17
apache-airflow-backport-providers-salesforce==2021.3.3

Airflow バージョン 2.x の場合は、Cloud Composer 2 の Python 依存関係をインストールするドキュメントで apache-airflow-providers-salesforce~=5.2.0 をインストールします。

次のコマンドを使用して、必要なパッケージをそれぞれインストールします。

  gcloud composer environments update ENVIRONMENT_NAME \
  --location LOCATION \
   --update-pypi-package PACKAGE_NAME EXTRAS_AND_VERSION

次のように置き換えます。

  • ENVIRONMENT_NAME は、割り当てられた環境名に置き換えます。
  • LOCATION は、ロケーションに置き換えます。
  • PACKAGE_NAME は、選択したパッケージ名に置き換えます。
  • EXTRAS_AND_VERSION は、追加機能とバージョンの仕様に置き換えます。

次のコマンドは、必要なパッケージのインストールの例です。

gcloud composer environments update my-composer-instance \
  --location us-central1 \
  --update-pypi-package apache-airflow-backport-providers-salesforce>=2021.3.3

Secret Manager をバックエンドとして有効にする

セキュリティ バックエンドとして Google Secret Manager を有効にします。この手順では、Cloud Composer 環境で使用されるパスワードや API キーなどの機密情報のプライマリ ストレージ ロケーションとして Secret Manager を有効にします。これにより、専用サービスで認証情報を一元化して管理することで、セキュリティが強化されます。詳細については、Secret Manager をご覧ください。

Composer サービス アカウントにシークレットへのアクセスを許可する

この手順により、Cloud Composer に関連付けられたサービス アカウントに、Secret Manager に保存されているシークレットにアクセスするために必要な権限が付与されます。デフォルトでは、Cloud Composer は Compute Engine サービス アカウントを使用します。必要な権限は Secret Manager Secret Accessor です。この権限により、サービス アカウントは Secret Manager に保存されているシークレットを取得して使用できます。Secret Manager でアクセス制御を構成する包括的なガイドについては、アクセス制御のドキュメントをご覧ください。

Airflow での BigQuery 接続

Cloud Composer の設定の収集に従って、接続 sfdc_cdc_bq を作成してください。この接続は、Salesforce から BigQuery へのモジュールが BigQuery との通信を確立するために使用される可能性があります。

次のステップ