| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
import argparse
import ctypes
import os
import subprocess
import time

import mxnet as mx
import numpy as np
import numpy.random as rnd
import scipy.sparse as sp
| from mxnet.test_utils import rand_ndarray, set_default_device, assert_almost_equal, get_bz2_data |
| from mxnet.base import check_call, _LIB |
| from util import estimate_density |
| |
| PARSER = argparse.ArgumentParser(description="Benchmark sparse operators", |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
PARSER.add_argument('--num-omp-threads', type=int,
                    default=1, help='number of OMP threads to set in MXNet')
PARSER.add_argument('--gpu', action='store_true',
                    help="run the benchmarks on GPU")
# TODO: Use logging later
PARSER.add_argument('--verbose', action='store_true',
                    help="verbose output")
| ARGS = PARSER.parse_args() |
| |
# Dataset and synthetic benchmark configurations
| KDDA = { |
| 'data_mini': 'kdda.t.mini', |
| 'data_name': 'kdda.t', |
| 'data_origin_name': 'kdda.t.bz2', |
| 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2", |
| 'feature_dim': 20216830, |
| 'm': [1, 8, 32], |
| 'batch_size': [64], |
| 'default_index': {'batch_size': 0, |
| 'output_dim': 2}, |
| 'num_batches': 10 |
| } |
| |
| AVAZU = { |
| 'data_mini': 'avazu-app.t.mini', |
| 'data_name': 'avazu-app.t', |
| 'data_origin_name': 'avazu-app.t.bz2', |
| 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2", |
| 'feature_dim': 1000000, |
| 'm': [1, 1000, 2000], |
| 'batch_size': [128, 256], |
| 'default_index': {'batch_size': 0, |
| 'output_dim': 1}, |
| 'num_batches': 10 |
| } |
| |
| CRITEO = { |
| 'data_mini': 'criteo.t.mini', |
| 'data_name': 'criteo.t', |
| 'data_origin_name': 'criteo.t.bz2', |
| 'url': "https://s3-us-west-2.amazonaws.com/sparse-dataset/criteo.t.bz2", |
| 'feature_dim': 8388621, |
| 'm': [1, 8, 16, 32, 64], |
| 'batch_size': [64, 128], |
| 'default_index': {'batch_size': 1, |
| 'output_dim': 3}, |
| 'num_batches': 10 |
| } |
| |
| SYNTHETIC1 = { |
| 'feature_dim': [1000000], |
| 'm': [256, 1000], |
| 'density': [0.001, 0.005, 0.01, 0.02, 0.05, |
| 0.1, 0.2, 0.5, 0.65], |
| 'batch_size': [64, 128], |
| 'default_index': {'batch_size': 1, |
| 'density': 2, |
| 'output_dim': 1, |
| 'feature_dim': 0}, |
| 'num_repeat': 10 |
| } |
| |
| SYNTHETIC2 = { |
| 'feature_dim': [8000000, 16000000], |
| 'm': [1, 32], |
| 'density': [0.001, 0.005, 0.01, 0.02, 0.05, |
| 0.1, 0.2, 0.5, 0.65], |
| 'batch_size': [64, 128], |
| 'default_index': {'batch_size': 1, |
| 'density': 2, |
| 'output_dim': 1, |
| 'feature_dim': 0}, |
| 'num_repeat': 10 |
| } |
| |
def measure_cost(repeat, scipy_trans_lhs, scipy_dns_lhs, func_name, *args, **kwargs):
    """Measure the average time cost of running `func_name(*args, **kwargs)` over `repeat` runs.

    If `scipy_trans_lhs` is True, the first argument is transposed inside the timed region
    (with numpy when `scipy_dns_lhs` is True, otherwise with scipy.sparse), since the
    scipy/numpy dot functions have no transpose flag.
    """
    mx.nd.waitall()
    args_list = list(args)
    start = time.time()
    if scipy_trans_lhs:
        args_list[0] = np.transpose(args_list[0]) if scipy_dns_lhs else sp.spmatrix.transpose(args_list[0])
    for _ in range(repeat):
        func_name(*args_list, **kwargs)
    mx.nd.waitall()
    end = time.time()
    return (end - start) / repeat
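# Illustrative usage (hypothetical shapes), measuring a dense mx.nd.dot:
#   a = mx.nd.ones((64, 1000)); b = mx.nd.ones((1000, 8))
#   avg_sec = measure_cost(10, False, False, mx.nd.dot, a, b)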
| |
| |
| def _get_iter(path, data_shape, batch_size): |
| data_train = mx.io.LibSVMIter(data_libsvm=path, |
| data_shape=data_shape, |
| batch_size=batch_size) |
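    # LibSVMIter parses LibSVM-formatted text, e.g. "0 5:0.3 17:1.0" (a label followed by
    # index:value pairs of the non-zero features), and yields the data as CSR batches.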
| data_iter = iter(data_train) |
| return data_iter |
| |
| |
| def _line_count(path): |
| return int(subprocess.check_output('wc -l {}'.format(path), shell=True).split()[0]) |
| |
| |
| def _compare_sparse_dense(data_dir, file_name, mini_file_name, feature_dim, |
| output_dim, density, batch_size, num_batches=3, num_repeat=5, transpose=False, |
| rsp=False): |
| |
| def create_mini_path(mini_path, path, num_batches): |
| """Samples batches of size: batch_size, total number: num_batches |
| from the dataset files for running benchmarks""" |
| if not os.path.exists(mini_path): |
| last = _line_count(path) - num_batches * batch_size |
| last = last if last >= 1 else 1 |
| start = int(rnd.uniform(1, last)) |
| os.system("sed -n '{},{}p' {} > {}".format( |
| start, start + num_batches * batch_size, repr(path), repr(mini_path))) |
| assert os.path.exists(mini_path) |
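        # With batch_size=64 and num_batches=10, the sed command above expands to something
        # like: sed -n '101,741p' 'kdda.t' > 'kdda.t.mini' (the start line is chosen at random).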
| |
| def run_benchmark(mini_path): |
| """Run benchmarks |
| """ |
| data_shape = (feature_dim, ) |
| train_iter = _get_iter(mini_path, data_shape, batch_size) |
| weight_row_dim = batch_size if transpose else feature_dim |
| weight_shape = (weight_row_dim, output_dim) |
| if not rsp: |
| weight = mx.nd.random.uniform(low=0, high=1, shape=weight_shape) |
| else: |
| weight = rand_ndarray(weight_shape, "row_sparse", density=0.05, distribution="uniform") |
| total_cost = {} |
| average_cost = {} |
| count = 0 |
| total_cost["sparse"] = 0. |
| total_cost["dense"] = 0. |
| for _ in train_iter: |
| csr_data = train_iter.getdata() |
| dns_data = csr_data.tostype('default') |
            cost_sparse = measure_cost(num_repeat, False, False, mx.nd.sparse.dot,
                                       csr_data, weight, transpose_a=transpose)
            cost_dense = measure_cost(num_repeat, False, False, mx.nd.dot,
                                      dns_data, weight, transpose_a=transpose)
| total_cost["sparse"] += cost_sparse |
| total_cost["dense"] += cost_dense |
| count = count + 1 |
| average_cost["sparse"] = total_cost["sparse"] / count |
| average_cost["dense"] = total_cost["dense"] / count |
| return (average_cost["sparse"], average_cost["dense"]) |
| |
| def print_result(average_cost_sparse, average_cost_dense): |
| """Print result of comparison between sparse and dense |
| """ |
| ratio = average_cost_dense / average_cost_sparse |
| fmt = '{:15.4f} {:10d} {:10d} {:10d} {:20.2f} {:15.2f} {:15.2f} {:10} {:10}' |
| print(fmt.format(density * 100, batch_size, output_dim, feature_dim, |
| ratio, average_cost_dense*1000, average_cost_sparse*1000, |
| transpose, rsp)) |
| |
| mini_path = os.path.join(data_dir, mini_file_name) |
| path = os.path.join(data_dir, file_name) |
| create_mini_path(mini_path, path, num_batches) |
| average_cost_sparse, average_cost_dense = run_benchmark(mini_path) |
| print_result(average_cost_sparse, average_cost_dense) |
| |
| |
| def test_dot_real(data_dict): |
| """Dot operator testing with real datasets""" |
| data_dir = os.path.join(os.getcwd(), 'data') |
| |
| path = os.path.join(data_dir, data_dict['data_name']) |
| if not os.path.exists(path): |
| get_bz2_data( |
| data_dir, |
| data_dict['data_name'], |
| data_dict['url'], |
| data_dict['data_origin_name'] |
| ) |
| assert os.path.exists(path) |
| |
| k = data_dict['feature_dim'] |
| m = data_dict['m'] |
| batch_size_list = data_dict['batch_size'] |
| |
| default_output_index = data_dict['default_index']['output_dim'] |
| default_batch_size_index = data_dict['default_index']['batch_size'] |
| density = estimate_density(path, data_dict['feature_dim']) |
| num_batches = data_dict['num_batches'] |
| |
| assert default_batch_size_index < len(batch_size_list) |
| assert default_output_index < len(m) |
| if ARGS.verbose: |
| print(f"Running Benchmarking on {repr(data_dict['data_mini'])} data") |
| print('{:>15} {:>10} {:>10} {:>10} {:>20} {:>15} {:>15} {:>10} {:>10}'.format('density(%)', |
| 'n', |
| 'm', |
| 'k', |
| 't_dense/t_sparse', |
| 't_dense(ms)', |
| 't_sparse(ms)', |
| 'is_transpose', |
| 'rhs_rsp')) |
| |
| for output_dim in m: |
| _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], |
| k, output_dim, density, |
| batch_size_list[default_batch_size_index], num_batches) |
| _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], |
| k, output_dim, density, |
| batch_size_list[default_batch_size_index], num_batches, |
| transpose=True) |
| _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], |
| k, output_dim, density, |
| batch_size_list[default_batch_size_index], num_batches, rsp=True) |
| |
| for batch_size in batch_size_list: |
| _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], |
| k, m[default_output_index], density, batch_size, num_batches) |
| _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], |
| k, m[default_output_index], density, batch_size, num_batches, |
| transpose=True) |
        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
                              k, m[default_output_index], density, batch_size, num_batches,
                              rsp=True)
| |
| |
| def test_dot_synthetic(data_dict): |
| """benchmark sparse mxnet dot and scipy dot operator with matrices of given density. |
| `t_sparse` is the runtime of the invoked sparse dot operator in ms, while `t_dense` is the |
| runtime of dot(dns, dns), with the same matrices except that they are in default storage type. |
| """ |
| # Benchmark MXNet and Scipys dot operator |
| def bench_dot(lhs_shape, rhs_shape, lhs_stype, rhs_stype, |
| lhs_den, rhs_den, trans_lhs, ctx, num_repeat=10, fw="mxnet", distribution="uniform"): |
| set_default_device(ctx) |
| assert fw == "mxnet" or fw == "scipy" |
| # Set funcs |
| dot_func_sparse = mx.nd.sparse.dot if fw == "mxnet" else sp.spmatrix.dot |
| dot_func_dense = mx.nd.dot if fw == "mxnet" else np.dot |
| # Create matrix instances |
| lhs_nd = rand_ndarray(lhs_shape, lhs_stype, density=lhs_den, distribution=distribution) |
| # only uniform distribution supported for rhs |
| if rhs_stype == 'csr': |
| rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution=distribution) |
| else: |
| rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution="uniform") |
| lhs_dns = None |
| rhs_dns = None |
| dense_cost = None |
| sparse_cost = None |
| |
| if fw == "mxnet": |
| lhs_dns = lhs_nd if lhs_stype == 'default' else lhs_nd.tostype('default') |
| rhs_dns = rhs_nd if rhs_stype == 'default' else rhs_nd.tostype('default') |
| # One warm up run, verify correctness |
| out = dot_func_sparse(lhs_nd, rhs_dns, trans_lhs) |
| out_expected = dot_func_dense(lhs_dns, rhs_dns, trans_lhs) |
| assert_almost_equal(out.asnumpy(), out_expected.asnumpy(), rtol=1e-1, atol=1e-1) |
| sparse_cost = measure_cost(num_repeat, False, False, dot_func_sparse, lhs_nd, rhs_nd, trans_lhs) |
| dense_cost = measure_cost(num_repeat, False, False, dot_func_dense, lhs_dns, rhs_dns, trans_lhs) |
| else: |
| lhs_dns = lhs_nd.asnumpy() |
| rhs_dns = rhs_nd.asnumpy() |
| lhs_nd = sp.csr_matrix(lhs_nd.asnumpy()) |
| rhs_nd = rhs_nd.asnumpy() |
            # One warm-up run (no correctness check on the scipy path)
| lhs_nd_copy = sp.spmatrix.transpose(lhs_nd) if trans_lhs else lhs_nd |
| out = dot_func_sparse(lhs_nd_copy, rhs_dns) |
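            # scipy dot has no transpose flag, so measure_cost applies the lhs transpose
            # inside the timed region (once, amortized over `num_repeat` runs).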
| sparse_cost = measure_cost(num_repeat, trans_lhs, False, dot_func_sparse, lhs_nd, rhs_nd) |
| dense_cost = measure_cost(num_repeat, trans_lhs, True, dot_func_dense, lhs_dns, rhs_dns) |
| |
| speedup = dense_cost / sparse_cost |
| # Print results |
| m = lhs_shape[0] |
| k = lhs_shape[1] |
| n = rhs_shape[1] |
| result_pattern = '{:15.1f} {:15.1f} {:>10} {:8d} {:8d} {:8d} {:13.2f} {:13.2f} {:8.2f}' |
| results = result_pattern.format(lhs_den*100, |
| rhs_den*100, |
| str(ctx), |
| m, |
| k, |
| n, |
| sparse_cost*1000, |
| dense_cost*1000, |
| speedup) |
| print(results) |
| |
| def print_benchmark_info(lhs, rhs, lhs_trans, fw): |
| trans_str = "^T" if lhs_trans else "" |
| print("========================================================") |
| print(f" {fw} sparse dot benchmark: dot({lhs}, {rhs}) = {rhs} ") |
| print( |
| f" (matrix multiplication: (m x k){trans_str} * (k x n) = m x n) ") |
| print("========================================================") |
| headline_pattern = '{:>15} {:>15} {:>10} {:>8} {:>8} {:>8} {:>13} {:>13} {:>8}' |
| headline = headline_pattern.format('lhs_density(%)', |
| 'rhs_density(%)', |
| 'context', |
| 'm', 'k', 'n', |
| 't_sparse(ms)', |
| 't_dense(ms)', |
| 'speedup') |
| print(headline) |
| |
| def run_benchmark(ctx=None, lhs="csr", lhs_trans=False, rhs="dns", fw="mxnet", rhs_density=1, |
| distribution="uniform"): |
| |
| if rhs_density > 1 or rhs_density < 0: |
| raise ValueError("rhs_density has to be between 0 and 1") |
| |
| print_benchmark_info(lhs, rhs, lhs_trans, fw) |
| |
| if rhs == "csr": |
| lhs_stype = "default" |
| rhs_stype = "csr" |
| assert (lhs_stype == 'default'), "Only dot(default, csr) supported" |
            # Arrange dimensions according to the use case: for dot(default, csr),
            # the csr rhs has num_rows << num_cols
| feature_dim_list = data_dict['batch_size'] |
| batch_size_list = data_dict['m'] |
| output_dim_list = data_dict['feature_dim'] |
| density_list = data_dict['density'] |
| default_output_index = data_dict['default_index']['feature_dim'] |
| default_density_index = data_dict['default_index']['density'] |
| default_feature_index = data_dict['default_index']['batch_size'] |
| default_batch_size_index = data_dict['default_index']['output_dim'] |
| num_repeat = data_dict['num_repeat'] |
| |
| else: |
| lhs_stype = "csr" |
| rhs_stype = "row_sparse" if rhs == "rsp" else "default" |
| |
| feature_dim_list = data_dict['feature_dim'] |
| output_dim_list = data_dict['m'] |
| batch_size_list = data_dict['batch_size'] |
| density_list = data_dict['density'] |
| |
| default_output_index = data_dict['default_index']['output_dim'] |
| default_batch_size_index = data_dict['default_index']['batch_size'] |
| default_feature_index = data_dict['default_index']['feature_dim'] |
| default_density_index = data_dict['default_index']['density'] |
| num_repeat = data_dict['num_repeat'] |
| |
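        # Each loop below sweeps a single dimension (output_dim, feature_dim, batch_size,
        # density) while the remaining dimensions are pinned at their 'default_index' values.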
| for output_dim in output_dim_list: |
| if lhs_trans: |
| output_row_dim = batch_size_list[default_batch_size_index] |
| else: |
| output_row_dim = feature_dim_list[default_feature_index] |
| bench_dot((batch_size_list[default_batch_size_index], |
| feature_dim_list[default_feature_index]), |
| (output_row_dim, output_dim), |
| lhs_stype, rhs_stype, |
| density_list[default_density_index], rhs_density, |
| lhs_trans, ctx, num_repeat=num_repeat, |
| fw=fw, distribution=distribution) |
| |
| for feature_dim in feature_dim_list: |
| if lhs_trans: |
| output_row_dim = batch_size_list[default_batch_size_index] |
| else: |
| output_row_dim = feature_dim |
| bench_dot((batch_size_list[default_batch_size_index], feature_dim), |
| (output_row_dim, output_dim_list[default_output_index]), |
| lhs_stype, rhs_stype, density_list[default_density_index], rhs_density, |
| lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) |
| |
| for batch_size in batch_size_list: |
| if lhs_trans: |
| output_row_dim = batch_size |
| else: |
| output_row_dim = feature_dim_list[default_feature_index] |
| bench_dot((batch_size, feature_dim_list[default_feature_index]), |
| (output_row_dim, |
| output_dim_list[default_output_index]), |
| lhs_stype, rhs_stype, density_list[default_density_index], |
| rhs_density, lhs_trans, ctx, num_repeat=num_repeat, |
| fw=fw, distribution=distribution) |
| |
| for density in density_list: |
| if lhs_trans: |
| output_row_dim = batch_size_list[default_batch_size_index] |
| else: |
| output_row_dim = feature_dim_list[default_feature_index] |
| bench_dot((batch_size_list[default_batch_size_index], |
| feature_dim_list[default_feature_index]), |
| (output_row_dim, |
| output_dim_list[default_output_index]), |
| lhs_stype, rhs_stype, density, density, lhs_trans, ctx, |
| num_repeat=num_repeat, fw=fw, distribution=distribution) |
| |
| check_call(_LIB.MXSetNumOMPThreads(ctypes.c_int(ARGS.num_omp_threads))) |
| context = mx.gpu() if ARGS.gpu else mx.cpu() |
| # TODO(anirudh): make the data dicts to config which can be passed at runtime |
| distributions = ["uniform", "powerlaw"] |
| for distribution in distributions: |
| run_benchmark(context, lhs="csr", |
| rhs="default", lhs_trans=False, |
| fw="mxnet", rhs_density=1, |
| distribution=distribution) |
| run_benchmark(context, lhs="csr", |
| rhs="default", lhs_trans=True, |
| fw="mxnet", rhs_density=1, |
| distribution=distribution) |
| run_benchmark(context, lhs="csr", |
| rhs="rsp", lhs_trans=False, |
| fw="mxnet", rhs_density=0.05, |
| distribution=distribution) |
| run_benchmark(context, lhs="default", |
| rhs="csr", lhs_trans=False, |
| fw="mxnet", rhs_density=0.001, |
| distribution=distribution) |
| if not ARGS.gpu: |
| run_benchmark(context, lhs="csr", |
| rhs="default", lhs_trans=False, |
| fw="scipy", rhs_density=1, |
| distribution=distribution) |
| run_benchmark(context, lhs="csr", |
| rhs="default", lhs_trans=True, |
| fw="scipy", rhs_density=1, |
| distribution=distribution) |
| |
| |
| if __name__ == "__main__": |
| begin_time = time.time() |
| test_dot_real(KDDA) |
| test_dot_real(AVAZU) |
| test_dot_real(CRITEO) |
| test_dot_synthetic(SYNTHETIC1) |
| test_dot_synthetic(SYNTHETIC2) |
| total_time = time.time() - begin_time |
| print(f"total time is {total_time}") |