TFX ve Vertex Pipelines ile BigQuery'den veri okuma

Bu not defteri tabanlı eğitici, bir makine öğrenimi modelini eğitmek için veri kaynağı olarak Google Cloud BigQuery'yi kullanacak. ML ardışık düzeni, TFX kullanılarak oluşturulacak ve Google Cloud Vertex Pipelines üzerinde çalıştırılacaktır.

Bu not defteri, Vertex Pipelines Eğitimi için Simple TFX Pipeline'da oluşturduğumuz TFX ardışık düzenine dayanmaktadır. Bu öğreticiyi henüz okumadıysanız, bu not defterine geçmeden önce okumalısınız.

BigQuery , iş çevikliği için tasarlanmış sunucusuz, yüksek düzeyde ölçeklenebilir ve uygun maliyetli çoklu bulut veri ambarıdır. TFX, BigQuery'den eğitim verilerini okumak ve eğitilen modeli BigQuery'ye yayınlamak için kullanılabilir.

Bu eğiticide, BigQuery'den TFX ardışık düzenlerine veri okuyan BigQueryExampleGen bileşenini kullanacağız.

Bu not defterinin Google Colab veya AI Platform Not Defterlerinde çalıştırılması amaçlanmıştır. Bunlardan birini kullanmıyorsanız, yukarıdaki "Google Colab'da Çalıştır" düğmesini tıklamanız yeterlidir.

Kurmak

Vertex Pipelines Eğitimi için Simple TFX Pipeline'ı tamamladıysanız, çalışan bir GCP projeniz ve bir GCS paketiniz olacak ve bu eğitim için ihtiyacımız olan tek şey bu. Lütfen kaçırdıysanız, önce ön öğreticiyi okuyun.

Python paketlerini kurun

ML işlem hatlarını yazmak ve işleri Vertex Pipelines'a göndermek için TFX ve KFP dahil gerekli Python paketlerini kuracağız.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Çalışma zamanını yeniden başlattınız mı?

Google Colab kullanıyorsanız, yukarıdaki hücreyi ilk kez çalıştırdığınızda, yukarıdaki "ÇALIŞTIRMA ZAMINI YENİDEN BAŞLAT" düğmesini tıklayarak veya "Çalışma Zamanı > Çalışma zamanını yeniden başlat ..." menüsünü kullanarak çalışma zamanını yeniden başlatmanız gerekir. Bunun nedeni Colab'ın paketleri yükleme şeklidir.

Colab'de değilseniz, aşağıdaki hücre ile çalışma zamanını yeniden başlatabilirsiniz.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Bu not defteri için Google'da oturum açın

Bu not defterini Colab üzerinde çalıştırıyorsanız, kullanıcı hesabınızla kimlik doğrulaması yapın:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

AI Platform Notebooks kullanıyorsanız , sonraki bölümü çalıştırmadan önce Google Cloud ile kimlik doğrulaması yapın.

gcloud auth login

Terminal penceresinde (menüde Dosya > Yeni ile açabilirsiniz). Bunu her notebook örneği için yalnızca bir kez yapmanız gerekir.

Paket sürümlerini kontrol edin.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
tutucu5 l10n-yer
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Değişkenleri ayarla

Aşağıdaki boru hatlarını özelleştirmek için kullanılan bazı değişkenleri ayarlayacağız. Aşağıdaki bilgiler gereklidir:

Çalıştırmadan önce aşağıdaki hücreye gerekli değerleri girin .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
tutucu7 l10n-yer
ERROR:absl:Please set all required parameters.

Projenizi kullanmak için gcloud ayarlayın.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
yer tutucu10 l10n-yer
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Varsayılan olarak Vertex Pipelines, [project-number]-compute@developer.gserviceaccount.com biçimindeki varsayılan GCE VM hizmet hesabını kullanır. Ardışık düzende BigQuery'ye erişmek için bu hesaba BigQuery kullanma izni vermemiz gerekiyor. Hesaba 'BigQuery Kullanıcısı' rolü ekleyeceğiz.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
tutucu13 l10n-yer
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Hizmet hesapları ve IAM yapılandırması hakkında daha fazla bilgi edinmek için lütfen Vertex belgelerine bakın.

Bir işlem hattı oluşturun

TFX işlem hatları, Vertex Pipelines Eğitimi için Simple TFX Pipeline'da yaptığımız gibi Python API'leri kullanılarak tanımlanır. Daha önce bir CSV dosyasından veri okuyan CsvExampleGen kullandık. Bu eğiticide, BigQuery'den veri okuyan BigQueryExampleGen bileşenini kullanacağız.

BigQuery sorgusu hazırlayın

Aynı Palmer Penguins veri setini kullanacağız. Ancak, aynı CSV dosyası kullanılarak doldurulmuş bir BigQuery tablosu tfx-oss-public.palmer_penguins.palmer_penguins .

Google Colab kullanıyorsanız, doğrudan BigQuery tablosunun içeriğini inceleyebilirsiniz.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Etiket olan species dışında tüm özellikler zaten 0~1'e normalleştirildi. Penguen species tahmin eden bir sınıflandırma modeli oluşturacağız.

BigQueryExampleGen , hangi verilerin getirileceğini belirtmek için bir sorgu gerektirir. Tablodaki tüm satırların tüm alanlarını kullanacağımız için sorgu oldukça basittir. Ayrıca alan adlarını belirtebilir ve BigQuery Standard SQL sözdizimine göre gerektiği gibi WHERE koşulları ekleyebilirsiniz.

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Model kodunu yazın.

Simple TFX Pipeline Tutorial'dakiyle aynı model kodunu kullanacağız.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
-yer tutucu18 l10n-yer
Writing penguin_trainer.py

Modül dosyasını, ardışık düzen bileşenlerinden erişilebilen GCS'ye kopyalayın. Model eğitimi GCP'de gerçekleştiği için bu model tanımını yüklememiz gerekiyor.

Aksi takdirde, modül dosyasını içeren bir kapsayıcı görüntüsü oluşturmak ve ardışık düzeni çalıştırmak için görüntüyü kullanmak isteyebilirsiniz.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
tutucu20 l10n-yer
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Bir işlem hattı tanımı yazın

Bir TFX boru hattı oluşturmak için bir fonksiyon tanımlayacağız. Argüman olarak query alan BigQueryExampleGen kullanmamız gerekiyor. Önceki öğreticiden bir değişiklik daha, yürütüldüklerinde bileşenlere geçirilen beam_pipeline_args gerektiğidir. BigQuery'ye ek parametreler iletmek için beam_pipeline_args kullanacağız.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

İşlem hattını Vertex Pipelines üzerinde çalıştırın.

Vertex Pipelines Eğitimi için Simple TFX Pipeline'da yaptığımız gibi ardışık düzeni çalıştırmak için Vertex Pipelines'ı kullanacağız.

Ayrıca BigQueryExampleGen için beam_pipeline_args gerekiyor. GCP projesinin adı ve BigQuery yürütmesi için geçici depolama alanı gibi yapılandırmaları içerir.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Oluşturulan tanım dosyası kfp istemcisi kullanılarak gönderilebilir.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Artık ilerlemeyi görmek için Google Cloud Console'da "Vertex AI > Pipelines"ı ziyaret edebilirsiniz.