정점 파이프라인을 위한 간단한 TFX 파이프라인

이 노트북 기반 가이드에서는 간단한 TFX 파이프라인을 만들고 Google Cloud Vertex Pipelines를 사용하여 실행합니다. 이 노트북은 Simple TFX Pipeline Tutorial 에서 구축한 TFX 파이프라인을 기반으로 합니다. TFX에 익숙하지 않고 해당 자습서를 아직 읽지 않았다면 이 노트북을 계속 진행하기 전에 읽어야 합니다.

Google Cloud Vertex Pipelines를 사용하면 서버리스 방식으로 ML 워크플로를 조정하여 ML 시스템을 자동화, 모니터링, 관리할 수 있습니다. TFX와 함께 Python을 사용하여 ML 파이프라인을 정의한 다음 Google Cloud에서 파이프라인을 실행할 수 있습니다. 정점 파이프라인에 대한 자세한 내용은 정점 파이프라인 소개 를 참조하세요.

이 메모장은 Google Colab 또는 AI Platform Notebooks 에서 실행하도록 만들어졌습니다. 이들 중 하나를 사용하지 않는 경우 위의 "Google Colab에서 실행" 버튼을 클릭하기만 하면 됩니다.

설정

이 노트북을 실행하기 전에 다음 사항이 있는지 확인하십시오.

GCP 프로젝트를 추가로 구성하려면 Vertex 문서 를 참조하세요.

파이썬 패키지 설치

TFX 및 KFP를 포함한 필수 Python 패키지를 설치하여 ML 파이프라인을 작성하고 Vertex Pipelines에 작업을 제출합니다.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

런타임을 다시 시작하셨습니까?

Google Colab을 사용하는 경우 위의 셀을 처음 실행할 때 위의 "RESTART RUNTIME" 버튼을 클릭하거나 "런타임 > 런타임 다시 시작..." 메뉴를 사용하여 런타임을 다시 시작해야 합니다. Colab이 패키지를 로드하는 방식 때문입니다.

Colab에 있지 않다면 다음 셀을 사용하여 런타임을 다시 시작할 수 있습니다.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

이 노트북을 사용하려면 Google에 로그인하세요.

Colab에서 이 노트북을 실행하는 경우 사용자 계정으로 인증하세요.

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

AI Platform Notebooks 를 사용 중인 경우 다음 섹션을 실행하기 전에 다음을 실행하여 Google Cloud에 인증하세요.

gcloud auth login

터미널 창 에서(메뉴에서 파일 > 새로 만들기를 통해 열 수 있음). 이 작업은 노트북 인스턴스당 한 번만 수행하면 됩니다.

패키지 버전을 확인하십시오.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

변수 설정

아래에서 파이프라인을 사용자 지정하는 데 사용되는 몇 가지 변수를 설정합니다. 다음 정보가 필요합니다.

  • GCP 프로젝트 ID 프로젝트 ID 식별을 참조하십시오.
  • 파이프라인을 실행할 GCP 리전 Vertex Pipelines를 사용할 수 있는 지역에 대한 자세한 내용은 Vertex AI 위치 가이드 를 참조하십시오.
  • 파이프라인 출력을 저장하기 위한 Google Cloud Storage 버킷.

실행하기 전에 아래 셀에 필요한 값을 입력하십시오 .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

프로젝트를 사용하도록 gcloud 를 설정합니다.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

예시 데이터 준비

Simple TFX Pipeline Tutorial 과 동일한 Palmer Penguins 데이터셋 을 사용할 것입니다.

이 데이터 세트에는 이미 [0,1] 범위를 갖도록 정규화된 4개의 숫자 기능이 있습니다. 우리는 펭귄의 species 을 예측하는 분류 모델을 구축할 것입니다.

데이터 세트의 자체 복사본을 만들어야 합니다. TFX ExampleGen은 디렉터리에서 입력을 읽기 때문에 디렉터리를 만들고 GCS에서 데이터 집합을 복사해야 합니다.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

CSV 파일을 간단히 살펴보세요.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

파이프라인 생성

TFX 파이프라인은 Python API를 사용하여 정의됩니다. CsvExampleGen, Trainer 및 Pusher의 세 가지 구성 요소로 구성된 파이프라인을 정의합니다. 파이프라인 및 모델 정의는 Simple TFX Pipeline Tutorial 과 거의 동일합니다.

유일한 차이점은 ML 메타데이터 데이터베이스를 찾는 데 사용되는 metadata_connection_config 를 설정할 필요가 없다는 것입니다. Vertex Pipelines는 관리형 메타데이터 서비스를 사용하기 때문에 사용자가 신경 쓸 필요가 없고 매개변수를 지정할 필요도 없습니다.

파이프라인을 실제로 정의하기 전에 먼저 Trainer 구성 요소에 대한 모델 코드를 작성해야 합니다.

모델 코드를 작성합니다.

Simple TFX Pipeline Tutorial 에서와 동일한 모델 코드를 사용합니다.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

파이프라인 구성 요소에서 액세스할 수 있는 GCS에 모듈 파일을 복사합니다. 모델 학습은 GCP에서 이루어지므로 이 모델 정의를 업로드해야 합니다.

그렇지 않으면 모듈 파일을 포함하는 컨테이너 이미지를 빌드하고 이미지를 사용하여 파이프라인을 실행할 수 있습니다.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

파이프라인 정의 작성

TFX 파이프라인을 생성하는 함수를 정의합니다.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

정점 파이프라인에서 파이프라인을 실행합니다.

Simple TFX Pipeline Tutorial 에서는 로컬 환경에서 실행되는 LocalDagRunner 를 사용했습니다. TFX는 파이프라인을 실행할 여러 오케스트레이터를 제공합니다. 이 튜토리얼에서 우리는 Kubeflow V2 dag runner와 함께 Vertex Pipelines를 사용할 것입니다.

파이프라인을 실제로 실행하려면 러너를 정의해야 합니다. TFX API를 사용하여 파이프라인을 파이프라인 정의 형식으로 컴파일합니다.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

생성된 정의 파일은 kfp 클라이언트를 사용하여 제출할 수 있습니다.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

이제 Google Cloud Console 에서 'Vertex AI > Pipelines'를 방문하여 진행 상황을 확인할 수 있습니다.