hampi提出的建议概述您的培训工作是一个很好的建议,可能对于了解您的管道中的实际瓶颈是必要的。输入管道性能指南中的其他建议也应该有用。
但是,还有另一个可能的“快速修复”可能很有用。在某些情况下,Dataset.map()
转换中的工作量可能很小,并且主要由调用每个元素的功能的成本决定。在这种情况下,我们经常尝试对map函数进行 处理,并在Dataset.batch()
转换后将其移动,以减少调用函数的次数(在这种情况下,调用次数为1/512次),并执行更大的代码,并且可能更容易实现- parallelize-每批操作。幸运的是,您的管道可以矢量化,如下所示:
def _batch_parser(record_batch):
# NOTE: Use `tf.parse_example()` to operate on batches of records.
parsed = tf.parse_example(record_batch, _keys_to_map)
return parsed['d'], parsed['s']
def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)
with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
# NOTE: Change begins here.
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.map(_batch_parser) # map batches based on tfrecord format
# NOTE: Change ends here.
ds = ds.repeat() # iterate infinitely
return ds.make_initializable_iterator() # initialize the iterator
当前,矢量化是您必须手动进行的更改,但是tf.data
团队正在研究提供自动矢量化的优化过程。