# imports
from ast import literal_eval
from uuid import uuid4
from typing import Union
# external
from networkx import MultiDiGraph, MultiGraph
from pandas import MultiIndex, Series
from numpy import float64, int64
from geopandas import GeoDataFrame, read_file
from shapely.geometry import Point
import contextily as cx
# local, internal
from ..gis import osm
from ..gis import identify as gis_iden
from ..gis import modify as gis_mod
from ..gis import calculate as gis_calc
# *****************************************************************************
# *****************************************************************************
# constants
KEY_GPD_CRS = "crs"
KEY_GPD_GEOMETRY = "geometry"
RKW_GPKG = "packed"
# *****************************************************************************
# *****************************************************************************
# TODO: complete method
def find_gpkg_packable_columns(gdf: GeoDataFrame) -> set:
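"""
Identifies the columns of a GeoDataFrame that cannot be written to a
GPKG file as they are and should therefore be packed.

A minimal sketch of its use (assumes a list-valued 'tags' column, which
the GPKG format cannot store directly):

>>> from shapely.geometry import Point
>>> from geopandas import GeoDataFrame
>>> gdf = GeoDataFrame({"tags": [["a"]], "geometry": [Point(0, 0)]})
>>> find_gpkg_packable_columns(gdf)
{'tags'}
"""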
# columns incompatible with GPKG format:
# 1) columns with equivalent lowercase names
# 2) columns of Nones (fiona 1.9.3; appears to work with fiona 1.8.x)
# 3) columns with lists, dicts (keys become strings), tuples and sets
# 4) columns with other types except 'geometry' types in the geometry col.
# 5) columns with multiple types
# packable columns: 1), 2) and 3)
# *************************************************************************
# 1) columns with equivalent lowercase names
lowercase_columns = tuple(column.lower() for column in gdf.columns)
set_columns = set(
column
for column, lccolumn in zip(gdf.columns, lowercase_columns)
if lowercase_columns.count(lccolumn) >= 2
)
# *************************************************************************
# for each column
for column in gdf.columns:
# if the column has already been identified, or if it is the geometry
# one (identified via KEY_GPD_GEOMETRY), skip the current column
if column == KEY_GPD_GEOMETRY or column in set_columns:
continue
# 2) columns of Nones (fiona 1.9.3; appears to work with fiona 1.8.x)
# 3) columns with lists, dicts (keys become strings), tuples and sets
# identify the type of objects in each row
set_types = set(type(gdf.loc[(index, column)]) for index in gdf.index)
# allowed types: str, bool, int, float, numpy int64 and float64
if len(set_types) == 1 and (
str in set_types
or float in set_types
or int in set_types
or bool in set_types
or float64 in set_types
or int64 in set_types
):
# a single GPKG-compatible type: no packing needed
continue
else:
# a single incompatible type (incl. None-only columns) or multiple
# types: mark the column for packing
set_columns.add(column)
# *************************************************************************
return set_columns
# *****************************************************************************
# *****************************************************************************
def write_gdf_file(
gdf: GeoDataFrame,
filename: str,
columns_to_pack: tuple = None,
preserve_original: bool = True,
**kwargs
):
"""
Writes the contents of a GeoDataFrame object into a GIS-compatible file.
The method differs from the GeoDataFrame.to_file() method by allowing
objects with columns whose elements are containers to be written to a file.
For this, it relies on the repr() method. For correctly recognising these
elements while reading the file, the literal_eval() method should be used.
Note that literal_eval() is not completely safe (e.g., it is vulnerable to
denial-of-service attacks) but does not allow arbitrary code execution.
Other format rules:
- Missing values in object columns should be specified as None types
Parameters
----------
gdf : GeoDataFrame
The GeoDataFrame object that is to be written to a file.
filename : str
The name of the file to be written.
columns_to_pack : tuple, optional
The names of the columns with container data. If none (None) are
provided, the method will try to identify them. The default is None.
preserve_original : bool, optional
If True, the original GeoDataFrame object is not changed. If False, the
object will be modified, if necessary. The default is True.
**kwargs :
Key-value pairs to be supplied to the GeoDataFrame.to_file() method.
Raises
------
ValueError
Raised if any of the specified columns does not exist.
Returns
-------
None.
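Examples
--------
A minimal sketch (assumes write access to 'example.gpkg'; 'tags' is a
hypothetical column holding containers):

>>> from shapely.geometry import Point
>>> from geopandas import GeoDataFrame
>>> gdf = GeoDataFrame(
...     {"tags": [["a", "b"]], "geometry": [Point(0, 0)]},
...     crs="EPSG:4326",
... )
>>> write_gdf_file(gdf, "example.gpkg", columns_to_pack=("tags",))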
"""
if preserve_original:
# copy the original (slower)
new_gdf = gdf.copy()
else:
# just point to the original (faster)
new_gdf = gdf
if not isinstance(columns_to_pack, tuple):
# no columns were specified: find the columns with containers
# TODO: reach this statement
columns_to_pack = tuple(find_gpkg_packable_columns(gdf))
# focus on the specified columns
for column in columns_to_pack:
if column not in new_gdf.columns:
# TODO: reach this statement
raise ValueError("Unknown column: " + str(column))
# containers have to be transformed into strings
new_gdf[column] = new_gdf[column].apply(repr)
# format-specific limitations
# GPKG: columns whose names match in lower case are not allowed
if filename.lower().endswith(".gpkg"):
# identify the incompatible columns
lowercase_columns = tuple(column.lower() for column in gdf.columns)
# place all of their contents into one new column (a reserved name)
pack_columns(
gdf=new_gdf,
columns=[
column
for column, lccolumn in zip(gdf.columns, lowercase_columns)
if lowercase_columns.count(lccolumn) >= 2
],
)
# the GeoDataFrame object is ready: write it
new_gdf.to_file(filename, **kwargs)
# *****************************************************************************
# *****************************************************************************
def pack_columns(
gdf: GeoDataFrame,
columns: list,
packed_column_name: str = RKW_GPKG,
convert_to_string: bool = True,
):
"""
Places the contents of multiple GeoDataFrame columns into a single one.
This method is intended to prepare a GeoDataFrame object for I/O, since
some file formats (e.g., GPKG) place restrictions on column names. By
placing the contents of several columns into a single one, these can be
correctly unpacked later, provided some conditions are met concerning
the contents.
Parameters
----------
gdf : GeoDataFrame
The object with the columns mentioned and possibly more.
columns : list
The columns one wishes to pack.
packed_column_name : str, optional
The name of the column holding the data. The default is RKW_GPKG.
convert_to_string : bool, optional
If True, converts the column into a string. The default is True.
Raises
------
ValueError
Raised if the name for the column holding the data already exists.
Returns
-------
None.
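Examples
--------
A minimal sketch (assumes two columns, 'Name' and 'name', whose names
clash in lower case, as in the GPKG scenario described above):

>>> from shapely.geometry import Point
>>> from geopandas import GeoDataFrame
>>> gdf = GeoDataFrame(
...     {"Name": ["a"], "name": ["b"], "geometry": [Point(0, 0)]}
... )
>>> pack_columns(gdf, columns=["Name", "name"])
>>> sorted(gdf.columns)
['geometry', 'packed']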
"""
# if only one or no columns are specified, change nothing
if len(columns) <= 1:
return
# if the new column name is pre-existing, raise error
if packed_column_name in gdf.columns:
# TODO: reach this statement
raise ValueError("The desired column name already exists.")
# create a new data dict: {index: {column: value}}
data_dict = {
index: {column: gdf.loc[(index, column)] for column in columns}
for index in gdf.index
}
# add a new column
gdf[packed_column_name] = Series(data=data_dict, index=gdf.index)
# convert it to a string, if needed
if convert_to_string:
gdf[packed_column_name] = gdf[packed_column_name].apply(repr)
# drop original columns
gdf.drop(labels=columns, axis=1, inplace=True)
# *****************************************************************************
# *****************************************************************************
def unpack_columns(gdf: GeoDataFrame, packed_column_name: str = RKW_GPKG):
"""
Unpacks a specific GeoDataFrame column into multiple columns.
This method is intended to allow reading GeoDataFrame data from files,
since the conventional formats (e.g., GPKG) introduce some restrictions.
Parameters
----------
gdf : GeoDataFrame
The object with the packed column and possibly more.
packed_column_name : str, optional
The name of the column holding the data. The default is RKW_GPKG.
Raises
------
ValueError
Raised if the column specified does not exist.
Returns
-------
None.
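Examples
--------
A minimal sketch (assumes the packed column already holds dicts, i.e.,
that its contents were decoded with literal_eval() beforehand):

>>> from shapely.geometry import Point
>>> from geopandas import GeoDataFrame
>>> gdf = GeoDataFrame(
...     {"packed": [{"a": 1, "b": 2}], "geometry": [Point(0, 0)]}
... )
>>> unpack_columns(gdf, packed_column_name="packed")
>>> sorted(gdf.columns)
['a', 'b', 'geometry']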
"""
if packed_column_name not in gdf.columns:
# TODO: reach this statement
raise ValueError("The column specified does not exist.")
# if there are no rows, there is nothing to unpack
if len(gdf) != 0:
# the object is not empty
# create a dict with one dict per merged column
# each dict corresponds to one packed column, to be keyed with index
column_content_dict = {
merged_column: {
index: gdf.loc[(index, packed_column_name)][merged_column]
for index in gdf.index
}
for merged_column in gdf[packed_column_name].iloc[0]
}
# create the columns
for name, content in column_content_dict.items():
gdf[name] = Series(data=content, index=gdf.index)
# delete the packed column
gdf.drop(labels=packed_column_name, axis=1, inplace=True)
# *****************************************************************************
# *****************************************************************************
def read_gdf_file(
filename: str, packed_columns: tuple = None, index: Union[str, list] = None
) -> GeoDataFrame:
"""
Loads the contents of a file with GIS data into a GeoDataFrame object.
The method differs from the geopandas read_file() function by recognising
elements with container data. For this, it relies on the literal_eval()
method, which is not completely safe (e.g., it is vulnerable to denial-of-
service attacks) but does not allow arbitrary code execution. Note that
literal_eval() cannot read back every type of object.
Parameters
----------
filename : str
The name of the file to be read.
packed_columns : tuple, optional
The names of the columns with container data. If none (None) are
provided, the method will try to identify them. The default is None.
index : str or list, optional
The index column(s). The default is None, in which case no index will
be selected. Alternatively, the index or MultiIndex given will be used.
Raises
------
NotImplementedError
Raised if the columns with containers are not initially identified.
ValueError
Raised if the columns identified as having containers do not exist.
Returns
-------
GeoDataFrame
The GeoDataFrame object with the data loaded from the file.
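Examples
--------
A usage sketch (assumes 'example.gpkg' was written with write_gdf_file()
and that 'tags' is the column holding containers):

>>> gdf = read_gdf_file(
...     "example.gpkg", packed_columns=("tags",)
... )  # doctest: +SKIP

After this call, the elements of the 'tags' column are containers again.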
"""
gdf = read_file(filename)
# unpack special columns
if filename.lower().endswith(".gpkg") and RKW_GPKG in gdf.columns:
# a packed column appears to exist: decode its contents
gdf[RKW_GPKG] = gdf[RKW_GPKG].apply(literal_eval)
# unpack it
unpack_columns(gdf=gdf, packed_column_name=RKW_GPKG)
# handle the index
if index is not None:
# a specific index is required: replace the existing one
gdf.set_index(index, drop=True, inplace=True)
if not isinstance(packed_columns, tuple):
# figure out which columns need unpacking
# TODO: reach this statement
raise NotImplementedError
# packed_columns = tuple(find_gpkg_packable_columns(gdf))
# focus on the specified columns
for column in packed_columns:
if column not in gdf.columns:
# TODO: reach this statement
raise ValueError("Unknown column: " + str(column))
gdf[column] = gdf[column].apply(literal_eval)
return gdf
# *****************************************************************************
# *****************************************************************************
# create osmnx-like geodataframes for nodes
def create_node_geodataframe(
longitudes: Union[tuple, list],
latitudes: Union[tuple, list],
osmids: Union[tuple, list] = None,
crs: str = "EPSG:4326",
**kwargs
) -> GeoDataFrame:
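"""
Creates an OSMnx-style GeoDataFrame of nodes from coordinate sequences.

A minimal sketch of its use (assumes WGS84 coordinates; any extra
keyword arguments become columns):

>>> gdf = create_node_geodataframe(
...     longitudes=(12.57, 12.58),
...     latitudes=(55.68, 55.69),
...     osmids=("a", "b"),
... )
>>> len(gdf)
2
"""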
if len(longitudes) != len(latitudes):
raise ValueError("The input parameters have mismatched sizes.")
if osmids is not None:
# check sizes
if len(longitudes) != len(osmids):
raise ValueError("The input parameters have mismatched sizes.")
else:
# generate node keys
osmids = tuple(str(uuid4()) for _ in range(len(longitudes)))
data_dict = {
osm.KEY_OSMNX_GEOMETRY: [
Point(longitude, latitude)
for longitude, latitude in zip(longitudes, latitudes)
],
}
data_dict.update(kwargs)
return GeoDataFrame(
data_dict,
index=MultiIndex.from_tuples(
[("node", osmid) for osmid in osmids],
names=[osm.KEY_OSMNX_ELEMENT_TYPE, osm.KEY_OSMNX_OSMID],
),
crs=crs,
)
# *****************************************************************************
# *****************************************************************************
def prepare_node_data_from_geodataframe(
gdf: GeoDataFrame,
node_key_column: str = None,
include_columns: list = None,
include_geometry: bool = False,
) -> tuple:
"""Prepare a container with node data from a GeoDataFrame object."""
node_keys = []
node_data_container = []
node_key_to_gdf_index_dict = {}
# check if the GeoDataFrame has the right type of index
if gdf.index.names != [osm.KEY_OSMNX_ELEMENT_TYPE, osm.KEY_OSMNX_OSMID]:
raise ValueError("The GeoDataFrame object does not have the right index.")
# for entry in the gdf object
for gdf_entry in range(len(gdf)):
# select key
if type(node_key_column) == str:
# the node_key_column has been specified: use a specific column as key
node_key = gdf.iloc[gdf_entry][node_key_column]
else: # default case: the key is the OSM identifier (should be unique)
# use the OSMID as the node key
node_key = gdf.index[gdf_entry][1]
# select node data
geo = gdf.iloc[gdf_entry][KEY_GPD_GEOMETRY]
node_dict = {osm.KEY_OSMNX_X: geo.x, osm.KEY_OSMNX_Y: geo.y}
# add geometry
if include_geometry:
node_dict[osm.KEY_OSMNX_GEOMETRY] = geo
# add extra columns
if type(include_columns) == list:
for other_column in include_columns:
node_dict[other_column] = gdf.iloc[gdf_entry][other_column]
# create new entry in container
node_data_container.append((node_key, node_dict))
# store node key
node_keys.append(node_key)
# update the dict
node_key_to_gdf_index_dict[node_key] = gdf.index[gdf_entry]
# *************************************************************************
return node_keys, node_data_container, node_key_to_gdf_index_dict
# *****************************************************************************
# *****************************************************************************
# TODO: simplify the passing of options to the methods relied upon
def plot_discrete_attributes(
gdf_buildings: GeoDataFrame,
column: str,
category_to_label: dict,
zoom_level: int = 15,
figsize: tuple = (25, 25),
legend_title: str = None,
markersize: int = 50,
edgecolor: str = "k",
linewidth: float = 0.5,
markeredgewidth: float = 0.5,
markeredgecolor: str = "k",
include_basemap: bool = False,
):
"""Plots a map with discrete attributes found in GeoDataFrame column."""
gdf_map = gdf_buildings.to_crs(epsg=3857)
ax = gdf_map.plot(
figsize=figsize,
legend=True,
categorical=True,
column=column,
markersize=markersize,
edgecolor=edgecolor,
linewidth=linewidth,
)
# adjust legend labels
legend_handles = ax.legend_.legend_handles
for legend_handle in legend_handles:
legend_handle.set_markeredgewidth(markeredgewidth)
legend_handle.set_markeredgecolor(markeredgecolor)
# convert keys to string (since that is what the method asks for)
_category_to_label = {str(key): value for key, value in category_to_label.items()}
legend_texts = [_category_to_label[text.get_text()] for text in ax.legend_.texts]
ax.legend(legend_handles, legend_texts, title=legend_title)
# add base map
if include_basemap:
cx.add_basemap(
ax,
# crs="EPSG:4326", # switch to another crs
zoom=zoom_level,
source=cx.providers.OpenStreetMap.Mapnik,
)
# *****************************************************************************
# *****************************************************************************
def count_ocurrences(
gdf: GeoDataFrame, column: str, column_entries: list = None
) -> dict:
"""
Counts the number of occurrences per entry in a DataFrame object's column.
If a list is provided, only the entries that match those in the list are
counted. If no list is provided, all unique entries are counted.
Parameters
----------
gdf : GeoDataFrame
The object holding the data.
column : str
A string with the name of the column.
column_entries : list, optional
A list with the entries that are to be counted. The default is None, in
which case all the unique entries will be counted.
Returns
-------
dict
A dictionary with the counts whose keys are the values counted.
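Examples
--------
A minimal sketch (assumes a 'use' column with a missing value):

>>> from shapely.geometry import Point
>>> from geopandas import GeoDataFrame
>>> gdf = GeoDataFrame(
...     {"use": ["house", "house", None], "geometry": [Point(0, 0)] * 3}
... )
>>> count_ocurrences(gdf, "use", column_entries=["house", None])
{'house': 2, None: 1}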
"""
if isinstance(column_entries, list):
# count only the entries specified in the list
# initialise dict
count_dict = {}
# for each key in the list
for key in column_entries:
# count and store the number of rows with this key
if key is None:
count_dict[key] = gdf[gdf[column].isnull()].shape[0]
else:
count_dict[key] = gdf[gdf[column] == key].shape[0]
else:
# find all unique entries
# initialise dict
count_dict = {}
for entry in gdf[column]:
# check if it is already in the dict
if entry in count_dict:
# it is: skip
continue
# it is not: count and store the number of rows with said entry
if entry is None:
count_dict[entry] = gdf[gdf[column].isnull()].shape[0]
else:
count_dict[entry] = gdf[gdf[column] == entry].shape[0]
# return statement
return count_dict
# *****************************************************************************
# *****************************************************************************
def get_directed(
network: MultiGraph, drop_unsimplified_geometries: bool = True
) -> MultiDiGraph:
"""
Converts an OSMnx-formatted MultiGraph object into a MultiDiGraph one.
Parameters
----------
network : MultiGraph
The object describing the multi-edge graph.
drop_unsimplified_geometries : bool, optional
If True, the unsimplified geometries are not included in the directed
graph object. The default is True.
Returns
-------
MultiDiGraph
An object describing the transformed graph.
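Examples
--------
A minimal sketch (assumes each edge carries 'from' and 'to' attributes,
as in OSMnx-formatted undirected graphs):

>>> network = MultiGraph()
>>> network.add_nodes_from(
...     [(0, {"x": 0.0, "y": 0.0}), (1, {"x": 1.0, "y": 0.0})]
... )
>>> network.add_edge(0, 1, **{"from": 0, "to": 1})
0
>>> directed = get_directed(network)
>>> list(directed.edges())
[(0, 1)]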
"""
directed_network = MultiDiGraph()
directed_network.add_nodes_from(network.nodes(data=True))
for edge_key in network.edges(keys=True):
edge_data = dict(network.edges[edge_key])
u = edge_data.pop("from")
v = edge_data.pop("to")
if (
drop_unsimplified_geometries
and osm.KEY_OSMNX_GEOMETRY in edge_data
and len(edge_data[osm.KEY_OSMNX_GEOMETRY].coords) == 2
):
edge_data.pop(osm.KEY_OSMNX_GEOMETRY)
directed_network.add_edge(u_for_edge=u, v_for_edge=v, **edge_data)
return directed_network
# *****************************************************************************
# *****************************************************************************
def simplify_network(
network: MultiDiGraph,
protected_nodes: list,
dead_end_probing_depth: int = 5,
remove_opposite_parallel_edges: bool = False,
update_street_count_per_node: bool = True,
**roundabout_conditions
):
"""
Simplifies a network described in an OSMnx-formatted MultiDiGraph object.
Parameters
----------
network : MultiDiGraph
The object describing the network.
protected_nodes : list
A list with the keys for the nodes that must be preserved.
dead_end_probing_depth : int, optional
The maximum number of nodes a dead end can have to be detectable. The
default is 5.
remove_opposite_parallel_edges : bool, optional
If True, longer parallel edges in opposite directions are also removed.
The default is False.
update_street_count_per_node : bool, optional
If True, updates the street count on each node. The default is True.
**roundabout_conditions : keyword and value pairs
The conditions used to define which roundabouts are simplified.
Returns
-------
None.
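Examples
--------
A usage sketch (assumes an OSMnx-formatted MultiDiGraph in `network` and
a list of node keys to preserve in `protected_nodes`):

>>> simplify_network(
...     network,
...     protected_nodes,
...     remove_opposite_parallel_edges=True,
... )  # doctest: +SKIP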
"""
# 1) remove dead ends (tends to create straight paths)
gis_mod.remove_dead_ends(
network, protected_nodes, max_iterations=dead_end_probing_depth
)
# 2) remove longer parallel edges (tends to create straight paths)
gis_mod.remove_longer_parallel_edges(
network, ignore_edge_directions=remove_opposite_parallel_edges
)
# 3) remove self loops (tends to create straight paths and dead ends)
gis_mod.remove_self_loops(network)
# 4) join segments (can create self-loops)
simplifiable_paths = gis_iden.find_simplifiable_paths(network, protected_nodes)
for path in simplifiable_paths:
gis_mod.replace_path(network, path)
# 5) remove self loops (tends to create straight paths and dead ends)
gis_mod.remove_self_loops(network)
# 6) transform roundabouts into crossroads (can create straight paths)
list_roundabout_nodes = gis_iden.find_roundabouts(network, **roundabout_conditions)
gis_mod.transform_roundabouts_into_crossroads(network, list_roundabout_nodes)
# 7) update street count
if update_street_count_per_node:
gis_calc.update_street_count(network)
# *****************************************************************************
# *****************************************************************************
def identify_building_entrance_edges(
gdf: GeoDataFrame,
gdf_street_column: str,
network: gis_iden.nx.MultiDiGraph,
node_key_to_gdf_index_dict: dict,
crs: str = None,
revert_to_original_crs: bool = False,
) -> tuple:
"""
Identifies the edges that can be linked to special nodes in an OSMnx graph
through an OSMnx-formatted GeoDataFrame object.
The links between nodes and edges are determined by:
- the edge being the closest one to the node;
- the node and edge being associated through a string in the GeoDataFrame.
When a node's closest edge cannot be linked to it by a string, the node's
string is used to search for suitable alternatives, among which the closest
is selected. If none are available, the closest edge is selected.
Parameters
----------
gdf : GeoDataFrame
The object containing the data that allows nodes to be linked to edges.
The index contains the node keys and a specific column holds the string
that allows it to be linked to edges.
gdf_street_column : str
The name of the column in the GeoDataFrame object.
network : gis_iden.nx.MultiDiGraph
The object describing the graph.
node_key_to_gdf_index_dict : dict
A dictionary linking nodes to indices on the GeoDataFrame.
crs : str, optional
The coordinate reference system to be used. The default is None, which
means it will be automatically determined through OSMnx.
revert_to_original_crs : bool, optional
If True, the geometries will be converted back to the original
coordinate reference system once the main tasks have been completed.
The default is False.
Returns
-------
dict
A dictionary keyed by node and holding the selected edge key.
dict
A dictionary keyed by node and holding the key to its closest edge.
nx.MultiDiGraph
The object for the network used in the method.
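Examples
--------
A usage sketch (assumes the inputs were prepared with OSMnx and with
prepare_node_data_from_geodataframe(); 'addr:street' is an assumed
column name):

>>> edges, closest, network = identify_building_entrance_edges(
...     gdf=gdf_buildings,
...     gdf_street_column="addr:street",
...     network=network,
...     node_key_to_gdf_index_dict=key_to_index,
... )  # doctest: +SKIP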
"""
# Notes:
# - Each building is expected to have a street name associated with it;
# - If a building does not have a street name associated with it, then the
# edge corresponding to the street must be determined using distances.
# 1) for each node (building entrance), identify the closest edge
# 2) identify which of those edges cannot be linked back to their
# respective nodes via street names or via (only) one intermediate edge
# 3) for the nodes whose closest edges cannot be linked back to them, find
# the edges that can (via their street names), if any, and select the
# closest one among them as a substitute for the overall closest edge
# 4) for all other cases, use the closest edge among all
# output: a list of edge keys (one per building entrance)
# exceptions: if a building cannot be linked to an edge key, link it to None
# *************************************************************************
if revert_to_original_crs:
original_crs = network.graph["crs"]
# *************************************************************************
# 1) for each building (entrance), identify the closest edge
node_keys = list(node_key_to_gdf_index_dict.keys())
closest_edge_keys, network = gis_iden.identify_edge_closest_to_node(
network=network, node_keys=node_keys, crs=crs
) # do not revert back to the original yet
# create a dict for the closest edge keys: {node keys: closest edge keys}
building_entrance_edges = dict(zip(node_keys, closest_edge_keys))
_closest_edge_keys_dict = dict(building_entrance_edges)
# *************************************************************************
# 2) identify the nodes that require additional precautions (i.e., those
# that should not be connected to their closest edges)
# the nodes not requiring additional precautions are the following:
# i) those that do not concern buildings (no address);
# ii) those whose closest edge has the same street name as the node;
# iii) those whose closest edge is a nameless intermediate edge that connects
# with another edge which has the same street name as the node (driveway).
# the nodes that require special precautions are:
# iv) those whose closest edges have names that do not match the node's;
# v) those whose closest edges do not have street names and do not lead to
# an edge whose street name matches that of the building address.
# in both cases, the solution is to find edges whose street names match
# those of the node and connect the one that is closest among them. If not
# possible (no edges), then the solution is to connect to the closest edge.
# 2.1) generate a dict with the correspondence between streets and nodes
node_street_names = {
node_key: gdf.loc[node_key_to_gdf_index_dict[node_key]][gdf_street_column]
for node_key in node_keys
}
trouble_nodes = []
for node_key, closest_edge_key in zip(node_keys, closest_edge_keys):
# check if the street name is a string
if type(node_street_names[node_key]) != str:
# not a string, this node is not problematic (case i)
continue
# check if the edge has a name attribute
if osm.KEY_OSMNX_NAME in network.edges[closest_edge_key]:
# edge object has name attribute, check if the street names match
if type(network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]) == str:
# the address is a string
if (
network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]
in node_street_names[node_key]
):
# the street names match, this is not a problematic node (ii)
continue
else:
# the street names differ, this is a problematic node (iv)
trouble_nodes.append(node_key)
continue
else: # the address is not a string: it should be a list (osmnx)
# if the node street is found among the elements
matching_street_name_found_list = tuple(
_name in node_street_names[node_key]
for _name in network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]
)
if True in matching_street_name_found_list:
# the street names match, this is not a problematic node (ii)
continue
else:
# the street names differ, this is a problematic node (iv)
trouble_nodes.append(node_key)
continue
# otherwise, the edge is nameless but may not lead to the right street
# get adjacent/neighbouring edges
other_edges = gis_iden.get_edges_involving_node(
network=network, node_key=closest_edge_key[0], include_self_loops=False
)
other_edges.extend(
gis_iden.get_edges_involving_node(
network=network, node_key=closest_edge_key[1], include_self_loops=False
)
)
matching_street_name_found = False
# for each neighbour
for other_edge_key in other_edges:
# check if the current edge is the closest one
if closest_edge_key == other_edge_key:
# it is: skip, since it has already been considered
continue
# check if the current edge has the address/name attribute
if osm.KEY_OSMNX_NAME in network.edges[other_edge_key]:
# it does, now check if it is a string
if type(network.edges[other_edge_key][osm.KEY_OSMNX_NAME]) == str:
# it is, now check if the street names match
if (
network.edges[other_edge_key][osm.KEY_OSMNX_NAME]
in node_street_names[node_key]
):
# an edge with a matching street name was found (iii)
matching_street_name_found = True
break
else:
# if the node street is found among the elements
matching_street_name_found_list = tuple(
_name in node_street_names[node_key]
for _name in network.edges[other_edge_key][osm.KEY_OSMNX_NAME]
)
if True in matching_street_name_found_list:
# the street names match, this node is okay (case iii)
matching_street_name_found = True
break
# check if a matching street name was found among the neighbours
if matching_street_name_found:
# one was, this is not a problematic case (case iii)
continue
# all other cases are problematic: case v
trouble_nodes.append(node_key)
# *************************************************************************
# 3) for the nodes whose closest edges cannot be linked back to them, find
# the edges that can (via their street names), if any, and select the
# closest one among them as a substitute for the overall closest edge
# 3.1) generate the list of edge keys per street
unique_street_names = set(node_street_names[node_key] for node_key in trouble_nodes)
# edge keys with a given street name
edges_per_street_name = {
street_name: [
edge_key
for edge_key in network.edges(keys=True)
if osm.KEY_OSMNX_NAME in network.edges[edge_key]
if street_name in network.edges[edge_key][osm.KEY_OSMNX_NAME]
]
for street_name in unique_street_names
}
# 3.2) for each troublesome node, identify the edges that mention the same
# street and pick the closest one
for node_key in trouble_nodes:
# check the edges keys relevant for this node
other_edge_keys = edges_per_street_name[node_street_names[node_key]]
# check if there are no edges mentioning the street
if len(other_edge_keys) == 0:
# no edges mentioning that street, skip
continue
# create a view
new_network = network.edge_subgraph(edges=other_edge_keys)
# pick the one that is closest
other_closest_edge = gis_iden.nearest_edges(
new_network,
X=network.nodes[node_key][osm.KEY_OSMNX_X],
Y=network.nodes[node_key][osm.KEY_OSMNX_Y],
return_dist=False,
)
# replace previous entry
building_entrance_edges[node_key] = other_closest_edge
# *************************************************************************
# 4) for all other cases, use the closest edge among all
# *************************************************************************
# revert network crs back to the original, if necessary
if revert_to_original_crs:
network = gis_iden.project_graph(network, to_crs=original_crs)
# return edge keys
return building_entrance_edges, _closest_edge_keys_dict, network
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def convert_edge_path(
network: MultiDiGraph, path: list, allow_reversed_edges: bool = False
) -> list:
"""
Converts a path of edge keys into a path of node keys.
Parameters
----------
network : MultiDiGraph
The object describing the network.
path : list
A list of sequential edge keys that form a path.
allow_reversed_edges : bool, optional
If True, edges traversed in the opposite direction can also form the
path, as long as the same nodes are involved. The default is False.
Returns
-------
list
A list of node keys forming a path.
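Examples
--------
A minimal sketch (assumes `network` contains the edges (0, 1, 0) and
(1, 2, 0), which form a path from node 0 to node 2):

>>> convert_edge_path(network, [(0, 1, 0), (1, 2, 0)])  # doctest: +SKIP
[0, 1, 2]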
"""
# check if the path corresponds to an edge path
if not gis_iden.is_edge_path(
network, path, ignore_edge_direction=allow_reversed_edges
):
raise ValueError("No edge path was provided.")
# path is a sequence of edge keys: convert to node path
if allow_reversed_edges:
# reverse edges are allowed
# drop self-loops, if any
edge_path = [
edge_key
for edge_key in path
if edge_key[0] != edge_key[1] # exclude self loops
]
# if there is only one edge, the node path is straightforward
if len(edge_path) == 1:
return [edge_path[0][0], edge_path[0][1]]
node_path = []
for edge_key in edge_path:
# if there are no nodes yet on the path
if len(node_path) == 0:
# find out which node comes first
if edge_key[0] in edge_path[1]:
# the start node is in the second edge too: reversed
node_path.append(edge_key[1])
node_path.append(edge_key[0])
else: # the edge is not reversed
node_path.append(edge_key[0])
node_path.append(edge_key[1])
else:
# find out which node comes after the previous node
if node_path[-1] == edge_key[0]:
# the start node is the same as the previous node
node_path.append(edge_key[1])
else:
# the end node is the same as the previous node
node_path.append(edge_key[0])
else:
# no reversed edges
node_path = [
edge_key[0]
for edge_key in path
if edge_key[0] != edge_key[1] # exclude self loops
]
# add the last edge's end node
node_path.append(path[-1][1])
# return statement
return node_path
# *****************************************************************************
# *****************************************************************************