使用 TensorFlow Transform 为机器学习预处理数据

本文介绍了如何使用 TensorFlow Transform (tf.Transform) 来为机器学习 (ML) 实现数据预处理。tf.Transform 是一个适用于 TensorFlow 的库,可让您通过数据预处理流水线来定义实例级和全通数据转换。这些流水线是使用 Apache Beam 高效执行的,在此过程中,它们还会创建一个 TensorFlow 图,以便在预测期间执行与模型提供服务时同样的转换。

本文介绍了一个端到端示例,展示了将 Dataflow 用于运行 Apache Beam 的每个步骤。本文假设您已熟悉 BigQuery、Dataflow、AI Platform (ML) Engine 和 TensorFlow Estimator API。

简介

TensorFlow Transform (tf.Transform) 是一个与 TensorFlow 搭配使用来预处理数据的库,对于需要全通的转换非常有用。tf.Transform 的输出内容会导出为 TensorFlow 图以用于训练和服务,该图反映了实例级的转换逻辑以及根据全通转换计算的统计信息(作为常量)。在训练和服务阶段使用同一张图可以防止出现偏差,因为两个阶段执行的转换操作完全相同。此外,tf.Transform 可以在 Dataflow 上的批处理流水线中规模化运行,以预先准备训练数据并提高训练效率。

如需详细了解有关 Google Cloud 上的预处理类型、挑战和选项的概念,请参阅使用 TensorFlow Transform 为机器学习预处理数据 - 第 1 部分

图 1 提供了一个概念性概览,显示了 tf.Transform 在预处理及转换数据以用于训练和预测方面的行为。

tf.Transform 的行为
图 1. tf.Transform 的行为

第 1 部分所述,tf.Transform 有以下用法:

  1. 可使用 tf.Transform Apache Beam API 中实现的转换并在 Dataflow 上大规模地运行,来预处理原始训练数据。预处理分两个阶段进行:分析和转换。

  2. 在分析阶段期间,有状态转换所需的统计信息(例如平均值和方差)由全通操作对训练数据计算得出。在转换阶段中,这些计算的统计信息与实例级操作搭配用来处理训练数据(例如对数值特征进行 z 评分归一化)。

  3. tf.Transform 预处理流水线产生两个主要输出:转换后的训练数据和 transform_fn 函数。

    • 转换后的训练数据用于训练模型,其中模型接口需要转换后的特征。
    • transform_fn 函数包含 TensorFlow 图形式的转换逻辑,在该图中,转换是实例级操作,而在全通转换中计算的统计信息是常量。
  4. 在训练后导出模型时,导出过程会将 transform_fn 图附加到导出的 SavedModel。在提供模型进行预测期间,模型服务接口需要使用原始格式的数据点。 transform_fn 现在是模型的一部分,它会对传入的数据点应用所有预处理逻辑。它也会使用存储的常量(例如平均值和方差)作为预测期间的实例级操作来对数值特征进行归一化。

  5. transform_fn 对原始数据点进行格式转换,生成模型接口要产生预测结果所需要的格式。

适用于此解决方案的 Jupyter 笔记本

GitHub 代码库中有两个 Jupyter Notebook 显示此实现示例:

实现 Apache Beam 流水线

本部分使用一个实际示例来说明如何使用 tf.Transform 预处理数据。本文使用“Natality”(出生率)数据集,用于根据各种输入值预测婴儿体重。这些数据存储在 BigQuery 的公共 natality 表中。

在该示例中,Dataflow 大规模运行 tf.Transform 流水线以准备数据并生成 transform_fn 工件。以下代码运行此流水线。本文档后面的几个部分说明了执行流水线中的每个步骤的函数。整个流水线步骤如下所示:

  1. 从 BigQuery 读取训练数据。
  2. 使用 tf.Transform 分析和转换训练数据。
  3. 将转换后的训练数据写入 Cloud Storage 作为 tfRecords
  4. 从 BigQuery 读取评估数据。
  5. 使用第 2 步生成的 transform_fn 转换评估数据。
  6. 将转换后的训练数据写入 Cloud Storage 作为 tfrecords
  7. 将转换工件写入 Cloud Storage 以创建和导出模型。

下面是整个流水线的 Python 代码。接下来的部分提供了每个步骤的说明和代码列表。

def run_transformation_pipeline(args):

    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)

    runner = args['runner']
    data_size = args['data_size']
    transformed_data_location = args['transformed_data_location']
    transform_artefact_location = args['transform_artefact_location']
    temporary_dir = args['temporary_dir']
    debug = args['debug']

    # Instantiate the pipeline
    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        with impl.Context(temporary_dir):

            # Preprocess train data
            step = 'train'
            # Read raw train data from BigQuery
            raw_train_dataset = read_from_bq(pipeline, step, data_size)
            # Analyze and transform raw_train_dataset
            transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)
            # Write transformed train data to sink as tfrecords
            write_tfrecords(transformed_train_dataset, transformed_data_location, step)

            # Preprocess evaluation data
            step = 'eval'
            # Read raw eval data from BigQuery
            raw_eval_dataset = read_from_bq(pipeline, step, data_size)
            # Transform eval data based on produced transform_fn
            transformed_eval_dataset = transform(raw_eval_dataset, transform_fn, step)
            # Write transformed eval data to sink as tfrecords
            write_tfrecords(transformed_eval_dataset, transformed_data_location, step)

            # Write transformation artefacts
            write_transform_artefacts(transform_fn, transform_artefact_location)

            # (Optional) for debugging, write transformed data as text
            step = 'debug'
            # Wwrite transformed train data as text if debug enabled
            if debug == True:
            write_text(transformed_train_dataset, transformed_data_location, step)

第 1 步:从 BigQuery 读取原始训练数据

第一步是使用 read_from_bq 方法从 BigQuery 读取原始训练数据。此方法会返回提取自 BigQuery 的 raw_dataset 对象。您传递一个 data_size 值和 step 值(可以是 traineval)。BigQuery 源查询是使用 get_source_query 方法构造的。

def read_from_bq(pipeline, step, data_size):

    source_query = get_source_query(step, data_size)
    raw_data = (
        pipeline
        | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(
                           beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
        | '{} - Clean up Data'.format(step) >> beam.Map(prep_bq_row)
    )

    raw_metadata = create_raw_metadata()
    raw_dataset = (raw_data, raw_metadata)
    return raw_dataset

在执行 tf.Transform 预处理之前,您可能需要执行基于 Apache Beam 的典型处理,包括映射、过滤、分组和设置时间窗口。在该示例中,代码使用 beam.Map(prep_bq_row) 清理从 BigQuery 读取的记录,其中 prep_bq_row 是自定义方法。此自定义方法将分类特征的数值代码转换为直观易懂的标签。

此外,为了使用 tf.Transform 来分析和转换提取自 BigQuery 的 raw_data 对象,您需要创建一个 raw_dataset 对象,它是 (raw_data, raw_metadata) 的元组。raw_metadata 对象由 create_raw_metadata 方法创建,如下所示:

CATEGORICAL_FEATURE_NAMES = ['is_male', 'mother_race']
NUMERIC_FEATURE_NAMES = ['mother_age', 'plurality', 'gestation_weeks']
TARGET_FEATURE_NAME = 'weight_pounds'
KEY_COLUMN = 'key'

def create_raw_metadata():

    raw_data_schema = {}

    # key feature schema
    raw_data_schema[KEY_COLUMN]= tf.io.FixedLenFeature([], tf.float32)

    # target feature schema
    raw_data_schema[TARGET_FEATURE_NAME]= tf.io.FixedLenFeature([], tf.float32)

    # categorical feature schema
    raw_data_schema.update({ column_name: tf.io.FixedLenFeature([], tf.string))
              for column_name in CATEGORICAL_FEATURE_NAMES})

    # numerical feature schema
    raw_data_schema.update({ column_name: tf.io.FixedLenFeature([], tf.float32))
              for column_name in NUMERIC_FEATURE_NAMES})

    # create dataset_metadata given raw_data_schema
    raw_metadata = dataset_metadata.DatasetMetadata(
        schema_utils.schema_from_feature_spec(feature_spec))

    return raw_metadata

这个 raw_metadata 对象还可供 seving_input_receiver_fn 用来创建导出的模型服务接口,该接口需要采用原始格式的数据点,如图 1 所示。导出模型以提供预测服务稍后会在导出模型以提供预测服务下进行介绍。

执行笔记本中定义此方法的单元时,raw_metadata.schema 的内容将会显示。它包括以下列:

  • mother_race (tf.string)
  • weight_pounds (tf.float32)
  • gestation_weaks (tf.float32)
  • key (tf.float32)
  • is_male (tf.string)
  • mother_age (tf.string)

第 2 步:使用 preprocess_fn 转换原始训练数据

假设您希望对训练数据的输入原始特征执行典型的预处理转换,以便为机器学习做好准备。这些转换包括全通和实例级操作。具体来讲,您将执行下表列出的转换。

输入特征 转换 需要的统计信息 类型 输出特征
weight_pound 不适用 weight_pound
mother_age 归一化 平均值、方差 全通 mother_age_normalized
mother_age 等大小分区 分位数 全通 mother_age_bucketized
mother_age 计算日志 实例级 mother_age_log
plurality 指示是单个还是多个婴儿 实例级 is_multiple
is_multiple 将标称值转换为数值索引 词汇表 全通 is_multiple_index
gestation_weeks 缩放到 0 到 1 之间 最小值、最大值 全通 gestation_weeks_scaled
mother_race 将标称值转换为数值索引 词汇表 全通 mother_race_index
is_male 将标称值转换为数值索引 词汇表 全通 is_male_index

这些转换在 preprocess_fn 方法中实现,该方法需要的是张量 (input_features) 的字典,返回已处理特征 (output_features) 的字典。

以下代码展示了使用 tf.Transform 全通转换 API(前缀为 tft.)以及 TensorFlow(前缀为 tf.)实例级操作来实现 preprocess_fn 方法的过程:

def preprocess_fn(input_features):

    output_features = {}

    # target feature
    output_features['weight_pounds'] = input_features['weight_pounds']

    # normalization
    output_features['mother_age_normalized'] = tft.scale_to_z_score(input_features['mother_age'])
    output_features['gestation_weeks_normalized'] = tft.scale_to_z_score(input_features['gestation_weeks'])

    # bucketization based on quantiles
    output_features['mother_age_bucketized'] = tft.bucketise(input_features['mother_age'], num_buckets=5)

    # you can compute new features based on custom formulas
    output_features['mother_age_log'] = tf.log(input_features['mother_age'])

    # or create flags/indicators
    is_multiple = tf.as_string(input_features['plurality'] > tf.constant(1.0))

    # convert categorical features to indexed vocab
    output_features['mother_race_index'] = tft.compute_and_apply_vocabulary(input_features['mother_race'],
        vocab_filename='mother_race')
    output_features['is_male_index'] = tft.compute_and_apply_vocabulary(input_features['is_male'],
        vocab_filename='is_male')
    output_features['is_multiple_index'] = tft.compute_and_apply_vocabulary(is_multiple,
        vocab_filename='is_male')
    return output_features

除了先前示例中的转换之外,tf.Transform 框架还有数个其他转换,其中包括下表所列出的转换。

转换 应用对象 说明
scale_by_min_max 数值特征 将数值列缩放到 [output_min, output_max] 范围
scale_to_0_1 数值特征 返回一列,该列是已缩放到 [0,1] 范围的输入列
scale_to_z_score 数值特征 返回平均值为 0 且方差为 1 的标准化列
tfidf 文本特征 将 x 中的字词映射到其词频 * 逆向文档频率
compute_and_apply_vocabulary 分类特征 生成分类特征的词汇表,并使用此词汇表将分类特征映射为一个整数
ngrams 文本特征 创建 N 元语法的 SparseTensor
hash_strings 分类特征 对字符串进行哈希分区操作
pca 数值特征 使用偏协方差在数据集上计算 PCA
bucketize 数值特征 返回一个大小相等(基于分位数)的分区列,且每个输入都分配有一个分区索引

为了将 preprocess_fn方法中实现的转换应用于流水线的上一步生成的 raw_train_dataset 对象,可以使用 AnalyzeAndTransformDataset 方法。此方法需要 raw_dataset 对象作为输入,应用 preprocess_fn 方法,并生成 transformed_dataset 对象和 transform_fn。以下代码说明此处理过程。

def analyze_and_transform(raw_dataset, step):

    transformed_dataset, transform_fn = (
        raw_dataset
        | '{} - Analyze & Transform'.format(step) >> impl.AnalyzeAndTransformDataset(preprocess_fn)
    )

    return transformed_dataset, transform_fn

转换分两个阶段应用于原始数据:分析阶段和转换阶段。本文档后面的图 5 显示了如何将 AnalyzeAndTransformDataset 分解为 AnalyzeDatasetTransformDataset

分析阶段

在分析阶段,原始训练数据在全通过程中进行分析,以计算转换所需的统计信息。这包括计算平均值、方差、最小值、最大值、分位数和词汇表。分析过程需要原始数据集(原始数据加上原始元数据),并且会生成两个输出:

  • transform_fn。该函数包含从分析阶段计算的统计信息和使用统计信息的转换逻辑作为实例级操作。如稍后的第 7 步所述,transform_fn 将被保存以附加到模型 serving_input_fn。这样做便能够对在线预测数据点应用相同的转换。

  • transform_metadata。该对象描述转换后的数据需要的架构。

分析阶段如图 2 所示。

tf.Transform 分析阶段
图 2. tf.Transform 分析阶段

tf.Transform 分析器包括 minmaxsumsizemeanvarcovariancequantilesvocabularypca

转换阶段

在转换阶段,由分析阶段产生的 transform_fn 用于在实例级过程中转换原始训练数据,以便产生转换后的训练数据。转换后的训练数据与转换后的元数据(由分析阶段产生)配对以生成 transformed_train_dataset

转换阶段如图 3 所示。

tf.Transform 转换阶段
图 3. tf.Transform 转换阶段

如需预处理特征,请在 preprocess_fn 的实现中调用必需的 tensorflow_transform(在代码中导入为 tft)转换。例如,在调用 tft.scale_to_z_score 时,tf.Transform 将此函数调用转换为平均值和方差分析器,计算统计信息,然后应用这些统计信息对数值特征进行归一化。以上全部都是通过调用以下方法自动完成的:

AnalyzeAndTransformDataset(preprocess_fn)

此调用生成的 transformed_metadata.schema 实体将包含以下列:

  • gestation_weeks_normalixed (tf.float32)
  • is_multiple_index (tf.int64, _is_categorical: True)
  • mother_race_index' (tf.int64, _is_categorical: True)
  • is_male_index (tf.int64, _is_categorical: True)
  • mother_age_log (tf.float32)
  • mother_age_bucketized (tf.int64, _is_categorical: True)
  • mother_age_normalized (tf.float32)
  • weight_pounds (tf.float32)

第 3 步:编写转换后的训练数据

在分析和转换阶段通过使用 tf.Transform preprocess_fn 预处理训练数据之后,您可将该数据写入接收器以用来训练 TensorFlow 模型。使用 Dataflow 运行 Apache Beam 流水线时,接收器是 Cloud Storage。在其他情况下,接收器是本地磁盘。虽然您可以将数据写为固定宽度格式的 CSV 文件,但建议 TensorFlow 数据集使用的文件格式是 TFRecord 格式。这是一个简单的面向记录的二进制格式,由 tf.train.Example 协议缓冲区消息组成。

每条 tf.train.Example 记录包含一个或多个特征。它们被送入模型进行训练时将转换为张量。以下代码将已转换的数据集写入指定位置的 TFRecord 文件中。

def write_tfrecords(dataset, location, step):

    transformed_data, transformed_metadata = transformed_dataset
    (
        transformed_data
        | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=os.path.join(location,'{}-'.format(step)),
            file_name_suffix=".tfrecords",
            coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
    )

步骤 4、5 和 6:读取、转换和写入评估数据

转换训练数据并生成 transform_fn 函数后,可以使用该函数转换评估数据。首先,使用前面的第 1 步:从 BigQuery 读取原始训练数据中所述的 read_from_bq 方法,并为 step 参数传递 eval 来读取及清理评估数据。

其次,使用以下代码将原始评估数据集 (raw_dataset) 转换为预期的转换后格式 (transformed_dataset),如以下代码所示:

def transform(raw_dataset, transform_fn, step):
    transformed_dataset = (
        (raw_dataset, transform_fn)
        | '{} - Transform'.format(step) >> impl.TransformDataset()
        )
    return transformed_dataset

转换评估数据时,只会使用 transform_fn 中的逻辑以及从训练数据的分析阶段中计算的统计信息来应用实例级操作。换句话说,您不会以全通方式分析评估数据来计算新的统计信息(例如平均值和方差),以对评估数据中的数值特征进行 z 评分归一化,而是使用从训练数据计算的统计信息,以实例级方式来转换评估数据。

因此,应在训练数据的上下文中使用 impl.AnalyzeAndTransform 来计算统计信息并转换数据。同时,在转换评估数据的上下文中使用 impl.TransformDataset 以仅使用根据训练数据计算的统计信息来转换数据。

第三,将数据写入接收器(Cloud Storage 或本地磁盘,具体取决于运行程序)作为 TFRecord 文件,以在训练过程中用来评估 TensorFlow 模型。为此,请使用第 3 步:写入转换后的训练数据下所述的 write_tfrecords 方法。图 4 展示了如何使用在训练数据的分析阶段中生成的 transform_fn 来转换评估数据。

使用 transform_fn 转换评估数据
图 4. 使用 transform_fn 转换评估数据

第 7 步:保存 transform_fn

tf.Transform 预处理流水线中的最后一步是存储 transform_fn,其中包括在训练数据的分析阶段中生成的工件。用于存储 transform_fn 的代码显示在以下 write_transform_artifacts 方法中:

def write_transform_artefacts(transform_fn, location):

    (
        transform_fn
        | 'Write Transform Artifacts' >> transform_fn_io.WriteTransformFn(location)
    )

这些工件稍后将用于训练模型以及导出模型以提供服务。 此过程中还会生成以下工件,如后面的图 6 所示:

  • saved_model.pb。它代表包含转换逻辑的 TensorFlow 图,该图将附加到模型服务接口以对原始数据点进行格式转换。

  • variables。它包含在训练数据的分析阶段中计算的统计信息,将在 saved_model.pb 的转换逻辑中使用。

  • assets。它包含多个词汇表文件(使用 compute_and_apply_vocabulary 方法处理的每个分类特征各有一个),将在服务f期间用来将输入原始名义值转换为数值索引。

  • transformed_metadata。它包含说明转换后的数据架构的 schema.json 文件。

在 Dataflow 中运行流水线

如需在 Dataflow 中运行 tf.Transform 流水线,请逐步执行笔记本中的代码。如需详细了解如何在 Datalab 中运行笔记本,请参阅使用结构化数据进行机器学习教程。

请务必在笔记本的第一个代码单元中进行以下更改:

  • PROJECT 变量设置为您的项目名称。
  • BUCKET 变量设置为您的存储分区名称。
  • REGION 变量设置为存储分区的地区。
  • LOCAL_RUN 变量设置为 False,以便在 Dialogflow 上运行流水线。

图 5 展示了示例中所述的 tf.Transform 流水线的 Dataflow 执行图。

tf.Transform 流水线的 Dataflow 执行图
图 5. tf.Transform 流水线的 Dataflow 执行图

执行 Dataflow 流水线以预处理训练和评估数据后,您可以通过执行笔记本中的最后一个单元来浏览 Cloud Storage 中生成的对象。TFRecord 格式的已转换训练和评估数据位于以下位置:

gs://[YOUR_BUCKET_NAME]/tft_babyweight/transformed

生成的转换工件位于以下位置:

gs://[YOUR_BUCKET_NAME]/tft_babyweight/transform

图 6 是流水线输出的列表,展示了生成的数据对象和工件。

由 tf.Transform 流水线生成作为输出的数据对象的列表
图 6.tf.Transform 流水线生成作为输出的数据对象和工件列表

实现 TensorFlow 模型

在婴儿体重估计示例中,使用 DNNLinearCombinedRegressor 来实现 TensorFlow 模型。 用于创建、训练、评估和导出模型的代码是 GitHub 代码库中的一个笔记本。该模型使用由先前所述的 tf.Transform 预处理流水线生成的数据和工件。

创建模型的步骤如下:

  1. 加载 transform_metadata 对象。
  2. 创建 input_fn 以使用 transform_metadata 对象读取和解析训练及评估数据。
  3. 使用 transform_metadata 对象创建特征列。
  4. 使用 feature_columns 创建 DNNLinearCombinedRegressor Estimator。
  5. 训练和评估 Estimator。
  6. 通过定义附加了 transform_fnserving_input_fn 导出 Estimator。
  7. 使用 saved_model_cli 工具检查导出的模型。
  8. 使用导出的模型进行预测。

本文的目标并非构建模型,因此并未详细讨论如何构建或训练模型。但是在本部分中,文章介绍了如何使用由 tf.Transform 过程生成的 transform_metadata 对象来创建模型的特征列。本文还介绍了当导出模型以提供服务时如何在 serving_input_fn 中使用同样由 tf.Transform 过程生成的 transform_fn

使用模型训练过程中生成的转换工件

训练 TensorFlow 模型时,您可以使用先前数据处理步骤中生成的已转换的 traineval 对象。这些对象存储为分片的 TFRecord 文件。上一步中生成的 transformed_metadata 对象在以下方面非常有用:

  • 解析数据(tf.train.Example 对象)以提供给模型进行训练和评估。
  • 以元数据驱动动态方式创建特征列。

解析 tf.train.Example 数据

由于您读取 TFRecord 文件来向模型提供训练和评估数据,因此您需要解析文件中的每个 tf.train.Example 对象,以创建特征(张量)字典。这可确保使用用作模型训练/评估接口的特征列将特征映射到模型输入层。您可以使用上一步生成的 transformed_metadata 对象。此过程涉及两个步骤。

首先,加载在先前预处理步骤中生成并保存的 transformed_metadata 对象,如保存 transform_fn 部分所述:

transformed_metadata = metadata_io.read_metadata(
    os.path.join(TRANSFORM_ARTEFACTS_DIR,"transformed_metadata"))

然后,将 transformed_metadata 对象转换为 feature_spec 对象,并在 tfrecords_input_fn 中使用此对象:

def tfrecords_input_fn(files_name_pattern, transformed_metadata,
        mode=tf.estimator.ModeKeys.EVAL,
        num_epochs=1,
        batch_size=500):

    dataset = tf.contrib.data.make_batched_features_dataset(
        file_pattern=files_name_pattern,
        batch_size=batch_size,
        features=transformed_metadata.schema.as_feature_spec(),
        reader=tf.data.TFRecordDataset,
        num_epochs=num_epochs,
        shuffle=True if mode == tf.estimator.ModeKeys.TRAIN else False,
        shuffle_buffer_size=1+(batch_size*2),
        prefetch_buffer_size=1
    )

    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()
    target = features.pop(TARGET_FEATURE_NAME)
    return features, target

创建特征列

流水线生成** **transformed_metadata 对象,该对象描述模型进行训练和评估需要的转换后的数据架构。您可以使用此元数据动态创建特征列,而无需按名称指定每一列。如果您拥有数百个特征,这种以元数据驱动的动态方法对于创建特征列非常有用。以下代码展示了如何使用元数据创建特征列。

def create_wide_and_deep_feature_columns(transformed_metadata, hparams):

    deep_feature_columns = []
    wide_feature_columns = []

    column_schemas = transformed_metadata.schema.column_schemas

    for feature_name in column_schemas:
        if feature_name == TARGET_FEATURE_NAME:
            continue

        # creating numerical features
        column_schema = column_schemas[feature_name]
        if isinstance(column_schema._domain, dataset_schema.FloatDomain):
            deep_feature_columns.append(tf.feature_column.numeric_column(feature_name))

       # creating categorical features with identity
        elif isinstance(column_schema._domain, dataset_schema.IntDomain):
            if column_schema._domain._is_categorical==True:
                wide_feature_columns.append(
                    tf.feature_column.categorical_column_with_identity(
                        feature_name,
                        num_buckets=column_schema._domain._max_value+1)
                )
           else:
                deep_feature_columns.append(tf.feature_column.numeric_column(feature_name))

    if hparams.extend_feature_columns==True:
        mother_race_X_mother_age_bucketized = tf.feature_column.crossed_column(
            ['mother_age_bucketized', 'mother_race_index'],  55)

        wide_feature_columns.append(mother_race_X_mother_age_bucketized)

        mother_race_X_mother_age_bucketized_embedded = tf.feature_column.embedding_column(
            mother_race_X_mother_age_bucketized, hparams.embed_dimensions)
        deep_feature_columns.append(mother_race_X_mother_age_bucketized_embedded)

    return wide_feature_columns, deep_feature_columns

如果列架构为 FloatDomain,则此代码将创建 tf.feature_column.numeric_column 列。另一方面,如果列架构为 IntDomain_is_categorical 属性为 True,则此代码将创建 tf.feature_column.categorical_column_with_identity 列。

此外,您还可以创建扩展的特征列,如第 1 部分的“在何处执行预处理”部分的选项 C 下所述。在这组文章所用的示例中,通过使用 tf.feature_column.crossed_column 来交叉 mother_racemother_age_bucketized 特征创建了一个新特征 mother_race_X_mother_age_bucketized。此外,此交叉特征的低维度、密集表示法通过使用 tf.feature_column.embedding_column 特征列创建。

图 7 展示了如何使用转换后的数据以及转换后的元数据来定义和训练 Tensorflow Estimator。

使用转换后的数据训练 TensorFlow 模型
图 7. 使用转换后的数据训练 TensorFlow 模型

导出模型以提供预测服务

训练 TensorFlow 模型 (Estimator) 之后,可将 Estimator 导出为 SavedModel 对象,以便它能够用于根据新的数据点进行预测。导出模型时,必须定义其接口,即提供服务期间需要的输入特征架构。此输入特征架构是在 serving_input_fn 中定义的,如以下代码所示:

def serving_input_fn():

    from tensorflow_transform.saved import saved_transform_io

    # get the feature_spec of raw data
    raw_metadata = create_raw_metadata()

    # create receiver placeholders to the raw input features
    raw_input_features = raw_metadata.schema.as_batched_placeholders()
    raw_input_features.pop(TARGET_FEATURE_NAME)
    raw_input_features.pop(KEY_COLUMN)

    # apply transform_fn on raw features
    _, transformed_features = (
    saved_transform_io.partially_apply_saved_transform(
        os.path.join(TRANSFORM_ARTEFACTS_DIR,transform_fn_io.TRANSFORM_FN_DIR),
    raw_input_features)
    )

    return tf.estimator.export.ServingInputReceiver(
        transformed_features, raw_input_features)

export_dir = os.path.join(model_dir, 'export')

if tf.gfile.Exists(export_dir):
    tf.gfile.DeleteRecursively(export_dir)

estimator.export_savedmodel(
    export_dir_base=export_dir,
    serving_input_receiver_fn=serving_input_fn
)

在服务期间,模型需要原始格式的数据点(即未转换的原始特征)。因此,serving_input_fn 使用 raw_metadata 对象为原始特征创建接收器占位符。但是,如前所述,经过训练的模型需要的是已转换架构中的数据点。因此,在接收原始特征后,请将已保存的 transform_fn 应用于使用占位符接收的 raw_input_features 对象,从而将这些特征转换为模型接口需要的 transformed_features。图 8 展示了导出模型以提供服务的最后一步。

导出附加了 transform_fn 的模型以提供服务
图 8. 导出附加了 transform_fn 的模型以提供服务

训练并使用模型进行预测

您可以通过运行笔记本的单元在本地训练模型。如需查看如何使用 AI Platform 大规模打包代码和训练模型的示例,请参阅 Google Cloud cloudml-samples GitHub 代码库中的示例和指南。

使用 saved_model_cli 工具检查导出的 SavedModel 时,您会发现 signature_def 的输入包括原始特征,如图 9 所示:

导出的 SavedModel 接口
图 9. 导出的 SavedModel 接口

笔记本的最后一个单元展示了如何使用导出的模型进行预测。值得注意的是,输入(示例)数据点位于原始架构中。如需了解如何在 AI Platform 上将模型部署为微服务以进行在线预测,请参阅部署模型文档。

后续步骤