Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • pmag/topupopt
1 result
Select Git revision
Loading items
Show changes
Commits on Source (7)
Showing
with 2735 additions and 2811 deletions
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
This diff is collapsed.
......@@ -12,10 +12,7 @@ from .bbr import label_bbr_entrance_id, label_bbr_housing_area
# labels
selected_bbr_adgang_labels = [
"Opgang_id",
"AdgAdr_id",
"Bygning_id"]
selected_bbr_adgang_labels = ["Opgang_id", "AdgAdr_id", "Bygning_id"]
selected_bbr_building_point_labels = [
"KoorOest",
......@@ -47,15 +44,17 @@ selected_bbr_building_labels = [
"VARMEINSTAL_KODE",
"OPVARMNING_KODE",
"VARME_SUPPL_KODE",
"BygPkt_id"]
"BygPkt_id",
]
# label under which building entrance ids can be found in OSM
label_osm_entrance_id = 'osak:identifier'
label_osm_entrance_id = "osak:identifier"
# *****************************************************************************
# *****************************************************************************
def heat_demand_dict_by_building_entrance(
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
......@@ -67,9 +66,8 @@ def heat_demand_dict_by_building_entrance(
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id,
avg_state: list = None,
state_correlates_with_output: bool = False
state_correlates_with_output: bool = False,
) -> dict:
# initialise dict for each building entrance
demand_dict = {}
......@@ -77,24 +75,19 @@ def heat_demand_dict_by_building_entrance(
# for each building entrance
for osm_index in gdf_osm.index:
# initialise dict for each building consumption point
heat_demand_profiles = []
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[key_bbr_entr_id] ==
gdf_osm.loc[osm_index][key_osm_entr_id]
building_indexes = gdf_buildings[
gdf_buildings[key_bbr_entr_id] == gdf_osm.loc[osm_index][key_osm_entr_id]
].index
)
# for each building
for building_index in building_indexes:
# get relevant data
# base_load_avg_ratio = 0.3
......@@ -106,7 +99,6 @@ def heat_demand_dict_by_building_entrance(
# estimate its demand
if type(avg_state) == type(None):
# ignore states
heat_demand_profiles.append(
......@@ -114,19 +106,18 @@ def heat_demand_dict_by_building_entrance(
discrete_sinusoid_matching_integral(
bdg_specific_demand[building_index] * area,
time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index],
min_to_max_ratio=bdg_ratio_min_max[building_index],
phase_shift_radians=(
bdg_demand_phase_shift[building_index]
# bdg_demand_phase_shift_amplitude*np.random.random()
# if (type(bdg_demand_phase_shift_amplitude) ==
# type(None)) else None
)
),
)
)
)
else:
# states matter
heat_demand_profiles.append(
......@@ -137,8 +128,8 @@ def heat_demand_dict_by_building_entrance(
),
avg_state=avg_state,
time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output
min_to_max_ratio=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output,
)
)
)
......@@ -149,8 +140,7 @@ def heat_demand_dict_by_building_entrance(
if len(heat_demand_profiles) == 0:
final_profile = []
else:
final_profile = sum(profile
for profile in heat_demand_profiles)
final_profile = sum(profile for profile in heat_demand_profiles)
# *********************************************************************
......@@ -162,30 +152,30 @@ def heat_demand_dict_by_building_entrance(
# return
return demand_dict
# *****************************************************************************
# *****************************************************************************
def total_heating_area(
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id
key_bbr_entr_id: str = label_bbr_entrance_id,
) -> float:
area = 0
for osm_index in gdf_osm.index:
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[label_bbr_entrance_id] ==
gdf_osm.loc[osm_index][label_osm_entrance_id]
building_indexes = gdf_buildings[
gdf_buildings[label_bbr_entrance_id]
== gdf_osm.loc[osm_index][label_osm_entrance_id]
].index
)
# for each building
for building_index in building_indexes:
# get relevant data
area += gdf_buildings.loc[building_index][label_bbr_housing_area]
return area
# *****************************************************************************
# *****************************************************************************
# -*- coding: utf-8 -*-
......@@ -24,15 +24,16 @@ from ...data.finance.utils import ArcInvestments
# constants
KEY_DHT_OPTIONS_OBJ = 'trench'
KEY_DHT_LENGTH = 'length'
KEY_DHT_UCF = 'capacity_unit_conversion_factor'
KEY_HHT_DHT_PIPES = 'pipes'
KEY_HHT_STD_PIPES = 'pipe_tuple'
KEY_DHT_OPTIONS_OBJ = "trench"
KEY_DHT_LENGTH = "length"
KEY_DHT_UCF = "capacity_unit_conversion_factor"
KEY_HHT_DHT_PIPES = "pipes"
KEY_HHT_STD_PIPES = "pipe_tuple"
# *****************************************************************************
# *****************************************************************************
class PipeTrenchOptions(ArcsWithoutProportionalLosses):
"A class for defining investments in district heating trenches."
......@@ -46,7 +47,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0,
):
# store the unit conversion
self.unit_conversion_factor = unit_conversion_factor
# keep the trench object
......@@ -54,13 +54,11 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
# keep the trench length
self.length = (
[length for i in range(trench.number_options())]
if trench.vector_mode else
length
if trench.vector_mode
else length
)
# determine the rated heat capacity
rhc = trench.rated_heat_capacity(
unit_conversion_factor=unit_conversion_factor
)
rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
# initialise the object using the mother class
ArcsWithoutProportionalLosses.__init__(
self,
......@@ -70,10 +68,10 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
minimum_cost=minimum_cost,
specific_capacity_cost=(
0
if type(specific_capacity_cost) == type(None) else
specific_capacity_cost
if type(specific_capacity_cost) == type(None)
else specific_capacity_cost
),
capacity_is_instantaneous=False
capacity_is_instantaneous=False,
)
# initialise the minimum cost
if type(minimum_cost) == type(None):
......@@ -83,7 +81,6 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
# *************************************************************************
def set_minimum_cost(self, minimum_cost=None):
# minimum arc cost
# if no external minimum cost list was provided, calculate it
if type(minimum_cost) == type(None):
......@@ -91,13 +88,12 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
if self.trench.vector_mode:
# multiple options
self.minimum_cost = tuple(
(pipe.sp*length # twin pipes: one twin pipe
if self.trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
for pipe, length in zip(
self.trench.supply_pipe,
self.length
)
(
pipe.sp * length # twin pipes: one twin pipe
if self.trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(self.trench.supply_pipe, self.length)
)
else: # only one option
self.minimum_cost = (self.trench.supply_pipe.sp * self.length,)
......@@ -129,43 +125,39 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
temperature_surroundings: float or list,
length: float or list = None,
unit_conversion_factor: float = None,
**kwargs):
**kwargs
):
hts = self.trench.heat_transfer_surroundings(
ground_thermal_conductivity=ground_thermal_conductivity,
ground_air_heat_transfer_coefficient=(
ground_air_heat_transfer_coefficient),
ground_air_heat_transfer_coefficient=(ground_air_heat_transfer_coefficient),
time_interval_duration=time_interval_duration,
temperature_surroundings=temperature_surroundings,
length=(
self.length
if type(length) == type(None) else
length
),
length=(self.length if type(length) == type(None) else length),
unit_conversion_factor=(
self.unit_conversion_factor
if type(unit_conversion_factor) == type(None) else
unit_conversion_factor
if type(unit_conversion_factor) == type(None)
else unit_conversion_factor
),
**kwargs)
**kwargs
)
if self.trench.vector_mode:
# multiple options: hts is a vector
if (hasattr(self, "static_loss") and
type(self.static_loss) != type(None)):
if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
# update the static loss dictionary
if type(hts[0]) == list:
# multiple time intervals
self.static_loss.update({
self.static_loss.update(
{
(h, scenario_key, k): hts[h][k]
for h, hts_h in enumerate(hts)
for k, hts_hk in enumerate(hts_h)
})
}
)
else: # not a list: one time interval
self.static_loss.update({
(h, scenario_key, 0): hts[h]
for h, hts_h in enumerate(hts)
})
self.static_loss.update(
{(h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)}
)
else:
# no static loss dictionary, create it
if type(hts[0]) == list:
......@@ -177,40 +169,34 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
}
else: # not a list: one time interval
self.static_loss = {
(h, scenario_key, 0): hts[h]
for h, hts_h in enumerate(hts)
(h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)
}
else:
# one option: hts might be a number
if (hasattr(self, "static_loss") and
type(self.static_loss) != type(None)):
if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
# update the static loss dictionary
if not isinstance(hts, Real):
# multiple time intervals
self.static_loss.update({
(0, scenario_key, k): hts[k]
for k, hts_k in enumerate(hts)
})
self.static_loss.update(
{(0, scenario_key, k): hts[k] for k, hts_k in enumerate(hts)}
)
else: # not a list: one time interval
self.static_loss.update({
(0, scenario_key, 0): hts
})
self.static_loss.update({(0, scenario_key, 0): hts})
else:
# no static loss dictionary, create it
if not isinstance(hts, Real):
# multiple time intervals
self.static_loss = {
(0, scenario_key, k): hts_k
for k, hts_k in enumerate(hts)
(0, scenario_key, k): hts_k for k, hts_k in enumerate(hts)
}
else: # not a list: one time interval
self.static_loss = {
(0, scenario_key, 0): hts
}
self.static_loss = {(0, scenario_key, 0): hts}
# *****************************************************************************
# *****************************************************************************
class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
"A class for defining investments in district heating trenches."
......@@ -226,7 +212,6 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
unit_conversion_factor: float = 1.0,
**kwargs
):
# store the unit conversion
self.unit_conversion_factor = unit_conversion_factor
# keep the trench object
......@@ -234,13 +219,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# keep the trench length
self.length = (
[length for i in range(trench.number_options())]
if trench.vector_mode else
length
if trench.vector_mode
else length
)
# determine the rated heat capacity
rhc = trench.rated_heat_capacity(
unit_conversion_factor=unit_conversion_factor
)
rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
# initialise the object using the mother class
ArcInvestments.__init__(
self,
......@@ -252,11 +235,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
capacity=[rhc] if isinstance(rhc, Real) else rhc,
specific_capacity_cost=(
0
if type(specific_capacity_cost) == type(None) else
specific_capacity_cost
if type(specific_capacity_cost) == type(None)
else specific_capacity_cost
),
capacity_is_instantaneous=False,
validate=False
validate=False,
)
# # *************************************************************************
......@@ -388,9 +371,11 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# (0, scenario_key, 0): hts
# }
# *****************************************************************************
# *****************************************************************************
class ExistingPipeTrench(PipeTrenchOptions):
"A class for existing pipe trenches."
......@@ -398,10 +383,12 @@ class ExistingPipeTrench(PipeTrenchOptions):
# initialise
PipeTrenchOptions.__init__(
self,
minimum_cost=[0 for i in range(kwargs['trench'].number_options())],
**kwargs)
minimum_cost=[0 for i in range(kwargs["trench"].number_options())],
**kwargs
)
# define the option that already exists
self.options_selected[option_selected] = True
# *****************************************************************************
# *****************************************************************************
......@@ -19,8 +19,8 @@ from numbers import Real
# *****************************************************************************
# *****************************************************************************
def cost_pipes(trench: SupplyReturnPipeTrench,
length: float or tuple) -> tuple:
def cost_pipes(trench: SupplyReturnPipeTrench, length: float or tuple) -> tuple:
"""
Returns the costs of each trench option for a given trench length.
......@@ -45,30 +45,33 @@ def cost_pipes(trench: SupplyReturnPipeTrench,
# use the specific pipe cost that features in the database
if trench.vector_mode:
# multiple options
if (type(length) == tuple and
len(length) == trench.number_options()):
if type(length) == tuple and len(length) == trench.number_options():
# multiple trench lengths
return tuple(
(pipe.sp*length # twin pipes: one twin pipe
if trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(trench.supply_pipe, length)
)
elif isinstance(length, Real):
# one trench length
return tuple(
(pipe.sp*length # twin pipes: one twin pipe
if trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe in trench.supply_pipe
)
else:
raise ValueError('Unrecognised input combination.')
elif (not trench.vector_mode and isinstance(length, Real)):
raise ValueError("Unrecognised input combination.")
elif not trench.vector_mode and isinstance(length, Real):
# only one option
return (trench.supply_pipe.sp * length,)
else: # only one option
raise ValueError('Unrecognised input combination.')
raise ValueError("Unrecognised input combination.")
# # keep the trench length
# self.length = (
......@@ -77,12 +80,13 @@ def cost_pipes(trench: SupplyReturnPipeTrench,
# length
# )
# *****************************************************************************
# *****************************************************************************
def summarise_network_by_pipe_technology(
network: Network,
print_output: bool = False
network: Network, print_output: bool = False
) -> dict:
"A method to summarise a network by pipe technology."
......@@ -100,8 +104,7 @@ def summarise_network_by_pipe_technology(
for arc_key in network.edges(keys=True):
# check if it is a PipeTrench object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
......@@ -116,13 +119,13 @@ def summarise_network_by_pipe_technology(
# get the length of the arc
arc_length = (
network.edges[arc_key][Network.KEY_ARC_TECH].length[h]
if type(network.edges[arc_key][
Network.KEY_ARC_TECH].length) == list else
network.edges[arc_key][Network.KEY_ARC_TECH].length
if type(network.edges[arc_key][Network.KEY_ARC_TECH].length) == list
else network.edges[arc_key][Network.KEY_ARC_TECH].length
)
# identify the option
tech_option_label = network.edges[arc_key][
Network.KEY_ARC_TECH].trench.printable_description(h)
Network.KEY_ARC_TECH
].trench.printable_description(h)
# if the arc technology has been previously selected...
if tech_option_label in length_dict:
# ...increment the total length
......@@ -135,33 +138,35 @@ def summarise_network_by_pipe_technology(
# *************************************************************************
if print_output:
print('printing the arc technologies selected by pipe size...')
print("printing the arc technologies selected by pipe size...")
for key, value in sorted(
(tech, length)
for tech, length in length_dict.items()
(tech, length) for tech, length in length_dict.items()
):
print(str(key)+': '+str(value))
print('total: '+str(sum(length_dict.values())))
print(str(key) + ": " + str(value))
print("total: " + str(sum(length_dict.values())))
return length_dict
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_network_layout(network: Network,
def plot_network_layout(
network: Network,
include_basemap: bool = False,
figure_size: tuple = (25, 25),
min_linewidth: float = 1.0,
max_linewidth: float = 3.0,
legend_fontsize: float = 20.0,
basemap_zoom_level: float = 15,
legend_location: str = 'lower left',
legend_location: str = "lower left",
legend_with_brand_model: bool = False,
legend_transparency: float = None):
legend_transparency: float = None,
):
# convert graph object to GDF
_, my_gdf_arcs = ox.graph_to_gdfs(network)
......@@ -178,8 +183,7 @@ def plot_network_layout(network: Network,
for arc_key in my_gdf.index:
# check if it is a PipeTrenchOptions object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
......@@ -188,10 +192,12 @@ def plot_network_layout(network: Network,
try:
selected_option = (
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].trench.printable_description(
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].options_selected.index(True)
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.trench.printable_description(
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.options_selected.index(True)
)
)
except ValueError:
......@@ -209,15 +215,17 @@ def plot_network_layout(network: Network,
(int(printable_description[2:]), printable_description)
for printable_description in arc_tech_summary_dict.keys()
)
(list_sorted_dn,
list_sorted_descriptions) = list(map(list,zip(*list_sorted)))
(list_sorted_dn, list_sorted_descriptions) = list(map(list, zip(*list_sorted)))
list_arc_widths = [
min_linewidth+
(max_linewidth-min_linewidth)*
iteration/(len(list_sorted_dn)-1)
list_arc_widths = (
[
min_linewidth
+ (max_linewidth - min_linewidth) * iteration / (len(list_sorted_dn) - 1)
for iteration, _ in enumerate(list_sorted_dn)
] if len(list_sorted_dn) != 1 else [(max_linewidth+min_linewidth)/2]
]
if len(list_sorted_dn) != 1
else [(max_linewidth + min_linewidth) / 2]
)
# *************************************************************************
# *************************************************************************
......@@ -226,35 +234,29 @@ def plot_network_layout(network: Network,
fig.set_size_inches(*figure_size)
for description, arc_width in zip(
list_sorted_descriptions,
list_arc_widths
):
for description, arc_width in zip(list_sorted_descriptions, list_arc_widths):
# prepare plot
my_gdf.loc[arc_tech_summary_dict[description]].plot(
edgecolor='k',
legend=True,
linewidth=arc_width,
ax=ax)
edgecolor="k", legend=True, linewidth=arc_width, ax=ax
)
# adjust legend labels
ax.legend(list_sorted_descriptions,
ax.legend(
list_sorted_descriptions,
fontsize=legend_fontsize,
loc=legend_location,
framealpha=(
legend_transparency
if type(legend_transparency) != type(None) else None
)
legend_transparency if type(legend_transparency) != type(None) else None
),
)
# add base map
if include_basemap:
cx.add_basemap(ax,
cx.add_basemap(
ax,
zoom=basemap_zoom_level,
source=cx.providers.OpenStreetMap.Mapnik,
# crs=gdf_map.crs,
......@@ -263,24 +265,25 @@ def plot_network_layout(network: Network,
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_heating_demand(
losses: list,
end_use_demand: list,
labels: list,
ylabel: str = 'Heating demand [MWh]',
title: str = 'Heat demand by month'
ylabel: str = "Heating demand [MWh]",
title: str = "Heat demand by month",
):
energy_totals = {
'Losses (optimised)': np.array(losses),
'End use (estimated)': np.array(end_use_demand),
"Losses (optimised)": np.array(losses),
"End use (estimated)": np.array(end_use_demand),
}
colors = {
'Losses (optimised)': 'tab:orange',
'End use (estimated)': 'tab:blue',
"Losses (optimised)": "tab:orange",
"End use (estimated)": "tab:blue",
}
# width = 0.8 # the width of the bars: can also be len(x) sequence
......@@ -293,18 +296,17 @@ def plot_heating_demand(
figure_size = (8, 4)
fig.set_size_inches(figure_size[0], figure_size[1])
for energy_category, energy_total in energy_totals.items():
p = ax.bar(
labels,
energy_total,
label=energy_category,
bottom=bottom,
color=colors[energy_category],
zorder=zorder_bars
zorder=zorder_bars,
)
bottom += energy_total
ax.bar_label(p, fmt='{:,.0f}', label_type='center')
ax.bar_label(p, fmt="{:,.0f}", label_type="center")
# ax.bar_label(p, fmt='{:,.0f}')
ax.grid(zorder=zorder_grid) # zorder=0 to make the grid
......@@ -313,5 +315,6 @@ def plot_heating_demand(
plt.show()
# *****************************************************************************
# *****************************************************************************
# -*- coding: utf-8 -*-
......@@ -10,6 +10,7 @@ from statistics import mean
# TODO: enable swapping the polarity
class Investment:
"""This class is meant to enable analysis of specific investments."""
......@@ -18,11 +19,13 @@ class Investment:
# TODO: consider using dicts to make things more intuitive, time-wise
def __init__(self,
def __init__(
self,
discount_rates: list,
net_cash_flows: list = None,
discount_rate: float = None,
analysis_period_span: int = None):
analysis_period_span: int = None,
):
"""
Create an object for investment analysis using typical information.
......@@ -41,54 +44,43 @@ class Investment:
# validate the inputs
if type(discount_rates) != type(None):
# discount_rates is not None:
if type(discount_rates) != tuple:
raise TypeError(
'The discount rates must be provided as a tuple.')
raise TypeError("The discount rates must be provided as a tuple.")
self.discount_rates = tuple(discount_rates)
self.analysis_period_span = len(self.discount_rates)
if self.analysis_period_span <= 0:
raise ValueError(
'The duration of the period under analysis must be '+
'positive.'
"The duration of the period under analysis must be " + "positive."
)
else:
# discount_rates is None:
# discount rate must be positive real under 1
# analysis_period_span must be an int
if type(discount_rate) != float:
raise TypeError(
'The discount rate must be provided as a float.')
raise TypeError("The discount rate must be provided as a float.")
if discount_rate <= 0 or discount_rate >= 1:
raise ValueError(
'The discount rate must be in the open interval between 0'+
' and 1.'
"The discount rate must be in the open interval between 0"
+ " and 1."
)
if type(analysis_period_span) != int:
raise TypeError(
'The duration of the period under consideration must be '+
'provided as an integer.')
"The duration of the period under consideration must be "
+ "provided as an integer."
)
if analysis_period_span <= 0:
raise ValueError(
'The duration of the period under analysis must be '+
'positive.'
"The duration of the period under analysis must be " + "positive."
)
self.analysis_period_span = analysis_period_span
......@@ -100,27 +92,18 @@ class Investment:
# check the net cash flows
if type(net_cash_flows) != type(None):
if type(net_cash_flows) != list:
raise TypeError(
'The net cash flows must be provided as a list.')
raise TypeError("The net cash flows must be provided as a list.")
if len(net_cash_flows) != self.analysis_period_span + 1:
raise ValueError(
'The inputs are consistent in terms of length.'
)
raise ValueError("The inputs are consistent in terms of length.")
self.net_cash_flows = list(net_cash_flows)
else:
# net_cash_flows is None: initialise it as a list of zeros
self.net_cash_flows = list(
0 for i in range(self.analysis_period_span+1)
)
self.net_cash_flows = list(0 for i in range(self.analysis_period_span + 1))
# discount factors
......@@ -132,15 +115,15 @@ class Investment:
# *************************************************************************
# *************************************************************************
def add_investment(self,
def add_investment(
self,
investment: float,
investment_period: int,
investment_longevity: int,
commissioning_delay_after_investment: int = 0,
salvage_value_method: str = 'annuity'):
if salvage_value_method == 'annuity':
salvage_value_method: str = "annuity",
):
if salvage_value_method == "annuity":
mean_discount_rate = mean(self.discount_rates)
residual_value = salvage_value_annuity(
......@@ -148,14 +131,13 @@ class Investment:
investment_longevity=investment_longevity,
investment_period=investment_period,
discount_rate=mean_discount_rate,
analysis_period_span=self.analysis_period_span
analysis_period_span=self.analysis_period_span,
)
self.net_cash_flows[investment_period] += investment
self.net_cash_flows[self.analysis_period_span] += -residual_value
else:
residual_value = salvage_value_linear_depreciation(
investment=investment,
investment_period=investment_period,
......@@ -163,7 +145,7 @@ class Investment:
analysis_period_span=self.analysis_period_span,
commissioning_delay_after_investment=(
commissioning_delay_after_investment
)
),
)
self.net_cash_flows[investment_period] += investment
......@@ -172,30 +154,24 @@ class Investment:
# *************************************************************************
# *************************************************************************
def add_operational_cash_flows(self,
cash_flow: float or int,
start_period: int,
longevity: int = None):
def add_operational_cash_flows(
self, cash_flow: float or int, start_period: int, longevity: int = None
):
"""Adds a sequence of cash flows to the analysis."""
if type(longevity) == type(None):
# until the planning horizon
for i in range(self.analysis_period_span - start_period + 1):
# add operational cash flows
self.net_cash_flows[i + start_period] += cash_flow
else:
# limited longevity
for i in range(longevity):
if i + start_period >= self.analysis_period_span + 1:
break
# add operational cash flows
......@@ -213,12 +189,14 @@ class Investment:
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def npv(discount_rates: list,
net_cash_flows: list,
return_discount_factors: bool = False) -> float or tuple:
def npv(
discount_rates: list, net_cash_flows: list, return_discount_factors: bool = False
) -> float or tuple:
"""
Calculates the net present value using the information provided.
......@@ -248,33 +226,32 @@ def npv(discount_rates: list,
# check sizes
if len(discount_rates) != len(net_cash_flows) - 1:
# the inputs do not match, return None
raise ValueError('The inputs are inconsistent.')
raise ValueError("The inputs are inconsistent.")
discount_factors = [
discount_factor(discount_rates[:t])
for t in range(len(discount_rates)+1)
discount_factor(discount_rates[:t]) for t in range(len(discount_rates) + 1)
]
if return_discount_factors:
return sum(
ncf_t*df_t
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
), discount_factors
return (
sum(
ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
),
discount_factors,
)
else:
return sum(
ncf_t*df_t
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
)
# *****************************************************************************
# *****************************************************************************
def discount_factor(discount_rates: list) -> float:
"""
Return the discount factor consistent with the discount rates provided.
......@@ -298,15 +275,18 @@ def discount_factor(discount_rates: list) -> float:
"""
return prod([1 / (1 + i) for i in discount_rates])
# *****************************************************************************
# *****************************************************************************
def salvage_value_linear_depreciation(
investment: int or float,
investment_period: int,
investment_longevity: int,
analysis_period_span: int,
commissioning_delay_after_investment: int = 1) -> float:
commissioning_delay_after_investment: int = 1,
) -> float:
"""
Determine an asset\'s salvage value by the end of an analysis period.
......@@ -341,66 +321,78 @@ def salvage_value_linear_depreciation(
"""
if investment_period >= analysis_period_span + 1:
raise ValueError(
'The investment has to be made within the period being analysed.'
"The investment has to be made within the period being analysed."
)
# calculate the salvage value
return (
investment_longevity+
investment_period+
commissioning_delay_after_investment-1-
analysis_period_span
)*investment/investment_longevity
(
investment_longevity
+ investment_period
+ commissioning_delay_after_investment
- 1
- analysis_period_span
)
* investment
/ investment_longevity
)
# *****************************************************************************
# *****************************************************************************
def salvage_value_annuity(investment: int or float,
def salvage_value_annuity(
investment: int or float,
discount_rate: float,
investment_longevity: int,
investment_period: int,
analysis_period_span: int) -> float:
analysis_period_span: int,
) -> float:
npv_salvage = present_salvage_value_annuity(
investment=investment,
investment_longevity=investment_longevity,
investment_period=investment_period,
discount_rate=discount_rate,
analysis_period_span=analysis_period_span,
return_annuity=False
return_annuity=False,
)
return npv_salvage / discount_factor(
tuple(discount_rate for i in range(analysis_period_span))
)
# *****************************************************************************
# *****************************************************************************
def annuity(investment: int or float,
investment_longevity: int,
discount_rate: float) -> float:
def annuity(
investment: int or float, investment_longevity: int, discount_rate: float
) -> float:
"Returns the annuity value for a given investment sum and longevity."
return (
investment*
discount_rate/(1-(1+discount_rate)**(
-investment_longevity
))
investment
* discount_rate
/ (1 - (1 + discount_rate) ** (-investment_longevity))
)
# *****************************************************************************
# *****************************************************************************
def present_salvage_value_annuity(investment: int or float,
def present_salvage_value_annuity(
investment: int or float,
investment_longevity: int,
investment_period: int,
discount_rate: float,
analysis_period_span: int,
return_annuity: bool = False) -> float:
return_annuity: bool = False,
) -> float:
"""
Calculates the present value of an asset after a given analysis period.
......@@ -441,25 +433,21 @@ def present_salvage_value_annuity(investment: int or float,
"""
if investment_period >= analysis_period_span + 1:
raise ValueError(
'The investment has to be made within the period being analysed.'
"The investment has to be made within the period being analysed."
)
# the present salvage value requires the lifetime to extend beyond the hor.
if analysis_period_span >= investment_longevity + investment_period:
if return_annuity:
return 0, annuity(
investment=investment,
investment_longevity=investment_longevity,
discount_rate=discount_rate
discount_rate=discount_rate,
)
else:
return 0
# the annuity has to consider the asset longevity and the commission. delay
......@@ -467,35 +455,29 @@ def present_salvage_value_annuity(investment: int or float,
value_annuity = annuity(
investment=investment,
investment_longevity=investment_longevity,
discount_rate=discount_rate
discount_rate=discount_rate,
)
discount_rates = tuple(
discount_rate
for i in range(investment_longevity+investment_period)
discount_rate for i in range(investment_longevity + investment_period)
)
net_cash_flows = list(
value_annuity
for i in range(investment_longevity+investment_period+1)
value_annuity for i in range(investment_longevity + investment_period + 1)
)
for year_index in range(analysis_period_span + 1):
net_cash_flows[year_index] = 0
if return_annuity:
return npv(
discount_rates=discount_rates,
net_cash_flows=net_cash_flows
), value_annuity
return (
npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows),
value_annuity,
)
else:
return npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows)
return npv(
discount_rates=discount_rates,
net_cash_flows=net_cash_flows
)
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
from ...problems.esipp.network import Arcs
# *****************************************************************************
# *****************************************************************************
class ArcInvestments(Arcs):
"""A class for defining arcs linked to investments."""
# *************************************************************************
# *************************************************************************
def __init__(self, investments: tuple, **kwargs):
# keep investment data
self.investments = investments
# initialise object
Arcs.__init__(
self,
minimum_cost=tuple([inv.net_present_value() for inv in self.investments]),
# validate=False,
**kwargs
)
# *************************************************************************
# *************************************************************************
def update_minimum_cost(self):
"Updates the minimum costs using the Investment objects."
self.minimum_cost = tuple([inv.net_present_value() for inv in self.investments])
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# imports
from math import inf
......@@ -22,6 +21,7 @@ from ..gis import identify as ident
# *****************************************************************************
# *****************************************************************************
def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
"""
Calculate edge lengths in a OSMnx-formatted MultiDiGraph network object.
......@@ -44,7 +44,7 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
"""
# determine if the graph is projected or not
graph_is_projected = is_projected(network.graph['crs'])
graph_is_projected = is_projected(network.graph["crs"])
# check if edge keys were specified
if type(edge_keys) == type(None):
# no particular edge keys were provided: consider all edges (default)
......@@ -64,12 +64,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
else:
# use (projected) coordinates
start_point = Point(
(network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y])
(
network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
)
)
end_point = Point(
(network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y])
(
network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
)
)
length_dict[edge_key] = start_point.distance(end_point)
......@@ -86,14 +90,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
lat1=network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
lon1=network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
lat2=network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X]
lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
)
# return the dict with lengths of each edge
return length_dict
# *****************************************************************************
# *****************************************************************************
def great_circle_distance_along_path(path: LineString) -> float:
"""
Computes the great circle distance along a given path.
......@@ -121,13 +127,15 @@ def great_circle_distance_along_path(path: LineString) -> float:
lat[:-1], # latitudes of starting points
lon[:-1], # longitudes of starting points
lat[1:], # latitudes of ending points
lon[1:] # longitudes of ending points
lon[1:], # longitudes of ending points
)
)
# *****************************************************************************
# *****************************************************************************
def update_street_count(network: MultiDiGraph):
"""
Updates the street count attributes of nodes in a MultiDiGraph object.
......@@ -145,16 +153,20 @@ def update_street_count(network: MultiDiGraph):
# update street count
street_count_dict = count_streets_per_node(network)
network.add_nodes_from(
((key, {osm.KEY_OSMNX_STREET_COUNT:value})
for key, value in street_count_dict.items())
(
(key, {osm.KEY_OSMNX_STREET_COUNT: value})
for key, value in street_count_dict.items()
)
)
# *****************************************************************************
# *****************************************************************************
def node_path_length(network: MultiDiGraph,
path: list,
return_minimum_length_only: bool = True) -> list or float:
def node_path_length(
network: MultiDiGraph, path: list, return_minimum_length_only: bool = True
) -> list or float:
"""
Returns the length or lengths of a path defined using nodes.
......@@ -200,9 +212,7 @@ def node_path_length(network: MultiDiGraph,
for node_pair in range(path_length - 1):
# get the edges between these two nodes
edge_keys = ident.get_edges_from_a_to_b(
network,
path[node_pair],
path[node_pair+1]
network, path[node_pair], path[node_pair + 1]
)
number_edge_keys = len(edge_keys)
if number_edge_keys == 1:
......@@ -225,15 +235,12 @@ def node_path_length(network: MultiDiGraph,
# add the new edge
list_of_edge_key_paths[
path_index + edge_key_index * number_paths
].append(
edge_keys[edge_key_index]
)
].append(edge_keys[edge_key_index])
# *************************************************************************
path_lenths = [
sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH]
for edge_key in edge_key_path)
sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in edge_key_path)
for edge_key_path in list_of_edge_key_paths
]
if return_minimum_length_only:
......@@ -243,12 +250,12 @@ def node_path_length(network: MultiDiGraph,
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def edge_path_length(network: MultiDiGraph,
path: list,
**kwargs) -> float:
def edge_path_length(network: MultiDiGraph, path: list, **kwargs) -> float:
"""
Returns the total length of a path defined using edges.
......@@ -274,19 +281,19 @@ def edge_path_length(network: MultiDiGraph,
if path_length == 0:
return inf
if ident.is_edge_path(network, path, **kwargs):
return sum(
network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path
)
return sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path)
else:
# no path provided
return inf
# *****************************************************************************
# *****************************************************************************
def count_ocurrences(gdf: GeoDataFrame,
column: str,
column_entries: list = None) -> dict:
def count_ocurrences(
gdf: GeoDataFrame, column: str, column_entries: list = None
) -> dict:
"""
Counts the number of occurrences per entry in a DataFrame object's column.
......@@ -340,5 +347,6 @@ def count_ocurrences(gdf: GeoDataFrame,
# return statement
return count_dict
# *****************************************************************************
# *****************************************************************************
This diff is collapsed.
This diff is collapsed.
......@@ -5,14 +5,14 @@
# general
KEY_OSM_CITY = 'addr:city'
KEY_OSM_COUNTRY = 'addr:country'
KEY_OSM_HOUSE_NUMBER = 'addr:housenumber'
KEY_OSM_MUNICIPALITY = 'addr:municipality'
KEY_OSM_PLACE = 'addr:place'
KEY_OSM_POSTCODE = 'addr:postcode'
KEY_OSM_STREET = 'addr:street'
KEY_OSM_SOURCE = 'source'
KEY_OSM_CITY = "addr:city"
KEY_OSM_COUNTRY = "addr:country"
KEY_OSM_HOUSE_NUMBER = "addr:housenumber"
KEY_OSM_MUNICIPALITY = "addr:municipality"
KEY_OSM_PLACE = "addr:place"
KEY_OSM_POSTCODE = "addr:postcode"
KEY_OSM_STREET = "addr:street"
KEY_OSM_SOURCE = "source"
KEYS_OSM = [
KEY_OSM_CITY,
......@@ -22,37 +22,35 @@ KEYS_OSM = [
KEY_OSM_PLACE,
KEY_OSM_POSTCODE,
KEY_OSM_STREET,
KEY_OSM_SOURCE
KEY_OSM_SOURCE,
]
# country specific
KEY_COUNTRY_DK = 'dk'
KEY_COUNTRY_DK = "dk"
KEY_OSM_DK_BUILDING_ENTRANCE_ID = 'osak:identifier'
KEY_OSM_DK_BUILDING_ENTRANCE_ID = "osak:identifier"
KEY_OSM_BUILDING_ENTRANCE_ID = {
KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID
}
KEY_OSM_BUILDING_ENTRANCE_ID = {KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID}
# *****************************************************************************
# osmnx
KEY_OSMNX_OSMID = 'osmid'
KEY_OSMNX_ELEMENT_TYPE = 'element_type'
KEY_OSMNX_OSMID = "osmid"
KEY_OSMNX_ELEMENT_TYPE = "element_type"
KEY_OSMNX_NAME = 'name'
KEY_OSMNX_GEOMETRY = 'geometry'
KEY_OSMNX_REVERSED = 'reversed'
KEY_OSMNX_LENGTH = 'length'
KEY_OSMNX_ONEWAY = 'oneway'
KEY_OSMNX_X = 'x'
KEY_OSMNX_Y = 'y'
KEY_OSMNX_LON = 'lon'
KEY_OSMNX_LAT = 'lat'
KEY_OSMNX_STREET_COUNT = 'street_count'
KEY_OSMNX_NAME = "name"
KEY_OSMNX_GEOMETRY = "geometry"
KEY_OSMNX_REVERSED = "reversed"
KEY_OSMNX_LENGTH = "length"
KEY_OSMNX_ONEWAY = "oneway"
KEY_OSMNX_X = "x"
KEY_OSMNX_Y = "y"
KEY_OSMNX_LON = "lon"
KEY_OSMNX_LAT = "lat"
KEY_OSMNX_STREET_COUNT = "street_count"
KEYS_OSMNX = [
KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx
......@@ -66,7 +64,7 @@ KEYS_OSMNX = [
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
KEY_OSMNX_STREET_COUNT,
]
KEYS_OSMNX_NODES = {
......@@ -77,28 +75,24 @@ KEYS_OSMNX_NODES = {
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
KEY_OSMNX_STREET_COUNT,
}
KEYS_OSMNX_NODES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_NAME,
KEY_OSMNX_STREET_COUNT
}
KEYS_OSMNX_NODES_ESSENTIAL = {KEY_OSMNX_OSMID, KEY_OSMNX_NAME, KEY_OSMNX_STREET_COUNT}
KEYS_OSMNX_EDGES = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_GEOMETRY,
KEY_OSMNX_REVERSED
KEY_OSMNX_REVERSED,
}
KEYS_OSMNX_EDGES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_REVERSED
KEY_OSMNX_REVERSED,
}
# *****************************************************************************
This diff is collapsed.
# -*- coding: utf-8 -*-