使用 DataStream API

Datastream

概览

对于拥有许多独立数据源的企业而言,访问整个组织内的企业数据(尤其是实时访问)并非易事。这会导致数据访问受限且速度缓慢,因而造成组织无法进行检查。

借助 Datastream,您可以近乎实时地访问来自各种本地数据源和云数据源的更改数据,从而创建对组织数据的访问权限。Datastream 提供简单的设置体验和统一的消费 API,让组织可以轻松访问整个组织中可用的最新企业数据,为集成式近实时场景提供支持。

其中一种场景是,将数据从来源数据库转移到云端存储服务或消息传递队列,然后将这些数据转换为可供与该存储服务或消息传递队列通信的其他应用和服务读取的形式。

在本教程中,您将学习如何使用 Datastream 将架构、表和数据从源 Oracle 数据库转移到 Cloud Storage 存储桶中的文件夹。Cloud Storage 是一种用于在 Google Cloud 上存储和访问数据的 Web 服务。该服务将 Google 云的高性能和可扩展性与先进的安全和共享功能融合在一起。

在将此信息转移到目标 Cloud Storage 存储桶中的文件夹的过程中,Datastream 会将此信息转换为 Avro。Avro 由以 JavaScript 对象表示法 (JSON) 编写的架构定义。通过进行这种转换,您可以以统一的方式读取来自不同数据源的数据。

目标

在本教程中,您将学习如何:

  • 设置环境变量。向 Datastream 发出请求时,您将使用这些变量创建和管理连接配置文件数据流
  • 创建和管理用于 Cloud Storage 中源数据库和目标存储桶的连接配置文件。创建这些连接配置文件后,您将创建包含来源数据库和 Cloud Storage 目标存储桶的相关信息的记录。Datastream 中的数据流使用连接配置文件中的信息将数据从源数据库转移到目标存储桶中的文件夹。
  • 创建和管理数据流。Datastream 使用此数据流将数据、架构和表从源数据库转移到目标存储桶中的文件夹。
  • 验证 Datastream 是否将与源 Oracle 数据库的架构关联的数据和表转移到目标存储桶中的文件夹,以及是否将这些数据转换为 Avro 文件格式。
  • 清理您在 Datastream 上创建的资源,以避免这些资源占用配额并在日后产生费用。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. 启用 Datastream API。

    启用 API

  7. 确保您已为您的用户账号分配 Datastream Admin 角色。

    转到 IAM 页面

  8. 确保您有一个 Datastream 可以访问的来源数据库。本教程使用 Oracle 数据库作为来源。
  9. 配置来源数据库以允许来自 Datastream 公共 IP 地址的传入连接。如需访问所有 Datastream 区域的位置及其关联的公共 IP 地址,请参阅 IP 许可名单和区域
  10. 确保您已配置一个目标 Cloud Storage 存储分区,Datastream 可以使用 IP 许可名单、SSH 隧道或 VPC 对等互连网络连接方法对其进行访问。
  11. 确保来源数据库中存在 Datastream 可转移到 Cloud Storage 目标存储桶中的文件夹的数据、表和架构。
  12. 下载并安装 Cloud Shell。此客户端应用可让您通过命令行访问云资源(包括 Datastream)。
  13. 安装并配置 jq 实用程序。此实用程序是一个轻量级且灵活的命令行 JSON 处理器。您将使用此处理器,以简洁易懂的文本来展示复杂的 cURL 命令。

设置环境变量

在此过程中,您将设置以下变量:

  • $PROJECT:此变量与您的 Google Cloud 项目相关联。您分配和使用的任何 Google Cloud 资源都必须属于一个项目。
  • $TOKEN:此变量与访问令牌相关联。访问令牌提供了一个会话,Cloud Shell 在此会话中可以使用 REST API 在 Datastream 中执行任务。
  1. 启动 Cloud Shell 应用。

  2. 使用您的 Google 账号在您的应用中验证身份后,请输入 gcloud auth login

  3. Do you want to continue (Y/n)? 提示符处,输入 Y

  4. 打开网络浏览器并将网址复制到浏览器中。

  5. 使用您的 Google 账号向 Google Cloud SDK 进行身份验证。一个验证码会显示在登录页面上。此代码是您的访问令牌。

  6. 复制访问令牌,将其粘贴到 Cloud Shell 应用的 Enter verification code: 参数中,然后按 Enter

  7. 在提示符处,输入 PROJECT="YOUR_PROJECT_NAME" 以将 $PROJECT 环境变量设置为您的 Google Cloud 项目。

  8. 在提示符处,输入 gcloud config set project YOUR_PROJECT_NAME 以将您希望处理的项目设置为您的 Google Cloud 项目。

    您的命令提示符会更新以反映您的活跃项目并遵循以下格式:USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$

  9. 在提示符处,输入 TOKEN=$(gcloud auth print-access-token) 以检索访问令牌并将其存储为变量。

  10. 在提示符后输入以下命令,确保正确设置了 $PROJECT$TOKEN 变量:

    • echo $PROJECT
    • echo $TOKEN

现在您已设置了变量,可以向 Datastream 发出请求来创建和管理连接配置文件和数据流。

创建和管理连接配置文件

在本部分中,您将为源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建和管理连接配置文件。

创建这些连接配置文件后,您将创建包含来源数据库和 Cloud Storage 目标存储桶的相关信息的记录。Datastream 使用连接配置文件中的信息将数据从来源数据库转移到目标存储桶中的文件夹。

创建和管理连接配置文件的操作包括:

  • 为来源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建连接配置文件
  • 检索有关连接配置文件的信息
  • 修改连接配置文件
  • 对来源 Oracle 连接配置文件执行 Discover API 调用。通过此调用,您可以在数据库中查找以查看与其关联的对象。这些对象包含数据库数据所属的架构和表。使用 Datastream 配置数据流时,您可能不希望从数据库中拉取所有对象,而是想要提取对象的一部分(例如,只有数据库的特定表和架构)。使用 Discover API 可帮助您查找(或发现)要拉取的那部分数据库对象。

创建连接配置文件

在此过程中,您将创建两个连接配置文件:一个与来源 Oracle 数据库相关联,另一个与 Cloud Storage 中的目标存储桶相关联。

  1. 为源 Oracle 数据库创建连接配置文件。在提示符处,输入以下命令:
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\"username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\"port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
  

下表可以帮助您了解源 Oracle 数据库的参数值:

参数值替换为
DISPLAY_NAME到源数据库的连接配置文件的显示名称。
HOSTNAME来源数据库服务器的主机名。
USERNAME来源数据库账号的用户名(例如 ROOT)。
PASSWORD来源数据库账号的密码。
DATABASE_SERVICE用于确保来源数据库受到保护和监控的服务。对于 Oracle 数据库,数据库服务通常为 ORCL
PORT_NUMBER为源数据库预留的端口号。对于 Oracle 数据库,端口号通常为 1521。

  1. 在提示符处,输入 echo $ORACLE | jq 命令以查看以简单易读的文本创建的源连接配置文件。

    {
      "displayName": "DISPLAY_NAME",
      "oracle_profile": {
        "hostname": "HOSTNAME",
        "username": "USERNAME",
        "password": "PASSWORD",
        "database_service": "DATABASE_SERVICE",
        "port": PORT_NUMBER
       },
      "no_connectivity": {}
    }
  2. 提交 Oracle 连接配置文件,以便创建该配置文件。在提示符处,输入以下命令:

    curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=SOURCE_CONNECTION_PROFILE_ID

    下表可帮助了解此命令的参数值:

    参数值替换为
    DATASTREAM_API_VERSIONDatastream API 的当前版本(例如 v1)。
    PROJECT_PATHGoogle Cloud 项目的完整路径(例如 projects/$PROJECT/locations/YOUR_PROJECT_LOCATION)。
    SOURCE_CONNECTION_PROFILE_ID为此连接配置文件预留的唯一标识符(例如 cp-1)。
  3. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  4. 创建与 Cloud Storage 中目标存储桶相关联的连接配置文件。在提示符处,输入以下命令:

    GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\",\"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}"

    下表可帮助您了解目标存储桶的参数值:

    参数值替换为
    DISPLAY_NAME到目标存储桶的连接配置文件的显示名。
    BUCKET_NAME目标存储桶的名称。
    FOLDER_PATHDatastream 将数据从源数据库转移到的目标存储桶中的文件夹(例如 /root/path)。
  5. 在提示符处,输入 echo $GOOGLECLOUDSTORAGE | jq 命令以查看以简单易读的文本创建的目标连接配置文件。

    {
      "displayName": "DISPLAY_NAME",
      "gcs_profile": {
        "bucket_name": "BUCKET_NAME",
        "root_path": "/FOLDER_PATH"
      },
      "no_connectivity": {}
    }
  6. 提交 Cloud Storage 连接配置文件,以便其可以创建。在提示符处,输入以下命令:

    curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles?connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID
  7. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  8. 确认已创建两个连接配置文件。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles
  9. 验证您是否收到了源和目标连接配置文件的两个返回结果。

    {
      "connectionProfiles": [
        {
          "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "gcsProfile": {
            "bucketName": "BUCKET_NAME",
            "rootPath": "FOLDER_PATH"
          },
          "noConnectivity": {}
        },
       {
        "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "createTime": "DATE_AND_TIME_STAMP",
        "updateTime": "DATE_AND_TIME_STAMP",
        "displayName": "DISPLAY_NAME",
        "oracleProfile": {
          "hostname": "HOSTNAME",
          "port": PORT_NUMBER,
          "username": "USERNAME",
          "databaseService": "DATABASE_SERVICE"
        },
        "noConnectivity": {}
        }
      ]
    }

管理连接配置文件

在此过程中,您将管理为来源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建的连接配置文件。包括:

  • 检索有关 Cloud Storage 目标连接配置文件的信息
  • 修改此连接配置文件。在本教程中,将 Cloud Storage 目标存储桶的文件夹更改为 /root/tutorial。Datastream 会将数据从来源数据库转移到此文件夹中。
  • 对来源 Oracle 连接配置文件执行 Discover API 调用
  1. 检索有关 Cloud Storage 目标连接配置文件的信息。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
  2. 验证您是否能看到有关此连接配置文件的信息。

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "FOLDER_PATH"
      },
      "noConnectivity": {}
    }
  3. 修改此连接配置文件。为此,请先设置 UPDATE 变量。此变量包含要更改的连接配置文件的值。在本教程中,您将目标存储桶的文件夹更改为 /root/tutorial

    如需设置此变量,请在提示符处输入以下命令:

    UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
  4. 在提示符处,输入以下命令:

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath
  5. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  6. 确认连接配置文件已修改。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
  7. 验证 Cloud Storage 连接配置文件的目标存储桶的文件夹现在是否为 /root/tutorial

    {
      "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "gcsProfile": {
        "bucketName": "BUCKET_NAME",
        "rootPath": "/root/tutorial"
      },
      "noConnectivity": {}
    }
  8. 使用 Datastream 发现 API 来发现源 Oracle 数据库的架构和表。Datastream 通过源连接配置文件提供对此数据库的访问权限。

    1. 探索 Oracle 数据库的架构。在提示符处,输入以下命令:

      curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
    2. 验证 Datastream 是否检索了数据库的所有架构。

    3. 在数据库中检索架构表。在本教程中,您将使用发现 API 检索 ROOT 架构的表。但是,您可以发现数据库中任何架构的表。

      在提示符处,输入以下命令:

    curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover
    1. 验证 Datastream 是否检索了您指定的架构的所有表(在本教程中为 ROOT 架构)。

现在,您已为源 Oracle 数据库和 Cloud Storage 中的目标存储桶创建和管理连接配置文件,接下来可以在 Datastream 中创建和管理数据流。

创建和管理数据流

在本部分中,您将创建和管理数据流。Datastream 使用此数据流将数据、架构和表从源数据库转移到 Cloud Storage 目标存储桶中的文件夹。

创建和管理数据流的操作包括:

  • 验证数据流以确保数据流成功运行,并且通过所有验证检查。这些检查包括:
    • 来源是否已正确配置,允许 Datastream 从其中流式传输数据。
    • 数据流是否可以连接到来源和目标位置。
    • 数据流的端到端配置。
  • 使用以下列表创建数据流:
    • 允许列表。此列表指定源数据库中可由 Datastream 转移到 Cloud Storage 目标存储桶中的文件夹的表格和架构。在本教程中,这是 /root/tutorial 文件夹。
    • 拒绝列表。此列表指定源数据库中禁止 Datastream 转移到 Cloud Storage 目标存储桶中的文件夹的表和架构。
  • 检索有关数据流的信息
  • 修改数据流
  • 启动数据流,以便 Datastream 可以将数据、架构和表从来源数据库转移到 Cloud Storage 目标存储桶中的文件夹。
  • 使用 Fetch Errors API 检测与数据流关联的任何错误
  • 暂停数据流。暂停数据流后,DataStream 不会将来源数据库中的任何新数据拉取到目标存储桶。
  • 恢复已暂停的数据流,以便 Datastream 可以继续将数据传输到目标存储桶。

创建数据流

在此过程中,您要创建一个从 Oracle 源数据库到目标 Cloud Storage 存储桶文件夹中的数据流。您创建的数据流将同时包含允许列表和拒绝列表。

  1. 设置 SCHEMAS 变量。此变量定义了您希望 Datastream 从源数据库检索并转移到 Cloud Storage 目标存储桶的 /root/tutorial 文件夹中的数据和表所属的架构。在本教程中,设置 SCHEMAS 变量以与 ROOT 架构相关联。

    在提示符处,输入以下命令:

    SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}"
  2. 在提示符处,输入 echo $SCHEMAS | jq 命令以查看以简单易读的文本为此变量定义的 ROOT 架构。

  3. 创建数据流。在提示符处,输入以下命令:

    STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\":\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\":{\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}},\"backfill_all\":{}}}"
  4. 在提示符处,输入 echo $STREAM | jq 命令以查看以简单易读的文本创建的数据流。

    {
      "display_name": "DISPLAY_NAME",
      "source_config": {
        "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracle_source_config": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destination_config": {
        "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcs_destination_config": {
          "file_rotation_mb": 5,
          "file_rotation_interval": {
            "seconds": 15
          },
          "avro_file_format": {}
        }
      },
      "backfill_all": {}
    }

    下表可帮助您了解数据流的以下参数:

    参数说明
    allowlist将从源数据库转移到 Cloud Storage 目标存储桶的文件夹中的表和数据所属的架构。在本教程中,ROOT 架构(且只有此架构)中的所有表和数据都将转移到目标存储桶的 /root/tutorial 文件夹中。
    rejectlist不会转移到 Cloud Storage 目标存储桶的文件夹中的表和数据所属的任何架构。在本教程中,{} 值表示系统不会阻止来源数据库中的任何表和数据转移到目标存储桶中。
    file_rotation_mb要从来源数据库转移到 Cloud Storage 目标存储桶的文件夹中的数据所属文件的大小(以 MB 为单位)。在本教程中,当从来源数据库检索数据时,系统会将数据写入 5 MB 的文件中。如果任何数据超过此大小,则该数据会被划分到多个 5 MB 的文件中。
    file_rotation_intervalDatastream 会关闭 Cloud Storage 目标存储桶文件夹中的现有文件并打开另一个文件以包含从源数据库转移的数据,这需要秒数。在本教程中,文件轮替间隔设置为 15 秒。
    avro_file_format

    Datastream 从来源数据库转移到 Cloud Storage 目标存储桶的文件夹中的文件的格式。在本教程中,文件格式为 Avro。

    backfill_all

    此参数与历史回填相关联。如果将此参数设置为空字典 ({}),Datastream 将回填以下内容:

    • 历史数据,以及源数据库到目标位置的持续数据更改。
    • 从来源到目标位置的架构和表。
  5. 验证该数据流,以确保该数据流可成功运行,且已通过所有验证检查。在提示符处,输入以下命令:

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" "https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID&validate_only=true"
  6. 验证您是否看到 {} 行代码。这表示视频流已通过所有验证检查,并且没有与视频流相关的错误。

  7. 提交数据流,以便于创建该数据流。在提示符处,输入以下命令:

    curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID
  8. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  9. 确认已创建数据流。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams
  10. 验证您是否收到了创建的数据流的返回结果。

    {
      "streams": [
        {
          "name": "PROJECT_PATH/streams/STREAM_ID",
          "createTime": "DATE_AND_TIME_STAMP",
          "updateTime": "DATE_AND_TIME_STAMP",
          "displayName": "DISPLAY_NAME",
          "sourceConfig": {
            "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
            "oracleSourceConfig": {
              "allowlist": {
                "oracleSchemas": [
                  {
                    "schema": "ROOT"
                  }
                ]
              },
              "rejectlist": {}
            }
          },
          "destinationConfig": {
            "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
              "fileRotationMb": 5,
              "fileRotationInterval": "15s"
              "avroFileFormat": {}
            }
          },
          "state": "CREATED",
          "backfillAll": {}
        }
      ]
    }

管理数据流

在此过程中,您将使用为将数据从来源 Oracle 数据库转移到 Cloud Storage 目标存储桶中的文件夹而创建的数据流。包括:

  • 检索有关数据流的信息
  • 修改数据流
  • 启动数据流
  • 使用 Fetch Errors API 检测与数据流关联的任何错误
  • 暂停和恢复数据流
  1. 检索有关数据流的信息。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  2. 验证您是否看到了有关此数据流的信息。

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 5,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
  3. 修改此数据流。为此,请先设置 UPDATE 变量。此变量包含要更改的数据流值。在本教程中,更改要从来源数据库转移到 Cloud Storage 目标存储桶的文件夹中的数据所属文件的大小(从 5 MB 更改为 100 MB)。现在,当从来源数据库检索数据时,系统会将数据写入 100 MB 的文件中。如果任何数据超过此大小,则该数据会被划分到多个 100 MB 的文件中。

    如需设置此变量,请在提示符处输入以下命令:

    UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
  4. 在提示符处,输入以下命令:

    curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/?update_mask=destination_config.gcs_destination_config.file_rotation_mb
  5. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  6. 确认数据流已修改。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  7. 验证 Cloud Storage 连接配置文件的 fileRotationMb 参数的值现在是否为 100

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
         }
        },
        "destinationConfig": {
          "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
          "gcsDestinationConfig": {
            "fileRotationMb": 100,
            "fileRotationInterval": "15s"
            "avroFileFormat": {}
          }
        },
        "state": "CREATED",
        "backfillAll": {}
      }
  8. 启动数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"RUNNING\"}"
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  9. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  10. 几分钟后,检索有关数据流的信息以确认其已启动:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  11. 验证数据流的状态是否已从 CREATED 更改为 RUNNING

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }
  12. 使用 Fetch Errors API 检索与数据流关联的任何错误。

    1. 在提示符处,输入以下命令:

      curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID:fetchErrors
    2. 验证您是否看到以下代码行:

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": false
        }
        

    3. 在提示符处,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID
    4. 验证您是否看到以下代码行:

        {
          "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID",
          "metadata": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
            "createTime": "DATE_AND_TIME_STAMP",
            "endTime": "DATE_AND_TIME_STAMP",
            "target": "PROJECT_PATH/streams/STREAM_ID",
            "verb": "fetchErrors",
            "requestedCancellation": false,
            "apiVersion": "DATASTREAM_API_VERSION"
          },
          "done": true,
          "response": {
            "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.FetchErrorsResponse"
          }
        }
        

  13. 暂停数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"PAUSED\"}"
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  14. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  15. 检索有关数据流的信息,确认其已暂停。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  16. 验证数据流的状态是否已从 RUNNING 更改为 PAUSED

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "PAUSED",
      "backfillAll": {}
    }
  17. 恢复暂停的数据流。为此,请执行以下操作:

    1. 更改 UPDATE 变量。在提示符处,输入以下命令:

      UPDATE="{\"state\":\"RUNNING\"}"
    2. 然后,输入以下命令:

      curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID?updateMask=state
  18. 验证您是否看到以下代码行。

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "start",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  19. 几秒钟后,检索有关数据流的信息以确认其再次运行。

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  20. 验证数据流的状态是否已从 PAUSED 更改回 RUNNING

    {
      "name": "PROJECT_PATH/streams/STREAM_ID",
      "createTime": "DATE_AND_TIME_STAMP",
      "updateTime": "DATE_AND_TIME_STAMP",
      "displayName": "DISPLAY_NAME",
      "sourceConfig": {
        "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "oracleSourceConfig": {
          "allowlist": {
            "oracleSchemas": [
              {
                "schema": "ROOT"
              }
            ]
          },
          "rejectlist": {}
        }
      },
      "destinationConfig": {
        "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "gcsDestinationConfig": {
          "fileRotationMb": 100,
          "fileRotationInterval": "15s"
          "avroFileFormat": {}
        }
      },
      "state": "RUNNING",
      "backfillAll": {}
    }

现在您已创建和管理数据流,确认没有与该数据流关联的错误,并且该数据流的状态为 RUNNING,接下来您可以验证它是否可以将数据从来源数据库转移到 Cloud Storage 目标存储桶中的文件夹。

验证数据流

在此过程中,您将确认 Datastream 可以执行以下操作:

  • 将与来源 Oracle 数据库的 ROOT 架构关联的所有表中的数据转移到 Cloud Storage 目标存储桶中的 /root/tutorial 文件夹。
  • 将数据转换为 Avro 文件格式。
  1. 转到 Cloud Storage 中的 Storage 浏览器页面。

    转到 Storage 浏览器页面

  2. 点击包含您的存储桶的链接。

  3. 如果对象标签页未处于活跃状态,请点击它。

  4. 点击 root 文件夹,然后点击 tutorial 文件夹。

  5. 验证您是否看到了表示来源 Oracle 数据库的 ROOT 架构的表的文件夹。

  6. 点击其中一个表文件夹并展开细目,直到您看到与该表关联的数据。

  7. 点击代表数据的文件,然后点击下载

  8. 在 Avro 工具(例如 Avro Viewer)中打开此文件,以确保内容可读。这表明 Datastream 也已将数据转换为 Avro 文件格式。

清理

完成本教程后,您可以清理在 Datastream 上创建的资源,以免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除 Cloud Storage 目标存储桶

  1. 在 Cloud Storage 的左侧抽屉式导航栏中,点击浏览器项。

  2. 选中存储桶左侧的复选框,然后点击删除

  3. 在“要删除存储桶吗?”窗口的文本字段中,输入存储桶的名称,然后点击确认

删除数据流

  1. 确保您的 Cloud Shell 应用处于活跃状态。

  2. 在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID
  3. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/streams/STREAM_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  4. 确认数据流已删除。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams
  5. 验证是否返回了 null {} 值。这表示 Datastream 中不再有任何数据流,并且您创建的数据流已删除。

删除连接配置文件

  1. 删除与源 Oracle 数据库的连接配置文件。在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID
  2. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  3. 删除与 Cloud Storage 中目标存储桶的连接配置文件。在提示符处,输入以下命令:

    curl -X DELETE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID
  4. 验证您是否看到以下代码行:

    {
      "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata",
        "createTime": "DATE_AND_TIME_STAMP",
        "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID",
        "verb": "delete",
        "requestedCancellation": false,
        "apiVersion": "DATASTREAM_API_VERSION"
      },
      "done": false
    }
  5. 确认两个连接配置文件都已删除。在提示符处,输入以下命令:

    curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles
  6. 验证是否返回了 null {} 值。这表示 Datastream 中不再有任何连接配置文件,您创建的配置文件已删除。

后续步骤

  • 详细了解 Datastream
  • 试用其他 Google Cloud 功能。查阅我们的教程