# Copyright 2022-present, Lorenzo Bonicelli, Pietro Buzzega, Matteo Boschini, Angelo Porrello, Simone Calderara.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import torch
from torch.autograd import Variable
from models.utils.continual_model import ContinualModel
from utils import binary_to_boolean_type
from utils.args import add_rehearsal_args, ArgumentParser
from utils.buffer import Buffer
[docs]
class CustomLinear(torch.nn.Module):
def __init__(self, indim, outdim, weight=None):
super(CustomLinear, self).__init__()
self.L = torch.nn.Linear(indim, outdim, bias=False)
if weight is not None:
self.L.weight.data = Variable(weight)
self.scale_factor = 10
[docs]
def forward(self, x: torch.Tensor):
x_norm = torch.norm(x, p=2, dim=1).unsqueeze(1).expand_as(x)
x_normalized = x.div(x_norm + 0.00001)
L_norm = torch.norm(self.L.weight, p=2, dim=1).unsqueeze(1).expand_as(self.L.weight.data)
cos_dist = torch.mm(x_normalized, self.L.weight.div(L_norm + 0.00001).transpose(0, 1))
scores = self.scale_factor * (cos_dist)
return scores
[docs]
class ErACE(ContinualModel):
"""Continual learning via Experience Replay with asymmetric cross-entropy."""
NAME = 'er_ace'
COMPATIBILITY = ['class-il', 'task-il']
[docs]
@staticmethod
def get_parser(parser) -> ArgumentParser:
add_rehearsal_args(parser)
parser.add_argument('--task_free', type=binary_to_boolean_type, default=False, help='Enable task-free training (replay starts from second task)?.')
parser.add_argument('--use_custom_classifier', type=binary_to_boolean_type, default=True, help='Use the custom classifier used in the original work.')
return parser
def __init__(self, backbone, loss, args, transform, dataset=None):
if args.use_custom_classifier:
assert hasattr(backbone, 'classifier'), 'The backbone must have a classifier layer.'
backbone.classifier = CustomLinear(backbone.classifier.in_features, backbone.classifier.out_features)
super().__init__(backbone, loss, args, transform, dataset=dataset)
self.buffer = Buffer(self.args.buffer_size)
self.seen_so_far = torch.tensor([]).long().to(self.device)
[docs]
def observe(self, inputs, labels, not_aug_inputs, epoch=None):
present = labels.unique()
self.seen_so_far = torch.cat([self.seen_so_far, present]).unique()
logits = self.net(inputs)
mask = torch.zeros_like(logits)
mask[:, present] = 1
self.opt.zero_grad()
# if self.seen_so_far.max() < (self.num_classes - 1):
mask[:, self.seen_so_far.max():] = 1
if self.current_task > 0 or self.args.task_free:
logits = logits.masked_fill(mask == 0, -1e9) # torch.finfo(logits.dtype).min)
loss = self.loss(logits, labels)
loss_re = torch.tensor(0.)
if len(self.buffer) > 0:
if self.args.task_free or self.current_task > 0:
# sample from buffer
buf_inputs, buf_labels = self.buffer.get_data(
self.args.minibatch_size, transform=self.transform, device=self.device)
loss_re = self.loss(self.net(buf_inputs), buf_labels)
loss += loss_re
loss.backward()
self.opt.step()
self.buffer.add_data(examples=not_aug_inputs,
labels=labels)
return loss.item()