API compatible with PyTorch DataLoader, with a lot more callbacks and flexibility
# Fixtures: `letters` holds the 26 lowercase ASCII letters and is reused by the checks further down
bs = 4
letters = list(string.ascii_lowercase)
# `fa_collate` must give the same result as `default_collate` on nested tuples,
# with a Tensor for the first element and a tuple for the second
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
# Same checks one nesting level deeper: the inner tuple must again be (Tensor, tuple)
t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])
# `fa_convert` must give the same result as `default_convert` for arrays nested in a list/tuple
t0 = array([1,2])
t = [t0,(t0,t0)]
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])
Override `create_item` and use the default infinite sampler to get a stream of unknown length (call `stop()` when you want to stop the stream).
class RandDL(DataLoader):
    "`DataLoader` whose items are random floats; a draw of 0.95 or more calls `stop()` instead"
    def create_item(self, s):
        draw = random.random()
        # Guard clause: the rare high draw hands control to `stop()`
        if draw >= 0.95:
            return stop()
        return draw
# Unbatched stream: collects items until `create_item` calls `stop()`
L(RandDL())
# Batched with bs=4 and drop_last=True; shows the length of each batch
L(RandDL(bs=4, drop_last=True)).map(len)
# Same again with num_workers=4
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
# `fake_l.no_multiproc()` must report 0 workers inside the `with` block
# and the original 4 both before and after it
test_eq(dl.fake_l.num_workers, 4)
with dl.fake_l.no_multiproc():
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)
def _rand_item(s):
    "Return a random float below 0.95; a draw of 0.95 or more calls `stop()` instead (`s` is unused)"
    draw = random.random()
    if draw >= 0.95:
        return stop()
    return draw
# Same random stream, this time supplying `create_item` as a constructor argument instead of subclassing
L(DataLoader(create_item=_rand_item))
If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch.
# Without `bs`, iterating the loader yields the dataset's own elements, and `len` matches the dataset
ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)
# shuffle=True; `test_shuffled` is expected to check same items in a different order — confirm against its definition
test_shuffled(L(DataLoader(letters, shuffle=True)), letters)
# indexed=False must give the same items and the same length
ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)
# Tensor elements come back equal and with the same type
t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)
# Array elements come back as the result of applying `tensor` to each of them
t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))
# With create_batch=noop the arrays come back unconverted; the `after_iter` callback
# sets `t3.f = 1`, which the last line verifies
ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)
If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch.
def twoepochs(d):
    "Iterate over `d` twice, concatenating each batch into one word and joining all the words with spaces"
    words = []
    for _ in range(2):
        for batch in d:
            chars = list(batch)
            words.append(''.join(chars))
    return ' '.join(words)
# drop_last=True: the expected string has no trailing 'yz' batch, in either of the two epochs
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')
# Batch size given positionally, 2 workers, no drop_last: the short 'yz' batch is kept and order is preserved
ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')
# Integers are batched into tensors, in order, with 3 workers
ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))
# Strings are batched into plain lists (the last one short); `after_iter` sets `t3.f = 2`, verified below
ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)
# A dataset that is only an iterator (a `map` object): take just the first three batches
it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
class SleepyDL(list):
    "A `list` whose indexing sleeps for a random time below 1/50 s before returning the item"
    def __getitem__(self, i):
        delay = random.random() / 50
        time.sleep(delay)
        return super().__getitem__(i)
t = SleepyDL(letters)
# Items must equal `letters` with 0, 2 and 4 workers; `%time` is an IPython magic that times each statement
%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)
# Shuffled loading, compared against `letters` and then against a second pass over the same loader;
# `test_shuffled` is expected to check same items in a different order — confirm against its definition
dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
class SleepyQueue():
    "Iterable wrapper around a queue that adds a random delay before each read"
    def __init__(self, q):
        self.q = q

    def __iter__(self):
        "Yield items from `self.q`, sleeping a random time below 1/100 s before each read; stop once the queue is empty"
        while True:
            pause = random.random() / 100
            time.sleep(pause)
            try:
                yield self.q.get_nowait()
            except queues.Empty:
                return
# Fill a queue with 0..29 and read it through `SleepyQueue` using 4 workers.
# The result is compared with `test_shuffled` rather than `test_eq`
# (presumably because arrival order is not preserved — confirm).
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))
class A(TensorBase): pass
# With 0 and with 2 workers, the first batch must keep the items' subclass `A`
for nw in (0,2):
    # Items that are bare `A` tensors
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)
    # Items that are 1-tuples holding an `A` tensor: the type is checked on the batch's first element
    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
class A(TensorBase): pass
t = A(tensor(1,2))
# The subclass `A` must also be kept when `to_device` is applied as the `after_batch` callback
tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)
# Unknown attributes are delegated to `dataset`
# (here `pop` resolves on the list passed as the dataset, so it returns one of the `t` items)
test_eq(tdl.pop(), tensor(1,2))