blob: aaed7055cf9148f0d6dc77c6e440e4be3e2b0677 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
sys.path.insert(0, "../../python/")
import mxnet as mx
import numpy as np
import numpy.random as rnd
import copy
from mxnet.test_utils import assert_almost_equal
def check_diff_to_scalar(A, x, rank=None):
    """Assert that every element of NDArray ``A`` equals the scalar ``x``.

    ``rank`` is only used to annotate the assertion message for debugging.
    """
    total_abs_diff = np.sum(np.abs((A - x).asnumpy()))
    assert total_abs_diff == 0, (rank, A.asnumpy(), x)
def compute_1bit(arr, curr_residual, threshold):
    """Simulate 1-bit gradient quantization on the CPU.

    Each element plus its carried-over residual is compared with
    ``threshold``: values above it emit bit "1" (dequantized +1), all
    others emit bit "0" (dequantized -1). The leftover error is kept
    as the new residual.

    Parameters
    ----------
    arr : numpy.ndarray
        Gradient values to quantize.
    curr_residual : array-like
        Residual from the previous round, indexable by ``arr``'s indices.
    threshold : float
        Decision boundary between the two quantization levels.

    Returns
    -------
    (str, list, list)
        The bit string zero-padded to a multiple of 32 bits, the new
        residuals, and the dequantized values (+1/-1), all in flat order.
    """
    str_quant = ""
    new_residual = []
    decompr = []
    for idx, val in np.ndenumerate(arr):
        val += curr_residual[idx]
        if val > threshold:
            str_quant += "1"
            new_residual.append(val - 1)
            decompr.append(1)
        else:
            str_quant += "0"
            new_residual.append(val + 1)
            decompr.append(-1)
    # Pad with zero bits so the length fills whole 32-bit (float32) words.
    # BUG FIX: the previous check `len(str_quant) != 32` appended 32 spurious
    # bits whenever the length was already a non-32 multiple of 32 (e.g. 64).
    if len(str_quant) % 32 != 0:
        str_quant += "0" * (32 - len(str_quant) % 32)
    return str_quant, new_residual, decompr
def compute_2bit(arr, curr_residual, threshold):
str_quant = ""
new_residual = []
decompr = []
for idx, val in np.ndenumerate(arr):
val += curr_residual[idx]
if val >= threshold:
str_quant += "11"
new_residual.append(val - threshold)
decompr.append(threshold)
elif val <= -threshold:
str_quant += "10"
new_residual.append(val + threshold)
decompr.append(-threshold)
else:
str_quant += "00"
new_residual.append(val)
decompr.append(0)
# append extra bits when size of array not a factor of 16
if len(str_quant) % 16 != 0:
str_quant += "0" * (16 - len(str_quant) % 16)
return str_quant, new_residual, decompr
def compute_expected_quantization(arr, curr_residual, threshold, quantize_func):
    """Run ``quantize_func`` on ``arr`` and pack its bit string into float32 words.

    ``arr`` is an NDArray-like object (only ``asnumpy()`` and ``shape`` are
    used). Returns the compressed words as a 1-D array, plus the new residual
    and the dequantized gradient, both reshaped to ``arr``'s shape.
    """
    from struct import pack, unpack

    def bits_to_float32(bit_str):
        # Reinterpret a 32-character binary string as an IEEE-754 float32.
        return unpack("f", pack("I", int(bit_str, 2)))[0]

    values = arr.asnumpy()
    # bit_repr holds the quantized representation as a sequence of "0"/"1" chars
    bit_repr, residual, dequantized = quantize_func(values, curr_residual, threshold)
    words = []
    # Consume the bit string one 32-bit word at a time, swapping the four
    # bytes into little-endian order before reinterpreting as float32.
    for start in range(0, len(bit_repr), 32):
        reordered = (bit_repr[start + 24:start + 32] +
                     bit_repr[start + 16:start + 24] +
                     bit_repr[start + 8:start + 16] +
                     bit_repr[start:start + 8])
        words.append(bits_to_float32(reordered))
    return (np.array(words),
            np.array(residual).reshape(arr.shape),
            np.array(dequantized).reshape(arr.shape))
## individual key interface
def test_kvstore(kv_type, stype):
    """Push per-key gradients from nworker GPUs and verify pulled values.

    The pulled weights are compared against a CPU-side reference
    accumulation of the same random data, with relative error < 1e-6.
    Relies on the module-level globals keys, shapes, lr, nworker,
    nrepeat and data.
    """
    print(kv_type)
    kv = mx.kv.create(kv_type)
    kv.set_optimizer(mx.optimizer.create('test', learning_rate=-lr))
    for key, shape in zip(keys, shapes):
        kv.init(key, mx.nd.zeros(shape))
    expected = [np.zeros(shape) for shape in shapes]
    for step in range(nrepeat):
        for j, key in enumerate(keys):
            grads = [mx.nd.array(data[step][j][g], mx.gpu(g)).tostype(stype)
                     for g in range(nworker)]
            kv.push(key, grads)
        expected = [acc + lr * sum(worker_grads)
                    for acc, worker_grads in zip(expected, data[step])]
        for j, key in enumerate(keys):
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(key, out=out)
            abs_errs = [np.sum(np.abs(o.asnumpy() - expected[j])) for o in out]
            rel_err = sum(abs_errs) / np.sum(np.abs(expected[j]))
            assert rel_err < 1e-6, (rel_err, shapes[j])
def test_compress_kvstore(kv_type, compression='2bit', threshold=0.5):
    """Exercise kvstore gradient compression ('1bit' or '2bit') end to end.

    Initializes keys, then runs a battery of nested checks: init values are
    uncompressed, pulls before pushes return zeros, deterministic pushes
    update weights exactly, residuals carry over across pushes, and random
    pushes match a CPU simulation of the quantize/dequantize operator.

    Raises RuntimeError for an unknown ``compression`` type. Relies on the
    module-level globals keys, shapes, gc_init_test_key, nworker and nrepeat.
    """
    print(kv_type + ' with ' + compression + ' compression and threshold is ' + str(threshold))
    # learning rate handed to the 'test' optimizer; each quantized push is
    # expected to move the weight by rate * (dequantized value) per worker
    rate = 2
    quantize_func = None
    if compression == '1bit':
        quantize_func = compute_1bit
    elif compression == '2bit':
        quantize_func = compute_2bit
    else:
        raise RuntimeError("Unknown gradient compression type!")
    kv = mx.kv.create(kv_type)
    kv.set_gradient_compression({'type':compression, 'threshold':threshold})
    kv.set_optimizer(mx.optimizer.create('test', learning_rate=-rate))
    for k, s in zip(keys, shapes):
        kv.init(k, mx.nd.zeros(s))
    # init one key with 1s so we can check if it was compressed during init
    kv.init(gc_init_test_key, mx.nd.ones(shapes[0]))
    # use different keys for random tests so that
    # we can track residual from start
    random_keys = [13, 15, 17]
    for k, s in zip(random_keys, shapes):
        kv.init(k, mx.nd.zeros(s))
    def pull_init_test(kv):
        # checks that compression is not applied to init of key
        out = [mx.nd.zeros(shapes[0], mx.gpu(g)) for g in range(nworker)]
        kv.pull(gc_init_test_key, out=out)
        exp = np.ones_like(out[0].asnumpy())
        for o in out:
            assert_almost_equal(o.asnumpy(), exp)
    def pull_before_push(kv):
        # pulling before any push must return the zero-initialized weights
        for _ in range(nrepeat):
            for j in range(len(keys)):
                out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
                kv.pull(keys[j], out=out)
                exp = np.zeros_like(out[0].asnumpy())
                for o in out:
                    assert_almost_equal(o.asnumpy(), exp)
    def push_ones(kv, sign=1):
        # 1bit only: pushing +/-1 from every worker quantizes with zero
        # residual for the thresholds used here, so each push moves the
        # weight by exactly sign * rate * nworker
        for i in range(nrepeat):
            for j in range(len(keys)):
                kv.push(keys[j], [sign * mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)])
                out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
                kv.pull(keys[j], out=out)
                if sign == 1:
                    exp = (i + 1) * rate * nworker * np.ones_like(out[0].asnumpy())
                else:
                    # sign == -1 runs after the +1 pass, unwinding its updates
                    exp = (nrepeat - i - 1) * rate * nworker * np.ones_like(out[0].asnumpy())
                for o in out:
                    assert_almost_equal(o.asnumpy(), exp)
    def verify_residual_1bit(kv, threshold, rate):
        # current values must equal to zero
        for j in range(len(keys)):
            out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j], out=out)
            exp = np.zeros_like(out[0].asnumpy())
            for o in out:
                assert_almost_equal(o.asnumpy(), exp)
        curr_residual = 0
        # push 2: quantizes up (+1) when 2 > threshold, down (-1) otherwise
        curr_val = rate * nworker if 2 > threshold else -rate * nworker
        for j in range(len(keys)):
            kv.push(keys[j], [2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j], out=out)
            for o in out:
                check_diff_to_scalar(o, curr_val)
        # residual after pushing 2: 2 - 1 = 1 if quantized up, 2 + 1 = 3 if down
        curr_residual = 1 if 2 > threshold else 3
        # push 0: the carried residual alone decides the quantized sign
        curr_val += rate * nworker if 0 + curr_residual > threshold else -rate * nworker
        for j in range(len(keys)):
            kv.push(keys[j], [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j], out=out)
            for o in out:
                check_diff_to_scalar(o, curr_val)
        # update the tracked residual (-1 after quantizing up, +1 after down)
        curr_val += rate * nworker if -2 + curr_residual > threshold else -rate * nworker
        for j in range(len(keys)):
            kv.push(keys[j], [-2 * mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j], out=out)
            for o in out:
                check_diff_to_scalar(o, curr_val)
    def push_zeros(kv):
        # 2bit: zeros fall inside (-threshold, threshold), quantize to 0,
        # so the weights must not move
        for _ in range(nrepeat):
            for j in range(len(keys)):
                kv.push(keys[j], [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)])
                out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
                kv.pull(keys[j], out=out)
                exp = np.zeros_like(out[0].asnumpy())
                for o in out:
                    assert_almost_equal(o.asnumpy(), exp)
    def verify_residual_2bit(kv, threshold, rate):
        # pushes values around the threshold (written for threshold=0.5 as
        # used by the caller) to verify that sub-threshold values accumulate
        # as residual and are released once the threshold is crossed
        for j in range(len(keys)):
            # 0.4 < threshold: quantized to 0, kept entirely as residual
            kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*0.4 for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j],out=out)
            for o in out:
                check_diff_to_scalar(o, 0)
            # residual 0.4 + (threshold - 0.3) >= threshold: quantizes to +threshold
            kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*(threshold-0.3) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j],out=out)
            curval = threshold * rate * nworker
            for o in out:
                check_diff_to_scalar(o, curval)
            # remaining residual + 0.2 still below threshold: no update expected
            kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*(0.2) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j],out=out)
            for o in out:
                check_diff_to_scalar(o, curval)
            # accumulated residual + (threshold - 0.3) reaches threshold again
            kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*(threshold-0.3) for g in range(nworker)])
            out = [mx.nd.zeros(shapes[j], mx.gpu(g)) for g in range(nworker)]
            kv.pull(keys[j],out=out)
            curval += threshold*rate*nworker
            for o in out:
                check_diff_to_scalar(o, curval)
            # residual would be 0 now
        return curval
    def check_neg(kv, neg, rate, curval):
        # pushing exactly -threshold (neg) each round quantizes with zero
        # residual, moving the weight by rate * nworker * neg per round
        for _ in range(nrepeat):
            curval = curval + rate*nworker*neg
            for j in range(len(keys)):
                kv.push(keys[j], [mx.nd.ones(shapes[j], mx.gpu(g))*neg for g in range(nworker)])
                out = [mx.nd.ones(shapes[j], mx.gpu(g)) for g in range(nworker)]
                kv.pull(keys[j], out=out)
                for o in out:
                    check_diff_to_scalar(o, curval)
        # residual would be 0 again
    def check_compr_random(kv, threshold):
        # push random gradients and compare the pulled delta against a CPU
        # simulation of the quantize/dequantize operator; random_keys start
        # untouched so the simulated residual starts at zero like the server's
        for k, s in zip(random_keys, shapes):
            curr_residual = [np.zeros(s) for g in range(nworker)]
            orig_val = [mx.nd.zeros(s, mx.gpu(g)) for g in range(nworker)]
            kv.pull(k, out=orig_val)
            grads = [mx.nd.random_uniform(-0.6, 0.6, shape=s, ctx=mx.gpu(g)) for g in range(nworker)]
            grads_cpy = copy.deepcopy(grads)
            kv.push(k, grads)
            val = [mx.nd.zeros(s, mx.gpu(g)) for g in range(nworker)]
            kv.pull(k, out=val)
            diffs = [val[g] - orig_val[g] for g in range(nworker)]
            # compute expected by using simulation of operator
            # on cpu
            sum_dequantized_vals = np.zeros(s)
            for g in range(nworker):
                compr, curr_residual[g], decompr = compute_expected_quantization(
                    grads_cpy[g], curr_residual[g], threshold, quantize_func)
                sum_dequantized_vals += (decompr * rate)
            for g in range(nworker):
                assert_almost_equal(diffs[g].asnumpy(), sum_dequantized_vals)
    pull_init_test(kv)
    pull_before_push(kv)
    if compression == '1bit':
        push_ones(kv, sign=1)
        push_ones(kv, sign=-1)
        verify_residual_1bit(kv, threshold, rate)
    elif compression == '2bit':
        push_zeros(kv)
        curval = verify_residual_2bit(kv, threshold, rate)
        check_neg(kv, -1*threshold, rate, curval)
    check_compr_random(kv, threshold)
## group keys interface
def test_group_kvstore(kv_type, stype):
    """Same check as test_kvstore but pushing/pulling all keys as one group.

    Relies on the module-level globals keys, shapes, lr, nworker,
    nrepeat and data.
    """
    print(kv_type)
    kv = mx.kv.create(kv_type)
    kv.set_optimizer(mx.optimizer.create('test', learning_rate=-lr))
    kv.init(keys, [mx.nd.zeros(shape) for shape in shapes])
    expected = [np.zeros(shape) for shape in shapes]
    out = [[mx.nd.zeros(shape, mx.gpu(g)) for g in range(nworker)]
           for shape in shapes]
    for step in range(nrepeat):
        grouped = [[mx.nd.array(data[step][j][g], mx.gpu(g)).tostype(stype)
                    for g in range(nworker)]
                   for j in range(len(keys))]
        kv.push(keys, grouped)
        kv.pull(keys, out=out)
        expected = [acc + lr * sum(worker_grads)
                    for acc, worker_grads in zip(expected, data[step])]
        for ref, pulled in zip(expected, out):
            abs_errs = [np.sum(np.abs(o.asnumpy() - ref)) for o in pulled]
            rel_err = sum(abs_errs) / np.sum(np.abs(ref))
            assert rel_err < 1e-6, (rel_err, ref.shape)
if __name__ == "__main__":
    keys = [3, 5, 7]
    # let the last shape exceed MXNET_KVSTORE_BIGARRAY_BOUND
    shapes = [(4, 4), (100, 100), (2000, 2000)]
    stypes = ['default', 'row_sparse']
    gc_init_test_key = 9
    lr = .1
    nworker = 4
    nrepeat = 10
    # data[repeat][key][worker] is a random array drawn from [-1, 1)
    data = [[[np.random.random(shape) * 2 - 1 for _ in range(nworker)]
             for shape in shapes]
            for _ in range(nrepeat)]
    for stype in stypes:
        test_kvstore('local_update_cpu', stype)
        test_kvstore('local_allreduce_cpu', stype)
        test_kvstore('local_allreduce_device', stype)
    ## compression for local kvstore happens only when reduce is on device
    test_compress_kvstore('local_allreduce_device', '1bit', -.5)
    test_compress_kvstore('local_allreduce_device', '1bit', 0)
    test_compress_kvstore('local_allreduce_device', '1bit', .5)
    test_compress_kvstore('local_allreduce_device', '2bit', .5)
    for stype in stypes:
        test_group_kvstore('local_update_cpu', stype)
        test_group_kvstore('local_allreduce_cpu', stype)
        test_group_kvstore('local_allreduce_device', stype)