blob: 275989493b3c8c78230e595bdf8599e4205e6721 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import tvm
import tvm.relay
import tvm.testing
import tvm.topi
import numpy as np
import scipy.stats
def threefry_split(target, dev, gen):
gen_placeholder = tvm.te.placeholder(gen.shape, name="gen", dtype="uint64")
left_placeholder, right_placeholder = tvm.topi.random.threefry_split(gen_placeholder)
s = tvm.topi.generic.schedule_extern([left_placeholder, right_placeholder])
f = tvm.build(s, [gen_placeholder, left_placeholder, right_placeholder])
left = tvm.nd.array(np.zeros(gen.shape, dtype="uint64"))
right = tvm.nd.array(np.zeros(gen.shape, dtype="uint64"))
f(tvm.nd.array(gen), left, right)
return left.numpy(), right.numpy()
def threefry_generate(target, dev, gen, size):
gen_placeholder = tvm.te.placeholder(gen.shape, name="gen", dtype="uint64")
left_placeholder, right_placeholder = tvm.topi.random.threefry_generate(gen_placeholder, size)
s = tvm.topi.generic.schedule_extern([left_placeholder, right_placeholder])
f = tvm.build(s, [gen_placeholder, left_placeholder, right_placeholder])
out_gen = tvm.nd.array(np.zeros(gen.shape, dtype="uint64"))
rands = tvm.nd.array(np.zeros(size, dtype="uint64"))
f(tvm.nd.array(gen), out_gen, rands)
return out_gen.numpy(), rands.numpy()
def uniform(target, dev, gen, low, high, size, dtype):
gen_placeholder = tvm.te.placeholder(gen.shape, name="gen", dtype="uint64")
low_placeholder = tvm.te.placeholder(low.shape, name="low", dtype=dtype)
high_placeholder = tvm.te.placeholder(high.shape, name="high", dtype=dtype)
left_placeholder, right_placeholder = tvm.topi.random.uniform(
gen_placeholder, low_placeholder, high_placeholder, size, dtype
)
s = tvm.topi.generic.schedule_extern([left_placeholder, right_placeholder])
f = tvm.build(
s,
[gen_placeholder, low_placeholder, high_placeholder, left_placeholder, right_placeholder],
target=target,
)
out_gen = tvm.nd.array(np.zeros(gen.shape, dtype="uint64"), device=dev)
rands = tvm.nd.array(np.zeros(size, dtype=dtype), device=dev)
f(
tvm.nd.array(gen, device=dev),
tvm.nd.array(low, device=dev),
tvm.nd.array(high, device=dev),
out_gen,
rands,
)
return out_gen.numpy(), rands.asnumpy()
def multinomial(target, dev, gen, probs, num_samples):
gen_placeholder = tvm.te.placeholder(gen.shape, name="gen", dtype="uint64")
probs_placeholder = tvm.te.placeholder(probs.shape, name="probs", dtype="float32")
new_gen_placeholder, indices_placeholder = tvm.topi.random.multinomial(
gen_placeholder, probs_placeholder, num_samples
)
s = tvm.topi.generic.schedule_extern([new_gen_placeholder, indices_placeholder])
f = tvm.build(
s,
[gen_placeholder, probs_placeholder, new_gen_placeholder, indices_placeholder],
target=target,
)
out_gen = tvm.nd.array(np.zeros(gen.shape, dtype="uint64"), device=dev)
indices = tvm.nd.array(np.zeros((*probs.shape[:-1], num_samples), dtype="int32"), device=dev)
f(tvm.nd.array(gen), tvm.nd.array(probs), out_gen, indices)
return out_gen.numpy(), indices.asnumpy()
@tvm.testing.parametrize_targets("llvm")
def test_threefry_split(target, dev):
# test that results of split do not equal eachother or the input
gen = tvm.relay.random.threefry_key(0).data.numpy()
a, b = threefry_split(target, dev, gen)
assert (a != b).any() and (
a != gen
).any(), "Splitting a gen should result in different output gens"
# unittest some split inputs
assert (a == np.array([0, 0, 0, 0, 0, 0, 0, 0, 1 << 62, 0], dtype="uint64")).all()
assert (b == np.array([0, 0, 0, 0, 1 << 63, 0, 0, 0, 1 << 62, 0], dtype="uint64")).all()
# test enough splits to go over path length
for i in range(129):
a, b = threefry_split(target, dev, b)
assert (a[0:4] == b[0:4]).all(), "State part of split should be the same"
assert (b[0:4] != np.zeros(4, dtype="uint64")).any()
# check that split then generate does not generate the same for both sides
a, a_rands = threefry_generate(target, dev, a, (100,))
b, b_rands = threefry_generate(target, dev, b, (100,))
assert (
a_rands != b_rands
).all(), "Numbers generated from different initial states should be different"
# check repeatability
_, rands1 = threefry_generate(target, dev, a, (100,))
_, rands2 = threefry_generate(target, dev, a, (100,))
assert (
rands1 == rands2
).all(), "Numbers generated from the same initial state should be the same"
a1, b1 = threefry_split(target, dev, a)
a2, b2 = threefry_split(target, dev, a)
assert (a1 == a2).all() and (
b1 == b2
).all(), "Split called on the same input should return the same result"
@tvm.testing.parametrize_targets("llvm")
def test_threefry_generate(target, dev):
gen = tvm.relay.random.threefry_key(0).data.numpy()
# check that we can generate some data
a, rands = threefry_generate(target, dev, gen, (2048,))
assert (
rands.shape[0] == 2048 and len(rands.shape) == 1
), "Output shape should match requested shape"
# check that gen out does not equal input
assert (a != gen).any(), "Output generator should be different from input generator"
# check that we can generate data whose total number of elements is not a multiple of 4.
a, rands = threefry_generate(target, dev, gen, (7,))
assert (
rands.shape[0] == 7 and len(rands.shape) == 1
), "Output shape should match requested shape"
# test enough generates to go over generate limit
gen = np.array(
[0, 0, 0, 0, 0, 0, 0, 2**64 - 2, 1 << 63, 0], dtype="uint64"
) # make counter large
a, rands = threefry_generate(target, dev, gen, (2048,))
assert gen[4] != a[4], "Overflow of counter should trigger path change"
assert a[7] == 2048, "Overflow of counter should still update counter"
# check generate with path at length limit
gen = np.array([0, 0, 0, 0, 0, 0, 0, 2**64 - 2, 0, 0], dtype="uint64") # make counter large
a, rands = threefry_generate(target, dev, gen, (2048,))
assert (
gen[0:4] != a[0:4]
).any(), "Overflowing counter with no space left in path should change state"
@tvm.testing.parametrize_targets("llvm")
def test_threefry_wrapping(target, dev):
assert tvm.topi.random.threefry_test_wrapping(
target, dev
), f"{target} does not suppport wrapping unsigned integer arithmetic"
@tvm.testing.parametrize_targets("llvm")
def test_uniform(target, dev):
gen = tvm.relay.random.threefry_key(0).data.numpy()
m = 1024
n = 1024
dtypes = ["float32", "float64"]
for dtype in dtypes:
low = np.array(5.0, dtype=dtype)
high = np.array(10.0, dtype=dtype)
new_gen, rands = uniform(target, dev, gen, low, high, (m, n), dtype)
assert (gen != new_gen).any()
assert abs(np.mean(rands) - 7.5) < 1e-1
assert np.min(rands) >= 5.0
assert np.max(rands) <= 10.0
@tvm.testing.parametrize_targets("llvm")
def test_multinomial(target, dev):
def _verify_multinomial(size, num_samples, test_statistics=False):
gen = tvm.relay.random.threefry_key(np.random.randint(0, 1e5)).data.numpy()
probs = np.random.randint(low=-50, high=1000, size=size).astype("float32")
new_gen, indices = multinomial(target, dev, gen, probs, num_samples)
assert (gen != new_gen).any()
assert np.min(indices) >= 0
assert np.max(indices) < probs.shape[-1]
# Note, only use test_statistics with sample size > 10,000.
if test_statistics:
# Clipped and normalized probabilities * number of samples
# represents expected frequency of each category.
# First upcast to float64 to remove numerical error.
probs = probs.astype("float64")
probs = np.reshape(probs, [-1, probs.shape[-1]])
probs = np.maximum(probs, 0)
probs = probs / np.expand_dims(np.sum(probs, axis=-1), axis=-1)
# Multiply by number of samples and add epsilon to get non-zero expected samples per index.
expected_frequency = probs * num_samples + np.finfo(float).eps
# Do a small adjustment to make sure each row of expected_frequencies sums to exactly num_samples.
expected_frequency = (
np.expand_dims((num_samples / np.sum(expected_frequency, axis=-1)), axis=-1)
* expected_frequency
)
# Reduce shape to a 2D matrix.
indices = np.reshape(indices, [-1, indices.shape[-1]])
# Split indendent rows of indices.
index_list = [np.squeeze(x, 0) for x in np.split(indices, indices.shape[0], axis=0)]
# Count frequency of selected indices in each row.
observed_freqs = [np.bincount(samples, minlength=size[-1]) for samples in index_list]
# Stack observed frequencies back into a matrix.
observed_freqs = np.stack(observed_freqs, axis=0)
# Test how closely observed samples match expectations.
_, p_value = scipy.stats.chisquare(observed_freqs, expected_frequency, axis=-1)
# If sampled correctly, p_value should be greater than 1e-6 almost all the time.
assert np.all(p_value > 1e-6)
# Test simple 1-D case.
_verify_multinomial([3], 2)
# Test 2-D case.
_verify_multinomial([2, 10], 1)
# Test 3-D case.
_verify_multinomial([2, 3, 10], 4)
# Test large sample size statistics.
_verify_multinomial([3, 10], 10000, test_statistics=True)
if __name__ == "__main__":
test_threefry_split(tvm.target.Target("llvm"), tvm.device("cpu"))
test_threefry_generate(tvm.target.Target("llvm"), tvm.device("cpu"))
test_threefry_wrapping(tvm.target.Target("llvm"), tvm.device("cpu"))
test_uniform(tvm.target.Target("llvm"), tvm.device("cpu"))
test_multinomial(tvm.target.Target("llvm"), tvm.device("cpu"))