# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Create layers of capsule net""" | |
import mxnet as mx | |
def squash(data, squash_axis, name=''):
    """Squash non-linearity: shrink each vector's norm into [0, 1) while keeping its direction."""
    epsilon = 1e-08
    # ||s||^2 along the capsule-vector axis
    s_squared_norm = mx.sym.sum(data=mx.sym.square(data, name='square_' + name),
                                axis=squash_axis, keepdims=True, name='s_squared_norm_' + name)
    # scale = (||s||^2 / (1 + ||s||^2)) / ||s||; epsilon guards the division at ||s|| = 0
    scale = s_squared_norm / (1 + s_squared_norm) / mx.sym.sqrt(data=(s_squared_norm + epsilon),
                                                                name='s_squared_norm_sqrt_' + name)
    squashed_net = mx.sym.broadcast_mul(scale, data, name='squashed_net_' + name)
    return squashed_net
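

# Illustrative shape check for squash(), a minimal sketch: the symbol name
# 'demo_vec' and the (8, 1152, 8) shape are assumptions for demonstration only.
#
#     vec = mx.sym.Variable('demo_vec')
#     out = squash(vec, squash_axis=2, name='demo')
#     _, out_shapes, _ = out.infer_shape(demo_vec=(8, 1152, 8))
#     assert out_shapes[0] == (8, 1152, 8)  # squash rescales norms, shape is unchanged
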
def primary_caps(data, dim_vector, n_channels, kernel, strides, name=''):
    """Convolve, then split the feature maps into capsules of length dim_vector and squash them."""
    out = mx.sym.Convolution(data=data,
                             num_filter=dim_vector * n_channels,
                             kernel=kernel,
                             stride=strides,
                             name=name)
    # (batch, dim_vector * n_channels, h, w) -> (batch, n_channels * h * w, dim_vector)
    out = mx.sym.Reshape(data=out, shape=(0, -1, dim_vector))
    out = squash(out, squash_axis=2)
    return out
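

# Sketch of how primary_caps is typically wired after the first convolution,
# mirroring the CapsNet paper's MNIST configuration; the name 'conv1' and the
# 256-filter front convolution are assumptions, not fixed by this module.
#
#     data = mx.sym.Variable('data')                       # (batch, 1, 28, 28)
#     conv1 = mx.sym.Convolution(data=data, num_filter=256,
#                                kernel=(9, 9), stride=(1, 1), name='conv1')
#     conv1 = mx.sym.Activation(data=conv1, act_type='relu')
#     caps = primary_caps(conv1, dim_vector=8, n_channels=32,
#                         kernel=(9, 9), strides=(2, 2), name='primarycaps')
#     # caps : (batch, 32 * 6 * 6, 8) = (batch, 1152, 8)
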
class CapsuleLayer:
    """The capsule layer with dynamic routing.

    [batch_size, input_num_capsule, input_dim_vector] => [batch_size, num_capsule, dim_vector]
    """

    def __init__(self, num_capsule, dim_vector, batch_size, kernel_initializer, bias_initializer, num_routing=3):
        self.num_capsule = num_capsule
        self.dim_vector = dim_vector
        self.batch_size = batch_size
        self.num_routing = num_routing
        self.kernel_initializer = kernel_initializer
        self.bias_initializer = bias_initializer

    def __call__(self, data):
        # Infer the incoming capsule layout; the (batch_size, 1, 28, 28) input
        # shape assumes MNIST images.
        _, out_shapes, __ = data.infer_shape(data=(self.batch_size, 1, 28, 28))
        _, input_num_capsule, input_dim_vector = out_shapes[0]
        # Build W and the routing logits:
        #   W    : (1, input_num_capsule, num_capsule, input_dim_vector, dim_vector)
        #   bias : (batch_size, input_num_capsule, num_capsule, 1, 1)
        w = mx.sym.Variable('Weight',
                            shape=(1, input_num_capsule, self.num_capsule, input_dim_vector, self.dim_vector),
                            init=self.kernel_initializer)
        bias = mx.sym.Variable('Bias',
                               shape=(self.batch_size, input_num_capsule, self.num_capsule, 1, 1),
                               init=self.bias_initializer)
        # The routing logits are updated explicitly inside the routing loop,
        # not learned by back-propagation, so block their gradient.
        bias = mx.sym.BlockGrad(bias)
        bias_ = bias
        # input         : (batch_size, input_num_capsule, input_dim_vector)
        # inputs_expand : (batch_size, input_num_capsule, 1, input_dim_vector, 1)
        inputs_expand = mx.sym.Reshape(data=data, shape=(0, 0, -4, -1, 1))
        inputs_expand = mx.sym.Reshape(data=inputs_expand, shape=(0, 0, -4, 1, -1, 0))
        # inputs_tiled  : (batch_size, input_num_capsule, num_capsule, input_dim_vector, 1)
        inputs_tiled = mx.sym.tile(data=inputs_expand, reps=(1, 1, self.num_capsule, 1, 1))
        # w_tiled       : (batch_size, input_num_capsule, num_capsule, input_dim_vector, dim_vector)
        w_tiled = mx.sym.tile(w, reps=(self.batch_size, 1, 1, 1, 1))
        # inputs_hat    : (batch_size, input_num_capsule, num_capsule, 1, dim_vector)
        inputs_hat = mx.sym.linalg_gemm2(w_tiled, inputs_tiled, transpose_a=True)
        inputs_hat = mx.sym.swapaxes(data=inputs_hat, dim1=3, dim2=4)
        # Stop-gradient copy of the predictions for the intermediate routing iterations.
        inputs_hat_stopped = mx.sym.BlockGrad(inputs_hat)
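        # Routing-by-agreement (Sabour et al., 2017): softmax over the routing
        # logits yields coupling coefficients c; each output capsule is the
        # squashed, c-weighted sum of the predictions; the logits are then
        # updated from the coupling-weighted predictions (this example's
        # variant of the paper's agreement term). Every iteration except the
        # last routes inputs_hat_stopped, so only the final pass back-propagates
        # through W.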
        for i in range(self.num_routing):
            c = mx.sym.softmax(bias_, axis=2, name='c' + str(i))
            if i == self.num_routing - 1:
                # Final iteration: route the real predictions so gradients reach W.
                outputs = squash(
                    mx.sym.sum(mx.sym.broadcast_mul(c, inputs_hat, name='broadcast_mul_' + str(i)),
                               axis=1, keepdims=True,
                               name='sum_' + str(i)), name='output_' + str(i), squash_axis=4)
            else:
                outputs = squash(
                    mx.sym.sum(mx.sym.broadcast_mul(c, inputs_hat_stopped, name='broadcast_mul_' + str(i)),
                               axis=1, keepdims=True,
                               name='sum_' + str(i)), name='output_' + str(i), squash_axis=4)
                bias_ = bias_ + mx.sym.sum(mx.sym.broadcast_mul(c, inputs_hat_stopped,
                                                                name='bias_broadcast_mul' + str(i)),
                                           axis=4,
                                           keepdims=True, name='bias_' + str(i))
        # (batch_size, 1, num_capsule, 1, dim_vector) -> (batch_size, num_capsule, dim_vector)
        outputs = mx.sym.Reshape(data=outputs, shape=(-1, self.num_capsule, self.dim_vector))
        return outputs
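

# A minimal end-to-end shape check, provided as a sketch rather than part of
# the original example: it chains primary_caps and CapsuleLayer on an
# MNIST-sized input and prints the final (batch, num_capsule, dim_vector)
# shape. The hyper-parameters mirror the CapsNet paper; the initializers and
# batch size are assumptions.
if __name__ == '__main__':
    batch_size = 8
    data = mx.sym.Variable('data')                           # (batch, 1, 28, 28)
    conv1 = mx.sym.Convolution(data=data, num_filter=256,
                               kernel=(9, 9), stride=(1, 1), name='conv1')
    conv1 = mx.sym.Activation(data=conv1, act_type='relu', name='conv1_act')
    primary = primary_caps(conv1, dim_vector=8, n_channels=32,
                           kernel=(9, 9), strides=(2, 2), name='primarycaps')
    digit_caps = CapsuleLayer(num_capsule=10, dim_vector=16,
                              batch_size=batch_size,
                              kernel_initializer=mx.init.Xavier(),
                              bias_initializer=mx.init.Zero())(primary)
    _, out_shapes, _ = digit_caps.infer_shape(data=(batch_size, 1, 28, 28))
    print(out_shapes[0])  # expected: (8, 10, 16)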