blob: bbc689b234be6d0c1a57d108e84dc06144086a40 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import mxnet as mx
import numpy as np
import scipy.ndimage
from mxnet.test_utils import *
from common import xfail_when_nonstandard_decimal_separator
import shutil
import tempfile
import unittest
import pytest
mx.npx.reset_np()
def _get_data(url, dirname):
import os, tarfile
download(url, dirname=dirname, overwrite=False)
fname = os.path.join(dirname, url.split('/')[-1])
tar = tarfile.open(fname)
source_images = [os.path.join(dirname, x.name) for x in tar.getmembers() if x.isfile()]
if len(source_images) < 1 or not os.path.isfile(source_images[0]):
# skip extracting if exists
tar.extractall(path=dirname)
tar.close()
return source_images
def _generate_objects():
num = np.random.randint(1, 10)
xy = np.random.rand(num, 2)
wh = np.random.rand(num, 2) / 2
left = (xy[:, 0] - wh[:, 0])[:, np.newaxis]
right = (xy[:, 0] + wh[:, 0])[:, np.newaxis]
top = (xy[:, 1] - wh[:, 1])[:, np.newaxis]
bot = (xy[:, 1] + wh[:, 1])[:, np.newaxis]
boxes = np.maximum(0., np.minimum(1., np.hstack((left, top, right, bot))))
cid = np.random.randint(0, 20, size=num)
label = np.hstack((cid[:, np.newaxis], boxes)).ravel().tolist()
return [2, 5] + label
def _test_imageiter_last_batch(imageiter_list, assert_data_shape):
test_iter = imageiter_list[0]
# test batch data shape
for _ in range(3):
for batch in test_iter:
assert batch.data[0].shape == assert_data_shape
test_iter.reset()
# test last batch handle(discard)
test_iter = imageiter_list[1]
i = 0
for _ in test_iter:
i += 1
assert i == 5
# test last_batch_handle(pad)
test_iter = imageiter_list[2]
i = 0
for batch in test_iter:
if i == 0:
first_three_data = batch.data[0][:2]
if i == 5:
last_three_data = batch.data[0][1:]
i += 1
assert i == 6
assert np.array_equal(first_three_data.asnumpy(), last_three_data.asnumpy())
# test last_batch_handle(roll_over)
test_iter = imageiter_list[3]
i = 0
for batch in test_iter:
if i == 0:
first_image = batch.data[0][0]
i += 1
assert i == 5
test_iter.reset()
first_batch_roll_over = test_iter.next()
assert np.array_equal(
first_batch_roll_over.data[0][1].asnumpy(), first_image.asnumpy())
assert first_batch_roll_over.pad == 2
# test iteratopr work properly after calling reset several times when last_batch_handle is roll_over
for _ in test_iter:
pass
test_iter.reset()
first_batch_roll_over_twice = test_iter.next()
assert np.array_equal(
first_batch_roll_over_twice.data[0][2].asnumpy(), first_image.asnumpy())
assert first_batch_roll_over_twice.pad == 1
# we've called next once
i = 1
for _ in test_iter:
i += 1
# test the third epoch with size 6
assert i == 6
# test shuffle option for sanity test
test_iter = imageiter_list[4]
for _ in test_iter:
pass
class TestImage(unittest.TestCase):
IMAGES_URL = "https://repo.mxnet.io/gluon/dataset/test/test_images-9cebe48a.tar.gz"
def setUp(self):
self.IMAGES_DIR = tempfile.mkdtemp()
self.IMAGES = _get_data(self.IMAGES_URL, self.IMAGES_DIR)
print("Loaded {} images".format(len(self.IMAGES)))
def tearDown(self):
if self.IMAGES_DIR:
print("cleanup {}".format(self.IMAGES_DIR))
shutil.rmtree(self.IMAGES_DIR)
def test_imread_not_found(self):
with pytest.raises(mx.base.MXNetError):
x = mx.img.image.imread("/139810923jadjsajlskd.___adskj/blah.jpg")
def test_imread_vs_imdecode(self):
for img in self.IMAGES:
with open(img, 'rb') as fp:
str_image = fp.read()
image = mx.image.imdecode(str_image, to_rgb=0)
image_read = mx.img.image.imread(img)
same(image.asnumpy(), image_read.asnumpy())
def test_imdecode(self):
try:
import cv2
except ImportError:
raise unittest.SkipTest("Unable to import cv2.")
for img in self.IMAGES:
with open(img, 'rb') as fp:
str_image = fp.read()
image = mx.image.imdecode(str_image, to_rgb=0)
cv_image = cv2.imread(img)
assert_almost_equal(image.asnumpy(), cv_image)
def test_imdecode_bytearray(self):
try:
import cv2
except ImportError:
return
for img in self.IMAGES:
with open(img, 'rb') as fp:
str_image = bytearray(fp.read())
image = mx.image.imdecode(str_image, to_rgb=0)
cv_image = cv2.imread(img)
assert_almost_equal(image.asnumpy(), cv_image)
def test_imdecode_empty_buffer(self):
with pytest.raises(mx.base.MXNetError):
mx.image.imdecode(b'', to_rgb=0)
def test_imdecode_invalid_image(self):
with pytest.raises(mx.base.MXNetError):
image = mx.image.imdecode(b'clearly not image content')
def test_scale_down(self):
assert mx.image.scale_down((640, 480), (720, 120)) == (640, 106)
assert mx.image.scale_down((360, 1000), (480, 500)) == (360, 375)
assert mx.image.scale_down((300, 400), (0, 0)) == (0, 0)
def test_resize_short(self):
try:
import cv2
except ImportError:
raise unittest.SkipTest("Unable to import cv2")
for img in self.IMAGES:
cv_img = cv2.imread(img)
mx_img = mx.nd.array(cv_img[:, :, (2, 1, 0)])
h, w, _ = cv_img.shape
for _ in range(3):
new_size = np.random.randint(1, 1000)
if h > w:
new_h, new_w = new_size * h // w, new_size
else:
new_h, new_w = new_size, new_size * w // h
for interp in range(0, 2):
# area-based/lanczos don't match with cv2?
cv_resized = cv2.resize(cv_img, (new_w, new_h), interpolation=interp)
mx_resized = mx.image.resize_short(mx_img, new_size, interp)
assert_almost_equal(mx_resized.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)
def test_imresize(self):
try:
import cv2
except ImportError:
raise unittest.SkipTest("Unable to import cv2")
for img in self.IMAGES:
cv_img = cv2.imread(img)
mx_img = mx.nd.array(cv_img[:, :, (2, 1, 0)])
new_h = np.random.randint(1, 1000)
new_w = np.random.randint(1, 1000)
for interp_val in range(0, 2):
cv_resized = cv2.resize(cv_img, (new_w, new_h), interpolation=interp_val)
mx_resized = mx.image.imresize(mx_img, new_w, new_h, interp=interp_val)
assert_almost_equal(mx_resized.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)
out_img = mx.nd.zeros((new_h, new_w, 3), dtype=mx_img.dtype)
mx.image.imresize(mx_img, new_w, new_h, interp=interp_val, out=out_img)
assert_almost_equal(out_img.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)
def test_color_normalize(self):
for _ in range(10):
mean = np.random.rand(3) * 255
std = np.random.rand(3) + 1
width = np.random.randint(100, 500)
height = np.random.randint(100, 500)
src = np.random.rand(height, width, 3) * 255.
mx_result = mx.image.color_normalize(mx.nd.array(src),
mx.nd.array(mean), mx.nd.array(std))
assert_almost_equal(mx_result.asnumpy(), (src - mean) / std, atol=1e-3)
def test_imageiter(self):
print(self.IMAGES)
im_list = [[np.random.randint(0, 5), x] for x in self.IMAGES]
fname = os.path.join(self.IMAGES_DIR, 'test_imageiter.lst')
file_list = ['\t'.join([str(k), str(np.random.randint(0, 5)), x])
for k, x in enumerate(self.IMAGES)]
with open(fname, 'w') as f:
for line in file_list:
f.write(line + '\n')
test_list = ['imglist', 'path_imglist']
for dtype in ['int32', 'float32', 'int64', 'float64']:
for test in test_list:
imglist = im_list if test == 'imglist' else None
path_imglist = fname if test == 'path_imglist' else None
imageiter_list = [
mx.image.ImageIter(2, (3, 224, 224), label_width=1, imglist=imglist,
path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype),
mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='discard'),
mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='pad'),
mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='roll_over'),
mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist, shuffle=True,
path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='pad')
]
_test_imageiter_last_batch(imageiter_list, (2, 3, 224, 224))
def test_copyMakeBorder(self):
try:
import cv2
except ImportError:
raise unittest.SkipTest("Unable to import cv2")
for img in self.IMAGES:
cv_img = cv2.imread(img)
mx_img = mx.nd.array(cv_img)
top = np.random.randint(1, 10)
bot = np.random.randint(1, 10)
left = np.random.randint(1, 10)
right = np.random.randint(1, 10)
new_h, new_w, _ = mx_img.shape
new_h += top + bot
new_w += left + right
val = [np.random.randint(1, 255)] * 3
for type_val in range(0, 5):
cv_border = cv2.copyMakeBorder(cv_img, top, bot, left, right, borderType=type_val, value=val)
mx_border = mx.image.copyMakeBorder(mx_img, top, bot, left, right, type=type_val, values=val)
assert_almost_equal(mx_border.asnumpy(), cv_border)
out_img = mx.nd.zeros((new_h , new_w, 3), dtype=mx_img.dtype)
mx.image.copyMakeBorder(mx_img, top, bot, left, right, type=type_val, values=val, out=out_img)
assert_almost_equal(out_img.asnumpy(), cv_border)
def test_augmenters(self):
# ColorNormalizeAug
mean = np.random.rand(3) * 255
std = np.random.rand(3) + 1
width = np.random.randint(100, 500)
height = np.random.randint(100, 500)
src = np.random.rand(height, width, 3) * 255.
# We test numpy and mxnet NDArray inputs
color_norm_aug = mx.image.ColorNormalizeAug(mean=mx.nd.array(mean), std=std)
out_image = color_norm_aug(mx.nd.array(src))
assert_almost_equal(out_image.asnumpy(), (src - mean) / std, atol=1e-3)
# only test if all augmenters will work
# TODO(Joshua Zhang): verify the augmenter outputs
im_list = [[0, x] for x in self.IMAGES]
test_iter = mx.image.ImageIter(2, (3, 224, 224), label_width=1, imglist=im_list,
resize=640, rand_crop=True, rand_resize=True, rand_mirror=True, mean=True,
std=np.array([1.1, 1.03, 1.05]), brightness=0.1, contrast=0.1, saturation=0.1,
hue=0.1, pca_noise=0.1, rand_gray=0.2, inter_method=10, path_root=self.IMAGES_DIR, shuffle=True)
for _ in test_iter:
pass
def test_image_detiter(self):
im_list = [_generate_objects() + [x] for x in self.IMAGES]
det_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
for _ in range(3):
for _ in det_iter:
pass
det_iter.reset()
val_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
det_iter = val_iter.sync_label_shape(det_iter)
assert det_iter.data_shape == val_iter.data_shape
assert det_iter.label_shape == val_iter.label_shape
# test batch_size is not divisible by number of images
det_iter = mx.image.ImageDetIter(4, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
for _ in det_iter:
pass
# test file list with last batch handle
fname = os.path.join(self.IMAGES_DIR, 'test_imagedetiter.lst')
im_list = [[k] + _generate_objects() + [x] for k, x in enumerate(self.IMAGES)]
with open(fname, 'w') as f:
for line in im_list:
line = '\t'.join([str(k) for k in line])
f.write(line + '\n')
imageiter_list = [
mx.image.ImageDetIter(2, (3, 400, 400),
path_imglist=fname, path_root=self.IMAGES_DIR),
mx.image.ImageDetIter(3, (3, 400, 400),
path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='discard'),
mx.image.ImageDetIter(3, (3, 400, 400),
path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='pad'),
mx.image.ImageDetIter(3, (3, 400, 400),
path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='roll_over'),
mx.image.ImageDetIter(3, (3, 400, 400), shuffle=True,
path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='pad')
]
_test_imageiter_last_batch(imageiter_list, (2, 3, 400, 400))
def test_det_augmenters(self):
# only test if all augmenters will work
# TODO(Joshua Zhang): verify the augmenter outputs
im_list = [_generate_objects() + [x] for x in self.IMAGES]
det_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR,
resize=640, rand_crop=1, rand_pad=1, rand_gray=0.1, rand_mirror=True, mean=True,
std=np.array([1.1, 1.03, 1.05]), brightness=0.1, contrast=0.1, saturation=0.1,
pca_noise=0.1, hue=0.1, inter_method=10, min_object_covered=0.5,
aspect_ratio_range=(0.2, 5), area_range=(0.1, 4.0), min_eject_coverage=0.5,
max_attempts=50)
for _ in det_iter:
pass
def test_random_size_crop(self):
# test aspect ratio within bounds
width = np.random.randint(100, 500)
height = np.random.randint(100, 500)
src = np.random.rand(height, width, 3) * 255.
ratio = (0.75, 1)
epsilon = 0.05
out, (x0, y0, new_w, new_h) = mx.image.random_size_crop(mx.nd.array(src), size=(width, height), area=0.08, ratio=ratio)
_, pts = mx.image.center_crop(mx.nd.array(src), size=(width, height))
if (x0, y0, new_w, new_h) != pts:
assert ratio[0] - epsilon <= float(new_w)/new_h <= ratio[1] + epsilon, \
'ration of new width and height out of the bound{}/{}={}'.format(new_w, new_h, float(new_w)/new_h)
@xfail_when_nonstandard_decimal_separator
def test_imrotate(self):
# test correctness
xlin = np.expand_dims(np.linspace(0, 0.5, 30), axis=1)
ylin = np.expand_dims(np.linspace(0, 0.5, 60), axis=0)
np_img = np.expand_dims(xlin + ylin, axis=2)
# rotate with imrotate
nd_img = mx.nd.array(np_img.transpose((2, 0, 1))) # convert to CHW
rot_angle = 6
args = {'src': nd_img, 'rotation_degrees': rot_angle, 'zoom_in': False, 'zoom_out': False}
nd_rot = mx.image.imrotate(**args)
npnd_rot = nd_rot.asnumpy().transpose((1, 2, 0))
# rotate with scipy
scipy_rot = scipy.ndimage.rotate(np_img, rot_angle, axes=(1, 0), reshape=False,
order=1, mode='constant', prefilter=False)
# cannot compare the edges (where image ends) because of different behavior
assert_almost_equal(scipy_rot[10:20, 20:40, :], npnd_rot[10:20, 20:40, :])
# test if execution raises exceptions in any allowed mode
# batch mode
img_in = mx.nd.random.uniform(0, 1, (5, 3, 30, 60), dtype=np.float32)
nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
_ = mx.image.imrotate(**args)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': True}
_ = mx.image.imrotate(**args)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': False}
_ = mx.image.imrotate(**args)
# single image mode
nd_rots = 11
img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
_ = mx.image.imrotate(**args)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': True}
_ = mx.image.imrotate(**args)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': False}
_ = mx.image.imrotate(**args)
# test if exceptions are correctly raised
# batch exception - zoom_in=zoom_out=True
img_in = mx.nd.random.uniform(0, 1, (5, 3, 30, 60), dtype=np.float32)
nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
args={'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': True}
with pytest.raises(ValueError):
mx.image.imrotate(**args)
# single image exception - zoom_in=zoom_out=True
img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
nd_rots = 11
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': True}
with pytest.raises(ValueError):
mx.image.imrotate(**args)
# batch of images with scalar rotation
img_in = mx.nd.stack(nd_img, nd_img, nd_img)
nd_rots = 6
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
out = mx.image.imrotate(**args)
for img in out:
img = img.asnumpy().transpose((1, 2, 0))
assert_almost_equal(scipy_rot[10:20, 20:40, :], img[10:20, 20:40, :])
# single image exception - single image with vector rotation
img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
with pytest.raises(TypeError):
mx.image.imrotate(**args)
def test_random_rotate(self):
angle_limits = [-5., 5.]
src_single_image = mx.nd.random.uniform(0, 1, (3, 30, 60),
dtype=np.float32)
out_single_image = mx.image.random_rotate(src_single_image,
angle_limits)
self.assertEqual(out_single_image.shape, (3, 30, 60))
src_batch_image = mx.nd.stack(src_single_image,
src_single_image,
src_single_image)
out_batch_image = mx.image.random_rotate(src_batch_image,
angle_limits)
self.assertEqual(out_batch_image.shape, (3, 3, 30, 60))