Newer
Older
# imports
# standard
import math
from statistics import mean
# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo
# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
from src.topupopt.problems.esipp.problem import simplify_peak_total_problem
from src.topupopt.problems.esipp.problem import is_peak_total_problem
from src.topupopt.problems.esipp.time import TimeFrame
# *****************************************************************************
# *****************************************************************************
def build_solve_ipp(
self,
solver: str = "glpk",
solver_options: dict = None,
use_sos_arcs: bool = False,
arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
arc_use_real_variables_if_possible: bool = False,
use_sos_sense: bool = False,
sense_sos_weight_key: int = (
InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
),
sense_use_real_variables_if_possible: bool = False,
sense_use_arc_interfaces: bool = False,
perform_analysis: bool = False,
plot_results: bool = False,
print_solver_output: bool = False,
static_losses_mode=None,
mandatory_arcs: list = None,
max_number_parallel_arcs: dict = None,
arc_groups_dict: dict = None,
init_aux_sets: bool = False,
discount_rates: dict = None,
assessment_weights: dict = None,
simplify_problem: bool = False,
):
if type(assessment_weights) != dict:
if type(converters) != dict:
converters = {}
# time weights
# relative weight of time period
# one interval twice as long as the average is worth twice
# one interval half as long as the average is worth half
# time_weights = [
# [time_period_duration/average_time_interval_duration
# for time_period_duration in intraperiod_time_interval_duration]
# for p in range(number_periods)]
time_weights = None # nothing yet
normalised_time_interval_duration = None # nothing yet
# create problem object
ipp = InfrastructurePlanningProblem(
time_frame=time_frame,
# reporting_periods=time_frame.reporting_periods,
# time_intervals=time_frame.time_interval_durations,
time_weights=time_weights,
normalised_time_interval_duration=normalised_time_interval_duration,
# add networks and systems
for netkey, net in networks.items():
ipp.add_network(network_key=netkey, network=net)
# add converters
for cvtkey, cvt in converters.items():
ipp.add_converter(converter_key=cvtkey, converter=cvt)
# define arcs as mandatory
if type(mandatory_arcs) == list:
for full_arc_key in mandatory_arcs:
ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])
# if make_all_arcs_mandatory:
# for network_key in ipp.networks:
# for arc_key in ipp.networks[network_key].edges(keys=True):
# # preexisting arcs are no good
# if ipp.networks[network_key].edges[arc_key][
# Network.KEY_ARC_TECH].has_been_selected():
# ipp.make_arc_mandatory(network_key, arc_key)
# set up the use of sos for arc selection
if use_sos_arcs:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
ipp.use_sos1_for_arc_selection(
arc_key,
use_real_variables_if_possible=(
arc_use_real_variables_if_possible
),
sos1_weight_method=arc_sos_weight_key,
)
# set up the use of sos for flow sense determination
if use_sos_sense:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if not ipp.networks[network_key].edges[arc_key][
continue
ipp.use_sos1_for_flow_senses(
arc_key,
use_real_variables_if_possible=(
sense_use_real_variables_if_possible
use_interface_variables=sense_use_arc_interfaces,
sos1_weight_method=sense_sos_weight_key,
)
elif sense_use_arc_interfaces: # set up the use of arc interfaces w/o sos1
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
ipp.use_interface_variables_for_arc_selection(network_key, arc_key)
# static losses
if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
ipp.place_static_losses_arrival_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
ipp.place_static_losses_departure_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
ipp.place_static_losses_upstream()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
ipp.place_static_losses_downstream()
else:
raise ValueError("Unknown static loss modelling mode.")
# *********************************************************************
# groups
if type(arc_groups_dict) != type(None):
for key in arc_groups_dict:
ipp.create_arc_group(arc_groups_dict[key])
# *********************************************************************
# maximum number of parallel arcs
for key in max_number_parallel_arcs:
ipp.set_maximum_number_parallel_arcs(
network_key=key[0],
node_a=key[1],
node_b=key[2],
limit=max_number_parallel_arcs[key],
)
# *********************************************************************
if simplify_problem:
ipp = simplify_peak_total_problem(ipp)
# *********************************************************************
# instantiate (disable the default case v-a-v fixed losses)
# ipp.instantiate(place_fixed_losses_upstream_if_possible=False)
ipp.instantiate(initialise_ancillary_sets=init_aux_sets)
# optimise
ipp.optimise(
solver_name=solver,
solver_options=solver_options,
output_options={},
print_solver_output=print_solver_output,
)
# return the problem object
return ipp
# *********************************************************************
# *********************************************************************
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem(self):
# scenario
q = 0
# time
number_intervals = 3
reporting_periods={q: (0, 1)},
reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
time_intervals={q: (0, 1, 2)},
time_interval_durations={q: (1, 1, 1)},
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
discount_rates={0: tuple([0.035, 0.035])},
# init_aux_sets=init_aux_sets,
assert is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 24
assert ipp.results["Problem"][0]["Number of variables"] == 22
assert ipp.results["Problem"][0]["Number of nonzeros"] == 49
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
1.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
0.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
2.0,
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
2.0,
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# sdncf should be -5.7
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem_simpler(self):
# scenario
q = 0
# time
number_intervals = 3
reporting_periods={q: (0, 1)},
reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
time_intervals={q: (0, 1, 2)},
time_interval_durations={q: (1, 1, 1)},
# 2 nodes: one import, one regular
mynet = Network()
# node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
# node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
discount_rates={0: tuple([0.035, 0.035])},
# init_aux_sets=init_aux_sets,
assert is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 20
assert ipp.results["Problem"][0]["Number of variables"] == 19
assert ipp.results["Problem"][0]["Number of nonzeros"] == 36
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
Pedro L. Magalhães
committed
# TODO: create method to automate getting data from the command line
import io
import sys
from contextlib import redirect_stdout
Pedro L. Magalhães
committed
# print('wow wow wow')
# ipp.instance.constr_imp_flow_cost.pprint()
expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
Pedro L. Magalhães
committed
cmd_output = io.StringIO()
sys.stdout = cmd_output
ipp.instance.constr_imp_flow_cost.pprint()
sys.stdout = sys.__stdout__
assert cmd_output.getvalue() == expected_string
Pedro L. Magalhães
committed
expected_string = """constr_exp_flow_revenue : Size=0, Index=constr_exp_flow_revenue_index, Active=True\n Key : Lower : Body : Upper : Active\n"""
f = io.StringIO()
with redirect_stdout(f):
ipp.instance.constr_exp_flow_revenue.pprint()
assert f.getvalue() == expected_string
Pedro L. Magalhães
committed
# try the whole model
# print('wow wow wow')
# ipp.instance.pprint()
# expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
# cmd_output = io.StringIO()
# sys.stdout = cmd_output
# ipp.instance.pprint()
# sys.stdout = sys.__stdout__
# assert cmd_output.getvalue() == expected_string
# from contextlib import redirect_stdout
# import io
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# f = io.StringIO()
# with redirect_stdout(f):
# # ipp.instance.pprint() # full model
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# expected_string = r"""constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True
# Key : Lower : Body : Upper : Active
# ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True
# """
# assert expected_string == f.getvalue()
# from contextlib import redirect_stdout
# import io
# f = io.StringIO()
# with redirect_stdout(f):
# print('foobar')
# print(12)
# 12+3
# print('Got stdout: "{0}"'.format(f.getvalue()))
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_prices(self):
# scenario
q = 0
# time
number_intervals = 1
# # periods
# number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,)},
time_interval_durations={q: (1,)},
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
# (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
# reporting_periods={0: (0,)},
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)
# the objective function should be -7.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_decreasing_exp_prices(self):
# scenario
q = 0
# time
number_intervals = 1
# periods
number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,)},
time_interval_durations={q: (1,)},
# 2 nodes: one export, one regular
mynet = Network()
node_EXP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
# reporting_periods={0: (0,)},
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)
# the objective function should be -7.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_decreasing_exp_prices(self):
# scenario
q = 0
# time
number_intervals = 2
# periods
number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,1)},
time_interval_durations={q: (1,1)},
# 3 nodes: one import, one export, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
(q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# export node
node_EXP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
)
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# arc AE
arc_tech_AE = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
solver_options={},
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
simplify_problem=False,
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 23
assert ipp.results["Problem"][0]["Number of variables"] == 26
assert ipp.results["Problem"][0]["Number of nonzeros"] == 57
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# interval 0: import only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# interval 1: export only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
# IA amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# AE amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
# capex should be 7.0: 4+3
assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)
# sdncf should be -2.5: -3.5+1.0
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)
# the objective function should be -9.5: -7.5-2.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_converter_sink(self):
# scenario
q = 0
# time
number_intervals = 3
# periods
number_periods = 1
tf = TimeFrame(
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,1,2)},
time_interval_durations={q: (1,1,1)},
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
)
# 2 nodes: one import, one regular
mynet = Network()
# import node
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
node_key=node_IMP,
prices={
(q, p, k): ResourcePrice(prices=1.0, volumes=None)
for p in range(number_periods)
for k in range(number_intervals)
},
)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
name="any",
# efficiency=[0.5, 0.5, 0.5],
efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
specific_capacity_cost=1,
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# converters
# number of samples
time_step_durations = [1, 1, 1]
number_time_steps = len(time_step_durations)
# get the coefficients
import numpy as np
# a_innk
a_innk = {
("cvt1", 0, 0, 0): 0.95,
("cvt1", 0, 0, 1): 0.95,
("cvt1", 0, 0, 2): 0.95,
}
# b_inmk
b_inmk = {("cvt1", 0, 0, 0): 3, ("cvt1", 0, 0, 1): 3, ("cvt1", 0, 0, 2): 3}
# c_irnk
c_irnk = {}
# d_irmk
d_irmk = {}
# e_x_ink: depends on fixed signals
e_x_ink = {}
# e_y_irk: depends on fixed signals
e_y_irk = {}
# get the signals
inputs, states, outputs = get_two_node_model_signals(number_time_steps)
# create a dynamic system
ds = dynsys.DynamicSystem(
time_interval_durations=time_step_durations, A=a, B=b, C=c, D=d
)
# create a converter
cvn1 = cvn.Converter(
"cvn1",
sys=ds,
initial_states=x0,
turn_key_cost=3,
inputs=inputs,
states=states,
outputs=outputs,
)
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
plot_results=False, # True,
print_solver_output=False,
time_frame=tf,
networks={"mynet": mynet},
converters={"mycvt": cvt},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
)
assert is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 24
assert ipp.results["Problem"][0]["Number of variables"] == 22
assert ipp.results["Problem"][0]["Number of nonzeros"] == 49
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
1.0,
abs_tol=1e-6,
)
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
0.0,
abs_tol=1e-6,
)
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
2.0,
abs_tol=1e-6,
)
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
2.0,
abs_tol=0.01,
)
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# sdncf should be -5.7
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
# *****************************************************************************
# *****************************************************************************