Machine learning com o Apache Beam e o TensorFlow

Neste tutorial, você verá como pré-processar, treinar e fazer previsões em um modelo de machine learning, usando o Apache Beam, o Google Dataflow e o TensorFlow.

Para demonstrar esses conceitos, usaremos a amostra de código do Molecules. Considerando dados moleculares como entrada, a amostra do código Molecules cria e treina um modelo de machine learning para prever a energia molecular.

Custos

Neste tutorial, há a possibilidade de usar componentes faturáveis do Google Cloud, incluindo um destes ou mais:

  • Dataflow
  • Cloud Storage
  • AI Platform

Use a calculadora de preços para gerar uma estimativa de custo baseada na projeção de uso.

Visão geral

A amostra de código do Molecules extrai arquivos que contêm dados moleculares e conta o número de átomos de carbono, hidrogênio, oxigênio e nitrogênio em cada molécula. Em seguida, o código normaliza as contagens para valores entre 0 e 1 e alimenta os valores em um estimador de rede neural profunda do TensorFlow. O estimador de rede neural treina um modelo de aprendizado de máquina para prever energia molecular.

A amostra de código consiste em quatro fases:

  1. Extração de dados (data-extractor.py)
  2. Pré-processamento (preprocess.py)
  3. Treinamento (trainer/task.py)
  4. Previsão (predict.py)

As seções abaixo analisam as quatro fases, mas este tutorial concentra-se mais nas fases que usam o Apache Beam e o Dataflow: a fase de pré-processamento e a de previsão. A fase de pré-processamento também usa a biblioteca de transformação do TensorFlow, conhecida como tf.Transform.

A imagem a abaixo mostra o fluxo de trabalho da amostra de código do Molecules.

Fluxo de trabalho do Molecules

Executar a amostra de código

Para configurar o ambiente, siga as instruções no README (em inglês) do repositório do GitHub do Molecules. Em seguida, execute a amostra de código do Molecules usando um dos scripts de wrapper fornecidos, run-local ou run-cloud. Esses scripts executam automaticamente todas as quatro fases da amostra de código (extração, pré-processamento, treinamento e predição).

Como alternativa, você pode executar cada fase manualmente usando os comandos fornecidos nas seções deste documento.

Executar no local

Para executar a amostra de código do Molecules localmente, execute o script de wrapper run-local:

./run-local

Os registros de saída mostram quando o script executa cada uma das quatro fases (extração de dados, pré-processamento, treinamento e previsão).

O script data-extractor.py tem um argumento obrigatório para o número de arquivos. Para facilitar o uso, os scripts run-local e run-cloud têm um padrão de cinco arquivos para esse argumento. Cada arquivo contém 25.000 moléculas. A execução da amostra de código leva aproximadamente de três a sete minutos do início ao fim. O tempo de execução varia de acordo com a CPU do computador.

Executar no Google Cloud

Para executar a amostra de código do Molecules no Google Cloud, execute o script de wrapper run-cloud. Todos os arquivos de entrada precisam estar no Cloud Storage.

Defina o parâmetro --work-dir para o bucket do Cloud Storage:

./run-cloud --work-dir gs://<your-bucket-name>/cloudml-samples/molecules

Fase 1: extração de dados

Código-fonte: data-extractor.py

O primeiro passo é extrair os dados de entrada. O arquivo data-extractor.py extrai e descompacta os arquivos SDF especificados. Em passos posteriores, o exemplo pré-processa esses arquivos e usa os dados para treinar e avaliar o modelo de machine learning. O arquivo extrai os arquivos SDF da fonte pública e os armazena em um subdiretório dentro do diretório de trabalho especificado. O diretório de trabalho padrão (--work-dir) é /tmp/cloudml-samples/molecules.

Armazenar os arquivos extraídos

Armazene os arquivos de dados extraídos no local:

python data-extractor.py --max-data-files 5

Ou armazene os arquivos de dados extraídos em um local do Cloud Storage:

WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules

python data-extractor.py --work-dir $WORK_DIR --max-data-files 5

Fase 2: pré-processamento

Código-fonte: preprocess.py

A amostra de código do Molecules usa um canal do Apache Beam para pré-processar os dados. O canal executa as ações de pré-processamento a seguir:

  1. Lê e analisa os arquivos SDF extraídos.
  2. Conta o número de átomos diferentes em cada uma das moléculas nos arquivos.
  3. Normaliza as contagens para valores entre 0 e 1 usando tf.Transform.
  4. Particiona o conjunto de dados em um conjunto de dados de treinamento e em um de avaliação.
  5. Grava os dois conjuntos de dados como objetos TFRecord.

As transformações do Apache Beam podem manipular elementos individuais de uma só vez de maneira eficiente. No entanto, transformações que exigem uma passagem completa do conjunto de dados não são feitas facilmente apenas com o Apache Beam e têm melhores resultados com o uso do tf.Transform. Por causa disso, o código usa as transformações do Apache Beam para ler e formatar as moléculas, além de contar os átomos em cada molécula. Em seguida, o código usa tf.Transform para encontrar as contagens mínima e máxima globais e, assim, normalizar os dados.

A imagem a seguir mostra os passos no pipeline.

Canal de pré-processamento

Como aplicar transformações baseadas em elementos

O código preprocess.py cria um pipeline do Apache Beam.

# Build and run a Beam Pipeline
with beam.Pipeline(options=beam_options) as p, \
     beam_impl.Context(temp_dir=tft_temp_dir):

Em seguida, o código aplica uma transformação feature_extraction ao pipeline.

# Transform and validate the input data matches the input schema
dataset = (
    p
    | 'Feature extraction' >> feature_extraction

O pipeline usa SimpleFeatureExtraction como sua transformação feature_extraction.

pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)),

A transformação SimpleFeatureExtraction, definida em pubchem/pipeline.py, contém uma série de transformações que manipulam todos os elementos de maneira independente. Primeiro, o código analisa as moléculas do arquivo de origem, depois as formata para um dicionário de propriedades das moléculas e, por fim, conta os átomos nelas. Essas contagens são os atributos (entradas) para o modelo de machine learning.

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

A transformação de leitura beam.io.Read(pubchem.ParseSDF(data_files_pattern)) lê arquivos SDF de uma fonte personalizada (em inglês).

A fonte personalizada, chamada ParseSDF, é definida em pubchem/pipeline.py. ParseSDF estende FileBasedSource (em inglês) e implementa o read_records que abre os arquivos SDF extraídos.

Ao executar a amostra de código do Molecules no Google Cloud, vários workers (VMs) podem ler os arquivos simultaneamente. Para garantir que não haja dois workers lendo o mesmo conteúdo nos arquivos, cada arquivo usa um range_tracker (em inglês).

O pipeline agrupa os dados brutos em seções de informações relevantes necessárias para os próximos passos. Cada seção no arquivo SDF analisado é armazenada em um dicionário (consulte pipeline/sdf.py), em que as chaves são os nomes das seções, e os valores são os conteúdos da linha não processada da seção correspondente.

O código aplica beam.ParDo(FormatMolecule()) ao pipeline. O ParDo aplica o DoFn chamado FormatMolecule a cada molécula. FormatMolecule produz um dicionário de moléculas formatadas. O snippet a seguir é um exemplo de um elemento na PCollection de saída:

{
  'atoms': [
    {
      'atom_atom_mapping_number': 0,
      'atom_stereo_parity': 0,
      'atom_symbol': u'O',
      'charge': 0,
      'exact_change_flag': 0,
      'h0_designator': 0,
      'hydrogen_count': 0,
      'inversion_retention': 0,
      'mass_difference': 0,
      'stereo_care_box': 0,
      'valence': 0,
      'x': -0.0782,
      'y': -1.5651,
      'z': 1.3894,
    },
    ...
  ],
  'bonds': [
    {
      'bond_stereo': 0,
      'bond_topology': 0,
      'bond_type': 1,
      'first_atom_number': 1,
      'reacting_center_status': 0,
      'second_atom_number': 5,
    },
    ...
  ],
  '<PUBCHEM_COMPOUND_CID>': ['3\n'],
  ...
  '<PUBCHEM_MMFF94_ENERGY>': ['19.4085\n'],
  ...
}

Em seguida, o código aplica beam.ParDo(CountAtoms()) ao pipeline. Os DoFnCountAtoms soma a quantidade de átomos de carbono, hidrogênio, nitrogênio e oxigênio que cada molécula tem. CountAtoms gera uma PCollection de atributos e rótulos. Aqui está um exemplo de um elemento na PCollection de saída:

{
  'ID': 3,
  'TotalC': 7,
  'TotalH': 8,
  'TotalO': 4,
  'TotalN': 0,
  'Energy': 19.4085,
}

Então, o pipeline valida as entradas. O ValidateInputDataDoFn valida que cada elemento corresponda os metadados fornecidos ao input_schema. Essa validação garante que os dados estejam no formato correto quando forem alimentados no TensorFlow.

| 'Validate inputs' >> beam.ParDo(ValidateInputData(
    input_feature_spec)))

Como aplicar transformações de passagem completa

A amostra de código do Molecules usa um Regressor de rede neural profunda para fazer predições. A recomendação geral é normalizar as entradas antes de alimentá-las no modelo de ML. O pipeline usa tf.Transform para normalizar as contagens de cada átomo para valores entre 0 e 1. Para ler mais sobre a normalização de entradas, consulte dimensionamento de atributos (em inglês).

A normalização dos valores requer uma passagem completa pelo conjunto de dados, gravando os valores mínimo e máximo. O código usa tf.Transform para percorrer todo o conjunto de dados e aplicar transformações de passagem completa.

Para usar tf.Transform, o código precisa fornecer uma função que contenha a lógica da transformação a ser executada no conjunto de dados. Em preprocess.py, o código usa a transformação AnalyzeAndTransformDataset fornecida por tf.Transform. Saiba mais sobre como usar tf.Transform.

# Apply the tf.Transform preprocessing_fn
input_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(input_feature_spec))

dataset_and_metadata, transform_fn = (
    (dataset, input_metadata)
    | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset(
        feature_scaling))
dataset, metadata = dataset_and_metadata

Em preprocess.py, a função feature_scaling usada é normalize_inputs, que é definida em pubchem/pipeline.py. A função usa a função tf.Transform scale_to_0_1 para normalizar as contagens para valores entre 0 e 1.

def normalize_inputs(inputs):
  """Preprocessing function for tf.Transform (full-pass transformations).

  Here we will do any preprocessing that requires a full-pass of the dataset.
  It takes as inputs the preprocessed data from the `PTransform` we specify, in
  this case `SimpleFeatureExtraction`.

  Common operations might be scaling values to 0-1, getting the minimum or
  maximum value of a certain field, creating a vocabulary for a string field.

  There are two main types of transformations supported by tf.Transform, for
  more information, check the following modules:
    - analyzers: tensorflow_transform.analyzers.py
    - mappers:   tensorflow_transform.mappers.py

  Any transformation done in tf.Transform will be embedded into the TensorFlow
  model itself.
  """
  return {
      # Scale the input features for normalization
      'NormalizedC': tft.scale_to_0_1(inputs['TotalC']),
      'NormalizedH': tft.scale_to_0_1(inputs['TotalH']),
      'NormalizedO': tft.scale_to_0_1(inputs['TotalO']),
      'NormalizedN': tft.scale_to_0_1(inputs['TotalN']),

      # Do not scale the label since we want the absolute number for prediction
      'Energy': inputs['Energy'],
  }

É possível normalizar os dados manualmente, mas se o conjunto de dados for grande, será mais rápido usar o Dataflow. Usar o Dataflow permite que o pipeline seja executado em vários workers (VMs), conforme necessário.

Como particionar o conjunto de dados

Em seguida, o pipeline preprocess.py particiona o conjunto de dados único em dois. Ele aloca aproximadamente 80% dos dados a serem usados como dados de treinamento, e aproximadamente 20% deles como dados de avaliação.

# Split the dataset into a training set and an evaluation set
assert 0 < eval_percent < 100, 'eval_percent must in the range (0-100)'
train_dataset, eval_dataset = (
    dataset
    | 'Split dataset' >> beam.Partition(
        lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2))

Como gravar a saída

Por fim, o pipeline preprocess.py grava os dois conjuntos de dados (treinamento e avaliação) usando a transformação WriteToTFRecord (em inglês).

# Write the datasets as TFRecords
coder = example_proto_coder.ExampleProtoCoder(metadata.schema)

train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
_ = (
    train_dataset
    | 'Write train dataset' >> tfrecordio.WriteToTFRecord(
        train_dataset_prefix, coder))

eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
_ = (
    eval_dataset
    | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(
        eval_dataset_prefix, coder))

# Write the transform_fn
_ = (
    transform_fn
    | 'Write transformFn' >> transform_fn_io.WriteTransformFn(work_dir))

Executar o pipeline de pré-processamento

Execute o pipeline de pré-processamento localmente:

python preprocess.py

Ou execute o pipeline de pré-processamento no Dataflow:

PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python preprocess.py \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $WORK_DIR

Depois que o pipeline estiver em execução, será possível visualizar o progresso dele na interface de monitoramento do Dataflow:

Canal de pré-processamento do Molecules

Fase 3: treinamento

Código-fonte: trainer/task.py

Lembre-se de que, no final da fase de pré-processamento, o código divide os dados em dois conjuntos de dados (treinamento e avaliação).

A amostra usa o TensorFlow para treinar o modelo de machine learning. O arquivo trainer/task.py na amostra de código de moléculas contém o código para treinar o modelo. A função principal de trainer/task.py carrega os dados que foram processados na fase de pré-processamento.

O Estimator usa o conjunto de dados de treinamento para treinar o modelo. Em seguida, usa o conjunto de dados de avaliação para verificar se o modelo prevê com precisão a energia molecular, dadas algumas das propriedades da molécula.

Saiba mais sobre como treinar um modelo ML.

Treinar o modelo

Treine o modelo localmente:

python trainer/task.py

# To get the path of the trained model
EXPORT_DIR=/tmp/cloudml-samples/molecules/model/export/final
MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1)

Ou treine o modelo no AI Platform:

gcloud config set compute/region $REGION
  • Iniciar o job de treinamento
JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)"
BUCKET=gs://<your bucket name>
WORK_DIR=$BUCKET/cloudml-samples/molecules
gcloud ai-platform jobs submit training $JOB \
  --module-name trainer.task \
  --package-path trainer \
  --staging-bucket $BUCKET \
  --runtime-version 1.13 \
  --stream-logs \
  -- \
  --work-dir $WORK_DIR

# To get the path of the trained model:
EXPORT_DIR=$WORK_DIR/model/export
MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1)

Fase 4: previsão

Código-fonte: predict.py

Depois que o Estimator treinar o modelo, forneça entradas para que ele faça previsões. Na amostra de código do Molecules, o pipeline em predict.py é responsável pelas previsões. O canal pode atuar como um canal em lote ou de streaming.

O código para o canal é o mesmo para lote e streaming, exceto para as interações de fonte e coletor. Se o pipeline for executado no modo em lote, ele lerá os arquivos de entrada da fonte personalizada e gravará as predições de saída como arquivos de texto no diretório de trabalho especificado. Se o pipeline for executado no modo de streaming, ele lerá as moléculas de entrada, conforme elas chegam, a partir de um tópico do Pub/Sub. O pipeline grava as previsões de saída, conforme elas ficam prontas, em um tópico diferente do Pub/Sub.

if args.verb == 'batch':
  data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
  results_prefix = os.path.join(args.outputs_dir, 'part')
  source = pubchem.ParseSDF(data_files_pattern)
  sink = beam.io.WriteToText(results_prefix)

elif args.verb == 'stream':
  if not project:
    parser.print_usage()
    print('error: argument --project is required for streaming')
    sys.exit(1)

  beam_options.view_as(StandardOptions).streaming = True
  source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format(
      project, args.inputs_topic))
  sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format(
      project, args.outputs_topic))

A imagem a seguir mostra as etapas nos canais de predição (lote e streaming).

Pipeline de pré-processamento do Molecules

Em predict.py, o código define o pipeline na função run:

def run(model_dir, feature_extraction, sink, beam_options=None):
  print('Listening...')
  with beam.Pipeline(options=beam_options) as p:
    _ = (p
        | 'Feature extraction' >> feature_extraction
        | 'Predict' >> beam.ParDo(Predict(model_dir, 'ID'))
        | 'Format as JSON' >> beam.Map(json.dumps)
        | 'Write predictions' >> sink)

O código chama a função run com os seguintes parâmetros:

run(
    args.model_dir,
    pubchem.SimpleFeatureExtraction(source),
    sink,
    beam_options)

Primeiro, o código passa a transformação pubchem.SimpleFeatureExtraction(source) como a transformação feature_extraction. Essa transformação, que também foi usada na fase de pré-processamento, é aplicada ao canal:

class SimpleFeatureExtraction(beam.PTransform):
  """The feature extraction (element-wise transformations).

  We create a `PTransform` class. This `PTransform` is a bundle of
  transformations that can be applied to any other pipeline as a step.

  We'll extract all the raw features here. Due to the nature of `PTransform`s,
  we can only do element-wise transformations here. Anything that requires a
  full-pass of the data (such as feature scaling) has to be done with
  tf.Transform.
  """
  def __init__(self, source):
    super(SimpleFeatureExtraction, self).__init__()
    self.source = source

  def expand(self, p):
    # Return the preprocessing pipeline. In this case we're reading the PubChem
    # files, but the source could be any Apache Beam source.
    return (p
        | 'Read raw molecules' >> self.source
        | 'Format molecule' >> beam.ParDo(FormatMolecule())
        | 'Count atoms' >> beam.ParDo(CountAtoms())
    )

A transformação lê a fonte apropriada com base no modo de execução do pipeline (lote ou streaming), formata as moléculas e conta os átomos diferentes em cada molécula.

Em seguida, beam.ParDo(Predict(…)) é aplicado ao pipeline que realiza a previsão da energia molecular. Predict, o DoFn que passou, usa o dicionário fornecido de atributos de entrada (contagem de átomos), para prever a energia molecular.

A próxima transformação aplicada ao pipeline é beam.Map(lambda result: json.dumps(result)), que pega o dicionário do resultado de previsão e o serializa em uma string JSON.

Por fim, a saída é gravada no coletor, como arquivos de texto no diretório de trabalho para o modo de lote ou como mensagens publicadas em um tópico do Pub/Sub para o modo de streaming.

Previsões em lote

As predições em lote são otimizadas por capacidade em vez de latência. Elas funcionam melhor se você estiver fazendo muitas predições e puder esperar que todas terminem antes de ter os resultados.

Executar o canal de predição no modo em lote

Execute o pipeline de previsão em lote localmente:

# For simplicity, we'll use the same files we used for training
python predict.py \
  --model-dir $MODEL_DIR \
  batch \
  --inputs-dir /tmp/cloudml-samples/molecules/data \
  --outputs-dir /tmp/cloudml-samples/molecules/predictions

Ou execute o pipeline de previsão em lote no Dataflow:

# For simplicity, we'll use the same files we used for training
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  batch \
  --project $PROJECT \
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $WORK_DIR/data \
  --outputs-dir $WORK_DIR/predictions

Depois que o pipeline estiver em execução, será possível visualizar o progresso dele na interface de monitoramento do Dataflow:

Canal de predição do Molecules

Previsões de streaming

As predições de streaming são otimizadas por latência em vez de capacidade. Elas funcionam melhor se você estiver fazendo previsões esporádicas, mas quiser os resultados o mais rápido possível.

O serviço de previsão (o pipeline de previsão de streaming) recebe as moléculas de entrada de um tópico do Pub/Sub e publica a saída (previsões) em outro tópico Pub/Sub.

Crie o tópico do Pub/Sub de entradas:

gcloud pubsub topics create molecules-inputs

Crie o tópico do Pub/Sub de saídas:

gcloud pubsub topics create molecules-predictions

Execute o pipeline de previsão de streaming localmente:

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
python predict.py \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Ou execute o pipeline de previsão de streaming no Dataflow:

# Run on terminal 1
PROJECT=$(gcloud config get-value project)
WORK_DIR=gs://<your bucket name>/cloudml-samples/molecules
python predict.py \
  --work-dir $WORK_DIR \
  --model-dir $MODEL_DIR \
  stream \
  --project $PROJECT
  --runner DataflowRunner \
  --temp_location $WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-topic molecules-inputs \
  --outputs-topic molecules-predictions

Depois de executar o serviço de previsão (o pipeline de previsão de streaming), é necessário executar um editor para enviar moléculas para o serviço de previsão e um assinante para escutar os resultados. O exemplo de código do Molecules fornece serviços de editor (publisher.py) e de assinante (subscriber.py).

O editor analisa arquivos SDF de um diretório e os publica no tópico de entradas. O assinante escuta os resultados da predição e os registra. Para simplificar, neste exemplo usaremos os mesmos arquivos SDF usados ​​na fase de treinamento.

Execute o assinante:

# Run on terminal 2
python subscriber.py \
  --project $PROJECT \
  --topic molecules-predictions

Execute o editor:

# Run on terminal 3
python publisher.py \
  --project $PROJECT \
  --topic molecules-inputs \
  --inputs-dir $WORK_DIR/data

Depois que o editor começar a analisar e publicar moléculas, você começará a ver as predições do assinante.

Limpar

Depois de concluir a execução do canal de predições de streaming, interrompa o canal para evitar cobranças.

Próximas etapas