Prosty samouczek dotyczący potoku TFX przy użyciu zestawu danych Penguin

Krótki samouczek dotyczący uruchamiania prostego potoku TFX.

W tym samouczku opartym na notatniku utworzymy i uruchomimy potok TFX dla prostego modelu klasyfikacji. Potok będzie składał się z trzech podstawowych komponentów TFX: ExampleGen, Trainer i Pusher. Potok obejmuje najbardziej minimalny przepływ pracy ML, taki jak importowanie danych, trenowanie modelu i eksportowanie trenowanego modelu.

Proszę zobaczyć Zrozumienie TFX Rurociągi , aby dowiedzieć się więcej na temat różnych koncepcji w TFX.


Najpierw musimy zainstalować pakiet TFX Python i pobrać zestaw danych, którego użyjemy dla naszego modelu.

Ulepsz Pip

Aby uniknąć aktualizacji Pip w systemie, gdy działa on lokalnie, upewnij się, że działamy w Colab. Systemy lokalne można oczywiście aktualizować oddzielnie.

  import colab
  !pip install --upgrade pip

Zainstaluj TFX

pip install -U tfx

Czy uruchomiłeś ponownie środowisko wykonawcze?

Jeśli korzystasz z Google Colab, przy pierwszym uruchomieniu powyższej komórki musisz ponownie uruchomić środowisko wykonawcze, klikając powyżej przycisk „RESTART RUNTIME” lub korzystając z menu „Runtime > Restart runtime...”. Wynika to ze sposobu, w jaki Colab ładuje paczki.

Sprawdź wersje TensorFlow i TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

Ustaw zmienne

Istnieje kilka zmiennych używanych do zdefiniowania potoku. Możesz dostosować te zmienne, jak chcesz. Domyślnie wszystkie dane wyjściowe z potoku będą generowane w bieżącym katalogu.

import os

PIPELINE_NAME = "penguin-simple"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Przygotuj przykładowe dane

Pobierzemy przykładowy zestaw danych do użycia w naszym potoku TFX. Zbiór danych używamy jest Palmer Penguins zestaw danych , który jest również stosowany w innych przykładach TFX .

Ten zbiór danych zawiera cztery funkcje liczbowe:

  • culmen_length_mm
  • culmen_depth_mm
  • flipper_length_mm
  • body_mass_g

Wszystkie cechy zostały już znormalizowane do zakresu [0,1]. Będziemy budować model klasyfikacji który przewiduje species pingwinów.

Ponieważ TFX ExampleGen odczytuje dane wejściowe z katalogu, musimy utworzyć katalog i skopiować do niego zbiór danych.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = ''
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-dataijanq9u3/data.csv', <http.client.HTTPMessage at 0x7f487953d110>)

Rzuć okiem na plik CSV.

head {_data_filepath}

Powinieneś być w stanie zobaczyć pięć wartości. species jest 0, 1 lub 2, a wszystkie inne cechy powinny mieć wartości między 0 i 1.

Utwórz potok

Potoki TFX są definiowane za pomocą interfejsów API Pythona. Zdefiniujemy potok składający się z trzech następujących elementów.

  • CsvExampleGen: odczytuje pliki danych i konwertuje je do wewnętrznego formatu TFX w celu dalszego przetwarzania. Istnieje wiele ExampleGen s dla różnych formatów. W tym samouczku użyjemy CsvExampleGen, który pobiera dane wejściowe z pliku CSV.
  • Trener: Trenuje model ML. Komponent trener wymaga model kodu rozdzielczości od użytkowników. Można użyć TensorFlow API w celu określenia sposobu trenowania modelu i zapisać go w formacie _saved modelu.
  • Pusher: kopiuje wyszkolony model poza potok TFX. Popychacz komponent może być traktowane w procesie wdrażania wyszkolonego modelu ML.

Zanim faktycznie zdefiniujemy potok, musimy najpierw napisać kod modelu dla komponentu Trainer.

Napisz kod szkolenia modelu

Stworzymy prosty model DNN do klasyfikacji za pomocą TensorFlow Keras API. Ten kod szkolenia modelu zostanie zapisany w osobnym pliku.

W tym tutorialu użyjemy Generic Trainer z TFX które obsługują modele Keras oparte. Trzeba zapisać plik Pythona zawierający run_fn funkcję, która jest punkt_wejścia dla Trainer komponentu.

_trainer_module_file = ''
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
_LABEL_KEY = 'species'


# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
        feature:[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
    _LABEL_KEY:[1], dtype=tf.int64)

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) ->
  """Generates features and label for training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  return data_accessor.tf_dataset_factory(
          batch_size=batch_size, label_key=_LABEL_KEY),

def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model()

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory., save_format='tf')

Teraz wykonałeś wszystkie kroki przygotowawcze do zbudowania potoku TFX.

Napisz definicję potoku

Definiujemy funkcję do tworzenia potoku TFX. Pipeline przedmiot stanowi rurociąg TFX, który może być uruchamiany za pomocą jednego z systemów instrumentacji gazociągu TFX podpór.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  # Following three components will be included in the pipeline.
  components = [

  return tfx.dsl.Pipeline(

Uruchom rurociąg

TFX obsługuje wiele orkiestratorów do uruchamiania potoków. W tym tutorialu użyjemy LocalDagRunner który jest zawarty w pakiecie i biegnie TFX Pythona rurociągów na środowisko lokalne. Często nazywamy potoki TFX „DAGs”, co oznacza skierowany graf acykliczny.

LocalDagRunner zapewnia szybki iteracji dla Developemnt i debugowania. TFX obsługuje również inne orkiestratory, w tym Kubeflow Pipelines i Apache Airflow, które są odpowiednie do zastosowań produkcyjnych.

Zobacz TFX na chmura AI Platforma rurociągów lub TFX Airflow Tutorial , aby dowiedzieć się więcej na temat innych systemów instrumentacji.

Teraz tworzymy LocalDagRunner i zdać Pipeline obiektu utworzonego z funkcji już zdefiniowany.

Potok działa bezpośrednio i można wyświetlić dzienniki postępu potoku, w tym trenowanie modelu ML.

Powinieneś zobaczyć "INFO:absl:Component Pusher jest gotowy." na końcu dzienników, jeśli potok zakończył się pomyślnie. Ponieważ Pusher składnikiem jest ostatnim elementem rurociągu.

Popychacz składowa popycha wyszkolony modelu do SERVING_MODEL_DIR który jest serving_model/penguin-simple katalogu, jeśli nie zmieni zmienne w poprzednich krokach. Możesz zobaczyć wynik w przeglądarce plików w lewym panelu w Colab lub za pomocą następującego polecenia:

# List files in created model directory.

Następne kroki

Można znaleźć więcej środków na

Proszę zobaczyć Zrozumienie TFX Rurociągi , aby dowiedzieć się więcej na temat różnych koncepcji w TFX.