Đọc dữ liệu từ BigQuery với TFX và Vertex Pipelines

Hướng dẫn dựa trên sổ tay này sẽ sử dụng Google Cloud BigQuery làm nguồn dữ liệu để đào tạo mô hình ML. Đường ống ML sẽ được xây dựng bằng TFX và chạy trên Google Cloud Vertex Pipelines.

Máy tính xách tay này dựa trên đường ống TFX mà chúng tôi đã xây dựng trong Đường ống TFX đơn giản cho Hướng dẫn về đường ống Vertex . Nếu bạn chưa đọc hướng dẫn đó, bạn nên đọc nó trước khi tiếp tục với sổ tay này.

BigQuery là kho dữ liệu đa đám mây không máy chủ, có khả năng mở rộng cao và tiết kiệm chi phí được thiết kế cho sự nhanh nhạy của doanh nghiệp. TFX có thể được sử dụng để đọc dữ liệu đào tạo từ BigQuery và xuất bản mô hình đào tạo lên BigQuery.

Trong hướng dẫn này, chúng tôi sẽ sử dụng thành phần BigQueryExampleGen đọc dữ liệu từ các đường ống BigQuery đến TFX.

Máy tính xách tay này được thiết kế để chạy trên Google Colab hoặc trên Máy tính xách tay nền tảng AI . Nếu bạn không sử dụng một trong những cách này, bạn có thể chỉ cần nhấp vào nút "Chạy trong Google Colab" ở trên.

Cài đặt

Nếu bạn đã hoàn thành Hướng dẫn về Đường ống TFX Đơn giản cho Đường ống Đỉnh , bạn sẽ có một dự án GCP đang hoạt động và một nhóm GCS và đó là tất cả những gì chúng tôi cần cho hướng dẫn này. Vui lòng đọc hướng dẫn sơ bộ trước nếu bạn bỏ lỡ.

Cài đặt gói python

Chúng tôi sẽ cài đặt các gói Python được yêu cầu bao gồm TFX và KFP để tác giả các đường ống ML và gửi công việc cho Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Bạn có khởi động lại thời gian chạy không?

Nếu bạn đang sử dụng Google Colab, lần đầu tiên bạn chạy ô ở trên, bạn phải khởi động lại thời gian chạy bằng cách nhấp vào phía trên nút "RESTART RUNTIME" hoặc sử dụng menu "Runtime> Restart runtime ...". Điều này là do cách Colab tải các gói.

Nếu bạn không ở trên Colab, bạn có thể khởi động lại thời gian chạy bằng ô sau.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Đăng nhập vào Google cho sổ tay này

Nếu bạn đang chạy sổ ghi chép này trên Colab, hãy xác thực bằng tài khoản người dùng của bạn:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Nếu bạn đang sử dụng Máy tính xách tay nền tảng AI , hãy xác thực với Google Cloud trước khi chạy phần tiếp theo, bằng cách chạy

gcloud auth login

trong cửa sổ Terminal (bạn có thể mở qua File > New trong menu). Bạn chỉ cần làm điều này một lần cho mỗi phiên bản sổ ghi chép.

Kiểm tra các phiên bản gói.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Thiết lập các biến

Chúng tôi sẽ thiết lập một số biến được sử dụng để tùy chỉnh các đường ống bên dưới. Thông tin sau là bắt buộc:

Nhập các giá trị bắt buộc vào ô bên dưới trước khi chạy nó .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Đặt gcloud để sử dụng dự án của bạn.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Theo mặc định, Vertex Pipelines sử dụng tài khoản dịch vụ GCE VM mặc định có định dạng [project-number]-compute@developer.gserviceaccount.com . Chúng tôi cần cấp quyền sử dụng BigQuery cho tài khoản này để truy cập BigQuery trong quy trình. Chúng tôi sẽ thêm vai trò 'Người dùng BigQuery' vào tài khoản.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Vui lòng xem tài liệu Vertex để tìm hiểu thêm về tài khoản dịch vụ và cấu hình IAM.

Tạo một đường dẫn

Đường ống TFX được xác định bằng cách sử dụng API Python như chúng ta đã làm trong Hướng dẫn về Đường ống TFX Đơn giản cho Đường ống Vertex . Trước đây chúng tôi đã sử dụng CsvExampleGen để đọc dữ liệu từ tệp CSV. Trong hướng dẫn này, chúng tôi sẽ sử dụng thành phần BigQueryExampleGen đọc dữ liệu từ BigQuery.

Chuẩn bị truy vấn BigQuery

Chúng tôi sẽ sử dụng cùng một bộ dữ liệu Palmer Penguins . Tuy nhiên, chúng tôi sẽ đọc nó từ bảng BigQuery tfx-oss-public.palmer_penguins.palmer_penguins được điền bằng cùng một tệp CSV.

Nếu đang sử dụng Google Colab, bạn có thể kiểm tra trực tiếp nội dung của bảng BigQuery.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Tất cả các tính năng đã được chuẩn hóa thành 0 ~ 1 ngoại trừ species là nhãn. Chúng tôi sẽ xây dựng một mô hình phân loại dự đoán species chim cánh cụt.

BigQueryExampleGen yêu cầu truy vấn để chỉ định dữ liệu nào cần tìm nạp. Vì chúng ta sẽ sử dụng tất cả các trường của tất cả các hàng trong bảng nên truy vấn khá đơn giản. Bạn cũng có thể chỉ định tên trường và thêm điều kiện WHERE nếu cần theo cú pháp SQL chuẩn của BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Viết mã mô hình.

Chúng tôi sẽ sử dụng cùng một mã kiểu như trong Hướng dẫn sử dụng đường ống TFX đơn giản .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Sao chép tệp mô-đun sang GCS có thể được truy cập từ các thành phần đường ống. Vì quá trình đào tạo mô hình diễn ra trên GCP nên chúng tôi cần tải lên định nghĩa mô hình này.

Nếu không, bạn có thể muốn xây dựng hình ảnh vùng chứa bao gồm tệp mô-đun và sử dụng hình ảnh để chạy đường dẫn.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Viết định nghĩa đường ống

Chúng tôi sẽ xác định một chức năng để tạo một đường ống TFX. Chúng tôi cần sử dụng BigQueryExampleGen lấy query làm đối số. Một thay đổi nữa so với hướng dẫn trước đó là chúng ta cần truyền beam_pipeline_args được truyền cho các thành phần khi chúng được thực thi. Chúng tôi sẽ sử dụng beam_pipeline_args để chuyển các tham số bổ sung cho BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Chạy đường ống trên Vertex Pipelines.

Chúng ta sẽ sử dụng Đường ống Vertex để chạy đường ống như chúng ta đã làm trong Hướng dẫn về Đường ống TFX Đơn giản cho Đường ống Vertex .

Chúng tôi cũng cần chuyển beam_pipeline_args cho BigQueryExampleGen. Nó bao gồm các cấu hình như tên của dự án GCP và bộ nhớ tạm thời để thực thi BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Tệp định nghĩa đã tạo có thể được gửi bằng ứng dụng khách kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Bây giờ bạn có thể truy cập 'Vertex AI> Pipelines' trong Google Cloud Console để xem tiến trình.