Skip to content
Snippets Groups Projects
Commit 7456eeaf authored by Pedro L. Magalhães's avatar Pedro L. Magalhães
Browse files

Deleted old files.

parent 483ceeb3
No related branches found
No related tags found
2 merge requests!3Moves a renamed latest branch into master,!2Renames arcs as edges within the GIS module. Removed redundant methods. Added...
# imports
# standard
import uuid
import random
import math
# local, external
import networkx as nx
import osmnx as ox
from shapely.geometry import Point, LineString
from numpy.testing import assert_allclose
from osmnx.stats import count_streets_per_node
# from osmnx.projection import project_graph
from osmnx.routing import k_shortest_paths
# local, internal
import src.topupopt.data.gis.identify as gis_ident
import src.topupopt.data.gis.calculate as gis_calc
import src.topupopt.data.gis.modify as gis_mod
import src.topupopt.data.gis.utils as gis_utils
import src.topupopt.data.gis.osm as _osm
#******************************************************************************
#******************************************************************************
def examples(seed_number: int = None):
# seed_number = random.randint(1,int(1e5))
# # # seed_number = 871
# # seed_number = 57945
# # seed_number = 299
# seed_number = 57129
print('Seed number: ' + str(seed_number))
#**************************************************************************
# test calculating path lengths
examples_path_length(seed_number)
# test the identification and removal of intermediate nodes
examples_finding_replacing_nodes(seed_number)
# test finding nearest nodes
example_nearest_node_keys(seed_number)
# test the removal of redundant arcs
example_removal_redundant_arcs(seed_number)
# test removing roundabouts
examples_transform_roundabouts(seed_number)
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_A():
#**************************************************************************
# create network
network = nx.MultiDiGraph()
network.graph['crs'] = "EPSG:4326"
# add nodes
number_nodes = 8
for node_index in range(number_nodes):
xy = (random.random(), random.random())
geo = Point(xy)
network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo)
# edges: should include straight paths, self-loops, redundant paths, dead ends
list_edges = [
(0,2),(1,2),(2,3),(3,4),(5,4),(6,5),(5,7)
]
for _edge in list_edges:
geo = LineString(
[(network.nodes[_edge[0]]['x'],
network.nodes[_edge[0]]['y']),
(network.nodes[_edge[1]]['x'],
network.nodes[_edge[1]]['y'])]
)
length = network.nodes[_edge[0]]['geometry'].distance(
network.nodes[_edge[1]]['geometry']
)
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length)
# add duplicate edges
network.add_edge(_edge[0],
_edge[1],
length=length)
network.add_edge(_edge[0],
_edge[1],
length=length+1)
protected_nodes = []
return network, protected_nodes
#**************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_B():
#**************************************************************************
# create network
network = nx.MultiDiGraph()
network.graph['crs'] = "EPSG:4326"
# add nodes
number_nodes = 20
for node_index in range(number_nodes):
xy = (random.random(), random.random())
geo = Point(xy)
network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo)
# edges: should include straight paths, self-loops, redundant paths, dead ends
list_edges = [
(0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10),
(0,11),(11,12),(12,13),(13,5),
(6,14),(14,15),(15,16),(16,17),(17,18),(18,19),
#(3,3), # self-loop
(8,8), # self-loop
(15,16) # redundant
]
for _edge in list_edges:
geo = LineString(
[(network.nodes[_edge[0]]['x'],
network.nodes[_edge[0]]['y']),
(network.nodes[_edge[1]]['x'],
network.nodes[_edge[1]]['y'])]
)
length = network.nodes[_edge[0]]['geometry'].distance(
network.nodes[_edge[1]]['geometry']
)
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length)
protected_nodes = []
return network, protected_nodes
#**************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_C():
#**************************************************************************
# create network
network = nx.MultiDiGraph()
network.graph['crs'] = "EPSG:4326"
# add nodes
number_nodes = 20
for node_index in range(number_nodes):
xy = (random.random(), random.random())
geo = Point(xy)
network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo)
# network should include:
# 1) self loops
# 2) redundant arcs
# 3) dead ends
# 4) dead end loops
# 5) protected nodes
protected_nodes = [17, 8]
list_edges = [
(0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10),
(0,11),(11,12),(12,13),(13,5),
(6,14),(14,15),(16,15),(16,17),(18,17),(18,19),
#(3,3), # self-loop
(8,8), # self-loop
(15,16) # redundant
]
for _edge in list_edges:
geo = LineString(
[(network.nodes[_edge[0]]['x'],
network.nodes[_edge[0]]['y']),
(network.nodes[_edge[1]]['x'],
network.nodes[_edge[1]]['y'])]
)
length = network.nodes[_edge[0]]['geometry'].distance(
network.nodes[_edge[1]]['geometry']
)
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length)
return network, protected_nodes
#**************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_D():
#**************************************************************************
# create network
network = nx.MultiDiGraph()
network.graph['crs'] = "EPSG:4326"
# add nodes
number_nodes = 14
for node_index in range(number_nodes):
xy = (random.random(), random.random())
geo = Point(xy)
network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo)
# edges: should include straight paths, self-loops, redundant paths, dead ends
list_edges = [
(0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10),
(5,0),(10,6),
(10,11),(11,12),(12,13),(13,0)
]
for _edge in list_edges:
geo = LineString(
[(network.nodes[_edge[0]]['x'],
network.nodes[_edge[0]]['y']),
(network.nodes[_edge[1]]['x'],
network.nodes[_edge[1]]['y'])]
)
length = network.nodes[_edge[0]]['geometry'].distance(
network.nodes[_edge[1]]['geometry']
)
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length)
protected_nodes = []
return network, protected_nodes
#**************************************************************************
#******************************************************************************
#******************************************************************************
def examples_finding_replacing_nodes(seed_number: int = None):
#**************************************************************************
# paths via node keys
paths_as_arc_keys = False
if seed_number != None:
random.seed(seed_number)
networks = [
get_network_A(),
get_network_B(),
get_network_C(),
get_network_D()
]
protected_nodes = [
network[1]
for network in networks
]
networks = [
network[0]
for network in networks
]
for network_index, network in enumerate(networks):
example_intermediate_nodes_along_path(
network,
protected_nodes[network_index],
return_paths_as_arc_keys=paths_as_arc_keys
)
#**************************************************************************
# paths via arc keys
paths_as_arc_keys = True
if seed_number != None:
random.seed(seed_number)
networks = [
get_network_A(),
get_network_B(),
get_network_C(),
get_network_D()
]
protected_nodes = [
network[1]
for network in networks
]
networks = [
network[0]
for network in networks
]
for network_index, network in enumerate(networks):
example_intermediate_nodes_along_path(
network,
protected_nodes[network_index],
return_paths_as_arc_keys=paths_as_arc_keys
)
#******************************************************************************
#******************************************************************************
def examples_transform_roundabouts(seed_number: int = None):
if seed_number != None:
random.seed(seed_number)
#**************************************************************************
network, fake_roundabouts = get_network_with_roundabouts(seed_number)
example_roundabouts_protocol(network,
seed_number=seed_number,
minimum_perimeter=None,
maximum_perimeter=None,
minimum_number_nodes=None,
maximum_number_nodes=None)
#**************************************************************************
# roundabouts with perimeter limits
network, fake_roundabouts = get_network_with_roundabouts(seed_number)
example_roundabouts_protocol(network,
seed_number=seed_number,
minimum_perimeter=0.5,
maximum_perimeter=1.5,
minimum_number_nodes=None,
maximum_number_nodes=None)
# roundabouts with node number limits
network, fake_roundabouts = get_network_with_roundabouts(seed_number)
example_roundabouts_protocol(network,
seed_number=seed_number,
minimum_perimeter=None,
maximum_perimeter=None,
minimum_number_nodes=3,
maximum_number_nodes=3)
#**************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_with_roundabouts(seed_number: int = None):
if seed_number != None:
random.seed(seed_number)
# create network
network = nx.MultiDiGraph()
network.graph['crs'] = "EPSG:4326"
# add nodes
number_nodes = 20
for node_index in range(number_nodes):
xy = (random.random(), random.random())
geo = Point(xy)
network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo)
# roundabouts:
# 1) roundabout with 2 nodes
# 2) roundabout with 3 nodes
# 3) roundabout with 4 nodes
# 4) roundabout within roundabout 2
# 5) roundabout overlapping with roundabouts 1 and 3
list_roundabout_edges = [
# roundabout with 2 nodes
(0, 1), (1, 0),
# roundabout with 3 nodes
(2, 3), (3, 4), (4, 2),
# roundabout with 4 nodes
(5, 6), (6, 7), (7, 8), (8, 5),
# roundabout within roundabout 2
(2, 4),
# roundabout overlapping with roundabouts 1 and 3
(9, 10), (10, 11), (11, 9),
# fake roundabouts
# self loop
(12, 12),
# no oneway attr
(13, 14), (14, 15), (15, 13),
# oneway = False
(16, 17), (17, 18), (18, 19), (19, 16),
# connect 1 and 3 with a edge
(1, 6), (6, 1)
]
# add
for _edge in list_roundabout_edges:
geo = LineString(
[(network.nodes[_edge[0]]['x'],
network.nodes[_edge[0]]['y']),
(network.nodes[_edge[1]]['x'],
network.nodes[_edge[1]]['y'])]
)
length = network.nodes[_edge[0]]['geometry'].distance(
network.nodes[_edge[1]]['geometry']
)
# handle the various cases
if _edge == (12, 12):
# self loop
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length,
oneway=True)
elif _edge == (14, 15):
# no one way attr
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length)
elif (_edge == (17, 18) or _edge == (1, 6) or _edge == (6, 1)):
# oneway = False
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length,
oneway=False)
else:
# general case
network.add_edge(_edge[0],
_edge[1],
geometry=geo,
length=length,
oneway=True)
# add connecting edges
list_connecting_edges = [
(node_key, other_node_key)
for node_key in network.nodes()
for other_node_key in network.nodes()
if random.randint(0, 1)
]
while len(list_connecting_edges) > len(list_roundabout_edges):
random_pop = random.randint(0, len(list_connecting_edges)-1)
list_connecting_edges.pop(random_pop)
for edge_key in list_connecting_edges:
geo = LineString(
[(network.nodes[edge_key[0]]['x'],
network.nodes[edge_key[0]]['y']),
(network.nodes[edge_key[1]]['x'],
network.nodes[edge_key[1]]['y'])]
)
length = network.nodes[edge_key[0]]['geometry'].distance(
network.nodes[edge_key[1]]['geometry']
)
network.add_edge(edge_key[0],
edge_key[1],
geometry=geo,
length=length)
# fake roundabouts:
# 1) no oneway attr
# 2) oneway=False
# 3) self loop
# fake roundabouts
fake_roundabouts = [
# self-loop
[12],
# no oneway attr
[13, 14, 15],
# oneway = False
[16, 17, 18, 19]]
return network, fake_roundabouts
#******************************************************************************
#******************************************************************************
def example_roundabouts_protocol(network: nx.MultiDiGraph,
seed_number: int = None,
minimum_perimeter: float = 0,
maximum_perimeter: float = 100,
minimum_number_nodes: int = 0,
maximum_number_nodes: int = 5):
#**************************************************************************
# find all loops
all_loops = nx.simple_cycles(network)
initial_number_loops = len(list(all_loops))
assert initial_number_loops != 0
#**************************************************************************
# find the nodes constituting roundabouts
original_roundabouts = gis_ident.find_roundabouts(
network,
minimum_perimeter=minimum_perimeter,
maximum_perimeter=maximum_perimeter,
minimum_number_nodes=minimum_number_nodes,
maximum_number_nodes=maximum_number_nodes)
initial_number_roundabouts_found = len(original_roundabouts)
# assert that there is at least one roundabout
assert initial_number_roundabouts_found >= 1
for roundabout in original_roundabouts:
assert gis_ident.is_roundabout(network, roundabout)
#**************************************************************************
# locate all external nodes affected by transforming the roundabout
arcs_affected_by_roundabout = {
i: [
edge_key
# for each node in the roundabout
for node_key in original_roundabout_nodes
# for each neighbouring nodes
for other_node_key in gis_ident.neighbours(network, node_key)
# if it is not in the roundabout itself
if other_node_key not in original_roundabout_nodes
# for each arc between the two nodes
for edge_key in gis_ident.get_edges_between_two_nodes(
network,
node_key,
other_node_key)
]
for i, original_roundabout_nodes in enumerate(original_roundabouts)}
# get all the respective lengths
dict_edge_key_lengths = {
edge_key: network.edges[edge_key][_osm.KEY_OSMNX_LENGTH]
for roundabout_index in range(len(original_roundabouts))
for edge_key in arcs_affected_by_roundabout[roundabout_index]
}
#**************************************************************************
# transform the roundabouts
new_roundabout_nodes = gis_mod.transform_roundabouts_into_crossroads(
network,
original_roundabouts)
# assert that the nr. of roundabout nodes matches the number of roundabouts
assert len(new_roundabout_nodes) == len(original_roundabouts)
# final network
final_loops = nx.simple_cycles(network)
final_number_loops = len(list(final_loops))
# assert that the initial number of loops is greater than or equal to the
# number of loops in the final object plus those removed
assert initial_number_loops >= final_number_loops
roundabout_nodes = gis_ident.find_roundabouts(
network,
minimum_perimeter=minimum_perimeter,
maximum_perimeter=maximum_perimeter,
minimum_number_nodes=minimum_number_nodes,
maximum_number_nodes=maximum_number_nodes)
final_number_roundabouts_found = len(roundabout_nodes)
assert initial_number_roundabouts_found >= final_number_roundabouts_found
#**************************************************************************
# verify that the external nodes affected by the transformation are still
# connected to the respective roundabouts
# for each roundabout
for roundabout_index, new_roundabout_node_key in enumerate(new_roundabout_nodes):
# for each edge involving the roundabout under consideration
for edge_key in arcs_affected_by_roundabout[roundabout_index]:
# check if the current roundabout has been transformed
if new_roundabout_node_key is None:
# cases: 1) overlapping roundabouts
# the current roundabout has not been transformed: all original
# arcs should still exist unless they are part of roundabouts
# that were also transformed
if network.has_edge(u=edge_key[0],
v=edge_key[1],
key=edge_key[2]):
# the arc exists
# assert that it has the same length
assert (
dict_edge_key_lengths[edge_key] == network.edges[
edge_key][_osm.KEY_OSMNX_LENGTH])
# it does, now continue
continue
# the original arc does not exist: it was modified or deleted
# check if the source node exists
if network.has_node(edge_key[0]):
# TODO: introduce a roundabout to enter this test case
# it does, now check that the sink node does not exist
assert not network.has_node(edge_key[1])
# find original roundabouts to which the sink node belonged
roundabout_candidates = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[1] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates) >= 1
# assert that the length is longer than the original
for ra_i in roundabout_candidates:
for arc_key in gis_ident.get_edges_from_a_to_b(
network,
edge_key[0],
new_roundabout_nodes[ra_i]):
assert (dict_edge_key_lengths[edge_key] <=
network.edges[arc_key][_osm.KEY_OSMNX_LENGTH])
continue
# check if the sink node exists
if network.has_node(edge_key[1]):
# it does, now check that the source node does not exist
assert not network.has_node(edge_key[0])
# find original roundabouts to which the source node belonged
roundabout_candidates = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[0] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates) >= 1
# assert that the length is longer than the original
for ra_i in roundabout_candidates:
for arc_key in gis_ident.get_edges_from_a_to_b(
network,
new_roundabout_nodes[ra_i],
edge_key[1]):
assert (dict_edge_key_lengths[edge_key] <=
network.edges[arc_key][_osm.KEY_OSMNX_LENGTH])
continue
#******************************************************************
#******************************************************************
#******************************************************************
# the roundabout was transformed:
# 1) the source node no longer exists, the sink node does
# 2) the sink node no longer exists, the source node does
# 3) none of the nodes exists any more
#******************************************************************
#******************************************************************
#******************************************************************
# the original arc does not exist: it was modified or deleted
#******************************************************************
# check if the source node exists
if network.has_node(edge_key[0]):
# 2) the sink node no longer exists, the source node does
assert not network.has_node(edge_key[1])
# find original roundabouts to which the sink node belonged
roundabout_candidates = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[1] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates) >= 1
# assert that the length is longer than the original
for ra_i in roundabout_candidates:
for arc_key in gis_ident.get_edges_from_a_to_b(
network,
edge_key[0],
new_roundabout_nodes[ra_i]):
assert (dict_edge_key_lengths[edge_key] <=
network.edges[arc_key][_osm.KEY_OSMNX_LENGTH])
continue
#******************************************************************
# check if the sink node exists
if network.has_node(edge_key[1]):
# 1) the source node no longer exists, the sink node does
assert not network.has_node(edge_key[0])
# find original roundabouts to which the source node belonged
roundabout_candidates = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[0] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates) >= 1
# assert that the length is longer than the original
for ra_i in roundabout_candidates:
for arc_key in gis_ident.get_edges_from_a_to_b(
network,
new_roundabout_nodes[ra_i],
edge_key[1]):
assert (dict_edge_key_lengths[edge_key] <=
network.edges[arc_key][_osm.KEY_OSMNX_LENGTH])
continue
#******************************************************************
# 3) none of the nodes exists any more
assert not network.has_node(edge_key[0])
assert not network.has_node(edge_key[1])
# find original roundabouts to which the source node belonged
roundabout_candidates_source = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[0] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates_source) >= 1
# assert that the length is longer than the original
# find original roundabouts to which the source node belonged
roundabout_candidates_sink = [
ra_i
for ra_i, ra_nodes in enumerate(original_roundabouts)
if edge_key[1] in ra_nodes
if new_roundabout_nodes[ra_i] != None
]
# assert that at least one was transformed
assert len(roundabout_candidates_sink) >= 1
# assert that the length is longer than the original
for ra_so_i in roundabout_candidates_source:
for ra_si_i in roundabout_candidates_sink:
for arc_key in gis_ident.get_edges_from_a_to_b(
network,
new_roundabout_nodes[ra_so_i],
new_roundabout_nodes[ra_si_i]):
assert (dict_edge_key_lengths[edge_key] <=
network.edges[arc_key][_osm.KEY_OSMNX_LENGTH])
continue
#******************************************************************************
#******************************************************************************
def example_intermediate_nodes_along_path(network: nx.MultiDiGraph,
excluded_nodes: list,
return_paths_as_arc_keys: bool,
ignore_edge_direction: bool = True):
#**************************************************************************
#**************************************************************************
# find straight paths
simplifiable_paths = gis_ident.find_all_straight_paths(
network=network,
excluded_nodes=excluded_nodes,
return_paths_as_arc_keys=return_paths_as_arc_keys)
# test the paths
for path in simplifiable_paths:
assert gis_ident.is_path_simplifiable(
network,
path,
path_as_node_keys=(not return_paths_as_arc_keys)
)
# get the length of each path
if return_paths_as_arc_keys:
path_lengths = [
gis_calc.edge_path_length(
network,
path,
ignore_edge_direction=ignore_edge_direction)
for path in simplifiable_paths
]
else:
path_lengths = [
gis_calc.node_path_length(
network,
path,
return_minimum_length_only=True)
for path in simplifiable_paths
]
# replace the paths
for path_index, path in enumerate(simplifiable_paths):
# replace the path
gis_mod.replace_path(network,
path,
path_as_arc_keys=return_paths_as_arc_keys)
# try to find the paths again
new_simplifiable_paths = gis_ident.find_all_straight_paths(
network=network,
excluded_nodes=excluded_nodes,
return_paths_as_arc_keys=return_paths_as_arc_keys)
# test the paths again
for path in new_simplifiable_paths:
assert gis_ident.is_path_simplifiable(
network,
path,
path_as_node_keys=(not return_paths_as_arc_keys)
)
#**************************************************************************
#**************************************************************************
#******************************************************************************
#******************************************************************************
#**************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
def example_removal_redundant_arcs(seed_number: int = None):
#**************************************************************************
if seed_number != None:
random.seed(seed_number)
#**************************************************************************
# create a network
network_order = random.randint(4,6)
network = nx.MultiDiGraph(
incoming_graph_data=nx.binomial_tree(network_order,
nx.MultiDiGraph)
)
# add multiple edges between two nodes
list_arcs = [arc for arc in network.edges(keys=True)]
for arc in list_arcs:
# add length to first arc
network.add_edge(
arc[0],
arc[1],
key=arc[2],
length=random.random()
)
# add more arcs, also with a length attribute
if random.randint(0, 1):
# add more arcs and lengths
network.add_edge(
u_for_edge=arc[0],
v_for_edge=arc[1],
length=random.random()
)
# remove redundant arcs
gis_mod.remove_redundant_arcs(network)
# verify results
# for each arc, make sure there is only one arc per direction
for arc in network.edges(keys=True):
assert len(
gis_ident.get_edges_from_a_to_b(network,arc[0],arc[1])
) == 1
#**************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
#******************************************************************************
def get_network_with_unconnected_points(
add_geometry_attr: bool = False):
# from shapely.geometry import LineString
# import osmnx as ox
# import networkx as nx
# create a network
network = nx.MultiDiGraph()
x_offset = 15
y_offset = 45
x_amp = 0.01
y_amp = 0.01
network.graph['crs'] = 'epsg:4326'
network.graph['simplified'] = True
#**************************************************************************
# define the nodes
list_nodes = [
# connected nodes
(0, {'x':x_offset+0*x_amp,'y':y_offset+0*y_amp}),
(1, {'x':x_offset+1*x_amp,'y':y_offset+0*y_amp}),
(2, {'x':x_offset+2*x_amp,'y':y_offset+0*y_amp}),
(3, {'x':x_offset+3*x_amp,'y':y_offset+1*y_amp}),
(4, {'x':x_offset+3*x_amp,'y':y_offset-1*y_amp}),
(5, {'x':x_offset+3*x_amp,'y':y_offset-2*y_amp}),
# unconnected nodes
(6, {'x':x_offset+0.5*x_amp,'y':y_offset+0.5*y_amp}),
(7, {'x':x_offset+2.0*x_amp,'y':y_offset+0.5*y_amp}),
(8, {'x':x_offset+2.0*x_amp,'y':y_offset+1.0*y_amp}),
(9, {'x':x_offset+3.0*x_amp,'y':y_offset+1.5*y_amp}),
(10, {'x':x_offset+4.0*x_amp,'y':y_offset+0.0*y_amp}),
(11, {'x':x_offset+2.0*x_amp,'y':y_offset-1.5*y_amp}),
(12, {'x':x_offset+4.0*x_amp,'y':y_offset-1.2*y_amp}),
(13, {'x':x_offset+5.0*x_amp,'y':y_offset-1.8*y_amp}),
(14, {'x':x_offset+3.0*x_amp,'y':y_offset-4.0*y_amp}),
(15, {'x':x_offset-1.0*x_amp,'y':y_offset+0.0*y_amp}),
]
network.add_nodes_from(list_nodes)
# define the edges
list_edges = [
(0,1),
(1,2),
(2,3),
(3,4),
(4,5),
]
network.add_edges_from(list_edges)
#**************************************************************************
# add geometry to select edges
network.add_edge(
u_for_edge=1,
v_for_edge=2,
key=0,
geometry=LineString(
[(x_offset+1*x_amp, y_offset+0*y_amp),
(x_offset+1.5*x_amp, y_offset+0.25*y_amp),
(x_offset+2*x_amp, y_offset+0*y_amp)]
)
)
network.add_edge(
u_for_edge=3,
v_for_edge=4,
key=0,
geometry=LineString(
[(x_offset+3.0*x_amp, y_offset+1*y_amp),
(x_offset+3.2*x_amp, y_offset+0*y_amp),
(x_offset+3.0*x_amp, y_offset-1*y_amp)]
)
)
# TODO: add oneway and reversed attributes
#**************************************************************************
# add lengths to edges
for edge_key in network.edges(keys=True):
# get its data dict
edge_dict = network.get_edge_data(u=edge_key[0],
v=edge_key[1],
key=edge_key[2])
# prepare the geometry
if 'geometry' in edge_dict:
edge_geo = edge_dict['geometry']
else:
edge_geo_point_list = [
(network.nodes[edge_key[0]]['x'],
network.nodes[edge_key[0]]['y']),
(network.nodes[edge_key[1]]['x'],
network.nodes[edge_key[1]]['y'])]
edge_geo = LineString(
edge_geo_point_list
)
if add_geometry_attr:
edge_dict['geometry'] = edge_geo
edge_dict['length'] = (
gis_calc.great_circle_distance_along_path(
edge_geo)
)
network.add_edge(
edge_key[0],
edge_key[1],
edge_key[2],
**edge_dict
)
#**************************************************************************
return network
#******************************************************************************
#******************************************************************************
def example_simplify_network(network: nx.MultiDiGraph,
nodes_excluded: list = None,
seed_number: int = None,
update_street_count: bool = True):
#**************************************************************************
if seed_number != None:
random.seed(seed_number)
#**************************************************************************
# exclude some nodes
if nodes_excluded == None or len(nodes_excluded) == 0:
nodes_excluded = [
node_key
for node_key in network.nodes()
if random.randint(0,1)]
#**************************************************************************
gis_utils.simplify_network(
network,
nodes_excluded,
update_street_count_per_node=update_street_count)
#**************************************************************************
#******************************************************************************
#******************************************************************************
def verify_street_count(network: nx.MultiDiGraph):
# count the number of streets per node
street_count_dict = count_streets_per_node(network)
# for each node
for node_key in network.nodes():
try:
street_count = network.nodes[node_key][_osm.KEY_OSMNX_STREET_COUNT]
except KeyError:
continue
assert street_count == street_count_dict[node_key]
#******************************************************************************
#******************************************************************************
def protocol_validate_path_length(network: nx.MultiDiGraph):
protocol_validate_node_path_length(network)
protocol_validate_edge_path_length(network)
#******************************************************************************
#******************************************************************************
def protocol_validate_edge_path_length(network: nx.MultiDiGraph):
# paths as arc keys, minimum only
# for each pair of nodes
for node_key in network.nodes():
for other_node_key in network.nodes():
if not nx.has_path(network, node_key, other_node_key):
continue
# for each possible path
list_simple_paths = [
path
for path in nx.all_simple_edge_paths(network,
node_key,
other_node_key)
]
list_simple_path_lengths = [
gis_calc.edge_path_length(
network,
path
)
for path in list_simple_paths]
# use k_shortest_paths to identify the shortest paths
list_k_shortest_paths = list(
k_shortest_paths(
network,
node_key,
other_node_key,
len(list_simple_paths))
)
# sort list_simple_path_lengths and list_simple_paths
temp_sort = sorted(
((v, i) for i, v in enumerate(list_simple_path_lengths)),
reverse=False)
# sorted_list_simple_path_lengths = [temp[0]
# for temp in temp_sort]
sorted_list_simple_paths = [list_simple_paths[temp[1]]
for temp in temp_sort]
# make sure the list_k_shortest_paths and sorted_list_simple_paths
# have the same size by adding elements to list_k_shortest_paths
# if list_k_shortest_paths is smaller than sorted_list_simple_paths
if len(list_k_shortest_paths) < len(sorted_list_simple_paths):
# if the node paths on list_k_shortest_paths are the same as in
# sorted_list_simple_paths, add them to list_kso
# for each (edge) path
for path_index, path in enumerate(sorted_list_simple_paths):
# transform it into a node path
path_in_nodes = [
arc_key[1]
for arc_key in path
if arc_key[0] != arc_key[1]]
path_in_nodes.insert(0, path[0][0])
# make sure this path is in list_k_shortest_paths
assert path_in_nodes in list_k_shortest_paths
# assert that the paths are the same and on the same position
try:
assert path_in_nodes == list_k_shortest_paths[path_index]
except IndexError:
# index exceeded: append path
list_k_shortest_paths.append(path_in_nodes)
except AssertionError:
# incorrect order: insert path in the path_index index
list_k_shortest_paths.insert(path_index, path_in_nodes)
# for each simple path
for path_index, path in enumerate(sorted_list_simple_paths):
# convert path to nodes
path_in_nodes = [
arc_key[1]
for arc_key in path
if arc_key[0] != arc_key[1]]
path_in_nodes.insert(0, path[0][0])
# assert that the ordered paths match
assert path_in_nodes == list_k_shortest_paths[path_index]
#******************************************************************************
#******************************************************************************
def protocol_validate_node_path_length(network: nx.MultiDiGraph):
# paths as node keys, minimum only
return_minimum_path_length = True
# for each pair of nodes
for node_key in network.nodes():
for other_node_key in network.nodes():
if not nx.has_path(network, node_key, other_node_key):
continue
# for each possible path
list_simple_paths = [
path
for path in nx.all_simple_paths(network,
node_key,
other_node_key)
]
# remove duplicates
for path in list_simple_paths:
while list_simple_paths.count(path) != 1:
list_simple_paths.remove(path)
list_simple_path_lengths = [
gis_calc.node_path_length(
network,
path,
return_minimum_length_only=return_minimum_path_length)
for path in list_simple_paths]
# use k_shortest_paths to identify the shortest paths
list_k_shortest_paths = list(
k_shortest_paths(network,
node_key,
other_node_key,
len(list_simple_paths))
)
# sort list_simple_path_lengths and list_simple_paths
temp_sort = sorted(
((v, i) for i, v in enumerate(list_simple_path_lengths)),
reverse=False)
# sorted_list_simple_path_lengths = [temp[0]
# for temp in temp_sort]
sorted_list_simple_paths = [list_simple_paths[temp[1]]
for temp in temp_sort]
# assert that the ordered paths match
assert list_k_shortest_paths == sorted_list_simple_paths
#**************************************************************************
# paths as node keys, all lengths
return_minimum_path_length = True
# for each pair of nodes
for node_key in network.nodes():
for other_node_key in network.nodes():
if not nx.has_path(network, node_key, other_node_key):
continue
# for each possible path
list_simple_paths = [
path
for path in nx.all_simple_paths(network,
node_key,
other_node_key)
]
# remove duplicates (paths_as_node_keys = True)
for path in list_simple_paths:
while list_simple_paths.count(path) != 1:
list_simple_paths.remove(path)
# for each path, get all the possible lengths
list_simple_path_lengths = [
gis_calc.node_path_length(
network,
path,
return_minimum_length_only=return_minimum_path_length)
for path in list_simple_paths]
# use k_shortest_paths to identify the shortest paths
list_k_shortest_paths = list(
k_shortest_paths(network,
node_key,
other_node_key,
len(list_simple_paths))
)
if len(list_simple_path_lengths) == 0:
assert len(list_k_shortest_paths) == 0
continue
# sort list_simple_path_lengths and list_simple_paths
temp_sort = sorted(
((v, i) for i, v in enumerate(list_simple_path_lengths)),
reverse=False)
# sorted_list_simple_path_lengths = [temp[0]
# for temp in temp_sort]
sorted_list_simple_paths = [list_simple_paths[temp[1]]
for temp in temp_sort]
# for each simple path
assert list_k_shortest_paths == sorted_list_simple_paths
#**************************************************************************
#******************************************************************************
#******************************************************************************
def examples_path_length(seed_number: int = None):
if seed_number != None:
random.seed(seed_number)
networks = [
get_network_A()[0],
get_network_B()[0],
get_network_C()[0],
get_network_D()[0]
]
for network in networks:
protocol_validate_path_length(network)
#**************************************************************************
# test unusual cases
network = get_network_A()[0]
# empty path
path = []
my_path_length = gis_calc.node_path_length(
network,
path,
return_minimum_length_only=True)
assert my_path_length == math.inf
# invalid path, minimum only
path = ['node1','node2','node3']
my_path_length = gis_calc.node_path_length(
network,
path,
return_minimum_length_only=True)
assert my_path_length == math.inf
# invalid path, all paths
path = ['node1','node2','node3']
my_path_length = gis_calc.node_path_length(
network,
path,
return_minimum_length_only=False)
assert my_path_length == [math.inf]
#**************************************************************************
#******************************************************************************
#******************************************************************************
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment