Dataset - The Tensor “Highway” On-Ramp
Machine learning is about lots and lots of data. Organizing the input data is an error-prone, arduous task.
TensorFlow datasets were designed to build complex input data pipelines from simple, reusable pieces, with the clear objective of normalizing that process. This blog shows off some of the useful features of this new approach to “feeding the beast”.
Before we can run any meaningful code, we first need to prep our environment:
import numpy as np
import pathlib as pth
import tensorflow as tf
For convenience and brevity, let’s create some aliases as well:
td = tf.data
tt = tf.train
Elementary arithmetic
While computers were made to add numbers together, they quickly run into insurmountable obstacles if these numbers are presented as simple textual sequences of digits.
We aim to finally “teach” our computer to correctly add and multiply, just as we learned in elementary school. And we start with a simple yet fascinating example, inspired by here.
Our input data consists of num_samples (perhaps easily millions) of “x=-12,y=24:y+x:12”-like strings, or lines, of text. These visibly consist of defs, op and res fields (separated by :).
Our variables are x and y, our “operations” are =, +, - and *, and our variables can be assigned values from [-max_val, max_val].
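To make the format concrete, here is a quick, throwaway look in plain Python (no TF yet) at how one such line breaks into its three fields; the line value below is made up purely for illustration:
line = 'x=-12,y=24:y+x:12'
defs, op, res = line.split(':')
print(defs)  # x=-12,y=24
print(op)    # y+x
print(res)   # 12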
The rest of the blogs in this group will continue to build on the results presented below:
vocab = (' ', ':', '|')
vocab += ('x', 'y', '=', ',', '+', '-', '*')
vocab += ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
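As a quick sanity check (not part of the pipeline itself), the position of each character in vocab is exactly the integer token it will be mapped to further below:
print(vocab.index('x'), vocab.index('+'), vocab.index('0'), vocab.index('9'))
# 3 7 10 19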
We also intend to make our input data pipeline parametric.
However, the obvious, simple and intuitive Python dict structures, with literal string keys, are error-prone exactly because of their unchecked literals.
Our Params class again
A few lines of code gives us the Params class, which leverages the native Python attribute mechanism to validate the names of all our params:
params = dict(
    max_val=10,
    num_samples=4,
    num_shards=3,
)

class Params:
    def __init__(self, **kw):
        for k, v in kw.items():
            setattr(self, k, v)
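Here is a brief illustration of that attribute-based checking (the throwaway p instance is only for demonstration): a mistyped param name now fails loudly on access instead of silently slipping through:
p = Params(**params)
print(p.max_val)  # 10
# p.max_vals      # would raise AttributeError: 'Params' object has no attribute 'max_vals'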
Synthesize lots of “clean” data
Let’s now randomly generate our data, fittingly as a Python generator, based on a given Params instance. For this we define:
def py_gen(ps):
    m, n = ps.max_val, ps.num_samples
    # x, y vals in defs
    vals = np.random.randint(low=1 - m, high=m, size=(2, n))
    # (x, y) order if 1 in defs [0] and op [1], respectively
    ords = np.random.randint(2, size=(2, n))
    # index of ['+', '-', '*']
    ops = np.array(['+', '-', '*'])
    ops = ops[np.random.randint(3, size=n)]
    for i in range(n):
        x, y = vals[:, i]
        res = f'x={x},y={y}:' if ords[0, i] else f'y={y},x={x}:'
        o = ops[i]
        res += (f'x{o}y:' if ords[1, i] else f'y{o}x:')
        if o == '+':
            res += f'{x + y}'
        elif o == '*':
            res += f'{x * y}'
        else:
            assert o == '-'
            res += (f'{x - y}' if ords[1, i] else f'{y - x}')
        yield res
With our generator defined, it takes just a line of code to create millions of correct “exercises” or samples for our training sessions:
ps = Params(**params)
for s in py_gen(ps):
    print(s)
y=-4,x=4:y-x:-8
y=4,x=-8:y*x:-32
y=-8,x=7:y-x:-15
y=-4,x=9:y+x:5
Dataset class
The tf.data.Dataset class is the main abstraction for our sequence of elements. Each element of a dataset is one or more Tensors containing the fields, or features, of our sample “lines” of elementary math exercises.
Using our “in-memory” generator, we can directly create a TF dataset as follows:
def gen_src(ps):
    ds = td.Dataset.from_generator(
        lambda: py_gen(ps),
        tf.string,
        tf.TensorShape([]),
    )
    return ds
And here are the first 2 samples of the now tensor-based sequence:
dg = gen_src(ps)
for s in dg.take(2):
    print(s)
tf.Tensor(b'x=-6,y=0:x+y:-6', shape=(), dtype=string)
tf.Tensor(b'y=1,x=7:y+x:8', shape=(), dtype=string)
Data pipelines and ops
An input data pipeline starts with a “source” dataset, perhaps just as simple as the one above. This “source” can also be a range, from_tensor_slices, from_tensors or even a TextLineDataset (see the TF docs).
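As a brief aside, and purely for comparison (the values below are made up), two of those alternative sources look like this:
r = td.Dataset.range(3)                         # yields the int64 scalars 0, 1, 2
t = td.Dataset.from_tensors(['x=1,y=2:x+y:3'])  # a single, one-element string tensor
For our purposes, from_tensor_slices over the generated samples is the most convenient: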
def src_dset(ps):
    ds = np.array(list(py_gen(ps)))
    ds = td.Dataset.from_tensor_slices(ds)
    return ds
All datasets can then be consumed element-by-element as iterables, or aggregated as collections (e.g. using reduce ops).
Datasets also allow chaining of handy “transformations” to themselves. Among the canned operations are the intuitive cache, concatenate, enumerate, reduce, repeat, shuffle, skip, take and zip.
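For instance, the “aggregating” flavor of consumption can be as simple as counting elements with a reduce op (a minimal sketch over our 4-sample source):
n = src_dset(ps).reduce(np.int64(0), lambda acc, _: acc + 1)
print(n)  # tf.Tensor(4, shape=(), dtype=int64)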
Next, 2 samples of a new dataset are concatenated with all 4 samples of the previous, generator-based dataset, and the result is “enumerated” on the fly:
ds = src_dset(ps)
for i, s in ds.take(2).concatenate(dg).enumerate():
    print(i, s)
tf.Tensor(0, shape=(), dtype=int64) tf.Tensor(b'x=9,y=-9:y+x:0', shape=(), dtype=string)
tf.Tensor(1, shape=(), dtype=int64) tf.Tensor(b'x=-4,y=2:y+x:-2', shape=(), dtype=string)
tf.Tensor(2, shape=(), dtype=int64) tf.Tensor(b'x=5,y=-7:y-x:-12', shape=(), dtype=string)
tf.Tensor(3, shape=(), dtype=int64) tf.Tensor(b'y=-4,x=1:x+y:-3', shape=(), dtype=string)
tf.Tensor(4, shape=(), dtype=int64) tf.Tensor(b'y=2,x=4:x-y:2', shape=(), dtype=string)
tf.Tensor(5, shape=(), dtype=int64) tf.Tensor(b'y=-6,x=-5:y-x:-1', shape=(), dtype=string)
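Two more of the canned transformations listed above, repeat and shuffle, compose just as easily (a quick sketch; the output is omitted since shuffling is random):
for s in ds.repeat(2).shuffle(buffer_size=8).take(3):
    print(s)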
We can also filter our pipeline-to-be at any stage, perhaps with the objective of dropping unfit samples:
@tf.function
def filterer(x):
    r = tf.strings.length(x) < 15
    tf.print(tf.strings.format('filtering {}... ', x) + ('in' if r else 'out'))
    return r

for i, s in enumerate(ds.filter(filterer)):
    print(i, s)
filtering "x=9,y=-9:y+x:0"... in
0 tf.Tensor(b'x=9,y=-9:y+x:0', shape=(), dtype=string)
filtering "x=-4,y=2:y+x:-2"... out
filtering "x=-2,y=3:x*y:-6"... out
filtering "x=-3,y=5:y-x:8"... in
1 tf.Tensor(b'x=-3,y=5:y-x:8', shape=(), dtype=string)
“Filaments” of data
More importantly, we can split the pipeline into named “filaments” of data.
This proves to be extremely useful: it allows us to standardize and unify all our data sources, with configurable, on-the-fly channeling of the features they aggregate:
@tf.function
def splitter(x):
    fs = tf.strings.split(x, ':')
    return {'defs': fs[0], 'op': fs[1], 'res': fs[2]}

for s in ds.map(splitter).take(1):
    print(s)
{'defs': <tf.Tensor: id=207, shape=(), dtype=string, numpy=b'x=9,y=-9'>, 'op': <tf.Tensor: id=208, shape=(), dtype=string, numpy=b'y+x'>, 'res': <tf.Tensor: id=209, shape=(), dtype=string, numpy=b'0'>}
Another example of a “pipeline component” is an in-line, Python dict-based tokenizer:
tokens = {c: i for i, c in enumerate(vocab)}

@tf.function
def tokenizer(d):
    return {
        k: tf.numpy_function(
            lambda x: tf.constant([tokens[chr(c)] for c in x]),
            [v],
            Tout=tf.int32,
        )
        for k, v in d.items()
    }

for s in ds.map(splitter).map(tokenizer).take(1):
    print(s)
{'defs': <tf.Tensor: id=258, shape=(8,), dtype=int32, numpy=array([ 3, 5, 19, 6, 4, 5, 8, 19], dtype=int32)>, 'op': <tf.Tensor: id=259, shape=(3,), dtype=int32, numpy=array([4, 7, 3], dtype=int32)>, 'res': <tf.Tensor: id=260, shape=(1,), dtype=int32, numpy=array([10], dtype=int32)>}
Sharded binary storage
Datasets can potentially be very large, fitting only on disk and across many files. As transparent data-pipeline performance is key for training throughput, datasets can also be efficiently encoded into binary sequences stored in sharded files.
The following will convert our samples into such binary “records”:
def records(dset):
    for s in dset:
        fs = tt.Features(
            feature={
                'defs': tt.Feature(int64_list=tt.Int64List(value=s['defs'])),
                'op': tt.Feature(int64_list=tt.Int64List(value=s['op'])),
                'res': tt.Feature(int64_list=tt.Int64List(value=s['res'])),
            })
        yield tt.Example(features=fs).SerializeToString()
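To make the Features / Feature / Int64List nesting above more tangible, here is a tiny, hand-built record round-trip (a sanity check only, not part of our pipeline; the token values are made up):
ex = tt.Example(features=tt.Features(feature={
    'op': tt.Feature(int64_list=tt.Int64List(value=[4, 7, 3])),
}))
print(tt.Example.FromString(ex.SerializeToString()).features.feature['op'].int64_list.value)
# [4, 7, 3]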
And we can “dump” our tokenized, ready-to-consume samples into shards of files stored in a directory.
Once these prepared samples are stored, we can “stream” them straight into our models without any more prep (see subsequent blogs):
def shards(ps):
    for _ in range(ps.num_shards):
        yield src_dset(ps).map(splitter).map(tokenizer)

def dump(ps):
    d = pth.Path('/tmp/q/dataset')
    d.mkdir(parents=True, exist_ok=True)
    for i, ds in enumerate(shards(ps)):
        i = '{:0>4d}'.format(i)
        p = str(d / f'shard_{i}.tfrecords')
        print(f'dumping {p}...')
        with tf.io.TFRecordWriter(p) as w:
            for r in records(ds):
                w.write(r)
        yield p
fs = [f for f in dump(ps)]
dumping /tmp/q/dataset/shard_0000.tfrecords...
dumping /tmp/q/dataset/shard_0001.tfrecords...
dumping /tmp/q/dataset/shard_0002.tfrecords...
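Alternatively, instead of capturing the file names returned by “dump”, the shard files could be re-discovered on disk; a quick sketch using a glob pattern (the fs_glob name is just for illustration):
fs_glob = sorted(tf.io.gfile.glob('/tmp/q/dataset/shard_*.tfrecords'))
print(fs_glob)  # the same three shard paths as above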
Loading data
For streaming, or loading, the “records” back, we need to define templates used to interpret the stored binary data. With the templates defined, loading them straight back into our datasets can be as simple as follows. Note that the names of the shard files are conveniently returned by our “dump” function:
features = {
    'defs': tf.io.VarLenFeature(tf.int64),
    'op': tf.io.VarLenFeature(tf.int64),
    'res': tf.io.VarLenFeature(tf.int64),
}

def load(ps, files):
    ds = td.TFRecordDataset(files)
    if ps.dim_batch:
        ds = ds.batch(ps.dim_batch)
        return ds.map(lambda x: tf.io.parse_example(x, features))
    return ds.map(lambda x: tf.io.parse_single_example(x, features))
Before we actually start using the loaded data in our models, let’s “adapt” the pipeline to supply dense tensors instead of the originally configured sparse ones.
Also, since we haven’t batched anything yet, we set dim_batch to None:
@tf.function
def adapter(d):
    return [
        tf.sparse.to_dense(d['defs']),
        tf.sparse.to_dense(d['op']),
        tf.sparse.to_dense(d['res']),
    ]

ps.dim_batch = None
for i, s in enumerate(load(ps, fs).map(adapter)):
    print(i, len(s))
0 3
1 3
2 3
3 3
4 3
5 3
6 3
7 3
8 3
9 3
10 3
11 3
The listing above reveals that we merged 3 sharded files, worth 4 samples each, into the 12 printed samples. For brevity, we only printed the number of features in each sample.
Please also note how the above in-line adapter converted our named features into unnamed, positional (i.e. in-a-list) features. This was necessary as the Keras Input doesn’t recognize named input tensors yet.
Batching data
If we turn on batching in our dataset, the same code will now return the following:
ps.dim_batch = 2
for i, s in enumerate(load(ps, fs).map(adapter)):
    print(i, s)
0 (<tf.Tensor: id=1362, shape=(2, 8), dtype=int64, numpy=
array([[ 4, 5, 14, 6, 3, 5, 8, 11],
[ 3, 5, 11, 6, 4, 5, 8, 12]])>, <tf.Tensor: id=1363, shape=(2, 3), dtype=int64, numpy=
array([[4, 7, 3],
[4, 8, 3]])>, <tf.Tensor: id=1364, shape=(2, 2), dtype=int64, numpy=
array([[13, 0],
[ 8, 13]])>)
1 (<tf.Tensor: id=1368, shape=(2, 9), dtype=int64, numpy=
array([[ 4, 5, 8, 16, 6, 3, 5, 8, 16],
[ 4, 5, 8, 12, 6, 3, 5, 8, 14]])>, <tf.Tensor: id=1369, shape=(2, 3), dtype=int64, numpy=
array([[3, 8, 4],
[3, 9, 4]])>, <tf.Tensor: id=1370, shape=(2, 1), dtype=int64, numpy=
array([[10],
[18]])>)
2 (<tf.Tensor: id=1374, shape=(2, 8), dtype=int64, numpy=
array([[ 3, 5, 8, 11, 6, 4, 5, 10],
[ 3, 5, 12, 6, 4, 5, 14, 0]])>, <tf.Tensor: id=1375, shape=(2, 3), dtype=int64, numpy=
array([[4, 7, 3],
[4, 8, 3]])>, <tf.Tensor: id=1376, shape=(2, 2), dtype=int64, numpy=
array([[ 8, 11],
[12, 0]])>)
3 (<tf.Tensor: id=1380, shape=(2, 8), dtype=int64, numpy=
array([[ 4, 5, 15, 6, 3, 5, 8, 11],
[ 4, 5, 10, 6, 3, 5, 19, 0]])>, <tf.Tensor: id=1381, shape=(2, 3), dtype=int64, numpy=
array([[4, 9, 3],
[3, 7, 4]])>, <tf.Tensor: id=1382, shape=(2, 2), dtype=int64, numpy=
array([[ 8, 15],
[19, 0]])>)
4 (<tf.Tensor: id=1386, shape=(2, 8), dtype=int64, numpy=
array([[ 4, 5, 12, 6, 3, 5, 10, 0],
[ 4, 5, 19, 6, 3, 5, 8, 17]])>, <tf.Tensor: id=1387, shape=(2, 3), dtype=int64, numpy=
array([[4, 8, 3],
[3, 9, 4]])>, <tf.Tensor: id=1388, shape=(2, 3), dtype=int64, numpy=
array([[12, 0, 0],
[ 8, 16, 13]])>)
5 (<tf.Tensor: id=1392, shape=(2, 8), dtype=int64, numpy=
array([[ 4, 5, 8, 13, 6, 3, 5, 17],
[ 4, 5, 10, 6, 3, 5, 14, 0]])>, <tf.Tensor: id=1393, shape=(2, 3), dtype=int64, numpy=
array([[4, 7, 3],
[3, 9, 4]])>, <tf.Tensor: id=1394, shape=(2, 1), dtype=int64, numpy=
array([[14],
[10]])>)
As preparation for the subsequent blogs, let’s generate a more substantial data source with 10 shards of 1,000 samples each:
ps.max_val = 100
ps.num_samples = 1000
ps.num_shards = 10
fs = [f for f in dump(ps)]
ps.dim_batch = 100
for i, _ in enumerate(load(ps, fs).map(adapter)):
    pass
print(i)
dumping /tmp/q/dataset/shard_0000.tfrecords...
dumping /tmp/q/dataset/shard_0001.tfrecords...
dumping /tmp/q/dataset/shard_0002.tfrecords...
dumping /tmp/q/dataset/shard_0003.tfrecords...
dumping /tmp/q/dataset/shard_0004.tfrecords...
dumping /tmp/q/dataset/shard_0005.tfrecords...
dumping /tmp/q/dataset/shard_0006.tfrecords...
dumping /tmp/q/dataset/shard_0007.tfrecords...
dumping /tmp/q/dataset/shard_0008.tfrecords...
dumping /tmp/q/dataset/shard_0009.tfrecords...
99
This concludes our blog; to see how easily our uneven sample “lines” can be masked, please click through to the next blog.