Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • pmag/topupopt
1 result
Select Git revision
  • master
1 result
Show changes

Commits on Source 14

Showing
with 2910 additions and 2983 deletions
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
This diff is collapsed.
......@@ -2,13 +2,9 @@
# *****************************************************************************
import numpy as np
from geopandas import GeoDataFrame
from ...misc.utils import discrete_sinusoid_matching_integral
from ...misc.utils import create_profile_using_time_weighted_state
from .bbr import label_bbr_entrance_id, label_bbr_housing_area
# *****************************************************************************
......@@ -16,10 +12,7 @@ from .bbr import label_bbr_entrance_id, label_bbr_housing_area
# labels
selected_bbr_adgang_labels = [
"Opgang_id",
"AdgAdr_id",
"Bygning_id"]
selected_bbr_adgang_labels = ["Opgang_id", "AdgAdr_id", "Bygning_id"]
selected_bbr_building_point_labels = [
"KoorOest",
......@@ -51,12 +44,12 @@ selected_bbr_building_labels = [
"VARMEINSTAL_KODE",
"OPVARMNING_KODE",
"VARME_SUPPL_KODE",
"BygPkt_id"]
"BygPkt_id",
]
# label under which building entrance ids can be found in OSM
label_osm_entrance_id = 'osak:identifier'
label_osm_entrance_id = "osak:identifier"
# *****************************************************************************
# *****************************************************************************
......@@ -67,73 +60,52 @@ def heat_demand_dict_by_building_entrance(
number_intervals: int,
time_interval_durations: list,
bdg_specific_demand: dict,
bdg_min_to_max_ratio: dict,
bdg_ratio_min_max: dict,
bdg_demand_phase_shift: dict = None,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id,
avg_state: list = None,
state_correlates_with_output: bool = False
state_correlates_with_output: bool = False,
) -> dict:
# initialise dict for each building entrance
demand_dict = {}
# for each building entrance
for osm_index in gdf_osm.index:
# initialise dict for each building consumption point
heat_demand_profiles = []
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[key_bbr_entr_id] ==
gdf_osm.loc[osm_index][key_osm_entr_id]
building_indexes = gdf_buildings[
gdf_buildings[key_bbr_entr_id] == gdf_osm.loc[osm_index][key_osm_entr_id]
].index
)
# for each building
for building_index in building_indexes:
# get relevant data
# base_load_avg_ratio = 0.3
# specific_demand = 107 # kWh/m2/year
area = gdf_buildings.loc[building_index][label_bbr_housing_area]
# estimate its demand
if type(avg_state) == type(None):
# ignore states
heat_demand_profiles.append(
np.array(
discrete_sinusoid_matching_integral(
bdg_specific_demand[building_index] * area,
time_interval_durations=time_interval_durations,
min_to_max_ratio=bdg_min_to_max_ratio[building_index],
min_to_max_ratio=bdg_ratio_min_max[building_index],
phase_shift_radians=(
bdg_demand_phase_shift[building_index]
# bdg_demand_phase_shift_amplitude*np.random.random()
# if (type(bdg_demand_phase_shift_amplitude) ==
# type(None)) else None
)
),
)
)
)
else:
# states matter
heat_demand_profiles.append(
np.array(
create_profile_using_time_weighted_state(
......@@ -142,69 +114,48 @@ def heat_demand_dict_by_building_entrance(
),
avg_state=avg_state,
time_interval_durations=time_interval_durations,
min_to_max_ratio=bdg_min_to_max_ratio[building_index],
state_correlates_with_output=state_correlates_with_output
min_to_max_ratio=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output,
)
)
)
#******************************************************************
# add the profiles, time step by time step
if len(heat_demand_profiles) == 0:
final_profile = []
else:
final_profile = sum(profile
for profile in heat_demand_profiles)
#**********************************************************************
final_profile = sum(profile for profile in heat_demand_profiles)
# store the demand profile
demand_dict[osm_index] = final_profile
#**********************************************************************
# return
return demand_dict
# *****************************************************************************
# *****************************************************************************
def total_heating_area(
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id
key_bbr_entr_id: str = label_bbr_entrance_id,
) -> float:
area = 0
for osm_index in gdf_osm.index:
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[label_bbr_entrance_id] ==
gdf_osm.loc[osm_index][label_osm_entrance_id]
building_indexes = gdf_buildings[
gdf_buildings[label_bbr_entrance_id]
== gdf_osm.loc[osm_index][label_osm_entrance_id]
].index
)
# for each building
for building_index in building_indexes:
# get relevant data
area += gdf_buildings.loc[building_index][label_bbr_housing_area]
return area
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# -*- coding: utf-8 -*-
This diff is collapsed.
......@@ -13,14 +13,82 @@ import numpy as np
from ...problems.esipp.network import Network
from .network import PipeTrenchOptions
from topupheat.pipes.trenches import SupplyReturnPipeTrench
from numbers import Real
# *****************************************************************************
# *****************************************************************************
def summarise_network_by_arc_technology(
network: Network,
print_output: bool = False
def cost_pipes(trench: SupplyReturnPipeTrench, length: float or tuple) -> tuple:
"""
Returns the costs of each trench option for a given trench length.
Parameters
----------
trench : SupplyReturnPipeTrench
The object describing the trench options.
length : float or tuple
The trench length in meters.
Raises
------
ValueError
This error is raised if unrecognised inputs are inserted.
Returns
-------
tuple
A tuple of the pipe costs for each option.
"""
# use the specific pipe cost that features in the database
if trench.vector_mode:
# multiple options
if type(length) == tuple and len(length) == trench.number_options():
# multiple trench lengths
return tuple(
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(trench.supply_pipe, length)
)
elif isinstance(length, Real):
# one trench length
return tuple(
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe in trench.supply_pipe
)
else:
raise ValueError("Unrecognised input combination.")
elif not trench.vector_mode and isinstance(length, Real):
# only one option
return (trench.supply_pipe.sp * length,)
else: # only one option
raise ValueError("Unrecognised input combination.")
# # keep the trench length
# self.length = (
# [length for i in range(trench.number_options())]
# if trench.vector_mode else
# length
# )
# *****************************************************************************
# *****************************************************************************
def summarise_network_by_pipe_technology(
network: Network, print_output: bool = False
) -> dict:
"A method to summarise a network by pipe technology."
# *************************************************************************
# *************************************************************************
......@@ -36,8 +104,7 @@ def summarise_network_by_arc_technology(
for arc_key in network.edges(keys=True):
# check if it is a PipeTrench object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
......@@ -52,13 +119,13 @@ def summarise_network_by_arc_technology(
# get the length of the arc
arc_length = (
network.edges[arc_key][Network.KEY_ARC_TECH].length[h]
if type(network.edges[arc_key][
Network.KEY_ARC_TECH].length) == list else
network.edges[arc_key][Network.KEY_ARC_TECH].length
if type(network.edges[arc_key][Network.KEY_ARC_TECH].length) == list
else network.edges[arc_key][Network.KEY_ARC_TECH].length
)
# identify the option
tech_option_label = network.edges[arc_key][
Network.KEY_ARC_TECH].trench.printable_description(h)
Network.KEY_ARC_TECH
].trench.printable_description(h)
# if the arc technology has been previously selected...
if tech_option_label in length_dict:
# ...increment the total length
......@@ -71,33 +138,35 @@ def summarise_network_by_arc_technology(
# *************************************************************************
if print_output:
print('printing the arc technologies selected by pipe size...')
print("printing the arc technologies selected by pipe size...")
for key, value in sorted(
(tech, length)
for tech, length in length_dict.items()
(tech, length) for tech, length in length_dict.items()
):
print(str(key)+': '+str(value))
print('total: '+str(sum(length_dict.values())))
print(str(key) + ": " + str(value))
print("total: " + str(sum(length_dict.values())))
return length_dict
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_network_layout(network: Network,
def plot_network_layout(
network: Network,
include_basemap: bool = False,
figure_size: tuple = (25, 25),
min_linewidth: float = 1.0,
max_linewidth: float = 3.0,
legend_fontsize: float = 20.0,
basemap_zoom_level: float = 15,
legend_location: str = 'lower left',
legend_location: str = "lower left",
legend_with_brand_model: bool = False,
legend_transparency: float = None):
legend_transparency: float = None,
):
# convert graph object to GDF
_, my_gdf_arcs = ox.graph_to_gdfs(network)
......@@ -114,8 +183,7 @@ def plot_network_layout(network: Network,
for arc_key in my_gdf.index:
# check if it is a PipeTrenchOptions object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
......@@ -124,10 +192,12 @@ def plot_network_layout(network: Network,
try:
selected_option = (
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].trench.printable_description(
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].options_selected.index(True)
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.trench.printable_description(
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.options_selected.index(True)
)
)
except ValueError:
......@@ -145,15 +215,17 @@ def plot_network_layout(network: Network,
(int(printable_description[2:]), printable_description)
for printable_description in arc_tech_summary_dict.keys()
)
(list_sorted_dn,
list_sorted_descriptions) = list(map(list,zip(*list_sorted)))
(list_sorted_dn, list_sorted_descriptions) = list(map(list, zip(*list_sorted)))
list_arc_widths = [
min_linewidth+
(max_linewidth-min_linewidth)*
iteration/(len(list_sorted_dn)-1)
list_arc_widths = (
[
min_linewidth
+ (max_linewidth - min_linewidth) * iteration / (len(list_sorted_dn) - 1)
for iteration, _ in enumerate(list_sorted_dn)
] if len(list_sorted_dn) == 1 else [(max_linewidth+min_linewidth)/2]
]
if len(list_sorted_dn) != 1
else [(max_linewidth + min_linewidth) / 2]
)
# *************************************************************************
# *************************************************************************
......@@ -162,35 +234,29 @@ def plot_network_layout(network: Network,
fig.set_size_inches(*figure_size)
for description, arc_width in zip(
list_sorted_descriptions,
list_arc_widths
):
for description, arc_width in zip(list_sorted_descriptions, list_arc_widths):
# prepare plot
my_gdf.loc[arc_tech_summary_dict[description]].plot(
edgecolor='k',
legend=True,
linewidth=arc_width,
ax=ax)
edgecolor="k", legend=True, linewidth=arc_width, ax=ax
)
# adjust legend labels
ax.legend(list_sorted_descriptions,
ax.legend(
list_sorted_descriptions,
fontsize=legend_fontsize,
loc=legend_location,
framealpha=(
legend_transparency
if type(legend_transparency) != type(None) else None
)
legend_transparency if type(legend_transparency) != type(None) else None
),
)
# add base map
if include_basemap:
# TODO: reach this statement in tests
cx.add_basemap(ax,
cx.add_basemap(
ax,
zoom=basemap_zoom_level,
source=cx.providers.OpenStreetMap.Mapnik,
# crs=gdf_map.crs,
......@@ -199,24 +265,25 @@ def plot_network_layout(network: Network,
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_heating_demand(
losses: list,
end_use_demand: list,
labels: list,
ylabel: str = 'Heating demand [MWh]',
title: str = 'Heat demand by month'
ylabel: str = "Heating demand [MWh]",
title: str = "Heat demand by month",
):
energy_totals = {
'Losses (optimised)': np.array(losses),
'End use (estimated)': np.array(end_use_demand),
"Losses (optimised)": np.array(losses),
"End use (estimated)": np.array(end_use_demand),
}
colors = {
'Losses (optimised)': 'tab:orange',
'End use (estimated)': 'tab:blue',
"Losses (optimised)": "tab:orange",
"End use (estimated)": "tab:blue",
}
# width = 0.8 # the width of the bars: can also be len(x) sequence
......@@ -229,18 +296,17 @@ def plot_heating_demand(
figure_size = (8, 4)
fig.set_size_inches(figure_size[0], figure_size[1])
for energy_category, energy_total in energy_totals.items():
p = ax.bar(
labels,
energy_total,
label=energy_category,
bottom=bottom,
color=colors[energy_category],
zorder=zorder_bars
zorder=zorder_bars,
)
bottom += energy_total
ax.bar_label(p, fmt='{:,.0f}', label_type='center')
ax.bar_label(p, fmt="{:,.0f}", label_type="center")
# ax.bar_label(p, fmt='{:,.0f}')
ax.grid(zorder=zorder_grid) # zorder=0 to make the grid
......@@ -249,5 +315,6 @@ def plot_heating_demand(
plt.show()
# *****************************************************************************
# *****************************************************************************
# -*- coding: utf-8 -*-
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
from math import prod
from statistics import mean
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
# TODO: enable swapping the polarity
class Investment:
"""This class is meant to enable analysis of specific investments."""
......@@ -16,11 +19,13 @@ class Investment:
# TODO: consider using dicts to make things more intuitive, time-wise
def __init__(self,
def __init__(
self,
discount_rates: list,
net_cash_flows: list = None,
discount_rate: float = None,
analysis_period_span: int = None):
analysis_period_span: int = None,
):
"""
Create an object for investment analysis using typical information.
......@@ -39,54 +44,43 @@ class Investment:
# validate the inputs
if type(discount_rates) != type(None):
# discount_rates is not None:
if type(discount_rates) != tuple:
raise TypeError(
'The discount rates must be provided as a tuple.')
raise TypeError("The discount rates must be provided as a tuple.")
self.discount_rates = tuple(discount_rates)
self.analysis_period_span = len(self.discount_rates)
if self.analysis_period_span <= 0:
raise ValueError(
'The duration of the period under analysis must be '+
'positive.'
"The duration of the period under analysis must be " + "positive."
)
else:
# discount_rates is None:
# discount rate must be positive real under 1
# analysis_period_span must be an int
if type(discount_rate) != float:
raise TypeError(
'The discount rate must be provided as a float.')
raise TypeError("The discount rate must be provided as a float.")
if discount_rate <= 0 or discount_rate >= 1:
raise ValueError(
'The discount rate must be in the open interval between 0'+
' and 1.'
"The discount rate must be in the open interval between 0"
+ " and 1."
)
if type(analysis_period_span) != int:
raise TypeError(
'The duration of the period under consideration must be '+
'provided as an integer.')
"The duration of the period under consideration must be "
+ "provided as an integer."
)
if analysis_period_span <= 0:
raise ValueError(
'The duration of the period under analysis must be '+
'positive.'
"The duration of the period under analysis must be " + "positive."
)
self.analysis_period_span = analysis_period_span
......@@ -98,27 +92,18 @@ class Investment:
# check the net cash flows
if type(net_cash_flows) != type(None):
if type(net_cash_flows) != list:
raise TypeError(
'The net cash flows must be provided as a list.')
raise TypeError("The net cash flows must be provided as a list.")
if len(net_cash_flows) != self.analysis_period_span + 1:
raise ValueError(
'The inputs are consistent in terms of length.'
)
raise ValueError("The inputs are consistent in terms of length.")
self.net_cash_flows = list(net_cash_flows)
else:
# net_cash_flows is None: initialise it as a list of zeros
self.net_cash_flows = list(
0 for i in range(self.analysis_period_span+1)
)
self.net_cash_flows = list(0 for i in range(self.analysis_period_span + 1))
# discount factors
......@@ -127,18 +112,18 @@ class Investment:
for i in range(self.analysis_period_span + 1)
)
#**************************************************************************
#**************************************************************************
# *************************************************************************
# *************************************************************************
def add_investment(self,
def add_investment(
self,
investment: float,
investment_period: int,
investment_longevity: int,
commissioning_delay_after_investment: int = 0,
salvage_value_method: str = 'annuity'):
if salvage_value_method == 'annuity':
salvage_value_method: str = "annuity",
):
if salvage_value_method == "annuity":
mean_discount_rate = mean(self.discount_rates)
residual_value = salvage_value_annuity(
......@@ -146,14 +131,13 @@ class Investment:
investment_longevity=investment_longevity,
investment_period=investment_period,
discount_rate=mean_discount_rate,
analysis_period_span=self.analysis_period_span
analysis_period_span=self.analysis_period_span,
)
self.net_cash_flows[investment_period] += investment
self.net_cash_flows[self.analysis_period_span] += -residual_value
else:
residual_value = salvage_value_linear_depreciation(
investment=investment,
investment_period=investment_period,
......@@ -161,61 +145,58 @@ class Investment:
analysis_period_span=self.analysis_period_span,
commissioning_delay_after_investment=(
commissioning_delay_after_investment
)
),
)
self.net_cash_flows[investment_period] += investment
self.net_cash_flows[self.analysis_period_span] += -residual_value
#**************************************************************************
#**************************************************************************
# *************************************************************************
# *************************************************************************
def add_operational_cash_flows(self,
cash_flow: float or int,
start_period: int,
longevity: int = None):
def add_operational_cash_flows(
self, cash_flow: float or int, start_period: int, longevity: int = None
):
"""Adds a sequence of cash flows to the analysis."""
if type(longevity) == type(None):
# until the planning horizon
for i in range(self.analysis_period_span - start_period + 1):
# add operational cash flows
self.net_cash_flows[i + start_period] += cash_flow
else:
# limited longevity
for i in range(longevity):
if i + start_period >= self.analysis_period_span + 1:
break
# add operational cash flows
self.net_cash_flows[i + start_period] += cash_flow
#**************************************************************************
#**************************************************************************
# *************************************************************************
# *************************************************************************
def net_present_value(self):
"""Returns the net present value for the investment under analysis."""
return npv(self.discount_rates, self.net_cash_flows)
#**************************************************************************
#**************************************************************************
# *************************************************************************
# *************************************************************************
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
def npv(discount_rates: list,
net_cash_flows: list,
return_discount_factors: bool = False) -> float or tuple:
def npv(
discount_rates: list, net_cash_flows: list, return_discount_factors: bool = False
) -> float or tuple:
"""
Calculates the net present value using the information provided.
......@@ -245,32 +226,31 @@ def npv(discount_rates: list,
# check sizes
if len(discount_rates) != len(net_cash_flows) - 1:
# the inputs do not match, return None
raise ValueError('The inputs are inconsistent.')
raise ValueError("The inputs are inconsistent.")
discount_factors = [
discount_factor(discount_rates[:t])
for t in range(len(discount_rates)+1)
discount_factor(discount_rates[:t]) for t in range(len(discount_rates) + 1)
]
if return_discount_factors:
return sum(
ncf_t*df_t
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
), discount_factors
return (
sum(
ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
),
discount_factors,
)
else:
return sum(
ncf_t*df_t
for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
ncf_t * df_t for (ncf_t, df_t) in zip(net_cash_flows, discount_factors)
)
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
def discount_factor(discount_rates: list) -> float:
"""
......@@ -295,15 +275,18 @@ def discount_factor(discount_rates: list) -> float:
"""
return prod([1 / (1 + i) for i in discount_rates])
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
def salvage_value_linear_depreciation(
investment: int or float,
investment_period: int,
investment_longevity: int,
analysis_period_span: int,
commissioning_delay_after_investment: int = 1) -> float:
commissioning_delay_after_investment: int = 1,
) -> float:
"""
Determine an asset\'s salvage value by the end of an analysis period.
......@@ -338,66 +321,78 @@ def salvage_value_linear_depreciation(
"""
if investment_period >= analysis_period_span + 1:
raise ValueError(
'The investment has to be made within the period being analysed.'
"The investment has to be made within the period being analysed."
)
# calculate the salvage value
return (
investment_longevity+
investment_period+
commissioning_delay_after_investment-1-
analysis_period_span
)*investment/investment_longevity
(
investment_longevity
+ investment_period
+ commissioning_delay_after_investment
- 1
- analysis_period_span
)
* investment
/ investment_longevity
)
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
def salvage_value_annuity(investment: int or float,
def salvage_value_annuity(
investment: int or float,
discount_rate: float,
investment_longevity: int,
investment_period: int,
analysis_period_span: int) -> float:
analysis_period_span: int,
) -> float:
npv_salvage = present_salvage_value_annuity(
investment=investment,
investment_longevity=investment_longevity,
investment_period=investment_period,
discount_rate=discount_rate,
analysis_period_span=analysis_period_span,
return_annuity=False
return_annuity=False,
)
return npv_salvage / discount_factor(
tuple(discount_rate for i in range(analysis_period_span))
)
#******************************************************************************
#******************************************************************************
def annuity(investment: int or float,
investment_longevity: int,
discount_rate: float) -> float:
# *****************************************************************************
# *****************************************************************************
def annuity(
investment: int or float, investment_longevity: int, discount_rate: float
) -> float:
"Returns the annuity value for a given investment sum and longevity."
return (
investment*
discount_rate/(1-(1+discount_rate)**(
-investment_longevity
))
investment
* discount_rate
/ (1 - (1 + discount_rate) ** (-investment_longevity))
)
#******************************************************************************
#******************************************************************************
def present_salvage_value_annuity(investment: int or float,
# *****************************************************************************
# *****************************************************************************
def present_salvage_value_annuity(
investment: int or float,
investment_longevity: int,
investment_period: int,
discount_rate: float,
analysis_period_span: int,
return_annuity: bool = False) -> float:
return_annuity: bool = False,
) -> float:
"""
Calculates the present value of an asset after a given analysis period.
......@@ -438,25 +433,21 @@ def present_salvage_value_annuity(investment: int or float,
"""
if investment_period >= analysis_period_span + 1:
raise ValueError(
'The investment has to be made within the period being analysed.'
"The investment has to be made within the period being analysed."
)
# the present salvage value requires the lifetime to extend beyond the hor.
if analysis_period_span >= investment_longevity + investment_period:
if return_annuity:
return 0, annuity(
investment=investment,
investment_longevity=investment_longevity,
discount_rate=discount_rate
discount_rate=discount_rate,
)
else:
return 0
# the annuity has to consider the asset longevity and the commission. delay
......@@ -464,35 +455,29 @@ def present_salvage_value_annuity(investment: int or float,
value_annuity = annuity(
investment=investment,
investment_longevity=investment_longevity,
discount_rate=discount_rate
discount_rate=discount_rate,
)
discount_rates = tuple(
discount_rate
for i in range(investment_longevity+investment_period)
discount_rate for i in range(investment_longevity + investment_period)
)
net_cash_flows = list(
value_annuity
for i in range(investment_longevity+investment_period+1)
value_annuity for i in range(investment_longevity + investment_period + 1)
)
for year_index in range(analysis_period_span + 1):
net_cash_flows[year_index] = 0
if return_annuity:
return npv(
discount_rates=discount_rates,
net_cash_flows=net_cash_flows
), value_annuity
return (
npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows),
value_annuity,
)
else:
return npv(discount_rates=discount_rates, net_cash_flows=net_cash_flows)
return npv(
discount_rates=discount_rates,
net_cash_flows=net_cash_flows
)
#******************************************************************************
#******************************************************************************
\ No newline at end of file
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
from ...problems.esipp.network import Arcs
# *****************************************************************************
# *****************************************************************************
class ArcInvestments(Arcs):
"""A class for defining arcs linked to investments."""
# *************************************************************************
# *************************************************************************
def __init__(self, investments: tuple, **kwargs):
# keep investment data
self.investments = investments
# initialise object
Arcs.__init__(
self,
minimum_cost=tuple([inv.net_present_value() for inv in self.investments]),
# validate=False,
**kwargs
)
# *************************************************************************
# *************************************************************************
def update_minimum_cost(self):
"Updates the minimum costs using the Investment objects."
self.minimum_cost = tuple([inv.net_present_value() for inv in self.investments])
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# imports
from math import inf
......@@ -22,6 +21,7 @@ from ..gis import identify as ident
# *****************************************************************************
# *****************************************************************************
def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
"""
Calculate edge lengths in a OSMnx-formatted MultiDiGraph network object.
......@@ -44,7 +44,7 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
"""
# determine if the graph is projected or not
graph_is_projected = is_projected(network.graph['crs'])
graph_is_projected = is_projected(network.graph["crs"])
# check if edge keys were specified
if type(edge_keys) == type(None):
# no particular edge keys were provided: consider all edges (default)
......@@ -64,12 +64,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
else:
# use (projected) coordinates
start_point = Point(
(network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y])
(
network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
)
)
end_point = Point(
(network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y])
(
network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
)
)
length_dict[edge_key] = start_point.distance(end_point)
......@@ -86,14 +90,16 @@ def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
lat1=network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
lon1=network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
lat2=network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X]
lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
)
# return the dict with lengths of each edge
return length_dict
# *****************************************************************************
# *****************************************************************************
def great_circle_distance_along_path(path: LineString) -> float:
"""
Computes the great circle distance along a given path.
......@@ -121,13 +127,15 @@ def great_circle_distance_along_path(path: LineString) -> float:
lat[:-1], # latitudes of starting points
lon[:-1], # longitudes of starting points
lat[1:], # latitudes of ending points
lon[1:] # longitudes of ending points
lon[1:], # longitudes of ending points
)
)
# *****************************************************************************
# *****************************************************************************
def update_street_count(network: MultiDiGraph):
"""
Updates the street count attributes of nodes in a MultiDiGraph object.
......@@ -145,16 +153,20 @@ def update_street_count(network: MultiDiGraph):
# update street count
street_count_dict = count_streets_per_node(network)
network.add_nodes_from(
((key, {osm.KEY_OSMNX_STREET_COUNT:value})
for key, value in street_count_dict.items())
(
(key, {osm.KEY_OSMNX_STREET_COUNT: value})
for key, value in street_count_dict.items()
)
)
# *****************************************************************************
# *****************************************************************************
def node_path_length(network: MultiDiGraph,
path: list,
return_minimum_length_only: bool = True) -> list or float:
def node_path_length(
network: MultiDiGraph, path: list, return_minimum_length_only: bool = True
) -> list or float:
"""
Returns the length or lengths of a path defined using nodes.
......@@ -200,9 +212,7 @@ def node_path_length(network: MultiDiGraph,
for node_pair in range(path_length - 1):
# get the edges between these two nodes
edge_keys = ident.get_edges_from_a_to_b(
network,
path[node_pair],
path[node_pair+1]
network, path[node_pair], path[node_pair + 1]
)
number_edge_keys = len(edge_keys)
if number_edge_keys == 1:
......@@ -225,15 +235,12 @@ def node_path_length(network: MultiDiGraph,
# add the new edge
list_of_edge_key_paths[
path_index + edge_key_index * number_paths
].append(
edge_keys[edge_key_index]
)
].append(edge_keys[edge_key_index])
# *************************************************************************
path_lenths = [
sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH]
for edge_key in edge_key_path)
sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in edge_key_path)
for edge_key_path in list_of_edge_key_paths
]
if return_minimum_length_only:
......@@ -243,12 +250,12 @@ def node_path_length(network: MultiDiGraph,
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def edge_path_length(network: MultiDiGraph,
path: list,
**kwargs) -> float:
def edge_path_length(network: MultiDiGraph, path: list, **kwargs) -> float:
"""
Returns the total length of a path defined using edges.
......@@ -274,19 +281,19 @@ def edge_path_length(network: MultiDiGraph,
if path_length == 0:
return inf
if ident.is_edge_path(network, path, **kwargs):
return sum(
network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path
)
return sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path)
else:
# no path provided
return inf
# *****************************************************************************
# *****************************************************************************
def count_ocurrences(gdf: GeoDataFrame,
column: str,
column_entries: list = None) -> dict:
def count_ocurrences(
gdf: GeoDataFrame, column: str, column_entries: list = None
) -> dict:
"""
Counts the number of occurrences per entry in a DataFrame object's column.
......@@ -340,5 +347,6 @@ def count_ocurrences(gdf: GeoDataFrame,
# return statement
return count_dict
# *****************************************************************************
# *****************************************************************************
This diff is collapsed.
This diff is collapsed.
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
# OpenStreetMaps
# general
KEY_OSM_CITY = 'addr:city'
KEY_OSM_COUNTRY = 'addr:country'
KEY_OSM_HOUSE_NUMBER = 'addr:housenumber'
KEY_OSM_MUNICIPALITY = 'addr:municipality'
KEY_OSM_PLACE = 'addr:place'
KEY_OSM_POSTCODE = 'addr:postcode'
KEY_OSM_STREET = 'addr:street'
KEY_OSM_SOURCE = 'source'
KEY_OSM_CITY = "addr:city"
KEY_OSM_COUNTRY = "addr:country"
KEY_OSM_HOUSE_NUMBER = "addr:housenumber"
KEY_OSM_MUNICIPALITY = "addr:municipality"
KEY_OSM_PLACE = "addr:place"
KEY_OSM_POSTCODE = "addr:postcode"
KEY_OSM_STREET = "addr:street"
KEY_OSM_SOURCE = "source"
KEYS_OSM = [
KEY_OSM_CITY,
......@@ -22,37 +22,35 @@ KEYS_OSM = [
KEY_OSM_PLACE,
KEY_OSM_POSTCODE,
KEY_OSM_STREET,
KEY_OSM_SOURCE
KEY_OSM_SOURCE,
]
# country specific
KEY_COUNTRY_DK = 'dk'
KEY_COUNTRY_DK = "dk"
KEY_OSM_DK_BUILDING_ENTRANCE_ID = 'osak:identifier'
KEY_OSM_DK_BUILDING_ENTRANCE_ID = "osak:identifier"
KEY_OSM_BUILDING_ENTRANCE_ID = {
KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID
}
KEY_OSM_BUILDING_ENTRANCE_ID = {KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID}
#******************************************************************************
# *****************************************************************************
# osmnx
KEY_OSMNX_OSMID = 'osmid'
KEY_OSMNX_ELEMENT_TYPE = 'element_type'
KEY_OSMNX_OSMID = "osmid"
KEY_OSMNX_ELEMENT_TYPE = "element_type"
KEY_OSMNX_NAME = 'name'
KEY_OSMNX_GEOMETRY = 'geometry'
KEY_OSMNX_REVERSED = 'reversed'
KEY_OSMNX_LENGTH = 'length'
KEY_OSMNX_ONEWAY = 'oneway'
KEY_OSMNX_X = 'x'
KEY_OSMNX_Y = 'y'
KEY_OSMNX_LON = 'lon'
KEY_OSMNX_LAT = 'lat'
KEY_OSMNX_STREET_COUNT = 'street_count'
KEY_OSMNX_NAME = "name"
KEY_OSMNX_GEOMETRY = "geometry"
KEY_OSMNX_REVERSED = "reversed"
KEY_OSMNX_LENGTH = "length"
KEY_OSMNX_ONEWAY = "oneway"
KEY_OSMNX_X = "x"
KEY_OSMNX_Y = "y"
KEY_OSMNX_LON = "lon"
KEY_OSMNX_LAT = "lat"
KEY_OSMNX_STREET_COUNT = "street_count"
KEYS_OSMNX = [
KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx
......@@ -66,7 +64,7 @@ KEYS_OSMNX = [
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
KEY_OSMNX_STREET_COUNT,
]
KEYS_OSMNX_NODES = {
......@@ -77,28 +75,24 @@ KEYS_OSMNX_NODES = {
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
KEY_OSMNX_STREET_COUNT,
}
KEYS_OSMNX_NODES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_NAME,
KEY_OSMNX_STREET_COUNT
}
KEYS_OSMNX_NODES_ESSENTIAL = {KEY_OSMNX_OSMID, KEY_OSMNX_NAME, KEY_OSMNX_STREET_COUNT}
KEYS_OSMNX_EDGES = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_GEOMETRY,
KEY_OSMNX_REVERSED
KEY_OSMNX_REVERSED,
}
KEYS_OSMNX_EDGES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_REVERSED
KEY_OSMNX_REVERSED,
}
#******************************************************************************
\ No newline at end of file
# *****************************************************************************
This diff is collapsed.
# -*- coding: utf-8 -*-
# constants
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
# unit conversion factors
......@@ -15,12 +15,12 @@ GJ_DIV_MWh = 1000/3600
MWh_DIV_J = 1 / (3600 * 1000 * 1000)
#******************************************************************************
#******************************************************************************
# *****************************************************************************
# *****************************************************************************
# currency conversions
EUR_DIV_DKK = 1 / (743.95 / 100)
#******************************************************************************
#******************************************************************************
\ No newline at end of file
# *****************************************************************************
# *****************************************************************************