Skip to content
Snippets Groups Projects
Select Git revision
  • bc6335c7d1ae8004105a8fcf37bedb73cae186e1
  • master default protected
2 results

utils.py

Blame
  • user avatar
    Pedro L. Magalhães authored
    bc6335c7
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    utils.py 37.90 KiB
    # imports
    
    from ast import literal_eval
    from uuid import uuid4
    
    # from numbers import Real
    
    # local, external
    
    from networkx import MultiDiGraph, MultiGraph
    from pandas import MultiIndex, Series
    from numpy import float64, int64
    from geopandas import GeoDataFrame, read_file
    from shapely.geometry import Point
    import contextily as cx
    
    # local, internal
    
    from ..gis import osm
    from ..gis import identify as gis_iden
    from ..gis import modify as gis_mod
    from ..gis import calculate as gis_calc
    
    # *****************************************************************************
    # *****************************************************************************
    
    # constants
    
    KEY_GPD_CRS = "crs"
    KEY_GPD_GEOMETRY = "geometry"
    
    RKW_GPKG = "packed"
    
    # *****************************************************************************
    # *****************************************************************************
    
    # TODO: complete method
    
    
    def find_gpkg_packable_columns(gdf: GeoDataFrame) -> set:
        # columns incompatible with GPKG format:
        # 1) columns with equivalent lowercase names
        # 2) columns of Nones (fiona 1.9.3; appears to work with fiona 1.8.x)
        # 3) columns with lists, dicts (keys become strings), tuples and sets
        # 4) columns with other types except 'geometry' types in the geometry col.
        # 5) columns with multiple types
    
        # packable columns: 1), 2) and 3)
    
        # *************************************************************************
    
        # 1) columns with equivalent lowercase names
    
        lowercase_columns = tuple(column.lower() for column in gdf.columns)
    
        set_columns = set(
            column
            for column, lccolumn in zip(gdf.columns, lowercase_columns)
            if lowercase_columns.count(lccolumn) >= 2
        )
    
        # *************************************************************************
    
        # for each column
    
        for column in gdf.columns:
            # if the column has already been identified, or if it is the geometry
            # one (identified via KEY_GPD_GEOMETRY), skip the current column
    
            if column == KEY_GPD_GEOMETRY or column in set_columns:
                continue
    
            # 2) columns of Nones (fiona 1.9.3; appears to work with fiona 1.8.x)
            # 3) columns with lists, dicts (keys become strings), tuples and sets
    
            # identify the type of objects in each row
    
            set_types = set(type(gdf.loc[(index, column)]) for index in gdf.index)
    
            # allowed types: int, float, numpy floats
    
            if len(set_types) == 1 and (
                str in set_types
                or float in set_types
                or int in set_types
                or bool in set_types
                or float64 in set_types
                or int64 in set_types
            ):
                # if (len(set_types) == 1 and
                #     (str in set_types or float in set_types or int in set_types or
                #      bool in set_types or float64 in set_types or int64 in set_types or
                #      type(None) in set_types)
                #     ):
    
                # these are allowed
    
                continue
    
            else:
                # two or more different types are not allowed
    
                set_columns.add(column)
    
        # *************************************************************************
    
        return set_columns
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def write_gdf_file(
        gdf: GeoDataFrame,
        filename: str,
        columns_to_pack: tuple = None,
        preserve_original: bool = True,
        **kwargs
    ):
        """
        Writes the contents of a GeoDataFrame object into a GIS-compatible file.
    
        The method differs from the GeoDataFrame.to_file() method by allowing
        objects with columns whose elements are containers to be written to a file.
        For this, it relies on the repr() method. For correctly recognising these
        elements while reading the file, the literal_eval() method should be used.
        Note that the literal_eval() is not completely safe (e.g., is vulnerable to
        denial of service attacks) but does not allow for arbitrary code execution.
    
        Other format rules:
            - Missing values in object columns should be specified as None types
    
        Parameters
        ----------
        gdf : GeoDataFrame
            The GeoDataFrame object that is to be written to a file.
        filename : str
            The name of the file to be written.
        columns_to_pack : tuple, optional
            The names of the columns with container data. If none (None) are provi-
            ded, the method will try to identify them. The default is None.
        preserve_original : bool, optional
            If True, the original GeoDataFrame object is not changed. If False, the
            object will be modified, if necessary. The default is True.
        **kwargs :
            Key-value pairs to be supplied to the GeoDataFrame.to_file() method.
    
        Raises
        ------
        NotImplementedError
            Raised if no columns are initially specified.
        ValueError
            Raised if the columns specified do not exist.
    
        Returns
        -------
        None.
    
        """
        if preserve_original:
            # copy the original (slower)
    
            new_gdf = gdf.copy()
    
        else:
            # just point to the original (faster)
    
            new_gdf = gdf
    
        if type(columns_to_pack) != tuple:
            # no columns identified, find the columns with containers
            # TODO: reach this statement
            columns_to_pack = tuple(find_gpkg_packable_columns(gdf))
    
        else:
            # focus on specific columns
    
            for column in columns_to_pack:
                if column not in new_gdf.columns:
                    # TODO: reach this statement
                    raise ValueError("Unknown column: " + str(column))
    
                # handle NaN and other values
    
                # new_gdf[column].fillna("NaN", inplace=True)
    
                # containers have to be transformed into strings
    
                new_gdf[column] = new_gdf[column].apply(lambda x: repr(x))
    
            # format specific limitations
    
            # GPKG: columns with the same lower case equivalent are not allowed
    
            if ".gpkg" in filename:  # solution: use reserved words and numbers
                # identify incompatible columns
    
                lowercase_columns = tuple(column.lower() for column in gdf.columns)
    
                # place all their contents into one new column
    
                pack_columns(
                    gdf=new_gdf,
                    columns=list(
                        column
                        for column, lccolumn in zip(gdf.columns, lowercase_columns)
                        if lowercase_columns.count(lccolumn) >= 2
                    ),
                )
    
        # the GeoDataFrame object is ready: write it
    
        new_gdf.to_file(filename, **kwargs)
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def pack_columns(
        gdf: GeoDataFrame,
        columns: list,
        packed_column_name: str = RKW_GPKG,
        convert_to_string: bool = True,
    ):
        """
        Places the contents of multiple GeoDataFrame columns into a single one.
    
        This method is intended to prepare a GeoDataFrame object for I/O, since so-
        me file formats (e.g., GPKG) place restrictions on column names. By placing
        the contents of various columns into a single one, these can be correctly
        unpacked later, provided some conditions are met concerning the contents.
    
        Parameters
        ----------
        gdf : GeoDataFrame
            The object with the columns mentioned and possibly more.
        columns : list
            The columns one wishes to pack.
        packed_column_name : str, optional
            The name of the column holding the data. The default is RKW_GPKG.
        convert_to_string : bool, optional
            If True, converts the column into a string. The default is True.
    
        Raises
        ------
        ValueError
            Raised if the name for the column holding the data already exists.
    
        Returns
        -------
        None.
    
        """
        # if only one or no columns are specified, change nothing
    
        if len(columns) <= 1:
            return
    
        # if the new column name is pre-existing, raise error
    
        if packed_column_name in gdf.columns:
            # TODO: reach this statement
            raise ValueError("The desired column name already exists.")
    
        # create a new data dict
    
        data_dict = {
            index: {
                column: gdf.loc[(index, column)]  # gdf[repeated_column].loc[index]
                # column: repr(gdf.loc[(index,column)])
                for column in columns
            }
            for index in gdf.index
        }
    
        # add a new column
    
        gdf[packed_column_name] = Series(data=data_dict, index=gdf.index)
    
        # convert it to a string, if needed
    
        if convert_to_string:
            gdf[packed_column_name] = gdf[packed_column_name].apply(lambda x: repr(x))
    
        # drop original columns
    
        gdf.drop(labels=columns, axis=1, inplace=True)
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def unpack_columns(gdf: GeoDataFrame, packed_column_name: str = RKW_GPKG):
        """
        Unpacks a specific GeoDataFrame column into multiple columns.
    
        This method is intended to allow reading GeoDataFrame data from files, sin-
        ce the conventional formats (e.g., GPKG) introduce some restrictions.
    
        Parameters
        ----------
        gdf : GeoDataFrame
            The object with the packed column and possibly more.
        packed_column_name : str, optional
            The name of the column holding the data. The default is RKW_GPKG.
    
        Raises
        ------
        ValueError
            Raised if the column specified does not exist.
    
        Returns
        -------
        None.
    
        """
        if packed_column_name not in gdf.columns:
            # TODO: reach this statement
            raise ValueError("The column specified does not exist.")
    
        # if there are no rows, there is nothing to unpack
    
        if len(gdf) != 0:
            # the object is not empty
    
            # create a dict with one dict per merged column
            # each dict corresponds to one packed column, to be keyed with index
    
            column_content_dict = {
                merged_column: {
                    index: gdf.loc[(index, packed_column_name)][merged_column]
                    # index: gdf[packed_column_name].loc[index][merged_column]
                    for index in gdf.index
                }
                for merged_column in gdf[packed_column_name].iloc[0]
            }
    
            # create the columns
    
            for name, content in column_content_dict.items():
                gdf[name] = Series(data=content, index=gdf.index)
    
        # delete the packed column
    
        gdf.drop(labels=packed_column_name, axis=1, inplace=True)
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def read_gdf_file(
        filename: str, packed_columns: tuple = None, index: str or list = None
    ) -> GeoDataFrame:
        """
        Loads the contents of a file with GIS data into a GeoDataFrame object.
    
        The method differs from the GeoDataFrame.read_file() method by recognising
        elements with container data. For this, it relies on the literal_eval() me-
        tho, which is not completely safe (e.g., is vulnerable to denial of service
        attacks) but does not allow for arbitrary code execution. Note that the li-
        teral_eval() method does not allow for every type of object to be read.
    
        Parameters
        ----------
        filename : str
            The name of the file to be written.
        packed_columns : tuple, optional
            The names of the columns with container data. If none (None) are provi-
            ded, the method will try to identify them. The default is None.
        index : str or list, optional
            The index column(s). The default is None, in which case no index will
            be selected. Alternatively, the index or MultiIndex given will be used.
    
        Raises
        ------
        NotImplementedError
            Raised if the columns with containers are not initially identified.
        ValueError
            Raised if the columns identified as having containers do not exist.
    
        Returns
        -------
        GeoDataFrame
            The GeoDataFrame object with the data loaded from the file.
    
        """
        gdf = read_file(filename)
    
        # unpack special columns
    
        if ".gpkg" in filename and RKW_GPKG in gdf.columns:
            # packed column appears to exist: decode column contents
    
            gdf[RKW_GPKG] = gdf[RKW_GPKG].apply(lambda x: literal_eval(x))
    
            # unpack it
    
            unpack_columns(gdf=gdf, packed_column_name=RKW_GPKG)
    
        # handle types
    
        if type(index) != type(None):
            # a specific index is required, replace existing one
    
            gdf.set_index(index, drop=True, inplace=True)
    
        if type(packed_columns) != tuple:
            # figure out which ones need it...
            # TODO: reach this statement
            raise NotImplementedError
    
            # packed_columns = tuple(find_gpkg_packable_columns(gdf))
    
        # focus on specific columns
    
        for column in packed_columns:
            if column not in gdf.columns:
                # TODO: reach this statement
                raise ValueError("Unknown column: " + str(column))
    
            gdf[column] = gdf[column].apply(lambda x: literal_eval(x))
    
        return gdf
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    # create osmnx-like geodataframes for nodes
    
    
    def create_node_geodataframe(
        longitudes: tuple or list,
        latitudes: tuple or list,
        osmids: tuple or list = None,
        crs: str = "EPSG:4326",
        **kwargs
    ) -> GeoDataFrame:
        if len(longitudes) != len(latitudes):
            raise ValueError("The input parameters have mismatched sizes.")
    
        if type(osmids) != type(None):
            # check sizes
    
            if len(longitudes) != len(osmids):
                raise ValueError("The input parameters have mismatched sizes.")
    
        else:
            # generate node keys
    
            osmids = (str(uuid4()) for i in range(len(longitudes)))
    
        data_dict = {
            osm.KEY_OSMNX_GEOMETRY: [
                Point(longitude, latitude)
                for longitude, latitude in zip(longitudes, latitudes)
            ],
        }
    
        for kwarg in kwargs:
            data_dict[kwarg] = kwargs[kwarg]
    
        return GeoDataFrame(
            data_dict,
            index=MultiIndex.from_tuples(
                [("node", osmid) for osmid in osmids],
                names=[osm.KEY_OSMNX_ELEMENT_TYPE, osm.KEY_OSMNX_OSMID],
            ),
            crs=crs,
        )
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def prepare_node_data_from_geodataframe(
        gdf: GeoDataFrame,
        node_key_column: str = None,
        include_columns: list = None,
        include_geometry: bool = False,
    ) -> tuple:
        """Prepare a container with node data from a GeoDataFrame object."""
    
        node_keys = []
    
        node_data_container = []
    
        node_key_to_gdf_index_dict = {}
    
        # check if the GeoDataFrame has the right type of index
    
        if gdf.index.names != [osm.KEY_OSMNX_ELEMENT_TYPE, osm.KEY_OSMNX_OSMID]:
            raise ValueError("The GeoDataFrame object does not have the right index.")
    
        # for entry in the gdf object
    
        for gdf_entry in range(len(gdf)):
            # select key
    
            if type(node_key_column) == str:
                # the node_key_column has been specified: use a specific column as key
    
                node_key = gdf.iloc[gdf_entry][node_key_column]
    
            else:  # default case: the key is the OSM identifier (should be unique)
                # use the OSMID as the node key
    
                node_key = gdf.index[gdf_entry][1]
    
            # select node data
    
            geo = gdf.iloc[gdf_entry][KEY_GPD_GEOMETRY]
    
            node_dict = {osm.KEY_OSMNX_X: geo.x, osm.KEY_OSMNX_Y: geo.y}
    
            # add geometry
    
            if include_geometry:
                node_dict[osm.KEY_OSMNX_GEOMETRY] = geo
    
            # add extra columns
    
            if type(include_columns) == list:
                for other_column in include_columns:
                    node_dict[other_column] = gdf.iloc[gdf_entry][other_column]
    
            # create new entry in container
    
            node_data_container.append((node_key, node_dict))
    
            # store node key
    
            node_keys.append(node_key)
    
            # update the dict
    
            node_key_to_gdf_index_dict[node_key] = gdf.index[gdf_entry]
    
        # *************************************************************************
    
        return node_keys, node_data_container, node_key_to_gdf_index_dict
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    # TODO: simplify the passing of options to the methods relied upon
    
    
    def plot_discrete_attributes(
        gdf_buildings: GeoDataFrame,
        column: str,
        category_to_label: dict,
        zoom_level: int = 15,
        figsize: tuple = (25, 25),
        legend_title: str = None,
        markersize: int = 50,
        edgecolor: str = "k",
        linewidth: float = 0.5,
        markeredgewidth: float = 0.5,
        markeredgecolor: str = "k",
        include_basemap: bool = False,
    ):
        """Plots a map with discrete attributes found in GeoDataFrame column."""
    
        gdf_map = gdf_buildings.to_crs(epsg=3857)
    
        ax = gdf_map.plot(
            figsize=figsize,
            legend=True,
            categorical=True,
            column=column,
            markersize=markersize,
            edgecolor=edgecolor,
            linewidth=linewidth,
        )
    
        # adjust legend labels
    
        legend_handles = ax.legend_.legend_handles
    
        for legend_handle in legend_handles:
            legend_handle.set_markeredgewidth(markeredgewidth)
            legend_handle.set_markeredgecolor(markeredgecolor)
    
        # convert keys to string (since that is what the method asks for)
    
        _category_to_label = {str(key): value for key, value in category_to_label.items()}
    
        legend_texts = [_category_to_label[text.get_text()] for text in ax.legend_.texts]
    
        ax.legend(legend_handles, legend_texts, title=legend_title)
    
        # add base map
        if include_basemap:
            cx.add_basemap(
                ax,
                # crs="EPSG:4326", # switch to another crs
                zoom=zoom_level,
                source=cx.providers.OpenStreetMap.Mapnik,
            )
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def count_ocurrences(
        gdf: GeoDataFrame, column: str, column_entries: list = None
    ) -> dict:
        """
        Counts the number of occurrences per entry in a DataFrame object's column.
    
        If a list is provided, only the entries that match those in the list are
        counted. If no list is provided, all unique entries are counted.
    
        Parameters
        ----------
        gdf : GeoDataFrame
            The object holding the data.
        column : str
            A string with the name of the column.
        column_entries : list, optional
            A list with the entries that are to be counted. The default is None, in
            which case all the unique entries will be counted.
    
        Returns
        -------
        dict
            A dictionary with the counts whose keys are the values counted.
    
        """
    
        if type(column_entries) == list:
            # find entries also present in the dict
    
            # initialise dict
    
            count_dict = {}
    
            # for each key in the dict
    
            for key in column_entries:
                # store the number of rows
    
                count_dict[key] = gdf[gdf[column] == key].shape[0]
    
                # count the number of rows with this key
    
                if type(key) == type(None):
                    count_dict[key] = gdf[gdf[column].isnull()].shape[0]
    
                else:
                    count_dict[key] = gdf[gdf[column] == key].shape[0]
    
        else:
            # find all unique entries
    
            # initialise dict
    
            count_dict = {}
    
            for entry in gdf[column]:
                # check if it is already in the dict
    
                if entry in count_dict:
                    # it is, skip
    
                    continue
    
                # it is not, count and store the number of rows with said entry
    
                if type(entry) == type(None):
                    count_dict[entry] = gdf[gdf[column].isnull()].shape[0]
    
                else:
                    count_dict[entry] = gdf[gdf[column] == entry].shape[0]
    
        # return statement
    
        return count_dict
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def get_directed(
        network: MultiGraph, drop_unsimplified_geometries: bool = True
    ) -> MultiDiGraph:
        """
        Converts an OSMnx-formatted MultiGraph object into a MultiDiGraph one.
    
        Parameters
        ----------
        network : MultiGraph
            The object describing the multi-edge graph.
        drop_unsimplified_geometries : bool, optional
            If True, the unsimplified geometries are not included in the directed
            graph object. The default is True.
    
        Returns
        -------
        MultiDiGraph
            An object describing the transformed graph.
    
        """
    
        directed_network = MultiDiGraph()
    
        directed_network.add_nodes_from(network.nodes(data=True))
    
        for edge_key in network.edges(keys=True):
            edge_data = dict(network.edges[edge_key])
            u = edge_data["from"]
            v = edge_data["to"]
            edge_data.pop("from")
            edge_data.pop("to")
    
            if (
                drop_unsimplified_geometries
                and osm.KEY_OSMNX_GEOMETRY in edge_data
                and len(edge_data[osm.KEY_OSMNX_GEOMETRY].coords) == 2
            ):
                edge_data.pop(osm.KEY_OSMNX_GEOMETRY)
    
            directed_network.add_edge(u_for_edge=u, v_for_edge=v, **edge_data)
    
        return directed_network
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def simplify_network(
        network: MultiDiGraph,
        protected_nodes: list,
        dead_end_probing_depth: int = 5,
        remove_opposite_parallel_edges: bool = False,
        update_street_count_per_node: bool = True,
        **roundabout_conditions
    ):
        """
        Simplifies a network described in a OSMnx-formatted MultiDiGraph object.
    
        Parameters
        ----------
        network : MultiDiGraph
            The object describing the network.
        protected_nodes : list
            A list with the keys for the nodes that must be preserved.
        dead_end_probing_depth: int
            The maximum number of nodes a dead end can have to be detectable.
        remove_opposite_parallel_edges : bool, optional
            If True, longer parallel edges in opposite directions are also removed.
            The default is False.
        update_street_count_per_node : bool, optional
            If True, updates the street count on each node. The default is True.
        **roundabout_conditions : keyword and value pairs
            The conditions used to define which roundabouts are simplified.
    
        Returns
        -------
        None.
    
        """
    
        # 1) remove dead ends (tends to create straight paths)
        gis_mod.remove_dead_ends(
            network, protected_nodes, max_iterations=dead_end_probing_depth
        )
        # 2) remove longer parallel edges (tends to create straight paths)
        gis_mod.remove_longer_parallel_edges(
            network, ignore_edge_directions=remove_opposite_parallel_edges
        )
        # 3) remove self loops (tends to create straight paths and dead ends)
        gis_mod.remove_self_loops(network)
        # 4) join segments (can create self-loops)
        simplifiable_paths = gis_iden.find_simplifiable_paths(network, protected_nodes)
        for path in simplifiable_paths:
            gis_mod.replace_path(network, path)
        # 4) remove self loops (tends to create straight paths and dead ends)
        gis_mod.remove_self_loops(network)
        # 5) transform roundabouts into crossroads (can create straight paths)
        list_roundabout_nodes = gis_iden.find_roundabouts(network, **roundabout_conditions)
        gis_mod.transform_roundabouts_into_crossroads(network, list_roundabout_nodes)
        # 6) update street count
        if update_street_count_per_node:
            gis_calc.update_street_count(network)
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def identify_building_entrance_edges(
        gdf: GeoDataFrame,
        gdf_street_column: str,
        network: gis_iden.nx.MultiDiGraph,
        node_key_to_gdf_index_dict: dict,
        crs: str = None,
        revert_to_original_crs: bool = False,
    ) -> tuple:
        """
        Identifies the edges that can be linked to special nodes in an OSMnx graph
        through a OSMnx-formatted GeoDataFrame object.
    
        The links between nodes and edges are determined by:
        - the edge being the closest one to the node;
        - the node and edge being associated through a string in the GeoDataFrame.
    
        When a node\'s closest edge cannot be linked to it by a string, the node\'s
        string is used to search for suitable alternatives, among which the closest
        is selected. If none are available, the closest edge is selected.
    
        Parameters
        ----------
        gdf : GeoDataFrame
            The object containg the data that allows nodes to be linked to edges.
            The index contains the node keys and a specific column holds the string
            that allows it to be linked to edges.
        gdf_street_column : str
            The name of the column in the GeoDataFrame object.
        network : gis_iden.nx.MultiDiGraph
            The object describing the graph.
        node_key_to_gdf_index_dict : dict
            A dictionary linking nodes to indices on the GeoDataFrame.
        crs : str, optional
            The coordinate reference system to be used. The default is None, which
            means it will be automatically determined through OSMnx.
        revert_to_original_crs : bool, optional
            If True, the geometries will be converted back to the original
            coordinate reference system once the main tasks have been completed.
            The default is False.
    
        Returns
        -------
        dict
            A dictionary keyed by node and holding the selected edge key.
        dict
            A dictionary keyed by node and holding the key to its closest edge.
        nx.MultiDiGraph
            The object for the network used in the method.
    
        """
    
        # Notes:
        # - Each building is expected to have a street name associated with it;
        # - If a building does not have a street name associated with it, then the
        # edge corresponding to the street must be determined using distances.
    
        # 1) for each node (building entrance), identify the closest edge
        # 2) identify which edges identified before cannot be linked back to their
        # respective nodes via street names or via (only) one intermediate edge
        # 3) for the nodes whose closest edges that cannot be linked back to the no-
        # des, find the edges that can, if any, (via their street names) and select
        # the closest one among them as a substitute for the closest one in general
        # 4) for all other cases, use the closest edge among all
    
        # output: a list of edge keys (one per building entrance)
        # exceptions: if a building cannot be linked to an edge key, link it to None
    
        # *************************************************************************
    
        if revert_to_original_crs:
            original_crs = network.graph["crs"]
    
        # *************************************************************************
    
        # 1) for each building (entrance), identify the closest edge
    
        node_keys = list(node_key_to_gdf_index_dict.keys())
        closest_edge_keys, network = gis_iden.identify_edge_closest_to_node(
            network=network, node_keys=node_keys, crs=crs
        )  # do not revert back to the original yet
    
        # create a dict for the closest edge keys: {node keys: closest edge keys}
    
        building_entrance_edges = dict(zip(node_keys, closest_edge_keys))
    
        _closest_edge_keys_dict = dict(building_entrance_edges)
    
        # *************************************************************************
    
        # 2) identify the nodes that require additional precautions (i.e., those
        # that should not be connected to their closest edges)
    
        # the nodes not requiring additional precautions are the following:
        # i) those that do not concern buildings (no address);
        # ii) those whose closest edge has the same street name as the node;
        # iii) those whose closest edge is a nameless intermediate edge that connects
        # with another edge which has  the same street name as the node (driveway).
    
        # the nodes that require special precautions are:
        # iv) those whose closest edges have names that do not match the node's;
        # v) those whose closest edges do not have street names and do not lead to
        # an edge whose street name matches that of the building address.
    
        # in both cases, the solution is to find edges whose street names match
        # those of the node and connect the one that is closest among them. If not
        # possible (no edges), then the solution is to connect to the closest edge.
    
        # 2.1) generate a dict with the correspondence between streets and nodes
    
        node_street_names = {
            node_key: gdf.loc[node_key_to_gdf_index_dict[node_key]][gdf_street_column]
            for node_key in node_keys
        }
    
        trouble_nodes = []
    
        for node_key, closest_edge_key in zip(node_keys, closest_edge_keys):
            # check if the street name is a string
    
            if type(node_street_names[node_key]) != str:
                # not a string, this node is not problematic (case i)
    
                continue
    
            # check if the edge has a name attribute
    
            if osm.KEY_OSMNX_NAME in network.edges[closest_edge_key]:
                # edge object has name attribute, check if the street names match
    
                if type(network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]) == str:
                    # the address is a string
    
                    if (
                        network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]
                        in node_street_names[node_key]
                    ):
                        # the street names match, this is not a problematic node (ii)
    
                        continue
    
                    else:
                        # the streets names differ, this is a problematic node (iv)
    
                        trouble_nodes.append(node_key)
    
                        continue
    
                else:  # the address is not a string: it should be a list (osmnx)
                    # if the node street is found among the elements
    
                    matching_street_name_found_list = tuple(
                        _name in node_street_names[node_key]
                        for _name in network.edges[closest_edge_key][osm.KEY_OSMNX_NAME]
                    )
    
                    if True in matching_street_name_found_list:
                        # the street names match, this is not a problematic node (ii)
    
                        continue
    
                    else:
                        # the streets names differ, this is a problematic node (iv)
    
                        trouble_nodes.append(node_key)
    
                        continue
    
            # otherwise, the edge is nameless but may not lead to the right street
    
            # get adjacent/neighbouring edges
            other_edges = gis_iden.get_edges_involving_node(
                network=network, node_key=closest_edge_key[0], include_self_loops=False
            )
            other_edges.extend(
                gis_iden.get_edges_involving_node(
                    network=network, node_key=closest_edge_key[1], include_self_loops=False
                )
            )
    
            matching_street_name_found = False
    
            # for each neighbour
    
            for other_edge_key in other_edges:
                # check if the current edge is the closest one
    
                if closest_edge_key == other_edge_key:
                    # it is: skip, since it has already been considered
    
                    continue
    
                # check if the current edge has the address/name attribute
    
                if osm.KEY_OSMNX_NAME in network.edges[other_edge_key]:
                    # it does, now check if it is a string
    
                    if type(network.edges[other_edge_key][osm.KEY_OSMNX_NAME]) == str:
                        # it is, now check if the street names match
    
                        if (
                            network.edges[other_edge_key][osm.KEY_OSMNX_NAME]
                            in node_street_names[node_key]
                        ):
                            # an edge with a matching street name was found (iii)
    
                            matching_street_name_found = True
    
                            break
    
                    else:
                        # if the node street is found among the elements
    
                        matching_street_name_found_list = tuple(
                            _name in node_street_names[node_key]
                            for _name in network.edges[other_edge_key][osm.KEY_OSMNX_NAME]
                        )
    
                        if True in matching_street_name_found_list:
                            # the street names match, this node is okay (case iii)
    
                            matching_street_name_found = True
    
                            break
    
            # check if a matching street name was found among the neighbours
    
            if matching_street_name_found:
                # one was, this is not a problematic case (case iii)
    
                continue
    
            # all other cases are problematic: case v
    
            trouble_nodes.append(node_key)
    
        # *************************************************************************
    
        # 3) for the nodes whose closest edges that cannot be linked back to the no-
        # des, find the edges that can, if any, (via their street names) and select
        # the closest one among them as a substitute for the closest one in general
    
        # 3.1) generate the list of edge keys per street
    
        unique_street_names = set(node_street_names[node_key] for node_key in trouble_nodes)
    
        # edge keys with a given street name
    
        edges_per_street_name = {
            street_name: [
                edge_key
                for edge_key in network.edges(keys=True)
                if osm.KEY_OSMNX_NAME in network.edges[edge_key]
                if street_name in network.edges[edge_key][osm.KEY_OSMNX_NAME]
            ]
            for street_name in unique_street_names
        }
    
        # 3.2) for each troublesome node, identify the edges that mention the same
        # street and pick the closest on
    
        for node_key in trouble_nodes:
            # check the edges keys relevant for this node
    
            other_edge_keys = edges_per_street_name[node_street_names[node_key]]
    
            # check if there are no edges mentioning the street
    
            if len(other_edge_keys) == 0:
                # no edges mentioning that street, skip
    
                continue
    
            # create a view
    
            new_network = network.edge_subgraph(edges=other_edge_keys)
    
            # pick the one that is closest
    
            other_closest_edge = gis_iden.nearest_edges(
                new_network,
                X=network.nodes[node_key][osm.KEY_OSMNX_X],
                Y=network.nodes[node_key][osm.KEY_OSMNX_Y],
                return_dist=False,
            )
    
            # replace previous entry
    
            building_entrance_edges[node_key] = other_closest_edge
    
        # *************************************************************************
    
        # 4) for all other cases, use the closest edge among all
    
        # *************************************************************************
    
        # revert network crs back to the original, if necessary
    
        if revert_to_original_crs:
            network = gis_iden.project_graph(network, to_crs=original_crs)
    
        # return edge keys
    
        return building_entrance_edges, _closest_edge_keys_dict, network
    
        # *************************************************************************
        # *************************************************************************
    
    
    # *****************************************************************************
    # *****************************************************************************
    
    
    def convert_edge_path(
        network: MultiDiGraph, path: list, allow_reversed_edges: bool = False
    ) -> list:
        """
        Converts a path of edge keys into a path of node keys.
    
        Parameters
        ----------
        network : nx.MultiDiGraph
            The objet describing the network.
        path : list
            A list of sequential edge keys that form a path.
        allow_reversed_edges : bool, optional
            If True, edges in the opposite direction also count to form paths, as
            long as the same nodes are involved. The default is False.
    
        Returns
        -------
        list
            A list of node keys forming a path.
    
        """
    
        # check if the path corresponds to an edge path
        if not gis_iden.is_edge_path(
            network, path, ignore_edge_direction=allow_reversed_edges
        ):
            raise ValueError("No edge path was provided.")
    
        # path is a sequence of edge keys: convert to node path
        if allow_reversed_edges:
            # reverse edges are allowed
            # drop self-loops, if any
            edge_path = [
                edge_key
                for edge_key in path
                if edge_key[0] != edge_key[1]  # exclude self loops
            ]
    
            # if there is only one edge, the node path is straightforward
            if len(edge_path) == 1:
                return [edge_path[0][0], edge_path[0][1]]
    
            node_path = []
            for edge_index, edge_key in enumerate(edge_path):
                # if there are no nodes yet on the path
                if len(node_path) == 0:
                    # find out which node comes first
                    if edge_key[0] in edge_path[1]:
                        # the start node is in the second edge too: reversed
                        node_path.append(edge_key[1])
                        node_path.append(edge_key[0])
                    else:  # the edge is not reversed
                        node_path.append(edge_key[0])
                        node_path.append(edge_key[1])
                else:
                    # find out which node comes after the previous node
                    if node_path[-1] == edge_key[0]:
                        # the start node is the same as the previous node
                        node_path.append(edge_key[1])
                    else:
                        # the end node is the same as the previous node
                        node_path.append(edge_key[0])
        else:
            # no reversed edges
            node_path = [
                edge_key[0]
                for edge_key in path
                if edge_key[0] != edge_key[1]  # exclude self loops
            ]
            # add the last edge's end node
            node_path.append(path[-1][1])
        # return statement
        return node_path
    
    
    # *****************************************************************************
    # *****************************************************************************