blob: 356977437431c333754250a1ecce6096b3a3af88 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: skip-file
import sys
import os
import mxnet as mx
import numpy as np
import pytest
from mxnet.test_utils import assert_almost_equal, default_device, environment
curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
sys.path.insert(0, os.path.join(curr_path, '../unittest'))
shape = (4, 4)
keys = [5, 7, 11]
str_keys = ['b', 'c', 'd']
def init_kv_with_str(stype='default', kv_type='local'):
"""init kv """
kv = mx.kv.create(kv_type)
# single
kv.init('a', mx.nd.zeros(shape, stype=stype))
# list
kv.init(str_keys, [mx.nd.zeros(shape=shape, stype=stype)] * len(keys))
return kv
# 1. Test seed 89411477 (module seed 1829754103) resulted in a py3-gpu CI runner core dump.
# 2. Test seed 1155716252 (module seed 1032824746) resulted in py3-dnnl-gpu have error
# src/operator/nn/dnnl/dnnl_base.cc:567: Check failed: similar
# Both of them are not reproducible, so this test is back on random seeds.
@pytest.mark.skipif(mx.device.num_gpus() < 2, reason="test_rsp_push_pull needs more than 1 GPU")
@pytest.mark.skip("Flaky test https://github.com/apache/incubator-mxnet/issues/14189")
@pytest.mark.serial
def test_rsp_push_pull():
def check_rsp_push_pull(kv_type, sparse_pull, is_push_cpu=True):
kv = init_kv_with_str('row_sparse', kv_type)
kv.init('e', mx.nd.ones(shape).tostype('row_sparse'))
push_ctxs = [mx.cpu(i) if is_push_cpu else mx.gpu(i) for i in range(2)]
kv.push('e', [mx.nd.ones(shape, ctx=context).tostype('row_sparse') for context in push_ctxs])
def check_rsp_pull(kv, ctxs, sparse_pull, is_same_rowid=False, use_slice=False):
count = len(ctxs)
num_rows = shape[0]
row_ids = []
all_row_ids = np.arange(num_rows)
vals = [mx.nd.sparse.zeros(shape=shape, ctx=ctxs[i], stype='row_sparse') for i in range(count)]
if is_same_rowid:
row_id = np.random.randint(num_rows, size=num_rows)
row_ids = [mx.nd.array(row_id)] * count
elif use_slice:
total_row_ids = mx.nd.array(np.random.randint(num_rows, size=count*num_rows))
row_ids = [total_row_ids[i*num_rows : (i+1)*num_rows] for i in range(count)]
else:
for _ in range(count):
row_id = np.random.randint(num_rows, size=num_rows)
row_ids.append(mx.nd.array(row_id))
row_ids_to_pull = row_ids[0] if (len(row_ids) == 1 or is_same_rowid) else row_ids
vals_to_pull = vals[0] if len(vals) == 1 else vals
kv.row_sparse_pull('e', out=vals_to_pull, row_ids=row_ids_to_pull)
for val, row_id in zip(vals, row_ids):
retained = val.asnumpy()
excluded_row_ids = np.setdiff1d(all_row_ids, row_id.asnumpy())
for row in range(num_rows):
expected_val = np.zeros_like(retained[row])
expected_val += 0 if row in excluded_row_ids else 2
assert_almost_equal(retained[row], expected_val)
if sparse_pull is True:
kv.pull('e', out=vals_to_pull, ignore_sparse=False)
for val in vals:
retained = val.asnumpy()
expected_val = np.zeros_like(retained)
expected_val[:] = 2
assert_almost_equal(retained, expected_val)
check_rsp_pull(kv, [mx.gpu(0)], sparse_pull)
check_rsp_pull(kv, [mx.cpu(0)], sparse_pull)
check_rsp_pull(kv, [mx.gpu(i//2) for i in range(4)], sparse_pull)
check_rsp_pull(kv, [mx.gpu(i//2) for i in range(4)], sparse_pull, is_same_rowid=True)
check_rsp_pull(kv, [mx.cpu(i) for i in range(4)], sparse_pull)
check_rsp_pull(kv, [mx.cpu(i) for i in range(4)], sparse_pull, is_same_rowid=True)
check_rsp_pull(kv, [mx.gpu(i//2) for i in range(4)], sparse_pull, use_slice=True)
check_rsp_pull(kv, [mx.cpu(i) for i in range(4)], sparse_pull, use_slice=True)
envs = [None, '1']
key = 'MXNET_KVSTORE_USETREE'
for val in envs:
with environment(key, val):
if val is '1':
sparse_pull = False
else:
sparse_pull = True
check_rsp_push_pull('local', sparse_pull)
check_rsp_push_pull('device', sparse_pull)
check_rsp_push_pull('device', sparse_pull, is_push_cpu=False)
def test_row_sparse_pull_single_device():
kvstore = mx.kv.create('device')
copy = mx.nd.random_normal(shape=(4,4), ctx=mx.gpu(0))
grad = copy.tostype("row_sparse")
key = 0
kvstore.init(key, grad)
idx = grad.indices
kvstore.push(key, grad)
kvstore.row_sparse_pull(key, out=grad, row_ids=idx)
assert_almost_equal(grad.asnumpy(), copy.asnumpy())
@pytest.mark.serial
def test_rsp_push_pull_large_rowid():
num_rows = 793470
val = mx.nd.ones((num_rows, 1)).tostype('row_sparse').copyto(mx.gpu())
kv = mx.kv.create('device')
kv.init('a', val)
out = mx.nd.zeros((num_rows,1), stype='row_sparse').copyto(mx.gpu())
kv.push('a', val)
kv.row_sparse_pull('a', out=out, row_ids=mx.nd.arange(0, num_rows, dtype='int64'))
assert(out.indices.shape[0] == num_rows)