import time from copy import deepcopy import thop import torch.nn as nn from ultralytics.yolo.utils import LOGGER from ultralytics.yolo.utils.anchors import check_anchor_order from ultralytics.yolo.utils.modeling import parse_model from ultralytics.yolo.utils.modeling.modules import * from ultralytics.yolo.utils.torch_utils import (fuse_conv_and_bn, initialize_weights, intersect_state_dicts, model_info, scale_img, time_sync) class BaseModel(nn.Module): # YOLOv5 base model def forward(self, x, profile=False, visualize=False): return self._forward_once(x, profile, visualize) # single-scale inference, train def _forward_once(self, x, profile=False, visualize=False): y, dt = [], [] # outputs for m in self.model: if m.f != -1: # if not from previous layer x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers if profile: self._profile_one_layer(m, x, dt) x = m(x) # run y.append(x if m.i in self.save else None) # save output if visualize: pass # TODO: feature_visualization(x, m.type, m.i, save_dir=visualize) return x def _profile_one_layer(self, m, x, dt): c = m == self.model[-1] # is final layer, copy input as inplace fix o = thop.profile(m, inputs=(x.copy() if c else x,), verbose=False)[0] / 1E9 * 2 if thop else 0 # FLOPs t = time_sync() for _ in range(10): m(x.copy() if c else x) dt.append((time_sync() - t) * 100) if m == self.model[0]: LOGGER.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s} module") LOGGER.info(f'{dt[-1]:10.2f} {o:10.2f} {m.np:10.0f} {m.type}') if c: LOGGER.info(f"{sum(dt):10.2f} {'-':>10s} {'-':>10s} Total") def fuse(self): # fuse model Conv2d() + BatchNorm2d() layers LOGGER.info('Fusing layers... ') for m in self.model.modules(): if isinstance(m, (Conv, DWConv)) and hasattr(m, 'bn'): m.conv = fuse_conv_and_bn(m.conv, m.bn) # update conv delattr(m, 'bn') # remove batchnorm m.forward = m.forward_fuse # update forward self.info() return self def info(self, verbose=False, img_size=640): # print model information model_info(self, verbose, img_size) def _apply(self, fn): # Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers self = super()._apply(fn) m = self.model[-1] # Detect() if isinstance(m, (Detect, Segment)): m.stride = fn(m.stride) m.grid = list(map(fn, m.grid)) if isinstance(m.anchor_grid, list): m.anchor_grid = list(map(fn, m.anchor_grid)) return self def load(self, weights): # Force all tasks implement this function raise NotImplementedError("This function needs to be implemented by derived classes!") class DetectionModel(BaseModel): # YOLO detection model def __init__(self, cfg='yolov5s.yaml', ch=3, nc=None, anchors=None): # model, input channels, number of classes super().__init__() if isinstance(cfg, dict): self.yaml = cfg # model dict else: # is *.yaml import yaml # for torch hub self.yaml_file = Path(cfg).name with open(cfg, encoding='ascii', errors='ignore') as f: self.yaml = yaml.safe_load(f) # model dict # Define model ch = self.yaml['ch'] = self.yaml.get('ch', ch) # input channels if nc and nc != self.yaml['nc']: LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}") self.yaml['nc'] = nc # override yaml value if anchors: LOGGER.info(f'Overriding model.yaml anchors with anchors={anchors}') self.yaml['anchors'] = round(anchors) # override yaml value self.model, self.save = parse_model(deepcopy(self.yaml), ch=[ch]) # model, savelist self.names = [str(i) for i in range(self.yaml['nc'])] # default names self.inplace = self.yaml.get('inplace', True) # Build strides, anchors m = self.model[-1] # Detect() if isinstance(m, (Detect, Segment)): s = 256 # 2x min stride m.inplace = self.inplace forward = lambda x: self.forward(x)[0] if isinstance(m, Segment) else self.forward(x) m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))]) # forward check_anchor_order(m) m.anchors /= m.stride.view(-1, 1, 1) self.stride = m.stride self._initialize_biases() # only run once # Init weights, biases initialize_weights(self) self.info() LOGGER.info('') def forward(self, x, augment=False, profile=False, visualize=False): if augment: return self._forward_augment(x) # augmented inference, None return self._forward_once(x, profile, visualize) # single-scale inference, train def _forward_augment(self, x): img_size = x.shape[-2:] # height, width s = [1, 0.83, 0.67] # scales f = [None, 3, None] # flips (2-ud, 3-lr) y = [] # outputs for si, fi in zip(s, f): xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max())) yi = self._forward_once(xi)[0] # forward # cv2.imwrite(f'img_{si}.jpg', 255 * xi[0].cpu().numpy().transpose((1, 2, 0))[:, :, ::-1]) # save yi = self._descale_pred(yi, fi, si, img_size) y.append(yi) y = self._clip_augmented(y) # clip augmented tails return torch.cat(y, 1), None # augmented inference, train def _descale_pred(self, p, flips, scale, img_size): # de-scale predictions following augmented inference (inverse operation) if self.inplace: p[..., :4] /= scale # de-scale if flips == 2: p[..., 1] = img_size[0] - p[..., 1] # de-flip ud elif flips == 3: p[..., 0] = img_size[1] - p[..., 0] # de-flip lr else: x, y, wh = p[..., 0:1] / scale, p[..., 1:2] / scale, p[..., 2:4] / scale # de-scale if flips == 2: y = img_size[0] - y # de-flip ud elif flips == 3: x = img_size[1] - x # de-flip lr p = torch.cat((x, y, wh, p[..., 4:]), -1) return p def _clip_augmented(self, y): # Clip YOLOv5 augmented inference tails nl = self.model[-1].nl # number of detection layers (P3-P5) g = sum(4 ** x for x in range(nl)) # grid points e = 1 # exclude layer count i = (y[0].shape[1] // g) * sum(4 ** x for x in range(e)) # indices y[0] = y[0][:, :-i] # large i = (y[-1].shape[1] // g) * sum(4 ** (nl - 1 - x) for x in range(e)) # indices y[-1] = y[-1][:, i:] # small return y def _initialize_biases(self, cf=None): # initialize biases into Detect(), cf is class frequency # https://arxiv.org/abs/1708.02002 section 3.3 # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1. m = self.model[-1] # Detect() module for mi, s in zip(m.m, m.stride): # from b = mi.bias.view(m.na, -1) # conv.bias(255) to (3,85) b.data[:, 4] += math.log(8 / (640 / s) ** 2) # obj (8 objects per 640 image) b.data[:, 5:5 + m.nc] += math.log(0.6 / (m.nc - 0.99999)) if cf is None else torch.log(cf / cf.sum()) # cls mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True) def load(self, weights): ckpt = torch.load(weights, map_location='cpu') # load checkpoint to CPU to avoid CUDA memory leak csd = ckpt['model'].float().state_dict() # checkpoint state_dict as FP32 csd = intersect_state_dicts(csd, self.state_dict()) # intersect self.load_state_dict(csd, strict=False) # load class SegmentationModel(DetectionModel): # YOLOv5 segmentation model def __init__(self, cfg='yolov5s-seg.yaml', ch=3, nc=None, anchors=None): super().__init__(cfg, ch, nc, anchors) class ClassificationModel(BaseModel): # YOLOv5 classification model def __init__(self, cfg=None, model=None, nc=1000, cutoff=10): # yaml, model, number of classes, cutoff index super().__init__() self._from_detection_model(model, nc, cutoff) if model is not None else self._from_yaml(cfg) def _from_detection_model(self, model, nc=1000, cutoff=10): # Create a YOLOv5 classification model from a YOLOv5 detection model if isinstance(model, AutoBackend): model = model.model # unwrap DetectMultiBackend model.model = model.model[:cutoff] # backbone m = model.model[-1] # last layer ch = m.conv.in_channels if hasattr(m, 'conv') else m.cv1.conv.in_channels # ch into module c = Classify(ch, nc) # Classify() c.i, c.f, c.type = m.i, m.f, 'models.common.Classify' # index, from, type model.model[-1] = c # replace self.model = model.model self.stride = model.stride self.save = [] self.nc = nc def _from_yaml(self, cfg): # Create a YOLOv5 classification model from a *.yaml file self.model = None def load(self, weights): ckpt = torch.load(weights, map_location='cpu') # load checkpoint to CPU to avoid CUDA memory leak csd = ckpt['model'].float().state_dict() # checkpoint state_dict as FP32 csd = intersect_state_dicts(csd, self.state_dict()) # intersect self.load_state_dict(csd, strict=False) # load