# imports

# standard
import io
import math
from contextlib import redirect_stdout
from statistics import mean

# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo

# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
from src.topupopt.problems.esipp.problem import simplify_peak_total_problem
from src.topupopt.problems.esipp.problem import is_peak_total_problem
from src.topupopt.problems.esipp.time import TimeFrame

# *****************************************************************************
# *****************************************************************************


class TestESIPPProblem:
    """Tests that build small infrastructure planning problems and solve them."""

    def build_solve_ipp(
        self,
        solver: str = "glpk",
        solver_options: dict = None,
        use_sos_arcs: bool = False,
        arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
        arc_use_real_variables_if_possible: bool = False,
        use_sos_sense: bool = False,
        sense_sos_weight_key: int = (
            InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
        ),
        sense_use_real_variables_if_possible: bool = False,
        sense_use_arc_interfaces: bool = False,
        perform_analysis: bool = False,
        plot_results: bool = False,
        print_solver_output: bool = False,
        time_frame: TimeFrame = None,
        networks: dict = None,
        converters: dict = None,
        static_losses_mode=None,
        mandatory_arcs: list = None,
        max_number_parallel_arcs: dict = None,
        arc_groups_dict: dict = None,
        init_aux_sets: bool = False,
        discount_rates: dict = None,
        assessment_weights: dict = None,
        simplify_problem: bool = False,
    ):
        """Create, configure, instantiate and optimise a planning problem.

        Parameters
        ----------
        solver, solver_options, print_solver_output
            Forwarded to ``ipp.optimise``.
        use_sos_arcs, arc_sos_weight_key, arc_use_real_variables_if_possible
            When ``use_sos_arcs`` is true, SOS1 arc selection is requested
            for every arc that has not been selected already.
        use_sos_sense, sense_sos_weight_key,
        sense_use_real_variables_if_possible, sense_use_arc_interfaces
            When ``use_sos_sense`` is true, SOS1 flow sense determination is
            requested for every undirected arc. Otherwise, if
            ``sense_use_arc_interfaces`` is true, interface variables are
            requested for every arc that has not been selected already.
        perform_analysis, plot_results
            Accepted for call compatibility; not used by this method.
        time_frame, discount_rates, assessment_weights
            Forwarded to the ``InfrastructurePlanningProblem`` constructor.
            ``assessment_weights`` is replaced by an empty dict when it is
            not a dict.
        networks, converters
            Dicts of networks and converters to add to the problem, by key.
            Missing values are treated as empty dicts.
        static_losses_mode
            Must compare equal to one of the problem's ``STATIC_LOSS_MODE_*``
            constants.
        mandatory_arcs
            List of keys whose first element is the network key and whose
            remaining elements form the arc key.
        max_number_parallel_arcs
            Dict mapping ``(network_key, node_a, node_b)`` to a limit. A
            missing value is treated as an empty dict.
        arc_groups_dict
            Dict whose values are passed one by one to
            ``ipp.create_arc_group``.
        init_aux_sets
            Forwarded to ``ipp.instantiate``.
        simplify_problem
            When true, the problem is replaced by the result of
            ``simplify_peak_total_problem`` before instantiation.

        Returns
        -------
        The problem object after optimisation.

        Raises
        ------
        ValueError
            If ``static_losses_mode`` matches none of the known modes.
        """
        # fall back to empty containers when optional inputs are missing
        if not isinstance(assessment_weights, dict):
            assessment_weights = {}
        if not isinstance(converters, dict):
            converters = {}
        if networks is None:
            networks = {}
        if max_number_parallel_arcs is None:
            max_number_parallel_arcs = {}

        # time weights: relative weight of each time interval
        # one interval twice as long as the average is worth twice
        # one interval half as long as the average is worth half
        time_weights = None  # nothing yet
        normalised_time_interval_duration = None  # nothing yet

        # create problem object
        ipp = InfrastructurePlanningProblem(
            discount_rates=discount_rates,
            time_frame=time_frame,
            time_weights=time_weights,
            normalised_time_interval_duration=normalised_time_interval_duration,
            assessment_weights=assessment_weights,
        )

        # add networks and systems
        for netkey, net in networks.items():
            ipp.add_network(network_key=netkey, network=net)

        # add converters
        for cvtkey, cvt in converters.items():
            ipp.add_converter(converter_key=cvtkey, converter=cvt)

        # define arcs as mandatory
        if isinstance(mandatory_arcs, list):
            for full_arc_key in mandatory_arcs:
                ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])

        # set up the use of sos for arc selection
        if use_sos_arcs:
            for network_key in ipp.networks:
                net = ipp.networks[network_key]
                for arc_key in net.edges(keys=True):
                    # preexisting arcs do not need to be selected
                    if net.edges[arc_key][Network.KEY_ARC_TECH].has_been_selected():
                        continue
                    ipp.use_sos1_for_arc_selection(
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
                            arc_use_real_variables_if_possible
                        ),
                        sos1_weight_method=arc_sos_weight_key,
                    )

        # set up the use of sos for flow sense determination
        if use_sos_sense:
            for network_key in ipp.networks:
                net = ipp.networks[network_key]
                for arc_key in net.edges(keys=True):
                    # only undirected arcs are considered here
                    if not net.edges[arc_key][Network.KEY_ARC_UND]:
                        continue
                    ipp.use_sos1_for_flow_senses(
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
                            sense_use_real_variables_if_possible
                        ),
                        use_interface_variables=sense_use_arc_interfaces,
                        sos1_weight_method=sense_sos_weight_key,
                    )
        elif sense_use_arc_interfaces:
            # set up the use of arc interfaces w/o sos1
            for network_key in ipp.networks:
                net = ipp.networks[network_key]
                for arc_key in net.edges(keys=True):
                    if net.edges[arc_key][Network.KEY_ARC_TECH].has_been_selected():
                        continue
                    ipp.use_interface_variables_for_arc_selection(network_key, arc_key)

        # static losses
        if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
            ipp.place_static_losses_arrival_node()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
            ipp.place_static_losses_departure_node()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
            ipp.place_static_losses_upstream()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
            ipp.place_static_losses_downstream()
        else:
            raise ValueError("Unknown static loss modelling mode.")

        # groups
        if arc_groups_dict is not None:
            for key in arc_groups_dict:
                ipp.create_arc_group(arc_groups_dict[key])

        # maximum number of parallel arcs
        for key in max_number_parallel_arcs:
            ipp.set_maximum_number_parallel_arcs(
                network_key=key[0],
                node_a=key[1],
                node_b=key[2],
                limit=max_number_parallel_arcs[key],
            )

        # optional simplification (returns the problem to be used from here on)
        if simplify_problem:
            ipp = simplify_peak_total_problem(ipp)

        # instantiate
        ipp.instantiate(initialise_ancillary_sets=init_aux_sets)

        # optimise
        ipp.optimise(
            solver_name=solver,
            solver_options=solver_options,
            output_options={},
            print_solver_output=print_solver_output,
        )

        # return the problem object
        return ipp

    # *************************************************************************
    # *************************************************************************

    @staticmethod
    def _pprint_to_string(component) -> str:
        """Return the text that ``component.pprint()`` writes to stdout."""
        buffer = io.StringIO()
        # the context manager puts the previous stdout back even on errors
        with redirect_stdout(buffer):
            component.pprint()
        return buffer.getvalue()

    # *************************************************************************
    # *************************************************************************

    def test_single_network_single_arc_problem(self):
        """Solve a two-node, single-arc problem over three time intervals."""
        # scenario
        q = 0

        # time
        tf = TimeFrame(
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None) for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            discount_rates={0: (0.035, 0.035)},
            simplify_problem=False,
        )

        assert is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 24
        assert ipp.results["Problem"][0]["Number of variables"] == 22
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 49

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_single_network_single_arc_problem_simpler(self):
        """Solve the single-arc problem again with simplification enabled."""
        # scenario
        q = 0

        # time
        tf = TimeFrame(
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node (fixed key, since it shows up in the printed constraints)
        node_IMP = "thatimpnode"
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None) for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = "thatnodea"
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            discount_rates={0: (0.035, 0.035)},
            simplify_problem=True,
        )

        assert is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 20
        assert ipp.results["Problem"][0]["Number of variables"] == 19
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 36

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

        # compare the printed form of selected constraints with references
        # NOTE(review): the comparisons are exact, whitespace included; confirm
        # the literals below against the actual pprint output

        # import flow cost constraints
        expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
        assert (
            self._pprint_to_string(ipp.instance.constr_imp_flow_cost)
            == expected_string
        )

        # export flow revenue constraints (none, since there are no exports)
        expected_string = """constr_exp_flow_revenue : Size=0, Index=constr_exp_flow_revenue_index, Active=True\n Key : Lower : Body : Upper : Active\n"""
        assert (
            self._pprint_to_string(ipp.instance.constr_exp_flow_revenue)
            == expected_string
        )

    # *************************************************************************
    # *************************************************************************

    def test_problem_increasing_imp_prices(self):
        """Solve a single-interval problem with two import price segments."""
        # scenario
        q = 0

        # time
        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 10
        assert ipp.results["Problem"][0]["Number of variables"] == 11
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 20

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flow should be 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -3.5
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_decreasing_exp_prices(self):
        """Solve a single-interval problem with two export price segments."""
        # scenario
        q = 0

        # time
        number_intervals = 1

        # periods
        number_periods = 1

        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one export, one regular
        mynet = Network()

        # export node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})

        # arc AE (from the regular node to the export node)
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 10
        assert ipp.results["Problem"][0]["Number of variables"] == 11
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 20

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flow should be 1.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be one
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be three
        assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)

        # sdncf should be 1.0
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)

        # the objective function should be -2.0
        assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_increasing_imp_decreasing_exp_prices(self):
        """Solve a two-interval problem with both import and export nodes."""
        # scenario
        q = 0

        # time
        number_intervals = 2

        # periods
        number_periods = 1

        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0, 1)},
            time_interval_durations={q: (1, 1)},
        )

        # 3 nodes: one import, one export, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # export node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # arc AE
        arc_tech_AE = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 23
        assert ipp.results["Problem"][0]["Number of variables"] == 26
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 57

        # validation

        # arc IA should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # arc AE should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # interval 0: import only
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            0.0,
            abs_tol=1e-6,
        )

        # interval 1: export only
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
            1.0,
            abs_tol=1e-6,
        )

        # IA amplitude
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # AE amplitude
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be 7.0: 4+3
        assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)

        # sdncf should be -2.5: -3.5+1.0
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)

        # the objective function should be -9.5: -2.5-7.0 (values asserted above)
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_converter_sink(self):
        """Solve the single-arc problem with a converter added to it.

        NOTE(review): this test relies on ``get_two_node_model_signals``,
        ``dynsys``, ``cvn``, ``a``, ``b``, ``c``, ``d`` and ``x0``, none of
        which is defined or imported in the visible part of this file. Unless
        they are provided elsewhere, the test stops with a NameError.
        """
        # scenario
        q = 0

        # time
        number_intervals = 3

        # periods
        number_periods = 1

        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                (q, p, k): ResourcePrice(prices=1.0, volumes=None)
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # converters

        # number of samples
        time_step_durations = [1, 1, 1]
        number_time_steps = len(time_step_durations)

        # coefficients (not used further down yet)
        # a_innk
        a_innk = {
            ("cvt1", 0, 0, 0): 0.95,
            ("cvt1", 0, 0, 1): 0.95,
            ("cvt1", 0, 0, 2): 0.95,
        }
        # b_inmk
        b_inmk = {("cvt1", 0, 0, 0): 3, ("cvt1", 0, 0, 1): 3, ("cvt1", 0, 0, 2): 3}
        # c_irnk
        c_irnk = {}
        # d_irmk
        d_irmk = {}
        # e_x_ink: depends on fixed signals
        e_x_ink = {}
        # e_y_irk: depends on fixed signals
        e_y_irk = {}

        # get the signals
        inputs, states, outputs = get_two_node_model_signals(number_time_steps)

        # create a dynamic system
        ds = dynsys.DynamicSystem(
            time_interval_durations=time_step_durations, A=a, B=b, C=c, D=d
        )

        # create a converter
        cvn1 = cvn.Converter(
            "cvn1",
            sys=ds,
            initial_states=x0,
            turn_key_cost=3,
            inputs=inputs,
            states=states,
            outputs=outputs,
        )

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            # the converter created above is the one handed to the problem
            converters={"mycvt": cvn1},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
        )

        assert is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 24
        assert ipp.results["Problem"][0]["Number of variables"] == 22
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 49

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)


# *****************************************************************************
# *****************************************************************************