# pylint: disable=too-many-branches """ Run MonoLoco/MonStereo and converts annotations into KITTI format """ import os import math from collections import defaultdict import torch from ..network import Loco from ..network.process import preprocess_pifpaf from ..network.geom_baseline import geometric_coordinates from ..utils import get_keypoints, pixel_to_camera, factory_file, factory_basename, make_new_directory, get_category, \ xyz_from_distance, read_and_rewrite from .stereo_baselines import baselines_association from .reid_baseline import get_reid_features, ReID class GenerateKitti: dir_gt = os.path.join('data', 'kitti', 'gt') dir_gt_new = os.path.join('data', 'kitti', 'gt_new') dir_kk = os.path.join('data', 'kitti', 'calib') dir_byc = '/data/lorenzo-data/kitti/object_detection/left' monoloco_checkpoint = 'data/models/monoloco-190717-0952.pkl' baselines = {'mono': [], 'stereo': []} def __init__(self, args): # Load Network self.net = args.net assert args.net in ('monstereo', 'monoloco_pp'), "net not recognized" use_cuda = torch.cuda.is_available() device = torch.device("cuda" if use_cuda else "cpu") self.model = Loco( model=args.model, net=args.net, device=device, n_dropout=args.n_dropout, p_dropout=args.dropout, linear_size=args.hidden_size ) # Extract list of pifpaf files in validation images self.dir_ann = args.dir_ann self.generate_official = args.generate_official assert os.listdir(self.dir_ann), "Annotation directory is empty" self.set_basename = factory_basename(args.dir_ann, self.dir_gt) # For quick testing # ------------------------------------------------------------------------------------------------------------ # self.set_basename = ('001782',) # self.set_basename = ('002282',) # ------------------------------------------------------------------------------------------------------------ # Add monocular and stereo baselines (they require monoloco as backbone) if args.baselines: # Load MonoLoco self.baselines['mono'] = ['monoloco', 'geometric'] self.monoloco = Loco( model=self.monoloco_checkpoint, net='monoloco', device=device, n_dropout=args.n_dropout, p_dropout=args.dropout, linear_size=256 ) # Stereo baselines if args.net == 'monstereo': self.baselines['stereo'] = ['pose', 'reid'] self.cnt_disparity = defaultdict(int) self.cnt_no_stereo = 0 self.dir_images = os.path.join('data', 'kitti', 'images') self.dir_images_r = os.path.join('data', 'kitti', 'images_r') # ReID Baseline weights_path = 'data/models/reid_model_market.pkl' self.reid_net = ReID(weights_path=weights_path, device=device, num_classes=751, height=256, width=128) def run(self): """Run Monoloco and save txt files for KITTI evaluation""" cnt_ann = cnt_file = cnt_no_file = 0 # Prepare empty folder di = os.path.join('data', 'kitti', self.net) make_new_directory(di) dir_out = {self.net: di} for mode, names in self.baselines.items(): for name in names: di = os.path.join('data', 'kitti', name) make_new_directory(di) dir_out[name] = di # Run the model for basename in self.set_basename: path_calib = os.path.join(self.dir_kk, basename + '.txt') annotations, kk, tt = factory_file(path_calib, self.dir_ann, basename) boxes, keypoints = preprocess_pifpaf(annotations, im_size=(1242, 374)) cat = get_category(keypoints, os.path.join(self.dir_byc, basename + '.json')) if keypoints: annotations_r, _, _ = factory_file(path_calib, self.dir_ann, basename, mode='right') _, keypoints_r = preprocess_pifpaf(annotations_r, im_size=(1242, 374)) if self.net == 'monstereo': dic_out = self.model.forward(keypoints, kk, keypoints_r=keypoints_r) elif self.net == 'monoloco_pp': dic_out = self.model.forward(keypoints, kk) all_outputs = {self.net: [dic_out['xyzd'], dic_out['bi'], dic_out['epi'], dic_out['yaw'], dic_out['h'], dic_out['w'], dic_out['l']]} zzs = [float(el[2]) for el in dic_out['xyzd']] # Save txt files params = [kk, tt] path_txt = os.path.join(dir_out[self.net], basename + '.txt') save_txts(path_txt, boxes, all_outputs[self.net], params, mode=self.net, cat=cat) cnt_ann += len(boxes) cnt_file += 1 # MONO (+ STEREO BASELINES) if self.baselines['mono']: # MONOLOCO dic_out = self.monoloco.forward(keypoints, kk) zzs_geom, xy_centers = geometric_coordinates(keypoints, kk, average_y=0.48) all_outputs['monoloco'] = [dic_out['d'], dic_out['bi'], dic_out['epi']] + [zzs_geom, xy_centers] all_outputs['geometric'] = all_outputs['monoloco'] # monocular baselines for key in self.baselines['mono']: path_txt = {key: os.path.join(dir_out[key], basename + '.txt')} save_txts(path_txt[key], boxes, all_outputs[key], params, mode=key, cat=cat) # stereo baselines if self.baselines['stereo']: all_inputs = {} dic_xyz = self._run_stereo_baselines(basename, boxes, keypoints, zzs, path_calib) for key in dic_xyz: all_outputs[key] = all_outputs['monoloco'].copy() all_outputs[key][0] = dic_xyz[key] all_inputs[key] = boxes path_txt[key] = os.path.join(dir_out[key], basename + '.txt') save_txts(path_txt[key], all_inputs[key], all_outputs[key], params, mode='baseline', cat=cat) print("\nSaved in {} txt {} annotations. Not found {} images".format(cnt_file, cnt_ann, cnt_no_file)) if self.net == 'monstereo': print("STEREO:") for key in self.baselines['stereo']: print("Annotations corrected using {} baseline: {:.1f}%".format( key, self.cnt_disparity[key] / cnt_ann * 100)) print("Not found {}/{} stereo files".format(self.cnt_no_stereo, cnt_file)) if self.generate_official: create_empty_files(dir_out, self.net) # Create empty files for official evaluation def _run_stereo_baselines(self, basename, boxes, keypoints, zzs, path_calib): annotations_r, _, _ = factory_file(path_calib, self.dir_ann, basename, mode='right') boxes_r, keypoints_r = preprocess_pifpaf(annotations_r, im_size=(1242, 374)) _, kk, tt = factory_file(path_calib, self.dir_ann, basename) uv_centers = get_keypoints(keypoints, mode='bottom') # Kitti uses the bottom center to calculate depth xy_centers = pixel_to_camera(uv_centers, kk, 1) # Stereo baselines if keypoints_r: path_image = os.path.join(self.dir_images, basename + '.png') path_image_r = os.path.join(self.dir_images_r, basename + '.png') reid_features = get_reid_features(self.reid_net, boxes, boxes_r, path_image, path_image_r) dic_zzs, cnt = baselines_association(self.baselines['stereo'], zzs, keypoints, keypoints_r, reid_features) for key in cnt: self.cnt_disparity[key] += cnt[key] else: self.cnt_no_stereo += 1 dic_zzs = {key: zzs for key in self.baselines['stereo']} # Combine the stereo zz with x, y from 2D detection (no MonoLoco involved) dic_xyz = defaultdict(list) for key in dic_zzs: for idx, zz_base in enumerate(dic_zzs[key]): xx = float(xy_centers[idx][0]) * zz_base yy = float(xy_centers[idx][1]) * zz_base dic_xyz[key].append([xx, yy, zz_base]) return dic_xyz def save_txts(path_txt, all_inputs, all_outputs, all_params, mode='monoloco', cat=None): assert mode in ('monoloco', 'monstereo', 'geometric', 'baseline', 'monoloco_pp') if mode in ('monstereo', 'monoloco_pp'): xyzd, bis, epis, yaws, hs, ws, ls = all_outputs[:] xyz = xyzd[:, 0:3] tt = [0, 0, 0] elif mode in ('monoloco', 'geometric'): tt = [0, 0, 0] dds, bis, epis, zzs_geom, xy_centers = all_outputs[:] xyz = xyz_from_distance(dds, xy_centers) else: _, tt = all_params[:] xyz, bis, epis, zzs_geom, xy_centers = all_outputs[:] uv_boxes = all_inputs[:] assert len(uv_boxes) == len(list(xyz)), "Number of inputs different from number of outputs" with open(path_txt, "w+") as ff: for idx, uv_box in enumerate(uv_boxes): xx = float(xyz[idx][0]) - tt[0] yy = float(xyz[idx][1]) - tt[1] zz = float(xyz[idx][2]) - tt[2] if mode == 'geometric': zz = zzs_geom[idx] cam_0 = [xx, yy, zz] bi = float(bis[idx]) epi = float(epis[idx]) if mode in ('monstereo', 'monoloco_pp'): alpha, ry = float(yaws[0][idx]), float(yaws[1][idx]) hwl = [float(hs[idx]), float(ws[idx]), float(ls[idx])] else: alpha, ry, hwl = -10., -10., [0, 0, 0] # Set the scale to obtain (approximately) same recall at evaluation if mode == 'monstereo': conf_scale = 0.03 elif mode == 'monoloco_pp': conf_scale = 0.033 # conf_scale = 0.035 # nuScenes for having same recall else: conf_scale = 0.05 conf = conf_scale * (uv_box[-1]) / (bi / math.sqrt(xx ** 2 + yy ** 2 + zz ** 2)) output_list = [alpha] + uv_box[:-1] + hwl + cam_0 + [ry, conf, bi, epi] category = cat[idx] if category < 0.1: ff.write("%s " % 'Pedestrian') else: ff.write("%s " % 'Cyclist') ff.write("%i %i " % (-1, -1)) for el in output_list: ff.write("%f " % el) ff.write("\n") def create_empty_files(dir_out, net): """Create empty txt files to run official kitti metrics on MonStereo and all other methods""" methods = ['pseudo-lidar', 'monopsr', '3dop', 'm3d', 'oc-stereo', 'e2e', 'monodis', 'smoke'] dirs = [os.path.join('data', 'kitti', method) for method in methods] dirs_orig = [os.path.join('data', 'kitti', method + '-orig') for method in methods] for di, di_orig in zip(dirs, dirs_orig): make_new_directory(di) for i in range(7481): name = "0" * (6 - len(str(i))) + str(i) + '.txt' path_orig = os.path.join(di_orig, name) path = os.path.join(di, name) # If the file exits, rewrite in new folder, otherwise create empty file read_and_rewrite(path_orig, path) for i in range(7481): name = "0" * (6 - len(str(i))) + str(i) + '.txt' ff = open(os.path.join(dir_out[net], name), "a+") ff.close()