Newer
Older
# imports
# standard
import math
from statistics import mean
# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo
# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
from src.topupopt.problems.esipp.problem import simplify_peak_total_problem
from src.topupopt.problems.esipp.problem import is_peak_total_problem
from src.topupopt.problems.esipp.time import TimeFrame
# *****************************************************************************
# *****************************************************************************
def build_solve_ipp(
self,
solver: str = "glpk",
solver_options: dict = None,
use_sos_arcs: bool = False,
arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
arc_use_real_variables_if_possible: bool = False,
use_sos_sense: bool = False,
sense_sos_weight_key: int = (
InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
),
sense_use_real_variables_if_possible: bool = False,
sense_use_arc_interfaces: bool = False,
perform_analysis: bool = False,
plot_results: bool = False,
print_solver_output: bool = False,
static_losses_mode=None,
mandatory_arcs: list = None,
max_number_parallel_arcs: dict = None,
arc_groups_dict: dict = None,
init_aux_sets: bool = False,
discount_rates: dict = None,
assessment_weights: dict = None,
simplify_problem: bool = False,
):
if type(assessment_weights) != dict:
if type(converters) != dict:
converters = {}
# time weights
# relative weight of time period
# one interval twice as long as the average is worth twice
# one interval half as long as the average is worth half
# time_weights = [
# [time_period_duration/average_time_interval_duration
# for time_period_duration in intraperiod_time_interval_duration]
# for p in range(number_periods)]
time_weights = None # nothing yet
normalised_time_interval_duration = None # nothing yet
# create problem object
ipp = InfrastructurePlanningProblem(
time_frame=time_frame,
# reporting_periods=time_frame.reporting_periods,
# time_intervals=time_frame.time_interval_durations,
time_weights=time_weights,
normalised_time_interval_duration=normalised_time_interval_duration,
# add networks and systems
for netkey, net in networks.items():
ipp.add_network(network_key=netkey, network=net)
# add converters
for cvtkey, cvt in converters.items():
ipp.add_converter(converter_key=cvtkey, converter=cvt)
# define arcs as mandatory
if type(mandatory_arcs) == list:
for full_arc_key in mandatory_arcs:
ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])
# if make_all_arcs_mandatory:
# for network_key in ipp.networks:
# for arc_key in ipp.networks[network_key].edges(keys=True):
# # preexisting arcs are no good
# if ipp.networks[network_key].edges[arc_key][
# Network.KEY_ARC_TECH].has_been_selected():
# ipp.make_arc_mandatory(network_key, arc_key)
# set up the use of sos for arc selection
if use_sos_arcs:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
ipp.use_sos1_for_arc_selection(
arc_key,
use_real_variables_if_possible=(
arc_use_real_variables_if_possible
),
sos1_weight_method=arc_sos_weight_key,
)
# set up the use of sos for flow sense determination
if use_sos_sense:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if not ipp.networks[network_key].edges[arc_key][
continue
ipp.use_sos1_for_flow_senses(
arc_key,
use_real_variables_if_possible=(
sense_use_real_variables_if_possible
use_interface_variables=sense_use_arc_interfaces,
sos1_weight_method=sense_sos_weight_key,
)
elif sense_use_arc_interfaces: # set up the use of arc interfaces w/o sos1
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
ipp.use_interface_variables_for_arc_selection(network_key, arc_key)
# static losses
if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
ipp.place_static_losses_arrival_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
ipp.place_static_losses_departure_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
ipp.place_static_losses_upstream()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
ipp.place_static_losses_downstream()
else:
raise ValueError("Unknown static loss modelling mode.")
# *********************************************************************
# groups
if type(arc_groups_dict) != type(None):
for key in arc_groups_dict:
ipp.create_arc_group(arc_groups_dict[key])
# *********************************************************************
# maximum number of parallel arcs
for key in max_number_parallel_arcs:
ipp.set_maximum_number_parallel_arcs(
network_key=key[0],
node_a=key[1],
node_b=key[2],
limit=max_number_parallel_arcs[key],
)
# *********************************************************************
if simplify_problem:
ipp = simplify_peak_total_problem(ipp)
# *********************************************************************
# instantiate (disable the default case v-a-v fixed losses)
# ipp.instantiate(place_fixed_losses_upstream_if_possible=False)
ipp.instantiate(initialise_ancillary_sets=init_aux_sets)
# optimise
ipp.optimise(
solver_name=solver,
solver_options=solver_options,
output_options={},
print_solver_output=print_solver_output,
)
# return the problem object
return ipp
# *********************************************************************
# *********************************************************************
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem(self):
# scenario
q = 0
# time
number_intervals = 3
reporting_periods={q: (0, 1)},
reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
time_intervals={q: (0, 1, 2)},
time_interval_durations={q: (1, 1, 1)},
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
discount_rates={0: tuple([0.035, 0.035])},
# init_aux_sets=init_aux_sets,
assert is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 24
assert ipp.results["Problem"][0]["Number of variables"] == 22
assert ipp.results["Problem"][0]["Number of nonzeros"] == 49
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
1.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
0.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
2.0,
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
2.0,
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# sdncf should be -5.7
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem_simpler(self):
# scenario
q = 0
# time
number_intervals = 3
reporting_periods={q: (0, 1)},
reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
time_intervals={q: (0, 1, 2)},
time_interval_durations={q: (1, 1, 1)},
# 2 nodes: one import, one regular
mynet = Network()
# node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
# node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
discount_rates={0: tuple([0.035, 0.035])},
# init_aux_sets=init_aux_sets,
assert is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 20
assert ipp.results["Problem"][0]["Number of variables"] == 19
assert ipp.results["Problem"][0]["Number of nonzeros"] == 36
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
Pedro L. Magalhães
committed
# TODO: create method to automate getting data from the command line
import io
import sys
from contextlib import redirect_stdout
Pedro L. Magalhães
committed
# print('wow wow wow')
# ipp.instance.constr_imp_flow_cost.pprint()
expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
Pedro L. Magalhães
committed
cmd_output = io.StringIO()
sys.stdout = cmd_output
ipp.instance.constr_imp_flow_cost.pprint()
sys.stdout = sys.__stdout__
assert cmd_output.getvalue() == expected_string
Pedro L. Magalhães
committed
expected_string = """constr_exp_flow_revenue : Size=0, Index=constr_exp_flow_revenue_index, Active=True\n Key : Lower : Body : Upper : Active\n"""
f = io.StringIO()
with redirect_stdout(f):
ipp.instance.constr_exp_flow_revenue.pprint()
assert f.getvalue() == expected_string
Pedro L. Magalhães
committed
# try the whole model
# print('wow wow wow')
# ipp.instance.pprint()
# expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
# cmd_output = io.StringIO()
# sys.stdout = cmd_output
# ipp.instance.pprint()
# sys.stdout = sys.__stdout__
# assert cmd_output.getvalue() == expected_string
# from contextlib import redirect_stdout
# import io
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# f = io.StringIO()
# with redirect_stdout(f):
# # ipp.instance.pprint() # full model
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# expected_string = r"""constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True
# Key : Lower : Body : Upper : Active
# ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True
# """
# assert expected_string == f.getvalue()
# from contextlib import redirect_stdout
# import io
# f = io.StringIO()
# with redirect_stdout(f):
# print('foobar')
# print(12)
# 12+3
# print('Got stdout: "{0}"'.format(f.getvalue()))
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_prices(self):
# scenario
q = 0
# time
number_intervals = 1
# # periods
# number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,)},
time_interval_durations={q: (1,)},
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
# (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
# reporting_periods={0: (0,)},
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)
# the objective function should be -7.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_decreasing_exp_prices(self):
# scenario
q = 0
# time
number_intervals = 1
# periods
number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,)},
time_interval_durations={q: (1,)},
# 2 nodes: one export, one regular
mynet = Network()
node_EXP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
# reporting_periods={0: (0,)},
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)
# the objective function should be -7.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_decreasing_exp_prices(self):
# scenario
q = 0
# time
number_intervals = 2
# periods
number_periods = 1
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,1)},
time_interval_durations={q: (1,1)},
# 3 nodes: one import, one export, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
(q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# export node
node_EXP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
)
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# arc AE
arc_tech_AE = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
solver_options={},
perform_analysis=False,
networks={"mynet": mynet},
static_losses_mode=True, # just to reach a line,
mandatory_arcs=[],
max_number_parallel_arcs={},
simplify_problem=False,
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 23
assert ipp.results["Problem"][0]["Number of variables"] == 26
assert ipp.results["Problem"][0]["Number of nonzeros"] == 57
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# interval 0: import only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# interval 1: export only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
# IA amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# AE amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
# capex should be 7.0: 4+3
assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)
# sdncf should be -2.5: -3.5+1.0
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)
# the objective function should be -9.5: -7.5-2.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_converter_sink(self):
# scenario
q = 0
# time
number_intervals = 3
# periods
number_periods = 1
tf = TimeFrame(
reporting_periods={q: (0,)},
reporting_period_durations={q: (365 * 24 * 3600,)},
time_intervals={q: (0,1,2)},
time_interval_durations={q: (1,1,1)},
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
)
# 2 nodes: one import, one regular
mynet = Network()
# import node
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
node_key=node_IMP,
prices={
(q, p, k): ResourcePrice(prices=1.0, volumes=None)
for p in range(number_periods)
for k in range(number_intervals)
},
)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
name="any",
# efficiency=[0.5, 0.5, 0.5],
efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5},