Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • pmag/topupopt
1 result
Select Git revision
  • master
1 result
Show changes
Commits on Source (7)
Showing
with 2735 additions and 2811 deletions
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
This diff is collapsed.
...@@ -12,10 +12,7 @@ from .bbr import label_bbr_entrance_id, label_bbr_housing_area ...@@ -12,10 +12,7 @@ from .bbr import label_bbr_entrance_id, label_bbr_housing_area
# labels # labels
selected_bbr_adgang_labels = [ selected_bbr_adgang_labels = ["Opgang_id", "AdgAdr_id", "Bygning_id"]
"Opgang_id",
"AdgAdr_id",
"Bygning_id"]
selected_bbr_building_point_labels = [ selected_bbr_building_point_labels = [
"KoorOest", "KoorOest",
...@@ -47,15 +44,17 @@ selected_bbr_building_labels = [ ...@@ -47,15 +44,17 @@ selected_bbr_building_labels = [
"VARMEINSTAL_KODE", "VARMEINSTAL_KODE",
"OPVARMNING_KODE", "OPVARMNING_KODE",
"VARME_SUPPL_KODE", "VARME_SUPPL_KODE",
"BygPkt_id"] "BygPkt_id",
]
# label under which building entrance ids can be found in OSM # label under which building entrance ids can be found in OSM
label_osm_entrance_id = 'osak:identifier' label_osm_entrance_id = "osak:identifier"
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def heat_demand_dict_by_building_entrance( def heat_demand_dict_by_building_entrance(
gdf_osm: GeoDataFrame, gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame, gdf_buildings: GeoDataFrame,
...@@ -67,9 +66,8 @@ def heat_demand_dict_by_building_entrance( ...@@ -67,9 +66,8 @@ def heat_demand_dict_by_building_entrance(
key_osm_entr_id: str = label_osm_entrance_id, key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id, key_bbr_entr_id: str = label_bbr_entrance_id,
avg_state: list = None, avg_state: list = None,
state_correlates_with_output: bool = False state_correlates_with_output: bool = False,
) -> dict: ) -> dict:
# initialise dict for each building entrance # initialise dict for each building entrance
demand_dict = {} demand_dict = {}
...@@ -77,24 +75,19 @@ def heat_demand_dict_by_building_entrance( ...@@ -77,24 +75,19 @@ def heat_demand_dict_by_building_entrance(
# for each building entrance # for each building entrance
for osm_index in gdf_osm.index: for osm_index in gdf_osm.index:
# initialise dict for each building consumption point # initialise dict for each building consumption point
heat_demand_profiles = [] heat_demand_profiles = []
# find the indexes for each building leading to the curr. cons. point # find the indexes for each building leading to the curr. cons. point
building_indexes = ( building_indexes = gdf_buildings[
gdf_buildings[ gdf_buildings[key_bbr_entr_id] == gdf_osm.loc[osm_index][key_osm_entr_id]
gdf_buildings[key_bbr_entr_id] ==
gdf_osm.loc[osm_index][key_osm_entr_id]
].index ].index
)
# for each building # for each building
for building_index in building_indexes: for building_index in building_indexes:
# get relevant data # get relevant data
# base_load_avg_ratio = 0.3 # base_load_avg_ratio = 0.3
...@@ -106,7 +99,6 @@ def heat_demand_dict_by_building_entrance( ...@@ -106,7 +99,6 @@ def heat_demand_dict_by_building_entrance(
# estimate its demand # estimate its demand
if type(avg_state) == type(None): if type(avg_state) == type(None):
# ignore states # ignore states
heat_demand_profiles.append( heat_demand_profiles.append(
...@@ -114,19 +106,18 @@ def heat_demand_dict_by_building_entrance( ...@@ -114,19 +106,18 @@ def heat_demand_dict_by_building_entrance(
discrete_sinusoid_matching_integral( discrete_sinusoid_matching_integral(
bdg_specific_demand[building_index] * area, bdg_specific_demand[building_index] * area,
time_interval_durations=time_interval_durations, time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index], min_to_max_ratio=bdg_ratio_min_max[building_index],
phase_shift_radians=( phase_shift_radians=(
bdg_demand_phase_shift[building_index] bdg_demand_phase_shift[building_index]
# bdg_demand_phase_shift_amplitude*np.random.random() # bdg_demand_phase_shift_amplitude*np.random.random()
# if (type(bdg_demand_phase_shift_amplitude) == # if (type(bdg_demand_phase_shift_amplitude) ==
# type(None)) else None # type(None)) else None
) ),
) )
) )
) )
else: else:
# states matter # states matter
heat_demand_profiles.append( heat_demand_profiles.append(
...@@ -137,8 +128,8 @@ def heat_demand_dict_by_building_entrance( ...@@ -137,8 +128,8 @@ def heat_demand_dict_by_building_entrance(
), ),
avg_state=avg_state, avg_state=avg_state,
time_interval_durations=time_interval_durations, time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index], min_to_max_ratio=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output state_correlates_with_output=state_correlates_with_output,
) )
) )
) )
...@@ -149,8 +140,7 @@ def heat_demand_dict_by_building_entrance( ...@@ -149,8 +140,7 @@ def heat_demand_dict_by_building_entrance(
if len(heat_demand_profiles) == 0: if len(heat_demand_profiles) == 0:
final_profile = [] final_profile = []
else: else:
final_profile = sum(profile final_profile = sum(profile for profile in heat_demand_profiles)
for profile in heat_demand_profiles)
# ********************************************************************* # *********************************************************************
...@@ -162,30 +152,30 @@ def heat_demand_dict_by_building_entrance( ...@@ -162,30 +152,30 @@ def heat_demand_dict_by_building_entrance(
# return # return
return demand_dict return demand_dict
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def total_heating_area( def total_heating_area(
gdf_osm: GeoDataFrame, gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame, gdf_buildings: GeoDataFrame,
key_osm_entr_id: str = label_osm_entrance_id, key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id key_bbr_entr_id: str = label_bbr_entrance_id,
) -> float: ) -> float:
area = 0 area = 0
for osm_index in gdf_osm.index: for osm_index in gdf_osm.index:
# find the indexes for each building leading to the curr. cons. point # find the indexes for each building leading to the curr. cons. point
building_indexes = ( building_indexes = gdf_buildings[
gdf_buildings[ gdf_buildings[label_bbr_entrance_id]
gdf_buildings[label_bbr_entrance_id] == == gdf_osm.loc[osm_index][label_osm_entrance_id]
gdf_osm.loc[osm_index][label_osm_entrance_id]
].index ].index
)
# for each building # for each building
for building_index in building_indexes: for building_index in building_indexes:
# get relevant data # get relevant data
area += gdf_buildings.loc[building_index][label_bbr_housing_area] area += gdf_buildings.loc[building_index][label_bbr_housing_area]
return area return area
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
...@@ -24,15 +24,16 @@ from ...data.finance.utils import ArcInvestments ...@@ -24,15 +24,16 @@ from ...data.finance.utils import ArcInvestments
# constants # constants
KEY_DHT_OPTIONS_OBJ = 'trench' KEY_DHT_OPTIONS_OBJ = "trench"
KEY_DHT_LENGTH = 'length' KEY_DHT_LENGTH = "length"
KEY_DHT_UCF = 'capacity_unit_conversion_factor' KEY_DHT_UCF = "capacity_unit_conversion_factor"
KEY_HHT_DHT_PIPES = 'pipes' KEY_HHT_DHT_PIPES = "pipes"
KEY_HHT_STD_PIPES = 'pipe_tuple' KEY_HHT_STD_PIPES = "pipe_tuple"
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
class PipeTrenchOptions(ArcsWithoutProportionalLosses): class PipeTrenchOptions(ArcsWithoutProportionalLosses):
"A class for defining investments in district heating trenches." "A class for defining investments in district heating trenches."
...@@ -46,7 +47,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -46,7 +47,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
capacity_is_instantaneous: bool = False, capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0, unit_conversion_factor: float = 1.0,
): ):
# store the unit conversion # store the unit conversion
self.unit_conversion_factor = unit_conversion_factor self.unit_conversion_factor = unit_conversion_factor
# keep the trench object # keep the trench object
...@@ -54,13 +54,11 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -54,13 +54,11 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
# keep the trench length # keep the trench length
self.length = ( self.length = (
[length for i in range(trench.number_options())] [length for i in range(trench.number_options())]
if trench.vector_mode else if trench.vector_mode
length else length
) )
# determine the rated heat capacity # determine the rated heat capacity
rhc = trench.rated_heat_capacity( rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
unit_conversion_factor=unit_conversion_factor
)
# initialise the object using the mother class # initialise the object using the mother class
ArcsWithoutProportionalLosses.__init__( ArcsWithoutProportionalLosses.__init__(
self, self,
...@@ -70,10 +68,10 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -70,10 +68,10 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
minimum_cost=minimum_cost, minimum_cost=minimum_cost,
specific_capacity_cost=( specific_capacity_cost=(
0 0
if type(specific_capacity_cost) == type(None) else if type(specific_capacity_cost) == type(None)
specific_capacity_cost else specific_capacity_cost
), ),
capacity_is_instantaneous=False capacity_is_instantaneous=False,
) )
# initialise the minimum cost # initialise the minimum cost
if type(minimum_cost) == type(None): if type(minimum_cost) == type(None):
...@@ -83,7 +81,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -83,7 +81,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
# ************************************************************************* # *************************************************************************
def set_minimum_cost(self, minimum_cost=None): def set_minimum_cost(self, minimum_cost=None):
# minimum arc cost # minimum arc cost
# if no external minimum cost list was provided, calculate it # if no external minimum cost list was provided, calculate it
if type(minimum_cost) == type(None): if type(minimum_cost) == type(None):
...@@ -91,13 +88,12 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -91,13 +88,12 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
if self.trench.vector_mode: if self.trench.vector_mode:
# multiple options # multiple options
self.minimum_cost = tuple( self.minimum_cost = tuple(
(pipe.sp*length # twin pipes: one twin pipe (
if self.trench.twin_pipes else pipe.sp * length # twin pipes: one twin pipe
pipe.sp*length*2) # single pipes: two single pipes if self.trench.twin_pipes
for pipe, length in zip( else pipe.sp * length * 2
self.trench.supply_pipe, ) # single pipes: two single pipes
self.length for pipe, length in zip(self.trench.supply_pipe, self.length)
)
) )
else: # only one option else: # only one option
self.minimum_cost = (self.trench.supply_pipe.sp * self.length,) self.minimum_cost = (self.trench.supply_pipe.sp * self.length,)
...@@ -129,43 +125,39 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -129,43 +125,39 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
temperature_surroundings: float or list, temperature_surroundings: float or list,
length: float or list = None, length: float or list = None,
unit_conversion_factor: float = None, unit_conversion_factor: float = None,
**kwargs): **kwargs
):
hts = self.trench.heat_transfer_surroundings( hts = self.trench.heat_transfer_surroundings(
ground_thermal_conductivity=ground_thermal_conductivity, ground_thermal_conductivity=ground_thermal_conductivity,
ground_air_heat_transfer_coefficient=( ground_air_heat_transfer_coefficient=(ground_air_heat_transfer_coefficient),
ground_air_heat_transfer_coefficient),
time_interval_duration=time_interval_duration, time_interval_duration=time_interval_duration,
temperature_surroundings=temperature_surroundings, temperature_surroundings=temperature_surroundings,
length=( length=(self.length if type(length) == type(None) else length),
self.length
if type(length) == type(None) else
length
),
unit_conversion_factor=( unit_conversion_factor=(
self.unit_conversion_factor self.unit_conversion_factor
if type(unit_conversion_factor) == type(None) else if type(unit_conversion_factor) == type(None)
unit_conversion_factor else unit_conversion_factor
), ),
**kwargs) **kwargs
)
if self.trench.vector_mode: if self.trench.vector_mode:
# multiple options: hts is a vector # multiple options: hts is a vector
if (hasattr(self, "static_loss") and if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
type(self.static_loss) != type(None)):
# update the static loss dictionary # update the static loss dictionary
if type(hts[0]) == list: if type(hts[0]) == list:
# multiple time intervals # multiple time intervals
self.static_loss.update({ self.static_loss.update(
{
(h, scenario_key, k): hts[h][k] (h, scenario_key, k): hts[h][k]
for h, hts_h in enumerate(hts) for h, hts_h in enumerate(hts)
for k, hts_hk in enumerate(hts_h) for k, hts_hk in enumerate(hts_h)
}) }
)
else: # not a list: one time interval else: # not a list: one time interval
self.static_loss.update({ self.static_loss.update(
(h, scenario_key, 0): hts[h] {(h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)}
for h, hts_h in enumerate(hts) )
})
else: else:
# no static loss dictionary, create it # no static loss dictionary, create it
if type(hts[0]) == list: if type(hts[0]) == list:
...@@ -177,40 +169,34 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses): ...@@ -177,40 +169,34 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
} }
else: # not a list: one time interval else: # not a list: one time interval
self.static_loss = { self.static_loss = {
(h, scenario_key, 0): hts[h] (h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)
for h, hts_h in enumerate(hts)
} }
else: else:
# one option: hts might be a number # one option: hts might be a number
if (hasattr(self, "static_loss") and if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
type(self.static_loss) != type(None)):
# update the static loss dictionary # update the static loss dictionary
if not isinstance(hts, Real): if not isinstance(hts, Real):
# multiple time intervals # multiple time intervals
self.static_loss.update({ self.static_loss.update(
(0, scenario_key, k): hts[k] {(0, scenario_key, k): hts[k] for k, hts_k in enumerate(hts)}
for k, hts_k in enumerate(hts) )
})
else: # not a list: one time interval else: # not a list: one time interval
self.static_loss.update({ self.static_loss.update({(0, scenario_key, 0): hts})
(0, scenario_key, 0): hts
})
else: else:
# no static loss dictionary, create it # no static loss dictionary, create it
if not isinstance(hts, Real): if not isinstance(hts, Real):
# multiple time intervals # multiple time intervals
self.static_loss = { self.static_loss = {
(0, scenario_key, k): hts_k (0, scenario_key, k): hts_k for k, hts_k in enumerate(hts)
for k, hts_k in enumerate(hts)
} }
else: # not a list: one time interval else: # not a list: one time interval
self.static_loss = { self.static_loss = {(0, scenario_key, 0): hts}
(0, scenario_key, 0): hts
}
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions): class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
"A class for defining investments in district heating trenches." "A class for defining investments in district heating trenches."
...@@ -226,7 +212,6 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions): ...@@ -226,7 +212,6 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
unit_conversion_factor: float = 1.0, unit_conversion_factor: float = 1.0,
**kwargs **kwargs
): ):
# store the unit conversion # store the unit conversion
self.unit_conversion_factor = unit_conversion_factor self.unit_conversion_factor = unit_conversion_factor
# keep the trench object # keep the trench object
...@@ -234,13 +219,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions): ...@@ -234,13 +219,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# keep the trench length # keep the trench length
self.length = ( self.length = (
[length for i in range(trench.number_options())] [length for i in range(trench.number_options())]
if trench.vector_mode else if trench.vector_mode
length else length
) )
# determine the rated heat capacity # determine the rated heat capacity
rhc = trench.rated_heat_capacity( rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
unit_conversion_factor=unit_conversion_factor
)
# initialise the object using the mother class # initialise the object using the mother class
ArcInvestments.__init__( ArcInvestments.__init__(
self, self,
...@@ -252,11 +235,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions): ...@@ -252,11 +235,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
capacity=[rhc] if isinstance(rhc, Real) else rhc, capacity=[rhc] if isinstance(rhc, Real) else rhc,
specific_capacity_cost=( specific_capacity_cost=(
0 0
if type(specific_capacity_cost) == type(None) else if type(specific_capacity_cost) == type(None)
specific_capacity_cost else specific_capacity_cost
), ),
capacity_is_instantaneous=False, capacity_is_instantaneous=False,
validate=False validate=False,
) )
# # ************************************************************************* # # *************************************************************************
...@@ -388,9 +371,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions): ...@@ -388,9 +371,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# (0, scenario_key, 0): hts # (0, scenario_key, 0): hts
# } # }
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
class ExistingPipeTrench(PipeTrenchOptions): class ExistingPipeTrench(PipeTrenchOptions):
"A class for existing pipe trenches." "A class for existing pipe trenches."
...@@ -398,10 +383,12 @@ class ExistingPipeTrench(PipeTrenchOptions): ...@@ -398,10 +383,12 @@ class ExistingPipeTrench(PipeTrenchOptions):
# initialise # initialise
PipeTrenchOptions.__init__( PipeTrenchOptions.__init__(
self, self,
minimum_cost=[0 for i in range(kwargs['trench'].number_options())], minimum_cost=[0 for i in range(kwargs["trench"].number_options())],
**kwargs) **kwargs
)
# define the option that already exists # define the option that already exists
self.options_selected[option_selected] = True self.options_selected[option_selected] = True
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
...@@ -19,8 +19,8 @@ from numbers import Real ...@@ -19,8 +19,8 @@ from numbers import Real
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def cost_pipes(trench: SupplyReturnPipeTrench,
length: float or tuple) -> tuple: def cost_pipes(trench: SupplyReturnPipeTrench, length: float or tuple) -> tuple:
""" """
Returns the costs of each trench option for a given trench length. Returns the costs of each trench option for a given trench length.
...@@ -45,30 +45,33 @@ def cost_pipes(trench: SupplyReturnPipeTrench, ...@@ -45,30 +45,33 @@ def cost_pipes(trench: SupplyReturnPipeTrench,
# use the specific pipe cost that features in the database # use the specific pipe cost that features in the database
if trench.vector_mode: if trench.vector_mode:
# multiple options # multiple options
if (type(length) == tuple and if type(length) == tuple and len(length) == trench.number_options():
len(length) == trench.number_options()):
# multiple trench lengths # multiple trench lengths
return tuple( return tuple(
(pipe.sp*length # twin pipes: one twin pipe (
if trench.twin_pipes else pipe.sp * length # twin pipes: one twin pipe
pipe.sp*length*2) # single pipes: two single pipes if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(trench.supply_pipe, length) for pipe, length in zip(trench.supply_pipe, length)
) )
elif isinstance(length, Real): elif isinstance(length, Real):
# one trench length # one trench length
return tuple( return tuple(
(pipe.sp*length # twin pipes: one twin pipe (
if trench.twin_pipes else pipe.sp * length # twin pipes: one twin pipe
pipe.sp*length*2) # single pipes: two single pipes if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe in trench.supply_pipe for pipe in trench.supply_pipe
) )
else: else:
raise ValueError('Unrecognised input combination.') raise ValueError("Unrecognised input combination.")
elif (not trench.vector_mode and isinstance(length, Real)): elif not trench.vector_mode and isinstance(length, Real):
# only one option # only one option
return (trench.supply_pipe.sp * length,) return (trench.supply_pipe.sp * length,)
else: # only one option else: # only one option
raise ValueError('Unrecognised input combination.') raise ValueError("Unrecognised input combination.")
# # keep the trench length # # keep the trench length
# self.length = ( # self.length = (
...@@ -77,12 +80,13 @@ def cost_pipes(trench: SupplyReturnPipeTrench, ...@@ -77,12 +80,13 @@ def cost_pipes(trench: SupplyReturnPipeTrench,
# length # length
# ) # )
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def summarise_network_by_pipe_technology( def summarise_network_by_pipe_technology(
network: Network, network: Network, print_output: bool = False
print_output: bool = False
) -> dict: ) -> dict:
"A method to summarise a network by pipe technology." "A method to summarise a network by pipe technology."
...@@ -100,8 +104,7 @@ def summarise_network_by_pipe_technology( ...@@ -100,8 +104,7 @@ def summarise_network_by_pipe_technology(
for arc_key in network.edges(keys=True): for arc_key in network.edges(keys=True):
# check if it is a PipeTrench object # check if it is a PipeTrench object
if not isinstance( if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH], network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
PipeTrenchOptions
): ):
# if not, skip arc # if not, skip arc
continue continue
...@@ -116,13 +119,13 @@ def summarise_network_by_pipe_technology( ...@@ -116,13 +119,13 @@ def summarise_network_by_pipe_technology(
# get the length of the arc # get the length of the arc
arc_length = ( arc_length = (
network.edges[arc_key][Network.KEY_ARC_TECH].length[h] network.edges[arc_key][Network.KEY_ARC_TECH].length[h]
if type(network.edges[arc_key][ if type(network.edges[arc_key][Network.KEY_ARC_TECH].length) == list
Network.KEY_ARC_TECH].length) == list else else network.edges[arc_key][Network.KEY_ARC_TECH].length
network.edges[arc_key][Network.KEY_ARC_TECH].length
) )
# identify the option # identify the option
tech_option_label = network.edges[arc_key][ tech_option_label = network.edges[arc_key][
Network.KEY_ARC_TECH].trench.printable_description(h) Network.KEY_ARC_TECH
].trench.printable_description(h)
# if the arc technology has been previously selected... # if the arc technology has been previously selected...
if tech_option_label in length_dict: if tech_option_label in length_dict:
# ...increment the total length # ...increment the total length
...@@ -135,33 +138,35 @@ def summarise_network_by_pipe_technology( ...@@ -135,33 +138,35 @@ def summarise_network_by_pipe_technology(
# ************************************************************************* # *************************************************************************
if print_output: if print_output:
print('printing the arc technologies selected by pipe size...') print("printing the arc technologies selected by pipe size...")
for key, value in sorted( for key, value in sorted(
(tech, length) (tech, length) for tech, length in length_dict.items()
for tech, length in length_dict.items()
): ):
print(str(key)+': '+str(value)) print(str(key) + ": " + str(value))
print('total: '+str(sum(length_dict.values()))) print("total: " + str(sum(length_dict.values())))
return length_dict return length_dict
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def plot_network_layout(network: Network,
def plot_network_layout(
network: Network,
include_basemap: bool = False, include_basemap: bool = False,
figure_size: tuple = (25, 25), figure_size: tuple = (25, 25),
min_linewidth: float = 1.0, min_linewidth: float = 1.0,
max_linewidth: float = 3.0, max_linewidth: float = 3.0,
legend_fontsize: float = 20.0, legend_fontsize: float = 20.0,
basemap_zoom_level: float = 15, basemap_zoom_level: float = 15,
legend_location: str = 'lower left', legend_location: str = "lower left",
legend_with_brand_model: bool = False, legend_with_brand_model: bool = False,
legend_transparency: float = None): legend_transparency: float = None,
):
# convert graph object to GDF # convert graph object to GDF
_, my_gdf_arcs = ox.graph_to_gdfs(network) _, my_gdf_arcs = ox.graph_to_gdfs(network)
...@@ -178,8 +183,7 @@ def plot_network_layout(network: Network, ...@@ -178,8 +183,7 @@ def plot_network_layout(network: Network,
for arc_key in my_gdf.index: for arc_key in my_gdf.index:
# check if it is a PipeTrenchOptions object # check if it is a PipeTrenchOptions object
if not isinstance( if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH], network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
PipeTrenchOptions
): ):
# if not, skip arc # if not, skip arc
continue continue
...@@ -188,10 +192,12 @@ def plot_network_layout(network: Network, ...@@ -188,10 +192,12 @@ def plot_network_layout(network: Network,
try: try:
selected_option = ( selected_option = (
my_gdf[Network.KEY_ARC_TECH].loc[ my_gdf[Network.KEY_ARC_TECH]
arc_key].trench.printable_description( .loc[arc_key]
my_gdf[Network.KEY_ARC_TECH].loc[ .trench.printable_description(
arc_key].options_selected.index(True) my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.options_selected.index(True)
) )
) )
except ValueError: except ValueError:
...@@ -209,15 +215,17 @@ def plot_network_layout(network: Network, ...@@ -209,15 +215,17 @@ def plot_network_layout(network: Network,
(int(printable_description[2:]), printable_description) (int(printable_description[2:]), printable_description)
for printable_description in arc_tech_summary_dict.keys() for printable_description in arc_tech_summary_dict.keys()
) )
(list_sorted_dn, (list_sorted_dn, list_sorted_descriptions) = list(map(list, zip(*list_sorted)))
list_sorted_descriptions) = list(map(list,zip(*list_sorted)))
list_arc_widths = [ list_arc_widths = (
min_linewidth+ [
(max_linewidth-min_linewidth)* min_linewidth
iteration/(len(list_sorted_dn)-1) + (max_linewidth - min_linewidth) * iteration / (len(list_sorted_dn) - 1)
for iteration, _ in enumerate(list_sorted_dn) for iteration, _ in enumerate(list_sorted_dn)
] if len(list_sorted_dn) != 1 else [(max_linewidth+min_linewidth)/2] ]
if len(list_sorted_dn) != 1
else [(max_linewidth + min_linewidth) / 2]
)
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
...@@ -226,35 +234,29 @@ def plot_network_layout(network: Network, ...@@ -226,35 +234,29 @@ def plot_network_layout(network: Network,
fig.set_size_inches(*figure_size) fig.set_size_inches(*figure_size)
for description, arc_width in zip( for description, arc_width in zip(list_sorted_descriptions, list_arc_widths):
list_sorted_descriptions,
list_arc_widths
):
# prepare plot # prepare plot
my_gdf.loc[arc_tech_summary_dict[description]].plot( my_gdf.loc[arc_tech_summary_dict[description]].plot(
edgecolor='k', edgecolor="k", legend=True, linewidth=arc_width, ax=ax
legend=True, )
linewidth=arc_width,
ax=ax)
# adjust legend labels # adjust legend labels
ax.legend(list_sorted_descriptions, ax.legend(
list_sorted_descriptions,
fontsize=legend_fontsize, fontsize=legend_fontsize,
loc=legend_location, loc=legend_location,
framealpha=( framealpha=(
legend_transparency legend_transparency if type(legend_transparency) != type(None) else None
if type(legend_transparency) != type(None) else None ),
)
) )
# add base map # add base map
if include_basemap: if include_basemap:
cx.add_basemap(
cx.add_basemap(ax, ax,
zoom=basemap_zoom_level, zoom=basemap_zoom_level,
source=cx.providers.OpenStreetMap.Mapnik, source=cx.providers.OpenStreetMap.Mapnik,
# crs=gdf_map.crs, # crs=gdf_map.crs,
...@@ -263,24 +265,25 @@ def plot_network_layout(network: Network, ...@@ -263,24 +265,25 @@ def plot_network_layout(network: Network,
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def plot_heating_demand( def plot_heating_demand(
losses: list, losses: list,
end_use_demand: list, end_use_demand: list,
labels: list, labels: list,
ylabel: str = 'Heating demand [MWh]', ylabel: str = "Heating demand [MWh]",
title: str = 'Heat demand by month' title: str = "Heat demand by month",
): ):
energy_totals = { energy_totals = {
'Losses (optimised)': np.array(losses), "Losses (optimised)": np.array(losses),
'End use (estimated)': np.array(end_use_demand), "End use (estimated)": np.array(end_use_demand),
} }
colors = { colors = {
'Losses (optimised)': 'tab:orange', "Losses (optimised)": "tab:orange",
'End use (estimated)': 'tab:blue', "End use (estimated)": "tab:blue",
} }
# width = 0.8 # the width of the bars: can also be len(x) sequence # width = 0.8 # the width of the bars: can also be len(x) sequence
...@@ -293,18 +296,17 @@ def plot_heating_demand( ...@@ -293,18 +296,17 @@ def plot_heating_demand(
figure_size = (8, 4) figure_size = (8, 4)
fig.set_size_inches(figure_size[0], figure_size[1]) fig.set_size_inches(figure_size[0], figure_size[1])
for energy_category, energy_total in energy_totals.items(): for energy_category, energy_total in energy_totals.items():
p = ax.bar( p = ax.bar(
labels, labels,
energy_total, energy_total,
label=energy_category, label=energy_category,
bottom=bottom, bottom=bottom,
color=colors[energy_category], color=colors[energy_category],
zorder=zorder_bars zorder=zorder_bars,
) )
bottom += energy_total bottom += energy_total
ax.bar_label(p, fmt='{:,.0f}', label_type='center') ax.bar_label(p, fmt="{:,.0f}", label_type="center")
# ax.bar_label(p, fmt='{:,.0f}') # ax.bar_label(p, fmt='{:,.0f}')
ax.grid(zorder=zorder_grid) # zorder=0 to make the grid ax.grid(zorder=zorder_grid) # zorder=0 to make the grid
...@@ -313,5 +315,6 @@ def plot_heating_demand( ...@@ -313,5 +315,6 @@ def plot_heating_demand(
plt.show() plt.show()
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
...@@ -10,6 +10,7 @@ from statistics import mean ...@@ -10,6 +10,7 @@ from statistics import mean
# TODO: enable swapping the polarity # TODO: enable swapping the polarity
class Investment: class Investment:
"""This class is meant to enable analysis of specific investments.""" """This class is meant to enable analysis of specific investments."""
...@@ -18,11 +19,13 @@ class Investment: ...@@ -18,11 +19,13 @@ class Investment:
# TODO: consider using dicts to make things more intuitive, time-wise # TODO: consider using dicts to make things more intuitive, time-wise
def __init__(self, def __init__(
self,
discount_rates: list, discount_rates: list,
net_cash_flows: list = None, net_cash_flows: list = None,
discount_rate: float = None, discount_rate: float = None,
analysis_period_span: int = None): analysis_period_span: int = None,
):
""" """
Create an object for investment analysis using typical information. Create an object for investment analysis using typical information.
...@@ -41,54 +44,43 @@ class Investment: ...@@ -41,54 +44,43 @@ class Investment:
# validate the inputs # validate the inputs
if type(discount_rates) != type(None): if type(discount_rates) != type(None):
# discount_rates is not None: # discount_rates is not None:
if type(discount_rates) != tuple: if type(discount_rates) != tuple:
raise TypeError("The discount rates must be provided as a tuple.")
raise TypeError(
'The discount rates must be provided as a tuple.')
self.discount_rates = tuple(discount_rates) self.discount_rates = tuple(discount_rates)
self.analysis_period_span = len(self.discount_rates) self.analysis_period_span = len(self.discount_rates)
if self.analysis_period_span <= 0: if self.analysis_period_span <= 0:
raise ValueError( raise ValueError(
'The duration of the period under analysis must be '+ "The duration of the period under analysis must be " + "positive."
'positive.'
) )
else: else:
# discount_rates is None: # discount_rates is None:
# discount rate must be positive real under 1 # discount rate must be positive real under 1
# analysis_period_span must be an int # analysis_period_span must be an int
if type(discount_rate) != float: if type(discount_rate) != float:
raise TypeError("The discount rate must be provided as a float.")
raise TypeError(
'The discount rate must be provided as a float.')
if discount_rate <= 0 or discount_rate >= 1: if discount_rate <= 0 or discount_rate >= 1:
raise ValueError( raise ValueError(
'The discount rate must be in the open interval between 0'+ "The discount rate must be in the open interval between 0"
' and 1.' + " and 1."
) )
if type(analysis_period_span) != int: if type(analysis_period_span) != int:
raise TypeError( raise TypeError(
'The duration of the period under consideration must be '+ "The duration of the period under consideration must be "
'provided as an integer.') + "provided as an integer."
)
if analysis_period_span <= 0: if analysis_period_span <= 0:
raise ValueError( raise ValueError(
'The duration of the period under analysis must be '+ "The duration of the period under analysis must be " + "positive."
'positive.'
) )
self.analysis_period_span = analysis_period_span self.analysis_period_span = analysis_period_span
...@@ -100,27 +92,18 @@ class Investment: ...@@ -100,27 +92,18 @@ class Investment:
# check the net cash flows # check the net cash flows
if type(net_cash_flows) != type(None): if type(net_cash_flows) != type(None):
if type(net_cash_flows) != list: if type(net_cash_flows) != list:
raise TypeError("The net cash flows must be provided as a list.")
raise TypeError(
'The net cash flows must be provided as a list.')
if len(net_cash_flows) != self.analysis_period_span + 1: if len(net_cash_flows) != self.analysis_period_span + 1:
raise ValueError("The inputs are consistent in terms of length.")
raise ValueError(
'The inputs are consistent in terms of length.'
)
self.net_cash_flows = list(net_cash_flows) self.net_cash_flows = list(net_cash_flows)
else: else:
# net_cash_flows is None: initialise it as a list of zeros # net_cash_flows is None: initialise it as a list of zeros
self.net_cash_flows = list( self.net_cash_flows = list(0 for i in range(self.analysis_period_span + 1))
0 for i in range(self.analysis_period_span+1)
)
# discount factors # discount factors
...@@ -132,15 +115,15 @@ class Investment: ...@@ -132,15 +115,15 @@ class Investment:
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
def add_investment(self, def add_investment(
self,
investment: float, investment: float,
investment_period: int, investment_period: int,
investment_longevity: int, investment_longevity: int,
commissioning_delay_after_investment: int = 0, commissioning_delay_after_investment: int = 0,
salvage_value_method: str = 'annuity'): salvage_value_method: str = "annuity",
):
if salvage_value_method == 'annuity': if salvage_value_method == "annuity":
mean_discount_rate = mean(self.discount_rates) mean_discount_rate = mean(self.discount_rates)
residual_value = salvage_value_annuity( residual_value = salvage_value_annuity(
...@@ -148,14 +131,13 @@ class Investment: ...@@ -148,14 +131,13 @@ class Investment:
investment_longevity=investment_longevity, investment_longevity=investment_longevity,
investment_period=investment_period, investment_period=investment_period,
discount_rate=mean_discount_rate, discount_rate=mean_discount_rate,
analysis_period_span=self.analysis_period_span analysis_period_span=self.analysis_period_span,
) )
self.net_cash_flows[investment_period] += investment self.net_cash_flows[investment_period] += investment
self.net_cash_flows[self.analysis_period_span] += -residual_value self.net_cash_flows[self.analysis_period_span] += -residual_value
else: else:
residual_value = salvage_value_linear_depreciation( residual_value = salvage_value_linear_depreciation(
investment=investment, investment=investment,
investment_period=investment_period, investment_period=investment_period,
...@@ -163,7 +145,7 @@ class Investment: ...@@ -163,7 +145,7 @@ class Investment:
analysis_period_span=self.analysis_period_span, analysis_period_span=self.analysis_period_span,
commissioning_delay_after_investment=( commissioning_delay_after_investment=(
commissioning_delay_after_investment commissioning_delay_after_investment
) ),
) )
self.net_cash_flows[investment_period] += investment self.net_cash_flows[investment_period] += investment
...@@ -172,30 +154,24 @@ class Investment: ...@@ -172,30 +154,24 @@ class Investment:
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
def add_operational_cash_flows(self, def add_operational_cash_flows(
cash_flow: float or int, self, cash_flow: float or int, start_period: int, longevity: int = None
start_period: int, ):
longevity: int = None):
"""Adds a sequence of cash flows to the analysis.""" """Adds a sequence of cash flows to the analysis."""
if type(longevity) == type(None): if type(longevity) == type(None):
# until the planning horizon # until the planning horizon
for i in range(self.analysis_period_span - start_period + 1): for i in range(self.analysis_period_span - start_period + 1):
# add operational cash flows # add operational cash flows
self.net_cash_flows[i + start_period] += cash_flow self.net_cash_flows[i + start_period] += cash_flow
else: else:
# limited longevity # limited longevity
for i in range(longevity): for i in range(longevity):
if i + start_period >= self.analysis_period_span + 1: if i + start_period >= self.analysis_period_span + 1:
break break
# add operational cash flows # add operational cash flows
...@@ -213,12 +189,14 @@ class Investment: ...@@ -213,12 +189,14 @@ class Investment:
# ************************************************************************* # *************************************************************************
# ************************************************************************* # *************************************************************************
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def npv(discount_rates: list,
net_cash_flows: list, def npv(
return_discount_factors: bool = False) -> float or tuple: discount_rates: list, net_cash_flows: list, return_discount_factors: bool = False
) -> float or tuple:
""" """
Calculates the net present value using the information provided. Calculates the net present value using the information provided.
...@@ -248,33 +226,32 @@ def npv(discount_rates: list, ...@@ -248,33 +226,32 @@ def npv(discount_rates: list,
# check sizes # check sizes
if len(discount_rates) != len(net_cash_flows) - 1: if len(discount_rates) != len(net_cash_flows) - 1:
# the inputs do not match, return None # the inputs do not match, return None
raise ValueError('The inputs are inconsistent.') raise ValueError("The inputs are inconsistent.")
discount_factors = [ discount_factors = [
discount_factor(discount_rates[:t]) discount_factor(discount_rates[:t]) for t in range(len(discount_rates) + 1)
for t in range(len(discount_rates)+1)
] ]
if return_discount_factors: if return_discount_factors:
return (
return sum( sum(
ncf_t*df_t ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors) ),
), discount_factors discount_factors,
)
else: else:
return sum( return sum(
ncf_t*df_t ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
) )
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def discount_factor(discount_rates: list) -> float: def discount_factor(discount_rates: list) -> float:
""" """
Return the discount factor consistent with the discount rates provided. Return the discount factor consistent with the discount rates provided.
...@@ -298,15 +275,18 @@ def discount_factor(discount_rates: list) -> float: ...@@ -298,15 +275,18 @@ def discount_factor(discount_rates: list) -> float:
""" """
return prod([1 / (1 + i) for i in discount_rates]) return prod([1 / (1 + i) for i in discount_rates])
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def salvage_value_linear_depreciation( def salvage_value_linear_depreciation(
investment: int or float, investment: int or float,
investment_period: int, investment_period: int,
investment_longevity: int, investment_longevity: int,
analysis_period_span: int, analysis_period_span: int,
commissioning_delay_after_investment: int = 1) -> float: commissioning_delay_after_investment: int = 1,
) -> float:
""" """
Determine an asset\'s salvage value by the end of an analysis period. Determine an asset\'s salvage value by the end of an analysis period.
...@@ -341,66 +321,78 @@ def salvage_value_linear_depreciation( ...@@ -341,66 +321,78 @@ def salvage_value_linear_depreciation(
""" """
if investment_period >= analysis_period_span + 1: if investment_period >= analysis_period_span + 1:
raise ValueError( raise ValueError(
'The investment has to be made within the period being analysed.' "The investment has to be made within the period being analysed."
) )
# calculate the salvage value # calculate the salvage value
return ( return (
investment_longevity+ (
investment_period+ investment_longevity
commissioning_delay_after_investment-1- + investment_period
analysis_period_span + commissioning_delay_after_investment
)*investment/investment_longevity - 1
- analysis_period_span
)
* investment
/ investment_longevity
)
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def salvage_value_annuity(investment: int or float,
def salvage_value_annuity(
investment: int or float,
discount_rate: float, discount_rate: float,
investment_longevity: int, investment_longevity: int,
investment_period: int, investment_period: int,
analysis_period_span: int) -> float: analysis_period_span: int,
) -> float:
npv_salvage = present_salvage_value_annuity( npv_salvage = present_salvage_value_annuity(
investment=investment, investment=investment,
investment_longevity=investment_longevity, investment_longevity=investment_longevity,
investment_period=investment_period, investment_period=investment_period,
discount_rate=discount_rate, discount_rate=discount_rate,
analysis_period_span=analysis_period_span, analysis_period_span=analysis_period_span,
return_annuity=False return_annuity=False,
) )
return npv_salvage / discount_factor( return npv_salvage / discount_factor(
tuple(discount_rate for i in range(analysis_period_span)) tuple(discount_rate for i in range(analysis_period_span))
) )
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def annuity(investment: int or float,
investment_longevity: int, def annuity(
discount_rate: float) -> float: investment: int or float, investment_longevity: int, discount_rate: float
) -> float:
"Returns the annuity value for a given investment sum and longevity." "Returns the annuity value for a given investment sum and longevity."
return ( return (
investment* investment
discount_rate/(1-(1+discount_rate)**( * discount_rate
-investment_longevity / (1 - (1 + discount_rate) ** (-investment_longevity))
))
) )
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def present_salvage_value_annuity(investment: int or float,
def present_salvage_value_annuity(
investment: int or float,
investment_longevity: int, investment_longevity: int,
investment_period: int, investment_period: int,
discount_rate: float, discount_rate: float,
analysis_period_span: int, analysis_period_span: int,
return_annuity: bool = False) -> float: return_annuity: bool = False,
) -> float:
""" """
Calculates the present value of an asset after a given analysis period. Calculates the present value of an asset after a given analysis period.
...@@ -441,25 +433,21 @@ def present_salvage_value_annuity(investment: int or float, ...@@ -441,25 +433,21 @@ def present_salvage_value_annuity(investment: int or float,
""" """
if investment_period >= analysis_period_span + 1: if investment_period >= analysis_period_span + 1:
raise ValueError( raise ValueError(
'The investment has to be made within the period being analysed.' "The investment has to be made within the period being analysed."
) )
# the present salvage value requires the lifetime to extend beyond the hor. # the present salvage value requires the lifetime to extend beyond the hor.
if analysis_period_span >= investment_longevity + investment_period: if analysis_period_span >= investment_longevity + investment_period:
if return_annuity: if return_annuity:
return 0, annuity( return 0, annuity(
investment=investment, investment=investment,
investment_longevity=investment_longevity, investment_longevity=investment_longevity,
discount_rate=discount_rate discount_rate=discount_rate,
) )
else: else:
return 0 return 0
# the annuity has to consider the asset longevity and the commission. delay # the annuity has to consider the asset longevity and the commission. delay
...@@ -467,35 +455,29 @@ def present_salvage_value_annuity(investment: int or float, ...@@ -467,35 +455,29 @@ def present_salvage_value_annuity(investment: int or float,
value_annuity = annuity( value_annuity = annuity(
investment=investment, investment=investment,
investment_longevity=investment_longevity, investment_longevity=investment_longevity,
discount_rate=discount_rate discount_rate=discount_rate,
) )
discount_rates = tuple( discount_rates = tuple(
discount_rate discount_rate for i in range(investment_longevity + investment_period)
for i in range(investment_longevity+investment_period)
) )
net_cash_flows = list( net_cash_flows = list(
value_annuity value_annuity for i in range(investment_longevity + investment_period + 1)
for i in range(investment_longevity+investment_period+1)
) )
for year_index in range(analysis_period_span + 1): for year_index in range(analysis_period_span + 1):
net_cash_flows[year_index] = 0 net_cash_flows[year_index] = 0
if return_annuity: if return_annuity:
return (
return npv( npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows),
discount_rates=discount_rates, value_annuity,
net_cash_flows=net_cash_flows )
), value_annuity
else: else:
return npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows)
return npv(
discount_rates=discount_rates,
net_cash_flows=net_cash_flows
)
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
# *****************************************************************************
# *****************************************************************************
from ...problems.esipp.network import Arcs
# *****************************************************************************
# *****************************************************************************
class ArcInvestments(Arcs):
"""A class for defining arcs linked to investments."""
# *************************************************************************
# *************************************************************************
def __init__(self, investments: tuple, **kwargs):
# keep investment data
self.investments = investments
# initialise object
Arcs.__init__(
self,
minimum_cost=tuple([inv.net_present_value() for inv in self.investments]),
# validate=False,
**kwargs
)
# *************************************************************************
# *************************************************************************
def update_minimum_cost(self):
"Updates the minimum costs using the Investment objects."
self.minimum_cost = tuple([inv.net_present_value() for inv in self.investments])
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# imports # imports
from math import inf from math import inf
...@@ -22,6 +21,7 @@ from ..gis import identify as ident ...@@ -22,6 +21,7 @@ from ..gis import identify as ident
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict: def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
""" """
Calculate edge lengths in a OSMnx-formatted MultiDiGraph network object. Calculate edge lengths in a OSMnx-formatted MultiDiGraph network object.
...@@ -44,7 +44,7 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict: ...@@ -44,7 +44,7 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
""" """
# determine if the graph is projected or not # determine if the graph is projected or not
graph_is_projected = is_projected(network.graph['crs']) graph_is_projected = is_projected(network.graph["crs"])
# check if edge keys were specified # check if edge keys were specified
if type(edge_keys) == type(None): if type(edge_keys) == type(None):
# no particular edge keys were provided: consider all edges (default) # no particular edge keys were provided: consider all edges (default)
...@@ -64,12 +64,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict: ...@@ -64,12 +64,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
else: else:
# use (projected) coordinates # use (projected) coordinates
start_point = Point( start_point = Point(
(network.nodes[edge_key[0]][osm.KEY_OSMNX_X], (
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y]) network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
)
) )
end_point = Point( end_point = Point(
(network.nodes[edge_key[1]][osm.KEY_OSMNX_X], (
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y]) network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
)
) )
length_dict[edge_key] = start_point.distance(end_point) length_dict[edge_key] = start_point.distance(end_point)
...@@ -86,14 +90,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict: ...@@ -86,14 +90,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
lat1=network.nodes[edge_key[0]][osm.KEY_OSMNX_Y], lat1=network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
lon1=network.nodes[edge_key[0]][osm.KEY_OSMNX_X], lon1=network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
lat2=network.nodes[edge_key[1]][osm.KEY_OSMNX_Y], lat2=network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X] lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
) )
# return the dict with lengths of each edge # return the dict with lengths of each edge
return length_dict return length_dict
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def great_circle_distance_along_path(path: LineString) -> float: def great_circle_distance_along_path(path: LineString) -> float:
""" """
Computes the great circle distance along a given path. Computes the great circle distance along a given path.
...@@ -121,13 +127,15 @@ def great_circle_distance_along_path(path: LineString) -> float: ...@@ -121,13 +127,15 @@ def great_circle_distance_along_path(path: LineString) -> float:
lat[:-1], # latitudes of starting points lat[:-1], # latitudes of starting points
lon[:-1], # longitudes of starting points lon[:-1], # longitudes of starting points
lat[1:], # latitudes of ending points lat[1:], # latitudes of ending points
lon[1:] # longitudes of ending points lon[1:], # longitudes of ending points
) )
) )
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def update_street_count(network: MultiDiGraph): def update_street_count(network: MultiDiGraph):
""" """
Updates the street count attributes of nodes in a MultiDiGraph object. Updates the street count attributes of nodes in a MultiDiGraph object.
...@@ -145,16 +153,20 @@ def update_street_count(network: MultiDiGraph): ...@@ -145,16 +153,20 @@ def update_street_count(network: MultiDiGraph):
# update street count # update street count
street_count_dict = count_streets_per_node(network) street_count_dict = count_streets_per_node(network)
network.add_nodes_from( network.add_nodes_from(
((key, {osm.KEY_OSMNX_STREET_COUNT:value}) (
for key, value in street_count_dict.items()) (key, {osm.KEY_OSMNX_STREET_COUNT: value})
for key, value in street_count_dict.items()
) )
)
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def node_path_length(network: MultiDiGraph,
path: list, def node_path_length(
return_minimum_length_only: bool = True) -> list or float: network: MultiDiGraph, path: list, return_minimum_length_only: bool = True
) -> list or float:
""" """
Returns the length or lengths of a path defined using nodes. Returns the length or lengths of a path defined using nodes.
...@@ -200,9 +212,7 @@ def node_path_length(network: MultiDiGraph, ...@@ -200,9 +212,7 @@ def node_path_length(network: MultiDiGraph,
for node_pair in range(path_length - 1): for node_pair in range(path_length - 1):
# get the edges between these two nodes # get the edges between these two nodes
edge_keys = ident.get_edges_from_a_to_b( edge_keys = ident.get_edges_from_a_to_b(
network, network, path[node_pair], path[node_pair + 1]
path[node_pair],
path[node_pair+1]
) )
number_edge_keys = len(edge_keys) number_edge_keys = len(edge_keys)
if number_edge_keys == 1: if number_edge_keys == 1:
...@@ -225,15 +235,12 @@ def node_path_length(network: MultiDiGraph, ...@@ -225,15 +235,12 @@ def node_path_length(network: MultiDiGraph,
# add the new edge # add the new edge
list_of_edge_key_paths[ list_of_edge_key_paths[
path_index + edge_key_index * number_paths path_index + edge_key_index * number_paths
].append( ].append(edge_keys[edge_key_index])
edge_keys[edge_key_index]
)
# ************************************************************************* # *************************************************************************
path_lenths = [ path_lenths = [
sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in edge_key_path)
for edge_key in edge_key_path)
for edge_key_path in list_of_edge_key_paths for edge_key_path in list_of_edge_key_paths
] ]
if return_minimum_length_only: if return_minimum_length_only:
...@@ -243,12 +250,12 @@ def node_path_length(network: MultiDiGraph, ...@@ -243,12 +250,12 @@ def node_path_length(network: MultiDiGraph,
# ************************************************************************* # *************************************************************************
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def edge_path_length(network: MultiDiGraph,
path: list, def edge_path_length(network: MultiDiGraph, path: list, **kwargs) -> float:
**kwargs) -> float:
""" """
Returns the total length of a path defined using edges. Returns the total length of a path defined using edges.
...@@ -274,19 +281,19 @@ def edge_path_length(network: MultiDiGraph, ...@@ -274,19 +281,19 @@ def edge_path_length(network: MultiDiGraph,
if path_length == 0: if path_length == 0:
return inf return inf
if ident.is_edge_path(network, path, **kwargs): if ident.is_edge_path(network, path, **kwargs):
return sum( return sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path)
network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path
)
else: else:
# no path provided # no path provided
return inf return inf
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
def count_ocurrences(gdf: GeoDataFrame,
column: str, def count_ocurrences(
column_entries: list = None) -> dict: gdf: GeoDataFrame, column: str, column_entries: list = None
) -> dict:
""" """
Counts the number of occurrences per entry in a DataFrame object's column. Counts the number of occurrences per entry in a DataFrame object's column.
...@@ -340,5 +347,6 @@ def count_ocurrences(gdf: GeoDataFrame, ...@@ -340,5 +347,6 @@ def count_ocurrences(gdf: GeoDataFrame,
# return statement # return statement
return count_dict return count_dict
# ***************************************************************************** # *****************************************************************************
# ***************************************************************************** # *****************************************************************************
This diff is collapsed.
This diff is collapsed.
...@@ -5,14 +5,14 @@ ...@@ -5,14 +5,14 @@
# general # general
KEY_OSM_CITY = 'addr:city' KEY_OSM_CITY = "addr:city"
KEY_OSM_COUNTRY = 'addr:country' KEY_OSM_COUNTRY = "addr:country"
KEY_OSM_HOUSE_NUMBER = 'addr:housenumber' KEY_OSM_HOUSE_NUMBER = "addr:housenumber"
KEY_OSM_MUNICIPALITY = 'addr:municipality' KEY_OSM_MUNICIPALITY = "addr:municipality"
KEY_OSM_PLACE = 'addr:place' KEY_OSM_PLACE = "addr:place"
KEY_OSM_POSTCODE = 'addr:postcode' KEY_OSM_POSTCODE = "addr:postcode"
KEY_OSM_STREET = 'addr:street' KEY_OSM_STREET = "addr:street"
KEY_OSM_SOURCE = 'source' KEY_OSM_SOURCE = "source"
KEYS_OSM = [ KEYS_OSM = [
KEY_OSM_CITY, KEY_OSM_CITY,
...@@ -22,37 +22,35 @@ KEYS_OSM = [ ...@@ -22,37 +22,35 @@ KEYS_OSM = [
KEY_OSM_PLACE, KEY_OSM_PLACE,
KEY_OSM_POSTCODE, KEY_OSM_POSTCODE,
KEY_OSM_STREET, KEY_OSM_STREET,
KEY_OSM_SOURCE KEY_OSM_SOURCE,
] ]
# country specific # country specific
KEY_COUNTRY_DK = 'dk' KEY_COUNTRY_DK = "dk"
KEY_OSM_DK_BUILDING_ENTRANCE_ID = 'osak:identifier' KEY_OSM_DK_BUILDING_ENTRANCE_ID = "osak:identifier"
KEY_OSM_BUILDING_ENTRANCE_ID = { KEY_OSM_BUILDING_ENTRANCE_ID = {KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID}
KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID
}
# ***************************************************************************** # *****************************************************************************
# osmnx # osmnx
KEY_OSMNX_OSMID = 'osmid' KEY_OSMNX_OSMID = "osmid"
KEY_OSMNX_ELEMENT_TYPE = 'element_type' KEY_OSMNX_ELEMENT_TYPE = "element_type"
KEY_OSMNX_NAME = 'name' KEY_OSMNX_NAME = "name"
KEY_OSMNX_GEOMETRY = 'geometry' KEY_OSMNX_GEOMETRY = "geometry"
KEY_OSMNX_REVERSED = 'reversed' KEY_OSMNX_REVERSED = "reversed"
KEY_OSMNX_LENGTH = 'length' KEY_OSMNX_LENGTH = "length"
KEY_OSMNX_ONEWAY = 'oneway' KEY_OSMNX_ONEWAY = "oneway"
KEY_OSMNX_X = 'x' KEY_OSMNX_X = "x"
KEY_OSMNX_Y = 'y' KEY_OSMNX_Y = "y"
KEY_OSMNX_LON = 'lon' KEY_OSMNX_LON = "lon"
KEY_OSMNX_LAT = 'lat' KEY_OSMNX_LAT = "lat"
KEY_OSMNX_STREET_COUNT = 'street_count' KEY_OSMNX_STREET_COUNT = "street_count"
KEYS_OSMNX = [ KEYS_OSMNX = [
KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx
...@@ -66,7 +64,7 @@ KEYS_OSMNX = [ ...@@ -66,7 +64,7 @@ KEYS_OSMNX = [
KEY_OSMNX_Y, KEY_OSMNX_Y,
KEY_OSMNX_LON, KEY_OSMNX_LON,
KEY_OSMNX_LAT, KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT KEY_OSMNX_STREET_COUNT,
] ]
KEYS_OSMNX_NODES = { KEYS_OSMNX_NODES = {
...@@ -77,28 +75,24 @@ KEYS_OSMNX_NODES = { ...@@ -77,28 +75,24 @@ KEYS_OSMNX_NODES = {
KEY_OSMNX_Y, KEY_OSMNX_Y,
KEY_OSMNX_LON, KEY_OSMNX_LON,
KEY_OSMNX_LAT, KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT KEY_OSMNX_STREET_COUNT,
} }
KEYS_OSMNX_NODES_ESSENTIAL = { KEYS_OSMNX_NODES_ESSENTIAL = {KEY_OSMNX_OSMID, KEY_OSMNX_NAME, KEY_OSMNX_STREET_COUNT}
KEY_OSMNX_OSMID,
KEY_OSMNX_NAME,
KEY_OSMNX_STREET_COUNT
}
KEYS_OSMNX_EDGES = { KEYS_OSMNX_EDGES = {
KEY_OSMNX_OSMID, KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH, KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY, KEY_OSMNX_ONEWAY,
KEY_OSMNX_GEOMETRY, KEY_OSMNX_GEOMETRY,
KEY_OSMNX_REVERSED KEY_OSMNX_REVERSED,
} }
KEYS_OSMNX_EDGES_ESSENTIAL = { KEYS_OSMNX_EDGES_ESSENTIAL = {
KEY_OSMNX_OSMID, KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH, KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY, KEY_OSMNX_ONEWAY,
KEY_OSMNX_REVERSED KEY_OSMNX_REVERSED,
} }
# ***************************************************************************** # *****************************************************************************
This diff is collapsed.
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-