Commit 8a6533da authored by Duncan Deveaux's avatar Duncan Deveaux
Browse files

Improved package structure

parent b0efdde3
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import numpy as np
import pandas as pd
import seaborn as sn
from scipy import stats
import sys
sys.path.append('tools')
import read_csv as rd
class ExitTracking:
def __init__(self, topology):
self.vehicles = {}
self.training_data = {}
self.topology = topology
def update_vehicle(self, obj):
vehicle_id = obj[rd.TRACK_ID]
if vehicle_id not in self.vehicles:
self.vehicles[vehicle_id] = {'tracks':[], 'exit_point':None}
if vehicle_id in self.training_data: #Vehicle already exited, skip
return
# Is vehicle exiting?
is_exiting = self.topology.getobjectexits(obj)
if is_exiting != -1:
#print ("vid {} exiting on exit {}".format(vehicle_id, is_exiting))
self.vehicles[vehicle_id]['exit_point'] = is_exiting
self.generate_training_data(vehicle_id)
else:
# Add track data
# 1. current lane id
current_lane = self.topology.get_lane_distance(obj)
if current_lane == None: #Not in the roundabout yet, skip
return
# 2. relative heading
signed_relheading = self.topology.get_relative_heading(obj)
# 3. straight-line distance to next exit.
(next_exit_id, distance) = self.topology.get_distance_to_next_exit(obj)
self.vehicles[vehicle_id]['tracks'].append((current_lane, signed_relheading, distance, next_exit_id))
def generate_training_data(self, vehicle_id):
if vehicle_id not in self.vehicles or self.vehicles[vehicle_id]['exit_point'] == None:
return
if vehicle_id in self.training_data:
print ("Warning: object id {} has already been added to ExitTracking training data.".format(vehicle_id))
return
self.training_data[vehicle_id] = []
for track in self.vehicles[vehicle_id]['tracks']:
self.training_data[vehicle_id].append( (track[0], track[1], track[2], (track[3] == self.vehicles[vehicle_id]['exit_point'])) )
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
from os import listdir
from os.path import isfile, join
import sys
sys.path.append('tools')
import read_csv as rd
import topology as topology
from consts import ROUND_PATH
def get_input_for_location(location):
input_ids = []
data_files = [f for f in listdir(ROUND_PATH) if isfile(join(ROUND_PATH, f))]
for filename in data_files:
if filename.endswith('recordingMeta.csv'):
meta_info = rd.read_meta_info({'input_meta_path': join(ROUND_PATH, filename)})
if int(meta_info[rd.LOCATION_ID]) == location:
split = filename.split('_')
if len(split) == 0:
print("Warning: filename {} is not separated by '_'".format(filename))
else:
input_ids.append(split[0])
return input_ids
def get_topology_for_location(location):
if location == 0:
return topology.Topology.roundDLocation0Topology()
elif location == 1:
return topology.Topology.roundDLocation1Topology()
elif location == 2:
return topology.Topology.roundDLocation2Topology()
else:
raise Exception("The topology for location {} has not been defined.".format(location))
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from scipy import stats
plt.rcParams["figure.figsize"] = (10,8.5)
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
''' Plot a linear regression.
see https://stackoverflow.com/questions/27164114/show-confidence-limits-and-prediction-limits-in-scatter-plot
'''
def plot_linreg(ax, x, y, confidence=0.90):
slope, intercept = np.polyfit(x, y, 1) # linear model adjustment
y_model = np.polyval([slope, intercept], x) # modeling...
x_mean = np.mean(x)
y_mean = np.mean(y)
n = x.size # number of samples
m = 2 # number of parameters
dof = n - m # degrees of freedom
t = stats.t.ppf(confidence, dof) # Students statistic of interval confidence
residual = y - y_model
std_error = (np.sum(residual**2) / dof)**.5 # Standard deviation of the error
# calculating the r2
# https://www.statisticshowto.com/probability-and-statistics/coefficient-of-determination-r-squared/
# Pearson's correlation coefficient
numerator = np.sum((x - x_mean)*(y - y_mean))
denominator = ( np.sum((x - x_mean)**2) * np.sum((y - y_mean)**2) )**.5
correlation_coef = numerator / denominator
r2 = correlation_coef**2
# mean squared error
MSE = 1/n * np.sum( (y - y_model)**2 )
# to plot the adjusted model
x_line = np.linspace(np.min(x), np.max(x), 100)
y_line = np.polyval([slope, intercept], x_line)
# confidence interval
ci = t * std_error * (1/n + (x_line - x_mean)**2 / np.sum((x - x_mean)**2))**.5
# predicting interval
pi = t * std_error * (1 + 1/n + (x_line - x_mean)**2 / np.sum((x - x_mean)**2))**.5
############### Plotting
ax.plot(x_line, y_line, color = 'black')
ax.plot(x_line, y_line + pi, color = 'grey', alpha=0.8, label = '95% prediction interval', linestyle='--')
ax.plot(x_line, y_line - pi, color = 'grey', alpha=0.8, linestyle='--')
ax.fill_between(x_line, y_line + ci, y_line - ci, color = 'grey', alpha=0.2, label = '95% confidence interval')
# rounding and position must be changed for each case and preference
a = str(np.round(intercept))
b = str(np.round(slope,2))
r2s = str(np.round(r2,2))
MSEs = str(np.round(MSE))
ax.text(0.04,0.975, 'y = ' + a + ' + ' + b + ' x', transform = ax.transAxes)
ax.text(0.04,0.945, '$r^2$ = ' + r2s + ' MSE = ' + MSEs, transform = ax.transAxes)
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
import sys
sys.path.append('tools')
import read_csv as rd
import exit_model
# https://stackoverflow.com/questions/31735499/calculate-angle-clockwise-between-two-points
def angle_between(p1, p2):
d1 = p2[0] - p1[0]
d2 = p2[1] - p1[1]
if d1 == 0:
if d2 == 0: # same points?
deg = 0
else:
deg = 0 if p1[1] > p2[1] else 180
elif d2 == 0:
deg = 90 if p1[0] < p2[0] else 270
else:
deg = math.atan(d2 / d1) / np.pi * 180
lowering = p1[1] < p2[1]
if (lowering and deg < 0) or (not lowering and deg > 0):
deg += 270
else:
deg += 90
return (deg+270) % 360
#https://stackoverflow.com/questions/34372480/rotate-point-about-another-point-in-degrees-python
def rotate_around(p, origin, degrees):
angle = np.deg2rad(degrees)
R = np.array([[np.cos(angle), -np.sin(angle)],
[np.sin(angle), np.cos(angle)]])
o = np.atleast_2d(origin)
p = np.atleast_2d(p)
return np.squeeze((R @ (p.T-o.T) + o.T).T)
def get_object_front(obj):
(x,y) = (obj[rd.X], obj[rd.Y] - obj[rd.HEIGHT]/2)
obj_center = (obj[rd.X], obj[rd.Y])
obj_angle = obj[rd.HEADING]+90
return rotate_around((x,y), obj_center, obj_angle)
def get_object_back(obj):
(x,y) = (obj[rd.X], obj[rd.Y] + obj[rd.HEIGHT]/2)
obj_center = (obj[rd.X], obj[rd.Y])
obj_angle = obj[rd.HEADING]+90
return rotate_around((x,y), obj_center, obj_angle)
# Lane: represents a circular lane, centered around a roundabout.
class Lane:
def __init__(self, center, radius_begin, radius_end, nb_slices=30):
self.center = center
self.radius_begin = radius_begin
self.radius_end = radius_end
self.slices = []
slice_step_deg = 360.0/nb_slices
for i in range(nb_slices):
self.slices.append((slice_step_deg*i, slice_step_deg*(i+1)))
def contains_point(self, obj_pos):
center_dist = np.linalg.norm(np.array(self.center)-np.array(obj_pos))
return center_dist >= self.radius_begin and center_dist < self.radius_end
# Whether the lane contains any point of obj
def intersects(self, obj):
# Coordinates
(x1,x2) = (obj[rd.X] - obj[rd.WIDTH]/2, obj[rd.X] + obj[rd.WIDTH]/2)
(y1,y2) = (obj[rd.Y] - obj[rd.HEIGHT]/2, obj[rd.Y] + obj[rd.HEIGHT]/2)
obj_center = (obj[rd.X], obj[rd.Y])
obj_angle = obj[rd.HEADING]+90
#Apply rotation
return ( self.contains_point(rotate_around((x1,y1), obj_center, obj_angle)) or
self.contains_point(rotate_around((x1,y2), obj_center, obj_angle)) or
self.contains_point(rotate_around((x2,y1), obj_center, obj_angle)) or
self.contains_point(rotate_around((x2,y2), obj_center, obj_angle)) )
# Returns the slice containing obj
# or -1 if the object was not found in the lane
def slice_of(self, obj):
if not self.intersects(obj):
return -1
for slice_ix in range(len(self.slices)):
if self.slice_contains(slice_ix, obj):
return slice_ix
return -1
# Preconditions:
# self.contains(obj) must be true.
def slice_contains(self, slice_ix, obj):
obj_angle = angle_between(self.center, (obj[rd.X], obj[rd.Y]))
return obj_angle >= self.slices[slice_ix][0] and obj_angle < self.slices[slice_ix][1]
def frontvehicleof(self, objectsList, objectId):
obj_slice = self.slice_of(objectId)
if obj_slice == -1:
return None
nb_slices = len(self.slices)
slice_ix = obj_slice
for _ in range(nb_slices//2):
slice_ix = (slice_ix + 1) % nb_slices
# Is there a vehicle in slice_ix ? (If yes it is the closest front vehicle).
for obj in objectsList:
if obj[rd.TRACK_ID] != objectId and self.intersects(obj) and self.slice_contains(slice_ix, obj):
return obj
return None
# Lane: describes a roundabout, with a set of lanes.
class Topology:
''' NOTE: The number of circular lanes for each roundabout as well as
the roundabout center points are different for each roundabout.
This is a utilitary method to define a circular lanes topology that
is suitable for the rounD location of ID=0 '''
@staticmethod
def roundDLocation0Topology():
roundabout_center = (81.0, -47.1)
#Lanes in circular_lanes must be sorted from the closest
#to the further from the roundabout center.
circular_lanes = [Lane(roundabout_center, radius_begin=15, radius_end=17.25),
Lane(roundabout_center, radius_begin=17.25, radius_end=19.5),
Lane(roundabout_center, radius_begin=19.5, radius_end=21.75),
Lane(roundabout_center, radius_begin=21.75, radius_end=24)]
exit_points = [(97.2,-24.3), (55.5,-35.8), (65.3,-70.0), (106.0,-59.0)]
print("Generated Location 0 Topology...")
return Topology(roundabout_center, circular_lanes, exit_points)
# TODO! Define the lanes position and roundabout center for other topologies : the following pattern can be used:
@staticmethod
def roundDLocation1Topology():
roundabout_center = (115.6, -70)
circular_lanes = [Lane(roundabout_center, radius_begin=8, radius_end=10.25),
Lane(roundabout_center, radius_begin=10.25, radius_end=12.5),
Lane(roundabout_center, radius_begin=12.5, radius_end=14.75)]
exit_points = [(121.0, -52.0), (97.8, -68.5), (133.2, -75.0), (111.0, -89.0)]
print("Generated Location 1 Topology...")
return Topology(roundabout_center, circular_lanes, exit_points)
@staticmethod
def roundDLocation2Topology():
roundabout_center = (138.0, -61.3)
circular_lanes = [Lane(roundabout_center, radius_begin=6.75, radius_end=9),
Lane(roundabout_center, radius_begin=9.00, radius_end=11.25)]
exit_points = [(150.8, -53.0), (136.4, -45.0), (125.0, -68.5), (146.2, -73.8)]
print("Generated Location 2 Topology...")
return Topology(roundabout_center, circular_lanes, exit_points)
# exit_points must be given in trigonometric order starting from the smallest angle.
def __init__(self, roundabout_center, circular_lanes, exit_points, entry_lanes={}):
self.exits_radius = 3
self.roundabout_center = roundabout_center
self.circular_lanes = circular_lanes #Sorted from the closest to the further from the roundabout center.
self.entry_lanes = entry_lanes
# Convert to polar coordinates and extract first/last exits
self.exit_points_cartesian = exit_points
self.exit_points = []
for (ix, exit_point) in enumerate(exit_points):
radius = np.linalg.norm(np.array(self.roundabout_center)-np.array(exit_point))
angle = angle_between(self.roundabout_center, exit_point)
self.exit_points.append((radius, angle))
''' Returns lane id starting from the outer lane'''
def get_lane_distance(self, obj):
laneid = self.getlaneidof(obj)
if laneid == None:
return None
return (len(self.circular_lanes) - 1) - laneid
def getnextexit(self, obj):
center_angle = angle_between(self.roundabout_center, (obj[rd.X], obj[rd.Y]))
#1. Before first exit?
if center_angle >= self.exit_points[-1][1] or center_angle < self.exit_points[0][1]:
return 0
#2. Before another exit?
for i in range(len(self.exit_points)):
exit_angle = self.exit_points[i][1]
previous_angle = self.exit_points[-1][1]
if i > 0:
previous_angle = self.exit_points[i-1][1]
if center_angle >= previous_angle and center_angle < exit_angle:
return i
'''
def get_distance_to_next_exit(self, obj):
next_exit = self.getnextexit(obj)
# Compute distance.
obj_pos = get_object_front(obj)
radius = np.linalg.norm(np.array(self.roundabout_center)-np.array(obj_pos))
angle = (self.exit_points[next_exit][1] - angle_between(self.roundabout_center, obj_pos)) % 360
distance = np.deg2rad(angle) * radius
return distance
'''
def get_distance_to_next_exit(self, obj):
next_exit = self.getnextexit(obj)
obj_pos = get_object_front(obj)
return (next_exit, np.linalg.norm(np.array(self.exit_points_cartesian[next_exit])-np.array(obj_pos)))
def getobjectexits(self, obj):
for (ix, exit_point) in enumerate(self.exit_points_cartesian):
dist = np.linalg.norm(np.array(exit_point)-np.array((obj[rd.X], obj[rd.Y])))
if dist <= self.exits_radius:
return ix
return -1
def gettangentialangle(self, obj):
return (angle_between(self.roundabout_center, get_object_front(obj)) + 90) % 360
def get_relative_heading(self, obj):
relative_heading = (obj[rd.HEADING] - self.gettangentialangle(obj))
signed_relheading = relative_heading
if relative_heading > 180.0:
signed_relheading -= 360.0
elif relative_heading < -180.0:
signed_relheading += 360.0
return signed_relheading
def getlaneof(self, obj):
# Is the object in a circular lane ?
#hardcoded yes for now.
for lane in self.circular_lanes:
if lane.contains_point((obj[rd.X], obj[rd.Y])):
return lane
return None
# Returns the lane id with origin at the furthest lane from the roundabout center
def getlaneidof(self, obj):
# Is the object in a circular lane ?
#hardcoded yes for now.
nb_lanes = len(self.circular_lanes)
for (ix, lane) in enumerate(self.circular_lanes):
if lane.contains_point((obj[rd.X], obj[rd.Y])):
return nb_lanes - (ix+1)
return None
def is_inside_roundabout(self, obj):
for lane in self.circular_lanes:
if lane.intersects(obj):
return True
return False
def getfrontvehicle(self, objectsList, obj):
obj_lane = self.getlaneof(obj)
if obj_lane == None:
return None
return obj_lane.frontvehicleof(objectsList, obj)
def getTTC(self, objectsList, obj):
front = self.getfrontvehicle(objectsList, obj)
if front == None:
return None
# Compute distance.
following_pos = np.array(get_object_front(obj))
front_pos = np.array(get_object_back(front))
rd_center = np.array(self.roundabout_center)
radius = np.linalg.norm(np.array(self.roundabout_center)-np.array(following_pos))
angle = (angle_between(rd_center, front_pos) - angle_between(rd_center, following_pos)) % 360
distance = np.deg2rad(angle) * radius
# Compute velocity difference
vel_following = np.linalg.norm(np.array([obj[rd.X_VELOCITY], obj[rd.Y_VELOCITY]]))
vel_front = np.linalg.norm(np.array([front[rd.X_VELOCITY], front[rd.Y_VELOCITY]]))
vel_diff = vel_following - vel_front
# Compute TTC
if (vel_diff == 0):
return None
return (front, distance/vel_diff)
# The following vehicles should be located inside the roundabout
def get_risk_probability(self, obj_following, obj_front, model_exit_probability):
next_exit_following = self.getnextexit(obj_following)
next_exit_front = self.getnextexit(obj_front)
if next_exit_following == next_exit_front:
# No probability that the following vehicle exits before the collision occurs,
# as no exit is located between the following and the front vehicle
return 1
else:
# Return the probability that following does not exit the roundabout at next exit
# before having a chance of collision with 'front' (and nullifying the collision risk).
current_lane = self.get_lane_distance(obj_following)
if current_lane == None:
raise ValueError('get_risk_probability: The following vehicle {} is not in the roundabout'.format(obj_following[rd.TRACK_ID]))
relative_heading = self.get_relative_heading(obj_following)
(_, next_exit_dist) = self.get_distance_to_next_exit(obj_following)
return 1 - exit_model.get_exit_probability(model_exit_probability, current_lane, relative_heading, next_exit_dist)
def get_risk_strength(self, ttc, ttc_threshold):
if ttc > ttc_threshold or ttc <= 0:
return 0
return (1 - ttc/ttc_threshold)
def draw(self):
for lane in self.circular_lanes:
draw_lane(lane)
# Draw exit points
for (ix, exit_point) in enumerate(self.exit_points):
point_angle = np.deg2rad(exit_point[1])
point = (self.roundabout_center[0] + exit_point[0] * np.cos(point_angle),
self.roundabout_center[1] + exit_point[0] * np.sin(point_angle)) #Cartesian coordinates
(xe, ye) = draw_circle(point, self.exits_radius)
plt.plot(xe, ye, color="red")
plt.text(point[0], point[1], "{}".format(ix), color='grey', ha='center', va='center')
# Utils
def draw_circle(center, radius):
x, y = [], []
for i in range(360):
rad = np.deg2rad(i)
x.append(center[0] + radius * np.cos(rad))
y.append(center[1] + radius * np.sin(rad))
return (x, y)
def draw_lane(lane):
(x1,y1) = draw_circle(lane.center, lane.radius_begin)
(x2,y2) = draw_circle(lane.center, lane.radius_end)
for i in range(len(lane.slices)):
i_next = 0
if i < len(lane.slices)-1:
i_next = i+1
slice_angle = np.deg2rad(lane.slices[i][0])
slice_x = [lane.center[0]+lane.radius_begin*np.cos(slice_angle),
lane.center[0]+lane.radius_end*np.cos(slice_angle)]
slice_y = [lane.center[1]+lane.radius_begin*np.sin(slice_angle),
lane.center[1]+lane.radius_end*np.sin(slice_angle)]
# Next angle
slice2_angle = np.deg2rad(lane.slices[i_next][0])
slice2_x = [lane.center[0]+lane.radius_begin*np.cos(slice2_angle),
lane.center[0]+lane.radius_end*np.cos(slice2_angle)]
slice2_y = [lane.center[1]+lane.radius_begin*np.sin(slice2_angle),
lane.center[1]+lane.radius_end*np.sin(slice2_angle)]
text_x = ((slice_x[0]+slice_x[1])/2 + (slice2_x[0]+slice2_x[1])/2) / 2
text_y = ((slice_y[0]+slice_y[1])/2 + (slice2_y[0]+slice2_y[1])/2) / 2
# end - next angle
plt.plot(slice_x, slice_y, color="blue", alpha=0.5)
plt.text(text_x, text_y, "{}".format(i), fontsize=8, ha='center', va='center', color='white', alpha=0.5)
plt.plot(x1,y1,color="blue")
plt.plot(x2,y2,color="blue")
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import numpy as np