使用 DataStream API

Datastream

概览

对于拥有许多独立数据源的企业而言,访问整个组织内的企业数据(尤其是实时访问)并非易事。这会导致数据访问受限且速度缓慢,因而造成组织无法进行检查。

Datastream 支持近乎实时地访问来自各种本地和云端数据源的数据,从而创建对组织数据的访问权限。Datastream 提供简单的设置体验和统一的使用 API,使组织能够更广泛地访问整个组织内可用的最新企业数据,从而为集成式近乎实时的场景提供支持。

其中一种场景是,将数据从来源数据库转移到云端存储服务或消息传递队列,然后将这些数据转换为可供与该存储服务或消息传递队列通信的其他应用和服务读取的形式。

在本教程中,您将学习如何使用 Datastream 将架构、表和数据从源 Oracle 数据库转移到 Cloud Storage 存储桶中的文件夹。Cloud Storage 是一种用于在 Google Cloud 上存储和访问数据的 Web 服务。该服务将 Google 云的高性能和可扩展性与先进的安全和共享功能融合在一起。

在将此信息转移到目标 Cloud Storage 存储桶中的文件夹的过程中,Datastream 会将此信息转换为 Avro。Avro 由以 JavaScript 对象表示法 (JSON) 编写的架构定义。通过进行这种转换,您可以采用统一的方式读取不同数据源中的数据。

目标

在本教程中,您将学习如何:

  • 设置环境变量。在您向 Datastream 发出请求来创建和管理连接配置文件数据流时将使用这些变量。
  • 创建和管理用于 Cloud Storage 中源数据库和目标存储桶的连接配置文件。创建这些连接配置文件后,您将创建包含来源数据库和 Cloud Storage 目标存储桶的相关信息的记录。Datastream 中的数据流使用连接配置文件中的信息将数据从源数据库转移到目标存储桶中的文件夹。
  • 创建和管理数据流。Datastream 使用此数据流将数据、架构和表从源数据库转移到目标存储桶中的文件夹。
  • 验证 Datastream 是否将与源 Oracle 数据库的架构关联的数据和表转移到目标存储桶中的文件夹,以及是否将这些数据转换为 Avro 文件格式。
  • 清理您在 Datastream 上创建的资源,以避免这些资源占用配额并在日后产生费用。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 启用 Datastream API。

    启用 API

  7. 确保您已为您的用户账号分配 Datastream Admin 角色。

    转到 IAM 页面

  8. 确保您有一个 Datastream 可以访问的来源数据库。本教程使用 Oracle 数据库作为来源。
  9. 配置来源数据库以允许来自 Datastream 公共 IP 地址的传入连接。如需访问所有 Datastream 区域的位置及其关联的公共 IP 地址,请参阅 IP 许可名单和区域
  10. 确保您已配置一个目标 Cloud Storage 存储桶,Datastream 可以使用 IP 许可名单、SSH 隧道或 VPC 对等互连网络连接方法对其进行访问。
  11. 确保来源数据库中存在 Datastream 可转移到 Cloud Storage 目标存储桶中的文件夹的数据、表和架构。
  12. 下载并安装 Cloud Shell。此客户端应用可让您通过命令行访问云资源(包括 Datastream)。
  13. 安装并配置 jq 实用程序。此实用程序是一个轻量级且灵活的命令行 JSON 处理器。您将使用此处理器,以简洁易懂的文本来展示复杂的 cURL 命令。

设置环境变量

在此过程中,您将设置以下变量:

  • $PROJECT:此变量与您的 Google Cloud 项目相关联。您分配和使用的任何 Google Cloud 资源都必须属于一个项目。
  • $TOKEN:此变量与访问令牌相关联。访问令牌提供了一个会话,Cloud Shell 可以使用该会话使用 REST API 在 Datastream 中执行任务。
  1. 启动 Cloud Shell 应用。

  2. 使用 Google 帐号在您的应用中进行身份验证后,请输入 gcloud auth login

  3. Do you want to continue (Y/n)? 提示符处,输入 Y

  4. 打开网络浏览器并将网址复制到浏览器中。

  5. 使用您的 Google 帐号向 Google Cloud SDK 进行身份验证。登录页面上会显示验证码。此代码是您的访问令牌。

  6. 复制访问令牌,将其粘贴到 Cloud Shell 应用中的 Enter verification code: 参数中,然后按 Enter

  7. 在提示符处,输入 PROJECT="YOUR_PROJECT_NAME" 以将 $PROJECT 环境变量设置为您的 Google Cloud 项目。

  8. 在提示符处,输入 gcloud config set project YOUR_PROJECT_NAME 以将您希望处理的项目设置为您的 Google Cloud 项目。

    您的命令提示符会进行更新以反映您的活跃项目,并遵循以下格式:USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$

  9. 在提示符处,输入 TOKEN=$(gcloud auth print-access-token) 以检索访问令牌并将其存储为变量。

  10. 在提示符后输入以下命令,以确保正确设置了 $PROJECT$TOKEN 变量:

    • echo $PROJECT
    • echo $TOKEN

现在您已设置了变量,可以向 Datastream 发出请求来创建和管理连接配置文件和数据流。

创建和管理连接配置文件

在本部分中,您将为源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建和管理连接配置文件。

创建这些连接配置文件后,您将创建包含来源数据库和 Cloud Storage 目标存储桶的相关信息的记录。Datastream 使用连接配置文件中的信息将数据从来源数据库转移到目标存储桶中的文件夹。

创建和管理连接配置文件的操作包括:

  • 为来源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建连接配置文件
  • 检索连接配置文件的相关信息
  • 修改连接配置文件
  • 对来源 Oracle 连接配置文件执行 Discover API 调用。通过此调用,您可以在数据库中查找以查看与其关联的对象。这些对象包含数据库数据所属的架构和表。使用 Datastream 配置数据流时,您可能不希望从数据库中拉取所有对象,而是想要提取对象的一部分(例如,只有数据库的特定表和架构)。使用 Discover API 可帮助您查找(或发现)要拉取的那部分数据库对象。

创建连接配置文件

在此过程中,您将创建两个连接配置文件:一个与来源 Oracle 数据库相关联,另一个与 Cloud Storage 中的目标存储桶相关联。

  1. 为源 Oracle 数据库创建连接配置文件。在提示符处,输入以下命令:
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\"username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\"port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
  

下表可以帮助您了解源 Oracle 数据库的参数值:

参数值替换为
DISPLAY_NAME到源数据库的连接配置文件的显示名称。
HOSTNAME源数据库服务器的主机名。
USERNAME来源数据库账号的用户名(例如 ROOT)。
PASSWORD来源数据库账号的密码。
DATABASE_SERVICE用于确保来源数据库受到保护和监控的服务。对于 Oracle 数据库,数据库服务通常为 ORCL
PORT_NUMBER为源数据库预留的端口号。对于 Oracle 数据库,端口号通常为 1521。

  1. 在提示符处,输入 echo $ORACLE | jq 命令以查看以简单易读的文本创建的源连接配置文件。

    {
      "displayName": "DISPLAY_NAME",
      "oracle_profile": {
        "hostname": "HOSTNAME",
        "username": "USERNAME",
        "password": "PASSWORD",
        "database_service": "DATABASE_SERVICE",
        "port": PORT_NUMBER
       },
      "no_connectivity": {}
    }
    
  2. 提交 Oracle 连接配置文件,以便创建该配置文件。在提示符处,输入以下命令:

    curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=SOURCE_CONNECTION_PROFILE_ID
    

    下表可帮助了解此命令的参数值:

    参数值替换为
    DATASTREAM_API_VERSIONDatastream API 的当前版本(例如 v1)。
    PROJECT_PATHGoogle Cloud 项目的完整路径(例如 projects/$PROJECT/locations/YOUR_PROJECT_LOCATION)。
    SOURCE_CONNECTION_PROFILE_ID为此连接配置文件预留的唯一标识符(例如 cp-1)。
  3. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  4. 创建与 Cloud Storage 中目标存储桶相关联的连接配置文件。在提示符处,输入以下命令:

    GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\",\"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}"
    

    下表可帮助您了解目标存储桶的参数值:

    参数值替换为
    DISPLAY_NAME到目标存储桶的连接配置文件的显示名。
    BUCKET_NAME目标存储桶的名称。
    FOLDER_PATHDatastream 将数据从源数据库转移到的目标存储桶中的文件夹(例如 /root/path)。
  5. 在提示符处,输入 echo $GOOGLECLOUDSTORAGE | jq 命令以查看以简单易读的文本创建的目标连接配置文件。

    {
      "displayName": "DISPLAY_NAME",
      "gcs_profile": {
        "bucket_name": "BUCKET_NAME",
        "root_path": "/FOLDER_PATH"
      },
      "no_connectivity": {}
    }
    
  6. 提交 Cloud Storage 连接配置文件,以便其可以创建。在提示符处,输入以下命令:

    curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID
    
  7. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  8. 确认已创建两个连接配置文件。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles
    
  9. 验证您是否收到了源和目标连接配置文件的两个返回结果。

    {
      "connectionProfiles": [
        {
          "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "gcsProfile": {
            "bucketName": "BUCKET_NAME",
            "rootPath": "FOLDER_PATH"
          },
          "noConnectivity": {}
        },
       {
        "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "createTime": "DATE_AND_TIME_STAMP",
        "updateTime": "DATE_AND_TIME_STAMP",
        "displayName": "DISPLAY_NAME",
        "oracleProfile": {
          "hostname": "HOSTNAME",
          "port": PORT_NUMBER,
          "username": "USERNAME",
          "databaseService": "DATABASE_SERVICE"
        },
        "noConnectivity": {}
        }
      ]
    }
    

管理连接配置文件

在此过程中,您将管理为来源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建的连接配置文件。包括:

  • 检索有关 Cloud Storage 目标连接配置文件的信息
  • 修改此连接配置文件。在本教程中,将 Cloud Storage 目标存储桶的文件夹更改为 /root/tutorial。Datastream 会将数据从来源数据库转移到此文件夹中。
  • 对来源 Oracle 连接配置文件执行 Discover API 调用
  1. 检索目标 Cloud Storage 连接配置文件的相关信息。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
    
  2. 验证您是否看到此连接配置文件的相关信息。

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "FOLDER_PATH"
      },
      "noConnectivity": {}
    }
    
  3. 修改此连接配置文件。为此,请先设置 UPDATE 变量。此变量包含要更改的连接配置文件的值。在本教程中,您将目标存储桶的文件夹更改为 /root/tutorial

    如需设置此变量,请在提示符处输入以下命令:

    UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
  4. 在提示符处,输入以下命令:

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath
    
  5. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  6. 确认连接配置文件已修改。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
    
  7. 验证 Cloud Storage 连接配置文件的目标存储桶的文件夹现在是否为 /root/tutorial

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "/root/tutorial"
      },
      "noConnectivity": {}
    }
    
  8. 使用 Datastream 发现 API 来发现源 Oracle 数据库的架构和表。Datastream 通过源连接配置文件提供对此数据库的访问权限。

    1. 了解 Oracle 数据库的架构。在提示符处,输入以下命令:

      curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
      
    2. 验证 Datastream 是否检索了数据库的所有架构。

    3. 在数据库中检索架构的表。在本教程中,您将使用发现 API 检索 ROOT 架构的表。但是,您可以发现数据库中任何架构的表。

      在提示符处,输入以下命令:

    curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
    
    1. 验证 Datastream 是否检索了您指定的架构的所有表(在本教程中为 ROOT 架构)。

现在,您已为源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建和管理连接配置文件,接下来可以在 Datastream 中创建和管理数据流。

创建和管理数据流

在本部分中,您将创建和管理数据流。Datastream 使用此数据流将数据、架构和表从源数据库转移到 Cloud Storage 目标存储桶中的文件夹。

创建和管理数据流的操作包括:

  • 验证数据流以确保数据流成功运行,并且通过所有验证检查。这些检查包括:
    • 来源是否已正确配置,允许 Datastream 从其中流式传输数据。
    • 数据流是否可以连接到来源和目标位置。
    • 数据流的端到端配置。
  • 使用以下列表创建数据流:
    • 允许列表。此列表指定源数据库中可由 Datastream 转移到 Cloud Storage 目标存储桶中的文件夹的表格和架构。在本教程中,这是 /root/tutorial 文件夹。
    • 拒绝列表。此列表指定源数据库中禁止 Datastream 转移到 Cloud Storage 目标存储桶中的文件夹的表和架构。
  • 检索有关数据流的信息
  • 修改数据流
  • 启动数据流,以便 Datastream 可以将数据、架构和表从来源数据库转移到 Cloud Storage 目标存储桶中的文件夹。
  • 使用 Fetch Errors API 检测与数据流关联的任何错误
  • 暂停数据流。暂停数据流后,DataStream 不会将来源数据库中的任何新数据拉取到目标存储桶。
  • 恢复已暂停的数据流,以便 Datastream 可以继续将数据转移到目标存储桶。

创建数据流

在此过程中,您将创建从 Oracle 源数据库到目标 Cloud Storage 存储桶文件夹中的数据流。您创建的数据流将同时包含允许列表和拒绝列表。

  1. 设置 SCHEMAS 变量。此变量定义了您希望 Datastream 从源数据库检索并转移到 Cloud Storage 目标存储桶的 /root/tutorial 文件夹中的数据和表所属的架构。在本教程中,设置 SCHEMAS 变量以与 ROOT 架构相关联。

    在提示符处,输入以下命令:

    SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}"
    
  2. 在提示符处,输入 echo $SCHEMAS | jq 命令以查看以简单易读的文本为此变量定义的 ROOT 架构。

  3. 创建数据流。在提示符处,输入以下命令:

    STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\":{\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}},\"backfill_all\":{}}}"
    
  4. 在提示符处,输入 echo $STREAM | jq 命令以查看以简单易读的文本创建的数据流。

    {
      "display_name": "DISPLAY_NAME",
      "source_config": {
        "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracle_source_config": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destination_config": {
        "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcs_destination_config": {
          "file_rotation_mb": 5,
          "file_rotation_interval": {
            "seconds": 15
          },
          "avro_file_format": {}
        }
      },
      "backfill_all": {}
    }
    

    下表可帮助您了解数据流的以下参数:

    参数说明
    allowlist将从源数据库转移到 Cloud Storage 目标存储桶的文件夹中的表和数据所属的架构。在本教程中,ROOT 架构(且只有此架构)中的所有表和数据都将转移到目标存储桶的 /root/tutorial 文件夹中。
    rejectlist不会转移到 Cloud Storage 目标存储桶的文件夹中的表和数据所属的任何架构。在本教程中,{} 值表示系统不会阻止来源数据库中的任何表和数据转移到目标存储桶中。
    file_rotation_mb要从来源数据库转移到 Cloud Storage 目标存储桶的文件夹中的数据所属文件的大小(以 MB 为单位)。在本教程中,当从来源数据库检索数据时,系统会将数据写入 5 MB 的文件中。如果任何数据超过此大小,则该数据会被划分到多个 5 MB 的文件中。
    file_rotation_intervalDataStream 关闭 Cloud Storage 目标存储桶文件夹中的现有文件并打开另一个文件(其中包含从源数据库传输的数据)之前经过的秒数。在本教程中,文件轮替间隔设置为 15 秒。
    avro_file_format

    Datastream 从来源数据库转移到 Cloud Storage 目标存储桶的文件夹中的文件的格式。在本教程中,文件格式为 Avro。

    backfill_all

    此参数与历史回填相关联。通过将此参数设置为空字典 ({}),Datastream 将回填以下内容:

    • 历史数据,以及源数据库到目标位置的持续数据更改。
    • 从来源到目标位置的架构和表。
  5. 验证数据流,以确保数据流可成功运行,且能通过所有验证检查。在提示符处,输入以下命令:

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" "https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID&validate_only=true"
    
  6. 验证您是否看到 {} 行代码。这表示视频流已通过所有验证检查,并且没有任何与该视频流相关的错误。

  7. 提交数据流,以便于创建该数据流。在提示符处,输入以下命令:

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID
    
  8. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  9. 确认已创建数据流。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams
    
  10. 验证您是否收到了创建的数据流的返回结果。

    {
      "streams": [
        {
          "name": "PROJECT_PATH/streams/STREAM_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "sourceConfig": {
            "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
            "oracleSourceConfig": {
              "allowlist": {
                "oracleSchemas": [
                  {
                    "schema": "ROOT"
                  }
                ]
              },
              "rejectlist": {}
            }
          },
          "destinationConfig": {
            "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
              "fileRotationMb": 5,
              "fileRotationInterval": "15s"
              "avroFileFormat": {}
            }
          },
          "state": "CREATED",
          "backfillAll": {}
        }
      ]
    }
    

管理数据流

在此过程中,您将使用为将数据从来源 Oracle 数据库转移到 Cloud Storage 目标存储桶中的文件夹而创建的数据流。包括:

  • 检索有关数据流的信息
  • 修改数据流
  • 启动数据流
  • 使用 Fetch Errors API 检测与数据流关联的任何错误
  • 暂停和恢复直播
  1. 检索有关数据流的信息。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  2. 验证您是否可以看到此直播的相关信息。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 5,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
    
  3. 修改此数据流。为此,请先设置 UPDATE 变量。此变量包含要更改的数据流值。在本教程中,您可更改所含数据从源数据库传输到 Cloud Storage 目标存储桶中文件夹(从 5 MB 到 100 MB)的文件的大小(以 MB 为单位)。从源数据库检索数据时,数据现在会被写入到 100 MB 的文件。如有任何数据超过此大小,系统会将数据拆分为多个大小为 100 MB 的文件。

    如需设置此变量,请在提示符处输入以下命令:

    UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
    
  4. 在提示符处,输入以下命令:

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/?update_mask=destination_config.gcs_destination_config.file_rotation_mb
    
  5. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  6. 确认数据流已修改。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  7. 验证 Cloud Storage 连接配置文件的 fileRotationMb 参数的值现在是否为 100

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 100,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
    
  8. 启动数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"RUNNING\"}"
      
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
      
  9. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  10. 几分钟后,检索有关流式传输的信息,以确认流式传输已开始:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  11. 验证数据流的状态是否已从 CREATED 更改为 RUNNING

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }
    
  12. 使用 Fetch Errors API 检索与数据流关联的任何错误。

    1. 在提示符处,输入以下命令:

      curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID:fetchErrors
      
    2. 验证您是否看到以下代码行:

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": false
        }
        

    3. 在提示符处,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID
      
    4. 验证您是否看到以下代码行:

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "endTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": true,
          "response": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.FetchErrorsResponse"
          }
        }
        

  13. 暂停数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"PAUSED\"}"
      
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
      
  14. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  15. 检索有关数据流的信息,以确认其已暂停。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  16. 验证数据流的状态是否已从 RUNNING 更改为 PAUSED

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "PAUSED",
      "backfillAll": {}
    }
    
  17. 恢复暂停的数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"RUNNING\"}"
      
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
      
  18. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  19. 几秒钟后,检索有关数据流的信息以确认其再次运行。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  20. 验证数据流的状态是否已从 PAUSED 更改回 RUNNING

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }
    

现在您已创建和管理数据流,确认没有与该数据流关联的错误,并且该数据流的状态为 RUNNING,接下来您可以验证它是否可以将数据从来源数据库转移到 Cloud Storage 目标存储桶中的文件夹。

验证数据流

在此过程中,您将确认 Datastream 可以执行以下操作:

  • 将与来源 Oracle 数据库的 ROOT 架构关联的所有表中的数据转移到 Cloud Storage 目标存储桶中的 /root/tutorial 文件夹。
  • 将数据转换为 Avro 文件格式。
  1. 转到 Cloud Storage 中的 Storage 浏览器页面。

    转到 Storage 浏览器页面

  2. 点击包含您的存储桶的链接。

  3. 如果对象标签页未处于活跃状态,请点击它。

  4. 点击 root 文件夹,然后点击 tutorial 文件夹。

  5. 验证您是否看到了表示来源 Oracle 数据库的 ROOT 架构的表的文件夹。

  6. 点击其中一个表文件夹并展开细目,直到您看到与该表关联的数据。

  7. 点击代表相关数据的文件,然后点击下载

  8. 在 Avro 工具(例如 Avro Viewer)中打开此文件,以确保内容可读。这确认了 Datastream 还将数据转换为 Avro 文件格式。

清理

完成本教程后,您可以清理在 Datastream 上创建的资源,以免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除 Cloud Storage 目标存储桶

  1. 在 Cloud Storage 的左侧抽屉式导航栏中,点击浏览器项。

  2. 选中存储桶左侧的复选框,然后点击删除

  3. 在“要删除存储桶吗?”窗口的文本字段中,输入存储桶的名称,然后点击确认

删除数据流

  1. 确保您的 Cloud Shell 应用处于活跃状态。

  2. 在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
    
  3. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  4. 确认数据流已删除。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams
    
  5. 验证是否返回了 null {} 值。这表示 Datastream 中不再有任何数据流,并且您创建的数据流已删除。

删除连接配置文件

  1. 删除与源 Oracle 数据库的连接配置文件。在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID
    
  2. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  3. 删除与 Cloud Storage 中目标存储桶的连接配置文件。在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
    
  4. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
    
  5. 确认两个连接配置文件都已删除。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles
    
  6. 验证是否返回了 null {} 值。这表示 Datastream 中不再有任何连接配置文件,您创建的配置文件已删除。

后续步骤

  • 详细了解 Datastream
  • 试用其他 Google Cloud 功能。查阅我们的教程