您可以在创建 Dataproc 时激活其他组件,例如 Flink 使用 可选组件 功能。本页介绍了如何创建启用了 Apache Flink 可选组件(Flink 集群)的 Dataproc 集群,然后在集群上运行 Flink 作业。
您可以使用 Flink 集群执行以下操作:
使用 Dataproc
Jobs
资源运行 Flink 作业 从 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 登录。
创建 Dataproc Flink 集群
您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 创建在集群上启用了 Flink 组件的 Dataproc 集群。
建议:使用带有 Flink 组件的标准单主实例虚拟机集群。 Dataproc 高可用性模式集群 (具有 3 个主虚拟机)不支持 Flink 高可用性模式。
控制台
如需使用 Google Cloud 控制台创建 Dataproc Flink 集群,请执行以下步骤:
打开 Dataproc 在 Compute Engine 上创建 Dataproc 集群页面。
- 选中设置集群面板。
- 在版本控制部分,确认或更改映像类型和版本。集群映像版本决定了
安装在集群上的 Flink 组件版本。
- 映像版本必须为 1.5 或更高版本,才能在集群上激活 Flink 组件(如需查看每个 Dataproc 映像版本中包含的组件版本的列表,请参阅支持的 Dataproc 版本)。
- 映像版本必须为 [TBD] 或更高,才能通过 Dataproc Jobs API 运行 Flink 作业(请参阅运行 Dataproc Flink 作业)。
- 在组件部分中执行以下操作:
- 在组件网关下,选择启用组件网关。您必须启用 组件网关 激活指向 Flink 历史记录服务器界面的组件网关链接。 启用组件网关也会启用 对 Flink Job Manager 网页界面的访问权限 在 Flink 集群上运行
- 在可选组件下,选择 Flink 和要在集群上激活的其他可选组件。
- 在版本控制部分,确认或更改映像类型和版本。集群映像版本决定了
安装在集群上的 Flink 组件版本。
点击自定义集群(可选)面板。
在集群属性部分,点击添加属性 每个可选 集群属性 以添加到集群您可以添加带有
flink
前缀的属性,以便在/etc/flink/conf/flink-conf.yaml
中配置 Flink 属性,这些属性将用作您在集群上运行的 Flink 应用的默认属性。示例:
- 设置
flink:historyserver.archive.fs.dir
指定 Cloud Storage 位置以写入 Flink 作业历史记录 此位置将供运行 Flink History Server 的 )。 - 使用
flink:taskmanager.numberOfTaskSlots=n
设置 Flink 任务槽。
- 设置
在自定义集群元数据部分中,点击添加元数据以添加 元数据。例如,添加
flink-start-yarn-session
true
:用于运行 Flink YARN 守护程序 (/usr/bin/flink-yarn-daemon
) 在集群主服务器实例上在后台运行 节点启动 Flink YARN 会话(请参阅 Flink 会话模式)。
如果您使用的是 Dataproc 映像 2.0 或更低版本, 点击管理安全性(可选)面板,然后在项目访问权限下, 选择
Enables the cloud-platform scope for this cluster
。 创建集群时,cloud-platform
范围默认处于启用状态 (使用 Dataproc 映像 2.1 或更高版本)。
- 选中设置集群面板。
点击创建以创建集群。
gcloud
如需使用 gcloud CLI 创建 Dataproc Flink 集群,请运行以下命令 gcloud gclid clusters create 命令行中的命令 Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
注意:
- CLUSTER_NAME:指定集群的名称。
- REGION:指定集群所在的 Compute Engine 区域。
DATAPROC_IMAGE_VERSION:(可选)指定映像版本 以便在集群上使用集群映像版本决定了 安装在集群上的 Flink 组件版本。
映像版本必须为 1.5 或更高版本,才能在集群上激活 Flink 组件(如需查看每个 Dataproc 映像版本中包含的组件版本的列表,请参阅支持的 Dataproc 版本)。
映像版本必须为 [TBD] 或更高,才能通过 Dataproc Jobs API 运行 Flink 作业(请参阅运行 Dataproc Flink 作业)。
--optional-components
:您必须指定FLINK
组件,才能在集群上运行 Flink 作业和 Flink HistoryServer Web 服务。--enable-component-gateway
:您必须启用组件网关,才能激活指向 Flink 历史记录服务器界面的组件网关链接。启用组件网关也会启用对 Flink Job Manager 网页界面,运行于 Flink 集群。PROPERTIES。可选指定一个或多个集群属性。
使用如下过滤条件创建 Dataproc 集群时: 映像版本
2.0.67
+ 和2.1.15
+,则可以使用--properties
标志 在/etc/flink/conf/flink-conf.yaml
中配置 Flink 属性, 用作您在集群上运行的 Flink 应用的默认值。您可以将
flink:historyserver.archive.fs.dir
设置为 指定 Cloud Storage 位置以写入 Flink 作业历史记录 此位置将供运行 Flink History Server 的 )。多个媒体资源示例:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
其他标志:
- 您可以添加可选的
--metadata flink-start-yarn-session=true
标志,以便在集群主节点上后台运行 Flink YARN 守护程序 (/usr/bin/flink-yarn-daemon
) 以启动 Flink YARN 会话(请参阅 Flink 会话模式)。
- 您可以添加可选的
使用 2.0 或更早版本的映像时,您可以将
--scopes=https://www.googleapis.com/auth/cloud-platform
旗帜 启用您的集群对 Google Cloud API 的访问权限 (请参阅范围最佳实践)。 创建集群时,cloud-platform
范围默认处于启用状态 (使用 Dataproc 映像 2.1 或更高版本)。
API
如需使用 Dataproc API 创建 Dataproc Flink 集群,请提交 clusters.create 请求,如下所示:
注意:
将 SoftwareConfig.Component 发送至
FLINK
。您可以选择设置
SoftwareConfig.imageVersion
,以指定要在集群上使用的映像版本。集群映像版本决定了集群上安装的 Flink 组件的版本。映像版本必须为 1.5 或更高版本,才能在集群上激活 Flink 组件(如需查看每个 Dataproc 映像版本中包含的组件版本的列表,请参阅支持的 Dataproc 版本)。
如需运行 Flink 作业,映像版本必须为 [TBD] 或更高版本 (请参阅运行 Dataproc Flink 作业)。
将 EndpointConfig.enableHttpPortAccess 设置为
true
,以启用组件网关与 Flink 历史记录服务器界面的链接。启用组件网关后,您还可以访问 Flink 集群上运行的 Flink Job Manager 网页界面。您可以视需要设置
SoftwareConfig.properties
指定一个或多个 集群属性。- 您可以指定 Flink 属性,这些属性将用作您在集群上运行的 Flink 应用的默认属性。例如:
您可以设置
flink:historyserver.archive.fs.dir
指定 Cloud Storage 位置以写入 Flink 作业历史记录 此位置将供运行 Flink History Server 的 )。
- 您可以指定 Flink 属性,这些属性将用作您在集群上运行的 Flink 应用的默认属性。例如:
您可以设置
您可以选择设置:
GceClusterConfig.metadata
。例如,指定flink-start-yarn-session
true
以在集群主节点上后台运行 Flink YARN 守护程序 (/usr/bin/flink-yarn-daemon
) 以启动 Flink YARN 会话(请参阅 Flink 会话模式)。- 使用 2.0 或更低版本的映像时,将 GceClusterConfig.serviceAccountScopes 设为
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
范围),以便集群能够访问 Google Cloud API(请参阅范围最佳实践)。创建集群时,cloud-platform
范围默认处于启用状态 (使用 Dataproc 映像 2.1 或更高版本)。
创建 Flink 集群后
- 使用组件网关中的
Flink History Server
链接查看在 Flink 集群上运行的 Flink 历史记录服务器。 - 在组件网关中使用
YARN ResourceManager link
查看 Flink Job Manager 网页界面 在 Flink 集群上运行。 - 创建 Dataproc Persistent History Server 以查看现有和已删除的 Flink 集群写入的 Flink 作业历史记录文件。
使用 Dataproc Jobs
资源运行 Flink 作业
您可以使用 Google Cloud 控制台中的 Dataproc Jobs
资源来运行 Flink 作业,
Google Cloud 控制台、Google Cloud CLI 或 Dataproc API。
控制台
如需从控制台提交示例 Flink WordCount 作业,请执行以下操作:
在浏览器中,打开 Google Cloud 控制台中的 Dataproc 提交作业页面。
填写提交作业页面上的字段:
- 从集群列表中选择集群名称。
- 将作业类型设置为
Flink
。 - 将主类或 jar 设置为
org.apache.flink.examples.java.wordcount.WordCount
。 - 将 Jar 文件设置为
file:///usr/lib/flink/examples/batch/WordCount.jar
。file:///
表示位于集群上的文件。在创建 Flink 集群时,Dataproc 安装了WordCount.jar
。- 此字段还接受 Cloud Storage 路径
(
gs://BUCKET/JARFILE
) 或 Hadoop 分布式文件系统 (HDFS) 路径 (hdfs://PATH_TO_JAR
)。
点击提交。
- 作业驱动程序输出会显示在作业详情页面上。
- Flink 作业列在 Google Cloud 控制台中的 Dataproc 作业页面上。
- 在作业或作业详情页面中,点击停止或删除以停止或删除作业。
gcloud
如需将 Flink 作业提交到 Dataproc Flink 集群,请在本地终端窗口或 Cloud Shell 中运行 gcloud CLI gcloud dataproc jobs submit 命令。
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
注意:
- CLUSTER_NAME:指定 Dataproc Flink 的名称 将作业提交到哪个集群
- REGION:指定 Compute Engine 区域 集群所在的位置
- MAIN_CLASS:指定
main
类 Flink 应用,例如:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE:指定 Flink 应用 jar 文件。您可以指定:
- 在集群上安装的 jar 文件,使用
file:///` 前缀:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Cloud Storage 中的 jar 文件:
gs://BUCKET/JARFILE
- HDFS 中的 jar 文件:
hdfs://PATH_TO_JAR
- 在集群上安装的 jar 文件,使用
JOB_ARGS:(可选)在双短划线 (
--
) 后面添加作业参数。提交作业后,作业驱动程序输出会显示在本地或 Cloud Shell 终端中。
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
本部分介绍如何使用 Dataproc jobs.submit API 将 Flink 作业提交到 Dataproc Flink 集群。
在使用任何请求数据之前,请先进行以下替换:
- PROJECT_ID:Google Cloud 项目 ID
- REGION:集群地区
- CLUSTER_NAME:指定要将作业提交到的 Dataproc Flink 集群的名称
HTTP 方法和网址:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
请求 JSON 正文:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
如需发送您的请求,请展开以下选项之一:
您应该收到类似以下内容的 JSON 响应:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Flink 作业列在 Google Cloud 控制台中的 Dataproc 作业页面上。
- 您可以在作业或作业详情页面中点击停止或删除 来停止或删除作业。
使用 flink
CLI 运行 Flink 作业
您可以使用 flink
CLI 在 Flink 集群的主节点上运行 Flink 作业,而不是使用 Dataproc Jobs
资源运行 Flink 作业。
以下部分介绍了在 Dataproc Flink 集群上运行 flink
CLI 作业的不同方法。
通过 SSH 连接到主节点:使用 SSH 实用程序打开终端窗口。
设置类路径:从 Flink 集群主服务器虚拟机:
export HADOOP_CLASSPATH=$(hadoop classpath)
运行 Flink 作业:您可以在不同的环境中运行 Flink 作业 YARN 上的部署模式:应用模式、每个作业模式和会话模式。
应用模式:Dataproc 映像 2.0 及更高版本支持 Flink 应用模式。 此模式会在 YARN 作业管理器上执行作业的
main()
方法。集群将关闭 。作业提交示例:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
列出正在运行的作业:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
取消正在运行的作业:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
按作业模式:此 Flink 模式会在
main()
客户端。作业提交示例:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
会话模式:启动长时间运行的 Flink YARN 会话,然后将一个或多个作业提交到该会话。
启动会话:您可以在以下任一位置启动 Flink 会话: 方式:
创建 Flink 集群,将
--metadata flink-start-yarn-session=true
标志添加到gcloud dataproc clusters create
命令(请参阅创建 Dataproc Flink 集群)。带有此标志 创建集群后,Dataproc 将/usr/bin/flink-yarn-daemon
,用于在集群上启动 Flink 会话。会话的 YARN 应用 ID 保存在
/tmp/.yarn-properties-${USER}
中。 您可以使用yarn application -list
命令列出该 ID。运行 Flink
yarn-session.sh
脚本(已预安装在集群主虚拟机上),并使用自定义设置:自定义设置示例:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
使用默认设置运行 Flink
/usr/bin/flink-yarn-daemon
封装容器脚本:. /usr/bin/flink-yarn-daemon
向会话提交作业:运行以下命令可向会话提交 Flink 作业。
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL:用于执行作业的 Flink 主虚拟机(包括主机和端口)的网址。从以下位置移除
http:// prefix
: 。当您启动 Flink 会话。您可以运行以下命令,在Tracking-URL
字段中列出此网址:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL:用于执行作业的 Flink 主虚拟机(包括主机和端口)的网址。从以下位置移除
列出会话中的作业:如需列出会话中的 Flink 作业,请执行以下任一操作:
在不使用参数的情况下运行
flink list
。该命令会在/tmp/.yarn-properties-${USER}
中查找会话的 YARN 应用 ID。从
/tmp/.yarn-properties-${USER}
或yarn application -list
的输出中获取会话的 YARN 应用 ID,然后运行<code>
flink list -yid YARN_APPLICATION_ID。运行
flink list -m FLINK_MASTER_URL
。
停止会话:如需停止会话,请获取 YARN 应用 ID 从
/tmp/.yarn-properties-${USER}
开始会话或yarn application -list
,然后运行以下任一命令:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
在 Flink 上运行 Apache Beam 作业
您可以使用 FlinkRunner
在 Dataproc 上运行 Apache Beam 作业。
您可以通过以下方式在 Flink 上运行 Beam 作业:
- Java Beam 作业
- 可移植的 Beam 作业
Java Beam 作业
将 Beam 作业打包成一个 JAR 文件。为捆绑的 JAR 文件提供运行作业所需的依赖项。
以下示例从 Dataproc 集群的主节点运行 Java Beam 作业。
创建启用了 Flink 组件的 Dataproc 集群。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
:Flink。--image-version
:集群的映像版本,用于确定集群上安装的 Flink 版本(例如,请参阅针对最近 4 个 2.0.x 映像发布版本列出的 Apache Flink 组件版本)。--region
:受支持的 Dataproc 区域。--enable-component-gateway
:启用对 Flink Job Manager 界面的访问权限。--scopes
:启用您的集群对 Google Cloud API 的访问权限 (请参阅范围最佳实践)。cloud-platform
范围默认处于启用状态(您不需要包含 此标志设置) (使用 Dataproc 映像 2.1 或更高版本)。
使用 SSH 实用程序 在 Flink 集群主服务器节点上打开一个终端窗口。
在 Dataproc 集群主节点上启动 Flink YARN 会话。
. /usr/bin/flink-yarn-daemon
记下 Dataproc 集群上的 Flink 版本。
flink --version
在本地机器上,使用 Java 生成规范的 Beam 字数统计示例。
选择与 Dataproc 集群上的 Flink 版本兼容的 Beam 版本。请参阅 Flink 版本兼容性 列出了 Beam-Flink 版本兼容性的表
打开生成的 POM 文件。检查
<flink.artifact.name>
标记指定的 Beam Flink 运行程序版本。如果 Flink 工件名称中的 Beam Flink 运行程序版本与您的集群上的 Flink 版本不匹配,请更新版本号以使其相匹配。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
打包字数统计示例。
mvn package -Pflink-runner
将打包的超级 JAR 文件
word-count-beam-bundled-0.1.jar
(约 135 MB)上传到您的 Dataproc 集群的主节点。您可以使用gcloud storage cp
加快从 Cloud Storage 向 Dataproc 集群传输文件的速度。在本地终端上,创建一个 Cloud Storage 存储桶,然后上传超级 JAR。
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
在 Dataproc 的主节点上,下载超级 JAR。
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
在 Dataproc 集群的主节点上运行 Java Beam 作业。
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
检查结果是否已写入 Cloud Storage 存储桶。
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
停止 Flink YARN 会话。
yarn application -list
yarn application -kill YARN_APPLICATION_ID
可移植的 Beam 作业
如需运行以 Python、Go 和其他支持的语言编写的 Beam 作业,您可以按照 Beam 的 Flink 运行程序页面中的说明使用 FlinkRunner
和 PortableRunner
(另请参阅可移植性框架路线图)。
以下示例从 Dataproc 集群的主节点运行以 Python 编写的可移植 Beam 作业。
创建同时启用了 Flink 和 Docker 组件的 Dataproc 集群。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
注意:
--optional-components
:Flink 和 Docker。--image-version
:集群的映像版本。 这决定了集群上安装的 Flink 版本(例如, 请查看列出的 Apache Flink 组件版本, 四个 2.0.x 映像发布版本)。--region
:可用的 Dataproc 区域。--enable-component-gateway
:启用对 Flink Job Manager 界面的访问权限。--scopes
:启用您的集群对 Google Cloud API 的访问权限 (请参阅范围最佳实践)。cloud-platform
范围默认处于启用状态(您不需要包含 此标志设置) (使用 Dataproc 映像 2.1 或更高版本)。
在本地或在 Cloud Shell,用于创建 Cloud Storage 存储桶。您将指定BUCKET_NAME 。
gcloud storage buckets create BUCKET_NAME
在集群虚拟机的终端窗口中,启动 Flink YARN 会话。 记下 Flink 主网址,即执行作业的 Flink 主实例的地址。在执行以下操作时,您需要指定FLINK_MASTER_URL 运行示例 WordCount 程序。
. /usr/bin/flink-yarn-daemon
显示并记下运行 Dataproc 集群的 Flink 版本。在执行以下操作时,您需要指定FLINK_VERSION 运行示例 WordCount 程序。
flink --version
在 集群主服务器节点。
安装 Beam 版本 与集群上的 Flink 版本兼容的兼容版本。
python -m pip install apache-beam[gcp]==BEAM_VERSION
在集群主服务器实例上运行单词数示例 节点。
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
注意:
--runner
:FlinkRunner
。--flink_version
:之前提到的 FLINK_VERSION。--flink_master
:之前提到的 FLINK_MASTER_URL。--flink_submit_uber_jar
:使用超级 JAR 来执行 Beam 作业。--output
:BUCKET_NAME(先前创建)。
验证结果是否已写入存储桶。
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
停止 Flink YARN 会话。
- 获取应用 ID。
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
在 Kerberos 化的集群上运行 Flink
Dataproc Flink 组件支持 Kerberos 化的集群。需要有效的 Kerberos 票据才能提交和保留 Flink 作业,或启动 一个 Flink 集群。默认情况下,Kerberos 票据有效期为 7 天。
访问 Flink Job Manager 界面
当 Flink 作业或 Flink 会话集群运行时,您可以使用 Flink Job Manager 网页界面。如需使用网页界面,请执行以下操作:
- 创建 Dataproc Flink 集群。
- 创建集群后,点击组件网关 集群详情页面上“网络界面”标签页上的 YARN ResourceManager 链接 Google Cloud 控制台中。
- 在 YARN Resource Manager 界面中,识别 Flink 集群应用条目。根据作业的完成状态,系统将列出 ApplicationMaster 或 History 链接。
- 对于长时间运行的流式传输作业,请点击 ApplicationManager 链接以打开 Flink 信息中心;对于已完成的作业,请点击 History 链接以查看作业详情。