TFX এবং Vertex Pipelines সহ BigQuery থেকে ডেটা পড়া

এই নোটবুক-ভিত্তিক টিউটোরিয়ালটি একটি ML মডেলকে প্রশিক্ষণের জন্য ডেটা উত্স হিসাবে Google Cloud BigQuery ব্যবহার করবে৷ ML পাইপলাইনটি TFX ব্যবহার করে নির্মিত হবে এবং Google Cloud Vertex Pipelines এ চলবে।

এই নোটবুকটি TFX পাইপলাইনের উপর ভিত্তি করে তৈরি করা হয়েছে যা আমরা Vertex Pipelines টিউটোরিয়ালের জন্য Simple TFX পাইপলাইনে তৈরি করেছি। আপনি যদি এখনও সেই টিউটোরিয়ালটি না পড়ে থাকেন তবে এই নোটবুকটি নিয়ে এগিয়ে যাওয়ার আগে আপনার এটি পড়া উচিত।

BigQuery হল সার্ভারহীন, অত্যন্ত মাপযোগ্য এবং সাশ্রয়ী মাল্টি-ক্লাউড ডেটা গুদাম যা ব্যবসায়িক তত্পরতার জন্য ডিজাইন করা হয়েছে। TFX BigQuery থেকে প্রশিক্ষণের ডেটা পড়তে এবং BigQuery- এ প্রশিক্ষিত মডেল প্রকাশ করতে ব্যবহার করা যেতে পারে।

এই টিউটোরিয়ালে, আমরা BigQueryExampleGen কম্পোনেন্ট ব্যবহার করব যা BigQuery থেকে TFX পাইপলাইনে ডেটা পড়ে।

এই নোটবুকটি Google Colab বা AI প্ল্যাটফর্ম নোটবুকে চালানোর উদ্দেশ্যে তৈরি করা হয়েছে। আপনি যদি এর মধ্যে একটি ব্যবহার না করে থাকেন, তাহলে আপনি উপরের "Google Colab-এ চালান" বোতামে ক্লিক করতে পারেন।

সেট আপ করুন

আপনি যদি ভার্টেক্স পাইপলাইন টিউটোরিয়ালের জন্য সাধারণ TFX পাইপলাইন সম্পূর্ণ করে থাকেন, তাহলে আপনার কাছে একটি কার্যকরী GCP প্রকল্প এবং একটি GCS বালতি থাকবে এবং এই টিউটোরিয়ালটির জন্য আমাদের শুধু এটিই প্রয়োজন। অনুগ্রহ করে প্রথমে প্রাথমিক টিউটোরিয়ালটি পড়ুন যদি আপনি এটি মিস করেন।

পাইথন প্যাকেজ ইনস্টল করুন

আমরা ML পাইপলাইন লেখকের কাছে TFX এবং KFP সহ প্রয়োজনীয় পাইথন প্যাকেজ ইনস্টল করব এবং Vertex পাইপলাইনে চাকরি জমা দেব।

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

আপনি কি রানটাইম রিস্টার্ট করেছেন?

আপনি যদি Google Colab ব্যবহার করেন, প্রথমবার যখন আপনি উপরের সেলটি চালান, তাহলে আপনাকে অবশ্যই উপরে "রিস্টার্ট RUNTIME" বোতামে ক্লিক করে বা "রানটাইম > রানটাইম রিস্টার্ট..." মেনু ব্যবহার করে রানটাইম রিস্টার্ট করতে হবে। Colab যেভাবে প্যাকেজগুলি লোড করে তার কারণেই এটি হয়েছে৷

আপনি Colab-এ না থাকলে, নিম্নলিখিত সেল দিয়ে রানটাইম রিস্টার্ট করতে পারেন।

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

এই নোটবুকের জন্য গুগলে লগ ইন করুন

আপনি যদি Colab-এ এই নোটবুকটি চালান, তাহলে আপনার ব্যবহারকারীর অ্যাকাউন্ট দিয়ে প্রমাণীকরণ করুন:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

আপনি যদি AI প্ল্যাটফর্ম নোটবুকগুলিতে থাকেন , তাহলে পরবর্তী বিভাগটি চালানোর আগে Google ক্লাউড দিয়ে প্রমাণীকরণ করুন

gcloud auth login

টার্মিনাল উইন্ডোতে (যা আপনি ফাইলের মাধ্যমে খুলতে পারেন > মেনুতে নতুন )। আপনি শুধুমাত্র নোটবুক উদাহরণ প্রতি একবার এটি করতে হবে.

প্যাকেজ সংস্করণ পরীক্ষা করুন.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

ভেরিয়েবল সেট আপ করুন

আমরা নীচের পাইপলাইনগুলি কাস্টমাইজ করতে ব্যবহৃত কিছু ভেরিয়েবল সেট আপ করব। নিম্নলিখিত তথ্য প্রয়োজন:

এটি চালানোর আগে নীচের ঘরে প্রয়োজনীয় মানগুলি লিখুন

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

আপনার প্রকল্প ব্যবহার করতে gcloud সেট করুন।

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

ডিফল্টরূপে ভার্টেক্স পাইপলাইন ফরম্যাটের ডিফল্ট GCE VM পরিষেবা অ্যাকাউন্ট ব্যবহার করে [project-number]-compute@developer.gserviceaccount.com । পাইপলাইনে BigQuery অ্যাক্সেস করতে আমাদের এই অ্যাকাউন্টে BigQuery ব্যবহার করার অনুমতি দিতে হবে। আমরা অ্যাকাউন্টে 'BigQuery ব্যবহারকারী' ভূমিকা যোগ করব।

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

পরিষেবা অ্যাকাউন্ট এবং IAM কনফিগারেশন সম্পর্কে আরও জানতে অনুগ্রহ করে Vertex ডকুমেন্টেশন দেখুন।

একটি পাইপলাইন তৈরি করুন

TFX পাইপলাইনগুলি Python API ব্যবহার করে সংজ্ঞায়িত করা হয়েছে যেমনটি আমরা Vertex Pipelines টিউটোরিয়ালের জন্য সহজ TFX পাইপলাইনে করেছি। আমরা আগে CsvExampleGen ব্যবহার করতাম যা একটি CSV ফাইল থেকে ডেটা পড়ে। এই টিউটোরিয়ালে, আমরা BigQueryExampleGen কম্পোনেন্ট ব্যবহার করব যা BigQuery থেকে ডেটা পড়ে।

BigQuery কোয়েরি প্রস্তুত করুন

আমরা একই পামার পেঙ্গুইন ডেটাসেট ব্যবহার করব। যাইহোক, আমরা এটি একটি BigQuery টেবিল থেকে tfx-oss-public.palmer_penguins.palmer_penguins যা একই CSV ফাইল ব্যবহার করে তৈরি করা হয়েছে।

আপনি যদি Google Colab ব্যবহার করেন, তাহলে আপনি সরাসরি BigQuery টেবিলের বিষয়বস্তু পরীক্ষা করতে পারেন।

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

লেবেল যা species বাদে সমস্ত বৈশিষ্ট্য ইতিমধ্যেই 0~1 এ স্বাভাবিক করা হয়েছে৷ আমরা একটি শ্রেণিবিন্যাস মডেল তৈরি করব যা পেঙ্গুইনের species পূর্বাভাস দেয়।

কোন ডেটা আনতে হবে তা নির্দিষ্ট করতে BigQueryExampleGen এর একটি ক্যোয়ারী প্রয়োজন। কারণ আমরা টেবিলের সমস্ত সারির সমস্ত ক্ষেত্র ব্যবহার করব, প্রশ্নটি বেশ সহজ। এছাড়াও আপনি ক্ষেত্রের নাম নির্দিষ্ট করতে পারেন এবং BigQuery স্ট্যান্ডার্ড SQL সিনট্যাক্স অনুযায়ী প্রয়োজন অনুযায়ী WHERE শর্ত যোগ করতে পারেন।

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

মডেল কোড লিখুন।

আমরা সাধারণ TFX পাইপলাইন টিউটোরিয়ালের মতো একই মডেল কোড ব্যবহার করব।

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

মডিউল ফাইলটি GCS-এ অনুলিপি করুন যা পাইপলাইনের উপাদানগুলি থেকে অ্যাক্সেস করা যেতে পারে। যেহেতু মডেল প্রশিক্ষণ GCP-তে হয়, তাই আমাদের এই মডেল সংজ্ঞা আপলোড করতে হবে।

অন্যথায়, আপনি মডিউল ফাইল সহ একটি ধারক চিত্র তৈরি করতে এবং পাইপলাইন চালানোর জন্য চিত্রটি ব্যবহার করতে চাইতে পারেন।

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

পাইপলাইনের সংজ্ঞা লিখ

আমরা একটি TFX পাইপলাইন তৈরি করার জন্য একটি ফাংশন সংজ্ঞায়িত করব। আমাদের BigQueryExampleGen ব্যবহার করতে হবে যা একটি আর্গুমেন্ট হিসাবে query নেয়। আগের টিউটোরিয়াল থেকে আরও একটি পরিবর্তন হল যে আমাদেরকে beam_pipeline_args পাস করতে হবে যা কম্পোনেন্টে পাস করা হয় যখন সেগুলি কার্যকর করা হয়। BigQuery-এ অতিরিক্ত প্যারামিটার পাস করতে আমরা beam_pipeline_args ব্যবহার করব।

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

ভার্টেক্স পাইপলাইনে পাইপলাইন চালান।

আমরা ভার্টেক্স পাইপলাইন টিউটোরিয়ালের জন্য সিম্পল টিএফএক্স পাইপলাইনে যেভাবে পাইপলাইন চালানোর জন্য ভার্টেক্স পাইপলাইন ব্যবহার করব।

আমাদের BigQueryExampleGen-এর জন্য beam_pipeline_args ও পাস করতে হবে। এতে GCP প্রকল্পের নাম এবং BigQuery সম্পাদনের জন্য অস্থায়ী স্টোরেজের মতো কনফিগারগুলি অন্তর্ভুক্ত রয়েছে।

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

উত্পন্ন সংজ্ঞা ফাইল kfp ক্লায়েন্ট ব্যবহার করে জমা দেওয়া যেতে পারে.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

এখন আপনি অগ্রগতি দেখতে Google ক্লাউড কনসোলে 'Vertex AI > Pipelines'-এ যেতে পারেন।