使用 Apache Beam 和 TensorFlow 进行机器学习

本演示介绍了如何使用 Apache Beam、Google Dataflow 和 TensorFlow 对机器学习模型进行预处理、训练以及使用它来执行预测。

本演示使用分子代码示例来演示这些概念。分子代码示例会将分子数据作为输入,创建并训练机器学习模型来预测分子能量。

注意:分子代码示例使用的是 TensorFlow Estimator API。如果您要使用 TensorFlow Keras API,但又希望您的项目更贴近本演示,则可以将您的 Keras 模型转换为 Estimator

费用

本演示可能会使用 Google Cloud 的一项或多项收费组件,包括:

  • Dataflow
  • Cloud Storage
  • AI 平台

您可使用价格计算器,根据您的预计使用情况来估算费用。

概览

分子代码示例会提取包含分子数据的文件,并计算每个分子中所含的碳原子、氢原子、氧原子和氮原子的数量。然后,该代码会将各计数归一化为 0 到 1 之间的值,并将这些值馈送到 TensorFlow 深度神经网络 Estimator。神经网络 Estimator 会训练机器学习模型来预测分子能量。

代码示例包含以下四个阶段:

  1. 数据提取 (data-extractor.py)
  2. 预处理 (preprocess.py)
  3. 训练 (trainer/task.py)
  4. 预测 (predict.py)

以下部分逐步介绍了这四个阶段,不过,本演示重点介绍的是使用 Apache Beam 和 Dataflow 的阶段,即预处理阶段和预测阶段。预处理阶段还会使用 TensorFlow Transform 库(通常称为 tf.Transform)。

下图显示了分子代码示例的工作流。

分子工作流

运行代码示例

请按照分子 GitHub 存储库的 README 中的说明设置您的环境。然后,使用提供的其中一个封装容器脚本(run-localrun-cloud)运行分子代码示例。这些脚本会自动运行代码示例的所有四个阶段(提取、预处理、训练和预测)。

或者,您也可以使用本文档各部分中提供的命令手动运行各个阶段。

在本地运行

要在本地运行分子代码示例,请运行 run-local 封装容器脚本:

./run-local
    

输出日志会显示该脚本运行这四个阶段(数据提取、预处理、训练和预测)的时间。

data-extractor.py 脚本有一个用于指定文件数量的必需参数。 为便于使用,在 run-local 脚本和 run-cloud 脚本中,该参数均默认为 5 个文件。每个文件包含 25000 个分子。运行代码示例从开始到结束大约需要 3 至 7 分钟。运行时间会因计算机的 CPU 而有所不同。

注意run-local 脚本包含可选参数。如需了解如何使用可选参数,请参阅 README

在 Google Cloud 上运行

要在 Google Cloud 上运行分子代码示例,请运行 run-cloud 封装容器脚本。所有输入文件都必须位于 Cloud Storage 中。

请将 --work-dir 参数设置为 Cloud Storage 存储分区:

./run-cloud --work-dir gs://<your-bucket-name>/cloudml-samples/molecules
    

注意run-cloud 脚本包含可选参数。如需了解如何使用可选参数,请参阅 README

第 1 阶段:数据提取

源代码:data-extractor.py

第一步是提取输入数据。data-extractor.py 文件会提取并解压缩指定的 SDF 文件。在后续步骤中,该示例会预处理这些文件,并使用这些数据来训练和评估机器学习模型。该文件会从公共来源中提取 SDF 文件,并将它们存储在指定工作目录内的一个子目录中。默认工作目录 (--work-dir) 是 /tmp/cloudml-samples/molecules

存储提取的文件

将提取的数据文件存储在本地:

python data-extractor.py --max-data-files 5
    

或者,将提取的数据文件存储在 Cloud Storage 位置:

WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules

    python data-extractor.py --work-dir $WORK_DIR --max-data-files 5
    

注意:将文件存储在 Cloud Storage 中会让您的 Google Cloud 项目产生费用。如需了解详情,请参阅 Cloud Storage 价格

第 2 阶段:预处理

源代码:preprocess.py

分子代码示例使用 Apache Beam 流水线来预处理数据。 该流水线执行以下预处理操作:

  1. 读取并解析提取的 SDF 文件。
  2. 计算这些文件中各分子包含的不同原子数。
  3. 使用 tf.Transform 将各计数归一化为 0 到 1 之间的值。
  4. 将数据集划分为训练数据集和评估数据集。
  5. 将两个数据集作为 TFRecord 对象写入。

Apache Beam 转换一次可以有效操作单个元素,但需要对数据集进行全集扫描的转换无法单靠 Apache Beam 就轻松完成,最好是使用 tf.Transform 来执行。鉴于此,代码会先使用 Apache Beam 转换来读取分子、设置分子格式,并计算每个分子中的原子数量。然后,代码会使用 tf.Transform 来计算全局最小和最大计数,以便将数据归一化。

下图显示了流水线中的步骤。

预处理流水线

注意:预处理流水线 (preprocess.py) 与负责进行预测的流水线 (predict.py) 会共用部分逻辑。为避免代码重复,此共用逻辑位于 pubchem/pipeline.pypubchem/sdf.py 文件中。Apache Beam 的流水线依赖项管理文档建议将导入的文件放在单独的模块中。

应用基于元素的转换

preprocess.py 代码会创建 Apache Beam 流水线。

请参阅 tensorflow_transform/beam/impl.py 代码。

# Build and run a Beam Pipeline
    with beam.Pipeline(options=beam_options) as p, \
         beam_impl.Context(temp_dir=tft_temp_dir):

然后,代码会将 feature_extraction 转换应用于该流水线。

# Transform and validate the input data matches the input schema
    dataset = (
        p
        | 'Feature extraction' >> feature_extraction

该流水线使用 SimpleFeatureExtraction 作为其 feature_extraction 转换。

pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)),

pubchem/pipeline.py 中定义的 SimpleFeatureExtraction 转换包含一系列转换,这些转换可独立操作所有元素。代码首先会解析源文件中的分子,接着将分子格式设置为分子属性字典,最后计算分子中的原子数量。这些计数是机器学习模型的特征(输入)。

class SimpleFeatureExtraction(beam.PTransform):
      """The feature extraction (element-wise transformations).

      We create a `PTransform` class. This `PTransform` is a bundle of
      transformations that can be applied to any other pipeline as a step.

      We'll extract all the raw features here. Due to the nature of `PTransform`s,
      we can only do element-wise transformations here. Anything that requires a
      full-pass of the data (such as feature scaling) has to be done with
      tf.Transform.
      """
      def __init__(self, source):
        super(SimpleFeatureExtraction, self).__init__()
        self.source = source

      def expand(self, p):
        # Return the preprocessing pipeline. In this case we're reading the PubChem
        # files, but the source could be any Apache Beam source.
        return (p
            | 'Read raw molecules' >> self.source
            | 'Format molecule' >> beam.ParDo(FormatMolecule())
            | 'Count atoms' >> beam.ParDo(CountAtoms())
        )

读取转换 beam.io.Read(pubchem.ParseSDF(data_files_pattern)) 会从自定义来源读取 SDF 文件。

自定义来源名为 ParseSDF,在 pubchem/pipeline.py 中进行定义。 ParseSDF 会扩展 FileBasedSource,并实现 read_records 函数来打开提取的 SDF 文件。

在 Google Cloud 上运行分子代码示例时,多个工作器(虚拟机)可以同时读取文件。为确保不会有两个工作器读取文件中的相同内容,每个文件都会使用 range_tracker

该流水线会按后续步骤所需信息的相关性将原始数据划分为多个部分。解析后 SDF 文件中的每个部分会存储在字典中(请参阅 pipeline/sdf.py),其中键是部分名称,值是相应部分的原始行内容。

注意:本分子代码示例仅提取分子的部分特征。要查看更复杂的特征提取,请参阅此项目

代码会将 beam.ParDo(FormatMolecule()) 应用于该流水线。ParDo 会将名为 FormatMoleculeDoFn 应用于每个分子。FormatMolecule 会生成一个由带格式分子构成的字典。以下代码段是输出 PCollection 中的一个元素示例:

{
      'atoms': [
        {
          'atom_atom_mapping_number': 0,
          'atom_stereo_parity': 0,
          'atom_symbol': u'O',
          'charge': 0,
          'exact_change_flag': 0,
          'h0_designator': 0,
          'hydrogen_count': 0,
          'inversion_retention': 0,
          'mass_difference': 0,
          'stereo_care_box': 0,
          'valence': 0,
          'x': -0.0782,
          'y': -1.5651,
          'z': 1.3894,
        },
        ...
      ],
      'bonds': [
        {
          'bond_stereo': 0,
          'bond_topology': 0,
          'bond_type': 1,
          'first_atom_number': 1,
          'reacting_center_status': 0,
          'second_atom_number': 5,
        },
        ...
      ],
      '<PUBCHEM_COMPOUND_CID>': ['3\n'],
      ...
      '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
      ...
    }
    

然后,代码会将 beam.ParDo(CountAtoms()) 应用于该流水线。DoFn CountAtoms 会对每个分子包含的碳原子、氢原子、氮原子和氧原子的数量求和。CountAtoms 会输出一个由特征和标签组成的 PCollection。 下面是输出 PCollection 中的一个元素示例:

{
      'ID': 3,
      'TotalC': 7,
      'TotalH': 8,
      'TotalO': 4,
      'TotalN': 0,
      'Energy': 19.4085,
    }
    

流水线随后对输入进行验证。ValidateInputData DoFn 会验证每个元素是否与 input_schema 中指定的元数据匹配。此验证可确保数据需要与其馈入 TensorFlow 时相似。

| 'Validate inputs' >> beam.ParDo(ValidateInputData(
        input_feature_spec)))

应用全集转换

分子代码示例使用深度神经网络回归器进行预测。一般情况下,建议先将输入归一化,然后再将其馈入机器学习模型。该流水线使用 tf.Transform 将各个原子数归一化为 0 到 1 之间的值。要详细了解如何对输入归一化,请参阅特征调节

将值归一化需要对数据集进行全集扫描,并记录最小值和最大值。代码会使用 tf.Transform 对整个数据集进行全集扫描并应用全集转换。

要使用 tf.Transform,代码必须提供一个包含数据集转换逻辑的函数。在 preprocess.py 中,代码使用 tf.Transform 提供的 AnalyzeAndTransformDataset 转换。详细了解如何使用 tf.Transform

# Apply the tf.Transform preprocessing_fn
    input_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec(input_feature_spec))

    dataset_and_metadata, transform_fn = (
        (dataset, input_metadata)
        | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
            feature_scaling))
    dataset, metadata = dataset_and_metadata

preprocess.py 中,使用的 feature_scaling 函数为 normalize_inputs,该函数在 pubchem/pipeline.py 中定义。该函数使用 tf.Transform 函数 scale_to_0_1 将计数归一化为 0 到 1 之间的值。

def normalize_inputs(inputs):
      """Preprocessing function for tf.Transform (full-pass transformations).

      Here we will do any preprocessing that requires a full-pass of the dataset.
      It takes as inputs the preprocessed data from the `PTransform` we specify, in
      this case `SimpleFeatureExtraction`.

      Common operations might be scaling values to 0-1, getting the minimum or
      maximum value of a certain field, creating a vocabulary for a string field.

      There are two main types of transformations supported by tf.Transform, for
      more information, check the following modules:
        - analyzers: tensorflow_transform.analyzers.py
        - mappers:   tensorflow_transform.mappers.py

      Any transformation done in tf.Transform will be embedded into the TensorFlow
      model itself.
      """
      return {
          # Scale the input features for normalization
          'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
          'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
          'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
          'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),

          # Do not scale the label since we want the absolute number for prediction
          'Energy': inputs['Energy'],
      }

可以手动将数据归一化,但如果数据集很大,使用 Dataflow 的速度会更快。使用 Dataflow 可以根据需要在多个工作器(虚拟机)上运行流水线。

对数据集分区

接下来,preprocess.py 流水线会将单个数据集划分为两个数据集。该流水线会分配约 80% 的数据用作训练数据,约 20% 的数据用作评估数据。

# Split the dataset into a training set and an evaluation set
    assert 0 < eval_percent < 100, 'eval_percent must in the range (0-100)'
    train_dataset, eval_dataset = (
        dataset
        | 'Split dataset' >> beam.Partition(
            lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))

写入输出

最后,preprocess.py 流水线会使用 WriteToTFRecord 转换写入两个数据集(训练数据集和评估数据集)。

# Write the datasets as TFRecords
    coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

    train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
    _ = (
        train_dataset
        | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
            train_dataset_prefix, coder))

    eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
    _ = (
        eval_dataset
        | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
            eval_dataset_prefix, coder))

    # Write the transform_fn
    _ = (
        transform_fn
        | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))

运行预处理流水线

在本地运行预处理流水线:

python preprocess.py
    

或者,在 Dataflow 上运行预处理流水线:

PROJECT=$(gcloud config get-value project)
    WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
    python preprocess.py \
      --project $PROJECT \
      --runner DataflowRunner \
      --temp_location $WORK_DIR/beam-temp \
      --setup_file ./setup.py \
      --work-dir $WORK_DIR
    

注意:运行 Dataflow 流水线会让您的 Google Cloud 项目产生费用。如需了解详情,请参阅 Dataflow 价格

该流水线运行后,您可以在 Dataflow 监控界面中查看流水线的进度:

分子预处理流水线

第 3 阶段:训练

源代码:trainer/task.py

如前文所述,在预处理阶段的最后步骤中,代码会将数据拆分为两个数据集(训练数据集和评估数据集)。

该示例使用 TensorFlow 来训练机器学习模型。分子代码示例中的 trainer/task.py 文件包含用于训练模型的代码。trainer/task.py 的主函数会加载在预处理阶段中处理的数据。

Estimator 会使用训练数据集来训练模型,然后使用评估数据集来验证模型是否能在提供了特定分子属性的情况下准确预测分子能量。

详细了解如何训练机器学习模型

训练模型

在本地训练模型:

python trainer/task.py

    # To get the path of the trained model
    EXPORT_DIR=/tmp/cloudml-samples/molecules/model/export/final
    MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1)
    

或者,在 AI Platform 上训练模型:

JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)"
    BUCKET=gs://<your bucket name>
    WORK_DIR=$BUCKET/cloudml-samples/molecules
    gcloud ml-engine jobs submit training $JOB \
      --module-name trainer.task \
      --package-path trainer \
      --staging-bucket $BUCKET \
      --runtime-version 1.8 \
      --stream-logs \
      -- \
      --work-dir $WORK_DIR

    # To get the path of the trained model:
    EXPORT_DIR=$WORK_DIR/model/export
    MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1)
    

注意:使用 AI Platform 进行预测会让您的 Google Cloud 项目产生费用。如需了解详情,请参阅 AI Platform 价格

第 4 阶段:预测

源代码:predict.py

在 Estimator 训练模型后,为模型提供输入即可进行预测。在分子代码示例中,predict.py 中的流水线负责进行预测。该流水线可以作为批量流水线或流式流水线。

除来源和接收器互动外,批量流水线和流式流水线的代码是相同的。如果流水线以批量模式运行,它将从自定义来源读取输入文件,并将输出预测作为文本文件写入指定的工作目录。如果流水线以流式模式运行,则会在输入分子到达时从某个 Pub/Sub 主题中读取这些输入分子。该流水线会将可供使用的输出预测写入另一个 Pub/Sub 主题。

if args.verb == 'batch':
      data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
      results_prefix = os.path.join(args.outputs_dir, 'part')
      source = pubchem.ParseSDF(data_files_pattern)
      sink = beam.io.WriteToText(results_prefix)

    elif args.verb == 'stream':
      if not project:
        parser.print_usage()
        print('error: argument --project is required for streaming')
        sys.exit(1)

      beam_options.view_as(StandardOptions).streaming = True
      source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format(
          project, args.inputs_topic))
      sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format(
          project, args.outputs_topic))

下图显示了预测流水线(批量和流式模式)中的各个步骤。

分子预处理流水线

predict.py 中,代码通过 run 函数定义该流水线:

def run(model_dir, feature_extraction, sink, beam_options=None):
      print('Listening...')
      with beam.Pipeline(options=beam_options) as p:
        _ = (p
            | 'Feature extraction' >> feature_extraction
            | 'Predict' >> beam.ParDo(Predict(model_dir, 'ID'))
            | 'Format as JSON' >> beam.Map(json.dumps)
            | 'Write predictions' >> sink)

代码使用以下参数调用 run 函数:

run(
        args.model_dir,
        pubchem.SimpleFeatureExtraction(source),
        sink,
        beam_options)

首先,代码会将 pubchem.SimpleFeatureExtraction(source) 转换作为 feature_extraction 转换进行传递。此转换(也用于预处理阶段)会应用于该流水线:

class SimpleFeatureExtraction(beam.PTransform):
      """The feature extraction (element-wise transformations).

      We create a `PTransform` class. This `PTransform` is a bundle of
      transformations that can be applied to any other pipeline as a step.

      We'll extract all the raw features here. Due to the nature of `PTransform`s,
      we can only do element-wise transformations here. Anything that requires a
      full-pass of the data (such as feature scaling) has to be done with
      tf.Transform.
      """
      def __init__(self, source):
        super(SimpleFeatureExtraction, self).__init__()
        self.source = source

      def expand(self, p):
        # Return the preprocessing pipeline. In this case we're reading the PubChem
        # files, but the source could be any Apache Beam source.
        return (p
            | 'Read raw molecules' >> self.source
            | 'Format molecule' >> beam.ParDo(FormatMolecule())
            | 'Count atoms' >> beam.ParDo(CountAtoms())
        )

该转换会根据流水线的执行模式(批量或流式)从适当来源读取分子、设置分子的格式,并计算每个分子中各类原子的数量。

接下来,beam.ParDo(Predict(…)) 会应用于执行分子能量预测的流水线。Predict(即系统传递的 DoFn)会使用指定的输入特征(原子计数)字典来预测分子能量。

接着要应用于该流水线的转换是 beam.Map(lambda result: json.dumps(result)),这个转换会获取预测结果字典并将其序列化为 JSON 字符串。

最后,输出会写入接收器(对于批量模式,输出以文本文件形式写入工作目录;对于流式模式,输出以消息形式发布到 Pub/Sub 主题)。

批量预测

批量预测针对吞吐量(而非延迟时间)进行了优化。如果您需要进行许多预测,而且可以等到所有预测完成后再获取结果,则批量预测最适合。

以批量模式运行预测流水线

在本地运行批量预测流水线:

# For simplicity, we'll use the same files we used for training
    python predict.py \
      --model-dir $MODEL_DIR \
      batch \
      --inputs-dir /tmp/cloudml-samples/molecules/data \
      --outputs-dir /tmp/cloudml-samples/molecules/predictions
    

或者,在 Dataflow 上运行批量预测流水线:

# For simplicity, we'll use the same files we used for training
    PROJECT=$(gcloud config get-value project)
    WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
    python predict.py \
      --work-dir $WORK_DIR \
      --model-dir $MODEL_DIR \
      batch \
      --project $PROJECT \
      --runner DataflowRunner \
      --temp_location $WORK_DIR/beam-temp \
      --setup_file ./setup.py \
      --inputs-dir $WORK_DIR/data \
      --outputs-dir $WORK_DIR/predictions
    

注意:运行 Dataflow 流水线会让您的 Google Cloud 项目产生费用。如需了解详情,请参阅 Dataflow 价格

该流水线运行后,您可以在 Dataflow 监控界面中查看流水线的进度:

分子预测流水线

流式预测

流式预测针对延迟时间而非吞吐量进行了优化。如果您是零星地进行几次预测,但希望尽快获取结果,则流式预测最适合。

预测服务(流式预测流水线)会从一个 Pub/Sub 主题接收输入分子,并将输出(预测结果)发布到另一个 Pub/Sub 主题。

创建输入 Pub/Sub 主题:

gcloud pubsub topics create molecules-inputs
    

创建输出 Pub/Sub 主题:

gcloud pubsub topics create molecules-predictions
    

在本地运行流式预测流水线:

# Run on terminal 1
    PROJECT=$(gcloud config get-value project)
    python predict.py \
      --model-dir $MODEL_DIR \
      stream \
      --project $PROJECT
      --inputs-topic molecules-inputs \
      --outputs-topic molecules-predictions
    

或者,在 Dataflow 上运行流式预测流水线:

# Run on terminal 1
    PROJECT=$(gcloud config get-value project)
    WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
    python predict.py \
      --work-dir $WORK_DIR \
      --model-dir $MODEL_DIR \
      stream \
      --project $PROJECT
      --runner DataflowRunner \
      --temp_location $WORK_DIR/beam-temp \
      --setup_file ./setup.py \
      --inputs-topic molecules-inputs \
      --outputs-topic molecules-predictions
    

在运行预测服务(流式预测流水线)后,您需要运行发布者服务来将分子发送到预测服务,并运行订阅者服务来侦听预测结果。本分子代码示例提供了发布者 (publisher.py) 和订阅者 (subscriber.py) 服务。

发布者服务会解析目录中的 SDF 文件,并将其发布到输入主题。订阅者服务会侦听预测结果并记录它们。为简单起见,此示例使用与训练阶段相同的 SDF 文件。

注意:您需要以不同的进程并行运行这些服务,因此需要使用不同的终端来运行各个命令。请务必在每个终端上激活 virtualenv

运行订阅者:

# Run on terminal 2
    python subscriber.py \
      --project $PROJECT \
      --topic molecules-predictions
    

运行发布者服务:

# Run on terminal 3
    python publisher.py \
      --project $PROJECT \
      --topic molecules-inputs \
      --inputs-dir $WORK_DIR/data
    

在发布者服务开始解析和发布分子之后,您将开始看到来自订阅者服务的预测。

清理

当流式预测流水线运行完毕后,请停止您的流水线以免产生费用。

后续步骤