Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • pmag/topupopt
1 result
Show changes
Commits on Source (7)
Showing
with 2735 additions and 2811 deletions
# -*- coding: utf-8 -*-
#from . import mvesipp
\ No newline at end of file
# from . import mvesipp
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
This diff is collapsed.
......@@ -12,22 +12,19 @@ from .bbr import label_bbr_entrance_id, label_bbr_housing_area
# labels
selected_bbr_adgang_labels = [
"Opgang_id",
"AdgAdr_id",
"Bygning_id"]
selected_bbr_adgang_labels = ["Opgang_id", "AdgAdr_id", "Bygning_id"]
selected_bbr_building_point_labels = [
"KoorOest",
"KoorNord",
"KoorSystem",
#"koordinater" # corresponds to a list, which cannot be written to a file
]
# "koordinater" # corresponds to a list, which cannot be written to a file
]
selected_bbr_building_labels = [
"BYG_ANVEND_KODE",
"OPFOERELSE_AAR", # new
"OMBYG_AAR", # new
"OPFOERELSE_AAR", # new
"OMBYG_AAR", # new
"BYG_ARL_SAML",
"BYG_BOLIG_ARL_SAML",
"ERHV_ARL_SAML",
......@@ -47,145 +44,138 @@ selected_bbr_building_labels = [
"VARMEINSTAL_KODE",
"OPVARMNING_KODE",
"VARME_SUPPL_KODE",
"BygPkt_id"]
"BygPkt_id",
]
# label under which building entrance ids can be found in OSM
label_osm_entrance_id = 'osak:identifier'
label_osm_entrance_id = "osak:identifier"
# *****************************************************************************
# *****************************************************************************
def heat_demand_dict_by_building_entrance(
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
number_intervals: int,
time_interval_durations: list,
bdg_specific_demand: dict,
bdg_ratio_min_max: dict,
bdg_demand_phase_shift: dict = None,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id,
avg_state: list = None,
state_correlates_with_output: bool = False
) -> dict:
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
number_intervals: int,
time_interval_durations: list,
bdg_specific_demand: dict,
bdg_ratio_min_max: dict,
bdg_demand_phase_shift: dict = None,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id,
avg_state: list = None,
state_correlates_with_output: bool = False,
) -> dict:
# initialise dict for each building entrance
demand_dict = {}
# for each building entrance
for osm_index in gdf_osm.index:
# initialise dict for each building consumption point
heat_demand_profiles = []
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[key_bbr_entr_id] ==
gdf_osm.loc[osm_index][key_osm_entr_id]
].index
)
building_indexes = gdf_buildings[
gdf_buildings[key_bbr_entr_id] == gdf_osm.loc[osm_index][key_osm_entr_id]
].index
# for each building
for building_index in building_indexes:
# get relevant data
# base_load_avg_ratio = 0.3
# specific_demand = 107 # kWh/m2/year
area = gdf_buildings.loc[building_index][label_bbr_housing_area]
# estimate its demand
if type(avg_state) == type(None):
# ignore states
heat_demand_profiles.append(
np.array(
discrete_sinusoid_matching_integral(
bdg_specific_demand[building_index]*area,
time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index],
bdg_specific_demand[building_index] * area,
time_interval_durations=time_interval_durations,
min_to_max_ratio=bdg_ratio_min_max[building_index],
phase_shift_radians=(
bdg_demand_phase_shift[building_index]
# bdg_demand_phase_shift_amplitude*np.random.random()
# if (type(bdg_demand_phase_shift_amplitude) ==
# bdg_demand_phase_shift_amplitude*np.random.random()
# if (type(bdg_demand_phase_shift_amplitude) ==
# type(None)) else None
)
)
),
)
)
)
else:
# states matter
heat_demand_profiles.append(
np.array(
create_profile_using_time_weighted_state(
integration_result=(
bdg_specific_demand[building_index]*area
),
avg_state=avg_state,
time_interval_durations=time_interval_durations,
bdg_ratio_min_max=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output
)
bdg_specific_demand[building_index] * area
),
avg_state=avg_state,
time_interval_durations=time_interval_durations,
min_to_max_ratio=bdg_ratio_min_max[building_index],
state_correlates_with_output=state_correlates_with_output,
)
)
)
# *****************************************************************
# add the profiles, time step by time step
if len(heat_demand_profiles) == 0:
final_profile = []
else:
final_profile = sum(profile
for profile in heat_demand_profiles)
final_profile = sum(profile for profile in heat_demand_profiles)
# *********************************************************************
# store the demand profile
demand_dict[osm_index] = final_profile
# *********************************************************************
# return
return demand_dict
# *****************************************************************************
# *****************************************************************************
def total_heating_area(
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id
) -> float:
gdf_osm: GeoDataFrame,
gdf_buildings: GeoDataFrame,
key_osm_entr_id: str = label_osm_entrance_id,
key_bbr_entr_id: str = label_bbr_entrance_id,
) -> float:
area = 0
for osm_index in gdf_osm.index:
# find the indexes for each building leading to the curr. cons. point
building_indexes = (
gdf_buildings[
gdf_buildings[label_bbr_entrance_id] ==
gdf_osm.loc[osm_index][label_osm_entrance_id]
].index
)
building_indexes = gdf_buildings[
gdf_buildings[label_bbr_entrance_id]
== gdf_osm.loc[osm_index][label_osm_entrance_id]
].index
# for each building
for building_index in building_indexes:
# get relevant data
area += gdf_buildings.loc[building_index][label_bbr_housing_area]
return area
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# -*- coding: utf-8 -*-
......@@ -24,29 +24,29 @@ from ...data.finance.utils import ArcInvestments
# constants
KEY_DHT_OPTIONS_OBJ = 'trench'
KEY_DHT_LENGTH = 'length'
KEY_DHT_UCF = 'capacity_unit_conversion_factor'
KEY_HHT_DHT_PIPES = 'pipes'
KEY_HHT_STD_PIPES = 'pipe_tuple'
KEY_DHT_OPTIONS_OBJ = "trench"
KEY_DHT_LENGTH = "length"
KEY_DHT_UCF = "capacity_unit_conversion_factor"
KEY_HHT_DHT_PIPES = "pipes"
KEY_HHT_STD_PIPES = "pipe_tuple"
# *****************************************************************************
# *****************************************************************************
class PipeTrenchOptions(ArcsWithoutProportionalLosses):
"A class for defining investments in district heating trenches."
def __init__(
self,
trench: SupplyReturnPipeTrench,
name: str,
length: float,
specific_capacity_cost: float or list = None,
minimum_cost: list or tuple = None, # default: pipes
capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0,
):
self,
trench: SupplyReturnPipeTrench,
name: str,
length: float,
specific_capacity_cost: float or list = None,
minimum_cost: list or tuple = None, # default: pipes
capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0,
):
# store the unit conversion
self.unit_conversion_factor = unit_conversion_factor
# keep the trench object
......@@ -54,36 +54,33 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
# keep the trench length
self.length = (
[length for i in range(trench.number_options())]
if trench.vector_mode else
length
)
if trench.vector_mode
else length
)
# determine the rated heat capacity
rhc = trench.rated_heat_capacity(
unit_conversion_factor=unit_conversion_factor
)
rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
# initialise the object using the mother class
ArcsWithoutProportionalLosses.__init__(
self,
name=name,
static_loss=None,
capacity=[rhc] if isinstance(rhc, Real) else rhc,
minimum_cost=minimum_cost,
self,
name=name,
static_loss=None,
capacity=[rhc] if isinstance(rhc, Real) else rhc,
minimum_cost=minimum_cost,
specific_capacity_cost=(
0
if type(specific_capacity_cost) == type(None) else
specific_capacity_cost
),
capacity_is_instantaneous=False
)
if type(specific_capacity_cost) == type(None)
else specific_capacity_cost
),
capacity_is_instantaneous=False,
)
# initialise the minimum cost
if type(minimum_cost) == type(None):
self.set_minimum_cost()
# *************************************************************************
# *************************************************************************
def set_minimum_cost(self, minimum_cost = None):
def set_minimum_cost(self, minimum_cost=None):
# minimum arc cost
# if no external minimum cost list was provided, calculate it
if type(minimum_cost) == type(None):
......@@ -91,22 +88,21 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
if self.trench.vector_mode:
# multiple options
self.minimum_cost = tuple(
(pipe.sp*length # twin pipes: one twin pipe
if self.trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
for pipe, length in zip(
self.trench.supply_pipe,
self.length
)
)
else: # only one option
self.minimum_cost = (self.trench.supply_pipe.sp*self.length,)
else: # use an external minimum cost
(
pipe.sp * length # twin pipes: one twin pipe
if self.trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(self.trench.supply_pipe, self.length)
)
else: # only one option
self.minimum_cost = (self.trench.supply_pipe.sp * self.length,)
else: # use an external minimum cost
self.minimum_cost = tuple(minimum_cost)
# *************************************************************************
# *************************************************************************
def set_capacity(self, **kwargs):
# retrieve the rated heat capacity
rhc = self.trench.rated_heat_capacity(**kwargs)
......@@ -116,56 +112,52 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
else:
# one option, rhc is one value
self.capacity = (rhc,)
# *************************************************************************
# *************************************************************************
def set_static_losses(
self,
scenario_key,
ground_thermal_conductivity: float or list,
ground_air_heat_transfer_coefficient: float or list,
time_interval_duration: float or list,
temperature_surroundings: float or list,
length: float or list = None,
unit_conversion_factor: float = None,
**kwargs):
self,
scenario_key,
ground_thermal_conductivity: float or list,
ground_air_heat_transfer_coefficient: float or list,
time_interval_duration: float or list,
temperature_surroundings: float or list,
length: float or list = None,
unit_conversion_factor: float = None,
**kwargs
):
hts = self.trench.heat_transfer_surroundings(
ground_thermal_conductivity=ground_thermal_conductivity,
ground_air_heat_transfer_coefficient=(
ground_air_heat_transfer_coefficient),
ground_air_heat_transfer_coefficient=(ground_air_heat_transfer_coefficient),
time_interval_duration=time_interval_duration,
temperature_surroundings=temperature_surroundings,
length=(
self.length
if type(length) == type(None) else
length
),
length=(self.length if type(length) == type(None) else length),
unit_conversion_factor=(
self.unit_conversion_factor
if type(unit_conversion_factor) == type(None) else
unit_conversion_factor
),
**kwargs)
self.unit_conversion_factor
if type(unit_conversion_factor) == type(None)
else unit_conversion_factor
),
**kwargs
)
if self.trench.vector_mode:
# multiple options: hts is a vector
if (hasattr(self, "static_loss") and
type(self.static_loss) != type(None)):
if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
# update the static loss dictionary
if type(hts[0]) == list:
# multiple time intervals
self.static_loss.update({
(h, scenario_key, k): hts[h][k]
for h, hts_h in enumerate(hts)
for k, hts_hk in enumerate(hts_h)
})
else: # not a list: one time interval
self.static_loss.update({
(h, scenario_key, 0): hts[h]
for h, hts_h in enumerate(hts)
})
self.static_loss.update(
{
(h, scenario_key, k): hts[h][k]
for h, hts_h in enumerate(hts)
for k, hts_hk in enumerate(hts_h)
}
)
else: # not a list: one time interval
self.static_loss.update(
{(h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)}
)
else:
# no static loss dictionary, create it
if type(hts[0]) == list:
......@@ -174,59 +166,52 @@ class PipeTrenchOptions(ArcsWithoutProportionalLosses):
(h, scenario_key, k): hts[h][k]
for h, hts_h in enumerate(hts)
for k, hts_hk in enumerate(hts_h)
}
else: # not a list: one time interval
}
else: # not a list: one time interval
self.static_loss = {
(h, scenario_key, 0): hts[h]
for h, hts_h in enumerate(hts)
}
(h, scenario_key, 0): hts[h] for h, hts_h in enumerate(hts)
}
else:
# one option: hts might be a number
if (hasattr(self, "static_loss") and
type(self.static_loss) != type(None)):
if hasattr(self, "static_loss") and type(self.static_loss) != type(None):
# update the static loss dictionary
if not isinstance(hts, Real):
# multiple time intervals
self.static_loss.update({
(0, scenario_key, k): hts[k]
for k, hts_k in enumerate(hts)
})
else: # not a list: one time interval
self.static_loss.update({
(0, scenario_key, 0): hts
})
self.static_loss.update(
{(0, scenario_key, k): hts[k] for k, hts_k in enumerate(hts)}
)
else: # not a list: one time interval
self.static_loss.update({(0, scenario_key, 0): hts})
else:
# no static loss dictionary, create it
if not isinstance(hts, Real):
# multiple time intervals
self.static_loss = {
(0, scenario_key, k): hts_k
for k, hts_k in enumerate(hts)
}
else: # not a list: one time interval
self.static_loss = {
(0, scenario_key, 0): hts
}
(0, scenario_key, k): hts_k for k, hts_k in enumerate(hts)
}
else: # not a list: one time interval
self.static_loss = {(0, scenario_key, 0): hts}
# *****************************************************************************
# *****************************************************************************
class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
"A class for defining investments in district heating trenches."
def __init__(
self,
trench: SupplyReturnPipeTrench,
name: str,
length: float,
investments: tuple,
static_loss: dict = None,
specific_capacity_cost: float or list = None,
capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0,
**kwargs
):
self,
trench: SupplyReturnPipeTrench,
name: str,
length: float,
investments: tuple,
static_loss: dict = None,
specific_capacity_cost: float or list = None,
capacity_is_instantaneous: bool = False,
unit_conversion_factor: float = 1.0,
**kwargs
):
# store the unit conversion
self.unit_conversion_factor = unit_conversion_factor
# keep the trench object
......@@ -234,36 +219,34 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# keep the trench length
self.length = (
[length for i in range(trench.number_options())]
if trench.vector_mode else
length
)
if trench.vector_mode
else length
)
# determine the rated heat capacity
rhc = trench.rated_heat_capacity(
unit_conversion_factor=unit_conversion_factor
)
rhc = trench.rated_heat_capacity(unit_conversion_factor=unit_conversion_factor)
# initialise the object using the mother class
ArcInvestments.__init__(
self,
investments=investments,
name=name,
efficiency=None,
efficiency_reverse=None,
self,
investments=investments,
name=name,
efficiency=None,
efficiency_reverse=None,
static_loss=static_loss,
capacity=[rhc] if isinstance(rhc, Real) else rhc,
specific_capacity_cost=(
0
if type(specific_capacity_cost) == type(None) else
specific_capacity_cost
),
capacity_is_instantaneous=False,
validate=False
)
if type(specific_capacity_cost) == type(None)
else specific_capacity_cost
),
capacity_is_instantaneous=False,
validate=False,
)
# # *************************************************************************
# # *************************************************************************
# def set_minimum_cost(self, minimum_cost = None):
# # minimum arc cost
# # if no external minimum cost list was provided, calculate it
# if type(minimum_cost) == type(None):
......@@ -272,10 +255,10 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# # multiple options
# self.minimum_cost = tuple(
# (pipe.sp*length # twin pipes: one twin pipe
# if self.trench.twin_pipes else
# if self.trench.twin_pipes else
# pipe.sp*length*2) # single pipes: two single pipes
# for pipe, length in zip(
# self.trench.supply_pipe,
# self.trench.supply_pipe,
# self.length
# )
# )
......@@ -283,10 +266,10 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# self.minimum_cost = (self.trench.supply_pipe.sp*self.length,)
# else: # use an external minimum cost
# self.minimum_cost = tuple(minimum_cost)
# # *************************************************************************
# # *************************************************************************
# def set_capacity(self, **kwargs):
# # retrieve the rated heat capacity
# rhc = self.trench.rated_heat_capacity(**kwargs)
......@@ -296,12 +279,12 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# else:
# # one option, rhc is one value
# self.capacity = (rhc,)
# # *************************************************************************
# # *************************************************************************
# def set_static_losses(
# self,
# self,
# scenario_key,
# ground_thermal_conductivity: float or list,
# ground_air_heat_transfer_coefficient: float or list,
......@@ -310,7 +293,7 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# length: float or list = None,
# unit_conversion_factor: float = None,
# **kwargs):
# hts = self.trench.heat_transfer_surroundings(
# ground_thermal_conductivity=ground_thermal_conductivity,
# ground_air_heat_transfer_coefficient=(
......@@ -318,20 +301,20 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# time_interval_duration=time_interval_duration,
# temperature_surroundings=temperature_surroundings,
# length=(
# self.length
# if type(length) == type(None) else
# self.length
# if type(length) == type(None) else
# length
# ),
# unit_conversion_factor=(
# self.unit_conversion_factor
# if type(unit_conversion_factor) == type(None) else
# self.unit_conversion_factor
# if type(unit_conversion_factor) == type(None) else
# unit_conversion_factor
# ),
# **kwargs)
# if self.trench.vector_mode:
# # multiple options: hts is a vector
# if (hasattr(self, "static_loss") and
# if (hasattr(self, "static_loss") and
# type(self.static_loss) != type(None)):
# # update the static loss dictionary
# if type(hts[0]) == list:
......@@ -362,7 +345,7 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# }
# else:
# # one option: hts might be a number
# if (hasattr(self, "static_loss") and
# if (hasattr(self, "static_loss") and
# type(self.static_loss) != type(None)):
# # update the static loss dictionary
# if not isinstance(hts, Real):
......@@ -387,21 +370,25 @@ class PipeTrenchInvestments(ArcInvestments, PipeTrenchOptions):
# self.static_loss = {
# (0, scenario_key, 0): hts
# }
# *****************************************************************************
# *****************************************************************************
class ExistingPipeTrench(PipeTrenchOptions):
"A class for existing pipe trenches."
def __init__(self, option_selected: int, **kwargs):
# initialise
PipeTrenchOptions.__init__(
self,
minimum_cost=[0 for i in range(kwargs['trench'].number_options())],
**kwargs)
minimum_cost=[0 for i in range(kwargs["trench"].number_options())],
**kwargs
)
# define the option that already exists
self.options_selected[option_selected] = True
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
......@@ -15,12 +15,12 @@ from ...problems.esipp.network import Network
from .network import PipeTrenchOptions
from topupheat.pipes.trenches import SupplyReturnPipeTrench
from numbers import Real
# *****************************************************************************
# *****************************************************************************
def cost_pipes(trench: SupplyReturnPipeTrench,
length: float or tuple) -> tuple:
def cost_pipes(trench: SupplyReturnPipeTrench, length: float or tuple) -> tuple:
"""
Returns the costs of each trench option for a given trench length.
......@@ -45,84 +45,87 @@ def cost_pipes(trench: SupplyReturnPipeTrench,
# use the specific pipe cost that features in the database
if trench.vector_mode:
# multiple options
if (type(length) == tuple and
len(length) == trench.number_options()):
if type(length) == tuple and len(length) == trench.number_options():
# multiple trench lengths
return tuple(
(pipe.sp*length # twin pipes: one twin pipe
if trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe, length in zip(trench.supply_pipe, length)
)
)
elif isinstance(length, Real):
# one trench length
return tuple(
(pipe.sp*length # twin pipes: one twin pipe
if trench.twin_pipes else
pipe.sp*length*2) # single pipes: two single pipes
(
pipe.sp * length # twin pipes: one twin pipe
if trench.twin_pipes
else pipe.sp * length * 2
) # single pipes: two single pipes
for pipe in trench.supply_pipe
)
)
else:
raise ValueError('Unrecognised input combination.')
elif (not trench.vector_mode and isinstance(length, Real)):
raise ValueError("Unrecognised input combination.")
elif not trench.vector_mode and isinstance(length, Real):
# only one option
return (trench.supply_pipe.sp*length,)
else: # only one option
raise ValueError('Unrecognised input combination.')
return (trench.supply_pipe.sp * length,)
else: # only one option
raise ValueError("Unrecognised input combination.")
# # keep the trench length
# self.length = (
# [length for i in range(trench.number_options())]
# if trench.vector_mode else
# if trench.vector_mode else
# length
# )
# *****************************************************************************
# *****************************************************************************
def summarise_network_by_pipe_technology(
network: Network,
print_output: bool = False
) -> dict:
network: Network, print_output: bool = False
) -> dict:
"A method to summarise a network by pipe technology."
# *************************************************************************
# *************************************************************************
# create a dictionary that compiles the lengths of each arc technology
length_dict = {}
# *************************************************************************
# *************************************************************************
# for each arc
for arc_key in network.edges(keys=True):
# check if it is a PipeTrench object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
):
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
# for each arc technology option
for h, tech_option in enumerate(
network.edges[arc_key][Network.KEY_ARC_TECH].options_selected
):
network.edges[arc_key][Network.KEY_ARC_TECH].options_selected
):
# check if the tech option was selected
if tech_option:
# technology option was selected
# get the length of the arc
arc_length = (
network.edges[arc_key][Network.KEY_ARC_TECH].length[h]
if type(network.edges[arc_key][
Network.KEY_ARC_TECH].length) == list else
network.edges[arc_key][Network.KEY_ARC_TECH].length
)
if type(network.edges[arc_key][Network.KEY_ARC_TECH].length) == list
else network.edges[arc_key][Network.KEY_ARC_TECH].length
)
# identify the option
tech_option_label = network.edges[arc_key][
Network.KEY_ARC_TECH].trench.printable_description(h)
Network.KEY_ARC_TECH
].trench.printable_description(h)
# if the arc technology has been previously selected...
if tech_option_label in length_dict:
# ...increment the total length
......@@ -130,158 +133,158 @@ def summarise_network_by_pipe_technology(
else:
# if not, add a new arc technology to the dictionary
length_dict[tech_option_label] = arc_length
# *************************************************************************
# *************************************************************************
if print_output:
print('printing the arc technologies selected by pipe size...')
if print_output:
print("printing the arc technologies selected by pipe size...")
for key, value in sorted(
(tech, length)
for tech, length in length_dict.items()
):
print(str(key)+': '+str(value))
print('total: '+str(sum(length_dict.values())))
(tech, length) for tech, length in length_dict.items()
):
print(str(key) + ": " + str(value))
print("total: " + str(sum(length_dict.values())))
return length_dict
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_network_layout(network: Network,
include_basemap: bool = False,
figure_size: tuple = (25, 25),
min_linewidth: float = 1.0,
max_linewidth: float = 3.0,
legend_fontsize: float = 20.0,
basemap_zoom_level: float = 15,
legend_location: str = 'lower left',
legend_with_brand_model: bool = False,
legend_transparency: float = None):
def plot_network_layout(
network: Network,
include_basemap: bool = False,
figure_size: tuple = (25, 25),
min_linewidth: float = 1.0,
max_linewidth: float = 3.0,
legend_fontsize: float = 20.0,
basemap_zoom_level: float = 15,
legend_location: str = "lower left",
legend_with_brand_model: bool = False,
legend_transparency: float = None,
):
# convert graph object to GDF
_, my_gdf_arcs = ox.graph_to_gdfs(network)
# convert to final plot CRS
my_gdf = my_gdf_arcs.to_crs(epsg=3857)
# dict: keys are the pipe tuples and the values are lists of edge keys
arc_tech_summary_dict = {}
# for each edge
for arc_key in my_gdf.index:
# check if it is a PipeTrenchOptions object
if not isinstance(
network.edges[arc_key][Network.KEY_ARC_TECH],
PipeTrenchOptions
):
network.edges[arc_key][Network.KEY_ARC_TECH], PipeTrenchOptions
):
# if not, skip arc
continue
# find the trench's description, if it was selected
try:
try:
selected_option = (
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].trench.printable_description(
my_gdf[Network.KEY_ARC_TECH].loc[
arc_key].options_selected.index(True)
)
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.trench.printable_description(
my_gdf[Network.KEY_ARC_TECH]
.loc[arc_key]
.options_selected.index(True)
)
)
except ValueError:
continue
# if the pipe tuple already exists as a key in the dict
if selected_option in arc_tech_summary_dict:
# append the edge_key to the list obtained via that pipe tuple key
arc_tech_summary_dict[selected_option].append(arc_key)
else: # if not
else: # if not
# add a new dict entry whose key is the pipe tuple and create a list
arc_tech_summary_dict[selected_option] = [arc_key]
list_sorted = sorted(
(int(printable_description[2:]), printable_description)
for printable_description in arc_tech_summary_dict.keys()
)
(list_sorted_dn,
list_sorted_descriptions) = list(map(list,zip(*list_sorted)))
list_arc_widths = [
min_linewidth+
(max_linewidth-min_linewidth)*
iteration/(len(list_sorted_dn)-1)
for iteration, _ in enumerate(list_sorted_dn)
] if len(list_sorted_dn) != 1 else [(max_linewidth+min_linewidth)/2]
)
(list_sorted_dn, list_sorted_descriptions) = list(map(list, zip(*list_sorted)))
list_arc_widths = (
[
min_linewidth
+ (max_linewidth - min_linewidth) * iteration / (len(list_sorted_dn) - 1)
for iteration, _ in enumerate(list_sorted_dn)
]
if len(list_sorted_dn) != 1
else [(max_linewidth + min_linewidth) / 2]
)
# *************************************************************************
# *************************************************************************
fig, ax = plt.subplots(1,1)
fig, ax = plt.subplots(1, 1)
fig.set_size_inches(*figure_size)
for description, arc_width in zip(
list_sorted_descriptions,
list_arc_widths
):
for description, arc_width in zip(list_sorted_descriptions, list_arc_widths):
# prepare plot
my_gdf.loc[arc_tech_summary_dict[description]].plot(
edgecolor='k',
legend=True,
linewidth=arc_width,
ax=ax)
edgecolor="k", legend=True, linewidth=arc_width, ax=ax
)
# adjust legend labels
ax.legend(list_sorted_descriptions,
fontsize=legend_fontsize,
loc=legend_location,
framealpha=(
legend_transparency
if type(legend_transparency) != type(None) else None
)
)
ax.legend(
list_sorted_descriptions,
fontsize=legend_fontsize,
loc=legend_location,
framealpha=(
legend_transparency if type(legend_transparency) != type(None) else None
),
)
# add base map
if include_basemap:
cx.add_basemap(ax,
zoom=basemap_zoom_level,
source=cx.providers.OpenStreetMap.Mapnik,
#crs=gdf_map.crs,
)
cx.add_basemap(
ax,
zoom=basemap_zoom_level,
source=cx.providers.OpenStreetMap.Mapnik,
# crs=gdf_map.crs,
)
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def plot_heating_demand(
losses: list,
end_use_demand: list,
labels: list,
ylabel: str = 'Heating demand [MWh]',
title: str = 'Heat demand by month'
):
losses: list,
end_use_demand: list,
labels: list,
ylabel: str = "Heating demand [MWh]",
title: str = "Heat demand by month",
):
energy_totals = {
'Losses (optimised)': np.array(losses),
'End use (estimated)': np.array(end_use_demand),
}
"Losses (optimised)": np.array(losses),
"End use (estimated)": np.array(end_use_demand),
}
colors = {
'Losses (optimised)': 'tab:orange',
'End use (estimated)': 'tab:blue',
}
"Losses (optimised)": "tab:orange",
"End use (estimated)": "tab:blue",
}
# width = 0.8 # the width of the bars: can also be len(x) sequence
# make sure the grid lines are behind the bars
......@@ -290,28 +293,28 @@ def plot_heating_demand(
fig, ax = plt.subplots()
bottom = np.zeros(len(labels))
figure_size = (8,4)
figure_size = (8, 4)
fig.set_size_inches(figure_size[0], figure_size[1])
for energy_category, energy_total in energy_totals.items():
p = ax.bar(
labels,
energy_total,
label=energy_category,
bottom=bottom,
color=colors[energy_category],
zorder=zorder_bars
)
labels,
energy_total,
label=energy_category,
bottom=bottom,
color=colors[energy_category],
zorder=zorder_bars,
)
bottom += energy_total
ax.bar_label(p, fmt='{:,.0f}', label_type='center')
ax.bar_label(p, fmt="{:,.0f}", label_type="center")
# ax.bar_label(p, fmt='{:,.0f}')
ax.grid(zorder=zorder_grid) # zorder=0 to make the grid
ax.grid(zorder=zorder_grid) # zorder=0 to make the grid
ax.set(ylabel=ylabel, title=title)
ax.legend()
plt.show()
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# -*- coding: utf-8 -*-
This diff is collapsed.
# *****************************************************************************
# *****************************************************************************
from ...problems.esipp.network import Arcs
# *****************************************************************************
# *****************************************************************************
class ArcInvestments(Arcs):
    """A class for defining arcs linked to investments."""

    # *************************************************************************
    # *************************************************************************

    def __init__(self, investments: tuple, **kwargs):
        """
        Initialise the arcs from a tuple of Investment objects.

        The minimum cost of each arc option is set to the net present value
        of the corresponding investment.

        Parameters
        ----------
        investments : tuple
            One Investment object per arc option; each must provide a
            net_present_value() method.
        **kwargs
            Additional keyword arguments forwarded to Arcs.__init__.
        """
        # keep the investment data so the costs can be refreshed later
        self.investments = investments
        # initialise the base object with the NPV of each investment as cost
        super().__init__(
            minimum_cost=tuple(inv.net_present_value() for inv in self.investments),
            # validate=False,
            **kwargs,
        )

    # *************************************************************************
    # *************************************************************************

    def update_minimum_cost(self):
        """Update the minimum costs using the Investment objects."""
        # recompute each arc option's cost from its investment's current NPV
        self.minimum_cost = tuple(
            inv.net_present_value() for inv in self.investments
        )
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
# -*- coding: utf-8 -*-
# import osm
\ No newline at end of file
# import osm
# imports
from math import inf
......@@ -22,10 +21,11 @@ from ..gis import identify as ident
# *****************************************************************************
# *****************************************************************************
def edge_lengths(network: MultiDiGraph, edge_keys: tuple = None) -> dict:
    """
    Calculate edge lengths in an OSMnx-formatted MultiDiGraph network object.

    The calculation method changes depending on whether the coordinates are
    projected and depending on whether the edges are simplified (i.e., have
    a geometry attribute).

    Parameters
    ----------
    network : MultiDiGraph
        The network object.
    edge_keys : tuple, optional
        The keys of the edges whose lengths are sought. If None, all edges
        in the network are considered. The default is None.

    Returns
    -------
    dict
        A dictionary of edge lengths keyed by edge key.
    """
    # determine if the graph is projected or not
    graph_is_projected = is_projected(network.graph["crs"])
    # check if edge keys were specified
    if edge_keys is None:
        # no particular edge keys were provided: consider all edges (default)
        edge_keys = network.edges(keys=True)
    # initialise length dict
    length_dict = {}
    # for each edge on the graph
    for edge_key in edge_keys:
        if graph_is_projected:
            # calculate it using projected coordinates
            if osm.KEY_OSMNX_GEOMETRY in network.edges[edge_key]:
                # use the edge's geometry
                length_dict[edge_key] = length(
                    network.edges[edge_key][osm.KEY_OSMNX_GEOMETRY]
                )
            else:
                # use the (projected) node coordinates
                start_point = Point(
                    (
                        network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
                        network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
                    )
                )
                end_point = Point(
                    (
                        network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
                        network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
                    )
                )
                length_dict[edge_key] = start_point.distance(end_point)
        else:
            # calculate it using unprojected coordinates (lat/long)
            if osm.KEY_OSMNX_GEOMETRY in network.edges[edge_key]:
                # use the edge's geometry
                length_dict[edge_key] = great_circle_distance_along_path(
                    network.edges[edge_key][osm.KEY_OSMNX_GEOMETRY]
                )
            else:
                # use the (unprojected) node coordinates
                length_dict[edge_key] = great_circle(
                    lat1=network.nodes[edge_key[0]][osm.KEY_OSMNX_Y],
                    lon1=network.nodes[edge_key[0]][osm.KEY_OSMNX_X],
                    lat2=network.nodes[edge_key[1]][osm.KEY_OSMNX_Y],
                    lon2=network.nodes[edge_key[1]][osm.KEY_OSMNX_X],
                )
    # return the dict with lengths of each edge
    return length_dict
# *****************************************************************************
# *****************************************************************************
def great_circle_distance_along_path(path: LineString) -> float:
    """
    Computes the great circle distance along a given path.

    The distance is to be calculated using a shapely LineString object made of
    (longitude, latitude) coordinate tuples. The calculation is vectorised.

    Parameters
    ----------
    path : LineString
        The path, as (longitude, latitude) coordinate pairs.

    Returns
    -------
    float
        The total great circle distance along the path.
    """
    # extract the coordinate vectors: shapely's xy yields x (lon) and y (lat)
    # NOTE(review): this extraction was reconstructed from the vectorised
    # usage below — confirm against the original revision
    lon, lat = path.xy
    # sum individual distances and return
    return sum(
        great_circle(
            lat[:-1],  # latitudes of starting points
            lon[:-1],  # longitudes of starting points
            lat[1:],  # latitudes of ending points
            lon[1:],  # longitudes of ending points
        )
    )
# *****************************************************************************
# *****************************************************************************
def update_street_count(network: MultiDiGraph):
    """
    Updates the street count attributes of nodes in a MultiDiGraph object.

    Parameters
    ----------
    network : MultiDiGraph
        The network object whose node street counts are to be refreshed.

    Returns
    -------
    None.
    """
    # recompute the number of streets per node and write it onto each node
    street_count_dict = count_streets_per_node(network)
    network.add_nodes_from(
        (
            (key, {osm.KEY_OSMNX_STREET_COUNT: value})
            for key, value in street_count_dict.items()
        )
    )
# *****************************************************************************
# *****************************************************************************
def node_path_length(
    network: MultiDiGraph, path: list, return_minimum_length_only: bool = True
) -> "list | float":
    """
    Returns the length or lengths of a path defined using nodes.

    If more than one edge connects adjacent nodes along the path, a length
    value will be returned for each possible path combination.

    Parameters
    ----------
    network : MultiDiGraph
        The network object.
    path : list
        The path, as a sequence of node keys.
    return_minimum_length_only : bool, optional
        If True, only the minimum length is returned. The default is True.

    Returns
    -------
    list or float
        The path's length or all lengths consistent with the path provided.
    """
    # direction matters
    path_length = len(path)
    if path_length == 0:
        return inf
    # if the path is given as a list of node keys, then it is subjective
    # i.e., it may refer to many paths, namely if parallel edges exist
    # check if the path object qualifies as such
    if not is_path(network, path):
        # it does not, exit
        if return_minimum_length_only:
            return inf
        else:
            return [inf]
    # prepare a list with all possible paths given as lists of edge keys
    list_of_edge_key_paths = [[]]  # a list of edge key lists
    # for each pair of nodes in the path
    for node_pair in range(path_length - 1):
        # get the edges between these two nodes
        edge_keys = ident.get_edges_from_a_to_b(
            network, path[node_pair], path[node_pair + 1]
        )
        number_edge_keys = len(edge_keys)
        if number_edge_keys == 1:
            # only one edge exists: append its key to all existing lists/paths
            for edge_key_path in list_of_edge_key_paths:
                edge_key_path.append(edge_keys[0])
        else:  # multiple edges exist: each path identified so far has to be
            # replicated a total of number_edge_keys times and then updated
            number_paths = len(list_of_edge_key_paths)
            # for each parallel edge beyond the first
            for edge_key_index in range(number_edge_keys - 1):
                # replicate all paths
                for path_index in range(number_paths):
                    list_of_edge_key_paths.append(
                        list(list_of_edge_key_paths[path_index])
                    )
            # paths have been replicated, now add the edges
            for edge_key_index in range(number_edge_keys):
                for path_index in range(number_paths):
                    # add the new edge
                    list_of_edge_key_paths[
                        path_index + edge_key_index * number_paths
                    ].append(edge_keys[edge_key_index])
    # *************************************************************************
    # compute the total length of each candidate edge-key path
    path_lenths = [
        sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in edge_key_path)
        for edge_key_path in list_of_edge_key_paths
    ]
    if return_minimum_length_only:
        return min(path_lenths)
    else:
        return path_lenths
    # *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
def edge_path_length(network: MultiDiGraph,
path: list,
**kwargs) -> float:
def edge_path_length(network: MultiDiGraph, path: list, **kwargs) -> float:
    """
    Returns the total length of a path defined using edges.

    If the path does not exist, or if no path is provided, the result will be
    infinity (math.inf).

    Parameters
    ----------
    network : MultiDiGraph
        The network object.
    path : list
        The path, as a sequence of edge keys.
    **kwargs
        Additional keyword arguments forwarded to ident.is_edge_path.

    Returns
    -------
    float
        The path's length, or infinity if the path is empty or invalid.
    """
    # an empty path has no defined length
    path_length = len(path)
    if path_length == 0:
        return inf
    if ident.is_edge_path(network, path, **kwargs):
        # sum the stored length attribute of every edge along the path
        return sum(network.edges[edge_key][osm.KEY_OSMNX_LENGTH] for edge_key in path)
    else:
        # no valid path provided
        return inf
# *****************************************************************************
# *****************************************************************************
def count_ocurrences(
    gdf: GeoDataFrame, column: str, column_entries: list = None
) -> dict:
    """
    Counts the number of occurrences per entry in a DataFrame object's column.

    If a list is provided, only the entries that match those in the list are
    counted. If no list is provided, all unique entries are counted.

    Parameters
    ----------
    gdf : GeoDataFrame
        The object holding the column whose entries are to be counted.
    column : str
        The name of the column.
    column_entries : list, optional
        The entries to count. If None, all unique entries are counted.
        The default is None.

    Returns
    -------
    dict
        A dictionary with the counts whose keys are the values counted.
    """
    if type(column_entries) == list:
        # count only the entries present in the provided list
        # initialise dict
        # NOTE(review): initialisation reconstructed from usage — confirm
        count_dict = {}
        # for each key in the dict
        for key in column_entries:
            # count the number of rows with this key; null entries need
            # isnull() since NaN does not compare equal to itself
            if isna(key):
                count_dict[key] = gdf[gdf[column].isnull()].shape[0]
            else:
                count_dict[key] = gdf[gdf[column] == key].shape[0]
    else:
        # count all unique entries found in the column
        # initialise dict
        # NOTE(review): loop header reconstructed from usage — confirm
        count_dict = {}
        for entry in gdf[column]:
            if entry in count_dict:
                # it is, skip
                continue
            # it is not, count and store the number of rows with said entry
            if isna(entry):  # type(entry) == type(None):
                count_dict[entry] = gdf[gdf[column].isnull()].shape[0]
            else:
                count_dict[entry] = gdf[gdf[column] == entry].shape[0]
    # return statement
    return count_dict
# *****************************************************************************
# *****************************************************************************
# *****************************************************************************
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
......@@ -5,14 +5,14 @@
# general
KEY_OSM_CITY = 'addr:city'
KEY_OSM_COUNTRY = 'addr:country'
KEY_OSM_HOUSE_NUMBER = 'addr:housenumber'
KEY_OSM_MUNICIPALITY = 'addr:municipality'
KEY_OSM_PLACE = 'addr:place'
KEY_OSM_POSTCODE = 'addr:postcode'
KEY_OSM_STREET = 'addr:street'
KEY_OSM_SOURCE = 'source'
KEY_OSM_CITY = "addr:city"
KEY_OSM_COUNTRY = "addr:country"
KEY_OSM_HOUSE_NUMBER = "addr:housenumber"
KEY_OSM_MUNICIPALITY = "addr:municipality"
KEY_OSM_PLACE = "addr:place"
KEY_OSM_POSTCODE = "addr:postcode"
KEY_OSM_STREET = "addr:street"
KEY_OSM_SOURCE = "source"
KEYS_OSM = [
KEY_OSM_CITY,
......@@ -22,41 +22,39 @@ KEYS_OSM = [
KEY_OSM_PLACE,
KEY_OSM_POSTCODE,
KEY_OSM_STREET,
KEY_OSM_SOURCE
]
KEY_OSM_SOURCE,
]
# country specific
KEY_COUNTRY_DK = 'dk'
KEY_COUNTRY_DK = "dk"
KEY_OSM_DK_BUILDING_ENTRANCE_ID = 'osak:identifier'
KEY_OSM_DK_BUILDING_ENTRANCE_ID = "osak:identifier"
KEY_OSM_BUILDING_ENTRANCE_ID = {KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID}
KEY_OSM_BUILDING_ENTRANCE_ID = {
KEY_COUNTRY_DK: KEY_OSM_DK_BUILDING_ENTRANCE_ID
}
# *****************************************************************************
# osmnx
KEY_OSMNX_OSMID = 'osmid'
KEY_OSMNX_ELEMENT_TYPE = 'element_type'
KEY_OSMNX_OSMID = "osmid"
KEY_OSMNX_ELEMENT_TYPE = "element_type"
KEY_OSMNX_NAME = 'name'
KEY_OSMNX_GEOMETRY = 'geometry'
KEY_OSMNX_REVERSED = 'reversed'
KEY_OSMNX_LENGTH = 'length'
KEY_OSMNX_ONEWAY = 'oneway'
KEY_OSMNX_X = 'x'
KEY_OSMNX_Y = 'y'
KEY_OSMNX_LON = 'lon'
KEY_OSMNX_LAT = 'lat'
KEY_OSMNX_STREET_COUNT = 'street_count'
KEY_OSMNX_NAME = "name"
KEY_OSMNX_GEOMETRY = "geometry"
KEY_OSMNX_REVERSED = "reversed"
KEY_OSMNX_LENGTH = "length"
KEY_OSMNX_ONEWAY = "oneway"
KEY_OSMNX_X = "x"
KEY_OSMNX_Y = "y"
KEY_OSMNX_LON = "lon"
KEY_OSMNX_LAT = "lat"
KEY_OSMNX_STREET_COUNT = "street_count"
KEYS_OSMNX = [
KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx
KEY_OSMNX_ELEMENT_TYPE, # the other half of the multi-index from osmnx
KEY_OSMNX_OSMID, # one half of multi-index for geodataframes from osmnx
KEY_OSMNX_ELEMENT_TYPE, # the other half of the multi-index from osmnx
KEY_OSMNX_NAME,
KEY_OSMNX_GEOMETRY,
KEY_OSMNX_REVERSED,
......@@ -66,8 +64,8 @@ KEYS_OSMNX = [
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
]
KEY_OSMNX_STREET_COUNT,
]
KEYS_OSMNX_NODES = {
KEY_OSMNX_OSMID,
......@@ -77,28 +75,24 @@ KEYS_OSMNX_NODES = {
KEY_OSMNX_Y,
KEY_OSMNX_LON,
KEY_OSMNX_LAT,
KEY_OSMNX_STREET_COUNT
}
KEY_OSMNX_STREET_COUNT,
}
KEYS_OSMNX_NODES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_NAME,
KEY_OSMNX_STREET_COUNT
}
KEYS_OSMNX_NODES_ESSENTIAL = {KEY_OSMNX_OSMID, KEY_OSMNX_NAME, KEY_OSMNX_STREET_COUNT}
KEYS_OSMNX_EDGES = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_GEOMETRY,
KEY_OSMNX_REVERSED
}
KEY_OSMNX_REVERSED,
}
KEYS_OSMNX_EDGES_ESSENTIAL = {
KEY_OSMNX_OSMID,
KEY_OSMNX_LENGTH,
KEY_OSMNX_ONEWAY,
KEY_OSMNX_REVERSED
}
KEY_OSMNX_REVERSED,
}
# *****************************************************************************
\ No newline at end of file
# *****************************************************************************
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.