このページは Cloud Translation API によって翻訳されました。
Switch to English

tf.data APIを使用してパフォーマンスの向上

TensorFlow.org上に表示します Googleのコラボで実行します GitHubの上のソースを表示 ダウンロードノート

概要

GPUとTPUは根本的に、単一のトレーニングのステップを実行するのに必要な時間を短縮することができます。ピーク性能を達成することは、現在のステップが完了する前に、次のステップのためのデータを提供し、効率的な入力パイプラインが必要です。 tf.data APIは、柔軟かつ効率的な入力パイプラインを構築することができます。この文書では、使用する方法を示しtf.data性の高いパフォーマンスTensorFlow入力パイプラインを構築するためにAPIを。

続行する前に、「読みビルドTensorFlow入力パイプラインの使用方法を学ぶために、」ガイドをtf.data APIを。

リソース

セットアップ

 import tensorflow as tf

import time
 

このガイドでは、データセット全体で反復してパフォーマンスを測定します。再現性のある性能ベンチマークを作ることは、それに影響を与えることは難しい、さまざまな要因になります

  • 現在のCPU負荷、
  • ネットワークトラフィック、
  • などのキャッシュのような複雑なメカニズム

そのため、人工的な例を構築し、再現性のベンチマークを提供します。

データセット

継承したクラスを定義tf.data.Dataset呼ばArtificialDataset 。このデータセット:

  • 生成num_samplesサンプル(デフォルトは3です)
  • ファイルを開くシミュレートする最初の項目の前にいくつかの時間のために眠ります
  • ファイルからのデータの読み出しをシミュレートするために、各項目を生成する前に、いくつかの時間のために眠ります
 class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)
            
            yield (sample_idx,)
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=tf.dtypes.int64,
            output_shapes=(1,),
            args=(num_samples,)
        )
 

このデータセットは同様であるtf.data.Dataset.range開始時と各サンプルとの間の固定遅延を加算し、1。

トレーニングループ

それは、データセットを反復処理するために要する時間を測定していることのダミー訓練ループを書きます。トレーニング時間がシミュレートされます。

 def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    tf.print("Execution time:", time.perf_counter() - start_time)
 

パフォーマンスを最適化

パフォーマンスを最適化することができますどのように発揮するには、パフォーマンスが向上しますArtificialDataset

単純なアプローチ

素朴なパイプラインがそのままデータセットを繰り返し処理、何のトリックを使用していないと起動します。

 benchmark(ArtificialDataset())
 
Execution time: 0.2346214259999897

ボンネットの下では、これはあなたの実行時間が費やされた方法です。

ナイーブ

あなたは、トレーニングステップを行うことが関与していることがわかります。

  • それはまだ開かれていない場合は、ファイルを開いて、
  • ファイルからのデータ入力をフェッチし、
  • トレーニングのためのデータを使用して。

あなたのパイプラインがデータをフェッチしている間しかし、ここのような素朴な同期の実装では、あなたのモデルがアイドル状態に座っています。お使いのモデルが訓練されている間は逆に、入力パイプラインはアイドルに座っています。トレーニングステップ時間は、このようにすべての、開口部、読書やトレーニング時間の合計です。

次のセクションでは、パフォーマンスTensorFlow入力パイプラインを設計するためのベストプラクティスを示し、この入力パイプライン上に構築します。

プリフェッチ

プリフェッチは、トレーニングステップの前処理およびモデルの実行と重なります。モデルは、トレーニングステップ実行中にs 、入力パイプラインは、工程のためのデータを読み取っているs+1 。そうすることで、トレーニングの最大値(和とは対照的に)、それはデータを抽出するのにかかる時間にステップ時間を減少させます。

tf.data APIを提供しtf.data.Dataset.prefetch変換を。データが、データが消費された時点から生成される時間を分離するために使用することができます。具体的には、変換は、バックグラウンドスレッド、それらが要求された時間の入力データセットの前方からプリフェッチ要素に内部バッファを使用します。プリフェッチする要素の数に等しい(あるいはより大きい)単一の訓練段階で消費されるバッチの数でなければなりません。あなたは手動で調整するには、この値か、またはそれを設定しtf.data.experimental.AUTOTUNE促すメッセージが表示されますどのtf.data実行時に動的にチューニングした値を、ランタイムを。

注プリフェッチ変換はメリットにの作品と「プロデューサー」の作業オーバーラップする機会があり、いつでも提供している「消費者を。」

 benchmark(
    ArtificialDataset()
    .prefetch(tf.data.experimental.AUTOTUNE)
)
 
Execution time: 0.19066840700008925

プリフェッチ

今度は、トレーニングステップは、サンプル0のために実行されている間、入力パイプラインは、サンプル1のデータを読み込み、というようにされていることがわかります。

並列化データ抽出

現実世界の設定では、入力データ(例えば、GCS又はHDFS)リモートに格納されてもよいです。なぜなら、ローカルとリモートのストレージの間に以下の相違点にリモートでデータを読み込むときに、ローカルにデータを読み込むときにうまく機能セットのパイプラインは、I / Oボトルネックになるかもしれません。

  • タイム・ツー・1バイト目:リモートストレージからファイルの最初のバイトを読むには、ローカルストレージからよりも長い大きさの注文を取ることができます。
  • スループット読む:リモートストレージは通常、大規模な総帯域幅を提供していますが、1つのファイルを読み込むことはこれだけの帯域幅のごく一部を利用することができるかもしれません。

生のバイトがメモリにロードされるとさらに、また、(例えば、デシリアライズおよび/またはデータを復号化する必要があるかもしれないいるProtobuf追加の計算を必要とします)。このオーバーヘッドは、データがローカルまたはリモートで格納されているかどうかの存在に関わらずであるが、データを効果的にプリフェッチされていない場合、リモート場合に悪化することができます。

様々なデータ抽出のオーバーヘッドの影響を軽減するために、 tf.data.Dataset.interleave変換は、(例えば、データ・ファイル・リーダーのような)他のデータセットの内容をインターリーブ、データローディングステップを並列化するために使用することができます。オーバーラップのデータセットの数で指定することができるcycle_length並列処理のレベルを指定することができながら、引数num_parallel_calls引数。同様にprefetch変換、 interleave変換がサポートされていtf.data.experimental.AUTOTUNEについての決定を委任するかをするために使用する並列処理のレベルtf.dataランタイム。

シーケンシャルインタリーブ

デフォルト引数tf.data.Dataset.interleave変換は、順次2つのデータセットからの単一のサンプルをインターリーブします。

 benchmark(
    tf.data.Dataset.range(2)
    .interleave(ArtificialDataset)
)
 
Execution time: 0.19681244399998832

シーケンシャルインタリーブ

このプロットは、動作発揮することを可能にするinterleave利用可能な2つのデータセットから、代替的サンプルを取り出し、変換を。しかし、パフォーマンスの改善はここに含まれません。

パラレルインターリーブ

今使用num_parallel_calls引数interleave変換を。この負荷複数の並列でのデータセット、ファイルを待っている時間を短縮することが開かれます。

 benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        ArtificialDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
 
Execution time: 0.1402252690000978

パラレルインターリーブ

この時間は、2つのデータセットの読み出しは、グローバルデータ処理時間を短縮する、並列化されています。

データ変換を並列化

データを準備する際に、入力要素は前処理する必要があるかもしれません。この目的のために、 tf.data APIの提供tf.data.Dataset.map入力データセットの各要素にユーザ定義関数を適用し変換、。入力要素は互いに独立しているので、前処理は、複数のCPUコアを横切って並列化することができます。これを可能にするために、同様にprefetchinterleave変換、 map変換が提供num_parallel_calls並列処理のレベルを指定する引数。

最高の価値選ぶnum_parallel_calls引数は、ハードウェアに依存します(たとえば、その大きさや形状など)あなたのトレーニングデータの特性、マップ機能のコスト、および他のどのような処理が同時にCPU上で起こっています。シンプルなヒューリスティックは、使用可能なCPUコアの数を使用することです。しかし、用としてprefetchinterleave変換、 map変換がサポートされていtf.data.experimental.AUTOTUNEするために使用する並列処理のどのレベルについての決定を委任れるtf.dataランタイム。

 def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s
 

シーケンシャルマッピング

使用して開始mapベースライン例として、並列なしの変換を。

 benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
 
Execution time: 0.41994816599992646

シーケンシャルマッピング

ナイーブなアプローチ 、ここでは開口部のために費やした時間、読書、前処理(マッピング)とトレーニングは1回の反復のために一緒に合計を繰り返します。

パラレルマッピング

今、同じ前処理機能を使用するが、複数のサンプルに対して並列にそれを適用します。

 benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
 
Execution time: 0.25377997199996116

パラレルマッピング

さて、あなたは前処理ステップは、1回の反復のために全体の時間を短縮、オーバーラップするプロットで見ることができます。

キャッシング

tf.data.Dataset.cache変換は、メモリまたはローカルストレージ上のいずれかに、データセットをキャッシュすることができます。これは、各エポック中に実行されることから(ファイルのオープンとデータ読み出しのような)一部の操作が保存されます。

 benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
 
Execution time: 0.36316757900010543

キャッシュされたデータセット

あなたがデータセットをキャッシュすると、前の変換cache (ファイルのオープンと読み込みデータのような)1は、最初のエポック中に実行されています。次のエポックは、キャッシュされたデータによって再利用されますcache変換を。

渡されたユーザー定義関数ならばmap変換が高価であり、適用cache後の変換をmapたデータセットがまだメモリまたはローカルストレージに収まる限りとして変換。ユーザー定義関数は、スペースを増加させた場合のいずれか、キャッシュ容量を超えてデータセットを保存した後、それを適用するために必要なcache変換またはリソース使用量を減らすために、あなたのトレーニングの仕事の前に前処理してデータを考慮してください。

ベクトル化マッピング

渡されるユーザ定義関数呼び出しmapオーバーヘッドスケジューリングに関係している変換し、ユーザ定義関数を実行します。当社は、ユーザー定義関数をベクトル化をお勧めします(つまり、それは一度に入力のバッチで動作してい)と適用するbatch の前変換をmap転換。

この良い習慣を説明するために、あなたの人工データセットは適していません。スケジューリング遅延は、はるかに少ないで使用される数十ミリ秒よりも約10マイクロ秒(10E-6秒)でありArtificialDataset 、したがってその影響を見ることが困難です。

この例では、基地使用tf.data.Dataset.range機能を、その最も単純な形にトレーニング・ループを簡素化します。

 fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)
    
def increment(x):
    return x+1
 

スカラーマッピング

 fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
 
Execution time: 0.9026268119999941

スカラマップ

プロットは、上記の(少ないサンプルで)何が起こっているかを示しています。あなたは、マップされた機能は、各サンプルに適用されていることがわかります。この機能は非常に高速ですが、それはいくつかのオーバーヘッドそのインパクト時の性能を持っています。

ベクトル化マッピング

 fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
 
Execution time: 0.03311353300000519

ベクトル化マップ

今回は、マッピングされた機能は、一度と呼ばれ、サンプルのバッチに適用されます。機能を実行するために多くの時間を要する可能性がありますが、オーバーヘッドが全体の時間のパフォーマンスを向上させること、一度だけ表示されます。

メモリフットプリントを削減

含む、変換の数、 interleaveprefetch 、およびshuffle 、要素の内部バッファを維持します。ユーザ定義関数は、に渡された場合map変換バッファ素子は、メモリ使用量に影響を与えることの要素のサイズは、マップ変換の順序および変換を変更します。一般的に、私達はより低いメモリフットプリントでの結果、異なる順序がパフォーマンスのために望ましい場合を除きという順序を選択することをお勧めします。

部分的な計算をキャッシュ

後にデータセットをキャッシュすることが推奨されmapこの変換がメモリに収まるようにデータが大きすぎて行う場合を除き、変換。時間のかかる1と一部を消費するメモリ:あなたのマップされた関数は、2つの部分に分割することができた場合のトレードオフを達成することができます。このケースでは、以下のようなあなたの変換をチェーンすることができます:

 dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
 

このように、時間のかかる部分は、最初のエポック中に実行され、あなたはあまりにも多くのキャッシュ・スペースの使用は避けてください。

ベストプラクティスの概要

ここではパフォーマンスTensorFlow入力パイプラインを設計するためのベストプラクティスの概要は次のとおりです。

数字を再現

中に深く移動するにはtf.data.Dataset APIを理解し、あなた自身のパイプラインで遊ぶことができます。以下は、このガイドから画像をプロットするために使用されるコードです。 :それは、次のような一般的な難しさのためにいくつかの回避策を示し、良い出発点となります

  • 実行時間再現。
  • マップされた機能を熱心実行。
  • interleave変換呼び出し可能。
 import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
 

データセット

同様にArtificialDatasetあなたは、各ステップに費やされた時間を返すデータセットを構築することができます。

 class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
    
    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset
    
    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
        
        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter
        
        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter
            
            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered
            
    
    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )
 

このデータセットは、形状のサンプルを提供する[[2, 1], [2, 2], [2, 3]]とタイプの[tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]各サンプルは次のとおりです。

 (
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)
 

どこ:

  • OpenReadのステップ識別子です
  • t0対応するステップが開始されたときにタイムスタンプです
  • d 、対応するステップに費やされた時間であります
  • iインスタンスインデックスであります
  • e (データセットが反復された回数)エポックインデックスであります
  • sサンプルインデックスであります

反復ループ

全てのタイミングを集約する複雑もう少し反復ループを作成します。これは、上記で詳述したようサンプルを生成データセットで動作します。

 def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
    
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)
            
            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter
            
            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
        
        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)
    
    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}
 

作図方法

最後に、返される値指定されたタイムラインプロットすることができる機能を定義timelined_benchmark機能を。

 def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()
    
    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
    
    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)
    
    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")
        
        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]
        
        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
 

マップされた機能のためのラッパーを使用します

熱心なコンテキストにマッピング機能を実行するには、内部でそれらをラップする必要がありtf.py_functionコール。

 def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper
 

パイプラインの比較

 _batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)
 

ナイーブ

 @map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
 
Execution time: 12.461227312999995

最適化されました

 @map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.experimental.AUTOTUNE
    )
    .unbatch(),
    5
)
 
Execution time: 6.333680681000033

 draw_timeline(naive_timeline, "Naive", 15)
 

PNG

 draw_timeline(optimized_timeline, "Optimized", 15)
 

PNG