Treten Sie der SIG TFX-Addons-Community bei und helfen Sie mit, TFX noch besser zu machen! SIG TFX-Addons beitreten

Einfache TFX-Pipeline für Vertex-Pipelines

In dieser Notebook-basierten Anleitung wird eine einfache TFX-Pipeline erstellt und mit Google Cloud Vertex Pipelines ausgeführt. Dieses Notebook basiert auf der TFX - Pipeline wir in gebaut Einfache TFX Pipeline Tutorial . Wenn Sie mit TFX nicht vertraut sind und dieses Tutorial noch nicht gelesen haben, sollten Sie es lesen, bevor Sie mit diesem Notebook fortfahren.

Google Cloud Vertex Pipelines unterstützt Sie bei der Automatisierung, Überwachung und Steuerung Ihrer ML-Systeme durch die serverlose Orchestrierung Ihres ML-Workflows. Sie können Ihre ML-Pipelines mithilfe von Python mit TFX definieren und dann Ihre Pipelines in Google Cloud ausführen. Siehe Vertex Pipelines Einführung mehr über Vertex Pipelines zu lernen.

Das Notebook soll auf ausgeführt werden Google Colab oder auf AI - Plattform Notebooks . Wenn Sie keines davon verwenden, können Sie einfach oben auf die Schaltfläche "In Goolge Colab ausführen" klicken.

Einrichten

Stellen Sie vor dem Ausführen dieses Notizbuchs sicher, dass Sie über Folgendes verfügen:

Bitte beachten Sie Vertex Dokumentation Ihres GCP - Projekt weiter zu konfigurieren.

Python-Pakete installieren

Wir installieren erforderliche Python-Pakete, einschließlich TFX und KFP, um ML-Pipelines zu erstellen und Jobs an Vertex Pipelines zu senden.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade tfx==0.30.0 kfp==1.6.1

Hast du die Laufzeit neu gestartet?

Wenn Sie Google Colab verwenden und die obige Zelle zum ersten Mal ausführen, müssen Sie die Laufzeit neu starten, indem Sie auf die Schaltfläche "LAUFZEIT NEU STARTEN" klicken oder das Menü "Laufzeit > Laufzeit neu starten ..." verwenden. Dies liegt an der Art und Weise, wie Colab Pakete lädt.

Wenn Sie nicht auf Colab sind, können Sie die Laufzeit mit der folgenden Zelle neu starten.

import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Melden Sie sich für dieses Notizbuch bei Google an

Wenn Sie dieses Notizbuch auf Colab ausführen, authentifizieren Sie sich mit Ihrem Benutzerkonto:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Wenn Sie auf AI - Plattform Notebooks, Authentifizieren mit Google Cloud , bevor Sie den nächsten Abschnitt ausgeführt wird , indem Sie

gcloud auth login

im Terminal - Fenster (das Sie über Datei> Neu im Menü öffnen kann). Sie müssen dies nur einmal pro Notebook-Instanz tun.

Überprüfen Sie die Paketversionen.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 1.15.5
WARNING:absl:tfx_bsl.tfxio.tensor_to_arrow can only handle evaluated tensors (i.e. ndarays or SparseTensorValues) in TF 1.x.
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.
TFX version: 0.30.0
KFP version: 1.6.1

Variablen einrichten

Wir werden einige Variablen einrichten, die verwendet werden, um die Pipelines unten anzupassen. Folgende Angaben sind erforderlich:

  • GCP-Projekt-ID. Siehe Identifizieren Projekt - ID .
  • GCP-Region zum Ausführen von Pipelines. Weitere Informationen zu den Regionen , dass Vertex Pipelines in verfügbar ist, finden Sie in der Vertex AI Standorten Führung .
  • Google Cloud Storage Bucket zum Speichern von Pipelineausgaben.

Geben Sie die erforderlichen Werte in der Zelle unten , bevor es ausgeführt wird .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud Ihr Projekt zu verwenden.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

Beispieldaten vorbereiten

Wir werden das gleiche verwenden Palmer Penguins - Datensatz als einfache TFX Pipeline Tutorial .

Es gibt vier numerische Merkmale in diesem Datensatz, die bereits auf den Bereich [0,1] normalisiert wurden. Wir werden ein Klassifikationsmodell bauen , die die vorhersagt species von Pinguinen.

Wir müssen unsere eigene Kopie des Datensatzes erstellen. Da TFX ExampleGen Eingaben aus einem Verzeichnis liest, müssen wir ein Verzeichnis erstellen und den Datensatz auf GCS dorthin kopieren.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

Sehen Sie sich die CSV-Datei kurz an.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

Erstellen Sie eine Pipeline

TFX-Pipelines werden mithilfe von Python-APIs definiert. Wir werden eine Pipeline definieren, die aus den drei Komponenten CsvExampleGen, Trainer und Pusher besteht. Die Pipeline und Modelldefinition ist fast die gleiche wie einfache TFX Pipeline Tutorial .

Der einzige Unterschied ist , dass wir nicht zu Satz brauchen metadata_connection_config die verwendet wird , lokalisieren ML Metadaten - Datenbank. Da Vertex Pipelines einen verwalteten Metadatendienst verwendet, müssen sich die Benutzer nicht darum kümmern, und wir müssen den Parameter nicht angeben.

Bevor wir die Pipeline tatsächlich definieren, müssen wir zunächst einen Modellcode für die Trainer-Komponente schreiben.

Modellcode schreiben.

Wir werden das gleiche Modell Code wie bei der Verwendung Einfache TFX Pipeline Tutorial .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Kopieren Sie die Moduldatei in GCS, auf die von den Pipelinekomponenten zugegriffen werden kann. Da das Modelltraining auf der GCP stattfindet, müssen wir diese Modelldefinition hochladen.

Andernfalls möchten Sie möglicherweise ein Container-Image einschließlich der Moduldatei erstellen und das Image zum Ausführen der Pipeline verwenden.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

Schreiben Sie eine Pipeline-Definition

Wir werden eine Funktion zum Erstellen einer TFX-Pipeline definieren.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Führen Sie die Pipeline auf Vertex-Pipelines aus.

Wir verwenden LocalDagRunner , die in der lokalen Umgebung ausgeführt Einfaches TFX Pipeline Tutorial . TFX bietet mehrere Orchestratoren zum Ausführen Ihrer Pipeline. In diesem Tutorial verwenden wir die Vertex Pipelines zusammen mit dem Kubeflow V2 Dag Runner.

Wir müssen einen Runner definieren, um die Pipeline tatsächlich auszuführen. Sie kompilieren Ihre Pipeline mithilfe von TFX-APIs in unser Pipeline-Definitionsformat.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

Die generierte Definitionsdatei kann mit dem kfp-Client übermittelt werden.

from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)

Jetzt können Sie den Link in der Ausgabe über besuchen oder Besuch ‚Vertex AI> Pipelines‘ in Google Cloud Console den Fortschritt zu sehen.