Stereo Baselines (#11)

* skeleton stereo_baselines

* skeleton stereo_baselines (2)

* combine stereo evaluation

* fix stereo bugs

* refactor for multiple baselines

* temp

* temp

* fix disparity bug

* fix bug on avg_disparity

* cleaning

* cleaning

* cleaning (2)

* methods_stereo variable

* refactor evaluation

* cleaning

* create general mode to save txt files

* temp

* fix bug in compare errors

* fix dicionary of iou

* unify parser

* skeleton pose baseline

* update visualization names

* refactor for modularity

* multiple baselines running

* update printing information

* add evaluation files filter

* refactor skeleton

* refactor skeleton(2)

* cleaning

* working refactor without reid

* add features and keypoints distinction

* working reid representation

* refactor to matricial form

* while version

* new for version

* correct bug in updated for version

* fix bugs in while version and reformat

* add new counting

* fix bug on counting

* fix bug on evaluation

* pylint cleaning

* build new version

* change test setting to allow flexibility on different platforms

* pylint clening(2)
This commit is contained in:
Lorenzo Bertoni 2019-08-23 14:07:09 +02:00 committed by GitHub
parent 0cacedb6b4
commit 9eafb1414e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 567 additions and 417 deletions

View File

@ -1,4 +1,4 @@
"""Open implementation of MonoLoco."""
__version__ = '0.4.5'
__version__ = '0.4.6'

View File

@ -15,7 +15,7 @@ from tabulate import tabulate
from ..utils import get_iou_matches, get_task_error, get_pixel_error, check_conditions, get_category, split_training, \
parse_ground_truth
from ..visuals import show_results, show_spread
from ..visuals import show_results, show_spread, show_task_error
class EvalKitti:
@ -23,22 +23,22 @@ class EvalKitti:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
CLUSTERS = ('easy', 'moderate', 'hard', 'all', '6', '10', '15', '20', '25', '30', '40', '50', '>50')
METHODS = ['m3d', 'md', 'geom', 'task_error', '3dop', 'our']
HEADERS = ['method', '<0.5', '<1m', '<2m', 'easy', 'moderate', 'hard', 'all']
CATEGORIES = ['pedestrian']
ALP_THRESHOLDS = ('<0.5m', '<1m', '<2m')
METHODS_MONO = ['m3d', 'monodepth', '3dop', 'monoloco']
METHODS_STEREO = ['ml_stereo', 'pose', 'reid']
BASELINES = ['geometric', 'task_error', 'pixel_error']
HEADERS = ('method', '<0.5', '<1m', '<2m', 'easy', 'moderate', 'hard', 'all')
CATEGORIES = ('pedestrian',)
def __init__(self, thresh_iou_our=0.3, thresh_iou_m3d=0.3, thresh_conf_m3d=0.3, thresh_conf_our=0.3,
def __init__(self, thresh_iou_monoloco=0.3, thresh_iou_base=0.3, thresh_conf_monoloco=0.3, thresh_conf_base=0.3,
verbose=False, stereo=False):
self.dir_gt = os.path.join('data', 'kitti', 'gt')
self.dir_m3d = os.path.join('data', 'kitti', 'm3d')
self.dir_3dop = os.path.join('data', 'kitti', '3dop')
self.dir_md = os.path.join('data', 'kitti', 'monodepth')
self.dir_our = os.path.join('data', 'kitti', 'monoloco')
self.main_dir = os.path.join('data', 'kitti')
self.dir_gt = os.path.join(self.main_dir, 'gt')
self.methods = self.METHODS_MONO
self.stereo = stereo
if self.stereo:
self.dir_our_stereo = os.path.join('data', 'kitti', 'monoloco_stereo')
self.METHODS.extend(['our_stereo', 'pixel_error'])
self.methods.extend(self.METHODS_STEREO)
path_train = os.path.join('splits', 'kitti_train.txt')
path_val = os.path.join('splits', 'kitti_val.txt')
dir_logs = os.path.join('data', 'logs')
@ -49,24 +49,21 @@ class EvalKitti:
self.path_results = os.path.join(dir_logs, 'eval-' + now_time + '.json')
self.verbose = verbose
assert os.path.exists(self.dir_m3d) and os.path.exists(self.dir_our) \
and os.path.exists(self.dir_3dop)
self.dic_thresh_iou = {'m3d': thresh_iou_m3d, '3dop': thresh_iou_m3d,
'md': thresh_iou_our, 'our': thresh_iou_our, 'our_stereo': thresh_iou_our}
self.dic_thresh_conf = {'m3d': thresh_conf_m3d, '3dop': thresh_conf_m3d,
'our': thresh_conf_our, 'our_stereo': thresh_conf_our}
self.dic_thresh_iou = {method: (thresh_iou_monoloco if method[:8] == 'monoloco' else thresh_iou_base)
for method in self.methods}
self.dic_thresh_conf = {method: (thresh_conf_monoloco if method[:8] == 'monoloco' else thresh_conf_base)
for method in self.methods}
# Extract validation images for evaluation
names_gt = tuple(os.listdir(self.dir_gt))
_, self.set_val = split_training(names_gt, path_train, path_val)
# Define variables to save statistics
self.dic_methods = None
self.errors = None
self.dic_stds = None
self.dic_stats = None
self.dic_cnt = None
self.cnt_stereo_error = None
self.cnt_gt = 0
def run(self):
@ -80,40 +77,29 @@ class EvalKitti:
self.dic_stats = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))
self.dic_cnt = defaultdict(int)
self.cnt_gt = 0
self.cnt_stereo_error = 0
# Iterate over each ground truth file in the training set
for name in self.set_val:
path_gt = os.path.join(self.dir_gt, name)
path_m3d = os.path.join(self.dir_m3d, name)
path_our = os.path.join(self.dir_our, name)
if self.stereo:
path_our_stereo = os.path.join(self.dir_our_stereo, name)
path_3dop = os.path.join(self.dir_3dop, name)
path_md = os.path.join(self.dir_md, name)
# Iterate over each line of the gt file and save box location and distances
out_gt = parse_ground_truth(path_gt, category)
methods_out = defaultdict(tuple) # Save all methods for comparison
self.cnt_gt += len(out_gt[0])
# Extract annotations for the same file
if out_gt[0]:
out_m3d = self._parse_txts(path_m3d, category, method='m3d')
out_3dop = self._parse_txts(path_3dop, category, method='3dop')
out_md = self._parse_txts(path_md, category, method='md')
out_our = self._parse_txts(path_our, category, method='our')
out_our_stereo = self._parse_txts(path_our_stereo, category, method='our') if self.stereo else []
for method in self.methods:
# Extract annotations
dir_method = os.path.join(self.main_dir, method)
assert os.path.exists(dir_method), "directory of the method %s does not exists" % method
path_method = os.path.join(dir_method, name)
methods_out[method] = self._parse_txts(path_method, category, method=method)
# Compute the error with ground truth
self._estimate_error(out_gt, out_m3d, method='m3d')
self._estimate_error(out_gt, out_3dop, method='3dop')
self._estimate_error(out_gt, out_md, method='md')
self._estimate_error(out_gt, out_our, method='our')
if self.stereo:
self._estimate_error(out_gt, out_our_stereo, method='our_stereo')
# Compute the error with ground truth
self._estimate_error(out_gt, methods_out[method], method=method)
# Iterate over all the files together to find a pool of common annotations
self._compare_error(out_gt, out_m3d, out_3dop, out_md, out_our, out_our_stereo)
self._compare_error(out_gt, methods_out)
# Update statistics of errors and uncertainty
for key in self.errors:
@ -129,79 +115,54 @@ class EvalKitti:
if save or show:
show_results(self.dic_stats, show, save)
show_spread(self.dic_stats, show, save)
show_task_error(show, save)
def _parse_txts(self, path, category, method):
boxes = []
dds = []
stds_ale = []
stds_epi = []
dds_geom = []
dds_geometric = []
output = (boxes, dds) if method != 'monoloco' else (boxes, dds, stds_ale, stds_epi, dds_geometric)
# Iterate over each line of the txt file
if method in ['3dop', 'm3d']:
try:
with open(path, "r") as ff:
for line in ff:
if check_conditions(line, category, method=method, thresh=self.dic_thresh_conf[method]):
boxes.append([float(x) for x in line.split()[4:8]])
loc = ([float(x) for x in line.split()[11:14]])
dds.append(math.sqrt(loc[0] ** 2 + loc[1] ** 2 + loc[2] ** 2))
self.dic_cnt[method] += 1
return boxes, dds
except FileNotFoundError:
return [], []
elif method == 'md':
try:
with open(path, "r") as ff:
for line in ff:
box = [float(x[:-1]) for x in line.split()[0:4]]
delta_h = (box[3] - box[1]) / 7
delta_w = (box[2] - box[0]) / 3.5
assert delta_h > 0 and delta_w > 0, "Bounding box <=0"
box[0] -= delta_w
box[1] -= delta_h
box[2] += delta_w
box[3] += delta_h
try:
with open(path, "r") as ff:
for line_str in ff:
line = line_str.split()
if check_conditions(line, category, method=method, thresh=self.dic_thresh_conf[method]):
if method == 'monodepth':
box = [float(x[:-1]) for x in line[0:4]]
delta_h = (box[3] - box[1]) / 7
delta_w = (box[2] - box[0]) / 3.5
assert delta_h > 0 and delta_w > 0, "Bounding box <=0"
box[0] -= delta_w
box[1] -= delta_h
box[2] += delta_w
box[3] += delta_h
dd = float(line[5][:-1])
else:
box = [float(x) for x in line[4:8]]
loc = ([float(x) for x in line[11:14]])
dd = math.sqrt(loc[0] ** 2 + loc[1] ** 2 + loc[2] ** 2)
boxes.append(box)
dds.append(float(line.split()[5][:-1]))
dds.append(dd)
self.dic_cnt[method] += 1
return boxes, dds
except FileNotFoundError:
return [], []
else:
assert method == 'our', "method not recognized"
try:
with open(path, "r") as ff:
file_lines = ff.readlines()
for line_our in file_lines[:-1]:
line_list = [float(x) for x in line_our.split()]
if check_conditions(line_list, category, method=method, thresh=self.dic_thresh_conf[method]):
boxes.append(line_list[:4])
dds.append(line_list[8])
stds_ale.append(line_list[9])
stds_epi.append(line_list[10])
dds_geom.append(line_list[11])
self.dic_cnt[method] += 1
self.dic_cnt['geom'] += 1
# kk_list = [float(x) for x in file_lines[-1].split()]
return boxes, dds, stds_ale, stds_epi, dds_geom
except FileNotFoundError:
return [], [], [], [], []
if method == 'monoloco':
stds_ale.append(float(line[16]))
stds_epi.append(float(line[17]))
dds_geometric.append(float(line[18]))
self.dic_cnt['geometric'] += 1
return output
except FileNotFoundError:
return output
def _estimate_error(self, out_gt, out, method):
"""Estimate localization error"""
boxes_gt, _, dds_gt, zzs_gt, truncs_gt, occs_gt = out_gt
if method[:3] == 'our':
boxes, dds, stds_ale, stds_epi, dds_geom = out
if method == 'monoloco':
boxes, dds, stds_ale, stds_epi, dds_geometric = out
else:
boxes, dds = out
@ -212,62 +173,52 @@ class EvalKitti:
cat = get_category(boxes_gt[idx_gt], truncs_gt[idx_gt], occs_gt[idx_gt])
self.update_errors(dds[idx], dds_gt[idx_gt], cat, self.errors[method])
if method == 'our':
self.update_errors(dds_geom[idx], dds_gt[idx_gt], cat, self.errors['geom'])
if method == 'monoloco':
self.update_errors(dds_geometric[idx], dds_gt[idx_gt], cat, self.errors['geometric'])
self.update_uncertainty(stds_ale[idx], stds_epi[idx], dds[idx], dds_gt[idx_gt], cat)
dd_task_error = dds_gt[idx_gt] + (get_task_error(dds_gt[idx_gt]))**2
self.update_errors(dd_task_error, dds_gt[idx_gt], cat, self.errors['task_error'])
elif method == 'our_stereo':
dd_pixel_error = get_pixel_error(dds_gt[idx_gt], zzs_gt[idx_gt])
self.update_errors(dd_pixel_error, dds_gt[idx_gt], cat, self.errors['pixel_error'])
def _compare_error(self, out_gt, out_m3d, out_3dop, out_md, out_our, out_our_stereo):
def _compare_error(self, out_gt, methods_out):
"""Compare the error for a pool of instances commonly matched by all methods"""
# Extract outputs of each method
boxes_gt, _, dds_gt, zzs_gt, truncs_gt, occs_gt = out_gt
boxes_m3d, dds_m3d = out_m3d
boxes_3dop, dds_3dop = out_3dop
boxes_md, dds_md = out_md
boxes_our, dds_our, _, _, dds_geom = out_our
if self.stereo:
boxes_our_stereo, dds_our_stereo, _, _, dds_geom_stereo = out_our_stereo
# Find IoU matches
matches_our = get_iou_matches(boxes_our, boxes_gt, self.dic_thresh_iou['our'])
matches_m3d = get_iou_matches(boxes_m3d, boxes_gt, self.dic_thresh_iou['m3d'])
matches_3dop = get_iou_matches(boxes_3dop, boxes_gt, self.dic_thresh_iou['3dop'])
matches_md = get_iou_matches(boxes_md, boxes_gt, self.dic_thresh_iou['md'])
matches = []
boxes_monoloco = methods_out['monoloco'][0]
matches_monoloco = get_iou_matches(boxes_monoloco, boxes_gt, self.dic_thresh_iou['monoloco'])
base_methods = [method for method in self.methods if method != 'monoloco']
for method in base_methods:
boxes = methods_out[method][0]
matches.append(get_iou_matches(boxes, boxes_gt, self.dic_thresh_iou[method]))
# Update error of commonly matched instances
for (idx, idx_gt) in matches_our:
check, indices = extract_indices(idx_gt, matches_m3d, matches_3dop, matches_md)
for (idx, idx_gt) in matches_monoloco:
check, indices = extract_indices(idx_gt, *matches)
if check:
cat = get_category(boxes_gt[idx_gt], truncs_gt[idx_gt], occs_gt[idx_gt])
dd_gt = dds_gt[idx_gt]
self.update_errors(dds_our[idx], dd_gt, cat, self.errors['our_merged'])
self.update_errors(dds_geom[idx], dd_gt, cat, self.errors['geom_merged'])
self.update_errors(dd_gt + get_task_error(dd_gt), dd_gt, cat, self.errors['task_error_merged'])
self.update_errors(dds_m3d[indices[0]], dd_gt, cat, self.errors['m3d_merged'])
self.update_errors(dds_3dop[indices[1]], dd_gt, cat, self.errors['3dop_merged'])
self.update_errors(dds_md[indices[2]], dd_gt, cat, self.errors['md_merged'])
if self.stereo:
self.update_errors(dds_our_stereo[idx], dd_gt, cat, self.errors['our_stereo_merged'])
dd_pixel = get_pixel_error(dd_gt, zzs_gt[idx_gt])
self.update_errors(dd_pixel, dd_gt, cat, self.errors['pixel_error_merged'])
error = abs(dds_our[idx] - dd_gt)
error_stereo = abs(dds_our_stereo[idx] - dd_gt)
if error_stereo > (error + 0.1):
self.cnt_stereo_error += 1
for idx_indices, method in enumerate(base_methods):
dd = methods_out[method][1][indices[idx_indices]]
self.update_errors(dd, dd_gt, cat, self.errors[method + '_merged'])
for key in self.METHODS:
dd_monoloco = methods_out['monoloco'][1][idx]
dd_geometric = methods_out['monoloco'][4][idx]
self.update_errors(dd_monoloco, dd_gt, cat, self.errors['monoloco_merged'])
self.update_errors(dd_geometric, dd_gt, cat, self.errors['geometric_merged'])
self.update_errors(dd_gt + get_task_error(dd_gt), dd_gt, cat, self.errors['task_error_merged'])
dd_pixel = get_pixel_error(dd_gt, zzs_gt[idx_gt])
self.update_errors(dd_pixel, dd_gt, cat, self.errors['pixel_error_merged'])
for key in self.methods:
self.dic_cnt[key + '_merged'] += 1
def update_errors(self, dd, dd_gt, cat, errors):
"""Compute and save errors between a single box and the gt box which match"""
diff = abs(dd - dd_gt)
clst = find_cluster(dd_gt, self.CLUSTERS)
errors['all'].append(diff)
@ -346,23 +297,13 @@ class EvalKitti:
def show_statistics(self):
all_methods = self.methods + self.BASELINES
print('-'*90)
alp = [[str(100 * average(self.errors[key][perc]))[:5]
for perc in ['<0.5m', '<1m', '<2m']]
for key in self.METHODS]
ale = [[str(self.dic_stats['test'][key + '_merged'][clst]['mean'])[:4] + ' (' +
str(self.dic_stats['test'][key][clst]['mean'])[:4] + ')'
for clst in self.CLUSTERS[:4]]
for key in self.METHODS]
results = [[key] + alp[idx] + ale[idx] for idx, key in enumerate(self.METHODS)]
print(tabulate(results, headers=self.HEADERS))
print('-'*90 + '\n')
self.summary_table(all_methods)
if self.verbose:
methods_all = list(chain.from_iterable((method, method + '_merged') for method in self.METHODS))
for key in methods_all:
all_methods_merged = list(chain.from_iterable((method, method + '_merged') for method in all_methods))
for key in all_methods_merged:
for clst in self.CLUSTERS[:4]:
print(" {} Average error in cluster {}: {:.2f} with a max error of {:.1f}, "
"for {} annotations"
@ -370,13 +311,13 @@ class EvalKitti:
self.dic_stats['test'][key][clst]['max'],
self.dic_stats['test'][key][clst]['cnt']))
if key == 'our':
if key == 'monoloco':
print("% of annotation inside the confidence interval: {:.1f} %, "
"of which {:.1f} % at higher risk"
.format(self.dic_stats['test'][key][clst]['interval']*100,
self.dic_stats['test'][key][clst]['at_risk']*100))
for perc in ['<0.5m', '<1m', '<2m']:
for perc in self.ALP_THRESHOLDS:
print("{} Instances with error {}: {:.2f} %"
.format(key, perc, 100 * average(self.errors[key][perc])))
@ -385,27 +326,35 @@ class EvalKitti:
print("-" * 100)
print("\n Annotations inside the confidence interval: {:.1f} %"
.format(self.dic_stats['test']['our']['all']['interval']))
print("precision 1: {:.2f}".format(self.dic_stats['test']['our']['all']['prec_1']))
print("precision 2: {:.2f}".format(self.dic_stats['test']['our']['all']['prec_2']))
if self.stereo:
print("Stereo error greater than mono: {:.1f} %"
.format(100 * self.cnt_stereo_error / self.dic_cnt['our_merged']))
.format(self.dic_stats['test']['monoloco']['all']['interval']))
print("precision 1: {:.2f}".format(self.dic_stats['test']['monoloco']['all']['prec_1']))
print("precision 2: {:.2f}".format(self.dic_stats['test']['monoloco']['all']['prec_2']))
def summary_table(self, all_methods):
"""Tabulate table for ALP and ALE metrics"""
alp = [[str(100 * average(self.errors[key][perc]))[:5]
for perc in ['<0.5m', '<1m', '<2m']]
for key in all_methods]
ale = [[str(self.dic_stats['test'][key + '_merged'][clst]['mean'])[:4] + ' (' +
str(self.dic_stats['test'][key][clst]['mean'])[:4] + ')'
for clst in self.CLUSTERS[:4]]
for key in all_methods]
results = [[key] + alp[idx] + ale[idx] for idx, key in enumerate(all_methods)]
print(tabulate(results, headers=self.HEADERS))
print('-' * 90 + '\n')
def get_statistics(dic_stats, errors, dic_stds, key):
"""Update statistics of a cluster"""
try:
dic_stats['mean'] = average(errors)
dic_stats['max'] = max(errors)
dic_stats['cnt'] = len(errors)
except (ZeroDivisionError, ValueError):
dic_stats['mean'] = 0.
dic_stats['max'] = 0.
dic_stats['cnt'] = 0.
dic_stats['mean'] = average(errors)
dic_stats['max'] = max(errors)
dic_stats['cnt'] = len(errors)
if key == 'our':
if key == 'monoloco':
dic_stats['std_ale'] = average(dic_stds['ale'])
dic_stats['std_epi'] = average(dic_stds['epi'])
dic_stats['interval'] = average(dic_stds['interval'])

View File

@ -1,14 +1,12 @@
"""Run monoloco over all the pifpaf joints of KITTI images
and extract and save the annotations in txt files"""
import math
import os
import glob
import json
import shutil
import itertools
import copy
from collections import defaultdict
import numpy as np
import torch
@ -16,176 +14,146 @@ import torch
from ..network import MonoLoco
from ..network.process import preprocess_pifpaf
from ..eval.geom_baseline import compute_distance
from ..utils import get_keypoints, pixel_to_camera, xyz_from_distance, get_calibration, depth_from_disparity
from ..utils import get_keypoints, pixel_to_camera, xyz_from_distance, get_calibration, open_annotations, split_training
from .stereo_baselines import baselines_association
from .reid_baseline import ReID, get_reid_features
class GenerateKitti:
def __init__(self, model, dir_ann, p_dropout=0.2, n_dropout=0):
def __init__(self, model, dir_ann, p_dropout=0.2, n_dropout=0, stereo=True):
# Load monoloco
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
self.monoloco = MonoLoco(model=model, device=device, n_dropout=n_dropout, p_dropout=p_dropout)
self.dir_out = os.path.join('data', 'kitti', 'monoloco')
self.dir_ann = dir_ann
# List of images
self.list_basename = factory_basename(dir_ann)
# Extract list of pifpaf files in validation images
dir_gt = os.path.join('data', 'kitti', 'gt')
self.set_basename = factory_basename(dir_ann, dir_gt)
self.dir_kk = os.path.join('data', 'kitti', 'calib')
def run_mono(self):
# Calculate stereo baselines
self.stereo = stereo
if stereo:
self.baselines = ['ml_stereo', 'pose', 'reid']
self.cnt_disparity = defaultdict(int)
self.cnt_no_stereo = 0
# ReID Baseline
weights_path = 'data/models/reid_model_market.pkl'
self.reid_net = ReID(weights_path=weights_path, device=device, num_classes=751, height=256, width=128)
self.dir_images = os.path.join('data', 'kitti', 'images')
self.dir_images_r = os.path.join('data', 'kitti', 'images_r')
def run(self):
"""Run Monoloco and save txt files for KITTI evaluation"""
cnt_ann = cnt_file = cnt_no_file = 0
dir_out = os.path.join('data', 'kitti', 'monoloco')
# Remove the output directory if alreaady exists (avoid residual txt files)
if os.path.exists(dir_out):
shutil.rmtree(dir_out)
os.makedirs(dir_out)
dir_out = {"monoloco": os.path.join('data', 'kitti', 'monoloco')}
make_new_directory(dir_out["monoloco"])
print("\nCreated empty output directory for txt files")
if self.stereo:
for key in self.baselines:
dir_out[key] = os.path.join('data', 'kitti', key)
make_new_directory(dir_out[key])
print("Created empty output directory for {}".format(key))
print("\n")
# Run monoloco over the list of images
for basename in self.list_basename:
for basename in self.set_basename:
path_calib = os.path.join(self.dir_kk, basename + '.txt')
annotations, kk, tt = factory_file(path_calib, self.dir_ann, basename)
boxes, keypoints = preprocess_pifpaf(annotations, im_size=(1242, 374))
assert keypoints, "all pifpaf files should have at least one annotation"
cnt_ann += len(boxes)
cnt_file += 1
if not keypoints:
cnt_no_file += 1
continue
else:
# Run the network and the geometric baseline
outputs, varss = self.monoloco.forward(keypoints, kk)
dds_geom = eval_geometric(keypoints, kk, average_y=0.48)
# Run the network and the geometric baseline
outputs, varss = self.monoloco.forward(keypoints, kk)
dds_geom = eval_geometric(keypoints, kk, average_y=0.48)
# Save the file
uv_centers = get_keypoints(keypoints, mode='bottom') # Kitti uses the bottom center to calculate depth
xy_centers = pixel_to_camera(uv_centers, kk, 1)
outputs = outputs.detach().cpu()
zzs = xyz_from_distance(outputs[:, 0:1], xy_centers)[:, 2].tolist()
all_outputs = [outputs.detach().cpu(), varss.detach().cpu(), dds_geom, zzs]
all_inputs = [boxes, xy_centers]
all_params = [kk, tt]
path_txt = os.path.join(dir_out, basename + '.txt')
save_txts(path_txt, all_inputs, all_outputs, all_params)
path_txt = {'monoloco': os.path.join(dir_out['monoloco'], basename + '.txt')}
save_txts(path_txt['monoloco'], all_inputs, all_outputs, all_params)
# Update counting
cnt_ann += len(boxes)
cnt_file += 1
print("Saved in {} txt {} annotations. Not found {} images\n".format(cnt_file, cnt_ann, cnt_no_file))
# Correct using stereo disparity and save in different folder
if self.stereo:
zzs = self._run_stereo_baselines(basename, boxes, keypoints, zzs, path_calib)
for key in zzs:
path_txt[key] = os.path.join(dir_out[key], basename + '.txt')
save_txts(path_txt[key], all_inputs, zzs[key], all_params, mode='baseline')
def run_stereo(self):
"""Run monoloco on left and right images and alculate disparity if a match is found"""
print("\nSaved in {} txt {} annotations. Not found {} images".format(cnt_file, cnt_ann, cnt_no_file))
cnt_ann = cnt_file = cnt_no_file = cnt_no_stereo = cnt_disparity = 0
dir_out = os.path.join('data', 'kitti', 'monoloco_stereo')
if self.stereo:
print("STEREO:")
for key in self.baselines:
print("Annotations corrected using {} baseline: {:.1f}%".format(
key, self.cnt_disparity[key] / cnt_ann * 100))
print("Maximum possible stereo associations: {:.1f}%".format(self.cnt_disparity['max'] / cnt_ann * 100))
print("Not found {}/{} stereo files".format(self.cnt_no_stereo, cnt_file))
# Remove the output directory if alreaady exists (avoid residual txt files)
if os.path.exists(dir_out):
shutil.rmtree(dir_out)
os.makedirs(dir_out)
print("Created empty output directory for txt STEREO files")
def _run_stereo_baselines(self, basename, boxes, keypoints, zzs, path_calib):
for basename in self.list_basename:
path_calib = os.path.join(self.dir_kk, basename + '.txt')
stereo = True
annotations_r, _, _ = factory_file(path_calib, self.dir_ann, basename, mode='right')
boxes_r, keypoints_r = preprocess_pifpaf(annotations_r, im_size=(1242, 374))
for mode in ['left', 'right']:
annotations, kk, tt = factory_file(path_calib, self.dir_ann, basename, mode=mode)
boxes, keypoints = preprocess_pifpaf(annotations, im_size=(1242, 374))
# Stereo baselines
if keypoints_r:
path_image = os.path.join(self.dir_images, basename + '.png')
path_image_r = os.path.join(self.dir_images_r, basename + '.png')
reid_features = get_reid_features(self.reid_net, boxes, boxes_r, path_image, path_image_r)
zzs, cnt = baselines_association(self.baselines, zzs, keypoints, keypoints_r, reid_features)
if not keypoints and mode == 'left':
cnt_no_file += 1
break
for key in cnt:
self.cnt_disparity[key] += cnt[key]
elif not keypoints and mode == 'right':
stereo = False
else:
# Run the network and the geometric baseline
outputs, varss = self.monoloco.forward(keypoints, kk)
dds_geom = eval_geometric(keypoints, kk, average_y=0.48)
uv_centers = get_keypoints(keypoints, mode='bottom') # Kitti uses the bottom to calculate depth
xy_centers = pixel_to_camera(uv_centers, kk, 1)
if mode == 'left':
outputs_l = outputs.detach().cpu()
varss_l = varss.detach().cpu()
zzs_l = xyz_from_distance(outputs_l[:, 0:1], xy_centers)[:, 2].tolist()
kps_l = copy.deepcopy(keypoints)
boxes_l = boxes
xy_centers_l = xy_centers
dds_geom_l = dds_geom
kk_l = kk
tt_l = tt
else:
kps_r = copy.deepcopy(keypoints)
if stereo:
zzs, cnt = depth_from_disparity(zzs_l, kps_l, kps_r)
cnt_disparity += cnt
else:
zzs = zzs_l
# Save the file
all_outputs = [outputs_l, varss_l, dds_geom_l, zzs]
all_inputs = [boxes_l, xy_centers_l]
all_params = [kk_l, tt_l]
path_txt = os.path.join(dir_out, basename + '.txt')
save_txts(path_txt, all_inputs, all_outputs, all_params)
# Update counting
cnt_ann += len(boxes_l)
cnt_file += 1
# Print statistics
print("Saved in {} txt {} annotations. Not found {} images."
.format(cnt_file, cnt_ann, cnt_no_file))
print("Annotations corrected using stereo: {:.1f}%, not found {} stereo files"
.format(cnt_disparity / cnt_ann * 100, cnt_no_stereo))
else:
self.cnt_no_stereo += 1
zzs = {key: zzs for key in self.baselines}
return zzs
def save_txts(path_txt, all_inputs, all_outputs, all_params):
def save_txts(path_txt, all_inputs, all_outputs, all_params, mode='monoloco'):
outputs, varss, dds_geom, zzs = all_outputs[:]
assert mode in ('monoloco', 'baseline')
if mode == 'monoloco':
outputs, varss, dds_geom, zzs = all_outputs[:]
else:
zzs = all_outputs
uv_boxes, xy_centers = all_inputs[:]
kk, tt = all_params[:]
with open(path_txt, "w+") as ff:
for idx in range(outputs.shape[0]):
for idx, zz_base in enumerate(zzs):
xx = float(xy_centers[idx][0]) * zzs[idx] + tt[0]
yy = float(xy_centers[idx][1]) * zzs[idx] + tt[1]
zz = zzs[idx] + tt[2]
dd = math.sqrt(xx ** 2 + yy ** 2 + zz ** 2)
cam_0 = [xx, yy, zz, dd]
zz = zz_base + tt[2]
cam_0 = [xx, yy, zz]
output_list = [0.]*3 + uv_boxes[idx][:-1] + [0.]*3 + cam_0 + [0.] + uv_boxes[idx][-1:] # kitti format
ff.write("%s " % 'pedestrian')
for el in output_list:
ff.write("%f " % el)
for el in uv_boxes[idx][:]:
ff.write("%s " % el)
for el in cam_0:
ff.write("%s " % el)
ff.write("%s " % float(outputs[idx][1]))
ff.write("%s " % float(varss[idx]))
ff.write("%s " % dds_geom[idx])
# add additional uncertainty information
if mode == 'monoloco':
ff.write("%f " % float(outputs[idx][1]))
ff.write("%f " % float(varss[idx]))
ff.write("%f " % dds_geom[idx])
ff.write("\n")
# Save intrinsic matrix in the last row
for kk_el in itertools.chain(*kk): # Flatten a list of lists
ff.write("%f " % kk_el)
ff.write("\n")
def factory_basename(dir_ann):
""" Return all the basenames in the annotations folder"""
list_ann = glob.glob(os.path.join(dir_ann, '*.json'))
list_basename = [os.path.basename(x).split('.')[0] for x in list_ann]
assert list_basename, " Missing json annotations file to create txt files for KITTI datasets"
return list_basename
def factory_file(path_calib, dir_ann, basename, mode='left'):
"""Choose the annotation and the calibration files. Stereo option with ite = 1"""
@ -201,11 +169,7 @@ def factory_file(path_calib, dir_ann, basename, mode='left'):
kk, tt = p_right[:]
path_ann = os.path.join(dir_ann + '_right', basename + '.png.pifpaf.json')
try:
with open(path_ann, 'r') as f:
annotations = json.load(f)
except FileNotFoundError:
annotations = []
annotations = open_annotations(path_ann)
return annotations, kk, tt
@ -230,3 +194,28 @@ def eval_geometric(keypoints, kk, average_y=0.48):
dds_geom.append(dd_geom)
return dds_geom
def make_new_directory(dir_out):
"""Remove the output directory if already exists (avoid residual txt files)"""
if os.path.exists(dir_out):
shutil.rmtree(dir_out)
os.makedirs(dir_out)
def factory_basename(dir_ann, dir_gt):
""" Return all the basenames in the annotations folder corresponding to validation images"""
# Extract ground truth validation images
names_gt = tuple(os.listdir(dir_gt))
path_train = os.path.join('splits', 'kitti_train.txt')
path_val = os.path.join('splits', 'kitti_val.txt')
_, set_val_gt = split_training(names_gt, path_train, path_val)
set_val_gt = {os.path.basename(x).split('.')[0] for x in set_val_gt}
# Extract pifpaf files corresponding to validation images
list_ann = glob.glob(os.path.join(dir_ann, '*.json'))
set_basename = {os.path.basename(x).split('.')[0] for x in list_ann}
set_val = set_basename.intersection(set_val_gt)
assert set_val, " Missing json annotations file to create txt files for KITTI datasets"
return set_val

View File

@ -0,0 +1,110 @@
import torch
import torch.backends.cudnn as cudnn
from torch import nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as T
from ..utils import open_image
def get_reid_features(reid_net, boxes, boxes_r, path_image, path_image_r):
pil_image = open_image(path_image)
pil_image_r = open_image(path_image_r)
assert boxes and boxes_r
cropped_img = []
for box in boxes:
cropped_img = cropped_img + [pil_image.crop((box[0], box[1], box[2], box[3]))]
cropped_img_r = []
for box in boxes_r:
cropped_img_r = cropped_img_r + [pil_image_r.crop((box[0], box[1], box[2], box[3]))]
features = reid_net.forward(cropped_img)
features_r = reid_net.forward(cropped_img_r)
return features.cpu(), features_r.cpu()
class ReID(object):
def __init__(self, weights_path, device, num_classes=751, height=256, width=128):
super(ReID, self).__init__()
torch.manual_seed(1)
self.device = device
if self.device.type == "cuda":
cudnn.benchmark = True
torch.cuda.manual_seed_all(1)
else:
print("Currently using CPU (GPU is highly recommended)")
self.transform_test = T.Compose([
T.Resize((height, width)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
print("ReID Baseline:")
print("Initializing ResNet model")
self.model = ResNet50(num_classes=num_classes, loss={'xent'})
self.model.to(device)
num_param = sum(p.numel() for p in self.model.parameters()) / 1e+06
print("Model size: {:.3f} M".format(num_param))
# load pretrained weights but ignore layers that don't match in size
checkpoint = torch.load(weights_path)
model_dict = self.model.state_dict()
pretrain_dict = {k: v for k, v in checkpoint.items() if k in model_dict and model_dict[k].size() == v.size()}
model_dict.update(pretrain_dict)
self.model.load_state_dict(model_dict)
print("Loaded pretrained weights from '{}'".format(weights_path))
self.model.eval()
def forward(self, images):
image = torch.stack([self.transform_test(image) for image in images], dim=0)
image = image.to(self.device)
with torch.no_grad():
features = self.model(image)
return features
@staticmethod
def calculate_distmat(features_1, features_2=None, use_cosine=False):
query = features_1
if features_2 is not None:
gallery = features_2
else:
gallery = features_1
m = query.size(0)
n = gallery.size(0)
if not use_cosine:
distmat = torch.pow(query, 2).sum(dim=1, keepdim=True).expand(m, n) + \
torch.pow(gallery, 2).sum(dim=1, keepdim=True).expand(n, m).t()
distmat.addmm_(1, -2, query, gallery.t())
else:
features_norm = query/query.norm(dim=1)[:, None]
reference_norm = gallery/gallery.norm(dim=1)[:, None]
distmat = torch.mm(features_norm, reference_norm.transpose(0, 1))
return distmat
class ResNet50(nn.Module):
def __init__(self, num_classes, loss):
super(ResNet50, self).__init__()
self.loss = loss
resnet50 = torchvision.models.resnet50(pretrained=True)
self.base = nn.Sequential(*list(resnet50.children())[:-2])
self.classifier = nn.Linear(2048, num_classes)
self.feat_dim = 2048
def forward(self, x):
x = self.base(x)
x = F.avg_pool2d(x, x.size()[2:])
f = x.view(x.size(0), -1)
if not self.training:
return f
y = self.classifier(f)
if self.loss == {'xent'}:
return y
return y, f

View File

@ -0,0 +1,176 @@
""""Generate stereo baselines for kitti evaluation"""
import warnings
from collections import defaultdict
import numpy as np
from ..utils import get_keypoints
def baselines_association(baselines, zzs, keypoints, keypoints_right, reid_features):
"""compute stereo depth for each of the given stereo baselines"""
# Initialize variables
zzs_stereo = defaultdict()
cnt_stereo = defaultdict(int)
features, features_r, keypoints, keypoints_r = factory_features(
keypoints, keypoints_right, baselines, reid_features)
# count maximum possible associations
cnt_stereo['max'] = min(keypoints.shape[0], keypoints_r.shape[0])
# Filter joints disparity and calculate avg disparity
avg_disparities, disparities_x, disparities_y = mask_joint_disparity(keypoints, keypoints_r)
# Iterate over each left pose
for key in baselines:
# Extract features of the baseline
similarity = features_similarity(features[key], features_r[key], key, avg_disparities, zzs)
# Compute the association based on features minimization and calculate depth
zzs_stereo[key] = np.empty((keypoints.shape[0]))
indices_stereo = [] # keep track of indices
best = np.nanmin(similarity)
while not np.isnan(best):
idx, arg_best = np.unravel_index(np.nanargmin(similarity), similarity.shape) # pylint: disable=W0632
zz_stereo, flag = similarity_to_depth(avg_disparities[idx, arg_best])
zz_mono = zzs[idx]
similarity[idx, :] = np.nan
indices_stereo.append(idx)
# Filter stereo depth
if flag and verify_stereo(zz_stereo, zz_mono, disparities_x[idx, arg_best], disparities_y[idx, arg_best]):
zzs_stereo[key][idx] = zz_stereo
cnt_stereo[key] += 1
similarity[:, arg_best] = np.nan
else:
zzs_stereo[key][idx] = zz_mono
best = np.nanmin(similarity)
indices_mono = [idx for idx, _ in enumerate(zzs) if idx not in indices_stereo]
for idx in indices_mono:
zzs_stereo[key][idx] = zzs[idx]
zzs_stereo[key] = zzs_stereo[key].tolist()
return zzs_stereo, cnt_stereo
def factory_features(keypoints, keypoints_right, baselines, reid_features):
features = defaultdict()
features_r = defaultdict()
for key in baselines:
if key == 'reid':
features[key] = np.array(reid_features[0])
features_r[key] = np.array(reid_features[1])
else:
features[key] = np.array(keypoints)
features_r[key] = np.array(keypoints_right)
return features, features_r, np.array(keypoints), np.array(keypoints_right)
def features_similarity(features, features_r, key, avg_disparities, zzs):
similarity = np.empty((features.shape[0], features_r.shape[0]))
for idx, zz_mono in enumerate(zzs):
feature = features[idx]
if key == 'ml_stereo':
expected_disparity = 0.54 * 721. / zz_mono
sim_row = np.abs(expected_disparity - avg_disparities[idx])
elif key == 'pose':
# Zero-center the keypoints
uv_center = np.array(get_keypoints(feature, mode='center').reshape(-1, 1)) # (1, 2) --> (2, 1)
uv_centers_r = np.array(get_keypoints(features_r, mode='center').unsqueeze(-1)) # (m,2) --> (m, 2, 1)
feature_0 = feature[:2, :] - uv_center
feature_0 = feature_0.reshape(1, -1) # (1, 34)
features_r_0 = features_r[:, :2, :] - uv_centers_r
features_r_0 = features_r_0.reshape(features_r_0.shape[0], -1) # (m, 34)
sim_row = np.linalg.norm(feature_0 - features_r_0, axis=1)
else:
sim_row = np.linalg.norm(feature - features_r, axis=1)
similarity[idx] = sim_row
return similarity
def similarity_to_depth(avg_disparity):
try:
zz_stereo = 0.54 * 721. / float(avg_disparity)
flag = True
except (ZeroDivisionError, ValueError): # All nan-slices or zero division
zz_stereo = np.nan
flag = False
return zz_stereo, flag
def mask_joint_disparity(keypoints, keypoints_r):
"""filter joints based on confidence and interquartile range of the distribution"""
CONF_MIN = 0.3
with warnings.catch_warnings() and np.errstate(invalid='ignore'):
disparity_x_mask = np.empty((keypoints.shape[0], keypoints_r.shape[0], 17))
disparity_y_mask = np.empty((keypoints.shape[0], keypoints_r.shape[0], 17))
avg_disparity = np.empty((keypoints.shape[0], keypoints_r.shape[0]))
for idx, kps in enumerate(keypoints):
disparity_x = kps[0, :] - keypoints_r[:, 0, :]
disparity_y = kps[1, :] - keypoints_r[:, 1, :]
# Mask for low confidence
mask_conf_left = kps[2, :] > CONF_MIN
mask_conf_right = keypoints_r[:, 2, :] > CONF_MIN
mask_conf = mask_conf_left & mask_conf_right
disparity_x_conf = np.where(mask_conf, disparity_x, np.nan)
disparity_y_conf = np.where(mask_conf, disparity_y, np.nan)
# Mask outliers using iqr
mask_outlier = interquartile_mask(disparity_x_conf)
x_mask_row = np.where(mask_outlier, disparity_x_conf, np.nan)
y_mask_row = np.where(mask_outlier, disparity_y_conf, np.nan)
avg_row = np.nanmedian(x_mask_row, axis=1) # ignore the nan
# Append
disparity_x_mask[idx] = x_mask_row
disparity_y_mask[idx] = y_mask_row
avg_disparity[idx] = avg_row
return avg_disparity, disparity_x_mask, disparity_y_mask
def verify_stereo(zz_stereo, zz_mono, disparity_x, disparity_y):
"""Verify disparities based on coefficient of variation, maximum y difference and z difference wrt monoloco"""
COV_MIN = 0.1
y_max_difference = (50 / zz_mono)
z_max_difference = 0.6 * zz_mono
cov = float(np.nanstd(disparity_x) / np.abs(np.nanmean(disparity_x))) # Coefficient of variation
avg_disparity_y = np.nanmedian(disparity_y)
if abs(zz_stereo - zz_mono) < z_max_difference and \
avg_disparity_y < y_max_difference and \
cov < COV_MIN:
return True
# if not np.isnan(zz_stereo):
# return True
return False
def interquartile_mask(distribution):
quartile_1, quartile_3 = np.nanpercentile(distribution, [25, 75], axis=1)
iqr = quartile_3 - quartile_1
lower_bound = quartile_1 - (iqr * 1.5)
upper_bound = quartile_3 + (iqr * 1.5)
return (distribution < upper_bound.reshape(-1, 1)) & (distribution > lower_bound.reshape(-1, 1))

View File

@ -133,10 +133,9 @@ def main():
if args.generate:
from .eval import GenerateKitti
kitti_txt = GenerateKitti(args.model, args.dir_ann, p_dropout=args.dropout, n_dropout=args.n_dropout)
kitti_txt.run_mono()
if args.stereo:
kitti_txt.run_stereo()
kitti_txt = GenerateKitti(args.model, args.dir_ann, p_dropout=args.dropout, n_dropout=args.n_dropout,
stereo=args.stereo)
kitti_txt.run()
if args.dataset == 'kitti':
from .eval import EvalKitti

View File

@ -1,8 +1,7 @@
from .iou import get_iou_matches, reorder_matches, get_iou_matrix
from .misc import get_task_error, get_pixel_error, append_cluster
from .misc import get_task_error, get_pixel_error, append_cluster, open_annotations
from .kitti import check_conditions, get_category, split_training, parse_ground_truth, get_calibration
from .camera import xyz_from_distance, get_keypoints, pixel_to_camera, project_3d
from .camera import xyz_from_distance, get_keypoints, pixel_to_camera, project_3d, open_image
from .logs import set_logger
from .stereo import depth_from_disparity
from ..utils.nuscenes import select_categories

View File

@ -2,15 +2,16 @@
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
def pixel_to_camera(uv_tensor, kk, z_met):
"""
Convert a tensor in pixel coordinate to absolute camera coordinates
It accepts lists or tensors of (m, 2) or (m, x, 2) or (m, 2, x)
It accepts lists or torch/numpy tensors of (m, 2) or (m, x, 2)
where x is the number of keypoints
"""
if isinstance(uv_tensor, list):
if isinstance(uv_tensor, (list, np.ndarray)):
uv_tensor = torch.tensor(uv_tensor)
if isinstance(kk, list):
kk = torch.tensor(kk)
@ -67,14 +68,13 @@ def project_3d(box_obj, kk):
def get_keypoints(keypoints, mode):
"""
Extract center, shoulder or hip points of a keypoint
Input --> list or torch.tensor [(m, 3, 17) or (3, 17)]
Input --> list or torch/numpy tensor [(m, 3, 17) or (3, 17)]
Output --> torch.tensor [(m, 2)]
"""
if isinstance(keypoints, list):
if isinstance(keypoints, (list, np.ndarray)):
keypoints = torch.tensor(keypoints)
if len(keypoints.size()) == 2: # add batch dim
keypoints = keypoints.unsqueeze(0)
assert len(keypoints.size()) == 3 and keypoints.size()[1] == 3, "tensor dimensions not recognized"
assert mode in ['center', 'bottom', 'head', 'shoulder', 'hip', 'ankle']
@ -174,3 +174,9 @@ def xyz_from_distance(distances, xy_centers):
assert xy_centers.size()[-1] == 3 and distances.size()[-1] == 1, "Size of tensor not recognized"
return xy_centers * distances / torch.sqrt(1 + xy_centers[:, 0:1].pow(2) + xy_centers[:, 1:2].pow(2))
def open_image(path_image):
with open(path_image, 'rb') as f:
pil_image = Image.open(f).convert('RGB')
return pil_image

View File

@ -74,15 +74,9 @@ def check_conditions(line, category, method, thresh=0.3):
"""Check conditions of our or m3d txt file"""
check = False
assert method in ['gt', 'm3d', '3dop', 'our'], "Method %r not recognized" % method
assert category in ['pedestrian', 'cyclist', 'all']
if method in ('m3d', '3dop'):
conf = line.split()[15]
if line.split()[0] == category and float(conf) >= thresh:
check = True
elif method == 'gt':
if method == 'gt':
if category == 'all':
categories_gt = ['Pedestrian', 'Person_sitting', 'Cyclist']
else:
@ -90,8 +84,17 @@ def check_conditions(line, category, method, thresh=0.3):
if line.split()[0] in categories_gt:
check = True
elif method == 'our':
if line[4] >= thresh:
elif method in ('m3d', '3dop'):
conf = float(line[15])
if line[0] == category and conf >= thresh:
check = True
elif method == 'monodepth':
check = True
else:
conf = float(line[15])
if conf >= thresh:
check = True
return check

View File

@ -1,4 +1,4 @@
import json
def append_cluster(dic_jo, phase, xx, dd, kps):
"""Append the annotation based on its distance"""
@ -36,3 +36,12 @@ def get_pixel_error(dd_gt, zz_gt):
disp = 0.54 * 721 / zz_gt
delta_z = zz_gt - 0.54 * 721 / (disp - 1)
return dd_gt + delta_z
def open_annotations(path_ann):
try:
with open(path_ann, 'r') as f:
annotations = json.load(f)
except FileNotFoundError:
annotations = []
return annotations

View File

@ -1,87 +0,0 @@
import copy
import warnings
import numpy as np
def depth_from_disparity(zzs, kps, kps_right):
"""Associate instances in left and right images and compute disparity"""
zzs_stereo = []
zzs = np.array(zzs)
kps = np.array(kps)
kps_right_list = copy.deepcopy(kps_right)
cnt_stereo = 0
expected_disps = 0.54 * 721 / np.array(zzs)
for idx, zz_mono in enumerate(zzs):
if kps_right_list:
zz_stereo, disparity_x, disparity_y, idx_min = filter_disparities(kps, kps_right_list, idx, expected_disps)
if verify_stereo(zz_stereo, zz_mono, disparity_x, disparity_y):
zzs_stereo.append(zz_stereo)
cnt_stereo += 1
kps_right_list.pop(idx_min)
else:
zzs_stereo.append(zz_mono)
else:
zzs_stereo.append(zz_mono)
return zzs_stereo, cnt_stereo
def filter_disparities(kps, kps_right_list, idx, expected_disps):
"""filter joints based on confidence and interquartile range of the distribution"""
CONF_MIN = 0.3
kps_right = np.array(kps_right_list)
with warnings.catch_warnings() and np.errstate(invalid='ignore'):
try:
disparity_x = kps[idx, 0, :] - kps_right[:, 0, :]
disparity_y = kps[idx, 1, :] - kps_right[:, 1, :]
# Mask for low confidence
mask_conf_left = kps[idx, 2, :] > CONF_MIN
mask_conf_right = kps_right[:, 2, :] > CONF_MIN
mask_conf = mask_conf_left & mask_conf_right
disparity_x_conf = np.where(mask_conf, disparity_x, np.nan)
disparity_y_conf = np.where(mask_conf, disparity_y, np.nan)
# Mask outliers using iqr
mask_outlier = interquartile_mask(disparity_x_conf)
disparity_x_mask = np.where(mask_outlier, disparity_x_conf, np.nan)
disparity_y_mask = np.where(mask_outlier, disparity_y_conf, np.nan)
avg_disparity_x = np.nanmedian(disparity_x_mask, axis=1) # ignore the nan
diffs_x = [abs(expected_disps[idx] - real) for real in avg_disparity_x]
idx_min = diffs_x.index(min(diffs_x))
zz_stereo = 0.54 * 721. / float(avg_disparity_x[idx_min])
except ZeroDivisionError:
zz_stereo = - 100
return zz_stereo, disparity_x_mask[idx_min], disparity_y_mask[idx_min], idx_min
def verify_stereo(zz_stereo, zz_mono, disparity_x, disparity_y):
COV_MIN = 0.1
y_max_difference = (50 / zz_mono)
z_max_difference = 0.6 * zz_mono
cov = float(np.nanstd(disparity_x) / np.abs(np.nanmean(disparity_x))) # Coefficient of variation
avg_disparity_y = np.nanmedian(disparity_y)
if abs(zz_stereo - zz_mono) < z_max_difference and \
avg_disparity_y < y_max_difference and \
cov < COV_MIN:
return True
return False
def interquartile_mask(distribution):
quartile_1, quartile_3 = np.nanpercentile(distribution, [25, 75], axis=1)
iqr = quartile_3 - quartile_1
lower_bound = quartile_1 - (iqr * 1.5)
upper_bound = quartile_3 + (iqr * 1.5)
return (distribution < upper_bound.reshape(-1, 1)) & (distribution > lower_bound.reshape(-1, 1))

View File

@ -1,3 +1,3 @@
from .printer import Printer
from .figures import show_results, show_spread
from .figures import show_results, show_spread, show_task_error

View File

@ -24,10 +24,8 @@ def show_results(dic_stats, show=False, save=False):
x_max = 38
xx = np.linspace(0, 60, 100)
excl_clusters = ['all', '50', '>50', 'easy', 'moderate', 'hard']
clusters = tuple([clst for clst in dic_stats[phase]['our'] if clst not in excl_clusters])
clusters = tuple([clst for clst in dic_stats[phase]['monoloco'] if clst not in excl_clusters])
yy_gender = get_task_error(xx)
yy_gps = np.linspace(5., 5., xx.shape[0])
plt.figure(0)
plt.grid(linewidth=0.2)
@ -41,21 +39,22 @@ def show_results(dic_stats, show=False, save=False):
colors = ['r', 'deepskyblue', 'grey', 'b', 'darkorange']
lstyles = ['solid', 'solid', 'solid', 'solid', 'dashdot']
plt.plot(xx, yy_gps, '-', label="GPS Error", color='y')
for idx, method in enumerate(['m3d_merged', 'geom_merged', 'md_merged', 'our_merged', '3dop_merged']):
for idx, method in enumerate(['m3d_merged', 'geometric_merged', 'monodepth_merged', 'monoloco_merged',
'3dop_merged']):
errs = [dic_stats[phase][method][clst]['mean'] for clst in clusters]
assert errs, "method %s empty" % method
xxs = get_distances(clusters)
plt.plot(xxs, errs, marker=mks[idx], markersize=mksizes[idx], linewidth=lws[idx], label=labels[idx],
linestyle=lstyles[idx], color=colors[idx])
plt.plot(xx, yy_gender, '--', label="Task error", color='lightgreen', linewidth=2.5)
plt.legend(loc='upper left')
if show:
plt.show()
if save:
path_fig = os.path.join(dir_out, 'results.png')
plt.savefig(path_fig)
print("Figure of results saved in {}".format(path_fig))
if show:
plt.show()
plt.close()
@ -103,12 +102,12 @@ def show_spread(dic_stats, show=False, save=False):
fig.subplots_adjust(hspace=0.1)
plt.setp([aa.get_yticklabels() for aa in fig.axes[:-1]], visible=False)
plt.legend()
if show:
plt.show()
if save:
path_fig = os.path.join(dir_out, 'spread.png')
plt.savefig(path_fig)
print("Figure of confidence intervals saved in {}".format(path_fig))
if show:
plt.show()
plt.close()
@ -129,9 +128,7 @@ def show_task_error(show, save):
yy_young_male = target_error(xx, mm_young_male)
yy_young_female = target_error(xx, mm_young_female)
yy_gender = target_error(xx, mm_gmm)
yy_gps = np.linspace(5., 5., xx.shape[0])
plt.grid(linewidth=0.3)
plt.plot(xx, yy_gps, color='y', label='GPS')
plt.plot(xx, yy_young_male, linestyle='dotted', linewidth=2.1, color='b', label='Adult/young male')
plt.plot(xx, yy_young_female, linestyle='dotted', linewidth=2.1, color='darkorange', label='Adult/young female')
plt.plot(xx, yy_gender, '--', color='lightgreen', linewidth=2.8, label='Generic adult (task error)')
@ -141,12 +138,12 @@ def show_task_error(show, save):
plt.xlabel("Ground-truth distance from the camera $d_{gt}$ [m]")
plt.ylabel("Localization error $\hat{e}$ due to human height variation [m]") # pylint: disable=W1401
plt.legend(loc=(0.01, 0.55)) # Location from 0 to 1 from lower left
if show:
plt.show()
if save:
path_fig = os.path.join(dir_out, 'task_error.png')
plt.savefig(path_fig)
print("Figure of task error saved in {}".format(path_fig))
if show:
plt.show()
plt.close()

View File

@ -53,7 +53,7 @@ def test_package():
# Training test
val_acc, model = tst_trainer(JOINTS)
assert val_acc < 2
assert val_acc < 2.5
# Prediction test
dic_out, kk = tst_prediction(model, PIFPAF_KEYPOINTS)