#!/usr/bin/env python

import argparse
import sys
import os
import os.path as osp
from collections import OrderedDict

import numpy as np
import cv2
import paddle
import paddlers
from sklearn.decomposition import PCA

_dir = osp.dirname(osp.abspath(__file__))
sys.path.append(osp.abspath(osp.join(_dir, '../')))
import custom_model
import custom_trainer

FILENAME_PATTERN = "{key}_{idx}_vis.png"


class FeatureContainer:
    """A dict-like container that appends every value set for a key to a list."""

    def __init__(self):
        self._dict = OrderedDict()

    def __setitem__(self, key, val):
        if key not in self._dict:
            self._dict[key] = list()
        self._dict[key].append(val)

    def __getitem__(self, key):
        return self._dict[key]

    def __repr__(self):
        return self._dict.__repr__()

    def items(self):
        return self._dict.items()

    def keys(self):
        return self._dict.keys()

    def values(self):
        return self._dict.values()


class HookHelper:
    def __init__(self,
                 model,
                 fetch_dict,
                 out_dict,
                 hook_type='forward_out',
                 auto_key=True):
        # XXX: A HookHelper object should only be used as a context manager and
        # should not persist in memory, since it may keep references to some
        # very large objects.
        self.model = model
        self.fetch_dict = fetch_dict
        self.out_dict = out_dict
        self._handles = []
        self.hook_type = hook_type
        self.auto_key = auto_key

    def __enter__(self):
        def _hook_proto(x, entry):
            # `x` should be a tensor or a tuple;
            # `entry` is expected to be a string or a non-nested tuple.
            if isinstance(entry, tuple):
                for key, f in zip(entry, x):
                    self.out_dict[key] = f.detach().clone()
            else:
                if isinstance(x, tuple) and self.auto_key:
                    for i, f in enumerate(x):
                        key = self._gen_key(entry, i)
                        self.out_dict[key] = f.detach().clone()
                else:
                    self.out_dict[entry] = x.detach().clone()

        if self.hook_type == 'forward_in':
            # NOTE: Register forward pre-hooks for LAYERs.
            for name, layer in self.model.named_sublayers():
                if name in self.fetch_dict:
                    entry = self.fetch_dict[name]
                    self._handles.append(
                        layer.register_forward_pre_hook(
                            lambda l, x, entry=entry:
                                # `x` is a tuple of inputs.
                                _hook_proto(x[0] if len(x) == 1 else x, entry)))
        elif self.hook_type == 'forward_out':
            # NOTE: Register forward post-hooks for LAYERs.
            for name, module in self.model.named_sublayers():
                if name in self.fetch_dict:
                    entry = self.fetch_dict[name]
                    self._handles.append(
                        module.register_forward_post_hook(
                            lambda l, x, y, entry=entry:
                                # `y` is a tensor or a tuple.
                                _hook_proto(y, entry)))
        elif self.hook_type == 'backward':
            # NOTE: Register backward hooks for TENSORs.
            for name, param in self.model.named_parameters():
                if name in self.fetch_dict:
                    entry = self.fetch_dict[name]
                    self._handles.append(
                        param.register_hook(
                            lambda grad, entry=entry: _hook_proto(grad, entry)))
        else:
            raise RuntimeError("Hook type is not implemented.")

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        for handle in self._handles:
            handle.remove()

    def _gen_key(self, key, i):
        return key + f'_{i}'


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model_dir", default=None, type=str, help="Path of saved model.")
    parser.add_argument(
        "--hook_type", default='forward_out', type=str, help="Type of hook.")
    parser.add_argument(
        "--layer_names",
        nargs='+',
        default=[],
        type=str,
        help="Layers that accept or produce the features to visualize.")
    parser.add_argument(
        "--im_paths", nargs='+', type=str, help="Paths of input images.")
    parser.add_argument(
        "--save_dir",
        type=str,
        help="Path of directory to save prediction results.")
    parser.add_argument(
        "--to_pseudo_color",
        action='store_true',
        help="Whether to save pseudo-color images.")
    parser.add_argument(
        "--output_size",
        nargs='+',
        type=int,
        default=None,
        help="Resize the visualized image to `output_size`.")

    return parser.parse_args()


def normalize_minmax(x):
    EPS = 1e-32
    return (x - x.min()) / (x.max() - x.min() + EPS)


def quantize_8bit(x):
    # [0.0, 1.0] float => [0, 255] uint8
    # or [0, 1] int => [0, 255] uint8
    return (x * 255).astype('uint8')


def to_pseudo_color(gray, color_map=cv2.COLORMAP_JET):
    return cv2.applyColorMap(gray, color_map)


def process_fetched_feat(feat, to_pcolor=True):
    # Convert tensor to array.
    feat = feat.squeeze(0).numpy()
    # Project the channel dimension onto its first principal component.
    shape = feat.shape
    x = feat.reshape(shape[0], -1).transpose((1, 0))
    pca = PCA(n_components=1)
    y = pca.fit_transform(x)
    feat = y.reshape(shape[1:])
    feat = normalize_minmax(feat)
    feat = quantize_8bit(feat)
    if to_pcolor:
        feat = to_pseudo_color(feat)
    return feat


if __name__ == '__main__':
    args = parse_args()

    # Load model.
    model = paddlers.tasks.load_model(args.model_dir)

    fetch_dict = dict(zip(args.layer_names, args.layer_names))
    out_dict = FeatureContainer()

    with HookHelper(model.net, fetch_dict, out_dict, hook_type=args.hook_type):
        if len(args.im_paths) == 1:
            model.predict(args.im_paths[0])
        else:
            if len(args.im_paths) != 2:
                raise ValueError("Expected one or two input images.")
            model.predict(tuple(args.im_paths))

    if not osp.exists(args.save_dir):
        os.makedirs(args.save_dir)

    for key, feats in out_dict.items():
        for idx, feat in enumerate(feats):
            im_vis = process_fetched_feat(feat, to_pcolor=args.to_pseudo_color)
            if args.output_size is not None:
                im_vis = cv2.resize(im_vis, tuple(args.output_size))
            out_path = osp.join(
                args.save_dir,
                FILENAME_PATTERN.format(key=key.replace('.', '_'), idx=idx))
            cv2.imwrite(out_path, im_vis)
            print(f"Wrote feature map to {out_path}")
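
# ---------------------------------------------------------------------------
# Example invocation (a sketch, not part of the original script). The flags
# mirror the argparse definitions above; the script name, model directory,
# layer name, and image paths are hypothetical placeholders. Pass one path in
# --im_paths for single-image models, or two paths for models that take an
# image pair (e.g., bi-temporal change detection).
#
#   python path/to/this_script.py \
#       --model_dir ./output/best_model \
#       --hook_type forward_out \
#       --layer_names backbone.stage4 \
#       --im_paths ./data/t1.png ./data/t2.png \
#       --save_dir ./feat_vis \
#       --to_pseudo_color \
#       --output_size 256 256
# ---------------------------------------------------------------------------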