BigQuery への FHIR リソースの変更のストリーミング

このページでは、FHIR リソースが作成、更新、パッチ適用、または削除されるたびに FHIR リソースを自動的に BigQuery テーブルにエクスポートするように FHIR ストアを構成する方法について説明します。このプロセスは BigQuery ストリーミングと呼ばれます。

BigQuery ストリーミングを使用すると、次のことができます。

  • ほぼリアルタイムで BigQuery データセットを使用して FHIR ストアのデータを同期できます。
  • 分析のたびに BigQuery にエクスポートしなくても、FHIR データに対して複雑なクエリを実行できます。

クエリのパフォーマンスを向上させ、コストを削減するには、BigQuery ストリーミングをパーティション分割テーブルに構成します。手順については、パーティション分割テーブルに FHIR リソースをストリーミングするをご覧ください。

始める前に

エクスポート プロセスの仕組みについては、BigQuery への FHIR リソースのエクスポートをご覧ください。

制限事項

Cloud Storage から FHIR リソースをインポートする場合、変更は BigQuery にストリーミングされません。

BigQuery の権限を設定する

BigQuery ストリーミングを有効にするには、Cloud Healthcare Service Agentサービス アカウントに追加の権限を付与する必要があります。詳しくは、FHIR ストアの BigQuery 権限をご覧ください。

FHIR ストアで BigQuery ストリーミングを構成する

BigQuery ストリーミングを有効にするには、FHIR ストアで StreamConfigs オブジェクトを構成します。StreamConfigs では、BigQuery ストリーミングを適用する FHIR リソースのタイプを制御するように resourceTypes[] 配列を構成できます。resourceTypes[] を指定しない場合、BigQuery ストリーミングはすべての FHIR リソースタイプに適用されます。

BigQueryDestination など、StreamConfigs で使用できる他の構成の説明については、FHIR リソースのエクスポートをご覧ください。

次のサンプルは、既存の FHIR ストアで BigQuery ストリーミングを有効にする方法を示しています。

コンソール

Google Cloud コンソールを使用して既存の FHIR ストアで BigQuery ストリーミングを構成するには、次の手順を行います。

  1. Google Cloud コンソールで、[データセット] ページに移動します。

    [データセット] に移動

  2. 編集する FHIR ストアを含むデータセットを選択します。

  3. [データストア] リストで、編集する FHIR ストアをクリックします。

  4. [BigQuery ストリーミング] セクションで、次の操作を行います。

    1. [新しいストリーミング構成を追加] をクリックします。
    2. [新しいストリーミング構成] セクションで [参照] をクリックし、変更された FHIR リソースをストリーミングする BigQuery データセットを選択します。
    3. [スキーマタイプ] プルダウンで、BigQuery テーブルの出力スキーマを選択します。次のスキーマを使用できます。
      • アナリティクスSQL on FHIR ドキュメントに基づくスキーマ。BigQuery ではテーブルあたり 10,000 列しか許容されないため、Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome の各フィールドに対してはスキーマは生成されません。
      • アナリティクス V2。次のサポートが追加されている、アナリティクス スキーマと類似したスキーマ。アナリティクス V2 スキーマは、アナリティクス スキーマよりも多く宛先テーブルの領域を使用します。
    4. [Recursive Structure Depth] スライダーで深度レベルを選択して、出力スキーマ内のすべての再帰構造の深度を設定します。デフォルトでは、再帰的深度は 2 です。
    5. [FHIR リソースタイプを選択する] リストで、ストリーミングするリソースタイプを選択します。
  5. [完了] をクリックしてストリーミング構成を保存します。

gcloud

gcloud CLI では、この操作はサポートされていません。代わりに、Google Cloud コンソール、curl、PowerShell、または設定言語を使用してください。

REST

既存の FHIR ストアで BigQuery ストリーミングを構成するには、projects.locations.datasets.fhirStores.patch メソッドを使用します。

次のサンプルでは、resourceTypes[] 配列が指定されていないため、すべての FHIR リソースタイプで BigQuery ストリーミングが有効になっています。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクトの ID
  • LOCATION: データセットの場所
  • DATASET_ID: FHIR ストアの親データセット
  • FHIR_STORE_ID: FHIR ストア ID
  • BIGQUERY_DATASET_ID: FHIR リソースの変更をストリーミングする既存の BigQuery データセットの名前
  • SCHEMA_TYPE: SchemaType enum の値。次の値のいずれかを使用できます。
    • ANALYTICS: SQL on FHIR ドキュメントに基づくスキーマ。BigQuery ではテーブルあたり 10,000 列しか許容されないため、Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome の各フィールドに対してはスキーマは生成されません。
    • ANALYTICS_V2ANALYTICS と類似したスキーマで、次のサポートが追加されています。

      ANALYTICS_V2 は、宛先テーブル内のスペースを ANALYTICS より多く使用します。

      .
  • WRITE_DISPOSITION: WriteDisposition enum の値。次の値のいずれかを使用できます。
    • WRITE_EMPTY: 宛先 BigQuery テーブルが空の場合にのみ、データをエクスポートします。
    • WRITE_TRUNCATE: FHIR リソースを書き込む前に、BigQuery テーブル内の既存のデータをすべて消去します。
    • WRITE_APPEND: 宛先 BigQuery テーブルにデータを追加します。

JSON 本文のリクエスト:

{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}

リクエストを送信するには、次のいずれかのオプションを選択します。

curl

リクエスト本文を request.json という名前のファイルに保存します。ターミナルで次のコマンドを実行して、このファイルを現在のディレクトリに作成または上書きします。

cat > request.json << 'EOF'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
EOF

その後、次のコマンドを実行して REST リクエストを送信します。

curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs"

PowerShell

リクエスト本文を request.json という名前のファイルに保存します。ターミナルで次のコマンドを実行して、このファイルを現在のディレクトリに作成または上書きします。

@'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
'@  | Out-File -FilePath request.json -Encoding utf8

その後、次のコマンドを実行して REST リクエストを送信します。

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method PATCH `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs" | Select-Object -Expand Content

API Explorer

リクエスト本文をコピーして、メソッド リファレンス ページを開きます。ページの右側に [API Explorer] パネルが開きます。このツールを操作してリクエストを送信できます。このツールにリクエスト本文を貼り付け、その他の必須フィールドに入力して、[Execute] をクリックします。

次のような JSON レスポンスが返されます。

FhirStore リソースでフィールドを構成した場合は、そのフィールドもレスポンスに表示されます。

デフォルトでは、FHIR リソースの変更を BigQuery にストリーミングすると、ストリーミングされるリソースごとにビューが作成されます。ビューには次のプロパティが配置されています。

  • BigQuery データセット内のリソースおよびリソースのテーブルと同じ名前です。たとえば、患者リソースをストリーミングすると、Patientview という名前のビューを含む Patient という名前のテーブルが作成されます。
  • 過去のすべてのバージョンではなく、現在のバージョンのみが含まれます。

パーティション分割テーブルへの FHIR リソースのストリーミング

FHIR リソースを BigQuery パーティション分割テーブルにエクスポートするには、FHIR ストアで lastUpdatedPartitionConfig フィールドに TimePartitioning 列挙型を構成します。

パーティション分割テーブルは、BigQuery の時間単位のパーティション分割テーブルと同様に機能します。パーティション分割テーブルには lastUpdated という名前の列が追加されます。これは、FHIR リソースの meta.lastUpdated フィールドから生成された meta.lastUpdated 列のコピーです。BigQuery は、lastUpdated 列を使用して、時間、日、月、年でテーブルをパーティション分割します。

パーティションの粒度を選択する方法に関する推奨事項については、日単位、時間単位、月単位、年単位によるパーティショニングを選択するをご覧ください。

パーティション分割されていない既存の BigQuery テーブルをパーティション分割テーブルに変換することはできません。患者リソースの変更をパーティション分割されていない Patients テーブルにエクスポートし、後で同じ BigQuery データセットにエクスポートするテーブル パーティショニングを設定した新しい FHIR ストアを作成した場合、Cloud Healthcare API は引き続きパーティション分割されていない Patients テーブルにエクスポートします。パーティション分割テーブルの使用を開始するには、既存の Patients テーブルを削除するか、異なる BigQuery データセットを使用します。

既存の FHIR ストア構成にパーティショニングを追加しても、既存のパーティション分割されていないテーブルに依然としてエクスポートできます。ただし、パーティショニングは新しいテーブルに対してのみ有効です。

次のサンプルは、既存の FHIR ストアのパーティション分割テーブルへの BigQuery ストリーミングを有効にする方法を示しています。

コンソール

Google Cloud コンソールと gcloud CLI では、この操作はサポートされていません。代わりに、curl、PowerShell、またはご希望の言語を使用してください。

gcloud

Google Cloud コンソールと gcloud CLI では、この操作はサポートされていません。代わりに、curl、PowerShell、またはご希望の言語を使用してください。

REST

既存の FHIR ストア上のパーティション分割テーブルへの BigQuery ストリーミングを構成するには、projects.locations.datasets.fhirStores.patch メソッドを使用します。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクトの ID
  • LOCATION: データセットの場所
  • DATASET_ID: FHIR ストアの親データセット
  • FHIR_STORE_ID: FHIR ストア ID
  • BIGQUERY_DATASET_ID: FHIR リソースの変更をストリーミングする既存の BigQuery データセットの名前
  • SCHEMA_TYPE: SchemaType enum の値。次の値のいずれかを使用できます。
    • ANALYTICS: SQL on FHIR ドキュメントに基づくスキーマ。BigQuery ではテーブルあたり 10,000 列しか許容されないため、Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome の各フィールドに対してはスキーマは生成されません。
    • ANALYTICS_V2ANALYTICS と類似したスキーマで、次のサポートが追加されています。

      ANALYTICS_V2 は、宛先テーブル内のスペースを ANALYTICS より多く使用します。

      .
  • TIME_PARTITION_TYPE: エクスポートされた FHIR リソースを分割する粒度。次の値のいずれかを使用できます。
    • HOUR: 時間単位でデータを分割する
    • DAY: 日付単位でデータを分割する
    • MONTH: 月単位でデータを分割する
    • YEAR: 年単位でデータを分割する
  • WRITE_DISPOSITION: WriteDisposition enum の値。次の値のいずれかを使用できます。
    • WRITE_EMPTY: 宛先 BigQuery テーブルが空の場合にのみ、データをエクスポートします。
    • WRITE_TRUNCATE: FHIR リソースを書き込む前に、BigQuery テーブル内の既存のデータをすべて消去します。
    • WRITE_APPEND: 宛先 BigQuery テーブルにデータを追加します。

JSON 本文のリクエスト:

{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}

リクエストを送信するには、次のいずれかのオプションを選択します。

curl

リクエスト本文を request.json という名前のファイルに保存します。ターミナルで次のコマンドを実行して、このファイルを現在のディレクトリに作成または上書きします。

cat > request.json << 'EOF'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
EOF

その後、次のコマンドを実行して REST リクエストを送信します。

curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs"

PowerShell

リクエスト本文を request.json という名前のファイルに保存します。ターミナルで次のコマンドを実行して、このファイルを現在のディレクトリに作成または上書きします。

@'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
'@  | Out-File -FilePath request.json -Encoding utf8

その後、次のコマンドを実行して REST リクエストを送信します。

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method PATCH `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs" | Select-Object -Expand Content

API Explorer

リクエスト本文をコピーして、メソッド リファレンス ページを開きます。ページの右側に [API Explorer] パネルが開きます。このツールを操作してリクエストを送信できます。このツールにリクエスト本文を貼り付け、その他の必須フィールドに入力して、[Execute] をクリックします。

次のような JSON レスポンスが返されます。

パーティション分割テーブルをクエリする

パーティション分割テーブルに対するクエリ実行時のクエリ費用を削減するには、WHERE 句を使用して時間単位でフィルタします。

たとえば、PartitionType 列挙型を DAY に設定するとします。特定の日付に変更された Patient リソースの Patients テーブルをクエリするには、次のクエリを実行します。

SELECT * FROM `PROJECT_ID.BIGQUERY_DATASET.Patients`
  WHERE DATE(lastUpdated) = 'YYYY-MM-DD'

アナリティクスからアナリティクス V2 への移行

次のような方法では、既存の BigQuery データセットを Analytics スキーマから Analytics V2 スキーマに移行できません。

これは、Analytics スキーマの FHIR 拡張機能の BigQuery テーブルの列のモードが NULLABLE に設定され、Analytics V2 スキーマのモードが REPEATED に設定されているためです。BigQuery では、列のモードを NULLABLE から REPEATED に変更できません。そのため、2 つのスキーマタイプには互換性がありません。

エクスポートされた FHIR リソースのスキーマタイプを Analytics から Analytics V2 に移行するには、更新されたスキーマタイプを持つ新しいストリーミング構成を使用して、FHIR リソースを新しい BigQuery データセットにエクスポートする必要があります。次の手順を実施します。

  1. 新しい BigQuery データセットを作成します

  2. BigQuery データセットの権限を設定します

  3. スキーマタイプを Analytics V2 に設定して、新しいストリーミング構成を FHIR ストアに追加します。

  4. 次の設定を使用して既存の FHIR データをエクスポートすることで、既存のデータをバックフィルします。Google Cloud コンソール、Google Cloud CLI、または REST API を使用してこれらの設定を構成する方法については、FHIR リソースのエクスポートをご覧ください。REST API には次の設定が適用されます。

元の BigQuery データセット内の FHIR リソースの一部またはすべてに対応する BigQuery のビューが、新しいデータセットに含まれていない可能性があります。この問題を解決するには、FHIR リソースビューの作成が不明をご覧ください。

FHIR ストリーミングのトラブルシューティング

リソースの変更が BigQuery に送信されたときにエラーが発生した場合、エラーは Cloud Logging に記録されます。詳細については、Cloud Logging でのエラーログの表示をご覧ください。

列を NULLABLE から REPEATED に変換できません

このエラーは、拡張子の繰り返しが原因で発生します。このエラーを解決するには、ANALYTICS_V2 スキーマタイプを使用します。すでに ANALYTICS_V2 を使用している場合、2 つの拡張機能間、または拡張機能と別のフィールドの間で競合が発生している可能性があります。

列名は、拡張 URL の最後の / 文字の後のテキストから生成されます。拡張機能の URL の末尾が /resource_field name のような値の場合、競合が発生する可能性があります。

このエラーの再発を防ぐため、フィールド名が入力しているリソース フィールドと同じ場合は拡張機能を使用しないでください。

FHIR リソースビューの作成が不明

FHIR リソースをストリーミングする前に、FHIR リソースを BigQuery に一括エクスポートすると、BigQuery は FHIR リソースのビューを作成しません。

たとえば、次の状況では、Encounter リソースのビューが表示されないことがあります。

  1. FHIR ストアで BigQuery ストリーミングを構成し、REST API を使用して患者リソースを作成します。

    BigQuery は、患者リソースのテーブルとビューを作成します。

  2. Encounter リソースを、前の手順と同じ BigQuery データセットに一括エクスポートします。

    BigQuery が Encounter リソースのテーブルを作成します。

  3. REST API を使用して Encounter リソースを作成します。

    この手順後に、Encounter リソースの BigQuery ビューは作成されません。

この問題を解決するには、次のクエリを使用してビューを作成します。

SELECT
    * EXCEPT (_resource_row_id)
FROM (
  SELECT
    ROW_NUMBER() OVER(PARTITION BY id ORDER BY meta.lastUpdated DESC) as _resource_row_id,
    *
    FROM `PROJECT_ID.BIGQUERY_DATASET_ID.RESOURCE_TABLE` AS p
) AS p
WHERE
  p._resource_row_id=1
  AND
  NOT EXISTS (
  SELECT
    *
  FROM
    UNNEST(p.meta.tag)
  WHERE
    code = 'DELETE');

次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクトの ID。
  • BIGQUERY_DATASET_ID: FHIR リソースを一括エクスポートした BigQuery データセットの ID
  • RESOURCE_TABLE: ビューを作成する FHIR リソースに対応するテーブルの名前

ビューを作成した後も FHIR リソースに変更をストリーミングし続けることができ、それに応じてビューを更新できます。

次のステップ

FHIR リソースの変更をストリーミングするユースケースのチュートリアルについては、FHIR リソースを BigQuery にストリーミングして同期するをご覧ください。