Commit b0efdde3 authored by Duncan Deveaux's avatar Duncan Deveaux
Browse files

Improved package structure

parent dfb4a0e2
......@@ -4,7 +4,7 @@ Estimation of the relationship between the variation of TTC in an area and the a
# Files description
* To use the provided code, `ROUND_PATH = "path/to/round/data/"` should be replaced with the path to your local copy of the rounD dataset in the [topology.py](topology.py) file.
* To use the provided code, `ROUND_PATH = "path/to/round/data/"` should be replaced with the path to your local copy of the rounD dataset in the [tools/consts.py](tools/consts.py) file.
The provided TTC analysis code is a tool to assess the variation of TTC in roundabouts of the rounD dataset, as well as risk, and to investigate the correlation between these two metrics.
The provided files should be run in the following order:
......
......@@ -18,8 +18,12 @@ from scipy import stats
import pickle
import argparse
import roundtools.read_csv as rd
from topology import ROUND_PATH, Topology, Lane
import sys
sys.path.append('tools')
import read_csv as rd
from consts import ROUND_PATH
from topology import Topology, Lane
from ttc_correlation import TTCTimeline, TTCData, VariationDataset
import locations
......
......@@ -9,90 +9,22 @@ import matplotlib.patches as patches
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
from sklearn.linear_model import LogisticRegression
import numpy as np
import pickle
import argparse
import roundtools.read_csv as rd
import locations
import sys
sys.path.append('tools')
from exit_model import *
import read_csv as rd
import locations as locations
from exit_tracking import ExitTracking
# ................................................................. #
# Plot correlation data from the pickle files generated by parse.py #
# ................................................................. #
def gather_training_data(input_ids, seed):
    """Load all training samples from the pickle files generated by parse.py.

    Returns a single list of per-frame samples, shuffled with the given seed
    (the global numpy RNG is seeded for reproducibility).
    """
    samples = []
    for id_str in input_ids:
        pickle_path = 'exit_parse/round_exit_{}.pickle'.format(id_str)
        with open(pickle_path, 'rb') as f:
            exit_data = pickle.load(f)
        # Flatten the per-vehicle sample lists into one pool.
        for vehicle_samples in exit_data.training_data.values():
            samples.extend(vehicle_samples)
    np.random.seed(seed)
    np.random.shuffle(samples)
    return samples
def _features_and_labels(samples):
    """Split (laneid, heading, dist, label) tuples into sklearn-style X / y arrays."""
    features = [[laneid, heading, dist] for (laneid, heading, dist, _label) in samples]
    labels = [label for (_laneid, _heading, _dist, label) in samples]
    return np.array(features), np.array(labels)


def process_training_data(training_data):
    """Extract training and validation sets compatible with sklearn.

    Splits training_data 80/20 (in order, so shuffle beforehand) and converts
    each part into (X, y) numpy arrays.

    Returns (x_training, y_training, x_validation, y_validation).
    """
    nb_samples_training = int(len(training_data) * 0.8)
    x_training, y_training = _features_and_labels(training_data[:nb_samples_training])
    x_validation, y_validation = _features_and_labels(training_data[nb_samples_training:])
    return (x_training, y_training, x_validation, y_validation)
def get_exit_proba_model(input_ids, seed):
    """Train a model predicting the probability of vehicles exiting the roundabout.

    Trains on the data of the given input files and returns
    (fitted model, accuracy on the held-out validation split).
    """
    samples = gather_training_data(input_ids, seed)
    x_train, y_train, x_val, y_val = process_training_data(samples)
    classifier = LogisticRegression()
    classifier.fit(x_train, y_train)
    accuracy = classifier.score(x_val, y_val)
    return (classifier, accuracy)
def get_exit_probability(model, lane, heading, distance):
    """Return the probability of exit of a given vehicle using a trained model."""
    sample = np.array([[lane, heading, distance]])
    # Index of the True class among the model's known classes.
    true_class_id = list(model.classes_).index(True)
    return model.predict_proba(sample)[0][true_class_id]
'''
Run script to generate graphs about the trained model
'''
......
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import numpy as np
import pandas as pd
import seaborn as sn
from scipy import stats
import roundtools.read_csv as rd
class ExitTracking:
    """Accumulates per-frame vehicle tracks and turns them into exit-prediction samples.

    Feed frames to update_vehicle(); once a vehicle exits, its whole track is
    converted to labeled training samples in self.training_data.
    """

    def __init__(self, topology):
        # vehicle_id -> {'tracks': [(lane, heading, dist, next_exit_id), ...],
        #                'exit_point': exit id or None while still driving}
        self.vehicles = {}
        # vehicle_id -> [(lane, heading, dist, exited_at_next_exit: bool), ...]
        self.training_data = {}
        self.topology = topology

    def update_vehicle(self, obj):
        """Record one frame for obj, or finalize its samples when it exits."""
        vehicle_id = obj[rd.TRACK_ID]
        if vehicle_id not in self.vehicles:
            self.vehicles[vehicle_id] = {'tracks': [], 'exit_point': None}
        if vehicle_id in self.training_data:  # Vehicle already exited, skip
            return
        # Is vehicle exiting?
        is_exiting = self.topology.getobjectexits(obj)
        if is_exiting != -1:
            self.vehicles[vehicle_id]['exit_point'] = is_exiting
            self.generate_training_data(vehicle_id)
        else:
            # Add track data
            # 1. current lane id
            current_lane = self.topology.get_lane_distance(obj)
            if current_lane is None:  # Not in the roundabout yet, skip
                return
            # 2. relative heading
            signed_relheading = self.topology.get_relative_heading(obj)
            # 3. straight-line distance to next exit.
            (next_exit_id, distance) = self.topology.get_distance_to_next_exit(obj)
            self.vehicles[vehicle_id]['tracks'].append((current_lane, signed_relheading, distance, next_exit_id))

    def generate_training_data(self, vehicle_id):
        """Label each recorded frame with whether its predicted next exit was the real one."""
        if vehicle_id not in self.vehicles or self.vehicles[vehicle_id]['exit_point'] is None:
            return
        if vehicle_id in self.training_data:
            print("Warning: object id {} has already been added to ExitTracking training data.".format(vehicle_id))
            return
        exit_point = self.vehicles[vehicle_id]['exit_point']
        self.training_data[vehicle_id] = [
            (lane, heading, dist, next_exit == exit_point)
            for (lane, heading, dist, next_exit) in self.vehicles[vehicle_id]['tracks']
        ]
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
from os import listdir
from os.path import isfile, join
import roundtools.read_csv as rd
import topology
def get_input_for_location(location):
    """Return the recording-id prefixes of all rounD files for a location.

    Scans ROUND_PATH for '*recordingMeta.csv' files whose locationId matches
    `location` and collects the leading '<id>_' component of each filename.
    """
    input_ids = []
    data_files = [f for f in listdir(topology.ROUND_PATH)
                  if isfile(join(topology.ROUND_PATH, f))]
    for filename in data_files:
        if not filename.endswith('recordingMeta.csv'):
            continue
        meta_info = rd.read_meta_info({'input_meta_path': join(topology.ROUND_PATH, filename)})
        if int(meta_info[rd.LOCATION_ID]) != location:
            continue
        # Fix: the original checked len(filename.split('_')) == 0, which
        # str.split can never produce, so filenames without '_' slipped
        # through unprefixed instead of triggering the warning.
        if '_' not in filename:
            print("Warning: filename {} is not separated by '_'".format(filename))
        else:
            input_ids.append(filename.split('_')[0])
    return input_ids
def get_topology_for_location(location):
    """Build the Topology object matching a rounD location id (0, 1 or 2)."""
    builders = {
        0: topology.Topology.roundDLocation0Topology,
        1: topology.Topology.roundDLocation1Topology,
        2: topology.Topology.roundDLocation2Topology,
    }
    if location not in builders:
        raise Exception("The topology for location {} has not been defined.".format(location))
    return builders[location]()
......@@ -19,11 +19,15 @@ from scipy import stats
import pickle
import argparse
import roundtools.read_csv as rd
from topology import ROUND_PATH, Topology, Lane
import sys
sys.path.append('tools')
import read_csv as rd
from consts import ROUND_PATH
from topology import Topology, Lane
from ttc_correlation import TTCTimeline, TTCData, VariationDataset
import analysis_exits
import exit_model
import locations
......@@ -90,7 +94,7 @@ if argsparse.probability_weighting:
model_training_inputs = input_ids.copy()
model_training_inputs.remove(input_str) #Do not train the model using the data it will be applied to.
(exit_model, accuracy) = analysis_exits.get_exit_proba_model(model_training_inputs, 101010)
(exit_model, accuracy) = exit_model.get_exit_proba_model(model_training_inputs, 101010)
print ("Model trained excluding {}_tracks, accuracy: {}".format(input_str, accuracy))
print ('{} -> {}'.format(input_str, model_training_inputs))
......
......@@ -19,10 +19,14 @@ from scipy import stats
import pickle
import argparse
import roundtools.read_csv as rd
from topology import ROUND_PATH, Topology, Lane
import sys
sys.path.append('tools')
from consts import ROUND_PATH
from topology import Topology, Lane
from exit_tracking import ExitTracking
import locations
import read_csv as rd
import locations as locations
def get_frames(tracks_meta, trackId):
......
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from scipy import stats
plt.rcParams["figure.figsize"] = (10,8.5)
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
def plot_linreg(ax, x, y, confidence=0.90):
    """Plot a linear regression of y on x with confidence and prediction bands.

    see https://stackoverflow.com/questions/27164114/show-confidence-limits-and-prediction-limits-in-scatter-plot

    Parameters:
        ax: matplotlib axes to draw onto.
        x, y: 1-D numpy arrays of equal size.
        confidence: quantile used for the Student-t statistic (default 0.90).

    Fix: the interval labels previously hard-coded "95%" regardless of the
    `confidence` argument; they are now derived from it.
    """
    slope, intercept = np.polyfit(x, y, 1)  # linear model adjustment
    y_model = np.polyval([slope, intercept], x)  # modeling...
    x_mean = np.mean(x)
    y_mean = np.mean(y)
    n = x.size  # number of samples
    m = 2  # number of fitted parameters (slope, intercept)
    dof = n - m  # degrees of freedom
    # NOTE(review): a two-sided interval would use ppf((1 + confidence) / 2, dof);
    # kept one-sided as in the original to preserve behavior — confirm intent.
    t = stats.t.ppf(confidence, dof)  # Students statistic of interval confidence
    residual = y - y_model
    std_error = (np.sum(residual**2) / dof)**.5  # Standard deviation of the error
    # calculating the r2
    # https://www.statisticshowto.com/probability-and-statistics/coefficient-of-determination-r-squared/
    # Pearson's correlation coefficient
    numerator = np.sum((x - x_mean)*(y - y_mean))
    denominator = ( np.sum((x - x_mean)**2) * np.sum((y - y_mean)**2) )**.5
    correlation_coef = numerator / denominator
    r2 = correlation_coef**2
    # mean squared error
    MSE = 1/n * np.sum( (y - y_model)**2 )
    # to plot the adjusted model
    x_line = np.linspace(np.min(x), np.max(x), 100)
    y_line = np.polyval([slope, intercept], x_line)
    # confidence interval
    ci = t * std_error * (1/n + (x_line - x_mean)**2 / np.sum((x - x_mean)**2))**.5
    # predicting interval
    pi = t * std_error * (1 + 1/n + (x_line - x_mean)**2 / np.sum((x - x_mean)**2))**.5
    ############### Plotting
    pct = '{:.0f}%'.format(confidence * 100)
    ax.plot(x_line, y_line, color = 'black')
    ax.plot(x_line, y_line + pi, color = 'grey', alpha=0.8,
            label = '{} prediction interval'.format(pct), linestyle='--')
    ax.plot(x_line, y_line - pi, color = 'grey', alpha=0.8, linestyle='--')
    ax.fill_between(x_line, y_line + ci, y_line - ci, color = 'grey', alpha=0.2,
                    label = '{} confidence interval'.format(pct))
    # rounding and position must be changed for each case and preference
    a = str(np.round(intercept))
    b = str(np.round(slope,2))
    r2s = str(np.round(r2,2))
    MSEs = str(np.round(MSE))
    ax.text(0.04,0.975, 'y = ' + a + ' + ' + b + ' x', transform = ax.transAxes)
    ax.text(0.04,0.945, '$r^2$ = ' + r2s + ' MSE = ' + MSEs, transform = ax.transAxes)
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
# Constants
ROUND_PATH = "path/to/round/data/" # TODO: Replace with the path to the RounD dataset
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
from sklearn.linear_model import LogisticRegression
import numpy as np
import pickle
import sys
sys.path.append('tools')
import exit_tracking
def gather_training_data(input_ids, seed):
    """Load all training samples from the pickle files generated by parse.py.

    Returns a single flat list of samples shuffled with `seed`
    (the global numpy RNG is seeded for reproducibility).
    """
    samples = []
    for id_str in input_ids:
        pickle_path = 'exit_parse/round_exit_{}.pickle'.format(id_str)
        with open(pickle_path, 'rb') as f:
            exit_data = pickle.load(f)
        # Flatten the per-vehicle sample lists into one pool.
        for vehicle_samples in exit_data.training_data.values():
            samples.extend(vehicle_samples)
    np.random.seed(seed)
    np.random.shuffle(samples)
    return samples
def _features_and_labels(samples):
    """Split (laneid, heading, dist, label) tuples into sklearn-style X / y arrays."""
    features = [[laneid, heading, dist] for (laneid, heading, dist, _label) in samples]
    labels = [label for (_laneid, _heading, _dist, label) in samples]
    return np.array(features), np.array(labels)


def process_training_data(training_data):
    """Extract training and validation sets compatible with sklearn.

    Splits training_data 80/20 (in order, so shuffle beforehand) and converts
    each part into (X, y) numpy arrays.

    Returns (x_training, y_training, x_validation, y_validation).
    """
    nb_samples_training = int(len(training_data) * 0.8)
    x_training, y_training = _features_and_labels(training_data[:nb_samples_training])
    x_validation, y_validation = _features_and_labels(training_data[nb_samples_training:])
    return (x_training, y_training, x_validation, y_validation)
def get_exit_proba_model(input_ids, seed):
    """Train a model predicting the probability of vehicles exiting the roundabout.

    Trains on the data of the given input files and returns
    (fitted model, accuracy on the held-out validation split).
    """
    samples = gather_training_data(input_ids, seed)
    x_train, y_train, x_val, y_val = process_training_data(samples)
    classifier = LogisticRegression()
    classifier.fit(x_train, y_train)
    accuracy = classifier.score(x_val, y_val)
    return (classifier, accuracy)
def get_exit_probability(model, lane, heading, distance):
    """Return the probability of exit of a given vehicle using a trained model."""
    sample = np.array([[lane, heading, distance]])
    # Index of the True class among the model's known classes.
    true_class_id = list(model.classes_).index(True)
    return model.predict_proba(sample)[0][true_class_id]
#!/usr/bin/env python3
'''
Author: Duncan Deveaux
'''
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
import roundtools.read_csv as rd
import analysis_exits
# Constants
ROUND_PATH = "path/to/round/data/" # TODO: Replace with the path to the RounD dataset
# https://stackoverflow.com/questions/31735499/calculate-angle-clockwise-between-two-points
def angle_between(p1, p2):
    """Angle in degrees (in [0, 360)) of the direction from p1 to p2.

    Axis-aligned and coincident points are handled explicitly; the general
    case uses atan with quadrant correction, as in the referenced answer.
    """
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    if dx == 0:
        # Vertical direction (or identical points, which map to 0 here).
        deg = 0 if (dy == 0 or p1[1] > p2[1]) else 180
    elif dy == 0:
        # Horizontal direction.
        deg = 90 if p1[0] < p2[0] else 270
    else:
        deg = math.atan(dy / dx) / np.pi * 180
        going_down = p1[1] < p2[1]
        # Quadrant correction, then the final +270 shift normalizes the range.
        deg += 270 if (going_down and deg < 0) or (not going_down and deg > 0) else 90
    return (deg + 270) % 360
#https://stackoverflow.com/questions/34372480/rotate-point-about-another-point-in-degrees-python
def rotate_around(p, origin, degrees):
    """Rotate point(s) p by `degrees` (counter-clockwise) about `origin`."""
    theta = np.deg2rad(degrees)
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    rotation = np.array([[cos_t, -sin_t],
                         [sin_t, cos_t]])
    origin_col = np.atleast_2d(origin).T
    points_col = np.atleast_2d(p).T
    # Shift to the origin, rotate, shift back.
    rotated = rotation @ (points_col - origin_col) + origin_col
    return np.squeeze(rotated.T)
def get_object_front(obj):
    """Position of the object's front edge midpoint, rotated by its heading."""
    center = (obj[rd.X], obj[rd.Y])
    # Front midpoint of the axis-aligned box, HEIGHT/2 above the center.
    unrotated_front = (center[0], center[1] - obj[rd.HEIGHT]/2)
    return rotate_around(unrotated_front, center, obj[rd.HEADING]+90)
def get_object_back(obj):
    """Position of the object's rear edge midpoint, rotated by its heading."""
    center = (obj[rd.X], obj[rd.Y])
    # Rear midpoint of the axis-aligned box, HEIGHT/2 below the center.
    unrotated_back = (center[0], center[1] + obj[rd.HEIGHT]/2)
    return rotate_around(unrotated_back, center, obj[rd.HEADING]+90)
# Lane: represents a circular lane, centered around a roundabout.
class Lane:
    """A circular (annular) lane around a roundabout center.

    The annulus between radius_begin and radius_end is divided into
    nb_slices equal angular slices, used to locate vehicles and find the
    vehicle ahead of another in the same lane.
    """

    def __init__(self, center, radius_begin, radius_end, nb_slices=30):
        # (x, y) of the roundabout center.
        self.center = center
        # Inner and outer radius of the annulus.
        self.radius_begin = radius_begin
        self.radius_end = radius_end
        # Angular slices as (start_deg, end_deg) pairs covering [0, 360).
        self.slices = []
        slice_step_deg = 360.0/nb_slices
        for i in range(nb_slices):
            self.slices.append((slice_step_deg*i, slice_step_deg*(i+1)))

    def contains_point(self, obj_pos):
        """True if obj_pos lies inside the annulus (inner edge inclusive)."""
        center_dist = np.linalg.norm(np.array(self.center)-np.array(obj_pos))
        return center_dist >= self.radius_begin and center_dist < self.radius_end

    # Whether the lane contains any point of obj
    def intersects(self, obj):
        """True if any corner of obj's heading-rotated bounding box is in the lane."""
        # Coordinates
        (x1,x2) = (obj[rd.X] - obj[rd.WIDTH]/2, obj[rd.X] + obj[rd.WIDTH]/2)
        (y1,y2) = (obj[rd.Y] - obj[rd.HEIGHT]/2, obj[rd.Y] + obj[rd.HEIGHT]/2)
        obj_center = (obj[rd.X], obj[rd.Y])
        obj_angle = obj[rd.HEADING]+90
        #Apply rotation
        return ( self.contains_point(rotate_around((x1,y1), obj_center, obj_angle)) or
                 self.contains_point(rotate_around((x1,y2), obj_center, obj_angle)) or
                 self.contains_point(rotate_around((x2,y1), obj_center, obj_angle)) or
                 self.contains_point(rotate_around((x2,y2), obj_center, obj_angle)) )

    # Returns the slice containing obj
    # or -1 if the object was not found in the lane
    def slice_of(self, obj):
        if not self.intersects(obj):
            return -1
        for slice_ix in range(len(self.slices)):
            if self.slice_contains(slice_ix, obj):
                return slice_ix
        return -1

    # Preconditions:
    # self.contains(obj) must be true.
    def slice_contains(self, slice_ix, obj):
        """True if obj's angular position falls in slice slice_ix (start inclusive)."""
        obj_angle = angle_between(self.center, (obj[rd.X], obj[rd.Y]))
        return obj_angle >= self.slices[slice_ix][0] and obj_angle < self.slices[slice_ix][1]

    def frontvehicleof(self, objectsList, objectId):
        """Return the nearest vehicle ahead in this lane (scanning up to half a
        turn of slices forward), or None if there is none.

        NOTE(review): slice_of() is called with objectId, but slice_of/intersects
        index their argument like a track object (obj[rd.X] etc.), while the
        comparison below treats objectId as an id — confirm what callers
        actually pass here; one of the two usages looks inconsistent.
        """
        obj_slice = self.slice_of(objectId)
        if obj_slice == -1:
            return None
        nb_slices = len(self.slices)
        slice_ix = obj_slice
        for _ in range(nb_slices//2):
            slice_ix = (slice_ix + 1) % nb_slices
            # Is there a vehicle in slice_ix ? (If yes it is the closest front vehicle).
            for obj in objectsList:
                if obj[rd.TRACK_ID] != objectId and self.intersects(obj) and self.slice_contains(slice_ix, obj):
                    return obj
        return None
# Lane: describes a roundabout, with a set of lanes.
class Topology:
''' NOTE: The number of circular lanes for each roundabout as well as
the roundabout center points are different for each roundabout.
This is a utilitary method to define a circular lanes topology that
is suitable for the rounD location of ID=0 '''
@staticmethod
def roundDLocation0Topology():
    """Circular-lane topology tuned for rounD location 0."""
    center = (81.0, -47.1)
    # Lanes must be ordered from the closest to the furthest
    # from the roundabout center: four concentric 2.25 m lanes.
    boundaries = [15, 17.25, 19.5, 21.75, 24]
    circular_lanes = [Lane(center, radius_begin=inner, radius_end=outer)
                      for inner, outer in zip(boundaries[:-1], boundaries[1:])]
    exit_points = [(97.2,-24.3), (55.5,-35.8), (65.3,-70.0), (106.0,-59.0)]
    print("Generated Location 0 Topology...")
    return Topology(center, circular_lanes, exit_points)
# TODO! Define the lanes position and roundabout center for other topologies : the following pattern can be used:
@staticmethod
def roundDLocation1Topology():
    """Circular-lane topology tuned for rounD location 1."""
    center = (115.6, -70)
    # Three concentric 2.25 m lanes, innermost first.
    boundaries = [8, 10.25, 12.5, 14.75]
    circular_lanes = [Lane(center, radius_begin=inner, radius_end=outer)
                      for inner, outer in zip(boundaries[:-1], boundaries[1:])]
    exit_points = [(121.0, -52.0), (97.8, -68.5), (133.2, -75.0), (111.0, -89.0)]
    print("Generated Location 1 Topology...")
    return Topology(center, circular_lanes, exit_points)
@staticmethod