比较 Cloud Dataflow 流水线中用于预测的机器学习模型

Last reviewed 2018-09-07 UTC

该解决方案介绍并比较了在 Dataflow 流水线中调用机器学习模型的不同设计方法,并探讨了选择各个方法所涉及的利弊。我们展示了一系列实验的结果,旨在探究不同的方法并说明这些利弊(针对批处理和流处理流水线)。该解决方案适合希望将经过训练的模型集成到数据处理流水线的人员,不适合想要构建机器学习模型的数据科学家。

简介

作为将此机器学习模型集成到 Dataflow 流水线的负责人,您可能想知道有哪些不同的方法,以及哪种方法最适合系统要求。您需要注意以下几个方面,例如:

  • 吞吐量
  • 延迟时间
  • 费用
  • 实现
  • 维护

要对这几个方面进行权衡并非易事,但此解决方案可以帮助您根据优先级做出决策。该解决方案比较了在批量数据和流式数据流水线中使用经过 TensorFlow 训练的机器学习 (ML) 模型进行预测的三种方法:

  • 将已部署的模型用作流处理流水线的 REST/HTTP API。
  • 对于批处理流水线,使用 AI Platform 批量预测作业。
  • 对于批量处理和流式处理流水线,使用 Dataflow 直接模型预测。

所有实验都使用经过训练的现有模型(称为 Natality 数据集),该模型根据各种输入预测婴儿体重。由于此解决方案的目标不是构建模型,因此它不探讨如何构建或训练模型。要详细了解 Natality 数据集,请参阅后续步骤部分。

平台

您可以通过多种方法运行数据流水线并调用经过训练的机器学习模型。但功能要求都是一样的:

  1. 从有界限(批量)或无界限(流式)来源提取数据。可从中提取数据的某些来源包括传感器数据、网站互动和金融交易等。
  2. 调用机器学习模型来进行预测,从而转换和丰富输入数据。例如,解析 JSON 文件以提取相关字段,以此预测维护日期、提出产品建议或检测欺诈。
  3. 存储转换后的数据和预测以进行分析或备份,或者传递到队列系统以触发新事件或其他流水线。例如,实时检测潜在欺诈或将维护时间表信息存储在可通过信息中心访问的存储空间中。

在批量 ETL 过程中通过预测来转换和丰富数据时,您的目标是尽可能提高吞吐量,以缩短整个数据批次所需的总时间。另一方面,当您处理用于在线预测的流式数据时,您的目标是尽可能缩短延迟时间,以(近乎)实时地接收每个预测。因此,调用模型可能会成为瓶颈。

核心组件

此解决方案中的批量和流处理实验使用三种主要技术:

  • Dataflow 上运行的 Apache Beam,用于处理数据。
  • TensorFlow,用于实现和训练机器学习模型。
  • 某些实验还会使用 AI Platform 作为经过训练的机器学习模型的托管平台,以执行批量预测和在线预测。

在此解决方案中,我们选择 Dataflow 上运行的 Apache Beam 运行数据流水线,原因如下:

  • Apache Beam 是一种统一的开源编程模型,既可以运行流式数据处理作业,又可以运行批量数据处理作业。
  • Dataflow 是一种 Google Cloud 产品,无需服务器即可运行 Apache Beam 作业。

TensorFlow 是 Google 的一个开源数学库,可用作机器学习框架。TensorFlow 支持在单个机器或分布式环境中构建、训练和投放模型。模型可移植到各种设备,还可以利用可用的 CPUGPUTPU 资源,进行训练和投放。

AI Platform 是一个无服务器平台,可以大规模训练、调整(使用超参数调整功能)及提供 TensorFlow 模型,并且只需少量 DevOps 管理。AI Platform 支持将经过训练的模型部署为用于在线预测的 REST API,还支持提交批量预测作业。AI Platform 是可以将您的模型处理为微服务的几种方式之一。

此解决方案中详细介绍的各个方法将 Dataflow 用于数据处理流水线,并使用 AI Platform 将模型作为 HTTP 端点托管。但是,可用其他技术替代这些方法。HTTP 和直接 TensorFlow 模型之间的性能比较不会有太大的变化。

处理批量数据和流式数据

该解决方案中的实验包括批量和流式用例。每个实验利用不同的 Google Cloud 产品进行输入和输出,因为无界限和有界限源的操作要求不同。

批处理有界限数据集

图 1 显示,在典型的批处理流水线中,原始输入数据存储在对象存储空间中(例如 Cloud Storage)。结构化数据存储格式包括逗号分隔值 (CSV)、Optimized Row Columnar (ORC)、Parquet 或 Avro。如果数据来自数据库或日志,通常会使用这些格式。

典型的批处理流水线的架构
图 1.批处理架构

一些分析平台(如 BigQuery)除了提供查询功能,还提供存储空间。BigQuery 使用 Capacitor 进行存储。除了批处理流水线中的其他存储选项之外,Dataflow 上的 Apache Beam 还可以在 BigQuery 和 Cloud Storage 中读取和写入数据。

流处理无界限数据流

对于流处理,数据处理流水线的输入通常是消息传递系统(如图 2 所示)。Pub/Sub 或 Kafka 等技术通常用于提取 JSON、CSV 或 protobuf 格式的各数据点。

典型的流处理流水线的架构
图 2.流处理架构

通过使用数据选取函数执行临时事件处理,可以单独处理数据点,也可以将数据点作为组进行微批处理。经过处理的数据可能会转到多个目标位置,包括:

  1. BigQuery,用于临时分析(通过流处理 API)。
  2. Bigtable,用于提供实时信息。
  3. Pub/Sub 主题,用于触发后续流程/流水线。

对于有界限和无界限数据源接收器,您可以在 Apache Beam I/O 页面上找到其源连接器(输入)和接收器连接器(输出)的完整列表。

调用 TensorFlow 模型

您可以通过三种方法调用经过 TensorFlow 训练的模型:

  1. 通过 HTTP 端点,用于在线预测。
  2. 直接使用已保存的模型文件,用于批量预测和在线预测。
  3. 通过 AI Platform 批量预测作业,用于批量预测。

用于在线预测的 HTTP 端点

TensorFlow 模型被部署为要调用的 HTTP 端点,并通过流式数据处理流水线或通过客户端应用提供实时预测。

您可以使用 TensorFlow Serving 或任何其他托管服务(如 Seldon)将 TensorFlow 模型部署为用于在线预测的 HTTP 端点。如图 3 所示,您可以选择以下任一方式:

  1. 在一个或多个 Compute Engine 实例上自行部署模型。
  2. Compute EngineGoogle Kubernetes Engine 上使用 Docker 映像
  3. 利用 Kubeflow 辅助在 Kubernetes 或 Google Kubernetes Engine 上进行部署。
  4. 将 App Engine 与 Endpoints 结合使用,以在网页应用中托管模型。
  5. 使用 AI Platform(Google Cloud 上的全代管式机器学习训练和供应服务)。
Dataflow 中可用于将模型处理为 HTTP 端点的不同选项
图 3.Dataflow 中可用于将模型投放为 HTTP 端点的不同方式

AI Platform 是一项全代管式服务,因此它比其他服务更容易实现。所以在实验中,我们选择通过该服务来将模型用作 HTTP 端点。这样一来,我们可以关注 AI Platform 中直接模型与 HTTP 端点的性能对比,而不是比较不同的 HTTP 模型投放方式。

使用 AI Platform Prediction 提供在线预测

要提供在线预测,需要完成两项任务:

  1. 部署模型。
  2. 与部署的模型交互以进行推断(即进行预测)。

使用 AI Platform Prediction 将模型部署为 HTTP 端点需要执行以下步骤:

  1. 确保 Cloud Storage 上提供了经过训练的模型文件。
  2. 使用 gcloud ml-engine models create 命令创建模型。
  3. 使用 gcloud ml-engine versions create 命令和 Cloud Storage 上的模型文件部署模型版本。

您可以使用以下命令部署模型:


PROJECT="[PROJECT_ID]" # change to your project name
REGION="[REGION]"
BUCKET="[BUCKET]" # change to your bucket name
MODEL_NAME="babyweight_estimator" # change to your estimator name
MODEL_VERSION="v1" # change to your model version
MODEL_BINARIES=gs://${BUCKET}/models/${MODEL_NAME}

# upload the local SavedModel to GCS
gsutil -m cp -r model/trained/v1/* gs://${BUCKET}/models/${MODEL_NAME}

# set the current project
gcloud config set project ${PROJECT}

# list model files on GCS
gsutil ls ${MODEL_BINARIES}

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

# deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

该代码在 Google Cloud 项目中创建名为 babyweight_estimator 的 AI Platform Prediction 模型,模型版本为 v1。

部署了模型之后,您可以调用该模型。以下 Python 代码展示了一种在 AI Platform Prediction 中将模型版本作为 REST API 进行调用的方法:

cmle_api = None

def init_api():

    global cmle_api

    if cmle_api is None:
        cmle_api = discovery.build('ml', 'v1',
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=True)

def estimate_cmle(instances):
    """
    Calls the babyweight estimator API on CMLE to get predictions

    Args:
       instances: list of json objects
    Returns:
        int - estimated baby weight
    """
    init_api()

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, CMLE_MODEL_NAME, CMLE_MODEL_VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()
    values = [item["predictions"][0] for item in response['predictions']]
    return values

如果您的大型数据集可用于 BigQuery 或 Cloud Storage 等工具,并且您希望尽可能提高整个过程的吞吐量,对于批量预测,则不建议将机器学习模型投放为 HTTP 端点。此操作会为每个数据点生成一个 HTTP 请求,从而生成大量的 HTTP 请求。以下部分介绍了用于批量预测的更好的方法。

用于批量预测和在线预测的直接模型

直接模型预测技术利用 Dataflow 实例上的本地 TensorFlow SavedModel。保存的模型是在 TensorFlow 模型完成构建和训练后创建的输出文件的副本。TensorFlow SavedModel

  • 可以作为 Dataflow 作业提交的部分流水线源代码。
  • 可以从 Cloud Storage 下载,如图 4 所示。
Dataflow 中的直接模型预测
图 4.Dataflow 中的直接模型预测

在此解决方案中,我们使用 SavedModel,它是 GitHub 上源代码的一部分。要在实例上加载模型,请执行以下操作:

  1. 创建 Dataflow 作业时,请指定要加载的流水线依赖项(包括模型文件)。以下 Python 代码显示了 setup.py 文件,其中包含要与 Dataflow 作业一起提交的模型文件。

    import setuptools
    
    requirements = []
    
    setuptools.setup(
        name='TF-DATAFLOW-DEMO',
        version='v1',
        install_requires=requirements,
        packages=setuptools.find_packages(),
        package_data={'model': ['trained/*',
                                'trained/v1/*',
                                'trained/v1/variables/*']
                      },
    )
  2. 在流水线中调用本地模型文件。此操作会针对给定实例生成预测。以下 Python 代码展示了如何执行此操作。

    predictor_fn = None
    
    def init_predictor():
        """ Loads the TensorFlow saved model to the predictor object
    
        Returns:
            predictor_fn
        """
    
        global predictor_fn
    
        if predictor_fn is None:
    
            logging.info("Initialising predictor...")
            dir_path = os.path.dirname(os.path.realpath(__file__))
            export_dir = os.path.join(dir_path, SAVED_MODEL_DIR)
    
            if os.path.exists(export_dir):
                predictor_fn = tf.contrib.predictor.from_saved_model(
                    export_dir=export_dir,
                    signature_def_key="predict"
                )
            else:
                logging.error("Model not found! - Invalid model path: {}".format(export_dir))
    
    def estimate_local(instances):
        """
        Calls the local babyweight estimator to get predictions
    
        Args:
           instances: list of json objects
        Returns:
            int - estimated baby weight
        """
    
        init_predictor()
    
        inputs = dict((k, [v]) for k, v in instances[0].items())
        for i in range(1,len(instances)):
            instance = instances[i]
    
            for k, v in instance.items():
                inputs[k] += [v]
    
        values = predictor_fn(inputs)['predictions']
        return [value.item() for value in values.reshape(-1)]

如需了解详情,请参阅 Apache Beam 多个文件依赖项页面。

AI Platform 批量预测作业

除了将模型部署为 HTTP 端点之外,AI Platform 还允许您使用 Cloud Storage 中已部署的模型版本或 TensorFlow SavedModel 来运行批量预测作业

AI Platform 批量预测作业将输入数据文件的 Cloud Storage 位置作为参数。它使用模型获取该数据的预测,然后将预测结果存储在另一个 Cloud Storage 输出位置(该位置也作为参数提供)。以下示例展示了用于提交 AI Platform 批量预测作业的 gcloud 命令。

BUCKET='<BUCKET>'
DATA_FORMAT="TEXT"
INPUT_PATHS=gs://${BUCKET}/data/babyweight/experiments/outputs/data-prep-*
OUTPUT_PATH=gs://${BUCKET}/data/babyweight/experiments/outputs/cmle-estimates
MODEL_NAME='babyweight_estimator'
VERSION_NAME='v1'
REGION='<REGION>'
now=$(date +"%Y%m%d_%H%M%S")
JOB_NAME="batch_predict_$MODEL_NAME$now"
MAX_WORKER_COUNT="20"

gcloud ml-engine jobs submit prediction $JOB_NAME \
    --model=$MODEL_NAME \
    --input-paths=$INPUT_PATHS \
    --output-path=$OUTPUT_PATH \
    --region=$REGION \
    --data-format=$DATA_FORMAT \
    --max-worker-count=$MAX_WORKER_COUNT

用于在线预测的逐点与微批处理

在实时预测流水线中,无论您是将模型投放为 HTTP 端点还是直接从工作器使用模型,您都可以通过以下两种方法获取传入数据点的预测:

  • 单个点。将每个数据点单独发送到模型并获得预测,这是一种明显可行的方法。
  • 微批次。更好的方法是使用数据选取函数来创建微批次,在特定时间段内(例如每 5 秒)对数据点进行分组。然后将微批次发送到模型,一次获得所有实例的预测。

以下 Python 代码显示了如何在 Apache Beam 流水线中使用数据选取函数创建基于时间的微批次。

def run_pipeline_with_micro_batches(inference_type, project,
                                    pubsub_topic, pubsub_subscription,
                                    bq_dataset, bq_table,
                                    window_size, runner, args=None):

    prepare_steaming_source(project, pubsub_topic, pubsub_subscription)
    prepare_steaming_sink(project, bq_dataset, bq_table)
    pubsub_subscription_url = "projects/{}/subscriptions/{}".format(project, pubsub_subscription)
    options = beam.pipeline.PipelineOptions(flags=[], **args)

    pipeline = beam.Pipeline(runner, options=options)
    (
            pipeline
            | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription_url, id_label="source_id")
            | 'Micro-batch - Window Size: {} Seconds'.format(window_size) >> beam.WindowInto(FixedWindows(size=window_size))
            | 'Estimate Targets - {}'.format(inference_type) >> beam.FlatMap(lambda messages: estimate(messages, inference_type))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project,
                                                             dataset=bq_dataset,
                                                             table=bq_table
                                                             )
    )

    pipeline.run()

微批处理方法使用部署为 HTTP 端点的模型,这样可以大大减少 HTTP 请求的数量并缩短延迟时间。即使在将微批处理技术与直接模型结合使用的情况下,由于矢量化操作,向模型发送具有 N 个实例的张量用于预测比发送长度为 1 的张量更高效。

批量实验

在批量实验中,我们希望使用 TensorFlow 回归模型估算 BigQuery 中 Natality 数据集中的婴儿体重。然后,我们希望使用 Dataflow 批处理流水线将 Cloud Storage 中的预测结果另存为 CSV 文件。以下部分介绍了我们为完成这项任务所尝试的不同实验。

方法 1:Dataflow 结合直接模型预测

在此方法中,Dataflow 工作器托管 TensorFlow SavedModel,您可以在批处理流水线中为每条记录直接调用该模型以进行预测。图 5 展示了这种方法的概要架构。

批量方法 1:Dataflow 结合直接模型预测
图 5.批量方法 1:Dataflow 结合直接模型预测

Dataflow 流水线会执行以下步骤:

  1. 读取来自 BigQuery 的数据。
  2. 准备 BigQuery 记录,用于预测。
  3. 调用本地 TensorFlow SavedModel 以获取每条记录的预测。
  4. 将结果(输入记录和估算的婴儿体重)转换为 CSV 文件。
  5. 将 CSV 文件写入 Cloud Storage。

此方法不会调用远程服务,例如 AI Platform 上作为 HTTP 端点的已部署模型。通过利用 TensorFlow SavedModel 在每个 Dataflow 工作器内本地完成预测。

方法 2:Dataflow 结合 AI Platform 批量预测

在此方法中,TensorFlow SavedModel 存储在 Cloud Storage 中,供 AI Platform 用于预测。不过,与前一个方法不同的是,此方法没有对每个记录的已部署模型执行 API 调用,而是准备用于预测的数据,并批量提交这些数据。

此方法包含两个阶段:

  1. Dataflow 准备来自 BigQuery 的数据以用于预测,然后将这些数据存储在 Cloud Storage 中。
  2. 将 AI Platform 批量预测作业与准备好的数据一起提交,预测结果存储在 Cloud Storage 中。

图 6 展示了这种两阶段方法的总体架构。

批量方法 2:Dataflow 结合 AI Platform 批量预测
图 6.批量方法 2:Dataflow 结合 AI Platform 批量预测

工作流步骤(包括 Dataflow 流水线)如下:

  1. 读取来自 BigQuery 的数据。
  2. 准备 BigQuery 记录,用于预测。
  3. 将 JSON 数据写入 Cloud Storage。模型中的 serving_fn 函数需要将 JSON 实例作为输入。
  4. 将 AI Platform 批量预测作业与 Cloud Storage 中准备好的数据一起提交。此作业也会将预测结果写入 Cloud Storage。

Dataflow 作业会准备数据用于预测,而不是提交 AI Platform 预测作业。换言之,数据准备任务和批量预测任务没有紧密耦合。Cloud Functions、Airflow 或任何调度器都可以通过执行 Dataflow 作业并提交 AI Platform 作业进行批量预测来编排工作流。

如果您的数据符合以下条件,建议使用 AI Platform 批量预测来提高性能和易用性:

  • 您的数据在 Cloud Storage 中可用,这些数据采用预测所需的格式,通过之前的数据提取过程获得。
  • 您无法控制工作流的第一个阶段,例如 Dataflow 流水线(准备 Cloud Storage 中的数据以用于预测)。

实验配置

我们在三个实验中使用了以下配置:

  • 数据大小:10K100K1M10M
  • Cloud Storage 类:Regional Storage
  • Cloud Storage 位置:europe-west1-b
  • Dataflow 地区:europe-west1-b
  • Dataflow 工作器机器类型:n1-standard-1
  • Dataflow 自动扩缩(针对多达 100 万条记录的批量数据)
  • Dataflow num_worker20(针对多达 1000 万条记录的批量数据)
  • AI Platform 批量预测 max-worker-count 设置:20

Cloud Storage 位置应与 Dataflow 地区相同。该解决方案使用 europe-west1-b 地区作为任意值。

结果

下表总结了使用不同大小的数据集执行批量预测和直接模型预测的结果(耗时)。

批量数据大小 指标 Dataflow,然后 AI Platform 批量预测 Dataflow 结合直接模型预测
10K 行 运行时长 15 分 30 秒

(Dataflow:7 分 47 秒 +
AI Platform:7 分 43 秒)
8 分 24 秒
vCPU 总运行时长 0.301 小时

(Dataflow:0.151 小时 +
AI Platform:0.15 小时)
0.173 小时
100K 行 运行时长 16 分 37 秒

(Dataflow:8 分 39 秒 +
AI Platform:7 分 58 秒)
10 分 31 秒
vCPU 总运行时长 0.334 小时

(Dataflow:0.184 小时 +
AI Platform:0.15 小时)
0.243 小时
1M 行 运行时长 21 分 11 秒
(Dataflow:11 分 07 秒 +
AI Platform:10 分 04 秒)
17 分 12 秒
vCPU 总运行时长 0.446 小时

(Dataflow:0.256 小时 +
AI Platform:0.19 小时)
1.115 小时
10M 行 运行时长 33 分 08 秒
(Dataflow:12 分 15 秒 +
AI Platform:20 分 53 秒)
25 分 02 秒
vCPU 总运行时长 5.251 小时

(Dataflow:3.581 小时 +
AI Platform:1.67 小时)
7.878 小时

图 7 显示了这些结果的图表。

针对 4 种不同数据集大小采用 3 种方法的耗时图
图 7.针对 4 种不同数据集大小采用 3 种方法的耗时图

如结果所示,如果数据已在 Cloud Storage 中并采用预测所用的格式,则 AI Platform 批量预测作业本身可以花费较少的时间来为输入数据生成预测。但是,如果批量预测作业与预处理步骤(将数据从 BigQuery 提取到 Cloud Storage 并准备数据用于预测)和后处理步骤(将数据存储回 BigQuery)相结合,直接模型方法的端到端执行时长更短。此外,直接模型预测方法的性能可以通过使用微批处理进一步优化(我们稍后将在介绍流处理实验时对此进行说明)。

流实验

在流处理实验中,Dataflow 流水线从 Pub/Sub 主题读取数据点并使用流处理 API 将数据写入 BigQuery。Dataflow 流处理流水线使用 TensorFlow 婴儿体重估算模型处理数据并获得预测。

该主题以预定义每秒事件速率接收来自可生成数据点(这些数据点是用于估算婴儿体重的实例)的流模拟器的数据。这样可模拟无界限数据源的真实示例。以下 Python 代码模拟了发送到 Pub/Sub 主题的数据流。

client = pubsub.Client(project=PARAMS.project_id)
topic = client.topic(PARAMS.pubsub_topic)
if not topic.exists():
    print 'Topic does not exist. Please run a stream pipeline first to create the topic.'
    print 'Simulation aborted.'

    return

for index in range(PARAMS.stream_sample_size):

    message = send_message(topic, index)

    # for debugging
    if PARAMS.show_message:
        print "Message {} was sent: {}".format(index+1, message)
        print ""

    time.sleep(sleep_time_per_msg)

方法 1:Dataflow 结合 AI Platform 在线预测

在此方法中,TensorFlow 模型作为 REST API 在 AI Platform 中进行部署和托管。Dataflow 流处理流水线为从 Pub/Sub 使用的每条消息调用 API 以获取预测。该方法的概要架构如图 8 所示。

流方法 1:Dataflow 结合 AI Platform 在线预测
图 8.流方法 1:Dataflow 结合 AI Platform 在线预测。HTTP 请求可能包括单个数据点或微批次中的一组数据点。

在此方法中,Dataflow 流水线执行以下步骤:

  1. 从 Pub/Sub 主题中读取消息。
  2. 向 AI Platform 模型的 API 发送 HTTP 请求以获取每条消息的预测。
  3. 使用流处理 API 将结果写入 BigQuery。

微批处理是一种更好的方法。也就是说,对于从 Pub/Sub 读取的每条消息,Dataflow 不会向模型的 REST API 发送 HTTP 请求,而是将 1 秒时间范围内收到的消息进行分组。然后,它在单个 HTTP 请求中将这组消息作为微批次发送给模型的 API。在此方法中,Dataflow 流水线执行以下步骤:

  1. 从 Pub/Sub 主题中读取消息。
  2. 应用时长 1 秒的数据选取操作,以创建微批次消息。
  3. 使用微批次方式将 HTTP 请求发送给 AI Platform 模型的 API,以获取消息的预测。
  4. 使用流处理 API 将结果写入 BigQuery。

这种方法的基本原理是:

  1. 减少对远程服务(例如 AI Platform 模型)的调用次数。
  2. 缩短处理每条消息的平均延迟时间。
  3. 缩短流水线的总体处理时间。

在此实验中,时间范围设置为 1 秒。但是,微批次大小(即作为一个批次发送到 AI Platform 模型的消息数量)会有所不同。微批次大小取决于消息生成频率(每秒的消息数)。

以下部分介绍了采用三种不同频率的实验:每秒 50 条、每秒 100 条和每秒 500 条消息。也就是说,微批次大小为 50、100 和 500。

方法 2:Dataflow 结合直接模型预测

该方法类似于批量实验中使用的方法。Dataflow 工作器托管 TensorFlow SavedModel,您可以在流处理流水线中为每条记录调用该模型用于预测。图 9 展示了这种方法的概要架构。

流方法 2:Dataflow 结合直接模型预测
图 9.流方法 2:Dataflow 结合直接模型预测

在此方法中,Dataflow 流水线执行以下步骤:

  1. 从 Pub/Sub 主题中读取消息。
  2. 调用本地 TensorFlow SavedModel 以获取每条记录的预测。
  3. 使用流处理 API 将结果写入 BigQuery。

微批处理技术也可以在流处理流水线中结合直接模型预测方法使用。我们可以向模型发送 N 个数据实例的张量(N 等于在 Dataflow 时间范围内接收到的消息数),而不是发送一个数据实例的张量。该技术使用 TensorFlow 模型的矢量化操作,可以并行获得多个预测。

实验配置

在这些实验中,我们使用以下配置:

  • 流式数据大小:10K records (messages)
  • 模拟的每秒消息数 (MPS):50100500
  • 时间范围(在微批次实验中):1 second
  • Dataflow 地区:europe-west1-b
  • Dataflow 工作器机器类型:n1-standard-1
  • Dataflow num_worker5(无自动扩缩功能)
  • AI Platform 模型 API 节点:3 (manualScale)

结果

下表总结了使用不同数据量(每秒消息数)执行流式传输实验的结果。消息频率是指每秒发送的消息数,而模拟时长是指发送所有消息的时长

流式消息频率 指标 Dataflow 结合 AI Platform 在线预测   Dataflow 结合直接模型预测  
    单条消息 微批处理 单条消息 微批处理
每秒 50 条

(模拟时长:3 分 20 秒)
总时长 9 分 34 秒 7 分 44 秒 3 分 43 秒 3 分 22 秒
每秒 100 条

(模拟时长:1 分 40 秒)
总时长 6 分 03 秒 4 分 34 秒 1 分 51 秒 1 分 41 秒
每秒 500 条

(模拟时长:20 秒)
总时长 无(默认 AI Platform 在线预测配额) 2 分 47 秒 1 分 23 秒 48 秒

图 10 显示了这些结果的图表。

显示了针对不同方法和频率的耗时图
图 10.针对不同方法和频率的耗时图

如结果所示,微批处理技术提高了 AI Platform 在线预测和直接模型预测的执行性能。此外,与调用外部 REST/HTTP API 进行在线预测相比,将直接模型与流处理流水线搭配使用可将性能提升 2 倍到 4 倍。

总结

根据上述方法和实验结果,我们推荐以下方法。

批处理

  • 如果要构建批量数据处理流水线,并且希望在流水线中进行预测,请使用直接模型方法以获得最佳性能。
  • 通过在调用本地模型进行预测之前创建微批次数据点,以利用矢量化操作的并行化,提高直接模型方法的性能。
  • 如果您的数据采用预测所需的格式填充到 Cloud Storage,请使用 AI Platform 批量预测以获得最佳性能。
  • 如果要使用 GPU 的功能进行批量预测,请使用 AI Platform。
  • 请勿将 AI Platform 在线预测用于批量预测。

流处理

  • 在流式处理流水线中使用直接模型,以获得最佳性能并缩短平均延迟时间。在本地执行预测,而不对远程服务进行 HTTP 调用。
  • 将模型与数据处理流水线分离开来,可以更好地维护在线预测中使用的模型。最佳做法是使用 AI Platform 或任何其他网站托管服务将您的模型用作独立的微服务。
  • 将模型部署为独立的 Web 服务,以允许多个数据处理流水线和在线应用将模型服务用作端点。此外,对模型的更改对于使用它的应用和流水线来说是透明的。
  • 使用负载平衡部署服务的多个实例,以提高模型网络服务的可扩缩性和可用性。利用 AI Platform,在部署模型版本时,您只需在 yaml 配置文件中指定节点数 (manualScaling) 或 minNodes (autoScaling)。
  • 如果在单独的微服务中部署模型,会产生额外费用(具体取决于底层的服务基础架构)。请参阅 AI Platform 在线预测的价格常见问题解答
  • 在流式数据处理流水线中使用微批次处理可以使直接模型和 HTTP 模型服务获得更好的性能。微批处理减少了对模型服务的 HTTP 请求数,并使用 TensorFlow 模型的矢量化操作来获得预测。

后续步骤