वर्टेक्स पाइपलाइनों के लिए सरल टीएफएक्स पाइपलाइन

यह नोटबुक-आधारित ट्यूटोरियल एक साधारण TFX पाइपलाइन बनाएगा और इसे Google क्लाउड वर्टेक्स पाइपलाइनों का उपयोग करके चलाएगा। यह नोटबुक TFX पाइपलाइन पर आधारित है जिसे हमने Simple TFX पाइपलाइन ट्यूटोरियल में बनाया है। यदि आप टीएफएक्स से परिचित नहीं हैं और आपने अभी तक उस ट्यूटोरियल को नहीं पढ़ा है, तो आपको इस नोटबुक के साथ आगे बढ़ने से पहले इसे पढ़ना चाहिए।

Google क्लाउड वर्टेक्स पाइपलाइन आपके एमएल वर्कफ़्लो को सर्वर रहित तरीके से व्यवस्थित करके आपके एमएल सिस्टम को स्वचालित, मॉनिटर और नियंत्रित करने में आपकी सहायता करता है। आप टीएफएक्स के साथ पायथन का उपयोग करके अपनी एमएल पाइपलाइनों को परिभाषित कर सकते हैं, और फिर Google क्लाउड पर अपनी पाइपलाइनों को निष्पादित कर सकते हैं। वर्टेक्स पाइपलाइनों के बारे में अधिक जानने के लिए वर्टेक्स पाइपलाइन परिचय देखें।

यह नोटबुक Google Colab या AI प्लेटफ़ॉर्म नोटबुक पर चलाने के लिए अभिप्रेत है। यदि आप इनमें से किसी एक का उपयोग नहीं कर रहे हैं, तो आप बस ऊपर "Google Colab में चलाएँ" बटन पर क्लिक कर सकते हैं।

सेट अप

इस नोटबुक को चलाने से पहले, सुनिश्चित करें कि आपके पास निम्नलिखित हैं:

कृपया अपने GCP प्रोजेक्ट को और अधिक कॉन्फ़िगर करने के लिए Vertex दस्तावेज़ देखें।

पायथन पैकेज स्थापित करें

हम लेखक एमएल पाइपलाइनों के लिए टीएफएक्स और केएफपी सहित आवश्यक पायथन पैकेज स्थापित करेंगे और वर्टेक्स पाइपलाइनों को नौकरियां जमा करेंगे।

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

क्या आपने रनटाइम को पुनरारंभ किया?

यदि आप Google Colab का उपयोग कर रहे हैं, जब आप पहली बार ऊपर सेल चलाते हैं, तो आपको "रनटाइम को पुनरारंभ करें" बटन पर क्लिक करके या "रनटाइम> रनटाइम पुनरारंभ करें ..." मेनू का उपयोग करके रनटाइम को पुनरारंभ करना होगा। ऐसा इसलिए है क्योंकि Colab संकुल को लोड करता है।

यदि आप Colab पर नहीं हैं, तो आप निम्न सेल के साथ रनटाइम को पुनरारंभ कर सकते हैं।

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

इस नोटबुक के लिए Google में लॉगिन करें

यदि आप इस नोटबुक को Colab पर चला रहे हैं, तो अपने उपयोगकर्ता खाते से प्रमाणित करें:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

यदि आप AI प्लेटफ़ॉर्म नोटबुक पर हैं , तो अगला भाग चलाने से पहले Google क्लाउड से प्रमाणित करें

gcloud auth login

टर्मिनल विंडो में (जिसे आप मेनू में फ़ाइल > नया के माध्यम से खोल सकते हैं)। आपको प्रति नोटबुक इंस्टेंस में केवल एक बार ऐसा करने की आवश्यकता है।

पैकेज संस्करणों की जाँच करें।

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

चर सेट करें

हम नीचे पाइपलाइनों को अनुकूलित करने के लिए उपयोग किए जाने वाले कुछ चर स्थापित करेंगे। निम्नलिखित जानकारी की आवश्यकता है:

इसे चलाने से पहले नीचे दिए गए सेल में आवश्यक मान दर्ज करें

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

अपने प्रोजेक्ट का उपयोग करने के लिए gcloud सेट करें।

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

उदाहरण डेटा तैयार करें

हम उसी पामर पेंगुइन डेटासेट का उपयोग साधारण टीएफएक्स पाइपलाइन ट्यूटोरियल के रूप में करेंगे।

इस डेटासेट में चार संख्यात्मक विशेषताएं हैं जिन्हें पहले से ही [0,1] श्रेणी के लिए सामान्यीकृत किया गया था। हम एक वर्गीकरण मॉडल तैयार करेंगे जो पेंगुइन की species की भविष्यवाणी करता है।

हमें डेटासेट की अपनी प्रतिलिपि बनाने की आवश्यकता है। क्योंकि TFXexampleGen एक निर्देशिका से इनपुट पढ़ता है, हमें GCS पर एक निर्देशिका बनाने और डेटासेट की प्रतिलिपि बनाने की आवश्यकता है।

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

सीएसवी फ़ाइल पर एक त्वरित नज़र डालें।

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

एक पाइपलाइन बनाएं

टीएफएक्स पाइपलाइनों को पायथन एपीआई का उपयोग करके परिभाषित किया गया है। हम एक पाइपलाइन को परिभाषित करेंगे जिसमें तीन घटक, CsvExampleGen, Trainer और Pusher शामिल हैं। पाइपलाइन और मॉडल की परिभाषा लगभग साधारण TFX पाइपलाइन ट्यूटोरियल के समान ही है।

अंतर केवल इतना है कि हमें metadata_connection_config सेट करने की आवश्यकता नहीं है जिसका उपयोग एमएल मेटाडेटा डेटाबेस का पता लगाने के लिए किया जाता है। चूंकि वर्टेक्स पाइपलाइन एक प्रबंधित मेटाडेटा सेवा का उपयोग करती है, इसलिए उपयोगकर्ताओं को इसकी देखभाल करने की आवश्यकता नहीं है, और हमें पैरामीटर निर्दिष्ट करने की आवश्यकता नहीं है।

वास्तव में पाइपलाइन को परिभाषित करने से पहले, हमें पहले ट्रेनर घटक के लिए एक मॉडल कोड लिखना होगा।

मॉडल कोड लिखें।

हम उसी मॉडल कोड का उपयोग करेंगे जैसा कि साधारण TFX पाइपलाइन ट्यूटोरियल में किया जाता है।

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

मॉड्यूल फ़ाइल को GCS में कॉपी करें जिसे पाइपलाइन घटकों से एक्सेस किया जा सकता है। चूंकि मॉडल प्रशिक्षण GCP पर होता है, इसलिए हमें इस मॉडल परिभाषा को अपलोड करने की आवश्यकता है।

अन्यथा, आप मॉड्यूल फ़ाइल सहित एक कंटेनर छवि बनाना चाहते हैं और पाइपलाइन चलाने के लिए छवि का उपयोग कर सकते हैं।

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

एक पाइपलाइन परिभाषा लिखें

हम TFX पाइपलाइन बनाने के लिए एक फ़ंक्शन को परिभाषित करेंगे।

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

पाइपलाइन को वर्टेक्स पाइपलाइनों पर चलाएँ।

हमने LocalDagRunner का उपयोग किया है जो साधारण TFX पाइपलाइन ट्यूटोरियल में स्थानीय वातावरण पर चलता है। TFX आपकी पाइपलाइन को चलाने के लिए कई ऑर्केस्ट्रेटर प्रदान करता है। इस ट्यूटोरियल में हम Kubeflow V2 dag रनर के साथ वर्टेक्स पाइपलाइनों का उपयोग करेंगे।

हमें वास्तव में पाइपलाइन चलाने के लिए एक धावक को परिभाषित करने की आवश्यकता है। आप टीएफएक्स एपीआई का उपयोग करके अपनी पाइपलाइन को हमारे पाइपलाइन परिभाषा प्रारूप में संकलित करेंगे।

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

उत्पन्न परिभाषा फ़ाइल kfp क्लाइंट का उपयोग करके प्रस्तुत की जा सकती है।

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

अब आप प्रगति देखने के लिए Google क्लाउड कंसोल में 'वर्टेक्स एआई> पाइपलाइन' पर जा सकते हैं।