将 FHIR 资源更改流式传输至 BigQuery

本页面介绍了如何配置 FHIR 存储区,以便在每次创建、更新、修补或删除 FHIR 资源时自动将 FHIR 资源导出到 BigQuery 表。此过程称为 BigQuery 流式处理

您可以使用 BigQuery 流式传输执行以下操作:

  • 近乎实时地将 FHIR 存储区中的数据与 BigQuery 数据集同步。
  • 您可以对 FHIR 数据执行复杂查询,而无需在每次想要分析数据时将其导出到 BigQuery。

为提高查询性能并降低费用,您可以将 BigQuery 流式传输配置为分区表。如需了解相关说明,请参阅将 FHIR 资源流式传输到分区表

准备工作

请参阅将 FHIR 资源导出到 BigQuery,了解导出过程的工作原理。

限制

如果您从 Cloud Storage 导入 FHIR 资源,这些更改不会流式传输到 BigQuery。

设置 BigQuery 权限

如需启用 BigQuery 流式传输,您必须向 Cloud Healthcare Service Agent 服务帐号授予其他权限。如需了解详情,请参阅FHIR 存储区 BigQuery 权限

在 FHIR 存储区中配置 BigQuery 流式传输

如需启用 BigQuery 流式传输,请在 FHIR 存储区中配置 StreamConfigs 对象。在 StreamConfigs 中,您可以配置 resourceTypes[] 数组来控制 BigQuery 流式传输所适用的 FHIR 资源类型。如果您未指定 resourceTypes[],则 BigQuery 流式传输会应用于所有 FHIR 资源类型。

如需了解 StreamConfigs 中提供的其他配置(例如 BigQueryDestination),请参阅导出 FHIR 资源

以下示例展示了如何在现有 FHIR 存储区中启用 BigQuery 流式传输。

控制台

如需使用 Google Cloud 控制台在现有 FHIR 存储区中配置 BigQuery 流式传输,请完成以下步骤:

  1. 在 Google Cloud 控制台中,进入数据集页面。

    前往“数据集”页面

  2. 选择包含您要修改的 FHIR 存储区的数据集。

  3. 数据存储区列表中,点击要修改的 FHIR 存储区。

  4. BigQuery 流式传输部分,完成以下步骤:

    1. 点击添加新的流式传输配置
    2. 新建流式传输配置部分,点击浏览以选择要流式传输的 FHIR 资源的 BigQuery 数据集。
    3. 架构类型下拉列表中,选择 BigQuery 表的输出架构。您可以使用以下架构:
      • Analytics(分析)。一个基于在 FHIR 上构建 SQL 文档的架构。由于 BigQuery 仅允许每个表包含 10,000 列,因此不会为 Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome 字段生成架构。
      • Google Analytics(分析)V2。一个类似于 Google Analytics(分析)架构的架构,增加了对以下内容的支持:
    4. Recursive Structure Depth 滑块中选择深度,以设置输出架构中所有递归结构的深度。默认情况下,递归值为 2。
    5. 选择 FHIR 资源类型列表中,选择要流式传输的资源类型。
  5. 点击完成以保存流式传输配置。

gcloud

gcloud CLI 不支持此操作。请改用 Google Cloud 控制台、curl、 PowerShell 或您的首选语言。

REST

如需在现有 FHIR 存储区中配置 BigQuery 流式传输,请使用 projects.locations.datasets.fhirStores.patch 方法。

以下示例未指定 resourceTypes[] 数组,因此为所有 FHIR 资源类型启用了 BigQuery 流式传输。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • LOCATION:数据集位置
  • DATASET_ID:FHIR 存储区的父数据集
  • FHIR_STORE_ID:FHIR 存储区 ID
  • BIGQUERY_DATASET_ID:您要流式传输 FHIR 资源更改的现有 BigQuery 数据集的名称
  • SCHEMA_TYPESchemaType 枚举的值。请使用以下某个值:
    • ANALYTICS - 基于 SQL on FHIR 文档的架构。由于 BigQuery 仅允许每个表包含 10,000 列,因此不会为 Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome 字段生成架构。
    • ANALYTICS_V2 类似于 ANALYTICS 的架构,添加了对以下内容的支持:

      ANALYTICS_V2”在目标表中占用的空间比 ANALYTICS

      .
  • WRITE_DISPOSITIONWriteDisposition 枚举的值。请使用以下某个值:
    • WRITE_EMPTY。仅在目标 BigQuery 表为空时才导出数据。
    • WRITE_TRUNCATE。请先清除 BigQuery 表中的所有现有数据,然后再写入 FHIR 资源。
    • WRITE_APPEND。将数据附加到目标 BigQuery 表。

请求 JSON 正文:

{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中。在终端中运行以下命令,在当前目录中创建或覆盖此文件:

cat > request.json << 'EOF'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
EOF

然后,执行以下命令以发送 REST 请求:

curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs"

PowerShell

将请求正文保存在名为 request.json 的文件中。在终端中运行以下命令,在当前目录中创建或覆盖此文件:

@'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
'@  | Out-File -FilePath request.json -Encoding utf8

然后,执行以下命令以发送 REST 请求:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method PATCH `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs" | Select-Object -Expand Content

API Explorer

复制请求正文并打开方法参考页面。API Explorer 面板会在页面右侧打开。您可以与此工具进行交互以发送请求。将请求正文粘贴到此工具中,填写任何其他必填字段,然后点击执行

您应该会收到类似于以下内容的 JSON 响应。

如果您在 FhirStore 资源中配置了任何字段,它们也会在响应中显示。

默认情况下,当您将 FHIR 资源更改流式传输到 BigQuery 时,系统会为流式传输的每个资源创建一个视图。该视图具有以下属性:

  • 它与资源以及 BigQuery 数据集中的资源表同名。例如,当您流式传输患者资源时,系统会创建一个名为 Patient 的表,其中包含一个名为 Patientview 的视图。
  • 它仅包含资源的当前版本,而不是所有历史版本。

将 FHIR 资源流式传输到分区表

如需将 FHIR 资源导出到 BigQuery 分区表,请在 FHIR 存储区的 lastUpdatedPartitionConfig 字段中设置 TimePartitioning 枚举。

分区表的工作原理类似于 BigQuery 时间单位分区表。分区表新增了一个名为 lastUpdated 的列,它与 FHIR 资源中的 meta.lastUpdated 字段生成的 meta.lastUpdated 列重复。BigQuery 使用 lastUpdated 列按小时、天、月或年对表进行分区。

如需了解有关如何选择分区粒度的建议,请参阅选择每日、每小时、每月或每年分区

您无法将现有的非分区 BigQuery 表转换为分区表。如果您将患者资源更改导出到非分区的 Patients 表,随后创建了具有表分区的新 FHIR 存储区,并且该分区会导出到同一 BigQuery 数据集,则 Cloud Healthcare API 仍会将数据导出到非分区的 Patients 表。如需开始使用分区表,请删除现有 Patients 表或使用其他 BigQuery 数据集。

如果您向现有 FHIR 存储区配置添加分区,仍然可以导出到现有的非分区表。但是,分区仅对新表生效。

以下示例展示了如何将 BigQuery 流式传输到现有 FHIR 存储区中的分区表。

控制台

Google Cloud 控制台和 gcloud CLI 不支持此操作。请改用 curl、 PowerShell 或您的首选语言。

gcloud

Google Cloud 控制台和 gcloud CLI 不支持此操作。请改用 curl、 PowerShell 或您的首选语言。

REST

如需配置 BigQuery 流式传输到现有 FHIR 存储区中的分区表,请使用 projects.locations.datasets.fhirStores.patch 方法。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • LOCATION:数据集位置
  • DATASET_ID:FHIR 存储区的父数据集
  • FHIR_STORE_ID:FHIR 存储区 ID
  • BIGQUERY_DATASET_ID:您要流式传输 FHIR 资源更改的现有 BigQuery 数据集的名称
  • SCHEMA_TYPESchemaType 枚举的值。请使用以下某个值:
    • ANALYTICS - 基于 SQL on FHIR 文档的架构。由于 BigQuery 仅允许每个表包含 10,000 列,因此不会为 Parameters.parameter.resourceBundle.entry.resourceBundle.entry.response.outcome 字段生成架构。
    • ANALYTICS_V2 类似于 ANALYTICS 的架构,添加了对以下内容的支持:

      ANALYTICS_V2”在目标表中占用的空间比 ANALYTICS

      .
  • TIME_PARTITION_TYPE:对导出的 FHIR 资源进行分区的粒度。请使用以下某个值:
    • HOUR:按小时分区数据
    • DAY:按天分区数据
    • MONTH:按月分区数据
    • YEAR:按年分区数据
  • WRITE_DISPOSITIONWriteDisposition 枚举的值。请使用以下某个值:
    • WRITE_EMPTY。仅在目标 BigQuery 表为空时才导出数据。
    • WRITE_TRUNCATE。请先清除 BigQuery 表中的所有现有数据,然后再写入 FHIR 资源。
    • WRITE_APPEND。将数据附加到目标 BigQuery 表。

请求 JSON 正文:

{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中。在终端中运行以下命令,在当前目录中创建或覆盖此文件:

cat > request.json << 'EOF'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
EOF

然后,执行以下命令以发送 REST 请求:

curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs"

PowerShell

将请求正文保存在名为 request.json 的文件中。在终端中运行以下命令,在当前目录中创建或覆盖此文件:

@'
{
  "streamConfigs": [
    {
      "bigqueryDestination": {
        "datasetUri": "bq://PROJECT_ID.BIGQUERY_DATASET_ID",
        "schemaConfig": {
          "schemaType": "SCHEMA_TYPE",
          "lastUpdatedPartitionConfig": {
            "type": "TIME_PARTITION_TYPE"
          }
        },
        "writeDisposition": "WRITE_DISPOSITION"
      }
    }
  ]
}
'@  | Out-File -FilePath request.json -Encoding utf8

然后,执行以下命令以发送 REST 请求:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method PATCH `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID?updateMask=streamConfigs" | Select-Object -Expand Content

API Explorer

复制请求正文并打开方法参考页面。API Explorer 面板会在页面右侧打开。您可以与此工具进行交互以发送请求。将请求正文粘贴到此工具中,填写任何其他必填字段,然后点击执行

您应该收到类似以下内容的 JSON 响应:

查询分区表

如需降低查询分区表时的查询费用,请使用 WHERE 子句按时间单位过滤。

例如,假设您将 PartitionType 枚举设置为 DAY。如需查询 Patients 表以获取在特定日期发生更改的患者资源,请运行以下查询:

SELECT * FROM `PROJECT_ID.BIGQUERY_DATASET.Patients`
  WHERE DATE(lastUpdated) = 'YYYY-MM-DD'

从 Google Analytics(分析)迁移到 Google Analytics(分析)V2

您无法使用任何方法将现有 BigQuery 数据集从 Analytics 架构迁移到 Analytics V2 架构,其中包括:

这是因为 Analytics 架构中 FHIR 扩展程序的 BigQuery 表列的模式设置为 NULLABLE,而 Analytics V2 架构中的列设置为 REPEATED。BigQuery 不允许将列的模式从 NULLABLE 更改为 REPEATED。因此,这两种架构类型不兼容。

如需将导出的 FHIR 资源的架构类型从 Analytics 迁移到 Analytics V2,您必须使用具有更新架构类型的新流式传输配置将 FHIR 资源导出到新的 BigQuery 数据集。为此,请执行以下步骤:

  1. 创建新的 BigQuery 数据集

  2. 设置 BigQuery 数据集的权限

  3. 将新的流式传输配置添加到 FHIR 存储区,并将架构类型设置为 Analytics V2

  4. 使用以下设置导出现有 FHIR 数据以回填现有数据。请参阅导出 FHIR 资源,了解如何使用 Google Cloud 控制台、Google Cloud CLI 或 REST API 配置这些设置。以下设置适用于 REST API:

BigQuery 中与原始 BigQuery 数据集内的部分或所有 FHIR 资源对应的视图可能缺少新数据集。如需排查此问题,请参阅缺少 FHIR 资源视图创建

排查 FHIR 流式传输问题

如果在将资源更改发送到 BigQuery 时发生错误,系统会将错误记录到 Cloud Logging。如需了解详情,请参阅在 Cloud Logging 中查看错误日志

缺失 FHIR 资源视图创建

如果您在流式传输该 FHIR 资源之前,将该资源批量导出到 BigQuery,则 BigQuery 不会为该 FHIR 资源创建视图。

例如,在以下情况下,您可能会看到“就诊”资源的任何视图:

  1. 您可以在 FHIR 存储区上配置 BigQuery 流式传输,然后使用 REST API 创建一个“患者”资源。

    BigQuery 会为该“患者”资源创建表和视图。

  2. 将“就诊”资源批量导出到与上一步相同的 BigQuery 数据集中。

    BigQuery 会为该“就诊”资源创建表。

  3. 您可以使用 REST API 创建“就诊”资源。

    完成此步骤后,系统不会为“就诊”资源创建 BigQuery 视图。

如需解决此问题,请使用以下查询来创建视图

SELECT
    * EXCEPT (_resource_row_id)
FROM (
  SELECT
    ROW_NUMBER() OVER(PARTITION BY id ORDER BY meta.lastUpdated DESC) as _resource_row_id,
    *
    FROM `PROJECT_ID.BIGQUERY_DATASET_ID.RESOURCE_TABLE` AS p
) AS p
WHERE
  p._resource_row_id=1
  AND
  NOT EXISTS (
  SELECT
    *
  FROM
    UNNEST(p.meta.tag)
  WHERE
    code = 'DELETE');

请替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • BIGQUERY_DATASET_ID:您用于批量导出 FHIR 资源的 BigQuery 数据集的 ID
  • RESOURCE_TABLE:您要为其创建视图的 FHIR 资源对应的表名称

创建视图后,您可以继续流式传输针对 FHIR 资源的更改,并相应地更新视图。

后续步骤

有关如何流式传输 FHIR 资源更改的用例的教程,请参阅使用 BigQuery 流式传输并同步 FHIR 资源