TensorFlow Cloud を使用した Keras モデルのトレーニング

TensorFlow.org で実行 Google Colab で実行 GitHubでソースを表示 ノートブックをダウンロード

はじめに

TensorFlow Cloud は、ローカルデバッグから Google Cloud 上の分散トレーニングにシームレスに移行する API を提供する Python パッケージです。クラウド上での TensorFlow モデルのトレーニングプロセスを単一のシンプルな関数呼び出しに簡素化して、必要なセットアップを最小限にし、モデルの変更を不要にしました。TensorFlow Cloud は、モデルの VM(仮想マシン)インスタンスや分散ストラテジーの作成など、クラウド固有のタスクを自動的に処理します。このガイドでは、TensorFlow Cloud を通じた Google Cloud とのインターフェース方法と、TensorFlow Cloud 内で提供する幅広い機能について説明します。まずは最もシンプルなユースケースから始めます。

Setup

TensorFlow Cloud をインストールし、このガイドで必要なパッケージのインポートから始めます。

pip install -q tensorflow_cloud
import tensorflow as tf
import tensorflow_cloud as tfc

from tensorflow import keras
from tensorflow.keras import layers

API の概要:最初のエンドツーエンドの例

まずは、以下のような CNN の Keras モデルのトレーニングスクリプトから始めましょう。

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

model = keras.Sequential(
    [
        keras.Input(shape=(28, 28)),
        # Use a Rescaling layer to make sure input values are in the [0, 1] range.
        layers.experimental.preprocessing.Rescaling(1.0 / 255),
        # The original images have shape (28, 28), so we reshape them to (28, 28, 1)
        layers.Reshape(target_shape=(28, 28, 1)),
        # Follow-up with a classic small convnet
        layers.Conv2D(32, 3, activation="relu"),
        layers.MaxPooling2D(2),
        layers.Conv2D(32, 3, activation="relu"),
        layers.MaxPooling2D(2),
        layers.Conv2D(32, 3, activation="relu"),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        layers.Dense(10),
    ]
)

model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=keras.metrics.SparseCategoricalAccuracy(),
)

model.fit(x_train, y_train, epochs=20, batch_size=128, validation_split=0.1)

インポートを始める前にスクリプトの最初に呼び出し run() を追加するだけで、このモデルのトレーニングが Google Cloud で行われるようになります。

tfc.run()

TensorFlow Cloud の使用時には、VM インスタンスの作成や分散ストラテジーなど、クラウド固有のタスクの心配をする必要がありません。API のすべてのパラメータにはインテリジェントなデフォルトが含まれており、すべてのパラメータは構成可能ですが、多くのモデルはこれらのデフォルトに依存しています。

run() を呼び出すと、TensorFlow Cloud は以下を実行します。

  • Python スクリプトやノートブックを分散可能な状態にします。
  • 上記を必要な依存関係を持つ Docker イメージに変換します。
  • GCP や GPU 搭載の VM 上でトレーニングジョブを実行します。
  • 関連するログやジョブ情報をストリーム化します。

デフォルトの VM 構成は、チーフが 1 個とワーカーが 0 個、CPU 8 コアと Tesla T4 GPU 1 台です。

Google Cloud の構成

適切な Cloud のトレーニングの道筋をつけるためには、多少の新規セットアップが必要です。初めて Google Cloud を利用する場合は、いくらかの予備措置を講じる必要があります。

  1. GCP プロジェクトを作成する
  2. AI プラットフォームサービスを有効にする
  3. サービスアカウントを作成する
  4. 認証キーをダウンロードする
  5. Cloud Storage バケットを作成する

初回セットアップに関する詳細な手順については、TensorFlow Cloud README をご覧ください。追加のセットアップ例は TensorFlow ブログに記載されています。

一般的なワークフローと Cloud Storage

大抵の場合は、Google Cloud でトレーニングを行った後にモデルを取得する必要があります。そのためには、リモートトレーニング中に保存と読み込みを Cloud Storage にリダイレクトすることが重要です。TensorFlow Cloud を Cloud Storage バケットに導いて、様々なタスクを行うことができます。ストレージバケットは、大規模なトレーニングデータセットの保存と読み込み、コールバックログやモデル重みの格納、トレーニングモデルファイルの保存に使用することが可能です。まず、fit() を構成してモデルを Cloud Storage に保存し、TensorBoard モニタリングをセットアップしてトレーニングの進捗状況をトラッキングしてみましょう。

def create_model():
    model = keras.Sequential(
        [
            keras.Input(shape=(28, 28)),
            layers.experimental.preprocessing.Rescaling(1.0 / 255),
            layers.Reshape(target_shape=(28, 28, 1)),
            layers.Conv2D(32, 3, activation="relu"),
            layers.MaxPooling2D(2),
            layers.Conv2D(32, 3, activation="relu"),
            layers.MaxPooling2D(2),
            layers.Conv2D(32, 3, activation="relu"),
            layers.Flatten(),
            layers.Dense(128, activation="relu"),
            layers.Dense(10),
        ]
    )

    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=keras.metrics.SparseCategoricalAccuracy(),
    )
    return model

トレーニング中に生成された TensorBoard のログとモデルのチェックポイントをクラウド ストレージバケットに保存してみましょう。

import datetime
import os

# Note: Please change the gcp_bucket to your bucket name.
gcp_bucket = "keras-examples"

checkpoint_path = os.path.join("gs://", gcp_bucket, "mnist_example", "save_at_{epoch}")

tensorboard_path = os.path.join(  # Timestamp included to enable timeseries graphs
    "gs://", gcp_bucket, "logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
)

callbacks = [
    # TensorBoard will store logs for each epoch and graph performance for us.
    keras.callbacks.TensorBoard(log_dir=tensorboard_path, histogram_freq=1),
    # ModelCheckpoint will save models after each epoch for retrieval later.
    keras.callbacks.ModelCheckpoint(checkpoint_path),
    # EarlyStopping will terminate training when val_loss ceases to improve.
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=3),
]

model = create_model()

ここでは、直接 Keras からデータをロードします。一般的にはデータセットをクラウドストレージのバケットに格納するのがベストプラクティスですが、TensorFlow Cloud はローカルに格納されているデータセットにも対応可能です。これについては、このガイドのマルチファイルに関するセクションで説明しています。

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
11493376/11490434 [==============================] - 0s 0us/step

TensorFlow Cloud API は、ローカルとクラウド上のどちらでコードを実行するかを決める、remote() 関数を用意しています。これにより、fit() パラメータをローカル実行とリモート実行に分けて指定することができ、ローカルマシンに負荷をかけることなく容易にデバッグする手段を提供します。

if tfc.remote():
    epochs = 100
    callbacks = callbacks
    batch_size = 128
else:
    epochs = 5
    batch_size = 64
    callbacks = None

model.fit(x_train, y_train, epochs=epochs, callbacks=callbacks, batch_size=batch_size)
Epoch 1/5
938/938 [==============================] - 5s 3ms/step - loss: 0.4699 - sparse_categorical_accuracy: 0.8573
938/938 [==============================] - 3s 3ms/step - loss: 0.0650 - sparse_categorical_accuracy: 0.9797
938/938 [==============================] - 3s 3ms/step - loss: 0.0436 - sparse_categorical_accuracy: 0.9864
938/938 [==============================] - 3s 3ms/step - loss: 0.0325 - sparse_categorical_accuracy: 0.9903
938/938 [==============================] - 3s 3ms/step - loss: 0.0268 - sparse_categorical_accuracy: 0.9915
<tensorflow.python.keras.callbacks.History at 0x7f91f01355c0>

トレーニングが終了したら、モデルを GCS に保存しましょう。

save_path = os.path.join("gs://", gcp_bucket, "mnist_example")

if tfc.remote():
    model.save(save_path)

Docker イメージの構築には、ローカルの Docker インスタンスの代わりに、このストレージバケットを使用することもできます。docker_image_bucket_name パラメータにバケットを追加するだけで、これが可能になります。

# docs_infra: no_execute
tfc.run(docker_image_bucket_name=gcp_bucket)

モデルをトレーニングした後は、保存したモデルを読み込み、パフォーマンスを監視するために TensorBoard のログを表示します。

# docs_infra: no_execute
model = keras.models.load_model(save_path)
#docs_infra: no_execute
tensorboard dev upload --logdir "gs://keras-examples-jonah/logs/fit" --name "Guide MNIST"

大規模プロジェクト

多くの場合、Keras モデルを含むプロジェクトは複数の Python スクリプトを含んでいたり、外部データや特定の依存関係を必要としたりします。TensorFlow Cloud は、大規模デプロイメントに完全な柔軟性があり、プロジェクトを支援する多くのインテリジェントな機能を提供します。

エントリーポイント:Python スクリプトと Jupyter ノートブックのサポート

run() API を呼び出しても、それが必ずモデルトレーニングコードと同じ Python スクリプト内に含まれているとは限りません。そのため、entry_point パラメータを用意しています。entry_point パラメータは、モデルトレーニングコードが置かれている Python スクリプトやノートブックの指定に使用できます。モデルと同じスクリプトから run() を呼び出す場合は、entry_point のデフォルトである None を使用します。

pip 依存関係

プロジェクトが追加の pip 依存関係を呼び出す場合は、requirements.txt ファイルをインクルードして必要な追加のライブラリを指定することができます。このファイル内に必要な依存関係のリストを書き込むだけで、TensorFlow Cloud がそれらをクラウドビルドに統合する処理を行います。

Python ノートブック

TensorFlow Cloud は Python ノートブックからも実行可能です。さらに、指定した entry_point は必要に応じてノートブックにすることができます。ノートブック上の TensorFlow Cloud をスクリプトと比較した場合、念頭に置いておいた方がよい重要な違いが 2 点あります。

  • ノートブック内から run() を呼び出す際に、Docker イメージを構築して格納するために Cloud Storage バケットを指定する必要があります。
  • Google Cloud 認証はすべて認証キーを使用して行い、プロジェクトを指定する必要はありません。ノートブックから TensorFlow Cloud を使用するワークフローの例は、本ガイドの「すべてを統合する」セクションで紹介しています。

マルチファイルプロジェクト

モデルが追加ファイルに依存する場合、ここではそれらのファイルを必ず指定されたエントリポイントと同じディレクトリ(またはサブディレクトリ)に置くようにするだけです。指定した entry_point と同じディレクトリ内に格納されているすべてのファイルは、entry_point に隣接するサブディレクトリに格納されているファイルと同様に、Docker イメージにインクルードされます。これは pip では取得できない依存関係が必要な場合にも該当します。

追加の pip 依存関係を持つカスタムエントリーポイントとマルチファイルプロジェクトの例は、TensorFlow Cloud Repository にあるマルチファイルの例をご覧ください。簡潔にするために、ここでは例の run() 呼び出しのみをインクルードします。

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    entry_point="train_model.py",
    requirements="requirements.txt"
)

マシン構成と分散トレーニング

モデルのトレーニングには、モデルやデータセットの大きさ次第で、幅広く異なったリソースを必要とする可能性があります。複数の GPU を使用した構成を考慮する場合、適合する分散ストラテジーを選択することが重要になります。ここでは、可能性のある構成をいくつか概説します。

マルチワーカー分散

ここでは、COMMON_MACHINE_CONFIGSを使用して、チーフ CPU 1 個、ワーカー GPU 4 個を指定します。

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    chief_config=tfc.COMMON_MACHINE_CONFIGS['CPU'],
    worker_count=2,
    worker_config=tfc.COMMON_MACHINE_CONFIGS['T4_4X']
)

デフォルトでは、TensorFlow Cloudは提供されたchief_configworker_configworker_countパラメータを使用する単純な式で、マシン構成に最適な分散ストラテジーを選択します。

  • 指定した GPU の数が 0 より大きい場合は、tf.distribute.MirroredStrategyが選択されます。
  • ワーカーの数が 0 より大きい場合は、アクセラレータの種類に応じてtf.distribution.experimental.MultiWorkerMirroredStrategyまたはtf.distribution.experimental.TPUStrategyが選択されます。
  • それ以外の場合は、tf.distribute.OneDeviceStrategyが選択されます。

TPU 分散

以下のように TPU 上で同じモデルをトレーニングしてみましょう。

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    chief_config=tfc.COMMON_MACHINE_CONFIGS["CPU"],
    worker_count=1,
    worker_config=tfc.COMMON_MACHINE_CONFIGS["TPU"]
)

カスタム分散ストラテジー

カスタム分散ストラテジーを指定するには、分散トレーニングガイドに従って通常通りにコードをフォーマットし、distribution_strategyNone 設定にします。以下では、同じ MNIST モデルに独自の分散ストラテジーを指定します。

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
  model = create_model()

if tfc.remote():
    epochs = 100
    batch_size = 128
else:
    epochs = 10
    batch_size = 64
    callbacks = None

model.fit(
    x_train, y_train, epochs=epochs, callbacks=callbacks, batch_size=batch_size
)

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    chief_config=tfc.COMMON_MACHINE_CONFIGS['CPU'],
    worker_count=2,
    worker_config=tfc.COMMON_MACHINE_CONFIGS['T4_4X'],
    distribution_strategy=None
)

カスタム Docker イメージ

デフォルトでは、TensorFlow Cloud は Google が提供する Docker ベースイメージを使用するので、現在の TensorFlow バージョンに対応します。しかし、必要に応じて構築要件に合わせたカスタム Docker イメージを指定することも可能です。この例では、古い TensorFlow バージョンから Docker イメージを指定します。

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    base_docker_image="tensorflow/tensorflow:2.1.0-gpu"
)

その他のメトリクス

Cloud ジョブに固有のラベルを付けたり、Cloud トレーニング中にモデルのログをストリーム化したりすると有用な場合があります。記録を管理できるように、すべての Cloud ジョブに適切なラベル付けをすることをお勧めします。そのために、run() は最大 64 ペアまでキーと値のラベルのディクショナリを受け入れることができ、これは Cloud のビルドログに表示されます。エポックのパフォーマンスやモデルの保存内部などに関するログは、tfc.run を実行して提供されるリンクを使用してアクセスするか、stream_logs フラグを使用してローカルターミナルに出力することが可能です。

job_labels = {"job": "mnist-example", "team": "keras-io", "user": "jonah"}

tfc.run(
    docker_image_bucket_name=gcp_bucket,
    job_labels=job_labels,
    stream_logs=True
)

すべてを統合する

このガイドで説明した特徴の多くを用いた、さらに深い Colab については、こちらの例に従って特徴抽出を使用し、写真から犬の品種を認識する最先端モデルのトレーニングを行います。