# Copyright 2022-present, Lorenzo Bonicelli, Pietro Buzzega, Matteo Boschini, Angelo Porrello, Simone Calderara.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
from copy import deepcopy
from typing import List, Tuple
import numpy as np
import torch
import torch.nn as nn
from datasets.utils.continual_dataset import ContinualDataset
from models.utils.continual_model import ContinualModel
from utils.augmentations import apply_transform
from utils.conf import create_seeded_dataloader, get_device
def icarl_replay(self: ContinualModel, dataset, val_set_split=0):
"""
Merge the replay buffer with the current task data.
Optionally split the replay buffer into a validation set.
Args:
self: the model instance
dataset: the dataset
val_set_split: the fraction of the replay buffer to be used as validation set
"""
if self.current_task > 0:
buff_val_mask = torch.rand(len(self.buffer)) < val_set_split
val_train_mask = torch.zeros(len(dataset.train_loader.dataset.data)).bool()
val_train_mask[torch.randperm(len(dataset.train_loader.dataset.data))[:buff_val_mask.sum()]] = True
if val_set_split > 0:
self.val_dataset = deepcopy(dataset.train_loader.dataset)
data_concatenate = torch.cat if isinstance(dataset.train_loader.dataset.data, torch.Tensor) else np.concatenate
        need_aug = hasattr(dataset.train_loader.dataset, 'not_aug_transform')
        if not need_aug:
            def refold_transform(x): return x.cpu()
        else:
            # map buffer tensors (float CHW in [0, 1]) back to the dataset's storage format
            data_shape = len(dataset.train_loader.dataset.data[0].shape)
            if data_shape == 3:
                # RGB: HWC uint8 numpy array
                def refold_transform(x): return (x.cpu() * 255).permute([0, 2, 3, 1]).numpy().astype(np.uint8)
            elif data_shape == 2:
                # grayscale: HW uint8 tensor
                def refold_transform(x): return (x.cpu() * 255).squeeze(1).type(torch.uint8)
# REDUCE AND MERGE TRAINING SET
dataset.train_loader.dataset.targets = np.concatenate([
dataset.train_loader.dataset.targets[~val_train_mask],
self.buffer.labels.cpu().numpy()[:len(self.buffer)][~buff_val_mask]
])
dataset.train_loader.dataset.data = data_concatenate([
dataset.train_loader.dataset.data[~val_train_mask],
refold_transform((self.buffer.examples)[:len(self.buffer)][~buff_val_mask])
])
if val_set_split > 0:
# REDUCE AND MERGE VALIDATION SET
self.val_dataset.targets = np.concatenate([
self.val_dataset.targets[val_train_mask],
self.buffer.labels.cpu().numpy()[:len(self.buffer)][buff_val_mask]
])
self.val_dataset.data = data_concatenate([
self.val_dataset.data[val_train_mask],
refold_transform((self.buffer.examples)[:len(self.buffer)][buff_val_mask])
])
self.val_loader = create_seeded_dataloader(self.args, self.val_dataset, batch_size=self.args.batch_size, shuffle=True)
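# Illustrative call (hypothetical context): an iCaRL-style model would invoke this
# at the beginning of each task after the first, e.g.
#   icarl_replay(self, dataset, val_set_split=0.1)
# which merges ~90% of the buffer into the training set and exposes the remaining
# ~10% through `self.val_loader`.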
class BaseSampleSelection:
"""
Base class for sample selection strategies.
"""
def __init__(self, buffer_size: int, device):
"""
Initialize the sample selection strategy.
Args:
buffer_size: the maximum buffer size
device: the device to store the buffer on
"""
self.buffer_size = buffer_size
self.device = device
def __call__(self, num_seen_examples: int) -> int:
"""
Selects the index of the sample to replace.
Args:
num_seen_examples: the number of seen examples
Returns:
the index of the sample to replace
"""
raise NotImplementedError
def update(self, *args, **kwargs):
"""
(optional) Update the state of the sample selection strategy.
"""
pass
class ReservoirSampling(BaseSampleSelection):
    """
    Reservoir sampling strategy (uniform over the stream). Ref: "Jeffrey S Vitter. Random sampling with a reservoir."
    """
def __call__(self, num_seen_examples: int) -> int:
"""
Reservoir sampling algorithm.
Args:
num_seen_examples: the number of seen examples
buffer_size: the maximum buffer size
Returns:
the target index if the current image is sampled, else -1
"""
if num_seen_examples < self.buffer_size:
return num_seen_examples
rand = np.random.randint(0, num_seen_examples + 1)
if rand < self.buffer_size:
return rand
else:
return -1
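# With the reservoir scheme above, once the buffer is full a new example is stored
# with probability buffer_size / (num_seen_examples + 1), which keeps every example
# seen so far in the buffer with equal probability. E.g., with buffer_size=200 and
# num_seen_examples=1000, a new sample enters with probability 200/1001 and, if
# accepted, evicts a uniformly random slot.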
class LossAwareBalancedSampling(BaseSampleSelection):
"""
    Combination of Loss-Aware Reservoir Sampling (LARS) and Balanced Reservoir Sampling (BRS) from `Rethinking Experience Replay: a Bag of Tricks for Continual Learning`.
"""
def __init__(self, buffer_size: int, device):
super().__init__(buffer_size, device)
# lossoir scores
self.importance_scores = torch.ones(buffer_size, device=device) * -float('inf')
# balancoir scores
self.balance_scores = torch.ones(self.buffer_size, dtype=torch.float).to(self.device) * -float('inf')
# merged scores
self.scores = torch.ones(self.buffer_size).to(self.device) * -float('inf')
def update(self, indexes: torch.Tensor, values: torch.Tensor):
self.importance_scores[indexes] = values
    def merge_scores(self):
        """
        Merge the loss-aware ("lossoir") and balance ("balancoir") scores into a single sampling distribution over the buffer slots.
        """
        # rescale the importance scores to the same magnitude as the balance scores
        scaling_factor = self.importance_scores.abs().mean() / self.balance_scores.abs().mean()
        norm_importance = self.importance_scores / scaling_factor
        presoftscores = 0.5 * norm_importance + 0.5 * self.balance_scores
        if presoftscores.max() - presoftscores.min() != 0:
            presoftscores = (presoftscores - presoftscores.min()) / (presoftscores.max() - presoftscores.min() + 1e-9)
        self.scores = presoftscores / presoftscores.sum()
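    # e.g., if the buffered losses average ~2.0 and the class counts ~50, the
    # rescaling above brings the loss term to the same magnitude as the balance
    # term, so neither component dominates the 50/50 mix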
    def update_balancoir_scores(self, labels: torch.Tensor):
        """
        Recompute the balance ("balancoir") scores as, for each buffer slot, the number of buffered samples sharing its class.
        """
        unique_labels, orig_inputs_idxs, counts = labels.unique(return_counts=True, return_inverse=True)
        # assert len(counts) > unique_labels.max(), "Some classes are missing from the buffer"
        self.balance_scores = torch.gather(counts, 0, orig_inputs_idxs).float()
def __call__(self, num_seen_examples: int, labels: torch.Tensor) -> int:
if num_seen_examples < self.buffer_size:
return num_seen_examples
        rn = np.random.randint(0, num_seen_examples + 1)  # inclusive upper bound, consistent with ReservoirSampling
if rn < self.buffer_size:
self.update_balancoir_scores(labels)
self.merge_scores()
index = np.random.choice(range(self.buffer_size), p=self.scores.cpu().numpy(), size=1)
return index
else:
return -1
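# NOTE: the definition of `LARSSampling` (the `lars` strategy, used below by
# `ABSSampling` and `Buffer`) is missing from this listing. The following is a
# hypothetical reconstruction, inferred from how `importance_scores` and
# `normalize_scores` are used elsewhere in this file; it is a sketch, not the
# verbatim upstream implementation.
class LARSSampling(BaseSampleSelection):
    """
    Loss-Aware Reservoir Sampling (LARS) from `Rethinking Experience Replay: a Bag of Tricks for Continual Learning`.
    Prioritizes retaining samples with the *higher* loss by replacing low-loss samples with higher probability.
    """

    def __init__(self, buffer_size: int, device):
        super().__init__(buffer_size, device)
        # lossoir scores
        self.importance_scores = torch.ones(buffer_size, device=device) * -float('inf')

    def update(self, indexes: torch.Tensor, values: torch.Tensor):
        self.importance_scores[indexes] = values

    def normalize_scores(self, values: torch.Tensor):
        """
        Min-max normalize `values` to [0, 1]; returns None for empty inputs.
        """
        if values.shape[0] == 0:
            return None
        if values.max() - values.min() != 0:
            return (values - values.min()) / (values.max() - values.min())
        return torch.zeros_like(values)

    def __call__(self, num_seen_examples: int) -> int:
        # assumes `update` has been called with per-sample losses once the buffer is full
        if num_seen_examples < self.buffer_size:
            return num_seen_examples
        rand = np.random.randint(0, num_seen_examples + 1)
        if rand < self.buffer_size:
            # low-loss samples are replaced with higher probability
            scores = 1 - self.normalize_scores(self.importance_scores)
            probs = (scores + 1e-9) / (scores + 1e-9).sum()
            return int(np.random.choice(self.buffer_size, p=probs.cpu().numpy()))
        return -1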
class ABSSampling(LARSSampling):
    """
    Asymmetric Balanced Sampling (ABS) from `May the Forgetting Be with You: Alternate Replay for Learning with Noisy Labels`.
    """
def __init__(self, buffer_size: int, device: str, dataset: ContinualDataset):
super().__init__(buffer_size, device)
self.dataset = dataset
    def scale_scores(self, past_indexes: torch.Tensor):
        # two different normalizations for the two groups: past-task samples are replaced
        # with probability inversely proportional to their loss (retaining the high-loss
        # ones), current-task samples proportionally to it (retaining the low-loss ones,
        # which is more robust to noisy labels)
past_importance = self.normalize_scores(self.importance_scores[past_indexes])
current_importance = self.normalize_scores(self.importance_scores[~past_indexes])
current_scores, past_scores = None, None
if past_importance is not None:
past_importance = 1 - past_importance
past_scores = past_importance / past_importance.sum()
if current_importance is not None:
if current_importance.sum() == 0:
current_importance += 1e-9
current_scores = current_importance / current_importance.sum()
return past_scores, current_scores
def __call__(self, num_seen_examples: int, labels: torch.Tensor) -> int:
        n_past_classes, _ = self.dataset.get_offsets()  # the first offset is the number of classes from past tasks
if num_seen_examples < self.buffer_size:
return num_seen_examples
        rn = np.random.randint(0, num_seen_examples + 1)  # inclusive upper bound, consistent with ReservoirSampling
if rn < self.buffer_size:
            past_indexes = labels < n_past_classes
past_scores, current_scores = self.scale_scores(past_indexes)
            past_percentage = np.float64(past_indexes.sum().cpu() / self.buffer_size)  # avoid numerical issues
            pres_percentage = 1 - past_percentage
            assert past_percentage + pres_percentage == 1, f"The sum of the percentages must be 1 but found {past_percentage + pres_percentage}: {past_percentage} + {pres_percentage}"
            # replace a past-task or a current-task sample, proportionally to how much of the buffer each group occupies
            rp = np.random.choice((0, 1), p=[past_percentage, pres_percentage])
if not rp:
index = np.random.choice(np.arange(self.buffer_size)[past_indexes.cpu().numpy()], p=past_scores.cpu().numpy(), size=1)
else:
index = np.random.choice(np.arange(self.buffer_size)[~past_indexes.cpu().numpy()], p=current_scores.cpu().numpy(), size=1)
return index
else:
return -1
class Buffer:
"""
The memory buffer of rehearsal method.
"""
buffer_size: int # the maximum size of the buffer
device: str # the device to store the buffer on
num_seen_examples: int # the total number of examples seen, used for reservoir
attributes: List[str] # the attributes stored in the buffer
attention_maps: List[torch.Tensor] # (optional) attention maps used by TwF
sample_selection_strategy: str # the sample selection strategy used to select samples to replace. By default, 'reservoir'
examples: torch.Tensor # (mandatory) buffer attribute: the tensor of images
labels: torch.Tensor # (optional) buffer attribute: the tensor of labels
logits: torch.Tensor # (optional) buffer attribute: the tensor of logits
task_labels: torch.Tensor # (optional) buffer attribute: the tensor of task labels
true_labels: torch.Tensor # (optional) buffer attribute: the tensor of true labels
def __init__(self, buffer_size: int, device="cpu", sample_selection_strategy='reservoir', **kwargs):
"""
Initialize a reservoir-based Buffer object.
Supports storing images, labels, logits, task_labels, and attention maps. This can be extended by adding more attributes to the `attributes` list and updating the `init_tensors` method accordingly.
To select samples to replace, the buffer supports:
- `reservoir` sampling: randomly selects samples to replace (default). Ref: "Jeffrey S Vitter. Random sampling with a reservoir."
- `lars`: prioritizes retaining samples with the *higher* loss. Ref: "Pietro Buzzega et al. Rethinking Experience Replay: a Bag of Tricks for Continual Learning."
- `labrs` (Loss-Aware Balanced Reservoir Sampling): combination of LARS and BRS. Ref: "Pietro Buzzega et al. Rethinking Experience Replay: a Bag of Tricks for Continual Learning."
- `abs` (Asymmetric Balanced Sampling): for samples from the current task, prioritizes retaining samples with the *lower* loss (i.e., inverse `lossoir`); for samples from previous tasks, prioritizes retaining samples with the *higher* loss (i.e., `lossoir`). Useful for settings with noisy labels. Ref: "Monica Millunzi et al. May the Forgetting Be with You: Alternate Replay for Learning with Noisy Labels".
Args:
buffer_size (int): The maximum size of the buffer.
device (str, optional): The device to store the buffer on. Defaults to "cpu".
sample_selection_strategy: The sample selection strategy. Defaults to 'reservoir'. Options: 'reservoir', 'lars', 'labrs', 'abs'.
        Note:
            If the transform passed to `get_data` is a PIL transform, the data will be moved to the CPU and then back to the device. This is why the device defaults to "cpu".
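        Example:
            Minimal illustrative usage (a sketch using only this class's own API):
                buffer = Buffer(buffer_size=500)
                buffer.add_data(examples=torch.randn(32, 3, 32, 32), labels=torch.randint(0, 10, (32,)))
                inputs, labels = buffer.get_data(16)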
"""
self.buffer_size = buffer_size
self.device = device
self.num_seen_examples = 0
self.attributes = ['examples', 'labels', 'logits', 'task_labels', 'true_labels']
self.attention_maps = [None] * buffer_size
self.sample_selection_strategy = sample_selection_strategy
assert sample_selection_strategy.lower() in ['reservoir', 'lars', 'labrs', 'abs'], f"Invalid sample selection strategy: {sample_selection_strategy}"
if sample_selection_strategy.lower() == 'abs':
assert 'dataset' in kwargs, "The dataset is required for ABS sample selection"
self.sample_selection_fn = ABSSampling(buffer_size, device, kwargs['dataset'])
elif sample_selection_strategy.lower() == 'lars':
self.sample_selection_fn = LARSSampling(buffer_size, device)
elif sample_selection_strategy.lower() == 'labrs':
self.sample_selection_fn = LossAwareBalancedSampling(buffer_size, device)
else:
self.sample_selection_fn = ReservoirSampling(buffer_size, device)
def to(self, device):
"""
Move the buffer and its attributes to the specified device.
Args:
device: The device to move the buffer and its attributes to.
Returns:
The buffer instance with the updated device and attributes.
"""
self.device = device
self.sample_selection_fn.device = device
for attr_str in self.attributes:
if hasattr(self, attr_str):
setattr(self, attr_str, getattr(self, attr_str).to(device))
return self
def __len__(self):
"""
Returns the number items in the buffer.
"""
return min(self.num_seen_examples, self.buffer_size)
def init_tensors(self, examples: torch.Tensor, labels: torch.Tensor,
logits: torch.Tensor, task_labels: torch.Tensor,
true_labels: torch.Tensor) -> None:
"""
Initializes just the required tensors.
Args:
examples: tensor containing the images
labels: tensor containing the labels
logits: tensor containing the outputs of the network
task_labels: tensor containing the task labels
true_labels: tensor containing the true labels (used only for logging)
"""
        for attr_str, attr in zip(self.attributes, [examples, labels, logits, task_labels, true_labels]):
            if attr is not None and not hasattr(self, attr_str):
                # label-like attributes are stored as int64, everything else as float32
                typ = torch.int64 if attr_str.endswith('els') else torch.float32
                setattr(self, attr_str, torch.zeros((self.buffer_size,
                                                     *attr.shape[1:]), dtype=typ, device=self.device))
@property
def used_attributes(self):
"""
Returns a list of attributes that are currently being used by the object.
"""
return [attr_str for attr_str in self.attributes if hasattr(self, attr_str)]
def add_data(self, examples, labels=None, logits=None, task_labels=None, attention_maps=None, true_labels=None, sample_selection_scores=None):
"""
        Adds the data to the memory buffer according to the configured sample selection strategy.
Args:
examples: tensor containing the images
labels: tensor containing the labels
logits: tensor containing the outputs of the network
task_labels: tensor containing the task labels
attention_maps: list of tensors containing the attention maps
true_labels: if setting is noisy, the true labels associated with the examples. **Used only for logging.**
sample_selection_scores: tensor containing the scores used for the sample selection strategy. NOTE: this is only used if the sample selection strategy defines the `update` method.
Note:
Only the examples are required. The other tensors are initialized only if they are provided.
"""
if not hasattr(self, 'examples'):
self.init_tensors(examples, labels, logits, task_labels, true_labels)
for i in range(examples.shape[0]):
            if self.sample_selection_strategy.lower() in ('abs', 'labrs'):
                # these strategies also need the buffered labels to balance classes
                index = self.sample_selection_fn(self.num_seen_examples, labels=self.labels)
else:
index = self.sample_selection_fn(self.num_seen_examples)
self.num_seen_examples += 1
if index >= 0:
self.examples[index] = examples[i].to(self.device)
if labels is not None:
self.labels[index] = labels[i].to(self.device)
if logits is not None:
self.logits[index] = logits[i].to(self.device)
if task_labels is not None:
self.task_labels[index] = task_labels[i].to(self.device)
if attention_maps is not None:
self.attention_maps[index] = [at[i].byte().to(self.device) for at in attention_maps]
if sample_selection_scores is not None:
self.sample_selection_fn.update(index, sample_selection_scores[i])
if true_labels is not None:
self.true_labels[index] = true_labels[i].to(self.device)
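    # Illustrative call (hypothetical variable names): with a loss-aware strategy
    # ('lars', 'labrs' or 'abs'), the per-sample losses can be passed as
    # `sample_selection_scores` so the policy can update its importance scores:
    #   buffer.add_data(examples=inputs, labels=labels, sample_selection_scores=per_sample_loss)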
def get_data(self, size: int, transform: nn.Module = None, return_index=False, device=None,
mask_task_out=None, cpt=None, return_not_aug=False, not_aug_transform=None) -> Tuple:
"""
Random samples a batch of size items.
Args:
size: the number of requested items
transform: the transformation to be applied (data augmentation)
return_index: if True, returns the indexes of the sampled items
mask_task: if not None, masks OUT the examples from the given task
cpt: the number of classes per task (required if mask_task is not None and task_labels are not present)
return_not_aug: if True, also returns the not augmented items
not_aug_transform: the transformation to be applied to the not augmented items (if `return_not_aug` is True)
Returns:
a tuple containing the requested items. If return_index is True, the tuple contains the indexes as first element.
"""
target_device = self.device if device is None else device
if mask_task_out is not None:
assert hasattr(self, 'task_labels') or cpt is not None
assert hasattr(self, 'task_labels') or hasattr(self, 'labels')
samples_mask = (self.task_labels != mask_task_out) if hasattr(self, 'task_labels') else self.labels // cpt != mask_task_out
num_avail_samples = self.examples.shape[0] if mask_task_out is None else samples_mask.sum().item()
num_avail_samples = min(self.num_seen_examples, num_avail_samples)
if size > min(num_avail_samples, self.examples.shape[0]):
size = min(num_avail_samples, self.examples.shape[0])
choice = np.random.choice(num_avail_samples, size=size, replace=False)
if transform is None:
def transform(x): return x
selected_samples = self.examples[choice] if mask_task_out is None else self.examples[samples_mask][choice]
if return_not_aug:
if not_aug_transform is None:
def not_aug_transform(x): return x
ret_tuple = (apply_transform(selected_samples, transform=not_aug_transform).to(target_device),)
else:
ret_tuple = tuple()
ret_tuple += (apply_transform(selected_samples, transform=transform).to(target_device),)
for attr_str in self.attributes[1:]:
if hasattr(self, attr_str):
attr = getattr(self, attr_str)
selected_attr = attr[choice] if mask_task_out is None else attr[samples_mask][choice]
ret_tuple += (selected_attr.to(target_device),)
if not return_index:
return ret_tuple
else:
return (torch.tensor(choice).to(target_device), ) + ret_tuple
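    # Illustrative call (hypothetical variable names): draw an augmented replay
    # minibatch directly on the training device:
    #   buf_inputs, buf_labels = buffer.get_data(minibatch_size, transform=transform, device=device)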
def get_data_by_index(self, indexes, transform: nn.Module = None, device=None) -> Tuple:
"""
Returns the data by the given index.
Args:
index: the index of the item
transform: the transformation to be applied (data augmentation)
Returns:
a tuple containing the requested items. The returned items depend on the attributes stored in the buffer from previous calls to `add_data`.
"""
target_device = self.device if device is None else device
if transform is None:
def transform(x): return x
ret_tuple = (apply_transform(self.examples[indexes], transform=transform).to(target_device),)
for attr_str in self.attributes[1:]:
if hasattr(self, attr_str):
attr = getattr(self, attr_str).to(target_device)
ret_tuple += (attr[indexes],)
return ret_tuple
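    # Illustrative call: re-read specific buffer slots, e.g. to inspect or refresh them:
    #   buf_inputs, buf_labels = buffer.get_data_by_index(torch.tensor([0, 3, 7]))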
    def is_empty(self) -> bool:
        """
        Returns True if the buffer is empty, False otherwise.
        """
        return self.num_seen_examples == 0
def get_all_data(self, transform: nn.Module = None, device=None) -> Tuple:
"""
Return all the items in the memory buffer.
Args:
            transform: the transformation to be applied (data augmentation)
            device: the device to move the returned items to (defaults to the buffer's device)
Returns:
a tuple with all the items in the memory buffer
"""
target_device = self.device if device is None else device
if transform is None:
ret_tuple = (self.examples[:len(self)].to(target_device),)
else:
ret_tuple = (apply_transform(self.examples[:len(self)], transform=transform).to(target_device),)
for attr_str in self.attributes[1:]:
if hasattr(self, attr_str):
attr = getattr(self, attr_str)[:len(self)].to(target_device)
ret_tuple += (attr,)
return ret_tuple
def empty(self) -> None:
"""
Set all the tensors to None.
"""
for attr_str in self.attributes:
if hasattr(self, attr_str):
delattr(self, attr_str)
self.num_seen_examples = 0
@torch.no_grad()
def fill_buffer(buffer: Buffer, dataset: ContinualDataset, t_idx: int, net: ContinualModel = None, use_herding=False, required_attributes: List[str] = None) -> None:
"""
Adds examples from the current task to the memory buffer.
Supports images, labels, task_labels, and logits.
Args:
buffer: the memory buffer
dataset: the dataset from which take the examples
t_idx: the task index
        net: (optional) the model instance, used to compute features and logits. Required if logits are stored in the buffer or if `use_herding` is True.
use_herding: (optional) if True, uses herding strategy. Otherwise, random sampling.
required_attributes: (optional) the attributes to be added to the buffer. If None and buffer is empty, adds only examples and labels.
"""
if net is not None:
mode = net.training
net.eval()
else:
assert not use_herding, "Herding strategy requires a model instance"
device = net.device if net is not None else get_device()
n_seen_classes = dataset.N_CLASSES_PER_TASK * (t_idx + 1) if isinstance(dataset.N_CLASSES_PER_TASK, int) else \
sum(dataset.N_CLASSES_PER_TASK[:t_idx + 1])
n_past_classes = dataset.N_CLASSES_PER_TASK * t_idx if isinstance(dataset.N_CLASSES_PER_TASK, int) else \
sum(dataset.N_CLASSES_PER_TASK[:t_idx])
samples_per_class = buffer.buffer_size // n_seen_classes
    # Check for required attributes
required_attributes = required_attributes or ['examples', 'labels']
assert all([attr in buffer.used_attributes for attr in required_attributes]) or len(buffer) == 0, \
"Required attributes not in buffer: {}".format([attr for attr in required_attributes if attr not in buffer.used_attributes])
if t_idx > 0:
# 1) First, subsample prior classes
buf_data = buffer.get_all_data()
buf_y = buf_data[1]
buffer.empty()
for _y in buf_y.unique():
idx = (buf_y == _y)
_buf_data_idx = {attr_name: _d[idx][:samples_per_class] for attr_name, _d in zip(required_attributes, buf_data)}
buffer.add_data(**_buf_data_idx)
    # 2) Then, fill with data from the current task
loader = dataset.train_loader
norm_trans = dataset.get_normalization_transform()
if norm_trans is None:
def norm_trans(x): return x
if 'logits' in buffer.used_attributes:
assert net is not None, "Logits in buffer require a model instance"
# 2.1 Extract all features
a_x, a_y, a_f, a_l = [], [], [], []
for data in loader:
x, y, not_norm_x = data[0], data[1], data[2]
mask = (y >= n_past_classes) & (y < n_seen_classes)
x, y, not_norm_x = x[mask], y[mask], not_norm_x[mask]
if not x.size(0):
continue
a_x.append(not_norm_x.cpu())
a_y.append(y.cpu())
if net is not None:
feats = net(norm_trans(not_norm_x.to(device)), returnt='features')
outs = net.classifier(feats)
a_f.append(feats.cpu())
a_l.append(torch.sigmoid(outs).cpu())
a_x, a_y = torch.cat(a_x), torch.cat(a_y)
if net is not None:
a_f, a_l = torch.cat(a_f), torch.cat(a_l)
# 2.2 Compute class means
for _y in a_y.unique():
idx = (a_y == _y)
_x, _y = a_x[idx], a_y[idx]
if use_herding:
_l = a_l[idx]
feats = a_f[idx]
mean_feat = feats.mean(0, keepdim=True)
running_sum = torch.zeros_like(mean_feat)
i = 0
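            # Herding (as in iCaRL): greedily pick, at each step, the sample that keeps
            # the mean of the selected features closest to the overall class mean;
            # the selected feature is then pushed far away (see the +1e6 below) so it
            # cannot be picked twice.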
while i < samples_per_class and i < feats.shape[0]:
cost = (mean_feat - (feats + running_sum) / (i + 1)).norm(2, 1)
idx_min = cost.argmin().item()
buffer.add_data(
examples=_x[idx_min:idx_min + 1].to(device),
labels=_y[idx_min:idx_min + 1].to(device),
logits=_l[idx_min:idx_min + 1].to(device) if 'logits' in required_attributes else None,
task_labels=torch.ones(len(_x[idx_min:idx_min + 1])).to(device) * t_idx if 'task_labels' in required_attributes else None
)
running_sum += feats[idx_min:idx_min + 1]
                feats[idx_min] = feats[idx_min] + 1e6  # exclude the chosen sample from subsequent picks
i += 1
else:
idx = torch.randperm(len(_x))[:samples_per_class]
buffer.add_data(
examples=_x[idx].to(device),
labels=_y[idx].to(device),
logits=_l[idx].to(device) if 'logits' in required_attributes else None,
task_labels=torch.ones(len(_x[idx])).to(device) * t_idx if 'task_labels' in required_attributes else None
)
assert len(buffer.examples) <= buffer.buffer_size
assert buffer.num_seen_examples <= buffer.buffer_size
if net is not None:
net.train(mode)
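# Illustrative call (hypothetical context): at the end of task `t`, an iCaRL-style
# model would refresh its class-balanced buffer with
#   fill_buffer(self.buffer, dataset, t, net=self.net, use_herding=True)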