blob: 75b48f42c5e89715a083360ad5604037e30001fe [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
sys.path.insert(0, "../../python/")
import mxnet as mx
import numpy as np
import numpy.random as rnd
import time
def check_diff_to_scalar(A, x, rank=None):
""" assert A == x"""
assert(np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)
# setup
keys = ['3', '5', '7']
init_test_keys = [str(i) for i in range(200,300)]
init_test_keys_big = [str(i) for i in range(300,400)]
init_test_keys_device = [str(i) for i in range(400,500)]
init_test_keys_device_big = [str(i) for i in range(500,600)]
rate = 2
shape = (2, 3)
big_shape = (1200, 1200) # bigger than MXNET_KVSTORE_BIGARRAY_BOUND
kv = mx.kv.create('dist_device_sync')
def init_kv():
# init kv dns keys
kv.init(keys, [mx.nd.ones(shape)] * len(keys))
kv.init('99', mx.nd.ones(big_shape))
# worker info
my_rank = kv.rank
nworker = kv.num_workers
# init updater on servers
kv.set_optimizer(mx.optimizer.create('test', rescale_grad=rate))
return kv, my_rank, nworker
def test_sync_push_pull():
kv, my_rank, nworker = init_kv()
num_gpus = 2
def check_default_keys(kv, my_rank, nworker):
nrepeat = 3
# checks pull after push in loop, because behavior during
# consecutive pushes doesn't offer any guarantees
for i in range(nrepeat):
scale = my_rank + 1
kv.push('3', [mx.nd.ones(shape, ctx=mx.gpu(j)) * scale for j in range(num_gpus)])
kv.push('99', [mx.nd.ones(big_shape, ctx=mx.gpu(j)) * scale for j in range(num_gpus)])
num = (nworker + 1) * nworker * rate * num_gpus / 2 * (i + 1) + 1
val = mx.nd.zeros(shape)
kv.pull('3', out=val)
check_diff_to_scalar(val, num)
val2 = mx.nd.zeros(big_shape)
kv.pull('99', out=val2)
check_diff_to_scalar(val2, num)
check_default_keys(kv, my_rank, nworker)
print('worker ' + str(my_rank) + ' is done')
def test_sync_init():
def check_init(kv, cur_keys, cur_shape, device=False):
ctx = mx.gpu(0) if device else mx.cpu()
val = [mx.nd.zeros(cur_shape, ctx) for i in cur_keys]
for i in range(len(cur_keys)):
expected = i
kv.init(cur_keys[i], [mx.nd.ones(cur_shape, ctx) * i])
kv.pull(cur_keys[i], out=val[i])
check_diff_to_scalar(val[i], expected)
check_init(kv, init_test_keys, shape)
check_init(kv, init_test_keys_big, big_shape)
check_init(kv, init_test_keys_device, shape, device=True)
check_init(kv, init_test_keys_device_big, big_shape, device=True)
my_rank = kv.rank
print('worker ' + str(my_rank) + ' is initialized')
if __name__ == "__main__":
test_sync_init()
test_sync_push_pull()