# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import mxnet as mx
import numpy as np
import scipy.ndimage
from mxnet.test_utils import *
from common import xfail_when_nonstandard_decimal_separator
import shutil
import tempfile
import unittest
import pytest

mx.npx.reset_np()

def _get_data(url, dirname):
    import os, tarfile
    download(url, dirname=dirname, overwrite=False)
    fname = os.path.join(dirname, url.split('/')[-1])
    tar = tarfile.open(fname)
    source_images = [os.path.join(dirname, x.name) for x in tar.getmembers() if x.isfile()]
    if len(source_images) < 1 or not os.path.isfile(source_images[0]):
        # skip extracting if exists
        tar.extractall(path=dirname)
    tar.close()
    return source_images

def _generate_objects():
    num = np.random.randint(1, 10)
    xy = np.random.rand(num, 2)
    wh = np.random.rand(num, 2) / 2
    left = (xy[:, 0] - wh[:, 0])[:, np.newaxis]
    right = (xy[:, 0] + wh[:, 0])[:, np.newaxis]
    top = (xy[:, 1] - wh[:, 1])[:, np.newaxis]
    bot = (xy[:, 1] + wh[:, 1])[:, np.newaxis]
    boxes = np.maximum(0., np.minimum(1., np.hstack((left, top, right, bot))))
    cid = np.random.randint(0, 20, size=num)
    label = np.hstack((cid[:, np.newaxis], boxes)).ravel().tolist()
    return [2, 5] + label

def _test_imageiter_last_batch(imageiter_list, assert_data_shape):
    test_iter = imageiter_list[0]
    # test batch data shape
    for _ in range(3):
        for batch in test_iter:
            assert batch.data[0].shape == assert_data_shape
        test_iter.reset()
    # test last batch handle(discard)
    test_iter = imageiter_list[1]
    i = 0
    for _ in test_iter:
        i += 1
    assert i == 5
    # test last_batch_handle(pad)
    test_iter = imageiter_list[2]
    i = 0
    for batch in test_iter:
        if i == 0:
            first_three_data = batch.data[0][:2]
        if i == 5:
            last_three_data = batch.data[0][1:]
        i += 1
    assert i == 6
    assert np.array_equal(first_three_data.asnumpy(), last_three_data.asnumpy())
    # test last_batch_handle(roll_over)
    test_iter = imageiter_list[3]
    i = 0
    for batch in test_iter:
        if i == 0:
            first_image = batch.data[0][0]
        i += 1
    assert i == 5
    test_iter.reset()
    first_batch_roll_over = test_iter.next()
    assert np.array_equal(
        first_batch_roll_over.data[0][1].asnumpy(), first_image.asnumpy())
    assert first_batch_roll_over.pad == 2
    # test iteratopr work properly after calling reset several times when last_batch_handle is roll_over
    for _ in test_iter:
        pass
    test_iter.reset()
    first_batch_roll_over_twice = test_iter.next()
    assert np.array_equal(
        first_batch_roll_over_twice.data[0][2].asnumpy(), first_image.asnumpy())
    assert first_batch_roll_over_twice.pad == 1
    # we've called next once
    i = 1
    for _ in test_iter:
        i += 1
    # test the third epoch with size 6
    assert i == 6
    # test shuffle option for sanity test
    test_iter = imageiter_list[4]
    for _ in test_iter:
        pass


class TestImage(unittest.TestCase):
    IMAGES_URL = "https://repo.mxnet.io/gluon/dataset/test/test_images-9cebe48a.tar.gz"

    def setUp(self):
        self.IMAGES_DIR = tempfile.mkdtemp()
        self.IMAGES = _get_data(self.IMAGES_URL, self.IMAGES_DIR)
        print("Loaded {} images".format(len(self.IMAGES)))

    def tearDown(self):
        if self.IMAGES_DIR:
            print("cleanup {}".format(self.IMAGES_DIR))
            shutil.rmtree(self.IMAGES_DIR)

    def test_imread_not_found(self):
        with pytest.raises(mx.base.MXNetError):
            x = mx.img.image.imread("/139810923jadjsajlskd.___adskj/blah.jpg")

    def test_imread_vs_imdecode(self):
        for img in self.IMAGES:
            with open(img, 'rb') as fp:
                str_image = fp.read()
                image = mx.image.imdecode(str_image, to_rgb=0)
                image_read = mx.img.image.imread(img)
                same(image.asnumpy(), image_read.asnumpy())

    def test_imdecode(self):
        try:
            import cv2
        except ImportError:
            raise unittest.SkipTest("Unable to import cv2.")
        for img in self.IMAGES:
            with open(img, 'rb') as fp:
                str_image = fp.read()
                image = mx.image.imdecode(str_image, to_rgb=0)
            cv_image = cv2.imread(img)
            assert_almost_equal(image.asnumpy(), cv_image)

    def test_imdecode_bytearray(self):
        try:
            import cv2
        except ImportError:
            return
        for img in self.IMAGES:
            with open(img, 'rb') as fp:
                str_image = bytearray(fp.read())
                image = mx.image.imdecode(str_image, to_rgb=0)
            cv_image = cv2.imread(img)
            assert_almost_equal(image.asnumpy(), cv_image)

    def test_imdecode_empty_buffer(self):
        with pytest.raises(mx.base.MXNetError):
            mx.image.imdecode(b'', to_rgb=0)

    def test_imdecode_invalid_image(self):
        with pytest.raises(mx.base.MXNetError):
            image = mx.image.imdecode(b'clearly not image content')

    def test_scale_down(self):
        assert mx.image.scale_down((640, 480), (720, 120)) == (640, 106)
        assert mx.image.scale_down((360, 1000), (480, 500)) == (360, 375)
        assert mx.image.scale_down((300, 400), (0, 0)) == (0, 0)

    def test_resize_short(self):
        try:
            import cv2
        except ImportError:
            raise unittest.SkipTest("Unable to import cv2")
        for img in self.IMAGES:
            cv_img = cv2.imread(img)
            mx_img = mx.nd.array(cv_img[:, :, (2, 1, 0)])
            h, w, _ = cv_img.shape
            for _ in range(3):
                new_size = np.random.randint(1, 1000)
                if h > w:
                    new_h, new_w = new_size * h // w, new_size
                else:
                    new_h, new_w = new_size, new_size * w // h
                for interp in range(0, 2):
                    # area-based/lanczos don't match with cv2?
                    cv_resized = cv2.resize(cv_img, (new_w, new_h), interpolation=interp)
                    mx_resized = mx.image.resize_short(mx_img, new_size, interp)
                    assert_almost_equal(mx_resized.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)

    def test_imresize(self):
        try:
            import cv2
        except ImportError:
            raise unittest.SkipTest("Unable to import cv2")
        for img in self.IMAGES:
            cv_img = cv2.imread(img)
            mx_img = mx.nd.array(cv_img[:, :, (2, 1, 0)])
            new_h = np.random.randint(1, 1000)
            new_w = np.random.randint(1, 1000)
            for interp_val in range(0, 2):
                cv_resized = cv2.resize(cv_img, (new_w, new_h), interpolation=interp_val)
                mx_resized = mx.image.imresize(mx_img, new_w, new_h, interp=interp_val)
                assert_almost_equal(mx_resized.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)
                out_img = mx.nd.zeros((new_h, new_w, 3), dtype=mx_img.dtype)
                mx.image.imresize(mx_img, new_w, new_h, interp=interp_val, out=out_img)
                assert_almost_equal(out_img.asnumpy()[:, :, (2, 1, 0)], cv_resized, atol=3)

    def test_color_normalize(self):
        for _ in range(10):
            mean = np.random.rand(3) * 255
            std = np.random.rand(3) + 1
            width = np.random.randint(100, 500)
            height = np.random.randint(100, 500)
            src = np.random.rand(height, width, 3) * 255.
            mx_result = mx.image.color_normalize(mx.nd.array(src),
                mx.nd.array(mean), mx.nd.array(std))
            assert_almost_equal(mx_result.asnumpy(), (src - mean) / std, atol=1e-3)

    def test_imageiter(self):
        print(self.IMAGES)
        im_list = [[np.random.randint(0, 5), x] for x in self.IMAGES]
        fname = os.path.join(self.IMAGES_DIR, 'test_imageiter.lst')
        file_list = ['\t'.join([str(k), str(np.random.randint(0, 5)), x])
                        for k, x in enumerate(self.IMAGES)]
        with open(fname, 'w') as f:
            for line in file_list:
                f.write(line + '\n')

        test_list = ['imglist', 'path_imglist']
        for dtype in ['int32', 'float32', 'int64', 'float64']:
            for test in test_list:
                imglist = im_list if test == 'imglist' else None
                path_imglist = fname if test == 'path_imglist' else None
                imageiter_list = [
                    mx.image.ImageIter(2, (3, 224, 224), label_width=1, imglist=imglist,
                        path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype),
                    mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
                        path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='discard'),
                    mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
                        path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='pad'),
                    mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist,
                        path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='roll_over'),
                    mx.image.ImageIter(3, (3, 224, 224), label_width=1, imglist=imglist, shuffle=True,
                        path_imglist=path_imglist, path_root=self.IMAGES_DIR, dtype=dtype, last_batch_handle='pad')
                ]
                _test_imageiter_last_batch(imageiter_list, (2, 3, 224, 224))

    def test_copyMakeBorder(self):
        try:
            import cv2
        except ImportError:
            raise unittest.SkipTest("Unable to import cv2")
        for img in self.IMAGES:
            cv_img = cv2.imread(img)
            mx_img = mx.nd.array(cv_img)
            top = np.random.randint(1, 10)
            bot = np.random.randint(1, 10)
            left = np.random.randint(1, 10)
            right = np.random.randint(1, 10)
            new_h, new_w, _ = mx_img.shape
            new_h += top + bot
            new_w += left + right
            val = [np.random.randint(1, 255)] * 3
            for type_val in range(0, 5):
                cv_border = cv2.copyMakeBorder(cv_img, top, bot, left, right, borderType=type_val, value=val)
                mx_border = mx.image.copyMakeBorder(mx_img, top, bot, left, right, type=type_val, values=val)
                assert_almost_equal(mx_border.asnumpy(), cv_border)
                out_img = mx.nd.zeros((new_h , new_w, 3), dtype=mx_img.dtype)
                mx.image.copyMakeBorder(mx_img, top, bot, left, right, type=type_val, values=val, out=out_img)
                assert_almost_equal(out_img.asnumpy(), cv_border)

    def test_augmenters(self):
        # ColorNormalizeAug
        mean = np.random.rand(3) * 255
        std = np.random.rand(3) + 1
        width = np.random.randint(100, 500)
        height = np.random.randint(100, 500)
        src = np.random.rand(height, width, 3) * 255.
        # We test numpy and mxnet NDArray inputs
        color_norm_aug = mx.image.ColorNormalizeAug(mean=mx.nd.array(mean), std=std)
        out_image = color_norm_aug(mx.nd.array(src))
        assert_almost_equal(out_image.asnumpy(), (src - mean) / std, atol=1e-3)

        # only test if all augmenters will work
        # TODO(Joshua Zhang): verify the augmenter outputs
        im_list = [[0, x] for x in self.IMAGES]
        test_iter = mx.image.ImageIter(2, (3, 224, 224), label_width=1, imglist=im_list,
            resize=640, rand_crop=True, rand_resize=True, rand_mirror=True, mean=True,
            std=np.array([1.1, 1.03, 1.05]), brightness=0.1, contrast=0.1, saturation=0.1,
            hue=0.1, pca_noise=0.1, rand_gray=0.2, inter_method=10, path_root=self.IMAGES_DIR, shuffle=True)
        for _ in test_iter:
            pass

    def test_image_detiter(self):
        im_list = [_generate_objects() + [x] for x in self.IMAGES]
        det_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
        for _ in range(3):
            for _ in det_iter:
                pass
        det_iter.reset()
        val_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
        det_iter = val_iter.sync_label_shape(det_iter)
        assert det_iter.data_shape == val_iter.data_shape
        assert det_iter.label_shape == val_iter.label_shape

        # test batch_size is not divisible by number of images
        det_iter = mx.image.ImageDetIter(4, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR)
        for _ in det_iter:
            pass

        # test file list with last batch handle
        fname = os.path.join(self.IMAGES_DIR, 'test_imagedetiter.lst')
        im_list = [[k] + _generate_objects() + [x] for k, x in enumerate(self.IMAGES)]
        with open(fname, 'w') as f:
            for line in im_list:
                line = '\t'.join([str(k) for k in line])
                f.write(line + '\n')

        imageiter_list = [
            mx.image.ImageDetIter(2, (3, 400, 400),
                path_imglist=fname, path_root=self.IMAGES_DIR),
            mx.image.ImageDetIter(3, (3, 400, 400),
                path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='discard'),
            mx.image.ImageDetIter(3, (3, 400, 400),
                path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='pad'),
            mx.image.ImageDetIter(3, (3, 400, 400),
                path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='roll_over'),
            mx.image.ImageDetIter(3, (3, 400, 400), shuffle=True,
                path_imglist=fname, path_root=self.IMAGES_DIR, last_batch_handle='pad')
        ]
        _test_imageiter_last_batch(imageiter_list, (2, 3, 400, 400))

    def test_det_augmenters(self):
        # only test if all augmenters will work
        # TODO(Joshua Zhang): verify the augmenter outputs
        im_list = [_generate_objects() + [x] for x in self.IMAGES]
        det_iter = mx.image.ImageDetIter(2, (3, 300, 300), imglist=im_list, path_root=self.IMAGES_DIR,
            resize=640, rand_crop=1, rand_pad=1, rand_gray=0.1, rand_mirror=True, mean=True,
            std=np.array([1.1, 1.03, 1.05]), brightness=0.1, contrast=0.1, saturation=0.1,
            pca_noise=0.1, hue=0.1, inter_method=10, min_object_covered=0.5,
            aspect_ratio_range=(0.2, 5), area_range=(0.1, 4.0), min_eject_coverage=0.5,
            max_attempts=50)
        for _ in det_iter:
            pass

    def test_random_size_crop(self):
        # test aspect ratio within bounds
        width = np.random.randint(100, 500)
        height = np.random.randint(100, 500)
        src = np.random.rand(height, width, 3) * 255.
        ratio = (0.75, 1)
        epsilon = 0.05
        out, (x0, y0, new_w, new_h) = mx.image.random_size_crop(mx.nd.array(src), size=(width, height), area=0.08, ratio=ratio)
        _, pts = mx.image.center_crop(mx.nd.array(src), size=(width, height))
        if (x0, y0, new_w, new_h) != pts:
            assert ratio[0] - epsilon <= float(new_w)/new_h <= ratio[1] + epsilon, \
            'ration of new width and height out of the bound{}/{}={}'.format(new_w, new_h, float(new_w)/new_h)

    @xfail_when_nonstandard_decimal_separator
    def test_imrotate(self):
        # test correctness
        xlin = np.expand_dims(np.linspace(0, 0.5, 30), axis=1)
        ylin = np.expand_dims(np.linspace(0, 0.5, 60), axis=0)
        np_img = np.expand_dims(xlin + ylin, axis=2)
        # rotate with imrotate
        nd_img = mx.nd.array(np_img.transpose((2, 0, 1)))  # convert to CHW
        rot_angle = 6
        args = {'src': nd_img, 'rotation_degrees': rot_angle, 'zoom_in': False, 'zoom_out': False}
        nd_rot = mx.image.imrotate(**args)
        npnd_rot = nd_rot.asnumpy().transpose((1, 2, 0))
        # rotate with scipy
        scipy_rot = scipy.ndimage.rotate(np_img, rot_angle, axes=(1, 0), reshape=False,
                                         order=1, mode='constant', prefilter=False)
        # cannot compare the edges (where image ends) because of different behavior
        assert_almost_equal(scipy_rot[10:20, 20:40, :], npnd_rot[10:20, 20:40, :])

        # test if execution raises exceptions in any allowed mode
        # batch mode
        img_in = mx.nd.random.uniform(0, 1, (5, 3, 30, 60), dtype=np.float32)
        nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
        _ = mx.image.imrotate(**args)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': True}
        _ = mx.image.imrotate(**args)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': False}
        _ = mx.image.imrotate(**args)
        # single image mode
        nd_rots = 11
        img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
        _ = mx.image.imrotate(**args)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': True}
        _ = mx.image.imrotate(**args)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': False}
        _ = mx.image.imrotate(**args)

        # test if exceptions are correctly raised
        # batch exception - zoom_in=zoom_out=True
        img_in = mx.nd.random.uniform(0, 1, (5, 3, 30, 60), dtype=np.float32)
        nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
        args={'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': True}
        with pytest.raises(ValueError):
            mx.image.imrotate(**args)

        # single image exception - zoom_in=zoom_out=True
        img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
        nd_rots = 11
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': True, 'zoom_out': True}
        with pytest.raises(ValueError):
            mx.image.imrotate(**args)

        # batch of images with scalar rotation
        img_in = mx.nd.stack(nd_img, nd_img, nd_img)
        nd_rots = 6
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
        out = mx.image.imrotate(**args)
        for img in out:
            img = img.asnumpy().transpose((1, 2, 0))
            assert_almost_equal(scipy_rot[10:20, 20:40, :], img[10:20, 20:40, :])

        # single image exception - single image with vector rotation
        img_in = mx.nd.random.uniform(0, 1, (3, 30, 60), dtype=np.float32)
        nd_rots = mx.nd.array([1, 2, 3, 4, 5], dtype=np.float32)
        args = {'src': img_in, 'rotation_degrees': nd_rots, 'zoom_in': False, 'zoom_out': False}
        with pytest.raises(TypeError):
            mx.image.imrotate(**args)

    def test_random_rotate(self):
        angle_limits = [-5., 5.]
        src_single_image = mx.nd.random.uniform(0, 1, (3, 30, 60),
                                                dtype=np.float32)
        out_single_image = mx.image.random_rotate(src_single_image,
                                                  angle_limits)
        self.assertEqual(out_single_image.shape, (3, 30, 60))
        src_batch_image = mx.nd.stack(src_single_image,
                                      src_single_image,
                                      src_single_image)
        out_batch_image = mx.image.random_rotate(src_batch_image,
                                                 angle_limits)
        self.assertEqual(out_batch_image.shape, (3, 3, 30, 60))

