
Implementing Capsule Networks by Hand

Preface

In 2017, Hinton's team proposed capsule networks, extending scalar-valued networks to vector-valued ones for the first time and using dynamic routing to pass information between capsules. The proposed vector neurons are believed to preserve object pose, bringing equivariance to neural networks. In the spirit of learning by doing, I set out to reproduce the paper. This post does not spend much time explaining the original paper's theory and ideas; instead, while keeping the implementation complete and engineering-sound, it records my notes and reflections from the process. Anyway, the implementation may well contain a few bugs; feedback and issues are welcome!

Author: QiangZiBro.
GitHub: https://github.com/QiangZiBro.

The notebook for this post is at https://github.com/QiangZiBro/capsule_networks_play_ground/blob/main/capsule_networks/notebooks/caps_v2017.ipynb.

Implementing the basic modules

Importing the required libraries

This post depends on the third-party framework PyTorch; the experiments use version 1.2, but essentially any version should work.

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import matplotlib.pyplot as plt
import numpy as np
from torchvision import transforms
from torchvision.utils import save_image

Hyperparameter definitions

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
num_epochs = 5
batch_size = 64
learning_rate = 1e-3

Loading the data

# MNIST dataset
root = "/home/qiangzibro/2TB1/"
train_dataset = torchvision.datasets.MNIST(root=root,
                                           train=True,
                                           transform=transforms.ToTensor(),
                                           download=True)

test_dataset = torchvision.datasets.MNIST(root=root,
                                          train=False,
                                          transform=transforms.ToTensor())

# Data loader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

Visualizing MNIST images

def show_img(data_tuple):
    img, label = data_tuple
    img = img.squeeze()
    plt.imshow(img)
    print(f"label is {label}")

show_img(test_dataset[1])

[Figure: the displayed MNIST test digit]

The squash function and the dynamic routing algorithm

The capsule network consists of three layers: a convolutional layer, a primary capsule layer, and a class capsule layer; the class capsule layer uses the dynamic routing algorithm. Let us implement these pieces one by one, starting with two helper functions: the squash function and dynamic routing.
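For reference, the squash nonlinearity from the paper, which the helper below implements (it shrinks long vectors toward unit length and short vectors toward zero):

$$
\mathbf{v}_{j}=\frac{\left\|\mathbf{s}_{j}\right\|^{2}}{1+\left\|\mathbf{s}_{j}\right\|^{2}} \frac{\mathbf{s}_{j}}{\left\|\mathbf{s}_{j}\right\|}
$$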

def squash(x):
    """
    Args:
        x: (B, 10, 16)
    Return:
        squashed x (B, 10, 16)
    """
    L = torch.norm(x, dim=2, keepdim=True)  # (B, 10, 1)
    L_square = L**2                         # (B, 10, 1)
    c = L_square / (1 + L_square) / L       # (B, 10, 1)

    s = c * x                               # (B, 10, 16)
    # `s == np.nan` is always False, so mask NaNs (from zero-length vectors) explicitly
    s = torch.where(torch.isnan(s), torch.zeros_like(s), s)
    return s

x = torch.rand(1, 10, 16)
squash(x).shape

Output

torch.Size([1, 10, 16])

Dynamic routing is essentially a clustering procedure: it iteratively routes many lower-level vectors into a few representative higher-level capsules.

  • Input: (B, 10, 32x6x6, 16, 1)
  • Output: (B, 10, 16)

Details to watch:

  • What is the shape of b? Note that b carries a batch dimension (see the routing equations below).
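For reference, one iteration of the paper's routing procedure, which the loop below implements (i indexes the 32x6x6 input capsules, j the 10 output capsules):

$$
c_{i j}=\frac{\exp \left(b_{i j}\right)}{\sum_{k} \exp \left(b_{i k}\right)}, \quad \mathbf{s}_{j}=\sum_{i} c_{i j} \hat{\mathbf{u}}_{j | i}, \quad \mathbf{v}_{j}=\operatorname{squash}\left(\mathbf{s}_{j}\right), \quad b_{i j} \leftarrow b_{i j}+\hat{\mathbf{u}}_{j | i} \cdot \mathbf{v}_{j}
$$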
def dynamic_routing(x, iterations=3):
    """
    Args:
        x: u_hat, (B, 10, 32x6x6, 16, 1)

    Return:
        v: next layer output (B, 10, 16)
    """
    N = 32*6*6  # previous layer
    N1 = 10     # next layer
    B = x.shape[0]

    b = torch.zeros(B, N1, N, 1, 1).to(x.device)
    for _ in range(iterations):
        # probability of each vector to be distributed is 1
        # (B,10,32*6*6,1,1)
        c = F.softmax(b, dim=1)

        # (B,10,16)
        s = torch.sum(x.matmul(c), dim=2).squeeze(-1)

        # (B,10,16)
        v = squash(s)

        # (B,10,32*6*6,1,1)
        b = b + v[:, :, None, None, :].matmul(x)

    return v

x = torch.rand(1,10,32*6*6,16, 1)
dynamic_routing(x).shape
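Output

torch.Size([1, 10, 16])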

Primary capsule layer

When implementing the primary capsule layer there are a few details to understand, such as:

  • How is a capsule represented? As a 1x8 vector at each spatial location.
  • How are the primary capsules computed? With 32 groups of convolutions, each outputting 8 channels (256 kernels whose output channels are split into 32 groups works just as well; see the sketch after this list).
  • How are the capsules stored in code? My approach is to put all the capsules in one column, in other words, in a (B, 32*6*6, 8) matrix.
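A minimal sketch of that single-convolution variant (my own illustration, not the implementation used below):

x = torch.rand(1, 256, 20, 20)
conv = nn.Conv2d(256, 32 * 8, 9, stride=2)            # all 256 kernels in one conv
out = conv(x)                                         # (1, 256, 6, 6)
s = out.reshape(1, 32, 8, 6 * 6)                      # split channels into 32 groups of 8
s = s.permute(0, 1, 3, 2).reshape(1, 32 * 6 * 6, 8)   # (1, 1152, 8)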
class PrimaryCapsuleLayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.primary_capsule_layer = \
            nn.ModuleList([nn.Conv2d(256, 8, 9, stride=2) for _ in range(32)])

    def forward(self, x):
        """ Produce primary capsules

        Args:
            x: features with (B, 256, 20, 20)

        Return:
            vectors (B, 32*6*6, 8)
        """
        capsules = [conv(x) for conv in self.primary_capsule_layer]   # [[B, 8, 6, 6] * 32]
        capsules_reshaped = [c.reshape(-1, 8, 6*6) for c in capsules] # [[B, 8, 36] * 32]
        s = torch.cat(capsules_reshaped, dim=-1).permute(0, 2, 1)     # (B, 32*6*6, 8)
        return squash(s)


# unit test
def test_for_primary_capsule_layer():
    input = torch.rand(1, 256, 20, 20)
    layer = PrimaryCapsuleLayer()
    assert layer(input).shape == (1, 32*6*6, 8)

test_for_primary_capsule_layer()

Class capsule layer

When implementing the class capsule layer, the details to understand include:

  • How is the high-dimensional matrix multiplication carried out? For example, the (B, 32x6x6, 8) matrix of vectors is mapped through a weight matrix to a (B, 10, 32x6x6, 16) output. The shapes below realize o = Wx:

    • W (1, 10, 32x6x6, 16, 8)
    • x (B, 1, 32x6x6, 8, 1)
    • o (B, 10, 32x6x6, 16, 1)

    This product looks a little complicated at first; how should we think about it? A multi-dimensional matmul performs an ordinary matrix multiplication over the last two dimensions: multiplying (a,b) by (b,c) gives (a,c), while every other dimension is either broadcast or left unchanged. So in the shapes above, the first two dimensions are broadcast, the third stays fixed, and the last two carry out the (16,8) by (8,1) product that performs the transformation. Hence the following example:

    B = 1
    x = torch.rand(B,32*6*6,8)
    x = x[:,None,...,None]

    w = torch.rand(1,10,32*6*6,16,8)
    w.matmul(x).shape
    # torch.Size([1, 10, 1152, 16, 1])
class CapsLayer(nn.Module):
    def __init__(self, nclasses=10, out_channels_dim=16):
        super().__init__()
        self.W = nn.Parameter(1e-3 * torch.randn(1, nclasses, 32*6*6, out_channels_dim, 8))

    def forward(self, x):
        """Predict and routing

        Args:
            x: Input vectors, (B, 32*6*6, 8)

        Return:
            class capsules, (B, 10, 16)
        """
        x = x[:, None, ..., None]
        u_hat = self.W.matmul(x)  # (B, 10, 32x6x6, 16, 1)
        assert u_hat.shape[1:] == (10, 32*6*6, 16, 1)
        class_capsules = dynamic_routing(u_hat)
        return class_capsules


def test_for_caps_layer():
    input = torch.rand(1, 32*6*6, 8)
    layer = CapsLayer()
    assert layer(input).shape == (1, 10, 16)

test_for_caps_layer()

The capsule network

With the necessary layers implemented above, assembling the capsule network is straightforward. Our network outputs 10 class capsules.

class CapsNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_layer = nn.Conv2d(1, 256, 9)
        self.primary_layer = PrimaryCapsuleLayer()
        self.caps_layer = CapsLayer(nclasses=10, out_channels_dim=16)

    def forward(self, x):
        """
        Args:
            x : Input img, (B, 1, 28, 28)

        Return:
            the class capsules, each capsule is a 16 dimension vector
        """
        x = self.conv_layer(x)     # (B, 256, 20, 20)
        x = self.primary_layer(x)  # (B, 32*6*6, 8)
        x = self.caps_layer(x)     # (B, 10, 16)
        return x

def test_for_caps_net():
    input = torch.rand(1, 1, 28, 28)
    model = CapsNet()
    assert model(input).shape == (1, 10, 16)

test_for_caps_net()

Classification experiment with the capsule network

Margin loss

Now we come to the first experiment: training a classification network. First, the loss proposed in the original paper:

$$
L_{k}=T_{k} \max \left(0, m^{+}-\left\|\mathbf{v}_{k}\right\|\right)^{2}+\lambda\left(1-T_{k}\right) \max \left(0,\left\|\mathbf{v}_{k}\right\|-m^{-}\right)^{2}
$$

Here $\left\|\mathbf{v}_{k}\right\|$ is the length of the $k$-th class capsule output by the network. The expression can be read as a piecewise function:

$$
L_{k}=
\begin{cases}
\max \left(0, m^{+}-\left\|\mathbf{v}_{k}\right\|\right)^{2} & \text{capsule } k \text{ is the correct class} \\
\lambda \max \left(0,\left\|\mathbf{v}_{k}\right\|-m^{-}\right)^{2} & \text{capsule } k \text{ is a wrong class}
\end{cases}
$$
where $m^{+}=0.9, m^{-}=0.1, \lambda=0.5$. In other words, a class capsule whose probability (i.e. its norm) is above 0.9 for the correct class, or below 0.1 for a wrong class, is considered "good", so during training gradient descent pushes the network in that direction.

Some details:

  • Use the one-hot vector T to select the correct-class term from the capsule norms, and (1-T) to select the wrong-class terms.
def margin_loss(y, y_hat):
    """
    Args:
        y: ground truth labels (B)
        y_hat: class capsules with (B, 10, 16)

    Return
        the margin loss
    """
    _lambda = 0.5
    m_plus = 0.9
    m_minus = 0.1
    nclasses = 10

    y_norm = y_hat.norm(dim=-1)  # (B,10)
    T = F.one_hot(y, nclasses)   # use it as index for right class (B,10)
    T = T.float()

    # T / (1-T) select the correct / wrong classes *outside* max(.)**2,
    # so wrong classes contribute nothing to the "right" term
    right = T * torch.max(torch.zeros_like(y_norm), m_plus - y_norm)**2
    wrong = _lambda * (1 - T) * torch.max(torch.zeros_like(y_norm), y_norm - m_minus)**2
    return torch.sum(right + wrong)

def test_margin_loss():
    y = torch.randint(0, 10, (20,))
    y_hat = torch.rand(20, 10, 16)
    print(margin_loss(y, y_hat).item())

test_margin_loss()
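A quick sanity check (my own addition): a prediction whose correct-class norm is above $m^{+}$ and whose wrong-class norms are at most $m^{-}$ should incur zero loss.

y = torch.tensor([3])
y_hat = torch.full((1, 10, 16), 0.025)  # each capsule norm = 0.025*sqrt(16) = 0.1
y_hat[0, 3] = 0.2375                    # correct-class norm = 0.95 > 0.9
print(margin_loss(y, y_hat).item())     # 0.0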

The training function

import time

def train(net, epochs, dataloader, reconstruction=False, report=30):
    """
    global variable:
        - train_loader
    """
    net.train()

    optimizer = torch.optim.Adam(net.parameters())

    train_history = []
    for epoch in range(epochs):
        t0 = time.time()

        # accumulate a plain float so we neither keep the autograd graph
        # alive nor mix CPU and GPU tensors
        epoch_loss = 0.0
        for batch, (X_batch, y_batch) in enumerate(dataloader):
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            if reconstruction:
                y_hat_param = net(X_batch, y_batch)
                loss = total_loss(X_batch, y_batch, y_hat_param)
            else:
                y_hat = net(X_batch)
                loss = margin_loss(y_batch, y_hat)
            epoch_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        train_history.append(epoch_loss)
        print(f"Epoch {epoch+1} loss is {epoch_loss}")
    return train_history

Running the experiment

Training

# Start training
torch.autograd.set_detect_anomaly(True)

encoder = CapsNet().to(device)
train(encoder, 5, train_loader, report=460)

Testing

# Test the model
def evaluate(model, test_loader):
    model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)

    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            outputs = outputs.norm(dim=-1)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

acc = evaluate(encoder, test_loader)
print('Test Accuracy of the model on the 10000 test images: {:.2f}%'.format(100 * acc))
# Test Accuracy of the model on the 10000 test images: 98.62%

Summary

A pleasant surprise: after five epochs of training the accuracy reaches 98.62%. Each epoch, however, takes noticeably longer than reference [1], and the accuracy is a bit lower than theirs. This is probably down to some hyperparameters and computational details, which I will look into when time allows.

Reconstruction experiment

Implementing the decoder

Beyond plain classification, the original authors decode the capsule of the correct prediction back into image space. Existing implementations all do this with an MLP: a correctly-predicted 1x16 vector is passed through a (16 --> 28*28) decoder, possibly with a few more hidden layers in between. The paper uses three fully-connected layers; [1] and [2] both set the hidden sizes to 512 and 1024.
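For reference, the paper's decoder interleaves ReLU activations and ends with a sigmoid; a minimal sketch of that variant (the MLPDecoder below omits the nonlinearities):

paper_style_decoder = nn.Sequential(
    nn.Linear(16, 512), nn.ReLU(inplace=True),
    nn.Linear(512, 1024), nn.ReLU(inplace=True),
    nn.Linear(1024, 28 * 28), nn.Sigmoid(),  # pixel intensities in [0, 1]
)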

A programming detail here: to reuse a trained encoder and train only the decoder, freeze the encoder's parameters and give the optimizer only the parameters that still require gradients:

def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

set_parameter_requires_grad(encoder, True)
# for the optimizer, keep only the trainable parameters
# (`net` stands for whatever model is being trained)
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, net.parameters()),
    lr=0.1
)
class MLPDecoder(nn.Module):
    """Decode the input predicted vectors to the original images

    Usage:
        decoder = MLPDecoder([512, 1024], 16, (28,28))
        reconstructed_x = decoder(selected_capsules)
    """
    def __init__(self, hidden, in_channels, out_shape):
        super().__init__()
        self.out_shape = out_shape
        h, w = out_shape
        out_channels = w * h
        # note: plain linear layers, no nonlinearities in between
        self.mlp = nn.Sequential(*[
            nn.Linear(_in, _out)
            for _in, _out in zip([in_channels] + hidden, hidden + [out_channels])
        ])

    def forward(self, x):
        """
        Args:
            x: (B,16)

        Return:
            reconstructed images with (B,1,28,28)
        """
        B = x.shape[0]
        x = self.mlp(x)
        x = x.reshape(B, 1, *self.out_shape)
        return x

def test_decoder():
    decoder = MLPDecoder([512, 1024], 16, (28,28))
    x = torch.rand(5, 16)
    assert decoder(x).shape == (5, 1, 28, 28)

test_decoder()

Implementing the autoencoder

For the autoencoder, we keep the design flexible and pass the encoder and decoder in through the constructor.

class CapsAE(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x, y):
        """
        Args:
            x: (B, C, H, W) (B,1,28,28)
            y: (B)

        Return:
            class capsules (B,10,16) and reconstructed images (B,1,28,28)
        """
        B = x.shape[0]

        class_capsules = self.encoder(x)  # (B, 10, 16)
        selected_capsules = class_capsules[torch.arange(B), y]  # (B, 16)
        assert selected_capsules.shape == (B, 16)

        reconstructed_x = self.decoder(selected_capsules)

        return class_capsules, reconstructed_x
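A quick shape test in the same style as the earlier test cells (my own addition):

def test_for_caps_ae():
    model = CapsAE(encoder=CapsNet(), decoder=MLPDecoder([512, 1024], 16, (28, 28)))
    caps, rec = model(torch.rand(2, 1, 28, 28), torch.tensor([3, 7]))
    assert caps.shape == (2, 10, 16)
    assert rec.shape == (2, 1, 28, 28)

test_for_caps_ae()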

Loss

The total loss adds a reconstruction term to the margin loss:
$$
L_{total} = L_{margin} + 0.0005 \times L_{reconstruction}
$$
where
$$
L_{reconstruction} = ||x - \hat{x} ||^2
$$

def total_loss(x, y, y_hat_params, c=0.0005):
    """ margin loss + 0.0005 * reconstruction loss

    Args:
        x: (B,C,H,W)
        y: (B,)
        y_hat_params: a tuple of (class_capsules, reconstructed_x)
    """
    class_capsules, reconstructed_x = y_hat_params

    return margin_loss(y, class_capsules) + c*F.mse_loss(x, reconstructed_x)
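One detail worth flagging: F.mse_loss averages over all pixels, whereas the paper scales a sum of squared errors by 0.0005, so the reconstruction term above is roughly 784 times weaker than the paper's. A sketch of a variant closer to the paper (my own addition, not what produced the results below):

def total_loss_paper_style(x, y, y_hat_params, c=0.0005):
    class_capsules, reconstructed_x = y_hat_params
    # sum of squared errors, as in the paper, rather than the mean
    reconstruction = ((x - reconstructed_x) ** 2).sum()
    return margin_loss(y, class_capsules) + c * reconstruction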

Running the experiment

Retraining the autoencoder

encoder = CapsNet()
decoder = MLPDecoder([512, 1024], 16, (28,28))
autoencoder = CapsAE(
    encoder=encoder,
    decoder=decoder
).to(device)
train_loss = train(autoencoder, 5, train_loader, reconstruction=True, report=460)

We evaluate the autoencoder from two angles:

  • the encoder's classification ability
  • the decoder's reconstruction quality

The encoder's classification ability

acc = evaluate(encoder, test_loader)
print('Test accuracy of the model on the 10000 test images of encoder in AE: {:.2f}%'.format(100 * acc))
Test Accuracy of the model on the 10000 test images of encoder in AE: 98.80%

Reconstruction quality

def evaluate_ae(model, test_loader, once=False):
    """ get all reconstruction results
    Args:
        model: autoencoder
        test_loader

    Return:
        origin images and reconstructed images (N,1,28,28)
    """
    model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)

    with torch.no_grad():
        X_ = []  # reconstructed images
        X = []   # origin images
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            _, x_ = model(images, labels)
            X_.append(x_)
            X.append(images)

            if once:
                break

    return torch.cat(X, dim=0), torch.cat(X_, dim=0)

Computing the reconstructions

X, X_ = evaluate_ae(autoencoder, test_loader, once=True)
X, X_ = X.cpu(), X_.cpu()

Visualization

fg, axs = plt.subplots(nrows=2, ncols=10, gridspec_kw={'hspace': 0, 'wspace': 0.1}, figsize=(13,5))
fg.suptitle('Groundtruths - Reconstructions')

for i in range(10):
    axs[0, i].imshow(X[i].squeeze(), cmap='binary')
    axs[1, i].imshow(X_[i].squeeze(), cmap='binary')
    axs[0, i].axis('off')
    axs[1, i].axis('off')
plt.show()

[Figure: ground-truth digits (top row) and their reconstructions (bottom row)]

Summary

  • Reconstruction quality: not great. Compared with [1] the results are somewhat blurry, but the basic outlines are recovered.

Probing the role of each capsule dimension

def evaluate_class_capsule(model, x, y, delta=1, dim=0, l=5):
    """ Shift one dimension of the class capsule from -l*delta to +l*delta
    and see what happens to the reconstruction.

    Args:
        model: autoencoder
        x: input image (B,1,28,28)
        y: labels (B,)
        dim: which dimension to probe

    Return
        [origin image, reconstructed_xs] [(B,1,28,28), ... ,(B,1,28,28)]
    """
    model.eval()
    B = x.shape[0]
    encoder, decoder = model.encoder, model.decoder

    with torch.no_grad():
        class_capsules = encoder(x)  # (B, 10, 16)
        selected_capsules = class_capsules[torch.arange(B), y]  # (B, 16)
        assert selected_capsules.shape == (B, 16)

        # one-hot mask selecting the dimension being probed
        index = F.one_hot(torch.ones(1, dtype=torch.long)*dim, num_classes=16)
        index = index.float().to(device)

        shifted_capsules = [selected_capsules + i*delta*index for i in range(-l, l+1)]
        reconstructed_xs = [decoder(i) for i in shifted_capsules]

    reconstructed_xs.insert(0, x)
    return reconstructed_xs
def research_for_class_capsule_for(i=0, delta=0.5):
    """ Probe each class-capsule dimension for the i-th test image
    """
    result = []
    for X, y in test_loader:
        X, y = X[i][None, ...].to(device), y[i][None, ...].to(device)
        for dim in range(16):
            result.append(
                evaluate_class_capsule(autoencoder, X, y, delta=delta, dim=dim)
            )
        break

    fg, axs = plt.subplots(nrows=16, ncols=len(result[0]), gridspec_kw={'hspace': 0, 'wspace': 0.1}, figsize=(13,13))
    fg.suptitle(f'research for each dim in capsule, delta={delta}')

    for i in range(16):
        for j in range(len(result[0])):
            axs[i, j].imshow(result[i][j].squeeze().cpu(), cmap='binary')
            axs[i, j].axis('off')
    plt.show()

Selected results

for i in range(10):
    research_for_class_capsule_for(i, 0.05)
for i in range(10):
    research_for_class_capsule_for(i, 0.1)

[Figures: reconstruction grids produced while shifting one capsule dimension at a time]

Summary

Varying one dimension at a time and observing the reconstructions, I do not see each capsule dimension clearly controlling one attribute as in the original paper; in fact the changes are barely noticeable. There may still be problems in the implementation; corrections from anyone who has studied this are welcome.

Conclusions and outlook

Summary

After two days of writing, the details of capsule networks are much clearer in my mind, and I have become more skeptical about whether capsules actually work.

The 2017 capsule network opened a new research direction, and capsule networks were soon applied in many areas, with the 2017 version spawning the most follow-up work. Extending scalar networks to vector networks is intuitively a promising idea, but the experimental results are not that strong. Hinton himself said in a recent AAAI talk that the earlier capsule networks were "all wrong, forget about them", which has made vector-neuron research contentious: how much vector neurons can actually contribute, and how much better they are than scalar networks, remains an open question.

The three preliminary experiments still have issues:

  • For classification, five epochs reach 98.62%, but each epoch is about 6x slower than reference [1] and the accuracy is lower; this is probably down to some hyperparameters and computational details worth investigating when time allows.
  • For reconstruction, the MLP decoder does not perform well; perhaps the weighting of the reconstruction term in the loss is to blame (see the note after total_loss above)?
  • In the third experiment, varying one capsule dimension at a time did not reproduce the paper's result of each dimension clearly controlling one attribute; the changes were barely noticeable. There may still be problems in the implementation; corrections are welcome.

Outlook

In concrete designs, how exactly to define a vector neuron and how to reason about the relations between vector neurons remain hard problems. Next I plan to keep investigating the role of vector neurons, reproduce the 2018 and 2019 follow-ups up to the stacked capsule autoencoders (planting a flag here), and make the implementations more engineering-solid (write a small library).

References

[1] https://github.com/gchochla/capsules-utils
[2] https://github.com/gram-ai/capsule-networks
