قراءة البيانات من BigQuery باستخدام TFX و Vertex Pipelines

سيستخدم هذا البرنامج التعليمي المستند إلى الكمبيوتر المحمول Google Cloud BigQuery كمصدر بيانات لتدريب نموذج ML. سيتم إنشاء خط أنابيب ML باستخدام TFX وتشغيله على Google Cloud Vertex Pipelines.

يعتمد هذا الكمبيوتر الدفتري على خط أنابيب TFX الذي أنشأناه في Simple TFX Pipeline لبرنامج تعليمي لخطوط أنابيب Vertex . إذا لم تكن قد قرأت هذا البرنامج التعليمي حتى الآن ، فيجب عليك قراءته قبل المتابعة مع دفتر الملاحظات هذا.

BigQuery عبارة عن مستودع بيانات متعدد السحاب متعدد السحاب ، وقابل للتطوير بدرجة عالية ، وفعال من حيث التكلفة ، مصمم لسرعة الأعمال. يمكن استخدام TFX لقراءة بيانات التدريب من BigQuery ونشر النموذج المدرب على BigQuery.

في هذا البرنامج التعليمي ، سنستخدم مكون BigQueryExampleGen الذي يقرأ البيانات من BigQuery إلى خطوط أنابيب TFX.

تم تصميم هذا الكمبيوتر الدفتري ليتم تشغيله على Google Colab أو على أجهزة الكمبيوتر المحمولة بنظام AI . إذا كنت لا تستخدم أيًا من هؤلاء ، يمكنك ببساطة النقر فوق الزر "تشغيل في Google Colab" أعلاه.

يثبت

إذا كنت قد أكملت Simple TFX Pipeline لبرنامج Vertex Pipelines Tutorial ، فسيكون لديك مشروع GCP عامل ودلو GCS وهذا كل ما نحتاجه لهذا البرنامج التعليمي. يرجى قراءة البرنامج التعليمي الأولي أولاً إذا فاتتك.

ثبت حزم بايثون

سنقوم بتثبيت حزم Python المطلوبة بما في ذلك TFX و KFP لتأليف خطوط أنابيب ML وإرسال الوظائف إلى خطوط أنابيب Vertex.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

هل أعدت تشغيل وقت التشغيل؟

إذا كنت تستخدم Google Colab ، في المرة الأولى التي تقوم فيها بتشغيل الخلية أعلاه ، يجب إعادة تشغيل وقت التشغيل بالنقر فوق الزر "RESTART RUNTIME" أعلاه أو باستخدام قائمة "Runtime> Restart runtime ...". هذا بسبب الطريقة التي يقوم بها كولاب بتحميل الحزم.

إذا لم تكن في Colab ، يمكنك إعادة تشغيل وقت التشغيل بالخلية التالية.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

تسجيل الدخول إلى Google لهذا دفتر الملاحظات

إذا كنت تقوم بتشغيل هذا الكمبيوتر الدفتري على Colab ، فقم بالمصادقة باستخدام حساب المستخدم الخاص بك:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

إذا كنت تستخدم أجهزة الكمبيوتر المحمولة بنظام AI Platform ، فقم بالمصادقة باستخدام Google Cloud قبل تشغيل القسم التالي ، عن طريق التشغيل

gcloud auth login

في نافذة Terminal (والتي يمكنك فتحها عبر ملف > جديد في القائمة). ما عليك سوى القيام بذلك مرة واحدة لكل مثيل دفتر ملاحظات.

تحقق من إصدارات الحزمة.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

قم بإعداد المتغيرات

سنقوم بإعداد بعض المتغيرات المستخدمة لتخصيص خطوط الأنابيب أدناه. المعلومات التالية مطلوبة:

  • رقم تعريف مشروع GCP ورقمه. انظر تحديد معرف المشروع الخاص بك ورقم .
  • منطقة "شركاء Google المعتمدون" لتشغيل خطوط الأنابيب. لمزيد من المعلومات حول المناطق التي تتوفر فيها خطوط أنابيب Vertex ، راجع دليل مواقع Vertex AI .
  • حزمة Google Cloud Storage لتخزين مخرجات خطوط الأنابيب.

أدخل القيم المطلوبة في الخلية أدناه قبل تشغيلها .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

اضبط gcloud لاستخدام مشروعك.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

بشكل افتراضي ، تستخدم خطوط أنابيب Vertex حساب خدمة GCE VM الافتراضي بالتنسيق [project-number]-compute@developer.gserviceaccount.com . نحتاج إلى منح إذن لاستخدام BigQuery لهذا الحساب للوصول إلى BigQuery قيد التنفيذ. سنضيف دور "مستخدم BigQuery" إلى الحساب.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

يرجى الاطلاع على وثائق Vertex لمعرفة المزيد حول حسابات الخدمة وتكوين IAM.

قم بإنشاء خط أنابيب

يتم تعريف خطوط أنابيب TFX باستخدام واجهات برمجة تطبيقات Python كما فعلنا في Simple TFX Pipeline لبرنامج Vertex Pipelines Tutorial . استخدمنا سابقًا CsvExampleGen الذي يقرأ البيانات من ملف CSV. في هذا البرنامج التعليمي ، سنستخدم مكون BigQueryExampleGen الذي يقرأ البيانات من BigQuery.

إعداد استعلام BigQuery

سنستخدم نفس مجموعة بيانات Palmer Penguins . ومع ذلك ، سنقرأها من جدول BigQuery tfx-oss-public.palmer_penguins.palmer_penguins الذي يتم ملؤه باستخدام نفس ملف CSV.

إذا كنت تستخدم Google Colab ، فيمكنك فحص محتوى جدول BigQuery مباشرةً.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

تم تطبيع جميع الميزات بالفعل إلى 0 ~ 1 باستثناء species التي هي التسمية. سنقوم ببناء نموذج تصنيف يتنبأ species طيور البطريق.

يتطلب BigQueryExampleGen استعلامًا لتحديد البيانات المطلوب جلبها. نظرًا لأننا سنستخدم جميع حقول جميع الصفوف في الجدول ، فإن الاستعلام بسيط للغاية. يمكنك أيضًا تحديد أسماء الحقول وإضافة شروط WHERE حسب الحاجة وفقًا لبناء جملة SQL القياسي في BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

اكتب كود النموذج.

سنستخدم نفس رمز النموذج كما في البرنامج التعليمي Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

انسخ ملف الوحدة النمطية إلى GCS والذي يمكن الوصول إليه من مكونات خط الأنابيب. نظرًا لأن التدريب النموذجي يحدث على GCP ، نحتاج إلى تحميل تعريف النموذج هذا.

خلاف ذلك ، قد ترغب في إنشاء صورة حاوية بما في ذلك ملف الوحدة النمطية واستخدام الصورة لتشغيل خط الأنابيب.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

اكتب تعريف خط الأنابيب

سنحدد وظيفة لإنشاء خط أنابيب TFX. نحتاج إلى استخدام BigQueryExampleGen الذي يأخذ query كوسيطة. تغيير آخر من البرنامج التعليمي السابق هو أننا بحاجة إلى تمرير beam_pipeline_args الذي يتم تمريره إلى المكونات عند تنفيذها. سنستخدم beam_pipeline_args لتمرير معلمات إضافية إلى BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

قم بتشغيل خط الأنابيب على خطوط أنابيب Vertex.

سنستخدم خطوط أنابيب Vertex لتشغيل خط الأنابيب كما فعلنا في Simple TFX Pipeline لبرنامج Vertex Pipelines Tutorial .

نحتاج أيضًا إلى تمرير beam_pipeline_args إلى BigQueryExampleGen. يتضمن تكوينات مثل اسم مشروع GCP والتخزين المؤقت لتنفيذ BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

يمكن تقديم ملف التعريف الذي تم إنشاؤه باستخدام عميل kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

يمكنك الآن زيارة "Vertex AI> Pipelines" في Google Cloud Console لمعرفة التقدم.