
tf.data: Build TensorFlow input pipelines


The tf.data API enables you to build complex input pipelines from simple, reusable pieces. For example, the pipeline for an image model might aggregate data from files in a distributed file system, apply random perturbations to each image, and merge randomly selected images into a batch for training. The pipeline for a text model might involve extracting symbols from raw text data, converting them to embedding identifiers with a lookup table, and batching together sequences of different lengths. The tf.data API makes it possible to handle large amounts of data, read from different data formats, and perform complex transformations.

The tf.data API introduces a tf.data.Dataset abstraction that represents a sequence of elements, in which each element consists of one or more components. For example, in an image pipeline, an element might be a single training example, with a pair of tensor components representing the image and its label.

There are two distinct ways to create a dataset:

  • A data source constructs a Dataset from data stored in memory or in one or more files.

  • A data transformation constructs a dataset from one or more tf.data.Dataset objects.

 import tensorflow as tf
 
 import pathlib
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

np.set_printoptions(precision=4)
 

Basic mechanics

To create an input pipeline, you must start with a data source. For example, to construct a Dataset from data in memory, you can use tf.data.Dataset.from_tensors() or tf.data.Dataset.from_tensor_slices(). Alternatively, if your input data is stored in a file in the recommended TFRecord format, you can use tf.data.TFRecordDataset().

Once you have a Dataset object, you can transform it into a new Dataset by chaining method calls on the tf.data.Dataset object. For example, you can apply per-element transformations such as Dataset.map(), and multi-element transformations such as Dataset.batch(). See the documentation for tf.data.Dataset for a complete list of transformations.

The Dataset object is a Python iterable. This makes it possible to consume its elements using a for loop:

 dataset = tf.data.Dataset.from_tensor_slices([8, 3, 0, 8, 2, 1])
dataset
 
<TensorSliceDataset shapes: (), types: tf.int32>
 for elem in dataset:
  print(elem.numpy())
 
8
3
0
8
2
1

Or by explicitly creating a Python iterator using iter and consuming its elements using next:

 it = iter(dataset)

print(next(it).numpy())
 
8

Alternatively, dataset elements can be consumed using the reduce transformation, which reduces all elements to produce a single result. The following example illustrates how to use the reduce transformation to compute the sum of a dataset of integers.

 print(dataset.reduce(0, lambda state, value: state + value).numpy())
 
22

Dataset structure

A dataset contains elements that each have the same (nested) structure, and the individual components of the structure can be of any type representable by tf.TypeSpec, including tf.Tensor, tf.sparse.SparseTensor, tf.RaggedTensor, tf.TensorArray, or tf.data.Dataset.

The Dataset.element_spec property allows you to inspect the type of each element component. The property returns a nested structure of tf.TypeSpec objects, matching the structure of the element, which may be a single component, a tuple of components, or a nested tuple of components. For example:

 dataset1 = tf.data.Dataset.from_tensor_slices(tf.random.uniform([4, 10]))

dataset1.element_spec
 
TensorSpec(shape=(10,), dtype=tf.float32, name=None)
 dataset2 = tf.data.Dataset.from_tensor_slices(
   (tf.random.uniform([4]),
    tf.random.uniform([4, 100], maxval=100, dtype=tf.int32)))

dataset2.element_spec
 
(TensorSpec(shape=(), dtype=tf.float32, name=None),
 TensorSpec(shape=(100,), dtype=tf.int32, name=None))
 dataset3 = tf.data.Dataset.zip((dataset1, dataset2))

dataset3.element_spec
 
(TensorSpec(shape=(10,), dtype=tf.float32, name=None),
 (TensorSpec(shape=(), dtype=tf.float32, name=None),
  TensorSpec(shape=(100,), dtype=tf.int32, name=None)))
 # Dataset containing a sparse tensor.
dataset4 = tf.data.Dataset.from_tensors(tf.SparseTensor(indices=[[0, 0], [1, 2]], values=[1, 2], dense_shape=[3, 4]))

dataset4.element_spec
 
SparseTensorSpec(TensorShape([3, 4]), tf.int32)
 # Use value_type to see the type of value represented by the element spec
dataset4.element_spec.value_type
 
tensorflow.python.framework.sparse_tensor.SparseTensor
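
The other component types listed above are reported in the same way; for example, a dataset holding a tf.RaggedTensor has a RaggedTensorSpec element spec. A minimal sketch mirroring the sparse-tensor example above (not part of the original notebook output):

 # Dataset containing a ragged tensor (sketch; mirrors the sparse example).
dataset5 = tf.data.Dataset.from_tensors(
    tf.ragged.constant([[1, 2], [3, 4, 5], [6]]))

dataset5.element_spec
 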

Dataset transformations support datasets of any structure. When using the Dataset.map() and Dataset.filter() transformations, which apply a function to each element, the element structure determines the arguments of the function (a small map sketch follows the examples below):

 dataset1 = tf.data.Dataset.from_tensor_slices(
    tf.random.uniform([4, 10], minval=1, maxval=10, dtype=tf.int32))

dataset1
 
<TensorSliceDataset shapes: (10,), types: tf.int32>
 for z in dataset1:
  print(z.numpy())
 
[8 1 2 6 1 7 2 6 1 3]
[6 5 6 5 3 5 2 5 3 6]
[5 8 4 8 3 1 4 6 4 8]
[2 4 5 8 3 5 7 9 4 2]

 dataset2 = tf.data.Dataset.from_tensor_slices(
   (tf.random.uniform([4]),
    tf.random.uniform([4, 100], maxval=100, dtype=tf.int32)))

dataset2
 
<TensorSliceDataset shapes: ((), (100,)), types: (tf.float32, tf.int32)>
 dataset3 = tf.data.Dataset.zip((dataset1, dataset2))

dataset3
 
<ZipDataset shapes: ((10,), ((), (100,))), types: (tf.int32, (tf.float32, tf.int32))>
 for a, (b,c) in dataset3:
  print('shapes: {a.shape}, {b.shape}, {c.shape}'.format(a=a, b=b, c=c))
 
shapes: (10,), (), (100,)
shapes: (10,), (), (100,)
shapes: (10,), (), (100,)
shapes: (10,), (), (100,)
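
For instance, when mapping over dataset3 the function receives one argument per top-level component, and the nested pair arrives as a tuple. A minimal sketch (not part of the original notebook):

 # The element structure ((10,), ((), (100,))) becomes the arguments (a, (b, c)).
sums = dataset3.map(lambda a, pair: tf.reduce_sum(a) + tf.reduce_sum(pair[1]))

for total in sums.take(2):
  print(total.numpy())
 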

Reading input data

Consuming NumPy arrays

See Loading NumPy arrays for more examples.

If all of your input data fits in memory, the simplest way to create a Dataset from it is to convert it to tf.Tensor objects and use Dataset.from_tensor_slices().

 train, test = tf.keras.datasets.fashion_mnist.load_data()
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
32768/29515 [=================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26427392/26421880 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
8192/5148 [===============================================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4423680/4422102 [==============================] - 0s 0us/step

 images, labels = train
images = images/255

dataset = tf.data.Dataset.from_tensor_slices((images, labels))
dataset
 
<TensorSliceDataset shapes: ((28, 28), ()), types: (tf.float64, tf.uint8)>

Consuming Python generators

Another common data source that can easily be ingested as a tf.data.Dataset is the python generator.

 def count(stop):
  i = 0
  while i<stop:
    yield i
    i += 1
 
 for n in count(5):
  print(n)
 
0
1
2
3
4

The Dataset.from_generator constructor converts the python generator to a fully functional tf.data.Dataset.

The constructor takes a callable as input, not an iterator. This allows it to restart the generator when it reaches the end. It takes an optional args argument, which is passed as the callable's arguments.

The output_types argument is required because tf.data builds a tf.Graph internally, and graph edges require a tf.dtype.

 ds_counter = tf.data.Dataset.from_generator(count, args=[25], output_types=tf.int32, output_shapes = (), )
 
 for count_batch in ds_counter.repeat().batch(10).take(10):
  print(count_batch.numpy())
 
[0 1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18 19]
[20 21 22 23 24  0  1  2  3  4]
[ 5  6  7  8  9 10 11 12 13 14]
[15 16 17 18 19 20 21 22 23 24]
[0 1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18 19]
[20 21 22 23 24  0  1  2  3  4]
[ 5  6  7  8  9 10 11 12 13 14]
[15 16 17 18 19 20 21 22 23 24]

The output_shapes argument is not required but is highly recommended, as many tensorflow operations do not support tensors with an unknown rank. If the length of a particular axis is unknown or variable, set it as None in the output_shapes.

It's also important to note that the output_shapes and output_types follow the same nesting rules as other dataset methods.

Here is an example generator that demonstrates both aspects: it returns tuples of arrays, where the second array is a vector with unknown length.

 def gen_series():
  i = 0
  while True:
    size = np.random.randint(0, 10)
    yield i, np.random.normal(size=(size,))
    i += 1
 
 for i, series in gen_series():
  print(i, ":", str(series))
  if i > 5:
    break
 
0 : [-1.978  -1.0531  0.1959 -2.1618]
1 : [ 1.9185 -0.1874  0.5084]
2 : [0.1441 0.3987 0.7737 0.9266 1.5057 0.9151]
3 : [ 0.681  -0.6155 -0.1231 -0.2429  0.6892  1.2571 -1.7588 -1.6575 -0.5375]
4 : [-0.5567  1.5298  0.7242  0.2213]
5 : [ 1.5572 -0.6856]
6 : [-1.0965 -0.336   1.2405  0.6006]

The first output is an int32, the second is a float32.

The first item is a scalar, shape (), and the second is a vector of unknown length, shape (None,).

 ds_series = tf.data.Dataset.from_generator(
    gen_series, 
    output_types=(tf.int32, tf.float32), 
    output_shapes=((), (None,)))

ds_series
 
<FlatMapDataset shapes: ((), (None,)), types: (tf.int32, tf.float32)>

Now it can be used like a regular tf.data.Dataset. Note that when batching a dataset with a variable shape, you need to use Dataset.padded_batch.

 ds_series_batch = ds_series.shuffle(20).padded_batch(10)

ids, sequence_batch = next(iter(ds_series_batch))
print(ids.numpy())
print()
print(sequence_batch.numpy())
 
[ 2 10 18  3  6 15 25 23  0  4]

[[ 1.2665 -0.6274  0.4076  1.0146  0.      0.      0.      0.    ]
 [ 0.8091 -0.0683 -0.1464  0.2734  0.7461 -0.1009  0.      0.    ]
 [-0.9381  1.5075  0.      0.      0.      0.      0.      0.    ]
 [ 1.5705  0.4438  0.      0.      0.      0.      0.      0.    ]
 [-0.4692 -1.8328 -2.2838  0.7418  0.0172 -0.3547 -1.4502 -1.2786]
 [-1.574   0.      0.      0.      0.      0.      0.      0.    ]
 [-0.9274  1.4758  0.      0.      0.      0.      0.      0.    ]
 [-0.5043  0.7066  0.9599 -1.2986  0.      0.      0.      0.    ]
 [ 0.      0.      0.      0.      0.      0.      0.      0.    ]
 [-0.4893 -0.6937  0.      0.      0.      0.      0.      0.    ]]

For a more realistic example, try wrapping preprocessing.image.ImageDataGenerator as a tf.data.Dataset.

First download the data:

 flowers = tf.keras.utils.get_file(
    'flower_photos',
    'https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz',
    untar=True)
 
Downloading data from https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz
228818944/228813984 [==============================] - 2s 0us/step

Create the image.ImageDataGenerator

 img_gen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255, rotation_range=20)
 
 images, labels = next(img_gen.flow_from_directory(flowers))
 
Found 3670 images belonging to 5 classes.

 print(images.dtype, images.shape)
print(labels.dtype, labels.shape)
 
float32 (32, 256, 256, 3)
float32 (32, 5)

 ds = tf.data.Dataset.from_generator(
    img_gen.flow_from_directory, args=[flowers], 
    output_types=(tf.float32, tf.float32), 
    output_shapes=([32,256,256,3], [32,5])
)

ds
 
<FlatMapDataset shapes: ((32, 256, 256, 3), (32, 5)), types: (tf.float32, tf.float32)>

Consuming TFRecord data

See Loading TFRecords for an end-to-end example.

The tf.data API supports a variety of file formats so that you can process large datasets that do not fit in memory. For example, the TFRecord file format is a simple record-oriented binary format that many TensorFlow applications use for training data. The tf.data.TFRecordDataset class enables you to stream over the contents of one or more TFRecord files as part of an input pipeline.

Here is an example using the test file from the French Street Name Signs (FSNS) dataset.

 # Creates a dataset that reads all of the examples from two files.
fsns_test_file = tf.keras.utils.get_file("fsns.tfrec", "https://storage.googleapis.com/download.tensorflow.org/data/fsns-20160927/testdata/fsns-00000-of-00001")
 
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/fsns-20160927/testdata/fsns-00000-of-00001
7905280/7904079 [==============================] - 0s 0us/step

The filenames argument to the TFRecordDataset initializer can either be a string, a list of strings, or a tf.Tensor of strings. Therefore, if you have two sets of files for training and validation purposes, you can create a factory method that produces the dataset, taking the filenames as an input argument (a sketch of such a factory follows the example below):

 dataset = tf.data.TFRecordDataset(filenames = [fsns_test_file])
dataset
 
<TFRecordDatasetV2 shapes: (), types: tf.string>
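
As a minimal sketch of such a factory method (both calls below reuse the single FSNS test file downloaded above, since this notebook only fetches one; the split into training and validation file lists is hypothetical):

 def make_fsns_dataset(filenames):
  # filenames may be a string, a list of strings, or a tf.Tensor of strings.
  return tf.data.TFRecordDataset(filenames=filenames)

train_ds = make_fsns_dataset([fsns_test_file])  # would be the training files
valid_ds = make_fsns_dataset([fsns_test_file])  # would be the validation files
 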

Many TensorFlow projects use serialized tf.train.Example records in their TFRecord files. These need to be decoded before they can be inspected:

 raw_example = next(iter(dataset))
parsed = tf.train.Example.FromString(raw_example.numpy())

parsed.features.feature['image/text']
 
bytes_list {
  value: "Rue Perreyon"
}

Consuming text data

See Loading Text for an end-to-end example.

Many datasets are distributed as one or more text files. The tf.data.TextLineDataset provides an easy way to extract lines from one or more text files. Given one or more filenames, a TextLineDataset will produce one string-valued element per line of those files.

 directory_url = 'https://storage.googleapis.com/download.tensorflow.org/data/illiad/'
file_names = ['cowper.txt', 'derby.txt', 'butler.txt']

file_paths = [
    tf.keras.utils.get_file(file_name, directory_url + file_name)
    for file_name in file_names
]
 
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/illiad/cowper.txt
819200/815980 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/illiad/derby.txt
811008/809730 [==============================] - 0s 0us/step
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/illiad/butler.txt
811008/807992 [==============================] - 0s 0us/step

 dataset = tf.data.TextLineDataset(file_paths)
 

Here are the first few lines of the first file:

 for line in dataset.take(5):
  print(line.numpy())
 
b"\xef\xbb\xbfAchilles sing, O Goddess! Peleus' son;"
b'His wrath pernicious, who ten thousand woes'
b"Caused to Achaia's host, sent many a soul"
b'Illustrious into Ades premature,'
b'And Heroes gave (so stood the will of Jove)'

To alternate lines between files use Dataset.interleave. This makes it easier to shuffle files together. Here are the first, second and third lines from each translation:

 files_ds = tf.data.Dataset.from_tensor_slices(file_paths)
lines_ds = files_ds.interleave(tf.data.TextLineDataset, cycle_length=3)

for i, line in enumerate(lines_ds.take(9)):
  if i % 3 == 0:
    print()
  print(line.numpy())
 

b"\xef\xbb\xbfAchilles sing, O Goddess! Peleus' son;"
b"\xef\xbb\xbfOf Peleus' son, Achilles, sing, O Muse,"
b'\xef\xbb\xbfSing, O goddess, the anger of Achilles son of Peleus, that brought'

b'His wrath pernicious, who ten thousand woes'
b'The vengeance, deep and deadly; whence to Greece'
b'countless ills upon the Achaeans. Many a brave soul did it send'

b"Caused to Achaia's host, sent many a soul"
b'Unnumbered ills arose; which many a soul'
b'hurrying down to Hades, and many a hero did it yield a prey to dogs and'

By default, a TextLineDataset yields every line of each file, which may not be desirable, for example if the file starts with a header line or contains comments. These lines can be removed using the Dataset.skip() or Dataset.filter() transformations. Here, you skip the first line, then filter to find only survivors.

 titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
titanic_lines = tf.data.TextLineDataset(titanic_file)
 
Downloading data from https://storage.googleapis.com/tf-datasets/titanic/train.csv
32768/30874 [===============================] - 0s 0us/step

 for line in titanic_lines.take(10):
  print(line.numpy())
 
b'survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone'
b'0,male,22.0,1,0,7.25,Third,unknown,Southampton,n'
b'1,female,38.0,1,0,71.2833,First,C,Cherbourg,n'
b'1,female,26.0,0,0,7.925,Third,unknown,Southampton,y'
b'1,female,35.0,1,0,53.1,First,C,Southampton,n'
b'0,male,28.0,0,0,8.4583,Third,unknown,Queenstown,y'
b'0,male,2.0,3,1,21.075,Third,unknown,Southampton,n'
b'1,female,27.0,0,2,11.1333,Third,unknown,Southampton,n'
b'1,female,14.0,1,0,30.0708,Second,unknown,Cherbourg,n'
b'1,female,4.0,1,1,16.7,Third,G,Southampton,n'

 def survived(line):
  return tf.not_equal(tf.strings.substr(line, 0, 1), "0")

survivors = titanic_lines.skip(1).filter(survived)
 
 for line in survivors.take(10):
  print(line.numpy())
 
b'1,female,38.0,1,0,71.2833,First,C,Cherbourg,n'
b'1,female,26.0,0,0,7.925,Third,unknown,Southampton,y'
b'1,female,35.0,1,0,53.1,First,C,Southampton,n'
b'1,female,27.0,0,2,11.1333,Third,unknown,Southampton,n'
b'1,female,14.0,1,0,30.0708,Second,unknown,Cherbourg,n'
b'1,female,4.0,1,1,16.7,Third,G,Southampton,n'
b'1,male,28.0,0,0,13.0,Second,unknown,Southampton,y'
b'1,female,28.0,0,0,7.225,Third,unknown,Cherbourg,y'
b'1,male,28.0,0,0,35.5,First,A,Southampton,y'
b'1,female,38.0,1,5,31.3875,Third,unknown,Southampton,n'

Consuming CSV data

See Loading CSV Files and Loading Pandas DataFrames for more examples.

The CSV file format is a popular format for storing tabular data in plain text.

For example:

 titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
 
 df = pd.read_csv(titanic_file, index_col=None)
df.head()
 

If your data fits in memory, the same Dataset.from_tensor_slices method works on dictionaries, allowing this data to be easily imported:

 titanic_slices = tf.data.Dataset.from_tensor_slices(dict(df))

for feature_batch in titanic_slices.take(1):
  for key, value in feature_batch.items():
    print("  {!r:20s}: {}".format(key, value))
 
  'survived'          : 0
  'sex'               : b'male'
  'age'               : 22.0
  'n_siblings_spouses': 1
  'parch'             : 0
  'fare'              : 7.25
  'class'             : b'Third'
  'deck'              : b'unknown'
  'embark_town'       : b'Southampton'
  'alone'             : b'n'

A more scalable approach is to load from disk as necessary.

The tf.data module provides methods to extract records from one or more CSV files that comply with RFC 4180.

The experimental.make_csv_dataset function is the high-level interface for reading sets of csv files. It supports column type inference and many other features, like batching and shuffling, to make usage simple.

 titanic_batches = tf.data.experimental.make_csv_dataset(
    titanic_file, batch_size=4,
    label_name="survived")
 
 for feature_batch, label_batch in titanic_batches.take(1):
  print("'survived': {}".format(label_batch))
  print("features:")
  for key, value in feature_batch.items():
    print("  {!r:20s}: {}".format(key, value))
 
'survived': [0 1 0 0]
features:
  'sex'               : [b'male' b'female' b'male' b'male']
  'age'               : [28. 42. 43. 21.]
  'n_siblings_spouses': [0 0 0 0]
  'parch'             : [0 0 0 0]
  'fare'              : [47.1    13.      8.05    8.6625]
  'class'             : [b'First' b'Second' b'Third' b'Third']
  'deck'              : [b'unknown' b'unknown' b'unknown' b'unknown']
  'embark_town'       : [b'Southampton' b'Southampton' b'Southampton' b'Southampton']
  'alone'             : [b'y' b'y' b'y' b'y']

You can use the select_columns argument if you only need a subset of the columns.

 titanic_batches = tf.data.experimental.make_csv_dataset(
    titanic_file, batch_size=4,
    label_name="survived", select_columns=['class', 'fare', 'survived'])
 
 for feature_batch, label_batch in titanic_batches.take(1):
  print("'survived': {}".format(label_batch))
  for key, value in feature_batch.items():
    print("  {!r:20s}: {}".format(key, value))
 
'survived': [1 0 1 1]
  'fare'              : [24.15    0.     13.8583 53.1   ]
  'class'             : [b'Third' b'Second' b'Second' b'First']

There is also a lower-level experimental.CsvDataset class which provides finer grained control. It does not support column type inference. Instead, you must specify the type of each column.

 titanic_types  = [tf.int32, tf.string, tf.float32, tf.int32, tf.int32, tf.float32, tf.string, tf.string, tf.string, tf.string] 
dataset = tf.data.experimental.CsvDataset(titanic_file, titanic_types , header=True)

for line in dataset.take(10):
  print([item.numpy() for item in line])
 
[0, b'male', 22.0, 1, 0, 7.25, b'Third', b'unknown', b'Southampton', b'n']
[1, b'female', 38.0, 1, 0, 71.2833, b'First', b'C', b'Cherbourg', b'n']
[1, b'female', 26.0, 0, 0, 7.925, b'Third', b'unknown', b'Southampton', b'y']
[1, b'female', 35.0, 1, 0, 53.1, b'First', b'C', b'Southampton', b'n']
[0, b'male', 28.0, 0, 0, 8.4583, b'Third', b'unknown', b'Queenstown', b'y']
[0, b'male', 2.0, 3, 1, 21.075, b'Third', b'unknown', b'Southampton', b'n']
[1, b'female', 27.0, 0, 2, 11.1333, b'Third', b'unknown', b'Southampton', b'n']
[1, b'female', 14.0, 1, 0, 30.0708, b'Second', b'unknown', b'Cherbourg', b'n']
[1, b'female', 4.0, 1, 1, 16.7, b'Third', b'G', b'Southampton', b'n']
[0, b'male', 20.0, 0, 0, 8.05, b'Third', b'unknown', b'Southampton', b'y']

If some columns are empty, this low-level interface allows you to provide default values instead of column types.

 %%writefile missing.csv
1,2,3,4
,2,3,4
1,,3,4
1,2,,4
1,2,3,
,,,
 
Writing missing.csv

 # Creates a dataset that reads all of the records from two CSV files, each with
# four float columns which may have missing values.

record_defaults = [999,999,999,999]
dataset = tf.data.experimental.CsvDataset("missing.csv", record_defaults)
dataset = dataset.map(lambda *items: tf.stack(items))
dataset
 
<MapDataset shapes: (4,), types: tf.int32>
 for line in dataset:
  print(line.numpy())
 
[1 2 3 4]
[999   2   3   4]
[  1 999   3   4]
[  1   2 999   4]
[  1   2   3 999]
[999 999 999 999]

By default, a CsvDataset yields every column of every line of the file, which may not be desirable, for example if the file starts with a header line that should be ignored, or if some columns are not required in the input. These lines and fields can be removed with the header and select_cols arguments respectively.

 # Creates a dataset that reads all of the records from two CSV files with
# headers, extracting float data from columns 2 and 4.
record_defaults = [999, 999] # Only provide defaults for the selected columns
dataset = tf.data.experimental.CsvDataset("missing.csv", record_defaults, select_cols=[1, 3])
dataset = dataset.map(lambda *items: tf.stack(items))
dataset
 
<MapDataset shapes: (2,), types: tf.int32>
 for line in dataset:
  print(line.numpy())
 
[2 4]
[2 4]
[999   4]
[2 4]
[  2 999]
[999 999]

Consuming sets of files

There are many datasets distributed as a set of files, where each file is an example.

 flowers_root = tf.keras.utils.get_file(
    'flower_photos',
    'https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz',
    untar=True)
flowers_root = pathlib.Path(flowers_root)

 

The root directory contains a directory for each class:

 for item in flowers_root.glob("*"):
  print(item.name)
 
sunflowers
daisy
LICENSE.txt
roses
tulips
dandelion

The files in each class directory are examples:

 list_ds = tf.data.Dataset.list_files(str(flowers_root/'*/*'))

for f in list_ds.take(5):
  print(f.numpy())
 
b'/home/kbuilder/.keras/datasets/flower_photos/dandelion/7243478942_30bf542a2d_m.jpg'
b'/home/kbuilder/.keras/datasets/flower_photos/tulips/4525067924_177ea3bfb4.jpg'
b'/home/kbuilder/.keras/datasets/flower_photos/tulips/7002703410_3e97b29da5_n.jpg'
b'/home/kbuilder/.keras/datasets/flower_photos/daisy/6299910262_336309ffa5_n.jpg'
b'/home/kbuilder/.keras/datasets/flower_photos/sunflowers/6140661443_bb48344226.jpg'

Read the data using the tf.io.read_file function and extract the label from the path, returning (image, label) pairs:

 def process_path(file_path):
  label = tf.strings.split(file_path, os.sep)[-2]
  return tf.io.read_file(file_path), label

labeled_ds = list_ds.map(process_path)
 
 for image_raw, label_text in labeled_ds.take(1):
  print(repr(image_raw.numpy()[:100]))
  print()
  print(label_text.numpy())
 
b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00\xff\xe2\x0cXICC_PROFILE\x00\x01\x01\x00\x00\x0cHLino\x02\x10\x00\x00mntrRGB XYZ \x07\xce\x00\x02\x00\t\x00\x06\x001\x00\x00acspMSFT\x00\x00\x00\x00IEC sRGB\x00\x00\x00\x00\x00\x00'

b'tulips'

Batching dataset elements

Simple batching

The simplest form of batching stacks n consecutive elements of a dataset into a single element. The Dataset.batch() transformation does exactly this, with the same constraints as the tf.stack() operator, applied to each component of the elements: i.e. for each component i, all elements must have a tensor of the exact same shape.

 inc_dataset = tf.data.Dataset.range(100)
dec_dataset = tf.data.Dataset.range(0, -100, -1)
dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)

for batch in batched_dataset.take(4):
  print([arr.numpy() for arr in batch])
 
[array([0, 1, 2, 3]), array([ 0, -1, -2, -3])]
[array([4, 5, 6, 7]), array([-4, -5, -6, -7])]
[array([ 8,  9, 10, 11]), array([ -8,  -9, -10, -11])]
[array([12, 13, 14, 15]), array([-12, -13, -14, -15])]

While tf.data tries to propagate shape information, the default settings of Dataset.batch result in an unknown batch size because the last batch may not be full. Note the Nones in the shape:

 batched_dataset
 
<BatchDataset shapes: ((None,), (None,)), types: (tf.int64, tf.int64)>

Use the drop_remainder argument to ignore that last batch, and get full shape propagation:

 batched_dataset = dataset.batch(7, drop_remainder=True)
batched_dataset
 
<BatchDataset shapes: ((7,), (7,)), types: (tf.int64, tf.int64)>

Batching tensors with padding

The above recipe works for tensors that all have the same size. However, many models (e.g. sequence models) work with input data that can have varying size (e.g. sequences of different lengths). To handle this case, the Dataset.padded_batch transformation enables you to batch tensors of different shapes by specifying one or more dimensions in which they may be padded.

 dataset = tf.data.Dataset.range(100)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))
dataset = dataset.padded_batch(4, padded_shapes=(None,))

for batch in dataset.take(2):
  print(batch.numpy())
  print()

 
[[0 0 0]
 [1 0 0]
 [2 2 0]
 [3 3 3]]

[[4 4 4 4 0 0 0]
 [5 5 5 5 5 0 0]
 [6 6 6 6 6 6 0]
 [7 7 7 7 7 7 7]]


The Dataset.padded_batch transformation allows you to set different padding for each dimension of each component, and it may be variable length (signified by None in the example above) or constant length. It is also possible to override the padding value, which defaults to 0.
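
As a minimal sketch of those options, the following pads every element out to a constant length and overrides the padding value (the fill value -1 and the fixed length 10 are illustrative choices, not from the original notebook):

 dataset = tf.data.Dataset.range(100)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))

# Pad each element to a constant length of 10, filling with -1 instead of 0.
dataset = dataset.padded_batch(
    4, padded_shapes=(10,), padding_values=tf.constant(-1, dtype=tf.int64))

for batch in dataset.take(2):
  print(batch.numpy())
  print()
 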

Training workflows

Processing multiple epochs

The tf.data API offers two main ways to process multiple epochs of the same data.

The simplest way to iterate over a dataset in multiple epochs is to use the Dataset.repeat() transformation. First, create a dataset of the titanic data:

 titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
titanic_lines = tf.data.TextLineDataset(titanic_file)
 
 def plot_batch_sizes(ds):
  batch_sizes = [batch.shape[0] for batch in ds]
  plt.bar(range(len(batch_sizes)), batch_sizes)
  plt.xlabel('Batch number')
  plt.ylabel('Batch size')
 

Applying the Dataset.repeat() transformation with no arguments will repeat the input indefinitely.

The Dataset.repeat transformation concatenates its arguments without signaling the end of one epoch and the beginning of the next epoch. Because of this, a Dataset.batch applied after Dataset.repeat will yield batches that straddle epoch boundaries:

 titanic_batches = titanic_lines.repeat(3).batch(128)
plot_batch_sizes(titanic_batches)
 

png

If you need clear epoch separation, put Dataset.batch before the repeat:

 titanic_batches = titanic_lines.batch(128).repeat(3)

plot_batch_sizes(titanic_batches)
 

png

If you would like to perform a custom computation (e.g. to collect statistics) at the end of each epoch, then it's simplest to restart the dataset iteration on each epoch:

 epochs = 3
dataset = titanic_lines.batch(128)

for epoch in range(epochs):
  for batch in dataset:
    print(batch.shape)
  print("End of epoch: ", epoch)
 
(128,)
(128,)
(128,)
(128,)
(116,)
End of epoch:  0
(128,)
(128,)
(128,)
(128,)
(116,)
End of epoch:  1
(128,)
(128,)
(128,)
(128,)
(116,)
End of epoch:  2

Randomly shuffling input data

The Dataset.shuffle() transformation maintains a fixed-size buffer and chooses the next element uniformly at random from that buffer.

Add an index to the dataset so you can see the effect:

 lines = tf.data.TextLineDataset(titanic_file)
counter = tf.data.experimental.Counter()

dataset = tf.data.Dataset.zip((counter, lines))
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.batch(20)
dataset
 
<BatchDataset shapes: ((None,), (None,)), types: (tf.int64, tf.string)>

Since the buffer_size is 100, and the batch size is 20, the first batch contains no elements with an index over 120.

 n,line_batch = next(iter(dataset))
print(n.numpy())
 
[ 63  48 101  12 103  52   6  39   4   9  93  91   5  86  79  64  95  33
 102  50]

As with Dataset.batch, the order relative to Dataset.repeat matters.

Dataset.shuffle doesn't signal the end of an epoch until the shuffle buffer is empty. So a shuffle placed before a repeat will show every element of one epoch before moving to the next:

 dataset = tf.data.Dataset.zip((counter, lines))
shuffled = dataset.shuffle(buffer_size=100).batch(10).repeat(2)

print("Here are the item ID's near the epoch boundary:\n")
for n, line_batch in shuffled.skip(60).take(5):
  print(n.numpy())
 
Here are the item ID's near the epoch boundary:

[613 609 624 553 608 583 493 617 611 610]
[217 508 579 601 319 616 606 549 618 623]
[416 567 404 622 283 458 503 602]
[ 87  68  56  16   6  62   1  89  58 106]
[98 80 43 10 67 44 19 34 13 57]

 shuffle_repeat = [n.numpy().mean() for n, line_batch in shuffled]
plt.plot(shuffle_repeat, label="shuffle().repeat()")
plt.ylabel("Mean item ID")
plt.legend()
 
<matplotlib.legend.Legend at 0x7fe0a00a1d68>

png

But a repeat before a shuffle mixes the epoch boundaries together:

 dataset = tf.data.Dataset.zip((counter, lines))
shuffled = dataset.repeat(2).shuffle(buffer_size=100).batch(10)

print("Here are the item ID's near the epoch boundary:\n")
for n, line_batch in shuffled.skip(55).take(15):
  print(n.numpy())
 
Here are the item ID's near the epoch boundary:

[440  15   8 599 567  18 550   5  19  17]
[ 12 501 571 473 466  21 531 596 580 555]
[  3 573  38 563  25 416 595  29  46 602]
[485 566 561  16 331 615 386  28 609  41]
[611 622 575  10 589  61 598 527  52  35]
[ 55 597  42  23  13  47  11 505  68 582]
[612 613  75  43   7 392  74 452  82 509]
[  9  44  62 491  71 343  51 590  60  98]
[  6  95 619  86 625 537 617  85 465   0]
[ 88  27  92 101 109 111 104  24  36 113]
[103 118  79  53  70  40 121 100  65  33]
[562 588 124 125  64  84  83  67 610 130]
[  4 142 131  90 518 129 143 112   2 551]
[377  91 140  76  50  48 526 553 156 591]
[105 128  69 114  93 520 154  56 145 115]

 repeat_shuffle = [n.numpy().mean() for n, line_batch in shuffled]

plt.plot(shuffle_repeat, label="shuffle().repeat()")
plt.plot(repeat_shuffle, label="repeat().shuffle()")
plt.ylabel("Mean item ID")
plt.legend()
 
<matplotlib.legend.Legend at 0x7fe0582fbb70>

png

Preprocessing data

The Dataset.map(f) transformation produces a new dataset by applying a given function f to each element of the input dataset. It is based on the map() function that is commonly applied to lists (and other structures) in functional programming languages. The function f takes the tf.Tensor objects that represent a single element in the input, and returns the tf.Tensor objects that will represent a single element in the new dataset. Its implementation uses standard TensorFlow operations to transform one element into another.

This section covers common examples of how to use Dataset.map().
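
Before the image examples, here is a minimal per-element map over a dataset of integers (a sketch, not part of the original notebook):

 # Square every element of a small integer dataset.
squares = tf.data.Dataset.range(5).map(lambda x: x * x)

print([s.numpy() for s in squares])  # -> [0, 1, 4, 9, 16]
 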

Decoding image data and resizing it

When training a neural network on real-world image data, it is often necessary to convert images of different sizes to a common size, so that they may be batched into a fixed size.

Rebuild the flower filenames dataset:

 list_ds = tf.data.Dataset.list_files(str(flowers_root/'*/*'))
 

Write a function that manipulates the dataset elements.

 # Reads an image from a file, decodes it into a dense tensor, and resizes it
# to a fixed shape.
def parse_image(filename):
  parts = tf.strings.split(filename, os.sep)
  label = parts[-2]

  image = tf.io.read_file(filename)
  image = tf.image.decode_jpeg(image)
  image = tf.image.convert_image_dtype(image, tf.float32)
  image = tf.image.resize(image, [128, 128])
  return image, label
 

Test that it works.

 file_path = next(iter(list_ds))
image, label = parse_image(file_path)

def show(image, label):
  plt.figure()
  plt.imshow(image)
  plt.title(label.numpy().decode('utf-8'))
  plt.axis('off')

show(image, label)
 

png

Map it over the dataset.

 images_ds = list_ds.map(parse_image)

for image, label in images_ds.take(2):
  show(image, label)
 

png

png

Applying arbitrary Python logic

For performance reasons, use TensorFlow operations for preprocessing your data whenever possible. However, it is sometimes useful to call external Python libraries when parsing your input data. You can use the tf.py_function() operation in a Dataset.map() transformation.

For example, if you want to apply a random rotation, the tf.image module only has tf.image.rot90, which is not very useful for image augmentation.

To demonstrate tf.py_function, try using the scipy.ndimage.rotate function instead:

 import scipy.ndimage as ndimage

def random_rotate_image(image):
  image = ndimage.rotate(image, np.random.uniform(-30, 30), reshape=False)
  return image
 
 image, label = next(iter(images_ds))
image = random_rotate_image(image)
show(image, label)
 
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).

png

To use this function with Dataset.map the same caveats apply as with Dataset.from_generator: you need to describe the return shapes and types when you apply the function:

 def tf_random_rotate_image(image, label):
  im_shape = image.shape
  [image,] = tf.py_function(random_rotate_image, [image], [tf.float32])
  image.set_shape(im_shape)
  return image, label
 
 rot_ds = images_ds.map(tf_random_rotate_image)

for image, label in rot_ds.take(2):
  show(image, label)
 
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).

png

png

Parsing tf.Example protocol buffer messages

Many input pipelines extract tf.train.Example protocol buffer messages from a TFRecord format. Each tf.train.Example record contains one or more "features", and the input pipeline typically converts these features into tensors.

 fsns_test_file = tf.keras.utils.get_file("fsns.tfrec", "https://storage.googleapis.com/download.tensorflow.org/data/fsns-20160927/testdata/fsns-00000-of-00001")
dataset = tf.data.TFRecordDataset(filenames = [fsns_test_file])
dataset
 
<TFRecordDatasetV2 shapes: (), types: tf.string>

You can work with tf.train.Example protos outside of a tf.data.Dataset to understand the data:

 raw_example = next(iter(dataset))
parsed = tf.train.Example.FromString(raw_example.numpy())

feature = parsed.features.feature
raw_img = feature['image/encoded'].bytes_list.value[0]
img = tf.image.decode_png(raw_img)
plt.imshow(img)
plt.axis('off')
_ = plt.title(feature["image/text"].bytes_list.value[0])
 

png

 raw_example = next(iter(dataset))
 
 def tf_parse(eg):
  example = tf.io.parse_example(
      eg[tf.newaxis], {
          'image/encoded': tf.io.FixedLenFeature(shape=(), dtype=tf.string),
          'image/text': tf.io.FixedLenFeature(shape=(), dtype=tf.string)
      })
  return example['image/encoded'][0], example['image/text'][0]
 
 img, txt = tf_parse(raw_example)
print(txt.numpy())
print(repr(img.numpy()[:20]), "...")
 
b'Rue Perreyon'
b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x02X' ...

 decoded = dataset.map(tf_parse)
decoded
 
<MapDataset shapes: ((), ()), types: (tf.string, tf.string)>
 image_batch, text_batch = next(iter(decoded.batch(10)))
image_batch.shape
 
TensorShape([10])

Time series windowing

For an end-to-end time series example see: Time series forecasting.

Time series data is often organized with the time axis intact.

Use a simple Dataset.range to demonstrate:

 range_ds = tf.data.Dataset.range(100000)
 

Typically, models based on this sort of data will want a contiguous time slice.

The simplest approach would be to batch the data:

Using batch

 batches = range_ds.batch(10, drop_remainder=True)

for batch in batches.take(5):
  print(batch.numpy())
 
[0 1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18 19]
[20 21 22 23 24 25 26 27 28 29]
[30 31 32 33 34 35 36 37 38 39]
[40 41 42 43 44 45 46 47 48 49]

Or, to make dense predictions one step into the future, you can shift the features and labels one step relative to each other:

 def dense_1_step(batch):
  # Shift features and labels one step relative to each other.
  return batch[:-1], batch[1:]

predict_dense_1_step = batches.map(dense_1_step)

for features, label in predict_dense_1_step.take(3):
  print(features.numpy(), " => ", label.numpy())
 
[0 1 2 3 4 5 6 7 8]  =>  [1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18]  =>  [11 12 13 14 15 16 17 18 19]
[20 21 22 23 24 25 26 27 28]  =>  [21 22 23 24 25 26 27 28 29]

To predict a whole window instead of a fixed offset, you can split the batches into two parts:

 batches = range_ds.batch(15, drop_remainder=True)

def label_next_5_steps(batch):
  return (batch[:-5],   # Take the first 5 steps
          batch[-5:])   # take the remainder

predict_5_steps = batches.map(label_next_5_steps)

for features, label in predict_5_steps.take(3):
  print(features.numpy(), " => ", label.numpy())
 
[0 1 2 3 4 5 6 7 8 9]  =>  [10 11 12 13 14]
[15 16 17 18 19 20 21 22 23 24]  =>  [25 26 27 28 29]
[30 31 32 33 34 35 36 37 38 39]  =>  [40 41 42 43 44]

To allow some overlap between the features of one batch and the labels of another, use Dataset.zip:

 feature_length = 10
label_length = 5

features = range_ds.batch(feature_length, drop_remainder=True)
labels = range_ds.batch(feature_length).skip(1).map(lambda labels: labels[:-5])

predict_5_steps = tf.data.Dataset.zip((features, labels))

for features, label in predict_5_steps.take(3):
  print(features.numpy(), " => ", label.numpy())
 
[0 1 2 3 4 5 6 7 8 9]  =>  [10 11 12 13 14]
[10 11 12 13 14 15 16 17 18 19]  =>  [20 21 22 23 24]
[20 21 22 23 24 25 26 27 28 29]  =>  [30 31 32 33 34]

Using window

While using Dataset.batch works, there are situations where you may need finer control. The Dataset.window method gives you complete control, but requires some care: it returns a Dataset of Datasets. See Dataset structure for details.

 window_size = 5

windows = range_ds.window(window_size, shift=1)
for sub_ds in windows.take(5):
  print(sub_ds)
 
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>

The Dataset.flat_map method can take a dataset of datasets and flatten it into a single dataset:

 for x in windows.flat_map(lambda x: x).take(30):
  print(x.numpy(), end=' ')
 
WARNING:tensorflow:AutoGraph could not transform <function <lambda> at 0x7fe0582dbbf8> and will run it as-is.
Cause: could not parse the source code:

for x in windows.flat_map(lambda x: x).take(30):

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function <lambda> at 0x7fe0582dbbf8> and will run it as-is.
Cause: could not parse the source code:

for x in windows.flat_map(lambda x: x).take(30):

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
0 1 2 3 4 1 2 3 4 5 2 3 4 5 6 3 4 5 6 7 4 5 6 7 8 5 6 7 8 9 

In nearly all cases, you will want to .batch the dataset first:

 def sub_to_batch(sub):
  return sub.batch(window_size, drop_remainder=True)

for example in windows.flat_map(sub_to_batch).take(5):
  print(example.numpy())
 
[0 1 2 3 4]
[1 2 3 4 5]
[2 3 4 5 6]
[3 4 5 6 7]
[4 5 6 7 8]

Now, you can see that the shift argument controls how much each window moves over.

Putting this together you might write this function:

 def make_window_dataset(ds, window_size=5, shift=1, stride=1):
  windows = ds.window(window_size, shift=shift, stride=stride)

  def sub_to_batch(sub):
    return sub.batch(window_size, drop_remainder=True)

  windows = windows.flat_map(sub_to_batch)
  return windows

 
 ds = make_window_dataset(range_ds, window_size=10, shift = 5, stride=3)

for example in ds.take(10):
  print(example.numpy())
 
[ 0  3  6  9 12 15 18 21 24 27]
[ 5  8 11 14 17 20 23 26 29 32]
[10 13 16 19 22 25 28 31 34 37]
[15 18 21 24 27 30 33 36 39 42]
[20 23 26 29 32 35 38 41 44 47]
[25 28 31 34 37 40 43 46 49 52]
[30 33 36 39 42 45 48 51 54 57]
[35 38 41 44 47 50 53 56 59 62]
[40 43 46 49 52 55 58 61 64 67]
[45 48 51 54 57 60 63 66 69 72]

Then it's easy to extract labels, as before:

 dense_labels_ds = ds.map(dense_1_step)

for inputs,labels in dense_labels_ds.take(3):
  print(inputs.numpy(), "=>", labels.numpy())
 
[ 0  3  6  9 12 15 18 21 24] => [ 3  6  9 12 15 18 21 24 27]
[ 5  8 11 14 17 20 23 26 29] => [ 8 11 14 17 20 23 26 29 32]
[10 13 16 19 22 25 28 31 34] => [13 16 19 22 25 28 31 34 37]

Resampling

When working with a dataset that is very class-imbalanced, you may want to resample the dataset. tf.data provides two methods to do this. The credit card fraud dataset is a good example of this kind of problem.

 zip_path = tf.keras.utils.get_file(
    origin='https://storage.googleapis.com/download.tensorflow.org/data/creditcard.zip',
    fname='creditcard.zip',
    extract=True)

csv_path = zip_path.replace('.zip', '.csv')
 
Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/creditcard.zip
69156864/69155632 [==============================] - 10s 0us/step

 creditcard_ds = tf.data.experimental.make_csv_dataset(
    csv_path, batch_size=1024, label_name="Class",
    # Set the column types: 30 floats and an int.
    column_defaults=[float()]*30+[int()])
 

Now, check the distribution of classes; it is highly skewed:

 def count(counts, batch):
  features, labels = batch
  class_1 = labels == 1
  class_1 = tf.cast(class_1, tf.int32)

  class_0 = labels == 0
  class_0 = tf.cast(class_0, tf.int32)

  counts['class_0'] += tf.reduce_sum(class_0)
  counts['class_1'] += tf.reduce_sum(class_1)

  return counts
 
 counts = creditcard_ds.take(10).reduce(
    initial_state={'class_0': 0, 'class_1': 0},
    reduce_func = count)

counts = np.array([counts['class_0'].numpy(),
                   counts['class_1'].numpy()]).astype(np.float32)

fractions = counts/counts.sum()
print(fractions)
 
[0.996 0.004]

A common approach to training with an imbalanced dataset is to balance it. tf.data includes a few methods which enable this workflow:

Datasets sampling

One approach to resampling a dataset is to use sample_from_datasets. This is more applicable when you have a separate data.Dataset for each class.

Here, just use filter to generate them from the credit card fraud data:

 negative_ds = (
  creditcard_ds
    .unbatch()
    .filter(lambda features, label: label==0)
    .repeat())
positive_ds = (
  creditcard_ds
    .unbatch()
    .filter(lambda features, label: label==1)
    .repeat())
 
WARNING:tensorflow:AutoGraph could not transform <function <lambda> at 0x7fe0a01fd1e0> and will run it as-is.
Cause: could not parse the source code:

    .filter(lambda features, label: label==0)

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function <lambda> at 0x7fe0a01fd1e0> and will run it as-is.
Cause: could not parse the source code:

    .filter(lambda features, label: label==0)

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING:tensorflow:AutoGraph could not transform <function <lambda> at 0x7fe058159620> and will run it as-is.
Cause: could not parse the source code:

    .filter(lambda features, label: label==1)

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function <lambda> at 0x7fe058159620> and will run it as-is.
Cause: could not parse the source code:

    .filter(lambda features, label: label==1)

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert

 for features, label in positive_ds.batch(10).take(1):
  print(label.numpy())
 
[1 1 1 1 1 1 1 1 1 1]

To use tf.data.experimental.sample_from_datasets pass the datasets, and the weight for each:

 balanced_ds = tf.data.experimental.sample_from_datasets(
    [negative_ds, positive_ds], [0.5, 0.5]).batch(10)
 

Now the dataset produces examples of each class with a 50/50 probability:

 for features, labels in balanced_ds.take(10):
  print(labels.numpy())
 
[0 0 0 1 0 0 1 1 0 0]
[1 1 0 0 1 1 0 0 0 1]
[1 1 0 0 0 0 0 0 1 1]
[1 0 0 1 1 0 0 0 1 0]
[1 1 0 0 0 1 1 0 0 1]
[0 0 1 1 1 0 0 1 1 0]
[0 0 0 1 0 0 0 1 1 1]
[1 1 0 1 0 0 1 0 1 1]
[0 1 0 0 1 1 0 0 0 1]
[0 1 0 0 1 0 0 1 1 0]

Rejection resampling

One problem with the above experimental.sample_from_datasets approach is that it needs a separate tf.data.Dataset per class. Using Dataset.filter works, but results in all the data being loaded twice.

The data.experimental.rejection_resample function can be applied to a dataset to rebalance it, while only loading it once. Elements will be dropped from the dataset to achieve balance.

data.experimental.rejection_resample takes a class_func argument. This class_func is applied to each dataset element, and is used to determine which class an example belongs to for the purposes of balancing.

The elements of creditcard_ds are already (features, label) pairs. So the class_func just needs to return those labels:

 def class_func(features, label):
  return label
 

The resampler also needs a target distribution, and optionally an initial distribution estimate:

 resampler = tf.data.experimental.rejection_resample(
    class_func, target_dist=[0.5, 0.5], initial_dist=fractions)
 

The resampler deals with individual examples, so you must unbatch the dataset before applying the resampler:

 resample_ds = creditcard_ds.unbatch().apply(resampler).batch(10)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/experimental/ops/resampling.py:156: Print (from tensorflow.python.ops.logging_ops) is deprecated and will be removed after 2018-08-20.
Instructions for updating:
Use tf.print instead of tf.Print. Note that tf.print returns a no-output operator that directly prints the output. Outside of defuns or eager mode, this operator will not be executed unless it is directly specified in session.run or used as a control dependency for other operators. This is only a concern in graph mode. Below is an example of how to ensure tf.print executes in graph mode:


The resampler creates (class, example) pairs from the output of the class_func. In this case, the example was already a (features, label) pair, so use map to drop the extra copy of the labels:

 balanced_ds = resample_ds.map(lambda extra_label, features_and_label: features_and_label)
 

Now the dataset produces examples of each class with a 50/50 probability:

 for features, labels in balanced_ds.take(10):
  print(labels.numpy())
 
[1 0 1 1 1 1 0 1 1 1]
[0 0 1 1 1 0 1 0 1 1]
[1 0 0 1 0 0 0 0 0 1]
[1 1 1 1 1 1 0 1 1 1]
[1 1 0 1 0 0 0 0 1 0]
[1 0 0 0 1 0 1 0 1 0]
[0 0 0 0 0 0 1 0 0 0]
[0 0 0 1 1 0 0 1 0 1]
[0 0 1 1 1 1 0 0 1 1]
[0 0 1 1 0 1 0 1 1 0]

Iterator Checkpointing

TensorFlow supports taking checkpoints so that when your training process restarts, it can restore the latest checkpoint to recover most of its progress. In addition to checkpointing the model variables, you can also checkpoint the progress of the dataset iterator. This could be useful if you have a large dataset and don't want to start the dataset from the beginning on each restart. Note, however, that iterator checkpoints may be large, since transformations such as shuffle and prefetch require buffering elements within the iterator.

To include your iterator in a checkpoint, pass the iterator to the tf.train.Checkpoint constructor.

 range_ds = tf.data.Dataset.range(20)

iterator = iter(range_ds)
ckpt = tf.train.Checkpoint(step=tf.Variable(0), iterator=iterator)
manager = tf.train.CheckpointManager(ckpt, '/tmp/my_ckpt', max_to_keep=3)

print([next(iterator).numpy() for _ in range(5)])

save_path = manager.save()

print([next(iterator).numpy() for _ in range(5)])

ckpt.restore(manager.latest_checkpoint)

print([next(iterator).numpy() for _ in range(5)])
 
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[5, 6, 7, 8, 9]

Using high-level APIs

tf.keras

The tf.keras API simplifies many aspects of creating and executing machine learning models. Its .fit(), .evaluate(), and .predict() APIs support datasets as inputs. Here is a quick dataset and model setup:

 train, test = tf.keras.datasets.fashion_mnist.load_data()

images, labels = train
images = images/255.0
labels = labels.astype(np.int32)
 
 fmnist_train_ds = tf.data.Dataset.from_tensor_slices((images, labels))
fmnist_train_ds = fmnist_train_ds.shuffle(5000).batch(32)

model = tf.keras.Sequential([
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(10)
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
              metrics=['accuracy'])
 

Passing a dataset of (feature, label) pairs is all that's needed for Model.fit and Model.evaluate:

 model.fit(fmnist_train_ds, epochs=2)
 
Epoch 1/2
WARNING:tensorflow:Layer flatten is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

1875/1875 [==============================] - 4s 2ms/step - loss: 0.6031 - accuracy: 0.7937
Epoch 2/2
1875/1875 [==============================] - 3s 2ms/step - loss: 0.4620 - accuracy: 0.8416

<tensorflow.python.keras.callbacks.History at 0x7fe13f9ed3c8>

If you pass an infinite dataset, for example by calling Dataset.repeat(), you just need to also pass the steps_per_epoch argument:

 model.fit(fmnist_train_ds.repeat(), epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 0.4050 - accuracy: 0.8672
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 0.4077 - accuracy: 0.8703

<tensorflow.python.keras.callbacks.History at 0x7fe0ca13edd8>

For evaluation you can pass the number of evaluation steps:

 loss, accuracy = model.evaluate(fmnist_train_ds)
print("Loss :", loss)
print("Accuracy :", accuracy)
 
1875/1875 [==============================] - 3s 2ms/step - loss: 0.4474 - accuracy: 0.8439
Loss : 0.4474281072616577
Accuracy : 0.843916654586792

For a long dataset, set the number of steps to evaluate:

 loss, accuracy = model.evaluate(fmnist_train_ds.repeat(), steps=10)
print("Loss :", loss)
print("Accuracy :", accuracy)
 
10/10 [==============================] - 0s 2ms/step - loss: 0.5262 - accuracy: 0.8156
Loss : 0.5262183547019958
Accuracy : 0.815625011920929

The labels are not required when calling Model.predict.

 predict_ds = tf.data.Dataset.from_tensor_slices(images).batch(32)
result = model.predict(predict_ds, steps = 10)
print(result.shape)
 
(320, 10)

But the labels are ignored if you do pass a dataset containing them:

 result = model.predict(fmnist_train_ds, steps = 10)
print(result.shape)
 
(320, 10)

tf.estimator

To use a Dataset in the input_fn of a tf.estimator.Estimator, simply return the Dataset from the input_fn and the framework will take care of consuming its elements for you. For example:

 import tensorflow_datasets as tfds

def train_input_fn():
  titanic = tf.data.experimental.make_csv_dataset(
      titanic_file, batch_size=32,
      label_name="survived")
  titanic_batches = (
      titanic.cache().repeat().shuffle(500)
      .prefetch(tf.data.experimental.AUTOTUNE))
  return titanic_batches
 
 embark = tf.feature_column.categorical_column_with_hash_bucket('embark_town', 32)
cls = tf.feature_column.categorical_column_with_vocabulary_list('class', ['First', 'Second', 'Third']) 
age = tf.feature_column.numeric_column('age')
 
 import tempfile
model_dir = tempfile.mkdtemp()
model = tf.estimator.LinearClassifier(
    model_dir=model_dir,
    feature_columns=[embark, cls, age],
    n_classes=2
)
 
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpefmfuc4o', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}

 model = model.train(input_fn=train_input_fn, steps=100)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/ops/resource_variable_ops.py:1666: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/feature_column/feature_column_v2.py:540: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/keras/optimizer_v2/ftrl.py:144: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpefmfuc4o/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.6931472, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 100...
INFO:tensorflow:Saving checkpoints for 100 into /tmp/tmpefmfuc4o/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 100...
INFO:tensorflow:Loss for final step: 0.58668363.

 result = model.evaluate(train_input_fn, steps=10)

for key, value in result.items():
  print(key, ":", value)
 
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2020-07-23T01:23:29Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpefmfuc4o/model.ckpt-100
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.83507s
INFO:tensorflow:Finished evaluation at 2020-07-23-01:23:30
INFO:tensorflow:Saving dict for global step 100: accuracy = 0.675, accuracy_baseline = 0.58125, auc = 0.71750116, auc_precision_recall = 0.6480325, average_loss = 0.64111984, global_step = 100, label/mean = 0.41875, loss = 0.64111984, precision = 0.85714287, prediction/mean = 0.30204886, recall = 0.26865673
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 100: /tmp/tmpefmfuc4o/model.ckpt-100
accuracy : 0.675
accuracy_baseline : 0.58125
auc : 0.71750116
auc_precision_recall : 0.6480325
average_loss : 0.64111984
label/mean : 0.41875
loss : 0.64111984
precision : 0.85714287
prediction/mean : 0.30204886
recall : 0.26865673
global_step : 100

 for pred in model.predict(train_input_fn):
  for key, value in pred.items():
    print(key, ":", value)
  break
 
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpefmfuc4o/model.ckpt-100
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
logits : [-0.5965]
logistic : [0.3551]
probabilities : [0.6449 0.3551]
class_ids : [0]
classes : [b'0']
all_class_ids : [0 1]
all_classes : [b'0' b'1']