Чтение данных из BigQuery с помощью TFX и Vertex Pipelines

В этом учебном пособии на основе записной книжки в качестве источника данных для обучения модели машинного обучения будет использоваться Google Cloud BigQuery . Конвейер машинного обучения будет построен с использованием TFX и запущен в Google Cloud Vertex Pipelines.

Этот блокнот основан на конвейере TFX, который мы создали в учебнике Simple TFX Pipeline for Vertex Pipelines . Если вы еще не читали это руководство, прочтите его, прежде чем приступить к работе с этой записной книжкой.

BigQuery — это бессерверное, масштабируемое и экономичное многооблачное хранилище данных, разработанное для гибкости бизнеса. TFX можно использовать для чтения обучающих данных из BigQuery и публикации обученной модели в BigQuery.

В этом руководстве мы будем использовать компонент BigQueryExampleGen , который считывает данные из BigQuery в конвейеры TFX.

Эта записная книжка предназначена для работы в Google Colab или записных книжках на платформе AI . Если вы не используете один из них, вы можете просто нажать кнопку «Запустить в Google Colab» выше.

Настраивать

Если вы выполнили учебник Simple TFX Pipeline for Vertex Pipelines Tutorial , у вас будет работающий проект GCP и корзина GCS, и это все, что нам нужно для этого руководства. Пожалуйста, сначала прочитайте предварительное руководство, если вы его пропустили.

Установить пакеты питона

Мы установим необходимые пакеты Python, включая TFX и KFP, для создания конвейеров ML и отправки заданий в Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Вы перезапустили среду выполнения?

Если вы используете Google Colab, при первом запуске указанной выше ячейки необходимо перезапустить среду выполнения, нажав кнопку «RESTART RUNTIME» выше или используя меню «Runtime > Restart runtime...». Это связано с тем, как Colab загружает пакеты.

Если вы не используете Colab, вы можете перезапустить среду выполнения со следующей ячейкой.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Войдите в Google для этого блокнота

Если вы используете этот блокнот в Colab, выполните аутентификацию с помощью своей учетной записи пользователя:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Если вы используете ноутбуки на платформе AI , выполните аутентификацию в Google Cloud перед запуском следующего раздела, запустив

gcloud auth login

в окне терминала (которое можно открыть через меню « Файл» > « Создать »). Это нужно сделать только один раз для каждого экземпляра записной книжки.

Проверьте версии пакетов.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Настройка переменных

Ниже мы настроим некоторые переменные, используемые для настройки конвейеров. Требуется следующая информация:

Введите требуемые значения в ячейку ниже перед запуском .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Настройте gcloud для использования вашего проекта.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

По умолчанию Vertex Pipelines использует учетную запись службы виртуальной машины GCE по умолчанию в формате [project-number]-compute@developer.gserviceaccount.com . Нам нужно дать разрешение на использование BigQuery этой учетной записи, чтобы получить доступ к BigQuery в конвейере. Мы добавим в учетную запись роль «Пользователь BigQuery».

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

См. документацию Vertex , чтобы узнать больше об учетных записях служб и конфигурации IAM.

Создать конвейер

Конвейеры TFX определяются с помощью API-интерфейсов Python, как мы это делали в учебнике Simple TFX Pipeline for Vertex Pipelines Tutorial . Ранее мы использовали CsvExampleGen , который считывает данные из CSV-файла. В этом руководстве мы будем использовать компонент BigQueryExampleGen , который считывает данные из BigQuery.

Подготовить запрос BigQuery

Мы будем использовать тот же набор данных Palmer Penguins . Однако мы будем считывать его из таблицы BigQuery tfx-oss-public.palmer_penguins.palmer_penguins , которая заполняется с использованием того же CSV-файла.

Если вы используете Google Colab, вы можете напрямую изучить содержимое таблицы BigQuery.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Все признаки уже были нормализованы до 0~1, кроме species , который является меткой. Мы построим классификационную модель, которая предсказывает species пингвинов.

BigQueryExampleGen требует запроса, чтобы указать, какие данные нужно извлечь. Поскольку мы будем использовать все поля всех строк таблицы, запрос будет довольно простым. Вы также можете указать имена полей и добавить условия WHERE по мере необходимости в соответствии со стандартным синтаксисом SQL BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Напишите код модели.

Мы будем использовать тот же код модели, что и в Simple TFX Pipeline Tutorial .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Скопируйте файл модуля в GCS, к которому можно получить доступ из компонентов конвейера. Поскольку обучение модели происходит в GCP, нам необходимо загрузить это определение модели.

В противном случае вы можете создать образ контейнера, включающий файл модуля, и использовать этот образ для запуска конвейера.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Напишите определение конвейера

Мы определим функцию для создания конвейера TFX. Нам нужно использовать BigQueryExampleGen , который принимает query в качестве аргумента. Еще одно изменение по сравнению с предыдущим уроком заключается в том, что нам нужно передать beam_pipeline_args , который передается компонентам при их выполнении. Мы будем использовать beam_pipeline_args для передачи дополнительных параметров в BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Запустите конвейер на Vertex Pipelines.

Мы будем использовать Vertex Pipelines для запуска конвейера, как мы это делали в Simple TFX Pipeline for Vertex Pipelines Tutorial .

Нам также нужно передать beam_pipeline_args для BigQueryExampleGen. Он включает такие конфигурации, как имя проекта GCP и временное хранилище для выполнения BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Сгенерированный файл определения можно отправить с помощью клиента kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Теперь вы можете посетить Vertex AI > Pipelines в Google Cloud Console , чтобы увидеть прогресс.