MLコミュニティデーは11月9日です! TensorFlow、JAXからの更新のために私たちに参加し、より多くの詳細をご覧ください

TFXと頂点パイプラインを使用してBigQueryからデータを読み取る

このノートPCベースのチュートリアルでは、使用するGoogleクラウドBigQueryのをMLモデルを訓練するために、データソースとして。 MLパイプラインは、TFXを使用して構築され、Google CloudVertexパイプラインで実行されます。

このノートブックは、私たちが組み込まTFXパイプラインに基づいて頂点パイプラインのチュートリアル用のシンプルなTFXパイプライン。そのチュートリアルをまだ読んでいない場合は、このノートブックに進む前に読んでください。

BigQueryは、サーバーレス、高度にスケーラブル、およびビジネスの俊敏性のために設計されたコスト効果の高いマルチクラウドデータウェアハウスです。 TFXは、BigQueryのからのトレーニングデータを読み取るためにするために使用することができます訓練されたモデルを公開したBigQueryに。

このチュートリアルでは、使用するBigQueryExampleGen TFXパイプラインへのBigQueryからデータを読み込むコンポーネントを。

このノートブックは、上で実行されることを意図しているGoogleのコラボかにAIプラットフォームノート。これらのいずれかを使用していない場合は、上の[GoogleColabで実行]ボタンをクリックするだけです。

設定

あなたが完了している場合バーテックスパイプラインのチュートリアル用のシンプルなTFXパイプライン、あなたは作業GCPプロジェクトとGCSバケットを持つことになりますし、それは我々が、このチュートリアルに必要なすべてのです。見逃した場合は、最初に予備チュートリアルをお読みください。

Pythonパッケージをインストールする

TFXやKFPなどの必要なPythonパッケージをインストールして、MLパイプラインを作成し、VertexPipelinesにジョブを送信します。

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

ランタイムを再起動しましたか?

上記のセルを初めて実行するときにGoogleColabを使用している場合は、[ランタイムの再起動]ボタンをクリックするか、[ランタイム]> [ランタイムの再起動...]メニューを使用してランタイムを再起動する必要があります。これは、Colabがパッケージをロードする方法が原因です。

Colabを使用していない場合は、次のセルでランタイムを再開できます。

import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

このノートブックのためにGoogleにログインします

このノートブックをColabで実行している場合は、ユーザーアカウントで認証します。

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

あなたが実行することによって、次のセクションを実行する前に、AIプラットフォームノートブック、Googleクラウドで認証している場合は

gcloud auth login

ターミナルウィンドウで(あなたがファイルを経由して開くことができます>メニュー内)。これは、ノートブックインスタンスごとに1回だけ実行する必要があります。

パッケージのバージョンを確認してください。

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.5.1
TFX version: 1.2.0
KFP version: 1.8.1

変数を設定する

以下のパイプラインをカスタマイズするために使用されるいくつかの変数を設定します。次の情報が必要です。

それを実行する前に、下のセルに必要な値を入力します

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

セットには、 gcloudプロジェクトを使用します。

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

デフォルトでは、頂点パイプラインは、フォーマットのデフォルトGCE VMのサービスアカウント使用しています[project-number]-compute@developer.gserviceaccount.com 。パイプラインでBigQueryにアクセスするには、このアカウントにBigQueryを使用する権限を付与する必要があります。アカウントに「BigQueryユーザー」ロールを追加します。

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

ご覧下さい頂点のドキュメントをサービスアカウントおよびIAMの設定についての詳細を学ぶために。

パイプラインを作成する

私たちが行ったようにTFXパイプラインは、Python APIを使用して定義され、頂点パイプラインのチュートリアル用のシンプルなTFXのパイプライン。私たちは、以前に使用CsvExampleGen CSVファイルからデータを読み込みます。このチュートリアルでは、使用するBigQueryExampleGen BigQueryのからデータを読み込むコンポーネントを。

BigQueryクエリを準備する

私たちは、同じ使用しますパーマーペンギンデータセットを。しかし、我々は、BigQueryのテーブルからそれを読み込みますtfx-oss-public.palmer_penguins.palmer_penguins同じCSVファイルを使用して移入されます。

Google Colabを使用している場合は、BigQueryテーブルのコンテンツを直接調べることができます。

%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

すべての機能は、すでに以外0〜1に正規化されたspeciesのラベルです。私たちは、予測、分類モデル構築するspeciesのペンギンのを。

BigQueryExampleGenフェッチするデータを指定するクエリが必要です。テーブル内のすべての行のすべてのフィールドを使用するため、クエリは非常に簡単です。また、フィールド名を指定して追加することができますWHEREに応じて、必要に応じて条件をBigQueryの標準的なSQL構文

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

モデルコードを記述します。

私たちは、のように同じモデルのコードを使用するシンプルなTFXパイプラインのチュートリアル

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

モジュールファイルをパイプラインコンポーネントからアクセスできるGCSにコピーします。モデルトレーニングはGCPで行われるため、このモデル定義をアップロードする必要があります。

それ以外の場合は、モジュールファイルを含むコンテナーイメージを作成し、そのイメージを使用してパイプラインを実行することをお勧めします。

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

パイプライン定義を書く

TFXパイプラインを作成する関数を定義します。私たちは、使用する必要がありますBigQueryExampleGenかかるquery引数としてを。前のチュートリアルからのもう一つの変化は、私たちが合格する必要があるということですbeam_pipeline_argsそれらが実行されたときにコンポーネントに渡されます。私たちは、使用するbeam_pipeline_args BigQueryのに追加のパラメータを渡します。

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

頂点パイプラインでパイプラインを実行します。

私たちは、私たちが行ったようにパイプラインを実行するために、頂点パイプラインを使用しますバーテックスパイプラインのチュートリアル用のシンプルなTFXのパイプライン

また、合格する必要がbeam_pipeline_args BigQueryExampleGenため。 GCPプロジェクトの名前やBigQuery実行用の一時ストレージなどの設定が含まれています。

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

生成された定義ファイルは、kfpクライアントを使用して送信できます。

from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)

今、あなたはの「頂点AI>パイプライン」上または訪問出力内のリンクを訪問することができますGoogleクラウドコンソールの進行状況を確認します。