Introdução ao pipeline de classificação do TensorFlow

DR : Reduza o código padrão para construir, treinar e servir modelos do TensorFlow Ranking com TensorFlow Ranking Pipelines; Use estratégias distribuídas adequadas para aplicações de classificação em grande escala, de acordo com o caso de uso e os recursos.

Introdução

O TensorFlow Ranking Pipeline consiste em uma série de processos de processamento de dados, criação de modelos, treinamento e atendimento que permitem construir, treinar e fornecer modelos de classificação escalonáveis ​​baseados em redes neurais a partir de registros de dados com o mínimo de esforço. O pipeline é mais eficiente quando o sistema é ampliado. Em geral, se o seu modelo levar 10 minutos ou mais para ser executado em uma única máquina, considere usar esta estrutura de pipeline para distribuir a carga e acelerar o processamento.

O TensorFlow Ranking Pipeline tem sido executado de forma constante e estável em experimentos e produções em grande escala com big data (terabytes+) e grandes modelos (100 milhões+ de FLOPs) em sistemas distribuídos (1K+ CPU e mais de 100 GPU e TPUs). Depois que um modelo do TensorFlow é comprovado com model.fit em uma pequena parte dos dados, o pipeline é recomendado para verificação de hiperparâmetros, treinamento contínuo e outras situações de grande escala.

Pipeline de classificação

No TensorFlow, um pipeline típico para criar, treinar e fornecer um modelo de classificação inclui as etapas típicas a seguir.

  • Defina a estrutura do modelo:
    • Criar insumos;
    • Crie camadas de pré-processamento;
    • Criar arquitetura de rede neural;
  • Modelo de trem:
    • Gere conjuntos de dados de treinamento e validação a partir de logs de dados;
    • Prepare o modelo com hiperparâmetros adequados:
      • Otimizador;
      • Perdas de classificação;
      • Métricas de Classificação;
    • Configure estratégias distribuídas para treinar em vários dispositivos.
    • Configure retornos de chamada para diversas escriturações contábeis.
    • Modelo de exportação para atendimento;
  • Modelo de serviço:
    • Determine o formato dos dados na veiculação;
    • Escolha e carregue o modelo treinado;
    • Processo com modelo carregado.

Um dos principais objetivos do pipeline do TensorFlow Ranking é reduzir o código padrão nas etapas, como carregamento e pré-processamento do conjunto de dados, compatibilidade de dados listwise e função de pontuação pontual e exportação de modelo. O outro objetivo importante é impor o design consistente de muitos processos inerentemente correlacionados, por exemplo, as entradas do modelo devem ser compatíveis com os conjuntos de dados de treinamento e com o formato dos dados no serviço.

Guia de uso

Com todo o design acima, o lançamento de um modelo de classificação TF se enquadra nas etapas a seguir, conforme mostrado na Figura 1.

Diagrama do pipeline de classificação do TensorFlow
Figura 1 : Diagrama de classes do TensorFlow Ranking e etapas para treinar modelos de classificação com o pipeline TF Ranking. Os módulos verdes podem ser personalizados para o seu modelo de classificação.

Exemplo usando uma rede neural distribuída

Neste exemplo, você aproveitará os recursos integrados tfr.keras.model.FeatureSpecInputCreator , tfr.keras.pipeline.SimpleDatasetBuilder e tfr.keras.pipeline.SimplePipeline que utilizam feature_spec s para definir consistentemente os recursos de entrada nas entradas do modelo e servidor de conjunto de dados. A versão em notebook com passo a passo pode ser encontrada no tutorial de classificação distribuída .

Primeiro defina feature_spec s para recursos de contexto e de exemplo.

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

Siga as etapas ilustradas na Figura 1:
Defina input_creator de feature_spec s.

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

Em seguida, defina transformações de recursos de pré-processamento para o mesmo conjunto de recursos de entrada.

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

Defina o marcador com modelo DNN feedforward integrado.

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

Faça o model_builder com input_creator , preprocessor e scorer .

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

Agora defina os hiperparâmetros para dataset_builder .

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

Faça o dataset_builder .

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

Defina também os hiperparâmetros para o pipeline.

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

Faça o ranking_pipeline e treine.

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

Projeto do pipeline de classificação do TensorFlow

O TensorFlow Ranking Pipeline ajuda a economizar tempo de engenharia com código padrão e, ao mesmo tempo, permite flexibilidade de personalização por meio de substituição e subclasse. Para conseguir isso, o pipeline introduz classes personalizáveis tfr.keras.model.AbstractModelBuilder , tfr.keras.pipeline.AbstractDatasetBuilder e tfr.keras.pipeline.AbstractPipeline para configurar o pipeline do TensorFlow Ranking.

Design de classes de pipeline de classificação do TensorFlow
Figura 2 : Design geral das classes do TensorFlow Ranking Pipeline.

Construtor de Modelo

O código padrão relacionado à construção do modelo Keras é integrado no AbstractModelBuilder , que é passado para o AbstractPipeline e chamado dentro do pipeline para construir o modelo no escopo da estratégia. Isso é mostrado na Figura 1. Os métodos de classe são definidos na classe base abstrata.

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

Você pode subclassificar diretamente o AbstractModelBuilder e sobrescrever com os métodos concretos para personalização, como

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

Ao mesmo tempo, você deve usar ModelBuilder com recursos de entrada, transformações de pré-processamento e funções de pontuação especificadas como entradas de função input_creator , preprocessor e scorer na classe init em vez de subclassificar.

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

Para reduzir os padrões de criação dessas entradas, são fornecidas as classes de função tfr.keras.model.InputCreator para input_creator , tfr.keras.model.Preprocessor para preprocessor e tfr.keras.model.Scorer para scorer , juntamente com subclasses concretas tfr.keras.model.FeatureSpecInputCreator , tfr.keras.model.TypeSpecInputCreator , tfr.keras.model.PreprocessorWithSpec , tfr.keras.model.UnivariateScorer , tfr.keras.model.DNNScorer e tfr.keras.model.GAMScorer . Eles devem cobrir a maioria dos casos de uso comuns.

Observe que essas classes de função são classes Keras, portanto não há necessidade de serialização. A subclasse é a forma recomendada para personalizá-los.

Construtor de conjunto de dados

A classe DatasetBuilder coleta padrões relacionados ao conjunto de dados. Os dados são passados ​​para o Pipeline e chamados para servir os conjuntos de dados de treinamento e validação e para definir as assinaturas de serviço para modelos salvos. Conforme mostrado na Figura 1, os métodos DatasetBuilder são definidos na classe base tfr.keras.pipeline.AbstractDatasetBuilder ,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

Em uma classe DatasetBuilder concreta, você deve implementar build_train_datasets , build_valid_datasets e build_signatures .

Uma classe concreta que cria conjuntos de dados a partir de feature_spec s também é fornecida:

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

Os hparams usados ​​no DatasetBuilder são especificados na classe de dados tfr.keras.pipeline.DatasetHparams .

Gasoduto

O Ranking Pipeline é baseado na classe tfr.keras.pipeline.AbstractPipeline :

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

Uma classe de pipeline concreta que treina o modelo com diferentes tf.distribute.strategy s compatíveis com model.fit também é fornecida:

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

Os hparams usados ​​no tfr.keras.pipeline.ModelFitPipeline são especificados na classe de dados tfr.keras.pipeline.PipelineHparams . Esta classe ModelFitPipeline é suficiente para a maioria dos casos de uso do TF Ranking. Os clientes podem facilmente subclassificá-lo para fins específicos.

Suporte à estratégia distribuída

Consulte o treinamento distribuído para obter uma introdução detalhada das estratégias distribuídas suportadas pelo TensorFlow. Atualmente, o pipeline do TensorFlow Ranking oferece suporte tf.distribute.MirroredStrategy (padrão), tf.distribute.TPUStrategy , tf.distribute.MultiWorkerMirroredStrategy e tf.distribute.ParameterServerStrategy . A estratégia espelhada é compatível com a maioria dos sistemas de máquina única. Defina strategy como None para nenhuma estratégia distribuída.

Em geral, MirroredStrategy funciona para modelos relativamente pequenos na maioria dos dispositivos com opções de CPU e GPU. MultiWorkerMirroredStrategy funciona para grandes modelos que não cabem em um trabalhador. ParameterServerStrategy faz treinamento assíncrono e requer vários trabalhadores disponíveis. TPUStrategy é ideal para grandes modelos e big data quando TPUs estão disponíveis, no entanto, é menos flexível em termos de formatos de tensor que pode manipular.

Perguntas frequentes

  1. O conjunto mínimo de componentes para usar o RankingPipeline
    Veja o código de exemplo acima.

  2. E se eu tiver meu próprio model Keras
    Para ser treinado com estratégias tf.distribute , model precisa ser construído com todas as variáveis ​​treináveis ​​definidas em strategy.scope(). Então envolva seu modelo no ModelBuilder como,

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

Em seguida, alimente este model_builder ao pipeline para treinamento adicional.