Salesforce(SFDC)との統合

このページでは、Cortex Framework Data Foundation で Salesforce(SFDC)の運用ワークロードを統合する手順について説明します。Cortex Framework は、Salesforce のデータを Dataflow パイプラインを介して BigQuery に統合します。一方、Cloud Composer は、これらの Dataflow パイプラインをスケジュールしてモニタリングし、データから分析情報を取得します。

構成ファイル

Cortex Framework Data Foundation リポジトリconfig.json ファイルは、Salesforce を含む任意のデータソースからデータを転送するために必要な設定を構成します。このファイルには、運用 Salesforce ワークロードの次のパラメータが含まれています。

    "SFDC": {
        "deployCDC": true,
        "createMappingViews": true,
        "createPlaceholders": true,
        "datasets": {
            "cdc": "",
            "raw": "",
            "reporting": "REPORTING_SFDC"
        }
    }

次の表に、SFDC の各オペレーション パラメータの値を示します。

パラメータ 意味 デフォルト値 説明
SFDC.deployCDC CDC をデプロイする true Cloud Composer で DAG として実行する CDC 処理スクリプトを生成します。Salesforce Sales Cloud のさまざまな取り込みオプションについては、ドキュメントをご覧ください。
SFDC.createMappingViews マッピング ビューを作成する true Salesforce API から新しいレコードを取得するために提供された DAG は、ランディング ページでレコードを更新します。この値を true に設定すると、CDC 処理済みデータセットにビューが生成され、Raw データセットから「最新バージョンの真実」を含むテーブルが公開されます。false で SFDC.deployCDCtrue の場合、SystemModstamp に基づく変更データ キャプチャ(CDC)処理で DAG が生成されます。詳細については、Salesforce の CDC 処理をご覧ください。
SFDC.createPlaceholders プレースホルダを作成する true 取り込みプロセスで生成されない場合に、空のプレースホルダ テーブルを作成して、ダウンストリーム レポートのデプロイが失敗せずに実行されるようにします。
SFDC.datasets.raw 未加工のランディング データセット - CDC プロセスで使用されます。レプリケーション ツールが Salesforce からのデータをここに配置します。テストデータを使用する場合は、空のデータセットを作成します。
SFDC.datasets.cdc CDC 処理済みデータセット - レポートビューのソースとして機能し、処理された DAG のレコードのターゲットとなるデータセット。テストデータを使用する場合は、空のデータセットを作成します。
SFDC.datasets.reporting レポート データセット SFDC "REPORTING_SFDC" レポート用にエンドユーザーがアクセスできるデータセットの名前。ビューとユーザー向けテーブルがデプロイされます。
SFDC.currencies 通貨のフィルタリング [ "USD" ] テストデータを使用していない場合は、ビジネスに関連する単一の通貨([ "USD" ] など)または複数の通貨([ "USD", "CAD" ] など)を入力します。これらの値は、使用可能な分析モデルの SQL でプレースホルダを置き換えるために使用されます。

データモデル

このセクションでは、エンティティ関連図(ERD)を使用して Salesforce(SFDC)データモデルについて説明します。

SFDC のエンティティ関係図

図 2. Salesforce(SFDC): エンティティ関連図。

ベースビュー

これらは ERD の青いオブジェクトであり、一部の列名エイリアス以外の変換がない CDC テーブルのビューです。src/SFDC/src/reporting/ddls のスクリプトをご覧ください。

レポートビュー

これらは ERD の緑色のオブジェクトで、レポート テーブルで使用される関連するディメンション属性が含まれています。src/SFDC/src/reporting/ddls のスクリプトをご覧ください。

Salesforce のデータ要件

このセクションでは、Cortex Framework で使用するために Salesforce データを構造化する方法について詳しく説明します。

  • テーブル構造:
    • 命名: テーブル名には snake_case(アンダースコアで区切られた小文字の単語)が使用され、複数形になります。例: some_objects
    • データ型: 列は、Salesforce 内で表現されているものと同じデータ型を維持します。
    • 読みやすさ: レポートレイヤのわかりやすさを高めるために、一部のフィールド名が若干調整されることがあります。
  • 空のテーブルとデプロイ: 元のデータセットにない必要なテーブルは、デプロイ プロセス中に空のテーブルとして自動的に作成されます。これにより、CDC デプロイ ステップがスムーズに実行されます。
  • CDC の要件: Id フィールドと SystemModstamp フィールドは、CDC スクリプトがデータの変更を追跡するために不可欠です。これらの名前は、同じでも異なってもかまいません。提供されている Raw 処理スクリプトは、これらのフィールドを API から自動的に取得し、ターゲットのレプリケーション テーブルを更新します。
    • Id: 各レコードの一意の識別子として機能します。
    • SystemModstamp: このフィールドには、レコードが最後に変更された日時を示すタイムスタンプが保存されます。
  • 未加工処理スクリプト:提供された未加工処理スクリプトでは、追加の(CDC)処理は必要ありません。この動作は、デフォルトでデプロイ時に設定されます。

通貨換算のソーステーブル

Salesforce では、次の 2 つの方法で通貨を管理できます。

  • 基本: デフォルトでは、すべてのデータで単一の通貨が使用されます。
  • Advanced: 複数の通貨を為替レートに基づいて換算します(高度な通貨管理を有効にする必要があります)。

高度な通貨管理を使用している場合、Salesforce では次の 2 つの特別なテーブルが使用されます。

  • CurrencyTypes: このテーブルには、使用するさまざまな通貨(USD、EUR など)に関する情報が保存されます。
  • DatedConversionRates: このテーブルには、通貨間の為替レートが経時的に保持されます。

高度な通貨管理を使用する場合、Cortex Framework はこれらのテーブルが存在することを想定しています。高度な通貨管理を使用しない場合は、構成ファイル(src/SFDC/config/ingestion_settings.yaml)からこれらのテーブルに関連するエントリを削除できます。この手順により、存在しないテーブルからデータを抽出する不要な試行を防ぐことができます。

SFDC データを BigQuery に読み込む

Cortex Framework は、Apache AirflowSalesforce Bulk API 2.0 でスケジュールされた Python スクリプトに基づくレプリケーション ソリューションを提供します。これらの Python スクリプトは、選択したツールで調整してスケジュール設定できます。詳細については、SFDC 抽出モジュールをご覧ください。

Cortex Framework には、データの取得元と管理方法に応じて、データを統合する 3 つの方法も用意されています。

  1. API 呼び出し: このオプションは、API を介して直接アクセスできるデータ用です。Cortex Framework は API を呼び出してデータを取得し、BigQuery 内の「Raw」データセットに保存できます。データセットに既存のレコードがある場合、Cortex Framework は新しいデータでレコードを更新できます。
  2. 構造マッピング ビュー: この方法は、別のツールを使用して BigQuery にデータを読み込んでいるが、データ構造が Cortex Framework の要件と一致しない場合に便利です。Cortex Framework は「ビュー」(仮想テーブルなど)を使用して、既存のデータ構造を Cortex Framework のレポート機能で想定される形式に変換します。
  3. CDC(変更データ キャプチャ)処理スクリプト: このオプションは、常に変化するデータを対象に設計されています。CDC スクリプトはこれらの変更を追跡し、BigQuery のデータを適宜更新します。これらのスクリプトは、データの 2 つの特別なフィールドに依存しています。

    • Id: 各レコードの一意の識別子。
    • SystemModstamp: レコードが変更された日時を示すタイムスタンプ。

    データにこれらの名前が正確にない場合は、スクリプトを調整して、別の名前で認識できるようにします。このプロセスで、データスキーマにカスタム フィールドを追加することもできます。たとえば、Account オブジェクトのデータを含むソーステーブルには、元の Id フィールドと SystemModstamp フィールドが必要です。これらのフィールドの名前が異なる場合は、src/SFDC/src/table_schema/accounts.csv ファイルを更新して、Id フィールドの名前を AccountId にマッピングし、システム変更タイムスタンプ フィールドを SystemModstamp にマッピングする必要があります。詳細については、SystemModStamp のドキュメントをご覧ください。

別のツールでデータをすでに読み込んでいる場合(常に更新されている場合)、Cortex はそのデータを使用できます。CDC スクリプトには、既存のデータ構造を Cortex Framework が必要とする形式に変換できるマッピング ファイルが付属しています。このプロセス中に、データにカスタム フィールドを追加することもできます。

API 統合と CDC を構成する

Salesforce データを BigQuery に取り込むには、次の方法を使用できます。

  1. API 呼び出し用の Cortex スクリプト: Salesforce または選択したデータ レプリケーション ツール用のレプリケーション スクリプトを提供します。重要なのは、取り込むデータが Salesforce API から取得したデータと同じように見えることです。
  2. 複製ツールと追加のみ : 複製ツールを使用している場合、この方法は、新しいデータレコード(_appendalways_pattern)を追加するか、既存のレコードを更新できるツールで使用します。
  3. レプリケーション ツールと新しいレコードの追加: ツールがレコードを更新せず、変更を新しいレコードとしてターゲット(Raw)テーブルに複製する場合、Cortex Data Foundation には CDC 処理スクリプトを作成するオプションが用意されています。詳細については、CDC プロセスをご覧ください。

Salesforce ワークロード: データ統合オプション

図 1. Salesforce ワークロード: データ統合オプション。

データが Cortex Framework の想定と一致するように、マッピング構成を調整して、レプリケーション ツールまたは既存のスキーマをマッピングできます。これにより、Cortex Framework Data Foundation で想定される構造と互換性のあるマッピング ビューが生成されます。

ingestion_settings.yaml ファイルを使用して、Salesforce API を呼び出してデータを Raw データセット(セクション salesforce_to_raw_tables)に複製するスクリプトの生成と、Raw データセットと CDC 処理済みデータセット(セクション raw_to_cdc_tables)に届く変更を処理するスクリプトの生成を構成します。

デフォルトでは、API から読み取るために提供されるスクリプトは、変更を Raw データセットに更新するため、CDC 処理スクリプトは必要ありません。代わりに、ソーススキーマを想定されるスキーマに合わせるためのマッピング ビューが作成されます。

config.jsonSFDC.createMappingViews=true が(デフォルトの動作)の場合、CDC 処理スクリプトの生成は実行されません。CDC スクリプトが必要な場合は、SFDC.createMappingViews=false を設定します。この 2 番目のステップでは、Cortex Framework Data Foundation で必要とされるスキーマにソース スキーマをマッピングすることもできます。

次の setting.yaml 構成ファイルの例は、option 3 に示すように、レプリケーション ツールが複製されたデータセットにデータを直接更新するときにマッピング ビューを生成する方法を示しています(つまり、CDC は不要で、テーブルとフィールド名の再マッピングのみが必要です)。CDC は必要ないため、このオプションは config.json ファイルのパラメータ SFDC.createMappingViewstrue のままであれば実行されます。

  salesforce_to_raw_tables:
  - base_table: accounts
    raw_table: Accounts
    api_name: Account
      load_frequency: "@daily"
  - base_table: cases
    raw_table: cases2
    api_name: Case
    load_frequency: "@daily"

この例では、salesforce_to_raw_tables で示されているように、セクションからベーステーブルまたはすべてのベーステーブルの構成を削除すると、そのベーステーブルまたはセクション全体の DAG の生成がスキップされます。このシナリオでは、CDC 処理スクリプトを生成する必要がないため、パラメータ deployCDC : False を設定しても同じ効果が得られます。

データ マッピング

受信したデータフィールドを Cortex Data Foundation が想定する形式にマッピングする必要があります。たとえば、ソースデータ システムの unicornId という名前のフィールドは、Cortex Data Foundation 内で AccountId(文字列データ型)として名前変更され、認識される必要があります。

  • ソース フィールド: unicornId(ソースシステムで使用される名前)
  • Cortex フィールド: AccountId(Cortex が想定する名前)
  • データ型: String(Cortex が想定するデータ型)

ポリモーフィック フィールドのマッピング

Cortex Framework Data Foundation は、名前は異なるが構造は一貫しているフィールドであるポリモーフィック フィールドのマッピングをサポートしています。ポリモーフィック フィールドの型名(Who.Type など)は、それぞれのマッピング CSV ファイルに [Field Name]_Type 項目を追加することで複製できます(src/SFDC/src/table_schema/tasks.csv)。たとえば、Task オブジェクトの Who.Type フィールドを複製する必要がある場合は、Who_Type,Who_Type,STRING 行を追加します。これにより、Who.Type という名前の新しいフィールドが定義されます。このフィールドは、それ自体にマッピングされ(同じ名前が保持されます)、文字列データ型を持ちます。

DAG テンプレートの変更

Airflow または Cloud Composer のインスタンスの要件に応じて、CDC 用の DAG テンプレートまたは元データ処理用の DAG テンプレートを調整する必要がある場合があります。詳細については、Cloud Composer の設定の収集をご覧ください。

API 呼び出しからの CDC または未加工データの生成が不要な場合は、deployCDC=false を設定します。また、ingestion_settings.yaml のセクションの内容を削除することもできます。データ構造が Cortex Framework Data Foundation で想定されているものと一致することがわかっている場合は、SFDC.createMappingViews=false を設定してマッピング ビューの生成をスキップできます。

抽出モジュールの構成

このセクションでは、Data Foundation が提供する Salesforce から BigQuery への抽出モジュールを使用する手順について説明します。要件とフローは、システムと既存の構成によって異なる場合があります。他の利用可能なツールを使用することもできます。

認証情報と接続済みアプリを設定する

Salesforce インスタンスに管理者としてログインして、次の操作を行います。

  1. 次の要件を満たす Salesforce のプロファイルを作成するか、特定します。
    1. Permission for Apex REST Services and API Enabled は、システム パーミッションで付与されます。
    2. 複製するすべてのオブジェクトに View All 権限が付与されている。たとえば、Account、Cases などです。セキュリティ管理者にお問い合わせください。
    3. ユーザー インターフェース ログインに関連する権限(Lightning Experience の Salesforce Anywhere、モバイルの Salesforce Anywhere、Lightning Experience ユーザー、Lightning ログイン ユーザーなど)が付与されていません。セキュリティ管理者による制限や問題がないか確認します。
  2. Salesforce でユーザーを作成するか、既存のユーザーを特定します。ユーザーのユーザー名パスワードセキュリティ トークンを知っておく必要があります。次の点を考慮してください。
    • 理想的には、このレプリケーションの実行専用のユーザーにする必要があります。
    • ユーザーは、ステップ 1 で作成または特定したプロファイルに割り当てられている必要があります。
    • ここで、ユーザー名を確認し、パスワードを再設定できます。
    • セキュリティ トークンがない場合や、別のプロセスで使用されていない場合は、セキュリティ トークンをリセットできます。
  3. 接続アプリを作成します。これは、プロファイル、Salesforce API、標準ユーザー認証情報、セキュリティ トークンを使用して、外部から Salesforce への接続を確立する唯一の通信チャネルです。
    1. 手順に沿って API 統合の OAuth 設定を有効にします
    2. [API (OAuth 設定の有効化)] セクションで Require Secret for Web Server FlowRequire Secretfor Refresh Token Flow が有効になっていることを確認します。
    3. コンシューマー キー(後でクライアント ID として使用)を取得する方法については、ドキュメントをご覧ください。問題や制限については、セキュリティ管理者に確認してください。
  4. 作成したプロファイルに接続アプリケーションを割り当てます。
    1. Salesforce のホーム画面の右上にある [設定] を選択します。
    2. [クイック検索] ボックスに「profile」と入力し、[プロファイル] を選択します。手順 1 で作成したプロファイルを検索します。
    3. プロファイルを開きます。
    4. [割り当てられた接続アプリ] リンクをクリックします。
    5. [編集] をクリックします。
    6. 新しく作成した接続アプリケーションを追加します。
    7. [保存] をクリックします。

Secret Manager を設定する

接続の詳細を保存するように Secret Manager を構成します。Salesforce-to-BigQuery モジュールは、Salesforce と BigQuery への接続に必要な認証情報を安全に保存するために Secret Manager を使用します。この方法では、パスワードなどの機密情報がコードや構成ファイルに直接公開されることがないため、セキュリティが強化されます。

次の仕様で Secret を作成します。詳細な手順については、シークレットを作成するをご覧ください。

  • シークレット名: airflow-connections-salesforce-conn
  • シークレット値:

    http://USERNAME:PASSWORD@https%3A%2F%2FINSTANCE_NAME.lightning.force.com?client_id=CLIENT_ID&security_token=SECRET_TOKEN`
    

    次のように置き換えます。

    • USERNAME は、ユーザー名に置き換えます。
    • PASSWORD は、パスワードに置き換えます。
    • INSTANCE_NAME はインスタンス名に置き換えます。
    • CLIENT_ID はクライアント ID に置き換えます。
    • SECRET_TOKEN は、シークレット トークンに置き換えます。

詳細については、インスタンス名を確認する方法をご覧ください。

レプリケーション用の Cloud Composer ライブラリ

Cortex Framework Data Foundation によって提供される DAG で Python スクリプトを実行するには、いくつかの依存関係をインストールする必要があります。Airflow バージョン 1.10 の場合は、Cloud Composer 1 の Python 依存関係をインストールするドキュメントに沿って、次のパッケージを順番にインストールします。

tableauserverclient==0.17
apache-airflow-backport-providers-salesforce==2021.3.3

Airflow バージョン 2.x の場合は、Cloud Composer 2 の Python 依存関係をインストールするのドキュメントを参照して、apache-airflow-providers-salesforce~=5.2.0 をインストールします。

次のコマンドを使用して、必要なパッケージをそれぞれインストールします。

  gcloud composer environments update ENVIRONMENT_NAME \
  --location LOCATION \
   --update-pypi-package PACKAGE_NAME EXTRAS_AND_VERSION

次のように置き換えます。

  • ENVIRONMENT_NAME は、割り当てられた環境名に置き換えます。
  • LOCATION は、ロケーションに置き換えます。
  • PACKAGE_NAME は、選択したパッケージ名に置き換えます。
  • EXTRAS_AND_VERSION は、追加とバージョンの仕様に置き換えます。

次のコマンドは、必要なパッケージのインストール例です。

gcloud composer environments update my-composer-instance \
  --location us-central1 \
  --update-pypi-package apache-airflow-backport-providers-salesforce>=2021.3.3

Secret Manager をバックエンドとして有効にする

セキュリティ バックエンドとして Google Secret Manager を有効にします。この手順では、Cloud Composer 環境で使用されるパスワードや API キーなどの機密情報のプライマリ保存場所として Secret Manager を有効にするよう指示します。これにより、専用サービスで認証情報を一元管理してセキュリティを強化できます。詳細については、Secret Manager をご覧ください。

Composer サービス アカウントにシークレットへのアクセスを許可する

この手順により、Cloud Composer に関連付けられたサービス アカウントに、Secret Manager に保存されているシークレットにアクセスするために必要な権限が付与されます。デフォルトでは、Cloud Composer は Compute Engine サービス アカウントを使用します。必要な権限は Secret Manager Secret Accessor です。この権限により、サービス アカウントは Secret Manager に保存されているシークレットを取得して使用できます。Secret Manager でアクセス制御を構成する包括的なガイドについては、アクセス制御のドキュメントをご覧ください。

Airflow の BigQuery 接続

Cloud Composer の設定を収集するに従って、接続 sfdc_cdc_bq を作成してください。この接続は、Salesforce-to-BigQuery モジュールが BigQuery との通信を確立するために使用される可能性があります。

次のステップ