Membaca data dari BigQuery dengan TFX dan Vertex Pipelines

Tutorial berbasis notebook ini akan menggunakan Google Cloud BigQuery sebagai sumber data untuk melatih model ML. Pipeline ML akan dibuat menggunakan TFX dan dijalankan di Google Cloud Vertex Pipelines.

Notebook ini didasarkan pada pipeline TFX yang kami buat di Simple TFX Pipeline for Vertex Pipelines Tutorial . Jika Anda belum membaca tutorial itu, Anda harus membacanya sebelum melanjutkan dengan buku catatan ini.

BigQuery adalah gudang data multi-cloud tanpa server, sangat skalabel, dan hemat biaya yang dirancang untuk kelincahan bisnis. TFX dapat digunakan untuk membaca data pelatihan dari BigQuery dan memublikasikan model terlatih ke BigQuery.

Dalam tutorial ini, kita akan menggunakan komponen BigQueryExampleGen yang membaca data dari BigQuery ke pipeline TFX.

Notebook ini dimaksudkan untuk dijalankan di Google Colab atau di AI Platform Notebook . Jika Anda tidak menggunakan salah satunya, Anda cukup mengeklik tombol "Jalankan di Google Colab" di atas.

Mempersiapkan

Jika Anda telah menyelesaikan Tutorial Simple TFX Pipeline untuk Vertex Pipelines , Anda akan memiliki proyek GCP yang berfungsi dan bucket GCS dan hanya itu yang kami butuhkan untuk tutorial ini. Silakan baca tutorial pendahuluan terlebih dahulu jika Anda melewatkannya.

Instal paket python

Kami akan menginstal paket Python yang diperlukan termasuk TFX dan KFP untuk membuat pipeline ML dan mengirimkan pekerjaan ke Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Apakah Anda me-restart runtime?

Jika Anda menggunakan Google Colab, pertama kali menjalankan sel di atas, Anda harus memulai ulang runtime dengan mengeklik tombol "RESTART RUNTIME" di atas atau menggunakan menu "Runtime > Restart runtime ...". Ini karena cara Colab memuat paket.

Jika Anda tidak menggunakan Colab, Anda dapat memulai ulang runtime dengan sel berikut.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Masuk ke Google untuk buku catatan ini

Jika Anda menjalankan notebook ini di Colab, autentikasi dengan akun pengguna Anda:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Jika Anda menggunakan AI Platform Notebooks , autentikasi dengan Google Cloud sebelum menjalankan bagian berikutnya, dengan menjalankan

gcloud auth login

di jendela Terminal (yang dapat Anda buka melalui File > New di menu). Anda hanya perlu melakukan ini sekali per instance notebook.

Periksa versi paket.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Mengatur variabel

Kami akan menyiapkan beberapa variabel yang digunakan untuk menyesuaikan jalur pipa di bawah ini. Informasi berikut diperlukan:

Masukkan nilai yang diperlukan dalam sel di bawah ini sebelum menjalankannya .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Setel gcloud untuk menggunakan proyek Anda.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Secara default, Vertex Pipelines menggunakan akun layanan VM GCE default dengan format [project-number]-compute@developer.gserviceaccount.com . Kami perlu memberikan izin untuk menggunakan BigQuery ke akun ini guna mengakses BigQuery dalam pipeline. Kami akan menambahkan peran 'Pengguna BigQuery' ke akun.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Silakan lihat dokumentasi Vertex untuk mempelajari lebih lanjut tentang akun layanan dan konfigurasi IAM.

Buat saluran pipa

Pipeline TFX didefinisikan menggunakan API Python seperti yang kami lakukan di Simple TFX Pipeline for Vertex Pipelines Tutorial . Kami sebelumnya menggunakan CsvExampleGen yang membaca data dari file CSV. Dalam tutorial ini, kita akan menggunakan komponen BigQueryExampleGen yang membaca data dari BigQuery.

Siapkan kueri BigQuery

Kami akan menggunakan dataset Palmer Penguins yang sama . Namun, kami akan membacanya dari tabel BigQuery tfx-oss-public.palmer_penguins.palmer_penguins yang diisi menggunakan file CSV yang sama.

Jika Anda menggunakan Google Colab, Anda dapat memeriksa konten tabel BigQuery secara langsung.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Semua fitur sudah dinormalisasi ke 0~1 kecuali species yang merupakan labelnya. Kami akan membangun model klasifikasi yang memprediksi species penguin.

BigQueryExampleGen memerlukan kueri untuk menentukan data mana yang akan diambil. Karena kami akan menggunakan semua bidang dari semua baris dalam tabel, kuerinya cukup sederhana. Anda juga dapat menentukan nama bidang dan menambahkan ketentuan WHERE sesuai kebutuhan sesuai dengan sintaks SQL Standar BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Tulis kode model.

Kami akan menggunakan kode model yang sama seperti pada Tutorial Pipa TFX Sederhana .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Salin file modul ke GCS yang dapat diakses dari komponen pipeline. Karena pelatihan model terjadi di GCP, kita perlu mengupload definisi model ini.

Jika tidak, Anda mungkin ingin membuat image container termasuk file modul dan menggunakan image untuk menjalankan pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Tulis definisi pipa

Kami akan mendefinisikan fungsi untuk membuat pipa TFX. Kita perlu menggunakan BigQueryExampleGen yang menggunakan query sebagai argumen. Satu lagi perubahan dari tutorial sebelumnya adalah kita harus melewati beam_pipeline_args yang diteruskan ke komponen saat dijalankan. Kami akan menggunakan beam_pipeline_args untuk meneruskan parameter tambahan ke BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Jalankan pipeline di Vertex Pipelines.

Kami akan menggunakan Vertex Pipelines untuk menjalankan pipeline seperti yang kami lakukan di Simple TFX Pipeline untuk Tutorial Vertex Pipelines .

Kita juga perlu meneruskan beam_pipeline_args untuk BigQueryExampleGen. Ini mencakup konfigurasi seperti nama project GCP dan penyimpanan sementara untuk eksekusi BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

File definisi yang dihasilkan dapat dikirimkan menggunakan klien kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Sekarang Anda dapat mengunjungi 'Vertex AI > Pipelines' di Google Cloud Console untuk melihat perkembangannya.