# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
package AI::MXNet::KVStore;
use strict;
use warnings;
use AI::MXNet::NS;
use AI::MXNet::Base;
use AI::MXNet::NDArray;
use AI::MXNet::Optimizer;
use MIME::Base64;
use Storable;
use Mouse;
use AI::MXNet::Function::Parameters;
=head1 NAME
AI::MXNet::KVStore - Key value store interface of MXNet.
=head1 DESCRIPTION
Key value store interface of MXNet for parameter synchronization over multiple devices.
=cut
has 'handle' => (is => 'ro', isa => 'KVStoreHandle', required => 1);
has '_updater' => (is => 'rw', isa => 'AI::MXNet::Updater');
sub DEMOLISH
{
check_call(AI::MXNetCAPI::KVStoreFree(shift->handle));
}
=head2 init
Initializes a single key-value pair or a sequence of key-value pairs in the store.
Each key must be initialized before it can be pushed or pulled.
Only worker 0's (rank == 0) data are used.
This function returns after the data have been initialized successfully.
Parameters
----------
$key : Str|ArrayRef[Str]
The keys.
$value : AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]]
The values.
Examples
--------
>>> # init a single key-value pair
>>> $shape = [2,3]
>>> $kv = mx->kv->create('local')
>>> $kv->init(3, mx->nd->ones($shape)*2)
>>> $a = mx->nd->zeros($shape)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 2 2 2]
[ 2 2 2]]
>>> # init a list of key-value pairs
>>> $keys = [5, 7, 9]
>>> $kv->init($keys, [map { mx->nd->ones($shape) } 0..@$keys-1])
=cut
method init(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] $value
)
{
my ($keys, $vals) = _key_value($key, $value);
check_call(
AI::MXNetCAPI::KVStoreInitEx(
$self->handle, scalar(@{ $keys }), $keys, $vals
)
);
}
=head2 push
Push a single or a sequence of key-value pairs into the store.
Data consistency:
1. this function returns after adding an operator to the engine.
2. push is always called after all previous push and pull on the same
key are finished.
3. there is no synchronization between workers. One can use _barrier()
to sync all workers.
Parameters
----------
$key : Str|ArrayRef[Str]
$value : AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]]
:$priority=0 : Int, optional
The priority of the push operation.
Higher priority push operations are likely to be executed before
other push actions.
Examples
--------
>>> # push a single key-value pair
>>> $kv->push(3, mx->nd->ones($shape)*8)
>>> $kv->pull(3, out=>$a) # pull out the value
>>> print $a->aspdl()
[[ 8. 8. 8.]
[ 8. 8. 8.]]
>>> # aggregate the values and then push
>>> $gpus = [map { mx->gpu($_) } 0..3]
>>> $b = [map { mx->nd->ones($shape, ctx => $_) } @$gpus]
>>> $kv->push(3, $b)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> # push a list of keys.
>>> # single device
>>> $kv->push($keys, [map { mx->nd->ones($shape) } 0..@$keys-1])
>>> $b = [map { mx->nd->zeros($shape) } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1]->aspdl
[[ 1. 1. 1.]
[ 1. 1. 1.]]
>>> # multiple devices:
>>> $b = [map { [map { mx->nd->ones($shape, ctx => $_) } @$gpus] } 0..@$keys-1]
>>> $kv->push($keys, $b)
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1][1]->aspdl()
[[ 4. 4. 4.]
[ 4. 4. 4.]]
=cut
method push(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] $value,
Int :$priority=0
)
{
my ($keys, $vals) = _key_value($key, $value);
check_call(
AI::MXNetCAPI::KVStorePushEx(
$self->handle, scalar(@{ $keys }), $keys, $vals, $priority
)
);
}
=head2 pull
Pull a single value or a sequence of values from the store.
Data consistency:
1. this function returns after adding an operator to the engine. But any
further read on out will be blocked until it is finished.
2. pull is always called after all previous push and pull on the same
key are finished.
3. It pulls the newest value from the store.
Parameters
----------
$key : Str|ArrayRef[Str]
Keys
:$out: AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]]
Values corresponding to the keys.
:$priority=0 : Int, optional
The priority of the pull operation.
Higher priority pull operations are likely to be executed before
other pull actions.
Examples
--------
>>> # pull a single key-value pair
>>> $a = mx->nd->zeros($shape)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull into multiple devices
>>> $b = [map { mx->nd->ones($shape, ctx => $_) } @$gpus]
>>> $kv->pull(3, out=>$b)
>>> print $b->[1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull a list of key-value pairs.
>>> # On single device
>>> $keys = [5, 7, 9]
>>> $b = [map { mx->nd->zeros($shape) } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # On multiple devices
>>> $b = [map { [map { mx->nd->ones($shape, ctx => $_) } @$gpus ] } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1][1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
=cut
method pull(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] :$out,
Int :$priority=0,
Bool :$ignore_sparse=1
)
{
my ($keys, $vals) = _key_value($key, $out);
check_call(
AI::MXNetCAPI::KVStorePullWithSparseEx(
$self->handle, scalar(@{ $keys }), $keys, $vals, $priority, $ignore_sparse
)
);
}
=head2 row_sparse_pull
Pulls a single AI::MXNet::NDArray::RowSparse value or an array ref of AI::MXNet::NDArray::RowSparse values
from the store with specified row_ids. When there is only one row_id, KVStoreRowSparsePull
is invoked just once, and the result is broadcast to all of the remaining outputs.
`row_sparse_pull` is executed asynchronously after all previous
`pull`/`row_sparse_pull` calls and the last `push` call for the
same input key(s) are finished.
The returned values are guaranteed to be the latest values in the store.
Parameters
----------
$key : Str|ArrayRef[Str]
Keys.
:$out: AI::MXNet::NDArray::RowSparse|ArrayRef[AI::MXNet::NDArray::RowSparse]|ArrayRef[ArrayRef[AI::MXNet::NDArray::RowSparse]]
Values corresponding to the keys. The stype is expected to be row_sparse
:$priority=0 : Int, optional
The priority of the pull operation.
Higher priority pull operations are likely to be executed before
other pull actions.
:$row_ids : AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]]
The row_ids for which to pull for each value. Each row_id is a 1D NDArray
whose values do not have to be unique or sorted.
Examples
--------
>>> $shape = [3, 3]
>>> $kv->init('3', mx->nd->ones($shape)->tostype('row_sparse'))
>>> $a = mx->nd->sparse->zeros('row_sparse', $shape)
>>> $row_ids = mx->nd->array([0, 2], dtype=>'int64')
>>> $kv->row_sparse_pull('3', out=>$a, row_ids=>$row_ids)
>>> print $a->aspdl
[[ 1. 1. 1.]
[ 0. 0. 0.]
[ 1. 1. 1.]]
>>> $duplicate_row_ids = mx->nd->array([2, 2], dtype=>'int64')
>>> $kv->row_sparse_pull('3', out=>$a, row_ids=>$duplicate_row_ids)
>>> print $a->aspdl
[[ 0. 0. 0.]
[ 0. 0. 0.]
[ 1. 1. 1.]]
>>> $unsorted_row_ids = mx->nd->array([1, 0], dtype=>'int64')
>>> $kv->row_sparse_pull('3', out=>$a, row_ids=>$unsorted_row_ids)
>>> print $a->aspdl
[[ 1. 1. 1.]
[ 1. 1. 1.]
[ 0. 0. 0.]]
=cut
method row_sparse_pull(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray::RowSparse|ArrayRef[AI::MXNet::NDArray::RowSparse]|ArrayRef[ArrayRef[AI::MXNet::NDArray::RowSparse]] :$out,
Int :$priority=0,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] :$row_ids
)
{
if(blessed $row_ids)
{
$row_ids = [$row_ids];
}
my $first_out = $out;
# whether row_ids are the same
my $single_rowid = 0;
if(@$row_ids == 1 and ref $out eq 'ARRAY')
{
$single_rowid = 1;
$first_out = [$out->[0]];
}
my ($ckeys, $cvals) = _key_value($key, $first_out);
my (undef, $crow_ids) = _key_value($key, $row_ids);
assert(
(@$crow_ids == @$cvals),
"the number of row_ids doesn't match the number of values"
);
check_call(
AI::MXNetCAPI::KVStorePullRowSparseEx(
$self->handle, scalar(@$ckeys), $ckeys, $cvals, $crow_ids, $priority
)
);
# the result can be copied to other devices without invoking row_sparse_pull
# if the indices are the same
if($single_rowid)
{
for my $out_i (@{ $out } [1..@{ $out }-1])
{
$out->[0]->copyto($out_i);
}
}
}
=head2 set_gradient_compression
Specifies the type of low-bit quantization used for gradient compression,
and additional arguments depending on the type of compression being used.
2bit Gradient Compression takes a positive float `threshold`.
The technique works by thresholding values such that positive values in the
gradient above threshold will be set to threshold. Negative values whose absolute
values are higher than threshold, will be set to the negative of threshold.
Values whose absolute values are less than threshold will be set to 0.
By doing so, each value in the gradient is in one of three states. Two bits are
used to represent these states, and every 16 float values in the original
gradient can be represented using one float. This compressed representation
can reduce communication costs. The difference between these thresholded values and
original values is stored at the sender's end as residual and added to the
gradient in the next iteration.
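For example (illustrative numbers only): with a threshold of 0.5, a gradient
value of 0.7 is sent as 0.5, -0.9 is sent as -0.5, and 0.2 is sent as 0; the
differences (0.2, -0.4, and 0.2) are kept locally as the residual and added to
the gradient in the next iteration.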
When kvstore is 'local', gradient compression is used to reduce communication
between multiple devices (gpus). Gradient is quantized on each GPU which
computed the gradients, then sent to the GPU which merges the gradients. This
receiving GPU dequantizes the gradients and merges them. Note that this
increases memory usage on each GPU because of the residual array stored.
When kvstore is 'dist', gradient compression is used to reduce communication
from worker to server. The gradient is quantized on each worker which
computed the gradients, then sent to the server which dequantizes
this data and merges the gradients from each worker. Note that this
increases CPU memory usage on each worker because of the residual array stored.
Only worker to server communication is compressed in this setting.
If each machine has multiple GPUs, currently this GPU to GPU or GPU to CPU communication
is not compressed. Server to worker communication (in the case of pull)
is also not compressed.
To use 2bit compression, `type` must be specified as `2bit`.
Specifying only `type` uses the default value for the threshold.
To fully specify the arguments for 2bit compression, pass a hash ref
that also includes `threshold`, for example:
{ type => '2bit', threshold => 0.5 }
Parameters
----------
$compression_params : HashRef[Str]
A hash ref specifying the type and parameters for gradient compression.
The key `type` in this hash ref is a
required string argument and specifies the type of gradient compression.
Currently `type` can only be `2bit`.
Other keys in this hash ref are optional and specific to the type
of gradient compression.
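Examples
--------
A minimal sketch (assuming a 'device' kvstore; gradient compression is only
supported for the 'device' and 'dist' kvstore types):
>>> $kv = mx->kv->create('device')
>>> $kv->set_gradient_compression({ type => '2bit', threshold => 0.5 })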
=cut
method set_gradient_compression(HashRef[Str] $compression_params)
{
if($self->type =~ /(?:device|dist)/)
{
check_call(
AI::MXNetCAPI::KVStoreSetGradientCompression(
$self->handle,
scalar(keys %$compression_params),
$compression_params
)
);
}
else
{
confess('Gradient compression is not supported for this type of kvstore');
}
}
=head2 set_optimizer
Register an optimizer to the store
If there are multiple machines, this process (which should be a worker node)
will serialize the optimizer and send it to all the servers. It returns after
this action is done.
Parameters
----------
$optimizer : AI::MXNet::Optimizer
the optimizer
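Examples
--------
A minimal sketch (the SGD optimizer and its learning rate are illustrative
assumptions):
>>> $kv = mx->kv->create('local')
>>> $kv->set_optimizer(AI::MXNet::Optimizer::SGD->new(learning_rate => 0.1))
>>> # subsequent pushes on this kvstore are now merged through the optimizer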
=cut
method set_optimizer(AI::MXNet::Optimizer $optimizer)
{
my $is_worker = check_call(AI::MXNetCAPI::KVStoreIsWorkerNode());
if($self->type =~ /dist/ and $is_worker)
{
my $optim_str = MIME::Base64::encode_base64(Storable::freeze($optimizer), "");
$self->_send_command_to_servers(0, $optim_str);
}
else
{
$self->_updater(AI::MXNet::Optimizer->get_updater($optimizer));
$self->_set_updater($self->_updater);
}
}
=head2 type
Get the type of this kvstore
Returns
-------
$type : Str
the string type
=cut
method type()
{
return scalar(check_call(AI::MXNetCAPI::KVStoreGetType($self->handle)));
}
=head2 rank
Get the rank of this worker node
Returns
-------
$rank : Int
The rank of this node, which is in [0, num_workers)
=cut
method rank()
{
return scalar(check_call(AI::MXNetCAPI::KVStoreGetRank($self->handle)));
}
=head2 num_workers
Get the number of worker nodes
Returns
-------
$size : Int
The number of worker nodes
=cut
method num_workers()
{
return scalar(check_call(AI::MXNetCAPI::KVStoreGetGroupSize($self->handle)));
}
=head2 save_optimizer_states
Save optimizer (updater) state to file
Parameters
----------
$fname : Str
Path to output states file.
:$dump_optimizer=0 : Bool, default 0
Whether to also save the optimizer itself. This would also save optimizer
information such as learning rate and weight decay schedules.
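Examples
--------
A minimal sketch (assuming a local kvstore with an optimizer already set; the
file name is an arbitrary assumption):
>>> $kv->save_optimizer_states('model.states')
>>> $kv->load_optimizer_states('model.states')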
=cut
method save_optimizer_states(Str $fname, Bool :$dump_optimizer=0)
{
confess("Cannot save states for distributed training")
unless defined $self->_updater;
open(F, ">:raw", "$fname") or confess("can't open $fname for writing: $!");
print F $self->_updater->get_states($dump_optimizer);
close(F);
}
=head2 load_optimizer_states
Load optimizer (updater) state from file.
Parameters
----------
$fname : Str
Path to input states file.
=cut
method load_optimizer_states(Str $fname)
{
confess("Cannot save states for distributed training")
unless defined $self->_updater;
open(F, "<:raw", "$fname") or confess("can't open $fname for reading: $!");
my $data;
{ local($/) = undef; $data = <F>; }
close(F);
$self->_updater->set_states($data);
}
=head2 _set_updater
Set a push updater into the store.
This function only changes the local store. Use set_optimizer for
multi-machine setups.
Parameters
----------
$updater : Updater
the updater function
Examples
--------
>>> my $update = sub { my ($key, $input, $stored) = @_;
... print "update on key: $key\n";
... $stored += $input * 2; };
>>> $kv->_set_updater($update)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl()
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> $kv->push(3, mx->nd->ones($shape))
update on key: 3
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl()
[[ 6. 6. 6.]
[ 6. 6. 6.]]
=cut
method _set_updater(Updater $updater_func)
{
check_call(
AI::MXNetCAPI::KVStoreSetUpdater(
$self->handle,
sub {
my ($index, $input_handle, $storage_handle) = @_;
$updater_func->(
$index,
AI::MXNet::NDArray->_ndarray_cls($input_handle),
AI::MXNet::NDArray->_ndarray_cls($storage_handle)
);
}
)
);
}
=head2 _barrier
Global barrier between all worker nodes.
For example, assume there are n machines, we want to let machine 0 first
init the values, and then pull the inited value to all machines. Before
pulling, we can place a barrier to guarantee that the initialization is
finished.
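A hypothetical sketch of that pattern (assuming a distributed kvstore and a
previously defined $shape):
>>> $kv->init(3, mx->nd->ones($shape)) if $kv->rank == 0
>>> $kv->_barrier()
>>> $a = mx->nd->zeros($shape)
>>> $kv->pull(3, out => $a)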
=cut
method _barrier()
{
check_call(AI::MXNetCAPI::KVStoreBarrier($self->handle));
}
=head2 _send_command_to_servers
Send a command to all server nodes, which will make each server node run
KVStoreServer.controller.
This function returns after the command has been executed in all server
nodes.
Parameters
----------
$head : Int
the head of the command
$body : Str
the body of the command
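For instance, set_optimizer uses this internally to ship a Base64-encoded,
serialized optimizer to the servers with head 0:
>>> $kv->_send_command_to_servers(0, $optim_str)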
=cut
method _send_command_to_servers(Int $head, Str $body)
{
check_call(
AI::MXNetCAPI::KVStoreSendCommmandToServers(
$self->handle,
$head,
$body
)
);
}
=head2 create
Create a new KVStore.
Parameters
----------
$name='local' : Str
The type of KVStore
- local works for multiple devices on a single machine (single process)
- dist works for multiple machines (multiple processes)
Returns
-------
$kv : AI::MXNet::KVStore
The created AI::MXNet::KVStore.
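Examples
--------
A minimal example:
>>> $kv = mx->kv->create('local')
>>> print $kv->type
local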
=cut
method create(Str $name='local')
{
my $handle = check_call(AI::MXNetCAPI::KVStoreCreate($name));
return __PACKAGE__->new(handle => $handle);
}
sub _key_value
{
my ($keys, $vals) = @_;
if(not ref $keys)
{
if(blessed $vals)
{
return ([$keys], [$vals->handle]);
}
else
{
for my $value (@{ $vals })
{
assert(blessed($value) and $value->isa('AI::MXNet::NDArray'));
}
return ([($keys)x@$vals], [map { $_->handle } @$vals]);
}
}
else
{
assert(not blessed($vals) and @$keys == @$vals);
my @c_keys;
my @c_vals;
for(zip($keys, $vals)) {
my ($key, $val) = @$_;
my ($c_key, $c_val) = _key_value($key, $val);
push @c_keys, @$c_key;
push @c_vals, @$c_val;
}
return (\@c_keys, \@c_vals);
}
}
1;