diff --git a/tests/examples_gis.py b/tests/examples_gis.py deleted file mode 100644 index 9a8d30a3adc6d71e6ee59c1544f5930294d07bce..0000000000000000000000000000000000000000 --- a/tests/examples_gis.py +++ /dev/null @@ -1,1642 +0,0 @@ -# imports - -# standard - -import uuid -import random -import math - -# local, external - -import networkx as nx -import osmnx as ox -from shapely.geometry import Point, LineString -from numpy.testing import assert_allclose -from osmnx.stats import count_streets_per_node -# from osmnx.projection import project_graph -from osmnx.routing import k_shortest_paths - -# local, internal - -import src.topupopt.data.gis.identify as gis_ident -import src.topupopt.data.gis.calculate as gis_calc -import src.topupopt.data.gis.modify as gis_mod -import src.topupopt.data.gis.utils as gis_utils -import src.topupopt.data.gis.osm as _osm - -#****************************************************************************** -#****************************************************************************** - -def examples(seed_number: int = None): - - # seed_number = random.randint(1,int(1e5)) - - # # # seed_number = 871 - - # # seed_number = 57945 - - # # seed_number = 299 - - # seed_number = 57129 - - print('Seed number: ' + str(seed_number)) - - #************************************************************************** - - # test calculating path lengths - - examples_path_length(seed_number) - - # test the identification and removal of intermediate nodes - - examples_finding_replacing_nodes(seed_number) - - # test finding nearest nodes - - example_nearest_node_keys(seed_number) - - # test the removal of redundant arcs - - example_removal_redundant_arcs(seed_number) - - # test removing roundabouts - - examples_transform_roundabouts(seed_number) - -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** - -def get_network_A(): - - #************************************************************************** - - # create network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - # add nodes - - number_nodes = 8 - - for node_index in range(number_nodes): - - xy = (random.random(), random.random()) - - geo = Point(xy) - - network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo) - - # edges: should include straight paths, self-loops, redundant paths, dead ends - - list_edges = [ - (0,2),(1,2),(2,3),(3,4),(5,4),(6,5),(5,7) - ] - - for _edge in list_edges: - - geo = LineString( - [(network.nodes[_edge[0]]['x'], - network.nodes[_edge[0]]['y']), - (network.nodes[_edge[1]]['x'], - network.nodes[_edge[1]]['y'])] - ) - - length = network.nodes[_edge[0]]['geometry'].distance( - network.nodes[_edge[1]]['geometry'] - ) - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length) - - # add duplicate edges - - network.add_edge(_edge[0], - _edge[1], - length=length) - - network.add_edge(_edge[0], - _edge[1], - length=length+1) - - protected_nodes = [] - - return network, protected_nodes - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_B(): - - #************************************************************************** - - # create network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - # add nodes - - number_nodes = 20 - - for node_index in range(number_nodes): - - xy = (random.random(), random.random()) - - geo = Point(xy) - - network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo) - - # edges: should include straight paths, self-loops, redundant paths, dead ends - - list_edges = [ - (0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10), - (0,11),(11,12),(12,13),(13,5), - (6,14),(14,15),(15,16),(16,17),(17,18),(18,19), - #(3,3), # self-loop - (8,8), # self-loop - (15,16) # redundant - ] - - for _edge in list_edges: - - geo = LineString( - [(network.nodes[_edge[0]]['x'], - network.nodes[_edge[0]]['y']), - (network.nodes[_edge[1]]['x'], - network.nodes[_edge[1]]['y'])] - ) - - length = network.nodes[_edge[0]]['geometry'].distance( - network.nodes[_edge[1]]['geometry'] - ) - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length) - - protected_nodes = [] - - return network, protected_nodes - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_C(): - - #************************************************************************** - - # create network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - # add nodes - - number_nodes = 20 - - for node_index in range(number_nodes): - - xy = (random.random(), random.random()) - - geo = Point(xy) - - network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo) - - # network should include: - # 1) self loops - # 2) redundant arcs - # 3) dead ends - # 4) dead end loops - # 5) protected nodes - - protected_nodes = [17, 8] - - list_edges = [ - (0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10), - (0,11),(11,12),(12,13),(13,5), - (6,14),(14,15),(16,15),(16,17),(18,17),(18,19), - #(3,3), # self-loop - (8,8), # self-loop - (15,16) # redundant - ] - - for _edge in list_edges: - - geo = LineString( - [(network.nodes[_edge[0]]['x'], - network.nodes[_edge[0]]['y']), - (network.nodes[_edge[1]]['x'], - network.nodes[_edge[1]]['y'])] - ) - - length = network.nodes[_edge[0]]['geometry'].distance( - network.nodes[_edge[1]]['geometry'] - ) - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length) - - return network, protected_nodes - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_D(): - - #************************************************************************** - - # create network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - # add nodes - - number_nodes = 14 - - for node_index in range(number_nodes): - - xy = (random.random(), random.random()) - - geo = Point(xy) - - network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo) - - # edges: should include straight paths, self-loops, redundant paths, dead ends - - list_edges = [ - (0,1),(1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8),(8,9),(9,10), - (5,0),(10,6), - (10,11),(11,12),(12,13),(13,0) - ] - - for _edge in list_edges: - - geo = LineString( - [(network.nodes[_edge[0]]['x'], - network.nodes[_edge[0]]['y']), - (network.nodes[_edge[1]]['x'], - network.nodes[_edge[1]]['y'])] - ) - - length = network.nodes[_edge[0]]['geometry'].distance( - network.nodes[_edge[1]]['geometry'] - ) - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length) - - protected_nodes = [] - - return network, protected_nodes - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def examples_finding_replacing_nodes(seed_number: int = None): - - #************************************************************************** - - # paths via node keys - - paths_as_arc_keys = False - - if seed_number != None: - - random.seed(seed_number) - - networks = [ - get_network_A(), - get_network_B(), - get_network_C(), - get_network_D() - ] - - protected_nodes = [ - network[1] - for network in networks - ] - - networks = [ - network[0] - for network in networks - ] - - for network_index, network in enumerate(networks): - - example_intermediate_nodes_along_path( - network, - protected_nodes[network_index], - return_paths_as_arc_keys=paths_as_arc_keys - ) - - #************************************************************************** - - # paths via arc keys - - paths_as_arc_keys = True - - if seed_number != None: - - random.seed(seed_number) - - networks = [ - get_network_A(), - get_network_B(), - get_network_C(), - get_network_D() - ] - - protected_nodes = [ - network[1] - for network in networks - ] - - networks = [ - network[0] - for network in networks - ] - - for network_index, network in enumerate(networks): - - example_intermediate_nodes_along_path( - network, - protected_nodes[network_index], - return_paths_as_arc_keys=paths_as_arc_keys - ) - -#****************************************************************************** -#****************************************************************************** - -def examples_transform_roundabouts(seed_number: int = None): - - if seed_number != None: - - random.seed(seed_number) - - #************************************************************************** - - network, fake_roundabouts = get_network_with_roundabouts(seed_number) - - example_roundabouts_protocol(network, - seed_number=seed_number, - minimum_perimeter=None, - maximum_perimeter=None, - minimum_number_nodes=None, - maximum_number_nodes=None) - - #************************************************************************** - - # roundabouts with perimeter limits - - network, fake_roundabouts = get_network_with_roundabouts(seed_number) - - example_roundabouts_protocol(network, - seed_number=seed_number, - minimum_perimeter=0.5, - maximum_perimeter=1.5, - minimum_number_nodes=None, - maximum_number_nodes=None) - - # roundabouts with node number limits - - network, fake_roundabouts = get_network_with_roundabouts(seed_number) - - example_roundabouts_protocol(network, - seed_number=seed_number, - minimum_perimeter=None, - maximum_perimeter=None, - minimum_number_nodes=3, - maximum_number_nodes=3) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_with_roundabouts(seed_number: int = None): - - if seed_number != None: - - random.seed(seed_number) - - # create network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - # add nodes - - number_nodes = 20 - - for node_index in range(number_nodes): - - xy = (random.random(), random.random()) - - geo = Point(xy) - - network.add_node(node_index, x=xy[0], y=xy[1], geometry=geo) - - # roundabouts: - # 1) roundabout with 2 nodes - # 2) roundabout with 3 nodes - # 3) roundabout with 4 nodes - # 4) roundabout within roundabout 2 - # 5) roundabout overlapping with roundabouts 1 and 3 - - list_roundabout_edges = [ - # roundabout with 2 nodes - (0, 1), (1, 0), - # roundabout with 3 nodes - (2, 3), (3, 4), (4, 2), - # roundabout with 4 nodes - (5, 6), (6, 7), (7, 8), (8, 5), - # roundabout within roundabout 2 - (2, 4), - # roundabout overlapping with roundabouts 1 and 3 - (9, 10), (10, 11), (11, 9), - # fake roundabouts - # self loop - (12, 12), - # no oneway attr - (13, 14), (14, 15), (15, 13), - # oneway = False - (16, 17), (17, 18), (18, 19), (19, 16), - # connect 1 and 3 with a edge - (1, 6), (6, 1) - ] - - # add - - for _edge in list_roundabout_edges: - - geo = LineString( - [(network.nodes[_edge[0]]['x'], - network.nodes[_edge[0]]['y']), - (network.nodes[_edge[1]]['x'], - network.nodes[_edge[1]]['y'])] - ) - - length = network.nodes[_edge[0]]['geometry'].distance( - network.nodes[_edge[1]]['geometry'] - ) - - # handle the various cases - - if _edge == (12, 12): - - # self loop - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length, - oneway=True) - - elif _edge == (14, 15): - - # no one way attr - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length) - - elif (_edge == (17, 18) or _edge == (1, 6) or _edge == (6, 1)): - - # oneway = False - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length, - oneway=False) - - else: - - # general case - - network.add_edge(_edge[0], - _edge[1], - geometry=geo, - length=length, - oneway=True) - - # add connecting edges - - list_connecting_edges = [ - (node_key, other_node_key) - for node_key in network.nodes() - for other_node_key in network.nodes() - if random.randint(0, 1) - ] - - while len(list_connecting_edges) > len(list_roundabout_edges): - - random_pop = random.randint(0, len(list_connecting_edges)-1) - - list_connecting_edges.pop(random_pop) - - for edge_key in list_connecting_edges: - - geo = LineString( - [(network.nodes[edge_key[0]]['x'], - network.nodes[edge_key[0]]['y']), - (network.nodes[edge_key[1]]['x'], - network.nodes[edge_key[1]]['y'])] - ) - - length = network.nodes[edge_key[0]]['geometry'].distance( - network.nodes[edge_key[1]]['geometry'] - ) - - network.add_edge(edge_key[0], - edge_key[1], - geometry=geo, - length=length) - - # fake roundabouts: - # 1) no oneway attr - # 2) oneway=False - # 3) self loop - - # fake roundabouts - - fake_roundabouts = [ - # self-loop - [12], - # no oneway attr - [13, 14, 15], - # oneway = False - [16, 17, 18, 19]] - - return network, fake_roundabouts - -#****************************************************************************** -#****************************************************************************** - -def example_roundabouts_protocol(network: nx.MultiDiGraph, - seed_number: int = None, - minimum_perimeter: float = 0, - maximum_perimeter: float = 100, - minimum_number_nodes: int = 0, - maximum_number_nodes: int = 5): - - #************************************************************************** - - # find all loops - - all_loops = nx.simple_cycles(network) - - initial_number_loops = len(list(all_loops)) - - assert initial_number_loops != 0 - - #************************************************************************** - - # find the nodes constituting roundabouts - - original_roundabouts = gis_ident.find_roundabouts( - network, - minimum_perimeter=minimum_perimeter, - maximum_perimeter=maximum_perimeter, - minimum_number_nodes=minimum_number_nodes, - maximum_number_nodes=maximum_number_nodes) - - initial_number_roundabouts_found = len(original_roundabouts) - - # assert that there is at least one roundabout - - assert initial_number_roundabouts_found >= 1 - - for roundabout in original_roundabouts: - - assert gis_ident.is_roundabout(network, roundabout) - - #************************************************************************** - - # locate all external nodes affected by transforming the roundabout - - arcs_affected_by_roundabout = { - i: [ - edge_key - # for each node in the roundabout - for node_key in original_roundabout_nodes - # for each neighbouring nodes - for other_node_key in gis_ident.neighbours(network, node_key) - # if it is not in the roundabout itself - if other_node_key not in original_roundabout_nodes - # for each arc between the two nodes - for edge_key in gis_ident.get_edges_between_two_nodes( - network, - node_key, - other_node_key) - ] - for i, original_roundabout_nodes in enumerate(original_roundabouts)} - - # get all the respective lengths - - dict_edge_key_lengths = { - edge_key: network.edges[edge_key][_osm.KEY_OSMNX_LENGTH] - for roundabout_index in range(len(original_roundabouts)) - for edge_key in arcs_affected_by_roundabout[roundabout_index] - } - - #************************************************************************** - - # transform the roundabouts - - new_roundabout_nodes = gis_mod.transform_roundabouts_into_crossroads( - network, - original_roundabouts) - - # assert that the nr. of roundabout nodes matches the number of roundabouts - - assert len(new_roundabout_nodes) == len(original_roundabouts) - - # final network - - final_loops = nx.simple_cycles(network) - - final_number_loops = len(list(final_loops)) - - # assert that the initial number of loops is greater than or equal to the - # number of loops in the final object plus those removed - - assert initial_number_loops >= final_number_loops - - roundabout_nodes = gis_ident.find_roundabouts( - network, - minimum_perimeter=minimum_perimeter, - maximum_perimeter=maximum_perimeter, - minimum_number_nodes=minimum_number_nodes, - maximum_number_nodes=maximum_number_nodes) - - final_number_roundabouts_found = len(roundabout_nodes) - - assert initial_number_roundabouts_found >= final_number_roundabouts_found - - #************************************************************************** - - # verify that the external nodes affected by the transformation are still - # connected to the respective roundabouts - - # for each roundabout - - for roundabout_index, new_roundabout_node_key in enumerate(new_roundabout_nodes): - - # for each edge involving the roundabout under consideration - - for edge_key in arcs_affected_by_roundabout[roundabout_index]: - - # check if the current roundabout has been transformed - - if new_roundabout_node_key is None: - - # cases: 1) overlapping roundabouts - - # the current roundabout has not been transformed: all original - # arcs should still exist unless they are part of roundabouts - # that were also transformed - - if network.has_edge(u=edge_key[0], - v=edge_key[1], - key=edge_key[2]): - - # the arc exists - - # assert that it has the same length - - assert ( - dict_edge_key_lengths[edge_key] == network.edges[ - edge_key][_osm.KEY_OSMNX_LENGTH]) - - # it does, now continue - - continue - - # the original arc does not exist: it was modified or deleted - - # check if the source node exists - - if network.has_node(edge_key[0]): - - # TODO: introduce a roundabout to enter this test case - - # it does, now check that the sink node does not exist - - assert not network.has_node(edge_key[1]) - - # find original roundabouts to which the sink node belonged - - roundabout_candidates = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[1] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates) >= 1 - - # assert that the length is longer than the original - - for ra_i in roundabout_candidates: - - for arc_key in gis_ident.get_edges_from_a_to_b( - network, - edge_key[0], - new_roundabout_nodes[ra_i]): - - assert (dict_edge_key_lengths[edge_key] <= - network.edges[arc_key][_osm.KEY_OSMNX_LENGTH]) - - continue - - # check if the sink node exists - - if network.has_node(edge_key[1]): - - # it does, now check that the source node does not exist - - assert not network.has_node(edge_key[0]) - - # find original roundabouts to which the source node belonged - - roundabout_candidates = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[0] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates) >= 1 - - # assert that the length is longer than the original - - for ra_i in roundabout_candidates: - - for arc_key in gis_ident.get_edges_from_a_to_b( - network, - new_roundabout_nodes[ra_i], - edge_key[1]): - - assert (dict_edge_key_lengths[edge_key] <= - network.edges[arc_key][_osm.KEY_OSMNX_LENGTH]) - - continue - - #****************************************************************** - #****************************************************************** - #****************************************************************** - - # the roundabout was transformed: - # 1) the source node no longer exists, the sink node does - # 2) the sink node no longer exists, the source node does - # 3) none of the nodes exists any more - - #****************************************************************** - #****************************************************************** - #****************************************************************** - - # the original arc does not exist: it was modified or deleted - - #****************************************************************** - - # check if the source node exists - - if network.has_node(edge_key[0]): - - # 2) the sink node no longer exists, the source node does - - assert not network.has_node(edge_key[1]) - - # find original roundabouts to which the sink node belonged - - roundabout_candidates = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[1] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates) >= 1 - - # assert that the length is longer than the original - - for ra_i in roundabout_candidates: - - for arc_key in gis_ident.get_edges_from_a_to_b( - network, - edge_key[0], - new_roundabout_nodes[ra_i]): - - assert (dict_edge_key_lengths[edge_key] <= - network.edges[arc_key][_osm.KEY_OSMNX_LENGTH]) - - continue - - #****************************************************************** - - # check if the sink node exists - - if network.has_node(edge_key[1]): - - # 1) the source node no longer exists, the sink node does - - assert not network.has_node(edge_key[0]) - - # find original roundabouts to which the source node belonged - - roundabout_candidates = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[0] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates) >= 1 - - # assert that the length is longer than the original - - for ra_i in roundabout_candidates: - - for arc_key in gis_ident.get_edges_from_a_to_b( - network, - new_roundabout_nodes[ra_i], - edge_key[1]): - - assert (dict_edge_key_lengths[edge_key] <= - network.edges[arc_key][_osm.KEY_OSMNX_LENGTH]) - - continue - - #****************************************************************** - - # 3) none of the nodes exists any more - - assert not network.has_node(edge_key[0]) - - assert not network.has_node(edge_key[1]) - - # find original roundabouts to which the source node belonged - - roundabout_candidates_source = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[0] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates_source) >= 1 - - # assert that the length is longer than the original - - # find original roundabouts to which the source node belonged - - roundabout_candidates_sink = [ - ra_i - for ra_i, ra_nodes in enumerate(original_roundabouts) - if edge_key[1] in ra_nodes - if new_roundabout_nodes[ra_i] != None - ] - - # assert that at least one was transformed - - assert len(roundabout_candidates_sink) >= 1 - - # assert that the length is longer than the original - - for ra_so_i in roundabout_candidates_source: - - for ra_si_i in roundabout_candidates_sink: - - for arc_key in gis_ident.get_edges_from_a_to_b( - network, - new_roundabout_nodes[ra_so_i], - new_roundabout_nodes[ra_si_i]): - - assert (dict_edge_key_lengths[edge_key] <= - network.edges[arc_key][_osm.KEY_OSMNX_LENGTH]) - - continue - -#****************************************************************************** -#****************************************************************************** - -def example_intermediate_nodes_along_path(network: nx.MultiDiGraph, - excluded_nodes: list, - return_paths_as_arc_keys: bool, - ignore_edge_direction: bool = True): - - #************************************************************************** - #************************************************************************** - - # find straight paths - - simplifiable_paths = gis_ident.find_all_straight_paths( - network=network, - excluded_nodes=excluded_nodes, - return_paths_as_arc_keys=return_paths_as_arc_keys) - - # test the paths - - for path in simplifiable_paths: - - assert gis_ident.is_path_simplifiable( - network, - path, - path_as_node_keys=(not return_paths_as_arc_keys) - ) - - # get the length of each path - - if return_paths_as_arc_keys: - - path_lengths = [ - gis_calc.edge_path_length( - network, - path, - ignore_edge_direction=ignore_edge_direction) - for path in simplifiable_paths - ] - - else: - - path_lengths = [ - gis_calc.node_path_length( - network, - path, - return_minimum_length_only=True) - for path in simplifiable_paths - ] - - # replace the paths - - for path_index, path in enumerate(simplifiable_paths): - - # replace the path - - gis_mod.replace_path(network, - path, - path_as_arc_keys=return_paths_as_arc_keys) - - # try to find the paths again - - new_simplifiable_paths = gis_ident.find_all_straight_paths( - network=network, - excluded_nodes=excluded_nodes, - return_paths_as_arc_keys=return_paths_as_arc_keys) - - # test the paths again - - for path in new_simplifiable_paths: - - assert gis_ident.is_path_simplifiable( - network, - path, - path_as_node_keys=(not return_paths_as_arc_keys) - ) - - #************************************************************************** - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** - -def example_removal_redundant_arcs(seed_number: int = None): - - #************************************************************************** - - if seed_number != None: - - random.seed(seed_number) - - #************************************************************************** - - # create a network - - network_order = random.randint(4,6) - - network = nx.MultiDiGraph( - incoming_graph_data=nx.binomial_tree(network_order, - nx.MultiDiGraph) - ) - - # add multiple edges between two nodes - - list_arcs = [arc for arc in network.edges(keys=True)] - - for arc in list_arcs: - - # add length to first arc - - network.add_edge( - arc[0], - arc[1], - key=arc[2], - length=random.random() - ) - - # add more arcs, also with a length attribute - - if random.randint(0, 1): - - # add more arcs and lengths - - network.add_edge( - u_for_edge=arc[0], - v_for_edge=arc[1], - length=random.random() - ) - - # remove redundant arcs - - gis_mod.remove_redundant_arcs(network) - - # verify results - - # for each arc, make sure there is only one arc per direction - - for arc in network.edges(keys=True): - - assert len( - gis_ident.get_edges_from_a_to_b(network,arc[0],arc[1]) - ) == 1 - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** - -def get_network_with_unconnected_points( - add_geometry_attr: bool = False): - - # from shapely.geometry import LineString - # import osmnx as ox - # import networkx as nx - - # create a network - - network = nx.MultiDiGraph() - - x_offset = 15 - - y_offset = 45 - - x_amp = 0.01 - - y_amp = 0.01 - - network.graph['crs'] = 'epsg:4326' - - network.graph['simplified'] = True - - #************************************************************************** - - # define the nodes - - list_nodes = [ - # connected nodes - (0, {'x':x_offset+0*x_amp,'y':y_offset+0*y_amp}), - (1, {'x':x_offset+1*x_amp,'y':y_offset+0*y_amp}), - (2, {'x':x_offset+2*x_amp,'y':y_offset+0*y_amp}), - (3, {'x':x_offset+3*x_amp,'y':y_offset+1*y_amp}), - (4, {'x':x_offset+3*x_amp,'y':y_offset-1*y_amp}), - (5, {'x':x_offset+3*x_amp,'y':y_offset-2*y_amp}), - # unconnected nodes - (6, {'x':x_offset+0.5*x_amp,'y':y_offset+0.5*y_amp}), - (7, {'x':x_offset+2.0*x_amp,'y':y_offset+0.5*y_amp}), - (8, {'x':x_offset+2.0*x_amp,'y':y_offset+1.0*y_amp}), - (9, {'x':x_offset+3.0*x_amp,'y':y_offset+1.5*y_amp}), - (10, {'x':x_offset+4.0*x_amp,'y':y_offset+0.0*y_amp}), - - (11, {'x':x_offset+2.0*x_amp,'y':y_offset-1.5*y_amp}), - (12, {'x':x_offset+4.0*x_amp,'y':y_offset-1.2*y_amp}), - (13, {'x':x_offset+5.0*x_amp,'y':y_offset-1.8*y_amp}), - (14, {'x':x_offset+3.0*x_amp,'y':y_offset-4.0*y_amp}), - (15, {'x':x_offset-1.0*x_amp,'y':y_offset+0.0*y_amp}), - - ] - - network.add_nodes_from(list_nodes) - - # define the edges - - list_edges = [ - (0,1), - (1,2), - (2,3), - (3,4), - (4,5), - ] - - network.add_edges_from(list_edges) - - #************************************************************************** - - # add geometry to select edges - - network.add_edge( - u_for_edge=1, - v_for_edge=2, - key=0, - geometry=LineString( - [(x_offset+1*x_amp, y_offset+0*y_amp), - (x_offset+1.5*x_amp, y_offset+0.25*y_amp), - (x_offset+2*x_amp, y_offset+0*y_amp)] - ) - ) - - network.add_edge( - u_for_edge=3, - v_for_edge=4, - key=0, - geometry=LineString( - [(x_offset+3.0*x_amp, y_offset+1*y_amp), - (x_offset+3.2*x_amp, y_offset+0*y_amp), - (x_offset+3.0*x_amp, y_offset-1*y_amp)] - ) - ) - - # TODO: add oneway and reversed attributes - - #************************************************************************** - - # add lengths to edges - - for edge_key in network.edges(keys=True): - - # get its data dict - - edge_dict = network.get_edge_data(u=edge_key[0], - v=edge_key[1], - key=edge_key[2]) - - # prepare the geometry - - if 'geometry' in edge_dict: - - edge_geo = edge_dict['geometry'] - - else: - - edge_geo_point_list = [ - (network.nodes[edge_key[0]]['x'], - network.nodes[edge_key[0]]['y']), - (network.nodes[edge_key[1]]['x'], - network.nodes[edge_key[1]]['y'])] - - edge_geo = LineString( - edge_geo_point_list - ) - - if add_geometry_attr: - - edge_dict['geometry'] = edge_geo - - edge_dict['length'] = ( - gis_calc.great_circle_distance_along_path( - edge_geo) - ) - - network.add_edge( - edge_key[0], - edge_key[1], - edge_key[2], - **edge_dict - ) - - #************************************************************************** - - return network - -#****************************************************************************** -#****************************************************************************** - -def example_simplify_network(network: nx.MultiDiGraph, - nodes_excluded: list = None, - seed_number: int = None, - update_street_count: bool = True): - - #************************************************************************** - - if seed_number != None: - - random.seed(seed_number) - - #************************************************************************** - - # exclude some nodes - - if nodes_excluded == None or len(nodes_excluded) == 0: - - nodes_excluded = [ - node_key - for node_key in network.nodes() - if random.randint(0,1)] - - #************************************************************************** - - gis_utils.simplify_network( - network, - nodes_excluded, - update_street_count_per_node=update_street_count) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def verify_street_count(network: nx.MultiDiGraph): - - # count the number of streets per node - - street_count_dict = count_streets_per_node(network) - - # for each node - - for node_key in network.nodes(): - - try: - street_count = network.nodes[node_key][_osm.KEY_OSMNX_STREET_COUNT] - except KeyError: - continue - - assert street_count == street_count_dict[node_key] - -#****************************************************************************** -#****************************************************************************** - -def protocol_validate_path_length(network: nx.MultiDiGraph): - - protocol_validate_node_path_length(network) - - protocol_validate_edge_path_length(network) - -#****************************************************************************** -#****************************************************************************** - -def protocol_validate_edge_path_length(network: nx.MultiDiGraph): - - # paths as arc keys, minimum only - - # for each pair of nodes - - for node_key in network.nodes(): - - for other_node_key in network.nodes(): - - if not nx.has_path(network, node_key, other_node_key): - - continue - - # for each possible path - - list_simple_paths = [ - path - for path in nx.all_simple_edge_paths(network, - node_key, - other_node_key) - ] - - list_simple_path_lengths = [ - gis_calc.edge_path_length( - network, - path - ) - for path in list_simple_paths] - - # use k_shortest_paths to identify the shortest paths - - list_k_shortest_paths = list( - k_shortest_paths( - network, - node_key, - other_node_key, - len(list_simple_paths)) - ) - - # sort list_simple_path_lengths and list_simple_paths - - temp_sort = sorted( - ((v, i) for i, v in enumerate(list_simple_path_lengths)), - reverse=False) - - # sorted_list_simple_path_lengths = [temp[0] - # for temp in temp_sort] - - sorted_list_simple_paths = [list_simple_paths[temp[1]] - for temp in temp_sort] - - # make sure the list_k_shortest_paths and sorted_list_simple_paths - # have the same size by adding elements to list_k_shortest_paths - - # if list_k_shortest_paths is smaller than sorted_list_simple_paths - - if len(list_k_shortest_paths) < len(sorted_list_simple_paths): - - # if the node paths on list_k_shortest_paths are the same as in - # sorted_list_simple_paths, add them to list_kso - - # for each (edge) path - - for path_index, path in enumerate(sorted_list_simple_paths): - - # transform it into a node path - - path_in_nodes = [ - arc_key[1] - for arc_key in path - if arc_key[0] != arc_key[1]] - path_in_nodes.insert(0, path[0][0]) - - - # make sure this path is in list_k_shortest_paths - - assert path_in_nodes in list_k_shortest_paths - - # assert that the paths are the same and on the same position - - try: - - assert path_in_nodes == list_k_shortest_paths[path_index] - - except IndexError: - - # index exceeded: append path - - list_k_shortest_paths.append(path_in_nodes) - - except AssertionError: - - # incorrect order: insert path in the path_index index - - list_k_shortest_paths.insert(path_index, path_in_nodes) - - # for each simple path - - for path_index, path in enumerate(sorted_list_simple_paths): - - # convert path to nodes - - path_in_nodes = [ - arc_key[1] - for arc_key in path - if arc_key[0] != arc_key[1]] - path_in_nodes.insert(0, path[0][0]) - - # assert that the ordered paths match - - assert path_in_nodes == list_k_shortest_paths[path_index] - -#****************************************************************************** -#****************************************************************************** - -def protocol_validate_node_path_length(network: nx.MultiDiGraph): - - # paths as node keys, minimum only - - return_minimum_path_length = True - - # for each pair of nodes - - for node_key in network.nodes(): - - for other_node_key in network.nodes(): - - if not nx.has_path(network, node_key, other_node_key): - - continue - - # for each possible path - - list_simple_paths = [ - path - for path in nx.all_simple_paths(network, - node_key, - other_node_key) - ] - - # remove duplicates - - for path in list_simple_paths: - while list_simple_paths.count(path) != 1: - list_simple_paths.remove(path) - - list_simple_path_lengths = [ - gis_calc.node_path_length( - network, - path, - return_minimum_length_only=return_minimum_path_length) - for path in list_simple_paths] - - # use k_shortest_paths to identify the shortest paths - - list_k_shortest_paths = list( - k_shortest_paths(network, - node_key, - other_node_key, - len(list_simple_paths)) - ) - - # sort list_simple_path_lengths and list_simple_paths - - temp_sort = sorted( - ((v, i) for i, v in enumerate(list_simple_path_lengths)), - reverse=False) - - # sorted_list_simple_path_lengths = [temp[0] - # for temp in temp_sort] - - sorted_list_simple_paths = [list_simple_paths[temp[1]] - for temp in temp_sort] - - # assert that the ordered paths match - - assert list_k_shortest_paths == sorted_list_simple_paths - - #************************************************************************** - - # paths as node keys, all lengths - - return_minimum_path_length = True - - # for each pair of nodes - - for node_key in network.nodes(): - - for other_node_key in network.nodes(): - - if not nx.has_path(network, node_key, other_node_key): - - continue - - # for each possible path - - list_simple_paths = [ - path - for path in nx.all_simple_paths(network, - node_key, - other_node_key) - ] - - # remove duplicates (paths_as_node_keys = True) - - for path in list_simple_paths: - while list_simple_paths.count(path) != 1: - list_simple_paths.remove(path) - - # for each path, get all the possible lengths - - list_simple_path_lengths = [ - gis_calc.node_path_length( - network, - path, - return_minimum_length_only=return_minimum_path_length) - for path in list_simple_paths] - - # use k_shortest_paths to identify the shortest paths - - list_k_shortest_paths = list( - k_shortest_paths(network, - node_key, - other_node_key, - len(list_simple_paths)) - ) - - if len(list_simple_path_lengths) == 0: - - assert len(list_k_shortest_paths) == 0 - - continue - - # sort list_simple_path_lengths and list_simple_paths - - temp_sort = sorted( - ((v, i) for i, v in enumerate(list_simple_path_lengths)), - reverse=False) - - # sorted_list_simple_path_lengths = [temp[0] - # for temp in temp_sort] - - sorted_list_simple_paths = [list_simple_paths[temp[1]] - for temp in temp_sort] - - # for each simple path - - assert list_k_shortest_paths == sorted_list_simple_paths - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def examples_path_length(seed_number: int = None): - - if seed_number != None: - - random.seed(seed_number) - - networks = [ - get_network_A()[0], - get_network_B()[0], - get_network_C()[0], - get_network_D()[0] - ] - - for network in networks: - - protocol_validate_path_length(network) - - #************************************************************************** - - # test unusual cases - - network = get_network_A()[0] - - # empty path - - path = [] - - my_path_length = gis_calc.node_path_length( - network, - path, - return_minimum_length_only=True) - - assert my_path_length == math.inf - - # invalid path, minimum only - - path = ['node1','node2','node3'] - - my_path_length = gis_calc.node_path_length( - network, - path, - return_minimum_length_only=True) - - assert my_path_length == math.inf - - # invalid path, all paths - - path = ['node1','node2','node3'] - - my_path_length = gis_calc.node_path_length( - network, - path, - return_minimum_length_only=False) - - assert my_path_length == [math.inf] - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** diff --git a/tests/examples_gis_utils.py b/tests/examples_gis_utils.py deleted file mode 100644 index e3f57734a69ad9907b231b2c11d13c8f198e4ced..0000000000000000000000000000000000000000 --- a/tests/examples_gis_utils.py +++ /dev/null @@ -1,2063 +0,0 @@ -# imports - -# standard - -from ast import literal_eval -import random - -# local, external - -from geopandas import GeoDataFrame -from pandas import concat, MultiIndex, Series -import networkx as nx -import osmnx as ox -from shapely.geometry import Point, LineString - -# local, internal - -import src.topupopt.data.gis.identify as gis_iden -import src.topupopt.data.gis.utils as gis_utils -import src.topupopt.data.gis.osm as _osm - -ox.settings.use_cache = True - -#****************************************************************************** -#****************************************************************************** - -def examples(network: nx.MultiDiGraph, seed_number: int = None): - - if type(seed_number) == type(None): - - seed_number = random.randint(0,int(1e5)) - - random.seed(a=seed_number) - - print('Seed number: ' + str(seed_number)) - - #************************************************************************** - - # test io - - examples_gpkg_write_errors() - - example_io_geodataframe( - preserve_original_gdf=True, identify_columns=False - ) - - example_io_geodataframe( - preserve_original_gdf=False, identify_columns=False - ) - - # example_io_geodataframe( - # preserve_original_gdf=True, identify_columns=True - # ) - - # example_io_geodataframe( - # preserve_original_gdf=False, identify_columns=True - # ) - - # TODO: handle GeoJSON files - - # example_io_geodataframe(preserve_original_gdf=True, - # file_extension='.json') - - # example_io_geodataframe(preserve_original_gdf=False, - # file_extension='.json') - - # test generating containers - - example_generate_node_container(False, False) - - example_generate_node_container(True, False) - - example_generate_node_container(False, True) - - example_generate_node_container(True, True) - - example_generate_node_container(False, False, True) - - # trigger error when generating node containers - - example_node_container_error() - - #************************************************************************** - - # test finding the building entrances - - example_identify_entrances_simple_no_driveway_closest() - - # no driveway, all nodes - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=False) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - focus_on_node_P_only=False) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True, - focus_on_node_P_only=False) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - focus_on_node_P_only=False) - - # no driveway, all nodes, multiple addresses per arc - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=False, - use_multiple_addresses=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - focus_on_node_P_only=False, - use_multiple_addresses=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True, - focus_on_node_P_only=False, - use_multiple_addresses=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - focus_on_node_P_only=False, - use_multiple_addresses=True) - - # no driveway, all nodes, revert projection - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - focus_on_node_P_only=False, - revert_to_original_crs=True) - - # no driveway, limited selection of nodes - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - focus_on_node_P_only=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True, - focus_on_node_P_only=True) - - example_identify_entrances_simple_no_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - focus_on_node_P_only=True) - - # driveway, all nodes - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True) - - # driveway, all nodes, multiple addresses per arc - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - use_multiple_addresses=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - use_multiple_addresses=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True, - use_multiple_addresses=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - use_multiple_addresses=True) - - # driveway, limited selection - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - focus_on_node_P_only=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=True, - focus_on_node_P_only=True) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=True, - focus_on_node_P_only=True) - - # driveway variation - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=False, - BD_with_name=False, - BD_right_address=False) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=False, - BD_with_name=True, - BD_right_address=False) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=False, - create_reversed_arcs=False, - focus_on_node_P_only=False, - BD_with_name=True, - BD_right_address=False) - - example_identify_entrances_simple_driveway(AB_right_BC_wrong=True, - create_reversed_arcs=False, - focus_on_node_P_only=False, - BD_with_name=True, - BD_right_address=True) - - # special cases - - example_identify_entrances_simple_special(create_reversed_arcs=False, - revert_to_original_crs=False, - focus_on_node_P_only=False) - - example_identify_entrances_simple_special(create_reversed_arcs=True, - revert_to_original_crs=False, - focus_on_node_P_only=False) - - example_identify_entrances_simple_special(create_reversed_arcs=False, - revert_to_original_crs=False, - focus_on_node_P_only=True) - - example_identify_entrances_simple_special(create_reversed_arcs=True, - revert_to_original_crs=False, - focus_on_node_P_only=True) - - # no matching street name in the entire network - - example_identify_entrances_simple_special(create_reversed_arcs=False, - revert_to_original_crs=False, - focus_on_node_P_only=False, - CE_wrong=True) - - # TODO: test a case with multiple parallel arcs - - #************************************************************************** - - # test counting occurrences in a geodataframe - - example_occurrences() - - example_create_osmnx_gdf() - - example_discrete_plot_gdf() - - examples_convert_edge_paths() - - example_get_directed(network) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** -#****************************************************************************** - -def get_node_gdf_A(right_address: str = 'right', - wrong_address: str = 'wrong', - country_code: str = _osm.KEY_COUNTRY_DK): - - #************************************************************************** - - # 4 nodes: A, B, C and P - - # node A - - osmid_A = 'A' - - xy_A = (0, 0) - - geo_A = Point(xy_A) - - node_uid_A = 5123 - - address_A = None - - # node B - - osmid_B = 'B' - - xy_B = (1, 0) - - geo_B = Point(xy_B) - - node_uid_B = 1844 - - address_B = None - - # node C - - osmid_C = 'C' - - xy_C = (2, 0) - - geo_C = Point(xy_C) - - node_uid_C = 1845 - - address_C = None - - # node P - - osmid_P = 'P' - - xy_P = (0.5, 1) - - geo_P = Point(xy_P) - - node_uid_P = 9475 - - address_P = right_address - - #************************************************************************** - - # geodataframe: should have 'addr:street', 'osak:identifier' and index - - gdf = GeoDataFrame( - data={_osm.KEY_OSM_STREET: [address_A, # A - address_B, # B - address_C, # C - address_P], # P - _osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code]: [node_uid_A, # A - node_uid_B, # B - node_uid_C, # C - node_uid_P],# P - _osm.KEY_OSMNX_ELEMENT_TYPE: ['node','node','node','node'], - _osm.KEY_OSMNX_OSMID: [osmid_A, osmid_B, osmid_C, osmid_P]}, - geometry=[geo_A, geo_B, geo_C, geo_P] - ) - - gdf.set_index([_osm.KEY_OSMNX_ELEMENT_TYPE, _osm.KEY_OSMNX_OSMID], - drop=True, - inplace=True) - - return gdf - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_node_gdf_B(right_address: str = 'right', - wrong_address: str = 'wrong', - country_code: str = _osm.KEY_COUNTRY_DK): - - #************************************************************************** - - gdf = get_node_gdf_A(right_address=right_address, - wrong_address=wrong_address, - country_code=country_code) - - # add another node D closer to P than A, B and C - - osmid_D = 'D' - - xy_D = (0.75, 1) - - geo_D = Point(xy_D) - - node_uid_D = 8842 - - # address_D = None - - gdf_node_D = GeoDataFrame({_osm.KEY_OSMNX_GEOMETRY: [geo_D], - _osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code]: - [node_uid_D], - #_osm.KEY_OSM_STREET: [address_D],# P - #_osm.KEY_OSMNX_ELEMENT_TYPE: ['node'], - #_osm.KEY_OSMNX_OSMID: [osmid_D] - }, - #index=[('node', osmid_D)], - index=MultiIndex.from_tuples( - [('node',osmid_D)], - names=[_osm.KEY_OSMNX_ELEMENT_TYPE, - _osm.KEY_OSMNX_OSMID]) - ) - - #************************************************************************** - - gdf = concat([gdf, gdf_node_D]) - - return gdf - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_node_gdf_C(right_address: str = 'right', - wrong_address: str = 'wrong', - country_code: str = _osm.KEY_COUNTRY_DK): - - #************************************************************************** - - gdf = get_node_gdf_B(right_address=right_address, - wrong_address=wrong_address, - country_code=country_code) - - # add another node E, east of C - - osmid_E = 'E' - - xy_E = (3, 0) - - geo_E = Point(xy_E) - - node_uid_E = 9173 - - #address_E = right_address - - gdf_node_E = GeoDataFrame({_osm.KEY_OSMNX_GEOMETRY: [geo_E], - _osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code]: - [node_uid_E], - #_osm.KEY_OSM_STREET: [address_E], - }, - #index=[('node', osmid_E)] - index=MultiIndex.from_tuples( - [('node',osmid_E)], - names=[_osm.KEY_OSMNX_ELEMENT_TYPE, - _osm.KEY_OSMNX_OSMID]) - ) - - #************************************************************************** - - gdf = concat([gdf, gdf_node_E]) - - return gdf - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_A(gdf: GeoDataFrame, - right_address: str = 'right', - wrong_address: str = 'wrong', - AB_right_BC_wrong: bool = True, - country_code: str = _osm.KEY_COUNTRY_DK, - use_multiple_addresses: bool = False): - - #************************************************************************** - - # create network - - (node_keys, - node_data_container, - node_key_to_gdf_index_dict) = gis_utils.prepare_node_data_from_geodataframe( - include_geometry=True, - gdf=gdf) - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - network.add_nodes_from(node_data_container) - - #************************************************************************** - - # two arcs: AB and BC - - node_key_A = 'A' - node_key_B = 'B' - node_key_C = 'C' - - # arc AB - - geo_AB = LineString( - [(network.nodes[node_key_A][_osm.KEY_OSMNX_X], - network.nodes[node_key_A][_osm.KEY_OSMNX_Y]), - (network.nodes[node_key_B][_osm.KEY_OSMNX_X], - network.nodes[node_key_B][_osm.KEY_OSMNX_Y])] - ) - - length_AB = network.nodes[node_key_A][_osm.KEY_OSMNX_GEOMETRY].distance( - network.nodes[node_key_B][_osm.KEY_OSMNX_GEOMETRY] - ) - - network.add_edge(node_key_A, - node_key_B, - **{_osm.KEY_OSMNX_GEOMETRY: geo_AB, - _osm.KEY_OSMNX_LENGTH: length_AB, - _osm.KEY_OSMNX_NAME: ( - ['HZ', (right_address if AB_right_BC_wrong else - wrong_address)] if use_multiple_addresses else ( - right_address if AB_right_BC_wrong else - wrong_address) - ) - }) - - # arc BC - - geo_BC = LineString( - [(network.nodes[node_key_B][_osm.KEY_OSMNX_X], - network.nodes[node_key_B][_osm.KEY_OSMNX_Y]), - (network.nodes[node_key_C][_osm.KEY_OSMNX_X], - network.nodes[node_key_C][_osm.KEY_OSMNX_Y])] - ) - - length_BC = network.nodes[node_key_B][_osm.KEY_OSMNX_GEOMETRY].distance( - network.nodes[node_key_C][_osm.KEY_OSMNX_GEOMETRY] - ) - - network.add_edge(node_key_B, - node_key_C, - **{_osm.KEY_OSMNX_GEOMETRY: geo_BC, - _osm.KEY_OSMNX_LENGTH: length_BC, - _osm.KEY_OSMNX_NAME: ( - [(wrong_address if AB_right_BC_wrong else - right_address), 'UQ'] if use_multiple_addresses - else (wrong_address if AB_right_BC_wrong else - right_address) - ) - }) - - #************************************************************************** - - return network, node_keys, node_key_to_gdf_index_dict - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_B(gdf: GeoDataFrame, - right_address: str = 'right', - wrong_address: str = 'wrong', - AB_right_BC_wrong: bool = True, - BD_with_name: bool = False, - BD_right_address: bool = False, - country_code: str = _osm.KEY_COUNTRY_DK, - use_multiple_addresses: bool = False): - - #************************************************************************** - - # create network - - network, node_keys, node_key_to_gdf_index_dict = get_network_A( - gdf=gdf, - right_address=right_address, - wrong_address=wrong_address, - country_code=country_code, - AB_right_BC_wrong=AB_right_BC_wrong, - use_multiple_addresses=use_multiple_addresses) - - # add nameless BD arc - - node_key_B = 'B' - node_key_D = 'D' - - # arc AB - - geo_BD = LineString( - [(network.nodes[node_key_B][_osm.KEY_OSMNX_X], - network.nodes[node_key_B][_osm.KEY_OSMNX_Y]), - (network.nodes[node_key_D][_osm.KEY_OSMNX_X], - network.nodes[node_key_D][_osm.KEY_OSMNX_Y])] - ) - - length_BD = network.nodes[node_key_B][_osm.KEY_OSMNX_GEOMETRY].distance( - network.nodes[node_key_D][_osm.KEY_OSMNX_GEOMETRY] - ) - - BD_dict = { - _osm.KEY_OSMNX_GEOMETRY: geo_BD, - _osm.KEY_OSMNX_LENGTH: length_BD, - #_osm.KEY_OSMNX_NAME: ( # no name for BD - # right_address if AB_right_BC_wrong else - # wrong_address) - } - - if BD_with_name: - - BD_dict[_osm.KEY_OSMNX_NAME] = ( - right_address if BD_right_address else wrong_address - ) - - network.add_edge(node_key_B, - node_key_D, - **BD_dict) - - #************************************************************************** - - return network, node_keys, node_key_to_gdf_index_dict - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def get_network_C(gdf: GeoDataFrame, - right_address: str = 'right', - wrong_address: str = 'wrong', - country_code: str = _osm.KEY_COUNTRY_DK, - CE_wrong: bool = False, - use_multiple_addresses: bool = False): - - #************************************************************************** - - # create network - - network, node_keys, node_key_to_gdf_index_dict = get_network_B( - gdf=gdf, - right_address=wrong_address, - wrong_address=wrong_address, - country_code=country_code, - AB_right_BC_wrong=True) - - # add a CE arc with the right name - - node_key_C = 'C' - node_key_E = 'E' - - # arc AB - - geo_CE = LineString( - [(network.nodes[node_key_C][_osm.KEY_OSMNX_X], - network.nodes[node_key_C][_osm.KEY_OSMNX_Y]), - (network.nodes[node_key_E][_osm.KEY_OSMNX_X], - network.nodes[node_key_E][_osm.KEY_OSMNX_Y])] - ) - - length_CE = network.nodes[node_key_C][_osm.KEY_OSMNX_GEOMETRY].distance( - network.nodes[node_key_E][_osm.KEY_OSMNX_GEOMETRY] - ) - - network.add_edge(node_key_C, - node_key_E, - **{_osm.KEY_OSMNX_GEOMETRY: geo_CE, - _osm.KEY_OSMNX_LENGTH: length_CE, - _osm.KEY_OSMNX_NAME: ( - wrong_address if CE_wrong else right_address - ) - }) - - #************************************************************************** - - return network, node_keys, node_key_to_gdf_index_dict - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def example_identify_entrances_simple_special( - create_reversed_arcs: bool = False, - revert_to_original_crs: bool = False, - focus_on_node_P_only: bool = False, - CE_wrong: bool = False): - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_C(country_code=country_code) - - network, node_keys, node_key_to_gdf_index_dict = get_network_C( - gdf=gdf, - country_code=country_code, - CE_wrong=CE_wrong) - - # create reverse arcs - - if create_reversed_arcs: - - previous_arc_keys = list( - arc_key for arc_key in network.edges(keys=True) - ) - - for arc_key in previous_arc_keys: - - arc_dict = network.get_edge_data(u=arc_key[0], - v=arc_key[1], - key=arc_key[2]) - - network.add_edge(u_for_edge=arc_key[1], - v_for_edge=arc_key[0], - **arc_dict) - - # find out which is the closest arc - - if focus_on_node_P_only: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict={ - 'P': node_key_to_gdf_index_dict['P'] - }, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - else: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict=node_key_to_gdf_index_dict, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - # validate the outcome - - if CE_wrong: - - # no arcs with the right address, the closest arc should be selected - - assert (('B','D', 0) == nearest_arc_keys['P'] or - ('D','B', 0) == nearest_arc_keys['P']) - - else: - - # CE has the right address, it should be selected - - assert (('C','E', 0) == nearest_arc_keys['P'] or - ('E','C', 0) == nearest_arc_keys['P']) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def example_identify_entrances_simple_driveway( - AB_right_BC_wrong: bool = True, - create_reversed_arcs: bool = False, - revert_to_original_crs: bool = False, - focus_on_node_P_only: bool = False, - BD_with_name: bool = False, - BD_right_address: bool = False, - use_multiple_addresses: bool = False): - - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_B(country_code=country_code) - - network, node_keys, node_key_to_gdf_index_dict = get_network_B( - gdf=gdf, - country_code=country_code, - BD_with_name=BD_with_name, - BD_right_address=BD_right_address, - AB_right_BC_wrong=AB_right_BC_wrong, - use_multiple_addresses=use_multiple_addresses) - - # create reverse arcs - - if create_reversed_arcs: - - previous_arc_keys = list( - arc_key for arc_key in network.edges(keys=True) - ) - - for arc_key in previous_arc_keys: - - arc_dict = network.get_edge_data(u=arc_key[0], - v=arc_key[1], - key=arc_key[2]) - - network.add_edge(u_for_edge=arc_key[1], - v_for_edge=arc_key[0], - **arc_dict) - - # find out which is the closest arc - - if focus_on_node_P_only: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict={ - 'P': node_key_to_gdf_index_dict['P'] - }, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - else: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict=node_key_to_gdf_index_dict, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - # validate the outcome - - if BD_with_name and not BD_right_address: - - if AB_right_BC_wrong: - - assert (('A','B', 0) == nearest_arc_keys['P'] or - ('B','A', 0) == nearest_arc_keys['P']) - - else: - - assert (('B','C', 0) == nearest_arc_keys['P'] or - ('C','B', 0) == nearest_arc_keys['P']) - - # elif BD_with_name and BD_right_address: - - # assert (('B','D', 0) == nearest_arc_keys['P'] or - # ('D','B', 0) == nearest_arc_keys['P']) - - else: - - assert (('B','D', 0) == nearest_arc_keys['P'] or - ('D','B', 0) == nearest_arc_keys['P']) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def example_identify_entrances_simple_no_driveway( - AB_right_BC_wrong: bool = True, - create_reversed_arcs: bool = False, - focus_on_node_P_only: bool = False, - revert_to_original_crs: bool = False, - use_multiple_addresses: bool = False): - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_A(country_code=country_code) - - network, node_keys, node_key_to_gdf_index_dict = get_network_A( - gdf=gdf, - country_code=country_code, - AB_right_BC_wrong=AB_right_BC_wrong, - use_multiple_addresses=use_multiple_addresses) - - # create reverse arcs - - if create_reversed_arcs: - - previous_arc_keys = list( - arc_key for arc_key in network.edges(keys=True) - ) - - for arc_key in previous_arc_keys: - - arc_dict = network.get_edge_data(u=arc_key[0], - v=arc_key[1], - key=arc_key[2]) - - network.add_edge(u_for_edge=arc_key[1], - v_for_edge=arc_key[0], - **arc_dict) - - # find out which is the closest arc - - if focus_on_node_P_only: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict={ - 'P': node_key_to_gdf_index_dict['P'] - }, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - else: - - nearest_arc_keys, _, _ = gis_utils.identify_building_entrance_arcs( - gdf=gdf, - gdf_street_column=_osm.KEY_OSM_STREET, - network=network, - node_key_to_gdf_index_dict=node_key_to_gdf_index_dict, - crs=None, - revert_to_original_crs=revert_to_original_crs) - - # validate the outcome - - if AB_right_BC_wrong: - - # the closest arc should be AB - - assert ('A','B', 0) == nearest_arc_keys['P'] - - else: - - # the closest arc should be BC - - assert ('B','C', 0) == nearest_arc_keys['P'] - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def example_identify_entrances_simple_no_driveway_closest(): - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_A(country_code=country_code) - - network, node_keys, node_key_to_gdf_index_dict = get_network_A( - gdf=gdf, - country_code=country_code) - - # find out which is the closest arc - - nearest_arc_keys, network = gis_iden.identify_edge_closest_to_node( - network, - node_keys=['P']) - - # the closest arc should be AB - - assert ('A','B', 0) in nearest_arc_keys - - assert len(nearest_arc_keys) == 1 - -#****************************************************************************** -#****************************************************************************** - -# test generating a node data container (to create nodes in a network object) - -def example_generate_node_container(include_geometry: bool = False, - include_street_column: bool = False, - use_id_as_node_key: bool = False): - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_A(country_code=country_code) - - # prepare node data - - (node_keys, - node_data_container, - _) = gis_utils.prepare_node_data_from_geodataframe( - gdf=gdf, - node_key_column=(_osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code] - if use_id_as_node_key else None), - include_columns=([_osm.KEY_OSM_STREET] - if include_street_column else None), - include_geometry=include_geometry) - - # node key to gdf index - - node_key_to_gdf_index_dict = { - node_key: ( - 'node', gdf[ - gdf[_osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code]]==node_key - ].index[0][1] if use_id_as_node_key else node_key - ) - for node_key in node_keys} - - # add nodes to new network - - network = nx.MultiDiGraph() - - network.graph['crs'] = "EPSG:4326" - - network.add_nodes_from(node_data_container) - - # verify the data - - for node_key in node_keys: - - assert network.has_node(node_key) - - gdf_index = node_key_to_gdf_index_dict[node_key] - - assert (network.nodes[node_key][_osm.KEY_OSMNX_X] == - gdf.loc[gdf_index][gis_utils.KEY_GPD_GEOMETRY].x) - - assert (network.nodes[node_key][_osm.KEY_OSMNX_Y] == - gdf.loc[gdf_index][gis_utils.KEY_GPD_GEOMETRY].y) - - if include_geometry: - - assert (network.nodes[node_key][_osm.KEY_OSMNX_GEOMETRY] == - gdf.loc[gdf_index][gis_utils.KEY_GPD_GEOMETRY]) - - if include_street_column: - - assert (network.nodes[node_key][_osm.KEY_OSM_STREET] == - gdf.loc[gdf_index][_osm.KEY_OSM_STREET]) - -#****************************************************************************** -#****************************************************************************** - -# trigger ValueError by using an index that differs from the osmnx-provided one - -def example_node_container_error(country_code: str = 'dk'): - - # get problem details - - country_code = _osm.KEY_COUNTRY_DK - - gdf = get_node_gdf_A(country_code=country_code) - - # modify index to trigger error - - gdf.set_index(_osm.KEY_OSM_BUILDING_ENTRANCE_ID[country_code], - inplace=True) - - # trigger the error - - error_triggered = False - try: - (node_keys, - node_data_container, - _) = gis_utils.prepare_node_data_from_geodataframe( - gdf=gdf) - except ValueError: - error_triggered = True - assert error_triggered - -#****************************************************************************** -#****************************************************************************** - -# test the counting of occurrences in a geodataframe - -def example_occurrences(): - - gdf = GeoDataFrame( - data={'column_A': [1, 2, 2, 3, 4], - 'column_B': [5.46, 5.46, 7, 7, 7.3], - 'column_C': ['a','a','a','a','a'], - 'column_D': ['a','b','c','d','e'], - 'column_E': ['hello','goodbye',None,'hello',None]}, - geometry=[Point(0,1), Point(4,5), Point(2,3), Point(4,6), Point(7,2)] - ) - - solution = { - 'column_A': {1: 1, 2: 2, 3: 1, 4: 1}, - 'column_B': {5.46: 2, 7: 2, 7.3: 1}, - 'column_C': {'a': 5}, - 'column_D': {'a': 1, 'b': 1, 'c': 1, 'd': 1, 'e': 1}, - 'column_E': {'hello': 2, 'goodbye': 1, None: 2} - } - - # all elements - - for column in solution: - - assert gis_utils.count_ocurrences(gdf, column) == solution[column] - - # specific ones - - assert gis_utils.count_ocurrences(gdf, 'column_A', [1, 2]) == {1: 1, 2: 2} - - assert gis_utils.count_ocurrences(gdf, 'column_A', [10]) == {10: 0} - - assert ( - gis_utils.count_ocurrences(gdf, 'column_B', [7, 7.3]) == {7: 2, 7.3: 1} - ) - - assert gis_utils.count_ocurrences(gdf, 'column_C', ['a']) == {'a': 5} - - assert gis_utils.count_ocurrences(gdf, 'column_C', ['b']) == {'b': 0} - - assert gis_utils.count_ocurrences(gdf, 'column_D', ['b']) == {'b': 1} - - assert ( - gis_utils.count_ocurrences(gdf, 'column_E', ['hello']) == {'hello': 2} - ) - - assert ( - gis_utils.count_ocurrences(gdf, 'column_E', [None]) == {None: 2} - ) - -#****************************************************************************** -#****************************************************************************** - -# test creating osmnx-like geodataframes for nodes - -def example_create_osmnx_gdf(): - - # method for basic gdf compliance verification - - def verify_osmnx_gdf(gdf: GeoDataFrame, - extra_column_names: list = None): - - # index format - - assert type(gdf.index) == MultiIndex - - assert len(gdf.index.names) == 2 - - assert _osm.KEY_OSMNX_OSMID in gdf.index.names - - assert _osm.KEY_OSMNX_ELEMENT_TYPE in gdf.index.names - - # geometry column - - assert _osm.KEY_OSMNX_GEOMETRY in gdf.columns - - # extra columns - - if type(extra_column_names) != type(None): - - for extra_column_name in extra_column_names: - - assert extra_column_name in gdf.columns - - # the elements - - for index in gdf.index: - - # must be a node - - assert 'node' in index[0] # first position of multi-index - - # must have point geometry - - assert type(gdf.loc[index][_osm.KEY_OSMNX_GEOMETRY]) == Point - - # test gdf - - gdf_example = gis_utils.GeoDataFrame( - { - _osm.KEY_OSMNX_GEOMETRY: [Point(152, 546)], - }, - index=MultiIndex.from_tuples([('node', 'badfnbjiadbnd')], - names=[_osm.KEY_OSMNX_ELEMENT_TYPE, - _osm.KEY_OSMNX_OSMID]) - ) - - verify_osmnx_gdf(gdf_example) - - # single node - - _latitude = 23 - _longitude = -12 - - gdf_single = gis_utils.create_node_geodataframe( - longitudes=(_longitude,), - latitudes=(_latitude,) - ) - - verify_osmnx_gdf(gdf_single) - - # single node, using a specific key - - mynodekey = 'mynodekeyishere' - - gdf_single = gis_utils.create_node_geodataframe( - longitudes=(_longitude,), - latitudes=(_latitude,), - osmids=(mynodekey,) - ) - - verify_osmnx_gdf(gdf_single) - - assert gdf_single.index[0][1] == mynodekey - - # single node, with extra columns - - gdf_single = gis_utils.create_node_geodataframe( - longitudes=(_longitude,), - latitudes=(_latitude,), - osmids=(mynodekey,), - long=(_longitude,), - lat=(_latitude,) - ) - - verify_osmnx_gdf(gdf_single, - extra_column_names=('long','lat')) - - assert gdf_single.index[0][1] == mynodekey - assert gdf_single.iloc[0]['long'] == _longitude - assert gdf_single.iloc[0]['lat'] == _latitude - - #************************************************************************** - - # multiple nodes - - _latitudes = (23,45,73) - _longitudes = (-12,33,24) - - gdf_multi = gis_utils.create_node_geodataframe( - longitudes=_longitudes, - latitudes=_latitudes - ) - - verify_osmnx_gdf(gdf_multi) - - # multiple nodes and specific keys - - _osmids = (54,'a4h4',44.323) - - gdf_multi = gis_utils.create_node_geodataframe( - longitudes=_longitudes, - latitudes=_latitudes, - osmids=_osmids - ) - - verify_osmnx_gdf(gdf_multi) - - for i in range(len(gdf_multi)): - - assert gdf_multi.index[i][1] == _osmids[i] - - # multiple nodes and extra columns - - gdf_multi = gis_utils.create_node_geodataframe( - longitudes=_longitudes, - latitudes=_latitudes, - osmids=_osmids, - long=_longitudes, - lat=_latitudes - ) - - verify_osmnx_gdf(gdf_multi, - extra_column_names=('long','lat')) - - for i in range(len(gdf_multi)): - - assert gdf_multi.index[i][1] == _osmids[i] - assert gdf_multi.iloc[i]['long'] == _longitudes[i] - assert gdf_multi.iloc[i]['lat'] == _latitudes[i] - - #************************************************************************** - - # trigger errors - - # mismatched longitudes and latitudes - - error_triggered = False - try: - _ = gis_utils.create_node_geodataframe( - longitudes=(_longitude,528), - latitudes=(_latitude,) - ) - except ValueError: - error_triggered = True - assert error_triggered - - # mismatched longitudes/latitudes and osmids - - error_triggered = False - try: - _ = gis_utils.create_node_geodataframe( - longitudes=(_longitude,528), - latitudes=(_latitude,92), - osmids=(59,482,135) - ) - except ValueError: - error_triggered = True - assert error_triggered - -#****************************************************************************** -#****************************************************************************** - -# TODO: test plotting using cached data - -#****************************************************************************** -#****************************************************************************** - -# test writing a GeoDataFrame with containers - -def example_io_geodataframe(preserve_original_gdf: bool = True, - identify_columns: bool = False, - file_extension: str = '.gpkg'): - - #************************************************************************** - #************************************************************************** - - filename = 'tests/mygdffile'+file_extension - - # print('bing') - # print('preserve_original_gdf:'+str(preserve_original_gdf)) - # print('identify_columns:'+str(identify_columns)) - # print('file_extension:'+str(file_extension)) - - def verify_gdf_conformity(gdf, new_gdf, preserve_original_gdf): - - # verify conformity - # print(gdf) - # print(new_gdf) - - # for each column in the original gdf - - for column in gdf.columns: - - # assert that the column exists or that it is a merged 1 (no need) - - assert (column in new_gdf.columns or gis_utils.RKW_GPKG == column) - - # packed column - - if gis_utils.RKW_GPKG == column: - - # duplicates - - # if the original was preserved, there should be no packed col. - # hence, it cannot have been preserved - - assert not preserve_original_gdf - - # for each key in the packed column - # print(gdf.columns) - # print(gdf[column]) - - for index in gdf.index: - - contents_dict = literal_eval(gdf.loc[(index,column)]) - - for new_gdf_column in contents_dict.keys(): - - assert new_gdf_column in new_gdf.columns - # print(new_gdf_column) - # print("......................................................") - # #print(index) - # print(gdf[column].dtype) - # print(new_gdf[new_gdf_column].dtype) - # print(contents_dict[new_gdf_column]) - # print(new_gdf.loc[(index, new_gdf_column)]) - # print(type(contents_dict[new_gdf_column])) - # print(type(new_gdf.loc[(index, new_gdf_column)])) - # print(repr(contents_dict[new_gdf_column])) - # print(repr(new_gdf.loc[(index, new_gdf_column)])) - - if new_gdf_column in special_columns: - - # the contents are containers: use literal_eval - - assert repr( - literal_eval( - contents_dict[new_gdf_column] - ) - ) == repr( - new_gdf.loc[(index, new_gdf_column)] - ) - - else: # the contents are not containers - - # direct comparison - # TODO: reach this statement - assert repr(contents_dict[new_gdf_column] - ) == repr( - new_gdf.loc[(index, new_gdf_column)] - ) - - continue - - #****************************************************************** - #****************************************************************** - - # non-packed column - - for index in gdf.index: - - if preserve_original_gdf: - - # the original gdf has been preserved - - # print("......................................................") - # print(gdf[column].dtype) - # print(new_gdf[column].dtype) - # print(gdf.loc[(index, column)]) - # print(new_gdf.loc[(index, column)]) - # print(type(gdf.loc[(index, column)])) - # print(type(new_gdf.loc[(index, column)])) - # print(repr(gdf.loc[(index, column)])) - # print(repr(new_gdf.loc[(index, column)])) - - # the types should match - - assert type( - gdf.loc[(index, column)] - ) == type( - new_gdf.loc[(index, column)] - ) - - # sets require special behaviour - - if (type(gdf.loc[(index, column)]) == set or - column == gis_utils.KEY_GPD_GEOMETRY): - - # sets are non-ordered: - # repr() may reveal different results - - assert ( - gdf.loc[(index, column)] == - new_gdf.loc[(index, column)] - ) - - else: # standard - - assert repr( - gdf.loc[(index, column)] - ) == repr( - new_gdf.loc[(index, column)] - ) - - else: # the original gdf has not been preserved - - # print("......................................................") - # print(gdf.columns) - # print(gdf[column].dtype) - # print(new_gdf[column].dtype) - # print(gdf[column].loc[index]) - # print(new_gdf[column].loc[index]) - # print(type(gdf[column].loc[index])) - # print(type(new_gdf[column].loc[index])) - # print(repr(gdf[column].loc[index])) - # print(repr(new_gdf[column].loc[index])) - - if column == gis_utils.KEY_GPD_GEOMETRY: - - # assert ( - # gdf[column].loc[index] == - # new_gdf[column].loc[index] - # ) - - assert ( - gdf.loc[(index, column)] == - new_gdf.loc[(index, column)] - ) - - elif column in special_columns: - - assert repr( - literal_eval(gdf.loc[(index, column)]) - ) == repr( - new_gdf.loc[(index, column)] - ) - - else: - - assert repr( - gdf.loc[(index, column)] - ) == repr( - new_gdf.loc[(index, column)] - ) - - #************************************************************************** - #************************************************************************** - - # TODO: test methods without specifying the columns - - #************************************************************************** - #************************************************************************** - - # gdf object with simple index, undeclared - - gdf = GeoDataFrame( - {'id': [1, 2, 3], - 'mymymy': [None,None,1.23], - 'another_id': [53.4,54.4,55.4], - 'not_another_id': ['you','they','us'], - 'column_of_lists': [list([0,1,2]), - list([3,4,5]), - list([6,7,8])], - 'column_of_tuples': [tuple([-1,-2,-3]), - tuple([-4,-5,-6]), - tuple([-7,-8,-9])], - 'column_of_sets': [set([-1,-2,-3]), - set([-4,-5,-6]), - set([-7,-8,-9])], - 'column_of_dicts': [{1:34,6:'a',5:46.32}, - {'a':575,4:[],3:(2,3)}, - {(4,5):3,4:{2:5},3:4}], - 'column_of_strs': ["set([-1,-2,-3])", - "set([-4,-5,-6])", - "set([-7,-8,-9])"], - 'another_id2': ['hello',53.4,None], # requires special handling - 'another_id3': [53.4,'hello',None], # requires special handling - 'yet_another_id': [None,None,None], # requires special handling - }, - geometry=[LineString([(3, 2), (7, 7)]), - LineString([(3, 7), (7, 2)]), - LineString([(6, 2), (6, 6)])] - ) - - # identify the columns that require special treatment - - if identify_columns: - - # find the columns automatically - - special_columns = None # TODO: reach this statement - - else: - - special_columns = ( - 'column_of_lists', - 'column_of_tuples', - 'column_of_sets', - 'column_of_dicts', - #'column_of_strs' # can be omitted - 'another_id2', - 'another_id3', - 'yet_another_id' - ) - - # find the columns automatically - - set_packable_columns = gis_utils.find_gpkg_packable_columns(gdf) - - # make sure the columns can be identified - - for packable_column in set_packable_columns: - - assert packable_column in special_columns - - # write file - - gis_utils.write_gdf_file( - gdf=gdf, - filename=filename, - columns_to_pack=special_columns, - preserve_original=preserve_original_gdf - ) - - new_gdf = gis_utils.read_gdf_file( - filename=filename, - packed_columns=special_columns) - - # verify conformity - - verify_gdf_conformity(gdf, new_gdf, preserve_original_gdf) - - #************************************************************************** - #************************************************************************** - - # gdf object with simple index, declared - - gdf = GeoDataFrame( - {'id': [1, 2, 3], - 'column_of_lists': [list([0,1,2]), - list([3,4,5]), - list([6,7,8])], - 'column_of_tuples': [tuple([-1,-2,-3]), - tuple([-4,-5,-6]), - tuple([-7,-8,-9])], - 'column_of_sets': [set([-1,-2,-3]), - set([-4,-5,-6]), - set([-7,-8,-9])], - 'column_of_dicts': [{1:34,6:'a',5:46.32}, - {'a':575,4:[],3:(2,3)}, - {(4,5):3,4:{2:5},3:4}], - 'column_of_strs': ["set([-1,-2,-3])", - "set([-4,-5,-6])", - "set([-7,-8,-9])"] - }, - geometry=[LineString([(3, 2), (7, 7)]), - LineString([(3, 7), (7, 2)]), - LineString([(6, 2), (6, 6)])], - index=['a','b','c'], # index is declared - ) - - # identify the columns that require special treatment - - if identify_columns: - - # find the columns automatically - - special_columns = None # TODO: reach this statement - - else: - - special_columns = ( - 'column_of_lists', - 'column_of_tuples', - 'column_of_sets', - 'column_of_dicts', - 'column_of_strs' - ) - - # find the columns automatically - - set_packable_columns = gis_utils.find_gpkg_packable_columns(gdf) - - # make sure the columns can be identified - - for packable_column in set_packable_columns: - - assert packable_column in special_columns - - # write file - - gis_utils.write_gdf_file( - gdf=gdf, - filename=filename, - columns_to_pack=special_columns, - preserve_original=preserve_original_gdf - ) - - new_gdf = gis_utils.read_gdf_file( - filename=filename, - packed_columns=special_columns, - index='index') # index has to be specified - - # verify conformity - - verify_gdf_conformity(gdf, new_gdf, preserve_original_gdf) - - #************************************************************************** - #************************************************************************** - - # gdf object with multi-index, declared - - gdf = gis_utils.GeoDataFrame( - { - 'other_column': [1,2,3], - 'column_a': ['nadbnppadfb','agasdgnp','adfgdn'], - 'column_b': [12517.4247,0.54673,0.3723], - 'column_c': [(1,2,3,4),(5,6,7,8),(44,1247)], - 'column_d': [{'beans':'cheese','lollipops':'dentist'},{},{1:3}], - 'column_e': [[1,2,3],[4.5,4.6,4.7],[9.0,10.0,11.0]], - 'column_f': [{4,5,6},{5.64435,0.7545,1.4634},{'a','b','c'}], - 'geometry': [Point(12, 55), Point(2,4), Point(3,6)], - }, - index=MultiIndex.from_tuples([('a', 124), - ('b', 754), - ('c', 234)], - names=['index1', 'index2']) - ) - - # identify the columns that require special treatment - - if identify_columns: - - # find the columns automatically - - special_columns = None # TODO: reach this statement - - else: - - special_columns = ( - 'column_c', - 'column_d', - 'column_e', - 'column_f' - ) - - # find the columns automatically - - set_packable_columns = gis_utils.find_gpkg_packable_columns(gdf) - - # make sure the columns can be identified - - for packable_column in set_packable_columns: - - assert packable_column in special_columns - - # write file - - gis_utils.write_gdf_file( - gdf=gdf, - filename=filename, - columns_to_pack=special_columns, - preserve_original=preserve_original_gdf - ) - - new_gdf = gis_utils.read_gdf_file( - filename=filename, - packed_columns=special_columns, - index=['index1', 'index2']) - - # verify conformity - - verify_gdf_conformity(gdf, new_gdf, preserve_original_gdf) - - #************************************************************************** - #************************************************************************** - - # gdf with column names matching in lower case (not good for .gpkg files) - - gdf = GeoDataFrame( - {'id': [1, 2, 3], - 'mymymy': [901.1,53.4,None], - 'another_id': [53.4,54.4,55.4], - 'not_another_id': ['you','they','us'], - 'abc': [list([0,1,2]), - list([3,4,5]), - list([6,7,8])], - 'ABC': [tuple([-1,-2,-3]), - tuple([-4,-5,-6]), - tuple([-7,-8,-9])], - 'Abc': ['here', - 'there', - 'nowhere'], - 'aBc': [(1,2,3,4), - [5,6,7,8], - {9,10,11,12}], - 'aBC': [53.643, - {3:6,7:'goodbye'}, - None], - 'ABc': [None, - None, - None], - 'mymymy2': ['hello',53.4,None], # requires special handling - 'yet_another_id': [None,None,None], # requires special handling - }, - geometry=[LineString([(3, 2), (7, 7)]), - LineString([(3, 7), (7, 2)]), - LineString([(6, 2), (6, 6)])] - ) - - # identify the columns that require special treatment - - if identify_columns: - - # find the columns automatically - - special_columns = None # TODO: reach this statement - - else: - - special_columns = ( - 'abc', - 'ABC', - 'Abc', # no containers but has the same lowercase name as others - 'aBc', - 'aBC', - 'ABc', - # special cases - 'mymymy2', - 'yet_another_id' - ) - - # find the columns automatically - - set_packable_columns = gis_utils.find_gpkg_packable_columns(gdf) - - # make sure the columns can be identified - - for packable_column in set_packable_columns: - - assert packable_column in special_columns - - # write file - - gis_utils.write_gdf_file( - gdf=gdf, - filename=filename, - columns_to_pack=special_columns, - preserve_original=preserve_original_gdf - ) - - new_gdf = gis_utils.read_gdf_file( - filename=filename, - packed_columns=special_columns) - - # verify conformity - - verify_gdf_conformity(gdf, new_gdf, preserve_original_gdf) - - #************************************************************************** - #************************************************************************** - - # TODO: force the methods to throw errors with non-primitive types - - #************************************************************************** - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def examples_gpkg_write_errors(filename_gpkg: str = 'test.gpkg'): - - #************************************************************************** - #************************************************************************** - - type_status = { - int: True, - str: True, - float: True, - bytes: False, - dict: True, # works but comes out incorrectly - set: False, - tuple : False, - list : False, - type(None): True # works but comes out incorrectly - } - - for a_type, a_status in type_status.items(): - - if a_type == int: - - data = [1,2] - - elif a_type == str: - - data = ['hello','goodbye'] - - elif a_type == float: - - data = [3.4,6.7] - - elif a_type == bytes: - - data = [b'\x04\x00',b'\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00'] - - elif a_type == dict: - - data = [{0:1},{'a':53.46}] - - elif a_type == set: - - data = [{0,1},{'a',53.46}] - - elif a_type == tuple: - - data = [(0,1),('a',53.46)] - - elif a_type == list: - - data = [list((0,1)),list(('a',53.46))] - - elif a_type == type(None): - - data = [None, None] - - #********************************************************************** - #********************************************************************** - - # create gdf - - gdf = GeoDataFrame( - { - 'data': data, - 'geometry': [Point(1, 2), Point(3,4)], - }, - index=['a','b'] - ) - - #********************************************************************** - #********************************************************************** - - # verify the status - - if a_status: - - # compatible: no errors are expected - - gdf.to_file(filename_gpkg) - - else: # incompatible: errors are expected - - error_triggered = False - try: - gdf.to_file(filename_gpkg) - except Exception: - error_triggered = True - assert error_triggered - - #************************************************************************** - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def example_discrete_plot_gdf(): - - #************************************************************************** - - G = ox.graph_from_point( - (55.71654,9.11728), - network_type='drive', - custom_filter='["highway"~"residential|tertiary|unclassified|service"]', - truncate_by_edge=True - ) - - #************************************************************************** - - gdf = ox.utils_graph.graph_to_gdfs(G, edges=False) # nodes only - - #************************************************************************** - - # add add random discrete element to gdf - - column = 'discrete_category_column' - - offset = random.randint(0,int(1e3)) - - number_options = 10 - - category = { - idx: random.randint(0,number_options-1)+offset - for idx in gdf.index - } - - set_categories = set(category.values()) - - category_to_label = { - cat: 'label for '+str(cat) - for cat in set_categories - } - - # create column - - gdf[column] = Series(data=category, index=gdf.index) - - #************************************************************************** - - gis_utils.plot_discrete_attributes( - gdf, - column=column, - category_to_label=category_to_label - ) - - #************************************************************************** - -#****************************************************************************** -#****************************************************************************** - -def examples_convert_edge_paths(): - - # define how to validate the method - - def convert_edge_path_validation(edge_path, node_path, true_node_path): - - # check for empty paths - - if len(true_node_path) == 0: - - assert len(edge_path) == 0 - - assert len(node_path) == 0 - - return None - - # assert that all nodes are in the node path - - for node_key in node_path: - - assert node_key in true_node_path - - # assert that there is the same number of nodes - - assert len(node_path) == len(true_node_path) - - # assert that they form the right sequence - - for edge_index, edge_key in enumerate(edge_path): - - assert node_path[edge_index] == edge_key[0] - - assert node_path[-1] == edge_key[1] - - # example 1 - - edge_path = [(1,3),(3,7),(7,4)] - - node_path = gis_utils.convert_edge_path(edge_path) - - true_node_path = [1,3,7,4] - - convert_edge_path_validation(edge_path, node_path, true_node_path) - - # example 2 - - edge_path = [] - - node_path = gis_utils.convert_edge_path(edge_path) - - true_node_path = [] - - convert_edge_path_validation(edge_path, node_path, true_node_path) - -#****************************************************************************** -#****************************************************************************** - -def example_get_directed(network: nx.MultiDiGraph): - - # convert to undirected - - undirected_network = ox.get_undirected(network) - - # convert to directed - - directed_network = gis_utils.get_directed(undirected_network) - - # make sure the same nodes exist on both objects - - for node_key in network.nodes(): - - assert node_key in directed_network.nodes() - - assert network.number_of_nodes() == directed_network.number_of_nodes() - - # assert that all edges on the directed network exist on the undirected one - - assert network.number_of_edges() >= directed_network.number_of_edges() - - for edge_key in directed_network.edges(keys=True): - - # make sure at least one suitable edge exists - - assert network.has_edge(*edge_key) - - # cycle through suitable edges on the original network until the one is - # found that has all the matching attributes and content - - edge_dict = directed_network.edges[edge_key] - - for other_edge_key in gis_iden.get_edges_from_a_to_b( - network, edge_key[0], edge_key[1]): - - # check all attributes - - number_matching_attributes = 0 - - for edge_attr, edge_data in edge_dict.items(): - - try: - assert edge_data == network.edges[other_edge_key][edge_attr] - except AssertionError: - # the data is different - continue - except KeyError: - # the attribute does not exist - continue - # print(edge_key) - # print(other_edge_key) - # print(edge_dict) - # print(network.edges[other_edge_key]) - - # assert False - - number_matching_attributes += 1 - - if number_matching_attributes == len(edge_dict): - - # a compatible edge was found, break - - break - - assert number_matching_attributes == len(edge_dict) - -#****************************************************************************** -#****************************************************************************** \ No newline at end of file