| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
import os
import tarfile
import random
import numpy as np
import mxnet as mx
import mxnet.ndarray as nd
from mxnet import gluon
from mxnet import context
from mxnet.gluon.data import DataLoader
from mxnet.gluon.data.dataset import Dataset
from common import setup_module, with_seed, teardown
| |
| @with_seed() |
| def test_array_dataset(): |
| X = np.random.uniform(size=(10, 20)) |
| Y = np.random.uniform(size=(10,)) |
| dataset = gluon.data.ArrayDataset(X, Y) |
| loader = gluon.data.DataLoader(dataset, 2) |
| for i, (x, y) in enumerate(loader): |
| assert mx.test_utils.almost_equal(x.asnumpy(), X[i*2:(i+1)*2]) |
| assert mx.test_utils.almost_equal(y.asnumpy(), Y[i*2:(i+1)*2]) |
| |
| dataset = gluon.data.ArrayDataset(X) |
| loader = gluon.data.DataLoader(dataset, 2) |
| |
| for i, x in enumerate(loader): |
| assert mx.test_utils.almost_equal(x.asnumpy(), X[i*2:(i+1)*2]) |
| |
| |
| def prepare_record(): |
| if not os.path.isdir("data/test_images"): |
| os.makedirs('data/test_images') |
| if not os.path.isdir("data/test_images/test_images"): |
        gluon.utils.download("http://data.mxnet.io/data/test_images.tar.gz", "data/test_images.tar.gz")
        with tarfile.open('data/test_images.tar.gz') as tar:
            tar.extractall('data/test_images/')
| if not os.path.exists('data/test.rec'): |
        imgs = os.listdir('data/test_images/test_images')
        record = mx.recordio.MXIndexedRecordIO('data/test.idx', 'data/test.rec', 'w')
        for i, img in enumerate(imgs):
            with open('data/test_images/test_images/' + img, 'rb') as f:
                str_img = f.read()
            s = mx.recordio.pack((0, i, i, 0), str_img)
            record.write_idx(i, s)
        record.close()
| return 'data/test.rec' |
| |
| |
| @with_seed() |
| def test_recordimage_dataset(): |
| recfile = prepare_record() |
    fn = lambda x, y: (x, y)
| dataset = gluon.data.vision.ImageRecordDataset(recfile).transform(fn) |
| loader = gluon.data.DataLoader(dataset, 1) |
| |
| for i, (x, y) in enumerate(loader): |
| assert x.shape[0] == 1 and x.shape[3] == 3 |
| assert y.asscalar() == i |
| |
| def _dataset_transform_fn(x, y): |
| """Named transform function since lambda function cannot be pickled.""" |
| return x, y |
| |
| def _dataset_transform_first_fn(x): |
| """Named transform function since lambda function cannot be pickled.""" |
| return x |
| |
| @with_seed() |
| def test_recordimage_dataset_with_data_loader_multiworker(): |
| recfile = prepare_record() |
| dataset = gluon.data.vision.ImageRecordDataset(recfile) |
| loader = gluon.data.DataLoader(dataset, 1, num_workers=5) |
| |
| for i, (x, y) in enumerate(loader): |
| assert x.shape[0] == 1 and x.shape[3] == 3 |
| assert y.asscalar() == i |
| |
| # with transform |
| dataset = gluon.data.vision.ImageRecordDataset(recfile).transform(_dataset_transform_fn) |
| loader = gluon.data.DataLoader(dataset, 1, num_workers=5) |
| |
| for i, (x, y) in enumerate(loader): |
| assert x.shape[0] == 1 and x.shape[3] == 3 |
| assert y.asscalar() == i |
| |
| # with transform_first |
| dataset = gluon.data.vision.ImageRecordDataset(recfile).transform_first(_dataset_transform_first_fn) |
| loader = gluon.data.DataLoader(dataset, 1, num_workers=5) |
| |
| for i, (x, y) in enumerate(loader): |
| assert x.shape[0] == 1 and x.shape[3] == 3 |
| assert y.asscalar() == i |
| |
| @with_seed() |
| def test_sampler(): |
| seq_sampler = gluon.data.SequentialSampler(10) |
| assert list(seq_sampler) == list(range(10)) |
| rand_sampler = gluon.data.RandomSampler(10) |
| assert sorted(list(rand_sampler)) == list(range(10)) |
| seq_batch_keep = gluon.data.BatchSampler(seq_sampler, 3, 'keep') |
| assert sum(list(seq_batch_keep), []) == list(range(10)) |
| seq_batch_discard = gluon.data.BatchSampler(seq_sampler, 3, 'discard') |
| assert sum(list(seq_batch_discard), []) == list(range(9)) |
| rand_batch_keep = gluon.data.BatchSampler(rand_sampler, 3, 'keep') |
| assert sorted(sum(list(rand_batch_keep), [])) == list(range(10)) |
| |
| @with_seed() |
| def test_datasets(): |
| assert len(gluon.data.vision.MNIST(root='data/mnist')) == 60000 |
| assert len(gluon.data.vision.MNIST(root='data/mnist', train=False)) == 10000 |
| assert len(gluon.data.vision.FashionMNIST(root='data/fashion-mnist')) == 60000 |
| assert len(gluon.data.vision.FashionMNIST(root='data/fashion-mnist', train=False)) == 10000 |
| assert len(gluon.data.vision.CIFAR10(root='data/cifar10')) == 50000 |
| assert len(gluon.data.vision.CIFAR10(root='data/cifar10', train=False)) == 10000 |
| assert len(gluon.data.vision.CIFAR100(root='data/cifar100')) == 50000 |
| assert len(gluon.data.vision.CIFAR100(root='data/cifar100', fine_label=True)) == 50000 |
| assert len(gluon.data.vision.CIFAR100(root='data/cifar100', train=False)) == 10000 |
| |
| @with_seed() |
| def test_image_folder_dataset(): |
| prepare_record() |
| dataset = gluon.data.vision.ImageFolderDataset('data/test_images') |
| assert dataset.synsets == ['test_images'] |
| assert len(dataset.items) == 16 |
| |
| @with_seed() |
| def test_list_dataset(): |
| for num_worker in range(0, 3): |
| data = mx.gluon.data.DataLoader([([1,2], 0), ([3, 4], 1)], batch_size=1, num_workers=num_worker) |
| for d, l in data: |
| pass |
| |
| |
class _SimpleDataset(gluon.data.Dataset):
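    """Each item is a length-10 NDArray filled with its own index."""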
| def __len__(self): |
| return 100 |
| def __getitem__(self, key): |
| return mx.nd.full((10,), key) |
| |
| @with_seed() |
| def test_multi_worker(): |
    data = _SimpleDataset()
| for thread_pool in [True, False]: |
| loader = gluon.data.DataLoader(data, batch_size=1, num_workers=5, thread_pool=thread_pool) |
| for i, batch in enumerate(loader): |
| assert (batch.asnumpy() == i).all() |
| |
| class _Dummy(Dataset): |
| """Dummy dataset for randomized shape arrays.""" |
| def __init__(self, random_shape): |
| self.random_shape = random_shape |
| |
| def __getitem__(self, idx): |
| key = idx |
| if self.random_shape: |
            out = np.random.uniform(size=(random.randint(1000, 1100), 40))
            labels = np.random.uniform(size=(random.randint(10, 15),))
        else:
            out = np.random.uniform(size=(1000, 40))
            labels = np.random.uniform(size=(10,))
| return key, out, labels |
| |
| def __len__(self): |
| return 50 |
| |
| def _batchify_list(data): |
| """ |
| return list of ndarray without stack/concat/pad |
| """ |
| if isinstance(data, (tuple, list)): |
| return list(data) |
| if isinstance(data, mx.nd.NDArray): |
| return [data] |
| return data |
| |
| def _batchify(data): |
| """ |
| Collate data into batch. Use shared memory for stacking. |
| :param data: a list of array, with layout of 'NTC'. |
| :return either x and x's unpadded lengths, or x, x's unpadded lengths, y and y's unpadded lengths |
| if labels are not supplied. |
| """ |
| |
| # input layout is NTC |
| keys, inputs, labels = [item[0] for item in data], [item[1] for item in data], \ |
| [item[2] for item in data] |
| |
| if len(data) > 1: |
| max_data_len = max([seq.shape[0] for seq in inputs]) |
| max_labels_len = 0 if not labels else max([seq.shape[0] for seq in labels]) |
| else: |
| max_data_len = inputs[0].shape[0] |
| max_labels_len = 0 if not labels else labels[0].shape[0] |
| |
| x_lens = [item.shape[0] for item in inputs] |
| y_lens = [item.shape[0] for item in labels] |
| |
| for i, seq in enumerate(inputs): |
| pad_len = max_data_len - seq.shape[0] |
| inputs[i] = np.pad(seq, ((0, pad_len), (0, 0)), 'constant', constant_values=0) |
| labels[i] = np.pad(labels[i], (0, max_labels_len - labels[i].shape[0]), |
| 'constant', constant_values=-1) |
| |
| inputs = np.asarray(inputs, dtype=np.float32) |
| if labels is not None: |
| labels = np.asarray(labels, dtype=np.float32) |
| inputs = inputs.transpose((1, 0, 2)) |
| labels = labels.transpose((1, 0)) |
| |
| return (nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)), |
| nd.array(x_lens, ctx=context.Context('cpu_shared', 0))) \ |
| if labels is None else ( |
| nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)), |
| nd.array(x_lens, ctx=context.Context('cpu_shared', 0)), |
| nd.array(labels, dtype=labels.dtype, ctx=context.Context('cpu_shared', 0)), |
| nd.array(y_lens, ctx=context.Context('cpu_shared', 0))) |
| |
| @with_seed() |
| def test_multi_worker_forked_data_loader(): |
| data = _Dummy(False) |
| loader = DataLoader(data, batch_size=40, batchify_fn=_batchify, num_workers=2) |
    for _ in range(1):
        for batch in loader:
            pass
| |
| data = _Dummy(True) |
| loader = DataLoader(data, batch_size=40, batchify_fn=_batchify_list, num_workers=2) |
    for _ in range(1):
        for batch in loader:
            pass
| |
| @with_seed() |
| def test_multi_worker_dataloader_release_pool(): |
| # will trigger too many open file if pool is not released properly |
| for _ in range(100): |
| A = np.random.rand(999, 2000) |
| D = mx.gluon.data.DataLoader(A, batch_size=8, num_workers=8) |
| the_iter = iter(D) |
| next(the_iter) |
| del the_iter |
| del D |
| |
| |
@with_seed()
def test_dataloader_context():
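    """Batches should land on plain CPU by default and on pinned memory when pin_memory is set."""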
| X = np.random.uniform(size=(10, 20)) |
| dataset = gluon.data.ArrayDataset(X) |
| default_dev_id = 0 |
| custom_dev_id = 1 |
| |
| # use non-pinned memory |
| loader1 = gluon.data.DataLoader(dataset, 8) |
| for _, x in enumerate(loader1): |
| assert x.context == context.cpu(default_dev_id) |
| |
| # use pinned memory with default device id |
| loader2 = gluon.data.DataLoader(dataset, 8, pin_memory=True) |
| for _, x in enumerate(loader2): |
| assert x.context == context.cpu_pinned(default_dev_id) |
| |
| # use pinned memory with custom device id |
| loader3 = gluon.data.DataLoader(dataset, 8, pin_memory=True, |
| pin_device_id=custom_dev_id) |
| for _, x in enumerate(loader3): |
| assert x.context == context.cpu_pinned(custom_dev_id) |
| |
| |
| if __name__ == '__main__': |
| import nose |
| nose.runmodule() |