# imports

# standard
import math

# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo

# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.network import ArcsWithoutStaticLosses
from src.topupopt.problems.esipp.resource import ResourcePrice
# from src.topupopt.problems.esipp.utils import compute_cost_volume_metrics
from src.topupopt.problems.esipp.utils import statistics
from src.topupopt.problems.esipp.time import EconomicTimeFrame
# from src.topupopt.problems.esipp.converter import Converter

# *****************************************************************************
# *****************************************************************************


class TestESIPPProblem:

    solver = 'glpk'
    # solver = 'scip'
    # solver = 'cbc'

    def build_solve_ipp(
        self,
        solver: str = None,
        solver_options: dict = None,
        use_sos_arcs: bool = False,
        arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
        arc_use_real_variables_if_possible: bool = False,
        use_sos_sense: bool = False,
        sense_sos_weight_key: int = (
            InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
        ),
        sense_use_real_variables_if_possible: bool = False,
        sense_use_arc_interfaces: bool = False,
        perform_analysis: bool = False,
        plot_results: bool = False,
        print_solver_output: bool = False,
        time_frame: EconomicTimeFrame = None,
        networks: dict = None,
        converters: dict = None,
        static_losses_mode=None,
        mandatory_arcs: list = None,
        max_number_parallel_arcs: dict = None,
        arc_groups_dict: dict = None,
        init_aux_sets: bool = False,
        # discount_rates: dict = None,
        assessment_weights: dict = None,
        simplify_problem: bool = False,
    ):
        """Build an InfrastructurePlanningProblem, instantiate it and solve it.

        Args:
            solver: solver name handed to ``optimise``; falls back to the
                class attribute ``solver`` when None.
            solver_options: passed unchanged to ``optimise``.
            use_sos_arcs: if True, request SOS1-based arc selection for every
                arc whose technology has not been selected yet.
            arc_sos_weight_key: ``sos1_weight_method`` for arc selection.
            arc_use_real_variables_if_possible: forwarded to
                ``use_sos1_for_arc_selection``.
            use_sos_sense: if True, request SOS1-based flow senses for every
                arc flagged with ``Network.KEY_ARC_UND``.
            sense_sos_weight_key: ``sos1_weight_method`` for flow senses.
            sense_use_real_variables_if_possible: forwarded to
                ``use_sos1_for_flow_senses``.
            sense_use_arc_interfaces: with ``use_sos_sense`` it is forwarded
                as ``use_interface_variables``; without it, interface
                variables are requested for every not-yet-selected arc.
            perform_analysis: accepted but not used by this method.
            plot_results: accepted but not used by this method.
            print_solver_output: passed unchanged to ``optimise``.
            time_frame: passed to the problem constructor.
            networks: mapping of network key to network; each one is added
                to the problem. Required (None is not handled).
            converters: mapping of converter key to converter; anything that
                is not a dict is treated as "no converters".
            static_losses_mode: compared against the problem's
                ``STATIC_LOSS_MODE_*`` constants to pick the placement.
            mandatory_arcs: list of full arc keys (network key first, arc key
                after); ignored unless it is a list.
            max_number_parallel_arcs: mapping ``(network, node_a, node_b)`` to
                a limit; None means no limits.
            arc_groups_dict: mapping whose values are passed to
                ``create_arc_group``; None means no groups.
            init_aux_sets: forwarded as ``initialise_ancillary_sets``.
            assessment_weights: passed to the problem constructor; anything
                that is not a dict is replaced by an empty dict.
            simplify_problem: if True, call
                ``simplify_peak_total_assessments`` before instantiating.

        Returns:
            The problem object after ``instantiate`` and ``optimise`` ran.

        Raises:
            ValueError: if ``static_losses_mode`` equals none of the
                ``STATIC_LOSS_MODE_*`` constants (this includes the default).
        """

        if solver is None:
            solver = self.solver

        if not isinstance(assessment_weights, dict):
            assessment_weights = {}  # default

        if not isinstance(converters, dict):
            converters = {}

        if max_number_parallel_arcs is None:
            # the default must not break the loop over the limits below
            max_number_parallel_arcs = {}

        # time weights

        # relative weight of time period

        # one interval twice as long as the average is worth twice
        # one interval half as long as the average is worth half

        # time_weights = [
        #     [time_period_duration/average_time_interval_duration
        #       for time_period_duration in intraperiod_time_interval_duration]
        #     for p in range(number_periods)]

        time_weights = None  # nothing yet

        normalised_time_interval_duration = None  # nothing yet

        # create problem object

        ipp = InfrastructurePlanningProblem(
            # discount_rates=discount_rates,
            time_frame=time_frame,
            # reporting_periods=time_frame.reporting_periods,
            # time_intervals=time_frame.time_interval_durations,
            time_weights=time_weights,
            normalised_time_interval_duration=normalised_time_interval_duration,
            assessment_weights=assessment_weights,
        )

        # add networks and systems

        for netkey, net in networks.items():
            ipp.add_network(network_key=netkey, network=net)

        # add converters

        for cvtkey, cvt in converters.items():
            ipp.add_converter(converter_key=cvtkey, converter=cvt)

        # define arcs as mandatory

        if isinstance(mandatory_arcs, list):
            for full_arc_key in mandatory_arcs:
                # first element is the network key, the rest is the arc key
                ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])

        # if make_all_arcs_mandatory:
        #     for network_key in ipp.networks:
        #         for arc_key in ipp.networks[network_key].edges(keys=True):
        #             # preexisting arcs are no good
        #             if ipp.networks[network_key].edges[arc_key][
        #                     Network.KEY_ARC_TECH].has_been_selected():
        #                 continue
        #             ipp.make_arc_mandatory(network_key, arc_key)

        # set up the use of sos for arc selection

        if use_sos_arcs:
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
                    # skip arcs whose technology has already been selected
                    if (
                        ipp.networks[network_key]
                        .edges[arc_key][Network.KEY_ARC_TECH]
                        .has_been_selected()
                    ):
                        continue

                    ipp.use_sos1_for_arc_selection(
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
                            arc_use_real_variables_if_possible
                        ),
                        sos1_weight_method=arc_sos_weight_key,
                    )

        # set up the use of sos for flow sense determination

        if use_sos_sense:
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
                    # only arcs flagged as undirected are considered
                    if not ipp.networks[network_key].edges[arc_key][
                        Network.KEY_ARC_UND
                    ]:
                        continue

                    ipp.use_sos1_for_flow_senses(
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
                            sense_use_real_variables_if_possible
                        ),
                        use_interface_variables=sense_use_arc_interfaces,
                        sos1_weight_method=sense_sos_weight_key,
                    )

        elif sense_use_arc_interfaces:  # set up the use of arc interfaces w/o sos1
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
                    if (
                        ipp.networks[network_key]
                        .edges[arc_key][Network.KEY_ARC_TECH]
                        .has_been_selected()
                    ):
                        continue

                    ipp.use_interface_variables_for_arc_selection(network_key, arc_key)

        # static losses

        if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
            ipp.place_static_losses_arrival_node()

        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
            ipp.place_static_losses_departure_node()

        elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
            ipp.place_static_losses_upstream()

        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
            ipp.place_static_losses_downstream()

        else:
            raise ValueError("Unknown static loss modelling mode.")

        # *********************************************************************

        # groups

        if arc_groups_dict is not None:
            for key in arc_groups_dict:
                ipp.create_arc_group(arc_groups_dict[key])

        # *********************************************************************

        # maximum number of parallel arcs

        for key in max_number_parallel_arcs:
            ipp.set_maximum_number_parallel_arcs(
                network_key=key[0],
                node_a=key[1],
                node_b=key[2],
                limit=max_number_parallel_arcs[key],
            )

        # *********************************************************************

        if simplify_problem:
            ipp.simplify_peak_total_assessments()

        # *********************************************************************

        # instantiate (disable the default case v-a-v fixed losses)

        # ipp.instantiate(place_fixed_losses_upstream_if_possible=False)

        ipp.instantiate(initialise_ancillary_sets=init_aux_sets)

        # optimise
        ipp.optimise(
            solver_name=solver,
            solver_options=solver_options,
            output_options={},
            print_solver_output=print_solver_output,
        )

        # return the problem object
        return ipp

        # *********************************************************************
        # *********************************************************************

    # *************************************************************************
    # *************************************************************************

    def test_single_network_single_arc_problem(self):
        """Solve a network with one import node, one demand node and one arc.

        Checks the model size, that the arc is selected, the arc flows and
        amplitude, the capex, the SDNCF and the objective value.
        """

        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=3.5/100,
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
        )

        # *********************************************************************
        # *********************************************************************

        # validation

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 24
        assert ipp.results["Problem"][0]["Number of variables"] == 22
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 49

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_single_network_two_arcs_problem(self):
        """Solve a network with an import arc and an export arc.

        Node A demands flow in intervals 0 and 2 and supplies flow in
        interval 1; checks model size, flows, amplitudes, capex, SDNCF and
        the objective value.
        """

        # TODO: test simplifying this problem

        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=3.5/100,
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 3 nodes: one import, one export, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            }
        )

        # export node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                qpk: ResourcePrice(prices=0.5, volumes=None)
                for qpk in tf.qpk()
            }
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): -1.50, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # arc AE
        arc_tech_AE = Arcs(
            name="any",
            efficiency={qk: 0.8 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 42
        assert ipp.results["Problem"][0]["Number of variables"] == 40
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 95

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # flows
        true_v_glljqk = {
            ("mynet", node_IMP, node_A, 0, q, 0): 1,
            ("mynet", node_IMP, node_A, 0, q, 1): 0,
            ("mynet", node_IMP, node_A, 0, q, 2): 2,
            ("mynet", node_A, node_EXP, 0, q, 0): 0,
            ("mynet", node_A, node_EXP, 0, q, 1): 1.5,
            ("mynet", node_A, node_EXP, 0, q, 2): 0
        }
        for key, v in true_v_glljqk.items():
            assert math.isclose(pyo.value(ipp.instance.var_v_glljqk[key]), v, abs_tol=1e-6)

        # arc amplitudes should be 2.0 (arc IA) and 1.5 (arc AE)
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.5,
            abs_tol=0.01,
        )

        # capex should be 7.5
        assert math.isclose(pyo.value(ipp.instance.var_capex), 7.5, abs_tol=1e-3)

        # sdncf should be -5.7+0.6*(0.966+0.934)
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7+0.6*(0.966+0.934), abs_tol=1e-3)

        # the objective function should be -9.7+0.6*(0.966+0.934)-3.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7+0.6*(0.966+0.934)-3.5, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_single_network_single_arc_problem_simpler(self):
        """Single-arc import problem solved with ``simplify_problem=True``.

        Checks the (smaller) model size, that the arc is selected, the capex
        and the objective value.
        """

        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=3.5/100,
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = "thatimpnode"
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = "thatnodea"
        mynet.add_source_sink_node(
            node_key=node_A,
            # base_flow=[0.5, 0.0, 1.0],
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=True,
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 16  # 20
        assert ipp.results["Problem"][0]["Number of variables"] == 15  # 19
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 28  # 36

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_two_scenarios(self):
        """Single-arc import problem with two weighted assessments (0 and 1).

        The assessments use different numbers of reporting periods and time
        intervals but share one discount rate; checks model size, flows,
        amplitude, capex, SDNCF of assessment 0 and the objective value.
        """

        # number_intraperiod_time_intervals = 4
        nominal_discount_rate = 0.035
        assessment_weights = {0: 0.7, 1: 0.3}
        tf = EconomicTimeFrame(
            discount_rate=nominal_discount_rate,
            reporting_periods={0: (0, 1), 1: (0, 1, 2)},
            reporting_period_durations={0: (1, 1), 1: (1, 1, 1)},  # does not matter
            time_intervals={0: (0, 1, 2), 1: (0, 1)},
            time_interval_durations={0: (1, 1, 1), 1: (1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={
                (0, 0): 0.50,
                (0, 1): 0.00,
                (0, 2): 1.00,
                (1, 0): 1.25,
                (1, 1): 0.30,
            },
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            # efficiency=[0.5, 0.5, 0.5],
            efficiency={(0, 0): 0.5, (0, 1): 0.5, (0, 2): 0.5, (1, 0): 0.5, (1, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            networks={"mynet": mynet},
            converters={},
            time_frame=tf,
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            assessment_weights=assessment_weights,
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 42
        assert ipp.results["Problem"][0]["Number of variables"] == 38
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 87

        # *********************************************************************

        # validation

        # the arc should be installed since it is the only feasible solution
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0 (q=0), then 2.5 and 0.6 (q=1)
        true_v_glljqk = {
            ("mynet", node_IMP, node_A, 0, 0, 0): 1,
            ("mynet", node_IMP, node_A, 0, 0, 1): 0,
            ("mynet", node_IMP, node_A, 0, 0, 2): 2,
            ("mynet", node_IMP, node_A, 0, 1, 0): 2.5,
            ("mynet", node_IMP, node_A, 0, 1, 1): 0.6,
        }
        for key, v in true_v_glljqk.items():
            assert math.isclose(pyo.value(ipp.instance.var_v_glljqk[key]), v, abs_tol=1e-6)

        # arc amplitude should be 2.5
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.5,
            abs_tol=0.01,
        )

        # capex should be 4.5
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.5, abs_tol=1e-3)

        # sdncf_q[0] should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[0]), -5.7, abs_tol=1e-3)

        # the objective function should be -11.096
        assert math.isclose(pyo.value(ipp.instance.obj_f), -11.096, abs_tol=3e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_two_scenarios_simpler(self):
        """Two-assessment import problem solved with ``simplify_problem=True``.

        Checks the (smaller) model size, the capex and the objective value.
        """

        # number_intraperiod_time_intervals = 4
        nominal_discount_rate = 0.035
        assessment_weights = {0: 0.7, 1: 0.3}
        tf = EconomicTimeFrame(
            discount_rate=nominal_discount_rate,
            reporting_periods={0: (0, 1), 1: (0, 1, 2)},
            reporting_period_durations={0: (1, 1), 1: (1, 1, 1)},  # does not matter
            time_intervals={0: (0, 1, 2), 1: (0, 1)},
            time_interval_durations={0: (1, 1, 1), 1: (1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={
                (0, 0): 0.50,
                (0, 1): 0.00,
                (0, 2): 1.00,
                (1, 0): 1.25,
                (1, 1): 0.30,
            },
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            # efficiency=[0.5, 0.5, 0.5],
            efficiency={(0, 0): 0.5, (0, 1): 0.5, (0, 2): 0.5, (1, 0): 0.5, (1, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            networks={"mynet": mynet},
            converters={},
            time_frame=tf,
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            assessment_weights=assessment_weights,
            simplify_problem=True
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 28  # 42
        assert ipp.results["Problem"][0]["Number of variables"] == 25  # 38
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 51  # 87

        # *********************************************************************

        # validation

        # capex should be 4.5
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.5, abs_tol=1e-3)

        # the objective function should be -11.096
        assert math.isclose(pyo.value(ipp.instance.obj_f), -11.096, abs_tol=3e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_two_scenarios_two_discount_rates(self):
        """Two-assessment import problem with per-assessment discount rates.

        Assessment 0 uses 3.5 % and assessment 1 uses 10 %; checks model
        size, flows, amplitude, capex, SDNCF of assessment 0 and the
        objective value.
        """

        # two discount rates
        assessment_weights = {0: 0.7, 1: 0.3}
        tf = EconomicTimeFrame(
            discount_rates_q={0: (0.035, 0.035), 1: (0.1, 0.1, 0.1)},
            reporting_periods={0: (0, 1), 1: (0, 1, 2)},
            reporting_period_durations={0: (1, 1), 1: (1, 1, 1)},  # does not matter
            time_intervals={0: (0, 1, 2), 1: (0, 1)},
            time_interval_durations={0: (1, 1, 1), 1: (1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={
                (0, 0): 0.50,
                (0, 1): 0.00,
                (0, 2): 1.00,
                (1, 0): 1.25,
                (1, 1): 0.30,
            },
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            # efficiency=[0.5, 0.5, 0.5],
            efficiency={(0, 0): 0.5, (0, 1): 0.5, (0, 2): 0.5, (1, 0): 0.5, (1, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            networks={"mynet": mynet},
            converters={},
            time_frame=tf,
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            assessment_weights=assessment_weights,
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 42
        assert ipp.results["Problem"][0]["Number of variables"] == 38
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 87

        # *********************************************************************

        # validation

        # the arc should be installed since it is the only feasible solution
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0 (q=0), then 2.5 and 0.6 (q=1)
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, 0, 0)]),
            1.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, 0, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, 0, 2)]),
            2.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, 1, 0)]),
            2.5,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, 1, 1)]),
            0.6,
            abs_tol=1e-6,
        )

        # arc amplitude should be 2.5
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.5,
            abs_tol=0.01,
        )

        # capex should be 4.5
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.5, abs_tol=1e-3)

        # sdncf_q[0] should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[0]), -5.7, abs_tol=1e-3)

        # the objective function should be -10.80213032963115
        assert math.isclose(pyo.value(ipp.instance.obj_f), -10.80213032963115, abs_tol=3e-3)

    # *************************************************************************
    # *************************************************************************

    def test_problem_two_scenarios_two_discount_rates_simpler(self):
        """Two-discount-rate problem solved with ``simplify_problem=True``.

        Checks the (smaller) model size, the arc amplitude, the capex and
        the objective value.
        """

        # two discount rates
        assessment_weights = {0: 0.7, 1: 0.3}
        tf = EconomicTimeFrame(
            discount_rates_q={0: (0.035, 0.035), 1: (0.1, 0.1, 0.1)},
            reporting_periods={0: (0, 1), 1: (0, 1, 2)},
            reporting_period_durations={0: (1, 1), 1: (1, 1, 1)},  # does not matter
            time_intervals={0: (0, 1, 2), 1: (0, 1)},
            time_interval_durations={0: (1, 1, 1), 1: (1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={
                (0, 0): 0.50,
                (0, 1): 0.00,
                (0, 2): 1.00,
                (1, 0): 1.25,
                (1, 1): 0.30,
            },
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            # efficiency=[0.5, 0.5, 0.5],
            efficiency={(0, 0): 0.5, (0, 1): 0.5, (0, 2): 0.5, (1, 0): 0.5, (1, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            networks={"mynet": mynet},
            converters={},
            time_frame=tf,
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            assessment_weights=assessment_weights,
            simplify_problem=True
        )

        assert ipp.has_peak_total_assessments()
        assert ipp.results["Problem"][0]["Number of constraints"] == 28  # 42
        assert ipp.results["Problem"][0]["Number of variables"] == 25  # 38
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 51  # 87

        # *********************************************************************

        # validation

        # arc amplitude should be 2.5
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.5,
            abs_tol=0.01,
        )

        # capex should be 4.5
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.5, abs_tol=1e-3)

        # sdncf_q[0] should be -5.7
        # assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[0]), -5.7, abs_tol=1e-3)

        # the objective function should be -10.80213032963115 (or -10.8027723516153)
        assert math.isclose(pyo.value(ipp.instance.obj_f), -10.80213032963115, abs_tol=3e-3)

    # *************************************************************************
    # *************************************************************************

    # problem with two symmetrical nodes and one undirected arc

    # problem with symmetrical nodes and one undirected arc with diff. tech.
# problem with symmetrical nodes and one undirected arc, irregular steps # same problem as the previous one, except with interface variables # problem with two symmetrical nodes and one undirected arc, w/ simple sos1 def test_isolated_undirected_network(self): q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5], base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5], base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # undirected arc arc_tech_AB = ArcsWithoutStaticLosses( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=True, # just to reach a line, mandatory_arcs=[], max_number_parallel_arcs={} ) assert ipp.has_peak_total_assessments() # TODO: make sure this is true assert ipp.results["Problem"][0]["Number of constraints"] == 34 assert ipp.results["Problem"][0]["Number of variables"] == 28 assert ipp.results["Problem"][0]["Number of nonzeros"] == 105 # 
********************************************************************* # ********************************************************************* # validation # the arc should be installed since it is the only feasible solution assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # there should be no opex (imports or exports), only capex from arcs assert pyo.value(ipp.instance.var_sdncf_q[q]) == 0 assert pyo.value(ipp.instance.var_capex) > 0 assert ( pyo.value( ipp.instance.var_capex_arc_gllj[("mynet", node_A, node_B, arc_key_AB_und)] ) > 0 ) # ************************************************************************* # ************************************************************************* def test_isolated_undirected_network_diff_tech(self): # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5] base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1.25, 1, -0.625, 0.5] base_flow={(0, 0): -1.25, (0, 1): 1.0, (0, 2): -0.625, (0, 3): 0.5}, ) # add arcs # undirected arc arc_tech_AB = ArcsWithoutStaticLosses( name="any", # efficiency=[0.8, 1.0, 0.8, 1.0], efficiency={(0, 0): 0.8, (0, 1): 1.0, (0, 2): 0.8, (0, 3): 1.0}, efficiency_reverse=None, capacity=[1.25, 2.5], minimum_cost=[10, 15], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, 
regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={} ) assert ipp.has_peak_total_assessments() assert ipp.results["Problem"][0]["Number of constraints"] == 34 assert ipp.results["Problem"][0]["Number of variables"] == 24 assert ipp.results["Problem"][0]["Number of nonzeros"] == 77 # ********************************************************************* # ********************************************************************* # validation # the arc should be installed since it is the only feasible solution assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # there should be no opex (imports or exports), only capex from arcs assert pyo.value(ipp.instance.var_sdncf_q[q]) == 0 assert pyo.value(ipp.instance.var_capex) > 0 assert ( pyo.value( ipp.instance.var_capex_arc_gllj[("mynet", node_A, node_B, arc_key_AB_und)] ) > 0 ) # ********************************************************************* # ********************************************************************* # ************************************************************************* # ************************************************************************* # preexisting, reference # capacity is instantaneous # use dedicated method for preexisting arcs # capacity is instantaneous, using dedicated method # use different technologies for the undirected arc # use different technologies for the undirected arc, capacity is instant. 
# use different technologies for the undirected arc, using specific method # same as before but assuming the capacity is instantaneous def test_isolated_preexisting_undirected_network(self): capacity_is_instantaneous = False # assessment q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5] base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5], base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # isotropic mynet.add_preexisting_undirected_arc( node_key_a=node_A, node_key_b=node_B, # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, capacity=1.0, capacity_is_instantaneous=capacity_is_instantaneous, ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={} ) # validation # there should be no opex (imports or exports) and no capex assert pyo.value(ipp.instance.var_sdncf_q[q]) == 0 assert pyo.value(ipp.instance.var_capex) == 0 # ************************************************************************* # ************************************************************************* def test_isolated_preexisting_undirected_network_diff_tech(self): capacity_is_instantaneous 
= False # assessment q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5] base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5], base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # anisotropic mynet.add_preexisting_undirected_arc( node_key_a=node_A, node_key_b=node_B, # efficiency=[0.9, 1, 0.9, 1], efficiency={(0, 0): 0.9, (0, 1): 1, (0, 2): 0.9, (0, 3): 1}, capacity=1.0, capacity_is_instantaneous=capacity_is_instantaneous, # efficiency_reverse=[1, 0.9, 1, 0.9], efficiency_reverse={(0, 0): 1, (0, 1): 0.9, (0, 2): 1, (0, 3): 0.9}, static_loss=None, ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={} ) # validation # there should be no opex (imports or exports) and no capex assert pyo.value(ipp.instance.var_sdncf_q[q]) == 0 assert pyo.value(ipp.instance.var_capex) == 0 # ************************************************************************* # ************************************************************************* def test_nonisolated_undirected_network(self): # scenario q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0, 1)}, reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)}, 
time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # export node exp_node_key = 'thatexpnode' mynet.add_export_node( node_key=exp_node_key, prices={ qpk: ResourcePrice(prices=0.1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5] base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5] base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # import arc arc_tech_IA = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, efficiency_reverse=None, static_loss=None, validate=False, ) mynet.add_directed_arc( node_key_a=imp_node_key, node_key_b=node_A, arcs=arc_tech_IA ) # export arc arc_tech_BE = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, efficiency_reverse=None, static_loss=None, validate=False, ) mynet.add_directed_arc( node_key_a=node_B, node_key_b=exp_node_key, arcs=arc_tech_BE ) # undirected arc arc_tech_AB = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10.0, 10.1, 10.2, 
10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, efficiency_reverse=None, static_loss=None, validate=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={} ) assert ipp.has_peak_total_assessments() assert ipp.results["Problem"][0]["Number of constraints"] == 80 assert ipp.results["Problem"][0]["Number of variables"] == 84 assert ipp.results["Problem"][0]["Number of nonzeros"] == 253 # ************************************************************************** # validation # network is still isolated # the import arc was not installed assert ( True not in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) # the export arc was not installed assert ( True not in ipp.networks["mynet"] .edges[(node_B, exp_node_key, 0)][Network.KEY_ARC_TECH] .options_selected ) # the undirected arc was installed assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # the opex should be zero assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 0, abs_tol=1e-6) # the capex should be positive assert pyo.value(ipp.instance.var_capex) > 0 # ********************************************************************* # ********************************************************************* # ************************************************************************* # ************************************************************************* def test_nonisolated_undirected_network_diff_tech(self): # scenario q = 0 tf = EconomicTimeFrame( 
discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # export node exp_node_key = 'thatexpnode' mynet.add_export_node( node_key=exp_node_key, prices={ qpk: ResourcePrice(prices=0.1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5] base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5] base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # import arc arc_tech_IA = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, efficiency_reverse=None, static_loss=None, validate=False, ) mynet.add_directed_arc( node_key_a=imp_node_key, node_key_b=node_A, arcs=arc_tech_IA ) # export arc arc_tech_BE = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, efficiency_reverse=None, static_loss=None, validate=False, ) mynet.add_directed_arc( node_key_a=node_B, node_key_b=exp_node_key, arcs=arc_tech_BE ) # undirected arc arc_tech_AB = Arcs( name="any", # efficiency=[0.95, 0.95, 0.95, 0.95], 
efficiency={(0, 0): 0.95, (0, 1): 0.95, (0, 2): 0.95, (0, 3): 0.95}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, # efficiency_reverse=[0.85, 0.85, 0.85, 0.85], efficiency_reverse={(0, 0): 0.85, (0, 1): 0.85, (0, 2): 0.85, (0, 3): 0.85}, static_loss=None, validate=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={} ) assert ipp.has_peak_total_assessments() assert ipp.results["Problem"][0]["Number of constraints"] == 80 assert ipp.results["Problem"][0]["Number of variables"] == 84 assert ipp.results["Problem"][0]["Number of nonzeros"] == 253 # ********************************************************************* # ********************************************************************* # validation assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # the directed arc from the import should also be installed since node # B cannot fullfil all the demand since it has an efficiency of 0.85<1 assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) # there should be no opex (imports or exports), only capex from arcs assert pyo.value(ipp.instance.var_sdncf_q[q]) < 0 assert pyo.value(ipp.instance.var_capex) > 0 assert ( pyo.value( ipp.instance.var_capex_arc_gllj[ ("mynet", node_A, node_B, arc_key_AB_und) ] ) > 0 ) assert ( pyo.value( ipp.instance.var_capex_arc_gllj[("mynet", imp_node_key, node_A, 0)] ) > 0 ) # 
********************************************************************* # ********************************************************************* # ************************************************************************* # ************************************************************************* def test_nonisolated_network_preexisting_directed_arcs(self): # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # export node exp_node_key = 'thatexpnode' mynet.add_export_node( node_key=exp_node_key, prices={ qpk: ResourcePrice(prices=0.1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5], base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5], base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # import arc arc_tech_IA = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, validate=False, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_tech_IA.options_selected[0] = True mynet.add_directed_arc(node_key_a=imp_node_key, node_key_b=node_A, arcs=arc_tech_IA) # export arc arc_tech_BE = Arcs( name="any", # efficiency=[1, 1, 1, 1], 
efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, validate=False, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_tech_BE.options_selected[0] = True mynet.add_directed_arc(node_key_a=node_B, node_key_b=exp_node_key, arcs=arc_tech_BE) # undirected arc # isotropic arc arc_tech_AB = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, validate=False, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10.0, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={}, ) # ********************************************************************* # validation # network is still isolated # the undirected arc was installed assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # the opex should be zero assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 0, abs_tol=1e-3) # the capex should be positive assert pyo.value(ipp.instance.var_capex) > 0 # ********************************************************************* # ********************************************************************* # ************************************************************************* # ************************************************************************* def 
test_nonisolated_network_preexisting_directed_arcs_diff_tech(self): # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)}, time_intervals={q: (0,1,2,3)}, time_interval_durations={q: (1,1,1,1)}, ) # 4 nodes: one import, one export, two supply/demand nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # export node exp_node_key = 'thatexpnode' mynet.add_export_node( node_key=exp_node_key, prices={ qpk: ResourcePrice(prices=0.1+i*0.05, volumes=None) for i, qpk in enumerate(tf.qpk()) }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_A, # base_flow=[1, -1, 0.5, -0.5], base_flow={(0, 0): 1, (0, 1): -1, (0, 2): 0.5, (0, 3): -0.5}, ) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node( node_key=node_B, # base_flow=[-1, 1, -0.5, 0.5], base_flow={(0, 0): -1, (0, 1): 1, (0, 2): -0.5, (0, 3): 0.5}, ) # add arcs # import arc arc_tech_IA = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, validate=False, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_tech_IA.options_selected[0] = True mynet.add_directed_arc(node_key_a=imp_node_key, node_key_b=node_A, arcs=arc_tech_IA) # export arc arc_tech_BE = Arcs( name="any", # efficiency=[1, 1, 1, 1], efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1}, efficiency_reverse=None, static_loss=None, validate=False, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], specific_capacity_cost=1, capacity_is_instantaneous=False, ) 
arc_tech_BE.options_selected[0] = True mynet.add_directed_arc(node_key_a=node_B, node_key_b=exp_node_key, arcs=arc_tech_BE) # undirected arc # anisotropic arc arc_tech_AB = Arcs( name="any", # efficiency=[0.95, 0.95, 0.95, 0.95], efficiency={(0, 0): 0.95, (0, 1): 0.95, (0, 2): 0.95, (0, 3): 0.95}, capacity=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], minimum_cost=[10, 10.1, 10.2, 10.3, 10.4, 10.5], # efficiency_reverse=[0.85, 0.85, 0.85, 0.85], efficiency_reverse={(0, 0): 0.85, (0, 1): 0.85, (0, 2): 0.85, (0, 3): 0.85}, static_loss=None, validate=False, specific_capacity_cost=1, capacity_is_instantaneous=False, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={},solver='scip', perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP, mandatory_arcs=[], max_number_parallel_arcs={}, ) # ************************************************************************** # validation # the undirected arc should be installed since it is cheaper tham imp. 
assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # the directed arc from the import should also be installed since node # B cannot fullfil all the demand since it has an efficiency of 0.85<1 assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) # there should be no opex (imports or exports), only capex from arcs assert pyo.value(ipp.instance.var_sdncf_q[q]) < 0 assert pyo.value(ipp.instance.var_capex) > 0 assert ( pyo.value( ipp.instance.var_capex_arc_gllj[ ("mynet", node_A, node_B, arc_key_AB_und) ] ) > 0 ) # ********************************************************************* # ********************************************************************* # ************************************************************************* # ************************************************************************* def test_arc_groups_individual(self): # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 4 nodes: two import nodes, two supply/demand nodes mynet = Network() # ************************************************************************** # import nodes imp1_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp1_node_key, prices={ qpk: ResourcePrice(prices=qpk[2] + 1, volumes=None) for qpk in tf.qpk() }, ) imp2_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp2_node_key, prices={ qpk: ResourcePrice(prices=3 - qpk[2], volumes=None) for qpk in tf.qpk() }, ) imp3_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp3_node_key, prices={ qpk: ResourcePrice(prices=1 + 2 * qpk[2], volumes=None) for qpk in tf.qpk() }, ) # 
************************************************************************** # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): 0.5}) # add arcs efficiency_IA1 = { (q, 0): 1.00, (q, 1): 0.85, } efficiency_IA2 = { (q, 0): 0.95, (q, 1): 0.90, } efficiency_IA3 = { (q, 0): 0.90, (q, 1): 0.95, } # I1A static_loss_IA1 = { (0, q, 0): 0.10, (0, q, 1): 0.10, (1, q, 0): 0.15, (1, q, 1): 0.15, (2, q, 0): 0.20, (2, q, 1): 0.20, } # I2A static_loss_IA2 = { (0, q, 0): 0.20, (0, q, 1): 0.20, (1, q, 0): 0.05, (1, q, 1): 0.05, (2, q, 0): 0.10, (2, q, 1): 0.10, } # I3A static_loss_IA3 = { (0, q, 0): 0.15, (0, q, 1): 0.15, (1, q, 0): 0.10, (1, q, 1): 0.10, (2, q, 0): 0.05, (2, q, 1): 0.05, } arcs_I1A = Arcs( name="IA1", efficiency=efficiency_IA1, efficiency_reverse=None, static_loss=static_loss_IA1, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I1A = mynet.add_directed_arc( node_key_a=imp1_node_key, node_key_b=node_A, arcs=arcs_I1A ) arcs_I2A = Arcs( name="IA2", efficiency=efficiency_IA2, efficiency_reverse=None, static_loss=static_loss_IA2, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I2A = mynet.add_directed_arc( node_key_a=imp2_node_key, node_key_b=node_A, arcs=arcs_I2A ) arcs_I3A = Arcs( name="IA3", efficiency=efficiency_IA3, efficiency_reverse=None, static_loss=static_loss_IA3, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I3A = mynet.add_directed_arc( node_key_a=imp3_node_key, node_key_b=node_A, arcs=arcs_I3A ) arc_groups_dict = { 0: ( ("mynet", imp1_node_key, node_A, arc_key_I1A), ("mynet", imp2_node_key, node_A, arc_key_I2A), ("mynet", imp3_node_key, node_A, arc_key_I3A), ) } # identify 
node types mynet.identify_node_types() # no sos, regular time intervals solver_options = {} solver_options["relative_mip_gap"] = 0 solver_options["absolute_mip_gap"] = 1e-4 ipp = self.build_solve_ipp( solver_options=solver_options, use_sos_arcs=False, arc_sos_weight_key=None, arc_use_real_variables_if_possible=False, use_sos_sense=False, sense_sos_weight_key=None, sense_use_real_variables_if_possible=False, sense_use_arc_interfaces=False, perform_analysis=False, plot_results=False, # True, print_solver_output=False, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR, time_frame=tf, networks={"mynet": mynet}, mandatory_arcs=[], max_number_parallel_arcs={}, arc_groups_dict=arc_groups_dict ) # ********************************************************************* # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) q = 0 capex_ind = 0.75 capex_group = 1.5 imp_ind = 1.9882352941176473 imp_group = 2.2 # {'mynet': 2.2245012886626414} sdncf_group = -6.184560251632995 # -6.2386008960367345 sdncf_ind = -5.274445281858455 sdext_group = 0 sdext_ind = 0 # losses_ind = 0 # losses_group = 0 obj_ind = sdncf_ind + sdext_ind - capex_ind obj_group = sdncf_group + sdext_group - capex_group assert capex_ind < capex_group assert imp_ind < imp_group assert obj_ind > obj_group # all arcs have to be installed assert ( True in ipp.networks["mynet"] .edges[(imp1_node_key, node_A, arc_key_I1A)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(imp2_node_key, node_A, arc_key_I2A)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(imp3_node_key, node_A, arc_key_I3A)][Network.KEY_ARC_TECH] .options_selected ) # the same option has to be selected in all arcs h1 = ( ipp.networks["mynet"] .edges[(imp1_node_key, node_A, arc_key_I1A)][Network.KEY_ARC_TECH] 
.options_selected.index(True) ) h2 = ( ipp.networks["mynet"] .edges[(imp2_node_key, node_A, arc_key_I2A)][Network.KEY_ARC_TECH] .options_selected.index(True) ) h3 = ( ipp.networks["mynet"] .edges[(imp3_node_key, node_A, arc_key_I3A)][Network.KEY_ARC_TECH] .options_selected.index(True) ) assert h1 == h2 assert h1 == h3 # the capex have to be higher than those of the best individual arc abs_tol = 1e-3 assert math.isclose( pyo.value(ipp.instance.var_capex), capex_group, abs_tol=abs_tol ) # there should be no exports abs_tol = 1e-3 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # the imports should be higher than with individual arcs abs_tol = 1e-3 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, imp_group, abs_tol=abs_tol) # the operating results should be lower than with an individual arc abs_tol = 1e-3 assert math.isclose( pyo.value(ipp.instance.var_sdncf_q[q]), sdncf_group, abs_tol=abs_tol ) # the externalities should be zero abs_tol = 1e-3 assert math.isclose(pyo.value(ipp.instance.var_sdext_q[q]), 0, abs_tol=abs_tol) # the objective function should be -6.3639758220728595-1.5 abs_tol = 1e-3 assert math.isclose(pyo.value(ipp.instance.obj_f), obj_group, abs_tol=abs_tol) # the imports should be greater than or equal to the losses for all arx losses_model = sum( pyo.value( ipp.instance.var_w_glljqk[ ("mynet", imp1_node_key, node_A, arc_key_I1A, q, k) ] ) + pyo.value( ipp.instance.var_w_glljqk[ ("mynet", imp2_node_key, node_A, arc_key_I2A, q, k) ] ) + pyo.value( ipp.instance.var_w_glljqk[ ("mynet", imp3_node_key, node_A, arc_key_I3A, q, k) ] ) for k in range(tf.number_time_intervals(q)) ) losses_data = sum( static_loss_IA1[(h1, q, k)] + static_loss_IA2[(h2, q, k)] + static_loss_IA3[(h3, q, k)] for k in range(tf.number_time_intervals(q)) ) assert math.isclose(losses_model, losses_data, abs_tol=abs_tol) assert imports_qp >= losses_model # 
************************************************************************* # ************************************************************************* def test_arc_groups_individual_ref(self): # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 4 nodes: two import nodes, two supply/demand nodes mynet = Network() # ************************************************************************** # import nodes imp1_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp1_node_key, prices={ qpk: ResourcePrice(prices=qpk[2] + 1, volumes=None) for qpk in tf.qpk() }, ) imp2_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp2_node_key, prices={ qpk: ResourcePrice(prices=3 - qpk[2], volumes=None) for qpk in tf.qpk() }, ) imp3_node_key = generate_pseudo_unique_key(mynet.nodes()) mynet.add_import_node( node_key=imp3_node_key, prices={ qpk: ResourcePrice(prices=1 + 2 * qpk[2], volumes=None) for qpk in tf.qpk() }, ) # ************************************************************************** # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): 0.5}) # add arcs efficiency_IA1 = { (q, 0): 1.00, (q, 1): 0.85, } efficiency_IA2 = { (q, 0): 0.95, (q, 1): 0.90, } efficiency_IA3 = { (q, 0): 0.90, (q, 1): 0.95, } # I1A static_loss_IA1 = { (0, q, 0): 0.10, (0, q, 1): 0.10, (1, q, 0): 0.15, (1, q, 1): 0.15, (2, q, 0): 0.20, (2, q, 1): 0.20, } # I2A static_loss_IA2 = { (0, q, 0): 0.20, (0, q, 1): 0.20, (1, q, 0): 0.05, (1, q, 1): 0.05, (2, q, 0): 0.10, (2, q, 1): 0.10, } # I3A static_loss_IA3 = { (0, q, 0): 0.15, (0, q, 1): 0.15, (1, q, 0): 0.10, (1, q, 1): 0.10, (2, q, 0): 0.05, (2, q, 1): 0.05, } arcs_I1A = Arcs( name="IA1", efficiency=efficiency_IA1, 
efficiency_reverse=None, static_loss=static_loss_IA1, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I1A = mynet.add_directed_arc( node_key_a=imp1_node_key, node_key_b=node_A, arcs=arcs_I1A ) arcs_I2A = Arcs( name="IA2", efficiency=efficiency_IA2, efficiency_reverse=None, static_loss=static_loss_IA2, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I2A = mynet.add_directed_arc( node_key_a=imp2_node_key, node_key_b=node_A, arcs=arcs_I2A ) arcs_I3A = Arcs( name="IA3", efficiency=efficiency_IA3, efficiency_reverse=None, static_loss=static_loss_IA3, capacity=( 0.5, 0.75, 1.2, ), minimum_cost=( 0.2, 0.5, 0.75, ), specific_capacity_cost=0, capacity_is_instantaneous=False, validate=True, ) arc_key_I3A = mynet.add_directed_arc( node_key_a=imp3_node_key, node_key_b=node_A, arcs=arcs_I3A ) # do not use arc groups arc_groups_dict = {} # identify node types mynet.identify_node_types() # no sos, regular time intervals solver_options = {} solver_options["relative_mip_gap"] = 0 solver_options["absolute_mip_gap"] = 1e-4 ipp = self.build_solve_ipp( solver_options=solver_options, use_sos_arcs=False, arc_sos_weight_key=None, arc_use_real_variables_if_possible=False, use_sos_sense=False, sense_sos_weight_key=None, sense_use_real_variables_if_possible=False, sense_use_arc_interfaces=False, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR, networks={"mynet": mynet}, mandatory_arcs=[], max_number_parallel_arcs={}, arc_groups_dict=arc_groups_dict ) # ************************************************************************** # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, 
aggregate_static_balance_qpk) = statistics(ipp) q = 0 capex_ind = 0.75 capex_group = 1.5 imp_ind = 1.9882352941176473 imp_group = 2.2 # {'mynet': 2.2245012886626414} sdncf_group = -6.184560251632995 # -6.2386008960367345 sdncf_ind = -5.274445281858455 sdext_group = 0 sdext_ind = 0 # losses_ind = 0 # losses_group = 0 obj_ind = sdncf_ind + sdext_ind - capex_ind obj_group = sdncf_group + sdext_group - capex_group assert capex_ind < capex_group assert imp_ind < imp_group assert obj_ind > obj_group # at least one arc has to be installed assert ( True in ipp.networks["mynet"] .edges[(imp1_node_key, node_A, arc_key_I1A)][Network.KEY_ARC_TECH] .options_selected or True in ipp.networks["mynet"] .edges[(imp2_node_key, node_A, arc_key_I2A)][Network.KEY_ARC_TECH] .options_selected or True in ipp.networks["mynet"] .edges[(imp3_node_key, node_A, arc_key_I3A)][Network.KEY_ARC_TECH] .options_selected ) # the capex have to be lower than with a group of arcs abs_tol = 1e-3 assert math.isclose( pyo.value(ipp.instance.var_capex), capex_ind, abs_tol=abs_tol ) # there should be no exports abs_tol = 1e-3 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # the imports should be lower than with a group of arcs abs_tol = 1e-3 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, imp_ind, abs_tol=abs_tol) # the operating results should be lower than with an individual arc abs_tol = 1e-3 assert math.isclose( pyo.value(ipp.instance.var_sdncf_q[q]), sdncf_ind, abs_tol=abs_tol ) # the externalities should be zero abs_tol = 1e-3 assert math.isclose( pyo.value(ipp.instance.var_sdext_q[q]), sdext_ind, abs_tol=abs_tol ) # the objective function should be -6.3639758220728595-1.5 abs_tol = 1e-3 assert math.isclose(pyo.value(ipp.instance.obj_f), obj_ind, abs_tol=abs_tol) # the imports should be greater than or equal to the losses for all arx losses_model = sum( pyo.value( 
def test_arc_groups_individual_undirected(self):
    """Solve a network whose two new undirected arcs share one arc group.

    Five nodes are used: one import node and four source/sink nodes (A-D).
    Preexisting directed arcs link the import node to A and to C, while the
    undirected arcs AB and CD are optional and placed in the same arc group.
    For every static loss mode, the test asserts that both arcs are
    installed with the same option index and that ``var_capex``, the
    imports, ``var_sdncf_q``, ``var_sdext_q``, the objective and the static
    losses match the reference figures hard-coded below for the grouped
    case.
    """
    # time frame
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1)},
        time_interval_durations={q: (1, 1)},
    )

    # 5 nodes: one import node, four source/sink nodes
    mynet = Network()

    # import node: the price grows with the third element of each qpk key
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            qpk: ResourcePrice(prices=1 + qpk[2], volumes=None)
            for qpk in tf.qpk()
        },
    )

    # A
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_A, base_flow={(q, 0): 0.0, (q, 1): 1.0}
    )
    # B
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B, base_flow={(q, 0): 1.0, (q, 1): -0.5}
    )
    # C
    node_C = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_C, base_flow={(q, 0): 0.0, (q, 1): 0.5}
    )
    # D
    node_D = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_D, base_flow={(q, 0): 0.5, (q, 1): -0.25}
    )

    # IA (preexisting)
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1.5,
        capacity_is_instantaneous=False,
    )
    # IC (preexisting)
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_C,
        efficiency=None,
        static_loss=None,
        capacity=1.5,
        capacity_is_instantaneous=False,
    )

    # AB (new, three options); static losses are keyed by (option, q, k)
    efficiency_AB = {(q, 0): 1.00, (q, 1): 0.85}
    efficiency_BA = {(q, 0): 0.95, (q, 1): 0.80}
    static_loss_AB = {
        (0, q, 0): 0.20,
        (0, q, 1): 0.25,
        (1, q, 0): 0.25,
        (1, q, 1): 0.30,
        (2, q, 0): 0.30,
        (2, q, 1): 0.35,
    }
    arcs_AB = Arcs(
        name="AB",
        efficiency=efficiency_AB,
        efficiency_reverse=efficiency_BA,
        static_loss=static_loss_AB,
        capacity=(0.85, 1.5, 2.5),
        minimum_cost=(1, 2, 3),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    arc_key_AB = mynet.add_undirected_arc(
        node_key_a=node_A, node_key_b=node_B, arcs=arcs_AB
    )

    # CD (new, three options)
    efficiency_CD = {(q, 0): 1.00, (q, 1): 0.85}
    efficiency_DC = {(q, 0): 0.95, (q, 1): 0.80}
    static_loss_CD = {
        (0, q, 0): 0.010,
        (0, q, 1): 0.015,
        (1, q, 0): 0.015,
        (1, q, 1): 0.020,
        (2, q, 0): 0.020,
        (2, q, 1): 0.025,
    }
    arcs_CD = Arcs(
        name="CD",
        efficiency=efficiency_CD,
        efficiency_reverse=efficiency_DC,
        static_loss=static_loss_CD,
        capacity=(0.85, 1.5, 2.5),
        minimum_cost=(1, 2, 3),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    arc_key_CD = mynet.add_undirected_arc(
        node_key_a=node_C, node_key_b=node_D, arcs=arcs_CD
    )

    edge_AB = (node_A, node_B, arc_key_AB)
    edge_CD = (node_C, node_D, arc_key_CD)

    # arc groups: AB and CD belong to the same group
    arc_groups_dict = {
        0: (
            ("mynet", node_A, node_B, arc_key_AB),
            ("mynet", node_C, node_D, arc_key_CD),
        )
    }

    # identify node types
    mynet.identify_node_types()

    # solver settings
    solver_options = {}
    solver_options["relative_mip_gap"] = 0
    solver_options["absolute_mip_gap"] = 1e-4

    def clear_selected_option(edge_key):
        # the same network object is reused for every mode, so any option
        # flagged as selected by a previous solve has to be unset first
        flags = mynet.edges[edge_key][Network.KEY_ARC_TECH].options_selected
        if True in flags:
            flags[flags.index(True)] = False

    abs_tol = 1e-3

    # no sos, regular time intervals
    for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES:
        clear_selected_option(edge_AB)
        clear_selected_option(edge_CD)

        ipp = self.build_solve_ipp(
            solver_options=solver_options,
            use_sos_arcs=False,
            arc_sos_weight_key=None,
            arc_use_real_variables_if_possible=False,
            use_sos_sense=False,
            sense_sos_weight_key=None,
            sense_use_real_variables_if_possible=False,
            sense_use_arc_interfaces=False,
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            arc_groups_dict=arc_groups_dict,
        )

        # overview (only imports and exports are inspected here)
        (
            imports_qpk,
            exports_qpk,
            _balance_qpk,
            _import_costs_qpk,
            _export_revenue_qpk,
            _ncf_qpk,
            _aggregate_static_demand_qpk,
            _aggregate_static_supply_qpk,
            _aggregate_static_balance_qpk,
        ) = statistics(ipp)

        # reference figures: *_ind are for individually-sized arcs (no
        # group), *_group are the ones expected from this grouped problem
        capex_ind = 3
        capex_group = 4
        imp_ind = 2.912
        imp_group = 2.9210000000000003
        sdncf_group = -7.745053560176434
        sdnext_group = 0
        obj_group = sdnext_group + sdncf_group - capex_group

        intervals = range(tf.number_time_intervals(q))
        losses_ind = sum(
            static_loss_AB[(1, q, k)] + static_loss_CD[(0, q, k)]
            for k in intervals
        )
        losses_group = sum(
            static_loss_AB[(1, q, k)] + static_loss_CD[(1, q, k)]
            for k in intervals
        )

        # the grouped solution must be the costlier one in the reference data
        assert capex_group > capex_ind
        assert losses_group > losses_ind
        assert imp_group > imp_ind

        # all arcs have to be installed
        selected_AB = (
            ipp.networks["mynet"]
            .edges[edge_AB][Network.KEY_ARC_TECH]
            .options_selected
        )
        assert True in selected_AB
        selected_CD = (
            ipp.networks["mynet"]
            .edges[edge_CD][Network.KEY_ARC_TECH]
            .options_selected
        )
        assert True in selected_CD

        # the same option has to be selected in all arcs of the group
        h1 = selected_AB.index(True)
        h2 = selected_CD.index(True)
        assert h1 == h2

        # the capex must match the grouped reference
        assert math.isclose(
            pyo.value(ipp.instance.var_capex), capex_group, abs_tol=abs_tol
        )

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # the imports must match the grouped reference
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, imp_group, abs_tol=abs_tol)

        # var_sdncf_q must match the grouped reference
        assert math.isclose(
            pyo.value(ipp.instance.var_sdncf_q[q]), sdncf_group, abs_tol=abs_tol
        )

        # the externalities should be zero
        assert math.isclose(
            pyo.value(ipp.instance.var_sdext_q[q]), sdnext_group, abs_tol=abs_tol
        )

        # the objective must equal obj_group
        assert math.isclose(
            pyo.value(ipp.instance.obj_f), obj_group, abs_tol=abs_tol
        )

        # static losses reported by the model must match the data for the
        # selected options, which in turn must match the grouped reference
        losses_model = sum(
            pyo.value(
                ipp.instance.var_w_glljqk[
                    ("mynet", node_A, node_B, arc_key_AB, q, k)
                ]
            )
            + pyo.value(
                ipp.instance.var_w_glljqk[
                    ("mynet", node_C, node_D, arc_key_CD, q, k)
                ]
            )
            for k in intervals
        )
        losses_data = sum(
            static_loss_AB[(h1, q, k)] + static_loss_CD[(h2, q, k)]
            for k in intervals
        )
        assert math.isclose(losses_model, losses_data, abs_tol=abs_tol)
        assert math.isclose(losses_data, losses_group, abs_tol=abs_tol)
def test_arc_groups_individual_undirected_ref(self):
    """Reference run of the undirected AB/CD case with no arc groups.

    The network has one import node and four source/sink nodes; the import
    node reaches A and C through preexisting directed arcs, and the new
    undirected arcs AB and CD are sized independently (``arc_groups_dict``
    is empty). For every static loss mode the solved problem is compared
    against the ``*_ind`` reference figures hard-coded below.
    """
    # time frame
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1)},
        time_interval_durations={q: (1, 1)},
    )

    mynet = Network()

    # import node
    imp_node_key = 'thatimpnode'
    import_prices = {}
    for qpk in tf.qpk():
        import_prices[qpk] = ResourcePrice(prices=1 + qpk[2], volumes=None)
    mynet.add_import_node(node_key=imp_node_key, prices=import_prices)

    # source/sink nodes A, B, C and D, created in that order
    source_sink_keys = []
    for base_flow in (
        {(q, 0): 0.0, (q, 1): 1.0},  # A
        {(q, 0): 1.0, (q, 1): -0.5},  # B
        {(q, 0): 0.0, (q, 1): 0.5},  # C
        {(q, 0): 0.5, (q, 1): -0.25},  # D
    ):
        new_key = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(node_key=new_key, base_flow=base_flow)
        source_sink_keys.append(new_key)
    node_A, node_B, node_C, node_D = source_sink_keys

    # preexisting directed arcs: import -> A, then import -> C
    for downstream_node in (node_A, node_C):
        mynet.add_preexisting_directed_arc(
            node_key_a=imp_node_key,
            node_key_b=downstream_node,
            efficiency=None,
            static_loss=None,
            capacity=1.5,
            capacity_is_instantaneous=False,
        )

    # settings shared by the two new undirected arcs
    shared_arc_settings = dict(
        capacity=(0.85, 1.5, 2.5),
        minimum_cost=(1, 2, 3),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )

    # AB
    efficiency_AB = {(q, 0): 1.00, (q, 1): 0.85}
    efficiency_BA = {(q, 0): 0.95, (q, 1): 0.80}
    static_loss_AB = {
        (0, q, 0): 0.20,
        (0, q, 1): 0.25,
        (1, q, 0): 0.25,
        (1, q, 1): 0.30,
        (2, q, 0): 0.30,
        (2, q, 1): 0.35,
    }
    arc_key_AB = mynet.add_undirected_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        arcs=Arcs(
            name="AB",
            efficiency=efficiency_AB,
            efficiency_reverse=efficiency_BA,
            static_loss=static_loss_AB,
            **shared_arc_settings,
        ),
    )

    # CD
    efficiency_CD = {(q, 0): 1.00, (q, 1): 0.85}
    efficiency_DC = {(q, 0): 0.95, (q, 1): 0.80}
    static_loss_CD = {
        (0, q, 0): 0.010,
        (0, q, 1): 0.015,
        (1, q, 0): 0.015,
        (1, q, 1): 0.020,
        (2, q, 0): 0.020,
        (2, q, 1): 0.025,
    }
    arc_key_CD = mynet.add_undirected_arc(
        node_key_a=node_C,
        node_key_b=node_D,
        arcs=Arcs(
            name="CD",
            efficiency=efficiency_CD,
            efficiency_reverse=efficiency_DC,
            static_loss=static_loss_CD,
            **shared_arc_settings,
        ),
    )

    edge_AB = (node_A, node_B, arc_key_AB)
    edge_CD = (node_C, node_D, arc_key_CD)

    # no arc groups in the reference case
    arc_groups_dict = {}

    # identify node types
    mynet.identify_node_types()

    # solver settings
    solver_options = {"relative_mip_gap": 0, "absolute_mip_gap": 1e-4}

    def selected_options(network, edge_key):
        # option flags stored with the arc technology object on that edge
        return network.edges[edge_key][Network.KEY_ARC_TECH].options_selected

    def interval_indices():
        return range(tf.number_time_intervals(q))

    # no sos, regular time intervals
    for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES:
        # unset whatever option an earlier solve left selected on mynet
        for edge_key in (edge_AB, edge_CD):
            flags = selected_options(mynet, edge_key)
            if True in flags:
                flags[flags.index(True)] = False

        ipp = self.build_solve_ipp(
            solver_options=solver_options,
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=static_losses_mode,
            arc_groups_dict=arc_groups_dict,
            max_number_parallel_arcs={},
        )

        # overview (only imports and exports are inspected here)
        (
            imports_qpk,
            exports_qpk,
            _balance_qpk,
            _import_costs_qpk,
            _export_revenue_qpk,
            _ncf_qpk,
            _aggregate_static_demand_qpk,
            _aggregate_static_supply_qpk,
            _aggregate_static_balance_qpk,
        ) = statistics(ipp)

        # reference figures (individual sizing vs. grouped sizing)
        capex_ind = 3
        capex_group = 4
        imp_ind = 2.912
        imp_group = 2.9210000000000003
        sdncf_ind = -7.72035753459824
        sdnext_ind = 0
        obj_ind = sdnext_ind + sdncf_ind - capex_ind
        losses_ind = sum(
            static_loss_AB[(1, q, k)] + static_loss_CD[(0, q, k)]
            for k in interval_indices()
        )
        losses_group = sum(
            static_loss_AB[(1, q, k)] + static_loss_CD[(1, q, k)]
            for k in interval_indices()
        )
        w_var = ipp.instance.var_w_glljqk
        losses_model = sum(
            pyo.value(w_var[("mynet", node_A, node_B, arc_key_AB, q, k)])
            + pyo.value(w_var[("mynet", node_C, node_D, arc_key_CD, q, k)])
            for k in interval_indices()
        )

        # the individual figures must be the cheaper ones
        assert capex_group > capex_ind
        assert losses_group > losses_ind
        assert imp_group > imp_ind

        # at least one arc has to be installed
        solved_net = ipp.networks["mynet"]
        assert (
            True in selected_options(solved_net, edge_AB)
            or True in selected_options(solved_net, edge_CD)
        )

        abs_tol = 1e-3

        def close(actual, expected):
            return math.isclose(actual, expected, abs_tol=abs_tol)

        # capex equal to the individual reference
        assert close(pyo.value(ipp.instance.var_capex), capex_ind)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert close(exports_qp, 0)

        # imports equal to the individual reference
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert close(imports_qp, imp_ind)

        # var_sdncf_q equal to the individual reference
        assert close(pyo.value(ipp.instance.var_sdncf_q[q]), sdncf_ind)

        # the externalities should be zero
        assert close(pyo.value(ipp.instance.var_sdext_q[q]), 0)

        # the objective must equal obj_ind
        assert close(pyo.value(ipp.instance.obj_f), obj_ind)

        # the modelled static losses must equal the individual reference
        assert close(losses_model, losses_ind)
def test_direct_imp_exp_network(self):
    """Check that an unprofitable import-to-export arc is not installed.

    The network only has an import node (price 1.5) and an export node
    (price 0.5) linked by one optional directed arc. The test asserts that
    no arc option is selected and that imports, import costs, exports,
    export revenue and ``var_capex`` are zero within the tolerances below.
    """
    # time frame
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1)},
        time_interval_durations={q: (1, 1)},
    )

    # 2 nodes: one import node and one export node
    mynet = Network()

    # import node
    imp_node_key = 'thatimpnode'
    imp_prices = {
        qpk: ResourcePrice(prices=1.5, volumes=None) for qpk in tf.qpk()
    }
    mynet.add_import_node(node_key=imp_node_key, prices=imp_prices)

    # export node
    exp_node_key = 'thatexpnode'
    exp_prices = {
        qpk: ResourcePrice(prices=0.5, volumes=None) for qpk in tf.qpk()
    }
    mynet.add_export_node(node_key=exp_node_key, prices=exp_prices)

    # add arc without fixed losses from import node to export
    # NOTE(review): the efficiency dict also has entries for intervals 2 and
    # 3, which the time frame above does not list; validate=False presumably
    # lets this through - confirm this is intended
    arc_tech_IE = Arcs(
        name="IE",
        efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1},
        efficiency_reverse=None,
        static_loss=None,
        validate=False,
        capacity=[0.5, 1.0, 2.0],
        minimum_cost=[5, 5.1, 5.2],
        specific_capacity_cost=1,
        capacity_is_instantaneous=False,
    )
    mynet.add_directed_arc(
        node_key_a=imp_node_key, node_key_b=exp_node_key, arcs=arc_tech_IE
    )

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver_options={},
        perform_analysis=False,
        plot_results=False,
        print_solver_output=False,
        networks={"mynet": mynet},
        time_frame=tf,
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )

    # import prices are higher than export prices: the arc should not be
    # installed
    assert (
        True
        not in ipp.networks["mynet"]
        .edges[(imp_node_key, exp_node_key, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )

    # overview
    (
        imports_qpk,
        exports_qpk,
        _balance_qpk,
        import_costs_qpk,
        export_revenue_qpk,
        _ncf_qpk,
        _aggregate_static_demand_qpk,
        _aggregate_static_supply_qpk,
        _aggregate_static_balance_qpk,
    ) = statistics(ipp)

    # tolerances actually applied to each family of checks
    import_abs_tol = 1e-3
    export_abs_tol = 1e-2
    capex_abs_tol = 1e-6

    # there should be no imports
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, 0.0, abs_tol=import_abs_tol)

    import_costs_qp = sum(
        import_costs_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0
    )
    assert math.isclose(import_costs_qp, 0.0, abs_tol=import_abs_tol)

    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0.0, abs_tol=export_abs_tol)

    export_revenue_qp = sum(
        export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]
    )
    assert math.isclose(export_revenue_qp, 0.0, abs_tol=export_abs_tol)

    # there should be no capex
    assert math.isclose(
        pyo.value(ipp.instance.var_capex), 0.0, abs_tol=capex_abs_tol
    )
def test_undirected_arc_static_upstream_new(self):
    """Check flows over a new undirected arc for every static loss mode.

    Two import nodes feed nodes A and B through preexisting directed arcs
    (capacity 1.2 each); A and B are joined by a new undirected arc AB with
    two options and a static loss of 0.1. For each static loss mode the
    test asserts that every arc has an option selected, that AB is used
    from A to B in interval 0 and from B to A in interval 1, that imports
    total 1.2 + 1.2 with no exports, and that the flows entering AB match
    the mode-specific values tabulated below.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1)},
        time_interval_durations={q: (1, 1)},
    )

    # 4 nodes: two import nodes, two supply/demand nodes
    mynet = Network()

    # import nodes, with mirrored price profiles
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    imp1_prices = {
        qpk: ResourcePrice(prices=qpk[2] + 1, volumes=None)
        for qpk in tf.qpk()
    }
    mynet.add_import_node(node_key=imp1_node_key, prices=imp1_prices)

    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    imp2_prices = {
        qpk: ResourcePrice(prices=2 - qpk[2], volumes=None)
        for qpk in tf.qpk()
    }
    mynet.add_import_node(node_key=imp2_node_key, prices=imp2_prices)

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_A, base_flow={(0, 0): 0.0, (0, 1): 1.1}
    )
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B, base_flow={(0, 0): 1.1, (0, 1): 0.0}
    )

    # preexisting directed arcs: I1A first, then I2B
    for import_key, sink_key in ((imp1_node_key, node_A), (imp2_node_key, node_B)):
        mynet.add_preexisting_directed_arc(
            node_key_a=import_key,
            node_key_b=sink_key,
            efficiency=None,
            static_loss=None,
            capacity=1.2,
            capacity_is_instantaneous=False,
        )

    # AB: new undirected arc with two options
    efficiency_AB = {(0, 0): 1, (0, 1): 1}
    efficiency_BA = {(0, 0): 1, (0, 1): 1}
    static_loss_AB = {
        (0, q, 0): 0.1,
        (0, q, 1): 0.1,
        (1, q, 0): 0.1,
        (1, q, 1): 0.1,
    }
    arcs_ab = Arcs(
        name="AB",
        efficiency=efficiency_AB,
        efficiency_reverse=efficiency_BA,
        static_loss=static_loss_AB,
        capacity=(0.5, 1),
        minimum_cost=(0.025, 0.05),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    arc_key_AB_und = mynet.add_undirected_arc(
        node_key_a=node_A, node_key_b=node_B, arcs=arcs_ab
    )

    # identify node types
    mynet.identify_node_types()

    # edges whose decisions are reset before, and checked after, each solve
    tracked_edges = (
        (imp1_node_key, node_A, 0),
        (imp2_node_key, node_B, 0),
        (node_A, node_B, arc_key_AB_und),
    )

    # expected (A->B flow in interval 0, B->A flow in interval 1) per mode;
    # the first matching entry wins and any other mode uses the fallback
    problem_cls = InfrastructurePlanningProblem
    expected_ab_flows = (
        (problem_cls.STATIC_LOSS_MODE_ARR, (1.0, 0.9)),  # arrival node
        (problem_cls.STATIC_LOSS_MODE_DEP, (0.9, 1.0)),  # departure node
        (problem_cls.STATIC_LOSS_MODE_US, (0.9, 0.9)),  # upstream
    )
    fallback_ab_flows = (1.0, 1.0)  # downstream

    def arc_flow(problem, node_from, node_to, arc_key, k):
        return pyo.value(
            problem.instance.var_v_glljqk[
                ("mynet", node_from, node_to, arc_key, q, k)
            ]
        )

    def arc_sense(problem, node_from, node_to, k):
        return pyo.value(
            problem.instance.var_zeta_sns_glljqk[
                ("mynet", node_from, node_to, arc_key_AB_und, q, k)
            ]
        )

    # no sos, regular time intervals
    for static_losses_mode in problem_cls.STATIC_LOSS_MODES:
        # reset decisions if necessary
        for edge_key in tracked_edges:
            flags = mynet.edges[edge_key][Network.KEY_ARC_TECH].options_selected
            if True in flags:
                flags[flags.index(True)] = False

        ipp = self.build_solve_ipp(
            solver_options={},
            use_sos_arcs=False,
            arc_sos_weight_key=None,
            arc_use_real_variables_if_possible=False,
            use_sos_sense=False,
            sense_sos_weight_key=None,
            sense_use_real_variables_if_possible=False,
            sense_use_arc_interfaces=False,
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # every tracked arc must have an option selected after the solve
        for edge_key in tracked_edges:
            assert (
                True
                in ipp.networks["mynet"]
                .edges[edge_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview (only imports and exports are inspected here)
        (
            imports_qpk,
            exports_qpk,
            _balance_qpk,
            _import_costs_qpk,
            _export_revenue_qpk,
            _ncf_qpk,
            _aggregate_static_demand_qpk,
            _aggregate_static_supply_qpk,
            _aggregate_static_balance_qpk,
        ) = statistics(ipp)

        abs_tol = 1e-6

        # AB is used from A to B during interval 0 and from B to A during
        # interval 1
        for node_from, node_to, k, expected_sense in (
            (node_A, node_B, 0, 1),
            (node_B, node_A, 0, 0),
            (node_A, node_B, 1, 0),
            (node_B, node_A, 1, 1),
        ):
            assert math.isclose(
                arc_sense(ipp, node_from, node_to, k),
                expected_sense,
                abs_tol=abs_tol,
            )

        # there should be imports
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, (1.2 + 1.2), abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flows through I1A (1.0 then 0.2), through I2B (0.2 then 1.0), and
        # the unused directions of AB (zero)
        for node_from, node_to, arc_key, k, expected_flow in (
            (imp1_node_key, node_A, 0, 0, 1.0),
            (imp1_node_key, node_A, 0, 1, 0.2),
            (imp2_node_key, node_B, 0, 0, 0.2),
            (imp2_node_key, node_B, 0, 1, 1.0),
            (node_B, node_A, arc_key_AB_und, 0, 0.0),
            (node_A, node_B, arc_key_AB_und, 1, 0.0),
        ):
            assert math.isclose(
                arc_flow(ipp, node_from, node_to, arc_key, k),
                expected_flow,
                abs_tol=abs_tol,
            )

        # validation of the mode-dependent flows entering AB
        flow_ab_k0, flow_ba_k1 = next(
            (
                flows
                for mode, flows in expected_ab_flows
                if static_losses_mode == mode
            ),
            fallback_ab_flows,
        )
        # flow from A to B during time interval 0
        assert math.isclose(
            arc_flow(ipp, node_A, node_B, arc_key_AB_und, 0),
            flow_ab_k0,
            abs_tol=abs_tol,
        )
        # flow from B to A during time interval 1
        assert math.isclose(
            arc_flow(ipp, node_B, node_A, arc_key_AB_und, 1),
            flow_ba_k1,
            abs_tol=abs_tol,
        )
def test_undirected_arc_static_upstream_preexisting(self):
    """Check flows over a preexisting undirected arc for every loss mode.

    Two import nodes feed nodes A and B through preexisting directed arcs
    (capacity 1.2 each); A and B are joined by a preexisting undirected arc
    AB with capacity 1 and a static loss of 0.1. For each static loss mode
    the test asserts that every arc has an option selected, that AB is used
    from A to B in interval 0 and from B to A in interval 1, that imports
    total 1.2 + 1.2 with no exports, and that the flows entering AB match
    the mode-specific values tabulated below.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1)},
        time_interval_durations={q: (1, 1)},
    )

    # 4 nodes: two import nodes, two supply/demand nodes
    mynet = Network()

    # import nodes, with mirrored price profiles
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    imp1_prices = {
        qpk: ResourcePrice(prices=qpk[2] + 1, volumes=None)
        for qpk in tf.qpk()
    }
    mynet.add_import_node(node_key=imp1_node_key, prices=imp1_prices)

    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    imp2_prices = {
        qpk: ResourcePrice(prices=2 - qpk[2], volumes=None)
        for qpk in tf.qpk()
    }
    mynet.add_import_node(node_key=imp2_node_key, prices=imp2_prices)

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_A, base_flow={(0, 0): 0.0, (0, 1): 1.1}
    )
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B, base_flow={(0, 0): 1.1, (0, 1): 0.0}
    )

    # preexisting directed arcs: I1A first, then I2B
    for import_key, sink_key in ((imp1_node_key, node_A), (imp2_node_key, node_B)):
        mynet.add_preexisting_directed_arc(
            node_key_a=import_key,
            node_key_b=sink_key,
            efficiency=None,
            static_loss=None,
            capacity=1.2,
            capacity_is_instantaneous=False,
        )

    # AB: preexisting undirected arc
    efficiency_AB = {(0, 0): 1, (0, 1): 1}
    efficiency_BA = {(0, 0): 1, (0, 1): 1}
    static_loss_AB = {(0, q, 0): 0.1, (0, q, 1): 0.1}
    arc_key_AB_und = mynet.add_preexisting_undirected_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        efficiency=efficiency_AB,
        efficiency_reverse=efficiency_BA,
        static_loss=static_loss_AB,
        capacity=1,
        capacity_is_instantaneous=False,
    )

    # identify node types
    mynet.identify_node_types()

    # edges whose decisions are reset before, and checked after, each solve
    tracked_edges = (
        (imp1_node_key, node_A, 0),
        (imp2_node_key, node_B, 0),
        (node_A, node_B, arc_key_AB_und),
    )

    # expected (A->B flow in interval 0, B->A flow in interval 1) per mode;
    # the first matching entry wins and any other mode uses the fallback
    problem_cls = InfrastructurePlanningProblem
    expected_ab_flows = (
        (problem_cls.STATIC_LOSS_MODE_ARR, (1.0, 0.9)),  # arrival node
        (problem_cls.STATIC_LOSS_MODE_DEP, (0.9, 1.0)),  # departure node
        (problem_cls.STATIC_LOSS_MODE_US, (0.9, 0.9)),  # upstream
    )
    fallback_ab_flows = (1.0, 1.0)  # downstream

    def arc_flow(problem, node_from, node_to, arc_key, k):
        return pyo.value(
            problem.instance.var_v_glljqk[
                ("mynet", node_from, node_to, arc_key, q, k)
            ]
        )

    def arc_sense(problem, node_from, node_to, k):
        return pyo.value(
            problem.instance.var_zeta_sns_glljqk[
                ("mynet", node_from, node_to, arc_key_AB_und, q, k)
            ]
        )

    # no sos, regular time intervals
    for static_losses_mode in problem_cls.STATIC_LOSS_MODES:
        # reset decisions if necessary
        for edge_key in tracked_edges:
            flags = mynet.edges[edge_key][Network.KEY_ARC_TECH].options_selected
            if True in flags:
                flags[flags.index(True)] = False

        ipp = self.build_solve_ipp(
            solver_options={},
            use_sos_arcs=False,
            arc_sos_weight_key=None,
            arc_use_real_variables_if_possible=False,
            use_sos_sense=False,
            sense_sos_weight_key=None,
            sense_use_real_variables_if_possible=False,
            sense_use_arc_interfaces=False,
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # all arcs should be installed (they are not new)
        for edge_key in tracked_edges:
            assert (
                True
                in ipp.networks["mynet"]
                .edges[edge_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview
        (
            imports_qpk,
            exports_qpk,
            _balance_qpk,
            _import_costs_qpk,
            export_revenue_qpk,
            _ncf_qpk,
            _aggregate_static_demand_qpk,
            _aggregate_static_supply_qpk,
            _aggregate_static_balance_qpk,
        ) = statistics(ipp)

        abs_tol = 1e-6

        # AB is used from A to B during interval 0 and from B to A during
        # interval 1
        for node_from, node_to, k, expected_sense in (
            (node_A, node_B, 0, 1),
            (node_B, node_A, 0, 0),
            (node_A, node_B, 1, 0),
            (node_B, node_A, 1, 1),
        ):
            assert math.isclose(
                arc_sense(ipp, node_from, node_to, k),
                expected_sense,
                abs_tol=abs_tol,
            )

        # there should be imports
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, (1.2 + 1.2), abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        # the export revenue is summed but not asserted on in this test
        export_revenue_qp = sum(
            export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]
        )
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flows through I1A (1.0 then 0.2), through I2B (0.2 then 1.0), and
        # the unused directions of AB (zero)
        for node_from, node_to, arc_key, k, expected_flow in (
            (imp1_node_key, node_A, 0, 0, 1.0),
            (imp1_node_key, node_A, 0, 1, 0.2),
            (imp2_node_key, node_B, 0, 0, 0.2),
            (imp2_node_key, node_B, 0, 1, 1.0),
            (node_B, node_A, arc_key_AB_und, 0, 0.0),
            (node_A, node_B, arc_key_AB_und, 1, 0.0),
        ):
            assert math.isclose(
                arc_flow(ipp, node_from, node_to, arc_key, k),
                expected_flow,
                abs_tol=abs_tol,
            )

        # validation of the mode-dependent flows entering AB
        flow_ab_k0, flow_ba_k1 = next(
            (
                flows
                for mode, flows in expected_ab_flows
                if static_losses_mode == mode
            ),
            fallback_ab_flows,
        )
        # flow from A to B during time interval 0
        assert math.isclose(
            arc_flow(ipp, node_A, node_B, arc_key_AB_und, 0),
            flow_ab_k0,
            abs_tol=abs_tol,
        )
        # flow from B to A during time interval 1
        assert math.isclose(
            arc_flow(ipp, node_B, node_A, arc_key_AB_und, 1),
            flow_ba_k1,
            abs_tol=abs_tol,
        )
def test_undirected_arc_static_downstream_new(self):
    """Check the flows around a new undirected arc with static losses.

    The network has two import nodes (I1, I2) and two source/sink nodes
    (A, B). I1-A and I2-B are preexisting directed arcs with a capacity of
    1.1, whereas A-B is a new undirected arc (capacity 1, static loss 0.1,
    unit efficiency in both directions). The problem is solved once for
    each mode in ``InfrastructurePlanningProblem.STATIC_LOSS_MODES`` and,
    for each solution, the test checks that every arc is selected, the
    total imports, the absence of exports, the flows through the import
    arcs and the flows through A-B in both directions per time interval.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1, 2, 3)},
        time_interval_durations={q: (1, 1, 1, 1)},
    )
    intervals = tf.time_intervals[q]

    # 4 nodes: two import nodes, two supply/demand nodes
    mynet = Network()

    # import nodes: I1 is cheaper than I2 except during interval 1
    imp1_prices = [ResourcePrice(prices=k, volumes=None) for k in [1, 2, 1, 1]]
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp1_node_key,
        prices={qpk: imp1_prices[qpk[2]] for qpk in tf.qpk()},
    )
    imp2_prices = [ResourcePrice(prices=k, volumes=None) for k in [2, 1, 2, 2]]
    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp2_node_key,
        prices={qpk: imp2_prices[qpk[2]] for qpk in tf.qpk()},
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_A,
        base_flow={
            (q, 0): 1.0,  # to be provided via I1 but AB losses have to be comp.
            (q, 1): 0.0,
            (q, 2): 0.0,
            (q, 3): 0.0,
        },
    )
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B,
        base_flow={
            (q, 0): 0.0,
            (q, 1): 1.0,  # to be provided via I2 but AB losses have to be comp.
            (q, 2): 2.0,  # forces the undirected arc to be used and installed
            (q, 3): 0.9,  # forces the undirected arc to be used and installed
        },
    )

    # add arcs
    # I1A
    mynet.add_preexisting_directed_arc(
        node_key_a=imp1_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1.1,
        capacity_is_instantaneous=False,
    )
    # I2B
    mynet.add_preexisting_directed_arc(
        node_key_a=imp2_node_key,
        node_key_b=node_B,
        efficiency=None,
        static_loss=None,
        capacity=1.1,
        capacity_is_instantaneous=False,
    )
    # AB (new, undirected)
    efficiency_AB = {(q, k): 1 for k in intervals}
    efficiency_BA = {(q, k): 1 for k in intervals}
    static_loss_AB = {(0, q, k): 0.1 for k in intervals}
    arcs_ab = Arcs(
        name="AB",
        efficiency=efficiency_AB,
        efficiency_reverse=efficiency_BA,
        static_loss=static_loss_AB,
        capacity=(1,),
        minimum_cost=(0.05,),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    arc_key_AB_und = mynet.add_undirected_arc(
        node_key_a=node_A, node_key_b=node_B, arcs=arcs_ab
    )

    # identify node types
    mynet.identify_node_types()

    # expected flows through the import arcs, per time interval:
    # I1A: 1.1 (demand in A plus AB losses), 0.0, 1.0 and 1.0 (A supplies B)
    # I2B: 0.0, 1.1 (demand in B plus AB losses), 1.1 (at capacity) and 0.0
    expected_flows_i1a = (1.1, 0.0, 1.0, 1.0)
    expected_flows_i2b = (0.0, 1.1, 1.1, 0)

    # expected flows through AB as (A to B, B to A) pairs, per time interval
    # arrival node: losses are always in B
    expected_flows_ab_arr = ((0.1, 0), (0, 0), (1.0, 0), (1.0, 0))
    # departure node: losses are always in A
    expected_flows_ab_dep = ((0, 0), (0, 0.1), (0.9, 0), (0.9, 0))
    # upstream
    expected_flows_ab_us = ((0, 0), (0, 0), (0.9, 0), (0.9, 0))
    # downstream
    expected_flows_ab_ds = ((0, 0), (0, 0.1), (1, 0), (1.0, 0))

    # no sos, regular time intervals
    # NOTE(review): the same network object is passed to every solve and
    # the arc decisions are not reset between modes - confirm this is
    # intended (test_report_directed_network_static_losses_new resets them)
    for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES:
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # all arcs should be installed: I1A and I2B are preexisting and AB,
        # although new, is needed to meet the demand in B
        for arc_key in (
            (imp1_node_key, node_A, 0),
            (imp2_node_key, node_B, 0),
            (node_A, node_B, arc_key_AB_und),
        ):
            assert (
                True
                in ipp.networks["mynet"]
                .edges[arc_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview (only the imports and exports are needed here)
        imports_qpk, exports_qpk, *_ = statistics(ipp)

        # there should be imports
        abs_tol = 1e-6
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, (1 + 1 + 2 + 0.3 + 1), abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in intervals)
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flows through the import arcs
        flows = ipp.instance.var_v_glljqk
        for k, expected_flow in zip(intervals, expected_flows_i1a):
            assert math.isclose(
                pyo.value(flows[("mynet", imp1_node_key, node_A, 0, q, k)]),
                expected_flow,
                abs_tol=abs_tol,
            )
        for k, expected_flow in zip(intervals, expected_flows_i2b):
            assert math.isclose(
                pyo.value(flows[("mynet", imp2_node_key, node_B, 0, q, k)]),
                expected_flow,
                abs_tol=abs_tol,
            )

        # validation: pick the expected AB flows for the current mode
        if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR:
            expected_flows_ab = expected_flows_ab_arr
        elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP:
            expected_flows_ab = expected_flows_ab_dep
        elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_US:
            expected_flows_ab = expected_flows_ab_us
        else:
            expected_flows_ab = expected_flows_ab_ds

        # the flows through AB are checked with a looser tolerance
        abs_tol = 1e-3
        for k, (flow_ab, flow_ba) in zip(intervals, expected_flows_ab):
            assert math.isclose(
                pyo.value(
                    flows[("mynet", node_A, node_B, arc_key_AB_und, q, k)]
                ),
                flow_ab,
                abs_tol=abs_tol,
            )
            assert math.isclose(
                pyo.value(
                    flows[("mynet", node_B, node_A, arc_key_AB_und, q, k)]
                ),
                flow_ba,
                abs_tol=abs_tol,
            )
def test_undirected_arc_static_downstream_preexisting(self):
    """Check the flows around a preexisting undirected arc with static losses.

    The network has two import nodes (I1, I2) and two source/sink nodes
    (A, B). I1-A and I2-B are preexisting directed arcs with a capacity of
    1.1 and A-B is a preexisting undirected arc (capacity 1, static loss
    0.1, unit efficiency in both directions). The problem is solved once
    for each mode in ``InfrastructurePlanningProblem.STATIC_LOSS_MODES``
    and, for each solution, the test checks that every arc is selected, the
    total imports, the absence of exports, the flows through the import
    arcs and the flows through A-B in both directions per time interval.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0, 1, 2, 3)},
        time_interval_durations={q: (1, 1, 1, 1)},
    )
    intervals = tf.time_intervals[q]

    # 4 nodes: two import nodes, two supply/demand nodes
    mynet = Network()

    # import nodes: I1 is cheaper than I2 except during interval 1
    imp1_prices = [ResourcePrice(prices=k, volumes=None) for k in [1, 2, 1, 1]]
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp1_node_key,
        prices={qpk: imp1_prices[qpk[2]] for qpk in tf.qpk()},
    )
    imp2_prices = [ResourcePrice(prices=k, volumes=None) for k in [2, 1, 2, 2]]
    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp2_node_key,
        prices={qpk: imp2_prices[qpk[2]] for qpk in tf.qpk()},
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_A,
        base_flow={
            (q, 0): 1.0,  # to be provided via I1 but AB losses have to be comp.
            (q, 1): 0.0,
            (q, 2): 0.0,
            (q, 3): 0.0,
        },
    )
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B,
        base_flow={
            (q, 0): 0.0,
            (q, 1): 1.0,  # to be provided via I2 but AB losses have to be comp.
            (q, 2): 2.0,  # forces the undirected arc to be used
            (q, 3): 0.9,  # forces the undirected arc to be used
        },
    )

    # add arcs
    # I1A
    mynet.add_preexisting_directed_arc(
        node_key_a=imp1_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1.1,
        capacity_is_instantaneous=False,
    )
    # I2B
    mynet.add_preexisting_directed_arc(
        node_key_a=imp2_node_key,
        node_key_b=node_B,
        efficiency=None,
        static_loss=None,
        capacity=1.1,
        capacity_is_instantaneous=False,
    )
    # AB (preexisting, undirected)
    efficiency_AB = {(q, k): 1 for k in intervals}
    efficiency_BA = {(q, k): 1 for k in intervals}
    static_loss_AB = {(0, q, k): 0.1 for k in intervals}
    arc_key_AB_und = mynet.add_preexisting_undirected_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        efficiency=efficiency_AB,
        efficiency_reverse=efficiency_BA,
        static_loss=static_loss_AB,
        capacity=1,
        capacity_is_instantaneous=False,
    )

    # identify node types
    mynet.identify_node_types()

    # expected flows through the import arcs, per time interval:
    # I1A: 1.1 (demand in A plus AB losses), 0.0, 1.0 and 1.0 (A supplies B)
    # I2B: 0.0, 1.1 (demand in B plus AB losses), 1.1 (at capacity) and 0.0
    expected_flows_i1a = (1.1, 0.0, 1.0, 1.0)
    expected_flows_i2b = (0.0, 1.1, 1.1, 0)

    # expected flows through AB as (A to B, B to A) pairs, per time interval
    # arrival node: losses are always in B
    expected_flows_ab_arr = ((0.1, 0), (0, 0), (1.0, 0), (1.0, 0))
    # departure node: losses are always in A
    expected_flows_ab_dep = ((0, 0), (0, 0.1), (0.9, 0), (0.9, 0))
    # upstream
    expected_flows_ab_us = ((0, 0), (0, 0), (0.9, 0), (0.9, 0))
    # downstream
    expected_flows_ab_ds = ((0, 0), (0, 0.1), (1, 0), (1.0, 0))

    # no sos, regular time intervals
    for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES:
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # all arcs should be installed (they are not new)
        for arc_key in (
            (imp1_node_key, node_A, 0),
            (imp2_node_key, node_B, 0),
            (node_A, node_B, arc_key_AB_und),
        ):
            assert (
                True
                in ipp.networks["mynet"]
                .edges[arc_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview (only the imports and exports are needed here)
        imports_qpk, exports_qpk, *_ = statistics(ipp)

        # there should be imports
        abs_tol = 1e-6
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, (1 + 1 + 2 + 0.3 + 1), abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in intervals)
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flows through the import arcs
        flows = ipp.instance.var_v_glljqk
        for k, expected_flow in zip(intervals, expected_flows_i1a):
            assert math.isclose(
                pyo.value(flows[("mynet", imp1_node_key, node_A, 0, q, k)]),
                expected_flow,
                abs_tol=abs_tol,
            )
        for k, expected_flow in zip(intervals, expected_flows_i2b):
            assert math.isclose(
                pyo.value(flows[("mynet", imp2_node_key, node_B, 0, q, k)]),
                expected_flow,
                abs_tol=abs_tol,
            )

        # validation: pick the expected AB flows for the current mode
        if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR:
            expected_flows_ab = expected_flows_ab_arr
        elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP:
            expected_flows_ab = expected_flows_ab_dep
        elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_US:
            expected_flows_ab = expected_flows_ab_us
        else:
            expected_flows_ab = expected_flows_ab_ds

        # the flows through AB are checked with a looser tolerance
        abs_tol = 1e-3
        for k, (flow_ab, flow_ba) in zip(intervals, expected_flows_ab):
            assert math.isclose(
                pyo.value(
                    flows[("mynet", node_A, node_B, arc_key_AB_und, q, k)]
                ),
                flow_ab,
                abs_tol=abs_tol,
            )
            assert math.isclose(
                pyo.value(
                    flows[("mynet", node_B, node_A, arc_key_AB_und, q, k)]
                ),
                flow_ba,
                abs_tol=abs_tol,
            )
def test_report_directed_network_static_losses_new(self):
    """Check static losses on a new directed arc (arrival vs. departure).

    The network is a chain: an import node, a waypoint node A and a
    source/sink node B with a demand of 0.2. The import node is linked to A
    by an infinite-capacity arc and A is linked to B by a new directed arc
    (efficiency 0.8, static loss 0.1). The problem is solved for the
    ``STATIC_LOSS_MODE_ARR`` and ``STATIC_LOSS_MODE_DEP`` modes: imports
    must equal 0.35 in both, while the flow through A-B must be 0.35 in the
    former and 0.25 in the latter.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )

    # 3 nodes: one import node, one waypoint node, one supply/demand node
    mynet = Network()

    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            qpk: ResourcePrice(prices=1 + 0.05 * qpk[2], volumes=None)
            for qpk in tf.qpk()
        },
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_waypoint_node(node_key=node_A)
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2})

    # add arcs
    # IA arc
    mynet.add_infinite_capacity_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 1},
        static_loss=None,
    )
    # AB arc (new)
    arc_tech_AB = Arcs(
        name="AB",
        efficiency={(q, 0): 0.8},
        efficiency_reverse=None,
        validate=False,
        capacity=[1.0],
        minimum_cost=[0],
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        static_loss={(0, q, 0): 0.10},
    )
    mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB)

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    for static_losses_mode in [
        InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR,
        InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
    ]:
        # reset decisions if necessary: the network is reused across modes,
        # so an option selected for AB in a previous solve is cleared first
        options_selected = mynet.edges[(node_A, node_B, 0)][
            Network.KEY_ARC_TECH
        ].options_selected
        if True in options_selected:
            options_selected[options_selected.index(True)] = False

        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # all arcs should be installed (AB is the only arc reaching B)
        for arc_key in ((imp_node_key, node_A, 0), (node_A, node_B, 0)):
            assert (
                True
                in ipp.networks["mynet"]
                .edges[arc_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview (only the imports and exports are needed here)
        imports_qpk, exports_qpk, *_ = statistics(ipp)

        # there should be imports
        abs_tol = 1e-6
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, 0.35, abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flow through IA must be 0.35
        flows = ipp.instance.var_v_glljqk
        assert math.isclose(
            pyo.value(flows[("mynet", imp_node_key, node_A, 0, q, 0)]),
            0.35,
            abs_tol=abs_tol,
        )

        # validation
        if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR:
            # losses are downstream: flow through AB must be 0.35
            expected_flow_ab = 0.35
        else:
            # losses are upstream: flow through AB must be 0.25
            expected_flow_ab = 0.25
        assert math.isclose(
            pyo.value(flows[("mynet", node_A, node_B, 0, q, 0)]),
            expected_flow_ab,
            abs_tol=abs_tol,
        )
def test_report_directed_network_static_losses_pre(self):
    """Check static losses on a preexisting directed arc (arrival vs. departure).

    The network is a chain: an import node, a waypoint node A and a
    source/sink node B with a demand of 0.2. The import node is linked to A
    by an infinite-capacity arc and A is linked to B by a preexisting
    directed arc (efficiency 0.8, static loss 0.1, capacity 1.0). The
    problem is solved with the 'cbc' solver for the ``STATIC_LOSS_MODE_ARR``
    and ``STATIC_LOSS_MODE_DEP`` modes: imports must equal 0.35 in both,
    while the flow through A-B must be 0.35 in the former and 0.25 in the
    latter.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )

    # 3 nodes: one import node, one waypoint node, one supply/demand node
    mynet = Network()

    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            qpk: ResourcePrice(prices=1 + 0.05 * qpk[2], volumes=None)
            for qpk in tf.qpk()
        },
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_waypoint_node(node_key=node_A)
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2})

    # add arcs
    # IA arc
    mynet.add_infinite_capacity_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 1},
        static_loss=None,
    )
    # AB arc (preexisting)
    mynet.add_preexisting_directed_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        efficiency={(q, 0): 0.8},
        static_loss={(0, q, 0): 0.10},
        capacity=1.0,
        capacity_is_instantaneous=False,
    )

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    for static_losses_mode in [
        InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR,
        InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
    ]:
        # TODO: make this work with GLPK and SCIP
        ipp = self.build_solve_ipp(
            solver='cbc',  # does not work with GLPK nor SCIP
            solver_options={},
            perform_analysis=False,
            plot_results=False,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=static_losses_mode,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
        )

        # all arcs should be installed (they are not new)
        for arc_key in ((imp_node_key, node_A, 0), (node_A, node_B, 0)):
            assert (
                True
                in ipp.networks["mynet"]
                .edges[arc_key][Network.KEY_ARC_TECH]
                .options_selected
            )

        # overview (only the imports and exports are needed here)
        imports_qpk, exports_qpk, *_ = statistics(ipp)

        # there should be imports
        abs_tol = 1e-6
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert math.isclose(imports_qp, 0.35, abs_tol=abs_tol)

        # there should be no exports
        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

        # flow through IA must be 0.35
        flows = ipp.instance.var_v_glljqk
        assert math.isclose(
            pyo.value(flows[("mynet", imp_node_key, node_A, 0, q, 0)]),
            0.35,
            abs_tol=abs_tol,
        )

        # validation
        if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR:
            # losses are downstream: flow through AB must be 0.35
            expected_flow_ab = 0.35
        else:
            # losses are upstream: flow through AB must be 0.25
            expected_flow_ab = 0.25
        assert math.isclose(
            pyo.value(flows[("mynet", node_A, node_B, 0, q, 0)]),
            expected_flow_ab,
            abs_tol=abs_tol,
        )
(example from the report) # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 3 nodes: one import, two regular nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1 + qpk[2], volumes=None) for qpk in tf.qpk() }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 0.0, (q, 1): 0.4}) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2, (q, 1): -0.6}) # add arcs # IA arc mynet.add_infinite_capacity_arc( node_key_a=imp_node_key, node_key_b=node_A, efficiency=None, static_loss=None ) AB_efficiency = {(q, 0): 0.8, (q, 1): 0.8} BA_efficiency = {(q, 0): 0.5, (q, 1): 0.5} # new AB arc arc_tech_AB = Arcs( name="AB", efficiency=AB_efficiency, efficiency_reverse=BA_efficiency, validate=False, capacity=[1.0], minimum_cost=[0.01], specific_capacity_cost=0, capacity_is_instantaneous=False, static_loss={(0, q, 0): 0.10, (0, q, 1): 0.10}, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_A, node_key_b=node_B, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES: # reset decisions if necessary if True in mynet.edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH].options_selected: mynet.edges[(node_A, node_B, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected[ mynet.edges[(node_A, node_B, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected.index(True) ] = False ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, networks={"mynet": mynet}, 
time_frame=tf, static_losses_mode=static_losses_mode, mandatory_arcs=[], max_number_parallel_arcs={} ) # all arcs should be installed (they are not new) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) # the flow through AB should be from A to B during interval 0 abs_tol = 1e-6 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 1, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 0) ] ), 0, abs_tol=abs_tol, ) # the flow through AB should be from B to A during interval 1 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 1) ] ), 0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 1, abs_tol=abs_tol, ) # there should be imports abs_tol = 1e-6 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, (0.35 + 0.15), abs_tol=abs_tol) # there should be no exports abs_tol = 1e-6 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # flow through IA must be 0.35 during time interval 0 # flow through IA must be 0.15 during time interval 1 abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 0)]), 0.35, abs_tol=abs_tol, ) abs_tol = 1e-6 assert 
math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 1)]), 0.15, abs_tol=abs_tol, ) # flow from B to A must be 0 durng time interval 0 # flow from A to B must be 0 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_B, node_A, arc_key_AB_und, 0, 0)] ), 0.0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_A, node_B, arc_key_AB_und, 0, 1)] ), 0.0, abs_tol=abs_tol, ) # validation if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR: # arrival node # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP: # departure node # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_US: # upstream # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) else: # downstream # flow from A to B must be 0.35 during time interval 0 
assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) # ************************************************************************* # ************************************************************************* def test_report_undirected_network_static_losses_pre_nom(self): # static losses on undirected arcs (example from the report) # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 3 nodes: one import, two regular nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1 + qpk[2], volumes=None) for qpk in tf.qpk() }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 0.0, (q, 1): 0.4}) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2, (q, 1): -0.6}) # add arcs # IA arc mynet.add_infinite_capacity_arc( node_key_a=imp_node_key, node_key_b=node_A, efficiency=None, static_loss=None ) AB_efficiency = {(q, 0): 0.8, (q, 1): 0.8} BA_efficiency = {(q, 0): 0.5, (q, 1): 0.5} # pre-existing AB arc arc_key_AB_und = mynet.add_preexisting_undirected_arc( node_key_a=node_A, node_key_b=node_B, efficiency=AB_efficiency, efficiency_reverse=BA_efficiency, static_loss={(0, q, 0): 0.10, (0, q, 1): 0.10}, capacity=1.0, capacity_is_instantaneous=False, ) # identify node types mynet.identify_node_types() # no sos, regular time intervals for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES: # 
reset decisions if necessary if True in mynet.edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH].options_selected: mynet.edges[(node_A, node_B, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected[ mynet.edges[(node_A, node_B, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected.index(True) ] = False ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, networks={"mynet": mynet}, time_frame=tf, static_losses_mode=static_losses_mode, mandatory_arcs=[], max_number_parallel_arcs={} ) # all arcs should be installed (they are not new) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) # the flow through AB should be from A to B during interval 0 abs_tol = 1e-6 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 1, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 0) ] ), 0, abs_tol=abs_tol, ) # the flow through AB should be from B to A during interval 1 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 1) ] ), 0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 1, abs_tol=abs_tol, ) # there should be imports abs_tol = 1e-6 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, (0.35 + 0.15), abs_tol=abs_tol) # there should be no exports abs_tol = 1e-6 
exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # flow through IA must be 0.35 during time interval 0 # flow through IA must be 0.15 during time interval 1 abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 0)]), 0.35, abs_tol=abs_tol, ) abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 1)]), 0.15, abs_tol=abs_tol, ) # flow from B to A must be 0 durng time interval 0 # flow from A to B must be 0 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_B, node_A, arc_key_AB_und, 0, 0)] ), 0.0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_A, node_B, arc_key_AB_und, 0, 1)] ), 0.0, abs_tol=abs_tol, ) # validation if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR: # arrival node # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP: # departure node # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) elif static_losses_mode == 
InfrastructurePlanningProblem.STATIC_LOSS_MODE_US: # upstream # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) else: # downstream # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) # ************************************************************************* # ************************************************************************* def test_report_undirected_network_static_losses_new_rev(self): # static losses on undirected arcs (example from the report) # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 3 nodes: one import, two regular nodes mynet = Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1 + qpk[2], volumes=None) for qpk in tf.qpk() }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 0.0, (q, 1): 0.4}) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2, (q, 1): -0.6}) # add arcs # IA arc mynet.add_infinite_capacity_arc( node_key_a=imp_node_key, node_key_b=node_A, efficiency=None, static_loss=None ) 
AB_efficiency = {(q, 0): 0.8, (q, 1): 0.8} BA_efficiency = {(q, 0): 0.5, (q, 1): 0.5} # new AB arc arc_tech_AB = Arcs( name="AB", efficiency=BA_efficiency, efficiency_reverse=AB_efficiency, validate=False, capacity=[1.0], minimum_cost=[0.01], specific_capacity_cost=0, capacity_is_instantaneous=False, static_loss={(0, q, 0): 0.10, (0, q, 1): 0.10}, ) arc_key_AB_und = mynet.add_undirected_arc( node_key_a=node_B, node_key_b=node_A, arcs=arc_tech_AB ) # identify node types mynet.identify_node_types() # no sos, regular time intervals for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES: # reset decisions if necessary if True in mynet.edges[(node_B, node_A, arc_key_AB_und)][Network.KEY_ARC_TECH].options_selected: mynet.edges[(node_B, node_A, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected[ mynet.edges[(node_B, node_A, arc_key_AB_und)][ Network.KEY_ARC_TECH].options_selected.index(True) ] = False ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, networks={"mynet": mynet}, time_frame=tf, static_losses_mode=static_losses_mode, mandatory_arcs=[], max_number_parallel_arcs={} ) # all arcs should be installed (they are not new) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(node_B, node_A, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) # the flow through AB should be from A to B during interval 0 abs_tol = 1e-6 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 1, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, 
arc_key_AB_und, q, 0) ] ), 0, abs_tol=abs_tol, ) # the flow through AB should be from B to A during interval 1 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 1) ] ), 0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 1, abs_tol=abs_tol, ) # there should be imports abs_tol = 1e-6 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, (0.35 + 0.15), abs_tol=abs_tol) # there should be no exports abs_tol = 1e-6 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # flow through IA must be 0.35 during time interval 0 # flow through IA must be 0.15 during time interval 1 abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 0)]), 0.35, abs_tol=abs_tol, ) abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 1)]), 0.15, abs_tol=abs_tol, ) # flow from B to A must be 0 durng time interval 0 # flow from A to B must be 0 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_B, node_A, arc_key_AB_und, 0, 0)] ), 0.0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_A, node_B, arc_key_AB_und, 0, 1)] ), 0.0, abs_tol=abs_tol, ) # validation if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR: # arrival node # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ 
("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP: # departure node # arrival node # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_US: # upstream # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) else: # downstream # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) # ************************************************************************* # ************************************************************************* def test_report_undirected_network_static_losses_pre_rev(self): # static losses on undirected arcs (example from the report) # time frame q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,1)}, reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)}, time_intervals={q: (0,1)}, time_interval_durations={q: (1,1)}, ) # 3 nodes: one import, two regular nodes mynet = 
Network() # import node imp_node_key = 'thatimpnode' mynet.add_import_node( node_key=imp_node_key, prices={ qpk: ResourcePrice(prices=1 + qpk[2], volumes=None) for qpk in tf.qpk() }, ) # other nodes node_A = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 0.0, (q, 1): 0.4}) node_B = generate_pseudo_unique_key(mynet.nodes()) mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 0.2, (q, 1): -0.6}) # add arcs # IA arc mynet.add_infinite_capacity_arc( node_key_a=imp_node_key, node_key_b=node_A, efficiency=None, static_loss=None ) AB_efficiency = {(q, 0): 0.8, (q, 1): 0.8} BA_efficiency = {(q, 0): 0.5, (q, 1): 0.5} # pre-existing AB arc arc_key_AB_und = mynet.add_preexisting_undirected_arc( node_key_a=node_B, node_key_b=node_A, efficiency=BA_efficiency, efficiency_reverse=AB_efficiency, static_loss={(0, q, 0): 0.10, (0, q, 1): 0.10}, capacity=1.0, capacity_is_instantaneous=False, ) # identify node types mynet.identify_node_types() # no sos, regular time intervals for static_losses_mode in InfrastructurePlanningProblem.STATIC_LOSS_MODES: # # reset decisions if necessary # if True in mynet.edges[(node_A, node_B, arc_key_AB_und)][Network.KEY_ARC_TECH].options_selected: # mynet.edges[(node_A, node_B, arc_key_AB_und)][ # Network.KEY_ARC_TECH].options_selected[ # mynet.edges[(node_A, node_B, arc_key_AB_und)][ # Network.KEY_ARC_TECH].options_selected.index(True) # ] = False ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, networks={"mynet": mynet}, time_frame=tf, static_losses_mode=static_losses_mode, mandatory_arcs=[], max_number_parallel_arcs={} ) # all arcs should be installed (they are not new) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(node_B, node_A, arc_key_AB_und)][Network.KEY_ARC_TECH] .options_selected ) # 
overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) # the flow through AB should be from A to B during interval 0 abs_tol = 1e-6 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 1, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 0) ] ), 0, abs_tol=abs_tol, ) # the flow through AB should be from B to A during interval 1 assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 1) ] ), 0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( ipp.instance.var_zeta_sns_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 1, abs_tol=abs_tol, ) # there should be imports abs_tol = 1e-6 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, (0.35 + 0.15), abs_tol=abs_tol) # there should be no exports abs_tol = 1e-6 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # flow through IA must be 0.35 during time interval 0 # flow through IA must be 0.15 during time interval 1 abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 0)]), 0.35, abs_tol=abs_tol, ) abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, q, 1)]), 0.15, abs_tol=abs_tol, ) # flow from B to A must be 0 durng time interval 0 # flow from A to B must be 0 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[("mynet", node_B, node_A, arc_key_AB_und, 0, 0)] ), 0.0, abs_tol=abs_tol, ) assert math.isclose( pyo.value( 
ipp.instance.var_v_glljqk[("mynet", node_A, node_B, arc_key_AB_und, 0, 1)] ), 0.0, abs_tol=abs_tol, ) # validation if static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR: # arrival node # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP: # departure node # arrival node # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) elif static_losses_mode == InfrastructurePlanningProblem.STATIC_LOSS_MODE_US: # upstream # flow from A to B must be 0.25 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.25, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.5, abs_tol=abs_tol, ) else: # downstream # flow from A to B must be 0.35 during time interval 0 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_A, node_B, arc_key_AB_und, q, 0) ] ), 0.35, abs_tol=abs_tol, ) # flow from B to A must be 0.6 during time interval 1 assert math.isclose( pyo.value( ipp.instance.var_v_glljqk[ ("mynet", node_B, node_A, arc_key_AB_und, q, 1) ] ), 0.6, abs_tol=abs_tol, ) # 
# *************************************************************************
# *************************************************************************

def test_directed_arc_static_upstream_new(self):
    """Check flows when a new directed arc with static losses feeds node B.

    The network has two import nodes (prices 1 and 2), a waypoint node A and
    a source/sink node B with a base flow of 1.0. There is a pre-existing
    arc from the first import node to A, a new arc from the second import
    node to B (capacity 0.1) and a new arc from A to B (capacity 1, static
    loss 0.1). The problem is solved with ``STATIC_LOSS_MODE_DEP`` and the
    arc selection, imports, exports and arc flows are checked.
    """
    # NOTE(review): the test name says "upstream" but the mode passed below
    # is STATIC_LOSS_MODE_DEP - confirm which one is intended
    abs_tol = 1e-6

    # time
    number_intervals = 1
    number_periods = 2
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )

    # 4 nodes: two import nodes, one waypoint node and one source/sink node
    mynet = Network()

    # import nodes
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp1_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp2_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=2, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_waypoint_node(node_key=node_A)
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 1.0})

    # I1A: pre-existing arc
    mynet.add_preexisting_directed_arc(
        node_key_a=imp1_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1,
        capacity_is_instantaneous=False,
    )

    # I2B: new arc
    arcs_i2b = Arcs(
        name="I2B",
        efficiency=None,
        efficiency_reverse=None,
        static_loss=None,
        capacity=(0.1,),
        minimum_cost=(0.025,),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    mynet.add_directed_arc(
        node_key_a=imp2_node_key, node_key_b=node_B, arcs=arcs_i2b
    )

    # AB: new arc with static losses
    # NOTE(review): this A-B arc is labelled "IA1", which looks like a
    # copy-paste leftover - confirm before renaming
    arcs_ab = Arcs(
        name="IA1",
        efficiency=None,
        efficiency_reverse=None,
        static_loss={(0, q, 0): 0.1},
        capacity=(1,),
        minimum_cost=(0.05,),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_B, arcs=arcs_ab)

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver_options={},
        plot_results=False,
        print_solver_output=False,
        time_frame=tf,
        networks={"mynet": mynet},
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )

    # every arc must have a selected option, including the two new ones
    solved_edges = ipp.networks["mynet"].edges
    for edge in (
        (imp1_node_key, node_A, 0),
        (imp2_node_key, node_B, 0),
        (node_A, node_B, 0),
    ):
        assert True in solved_edges[edge][Network.KEY_ARC_TECH].options_selected

    # statistics() is unpacked into nine results; only the imports and the
    # exports are needed here
    imports_qpk, exports_qpk, _, _, _, _, _, _, _ = statistics(ipp)

    # there should be imports
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, 1.1, abs_tol=abs_tol)

    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

    # interval 0: the flows must be 1 through I1A, 0.9 through AB and 0.1
    # through I2B
    flows = ipp.instance.var_v_glljqk
    expected_flows = (
        # (from, to, expected flow)
        (imp1_node_key, node_A, 1),
        (node_A, node_B, 0.9),
        (imp2_node_key, node_B, 0.1),
    )
    for node_from, node_to, expected in expected_flows:
        assert math.isclose(
            pyo.value(flows[("mynet", node_from, node_to, 0, q, 0)]),
            expected,
            abs_tol=abs_tol,
        )

# *************************************************************************
# *************************************************************************

def test_directed_arc_static_upstream_pre(self):
    """Check flows when a pre-existing arc with static losses feeds node B.

    The network has two import nodes (prices 1 and 2), a waypoint node A and
    a source/sink node B with a base flow of 1.0. All three arcs are
    pre-existing: first import node to A (capacity 1), second import node to
    B (capacity 0.1) and A to B (capacity 1, static loss 0.1). The problem is
    solved with ``STATIC_LOSS_MODE_DEP`` using the 'cbc' solver and the arc
    selection, imports, exports and arc flows are checked.
    """
    # NOTE(review): the test name says "upstream" but the mode passed below
    # is STATIC_LOSS_MODE_DEP - confirm which one is intended
    abs_tol = 1e-6

    # time
    number_intervals = 1
    number_periods = 2
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )

    # 4 nodes: two import nodes, one waypoint node and one source/sink node
    mynet = Network()

    # import nodes
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp1_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp2_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=2, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )

    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_waypoint_node(node_key=node_A)
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(node_key=node_B, base_flow={(q, 0): 1.0})

    # I1A
    mynet.add_preexisting_directed_arc(
        node_key_a=imp1_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1,
        capacity_is_instantaneous=False,
    )

    # I2B
    mynet.add_preexisting_directed_arc(
        node_key_a=imp2_node_key,
        node_key_b=node_B,
        efficiency=None,
        static_loss=None,
        capacity=0.1,
        capacity_is_instantaneous=False,
    )

    # AB: the only arc with static losses
    mynet.add_preexisting_directed_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        efficiency=None,
        static_loss={(0, q, 0): 0.1},
        capacity=1,
        capacity_is_instantaneous=False,
    )

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver='cbc',  # TODO: make this work with other solvers
        solver_options={},
        plot_results=False,
        print_solver_output=False,
        time_frame=tf,
        networks={"mynet": mynet},
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )

    # every arc must have a selected option (they are all pre-existing)
    solved_edges = ipp.networks["mynet"].edges
    for edge in (
        (imp1_node_key, node_A, 0),
        (imp2_node_key, node_B, 0),
        (node_A, node_B, 0),
    ):
        assert True in solved_edges[edge][Network.KEY_ARC_TECH].options_selected

    # statistics() is unpacked into nine results; only the imports and the
    # exports are needed here
    imports_qpk, exports_qpk, _, _, _, _, _, _, _ = statistics(ipp)

    # there should be imports
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, 1.1, abs_tol=abs_tol)

    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

    # interval 0: the flows must be 1 through I1A, 0.9 through AB and 0.1
    # through I2B
    flows = ipp.instance.var_v_glljqk
    expected_flows = (
        # (from, to, expected flow)
        (imp1_node_key, node_A, 1),
        (node_A, node_B, 0.9),
        (imp2_node_key, node_B, 0.1),
    )
    for node_from, node_to, expected in expected_flows:
        assert math.isclose(
            pyo.value(flows[("mynet", node_from, node_to, 0, q, 0)]),
            expected,
            abs_tol=abs_tol,
        )

# *************************************************************************
# *************************************************************************

def test_directed_arc_static_downstream_new(self):
    """Check flows for two parallel directed arcs, one of them new.

    The network has one import node (price 1.1) and one source/sink node A
    with a base flow of 1.0. Two parallel arcs connect them: a pre-existing
    one (IA1: efficiency 0.9, static loss 0.1, capacity 0.5) and a new one
    (IA2: no losses, capacity 1.2). The problem is solved with
    ``STATIC_LOSS_MODE_ARR`` using the 'cbc' solver and the arc selection,
    imports, exports and arc flows are checked.
    """
    abs_tol = 1e-6

    # time
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5 / 100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    number_intervals = 1
    number_periods = 2

    # 2 nodes: one import node and one source/sink node
    mynet = Network()

    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1 + 0.1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )

    # other nodes
    node_A = 'A'
    mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

    # IA1: pre-existing arc with static losses; the static loss key is
    # written as (0, q, 0) to match the key order used for every other arc
    # in this file (same value as before, since q is 0)
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 0.9},
        static_loss={(0, q, 0): 0.1},
        capacity=0.5,
        capacity_is_instantaneous=False,
    )

    # IA2: new arc without losses
    arcs_ia2 = Arcs(
        name="IA2",
        efficiency=None,
        efficiency_reverse=None,
        static_loss=None,
        capacity=(1.2,),
        minimum_cost=(0.1,),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    mynet.add_directed_arc(
        node_key_a=imp_node_key, node_key_b=node_A, arcs=arcs_ia2
    )

    # identify node types
    mynet.identify_node_types()

    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver='cbc',  # TODO: make this work with other solvers
        solver_options={},
        plot_results=False,
        print_solver_output=False,
        networks={"mynet": mynet},
        time_frame=tf,
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )

    # both parallel arcs must have a selected option, including the new one
    solved_edges = ipp.networks["mynet"].edges
    for edge in ((imp_node_key, node_A, 0), (imp_node_key, node_A, 1)):
        assert True in solved_edges[edge][Network.KEY_ARC_TECH].options_selected

    # statistics() is unpacked into nine results; only the imports and the
    # exports are needed here
    imports_qpk, exports_qpk, _, _, _, _, _, _, _ = statistics(ipp)

    # there should be imports
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, (1.0 + 0.1), abs_tol=abs_tol)

    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)

    # the flow must be 0.1 through IA1 (arc key 0) and 1.0 through IA2
    # (arc key 1)
    flows = ipp.instance.var_v_glljqk
    for arc_key, expected in ((0, 0.1), (1, 1.0)):
        assert math.isclose(
            pyo.value(flows[("mynet", imp_node_key, node_A, arc_key, q, 0)]),
            expected,
            abs_tol=abs_tol,
        )

# *************************************************************************
# *************************************************************************

def test_directed_arc_static_downstream_pre(self):
    # time
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0,1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    number_intervals = 1
    number_periods = 2
    # 4 nodes: one import, one export, two supply/demand nodes
    mynet = Network()
    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1 + 0.1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    # other nodes
    node_A = 'A'
    mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    # add arcs
    # IA1
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 0.9},
        static_loss={(q, 0, 0): 0.1},
        capacity=0.5,
        capacity_is_instantaneous=False,
    )
    # IA2
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1.2,
        capacity_is_instantaneous=False,
    )
# identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver='cbc', # TODO: make this work with other solvers solver_options={}, plot_results=False, # True, print_solver_output=False, networks={"mynet": mynet}, time_frame=tf, # static_losses_mode=True, static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR, mandatory_arcs=[], max_number_parallel_arcs={}, ) # ************************************************************************** # all arcs should be installed (they are not new) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH] .options_selected ) assert ( True in ipp.networks["mynet"] .edges[(imp_node_key, node_A, 1)][Network.KEY_ARC_TECH] .options_selected ) # overview (imports_qpk, exports_qpk, balance_qpk, import_costs_qpk, export_revenue_qpk, ncf_qpk, aggregate_static_demand_qpk, aggregate_static_supply_qpk, aggregate_static_balance_qpk) = statistics(ipp) # there should be imports abs_tol = 1e-6 imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0) assert math.isclose(imports_qp, (1.0 + 0.1), abs_tol=abs_tol) # there should be no exports abs_tol = 1e-6 exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q]) assert math.isclose(exports_qp, 0, abs_tol=abs_tol) # flow through IA1 must be 0.1 abs_tol = 1e-6 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, 0, 0)]), 0.1, abs_tol=abs_tol, ) # flow through IA2 must be 1.0 assert math.isclose( pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 1, 0, 0)]), 1.0, abs_tol=abs_tol, ) # ************************************************************************* # ************************************************************************* # def test_problem_converter_sink(self): # # assessment # q = 0 # tf = EconomicTimeFrame( # discount_rate=3.5/100, # reporting_periods={q: (0,)}, # reporting_period_durations={q: (365 * 24 
* 3600,)}, # time_intervals={q: (0,1,2)}, # time_interval_durations={q: (1,1,1)}, # ) # # 2 nodes: one import, one regular # mynet = Network() # # import node # node_IMP = generate_pseudo_unique_key(mynet.nodes()) # mynet.add_import_node( # node_key=node_IMP, # prices={ # qpk: ResourcePrice(prices=1.0, volumes=None) # for qpk in tf.qpk() # }, # ) # # other nodes # node_A = generate_pseudo_unique_key(mynet.nodes()) # mynet.add_source_sink_node( # node_key=node_A, # base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00}, # ) # # arc IA # arc_tech_IA = Arcs( # name="any", # # efficiency=[0.5, 0.5, 0.5], # efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5}, # efficiency_reverse=None, # static_loss=None, # capacity=[3], # minimum_cost=[2], # specific_capacity_cost=1, # capacity_is_instantaneous=False, # validate=False, # ) # mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA) # # identify node types # mynet.identify_node_types() # # converter # a_nnk = { # (0, 0, 0): 0.95, # (0, 0, 1): 0.95, # (0, 0, 2): 0.95, # } # b_nmk = { # (0, 0, 0): 3, # (0, 0, 1): 3, # (0, 0, 2): 3 # } # x_n0 = {0: 18} # # get the signals # inputs, states, outputs = get_two_node_model_signals( # tf.number_time_intervals(q) # ) # # create a discretised dynamic system from dictionaries # dds = dynsys.DiscretisedDynamicSystem( # a_nnk=a_nnk, # b_nmk=b_nmk, # x_n0=x_n0, # time_frame=tf # ) # # create a converter # cvt = Converter( # time_frame=tf, # dds=dds, # turn_key_cost=3, # inputs=inputs, # states=states, # outputs=outputs, # ) # # no sos, regular time intervals # ipp = self.build_solve_ipp( # solver_options={}, # perform_analysis=False, # plot_results=False, # True, # print_solver_output=False, # time_frame=tf, # networks={"mynet": mynet}, # converters={"mycvt": cvt}, # static_losses_mode=False, # mandatory_arcs=[], # max_number_parallel_arcs={}, # # init_aux_sets=init_aux_sets, # simplify_problem=False, # ) # assert not ipp.has_peak_total_assessments() # assert 
ipp.results["Problem"][0]["Number of constraints"] == 24 # assert ipp.results["Problem"][0]["Number of variables"] == 22 # assert ipp.results["Problem"][0]["Number of nonzeros"] == 49 # # ********************************************************************* # # ********************************************************************* # # validation # # if uC,M 1,q,0 = 0, then xC,N 1,q,1 = 17.1 # infeasible # # if uC,M 1,q,0 = 1, then xC,N 1,q,1 = 20.1. # only feasible option # # if uC,M 1,q,1 = 0, then xC,N 1,q,2 = 19.095 # only feasible option # # if uC,M 1,q,1 = 1, then xC,N 1,q,2 = 22.095 # infeasible # # if uC,M 1,q,2 = 0, then xC,N 1,q,3 = 18.14025 # feasible # # if uC,M 1,q,2 = 1, then xC,N 1,q,3 = 21.14025 # feasible # true_u_imqk = { # ('mycvt', 0, q, 0): 1, # ('mycvt', 0, q, 1): 0, # ('mycvt', 0, q, 2): 0, # could also be 1 # } # true_x_inqk = { # ('mycvt', 0, q, 0): 20.1, # ('mycvt', 0, q, 1): 19.095, # ('mycvt', 0, q, 2): 18.14025, # could also be 21.14025 # } # # check the inputs # for imqk, u in true_u_imqk.items(): # assert math.isclose( # pyo.value(ipp.instance.var_u_imqk[imqk]), # u, # abs_tol=1e-6, # ) # # check the states # for inqk, x in true_x_inqk.items(): # assert math.isclose( # pyo.value(ipp.instance.var_x_inqk[inqk]), # x, # abs_tol=1e-6, # ) # ************************************************************************* # ************************************************************************* # TODO: test non-simplifiable problems with time varying prices on select assessments # TODO: test non-simplifiable problems with volume varying prices on select assessments # ************************************************************************* # ************************************************************************* def test_problem_with_tree_network(self): # assessment q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,)}, 
time_interval_durations={q: (1,)}, ) # 2 nodes: one import, one regular mynet = Network(network_type=Network.NET_TYPE_TREE) # import node node_IMP = "thatimpnode" mynet.add_import_node( node_key=node_IMP, prices={ qpk: ResourcePrice(prices=1.0, volumes=None) for qpk in tf.qpk() }, ) # node A node_A = "thatnodea" mynet.add_source_sink_node( node_key=node_A, base_flow={(q, 0): 0.50}, ) # node B node_B = "thatnodeb" mynet.add_source_sink_node( node_key=node_B, base_flow={(q, 0): 0.25}, ) # node C node_C = "thatnodec" mynet.add_source_sink_node( node_key=node_C, base_flow={(q, 0): 1.25}, ) list_imp_arcs = [ (node_IMP, node_A), # IA (node_IMP, node_B), # IB (node_IMP, node_C), # IC ] for i, node_pair in enumerate(list_imp_arcs): # import arcs: IA, IB, IC new_arc = Arcs( name="IA", efficiency=None, efficiency_reverse=None, static_loss=None, capacity=[2], minimum_cost=[6], specific_capacity_cost=i, capacity_is_instantaneous=False, validate=False, ) mynet.add_directed_arc(*node_pair, arcs=new_arc) # arcs: AB, BA, BC, CB, AC, CA list_other_arcs = [ (node_A, node_B), # AB (node_B, node_A), # BA (node_B, node_C), # BC (node_C, node_B), # CB (node_A, node_C), # AC (node_C, node_A), # CA ] for node_pair in list_other_arcs: # arc new_arc_tech = Arcs( name="any", efficiency=None, efficiency_reverse=None, static_loss=None, capacity=[3], minimum_cost=[2], specific_capacity_cost=0, capacity_is_instantaneous=False, validate=False, ) mynet.add_directed_arc(*node_pair, arcs=new_arc_tech) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=True, # just to reach a line, mandatory_arcs=[], max_number_parallel_arcs={}, simplify_problem=True, ) assert ipp.has_peak_total_assessments() assert ipp.results["Problem"][0]["Number of constraints"] == 61 assert 
ipp.results["Problem"][0]["Number of variables"] == 53 assert ipp.results["Problem"][0]["Number of nonzeros"] == 143 # ********************************************************************* # ********************************************************************* # validation # only the IA arc should be installed true_imp_arcs_selected = [True, False, False] for node_pair, true_arc_decision in zip(list_imp_arcs, true_imp_arcs_selected): assert ( true_arc_decision in ipp.networks["mynet"] .edges[(*node_pair, 0)][Network.KEY_ARC_TECH] .options_selected ) # only two arcs between A, B and C can be installed arcs_selected = tuple( 1 for node_pair in list_other_arcs if True in ipp.networks["mynet"] .edges[(*node_pair, 0)][Network.KEY_ARC_TECH] .options_selected ) assert sum(arcs_selected) == 2 # the network must be tree-shaped assert ipp.networks["mynet"].has_tree_topology() # capex assert math.isclose(pyo.value(ipp.instance.var_capex), 10.0, abs_tol=1e-3) # the objective function assert math.isclose(pyo.value(ipp.instance.obj_f), -1.193236715e+01, abs_tol=1e-3) # ************************************************************************* # ************************************************************************* def test_problem_with_reverse_tree_network(self): # assessment q = 0 tf = EconomicTimeFrame( discount_rate=3.5/100, reporting_periods={q: (0,)}, reporting_period_durations={q: (365 * 24 * 3600,)}, time_intervals={q: (0,)}, time_interval_durations={q: (1,)}, ) # 2 nodes: one import, one regular mynet = Network(network_type=Network.NET_TYPE_REV_TREE) # export node node_EXP = "thatexpnode" mynet.add_export_node( node_key=node_EXP, prices={ qpk: ResourcePrice(prices=1.0, volumes=None) for qpk in tf.qpk() }, ) # node A node_A = "thatnodea" mynet.add_source_sink_node( node_key=node_A, base_flow={(q, 0): -0.50}, ) # node B node_B = "thatnodeb" mynet.add_source_sink_node( node_key=node_B, base_flow={(q, 0): -0.25}, ) # node C node_C = "thatnodec" mynet.add_source_sink_node( 
node_key=node_C, base_flow={(q, 0): -1.25}, ) list_exp_arcs = [ (node_A, node_EXP), # AE (node_B, node_EXP), # BE (node_C, node_EXP), # CE ] for i, node_pair in enumerate(list_exp_arcs): # import arcs: AE, BE, CE new_arc = Arcs( name="arc_"+str(node_pair), efficiency=None, efficiency_reverse=None, static_loss=None, capacity=[2], minimum_cost=[6], specific_capacity_cost=i, capacity_is_instantaneous=False, validate=False, ) mynet.add_directed_arc(*node_pair, arcs=new_arc) # arcs: AB, BA, BC, CB, AC, CA list_other_arcs = [ (node_A, node_B), # AB (node_B, node_A), # BA (node_B, node_C), # BC (node_C, node_B), # CB (node_A, node_C), # AC (node_C, node_A), # CA ] for node_pair in list_other_arcs: # arc new_arc_tech = Arcs( name="any", efficiency=None, efficiency_reverse=None, static_loss=None, capacity=[3], minimum_cost=[2], specific_capacity_cost=0, capacity_is_instantaneous=False, validate=False, ) mynet.add_directed_arc(*node_pair, arcs=new_arc_tech) # identify node types mynet.identify_node_types() # no sos, regular time intervals ipp = self.build_solve_ipp( solver_options={}, perform_analysis=False, plot_results=False, # True, print_solver_output=False, time_frame=tf, networks={"mynet": mynet}, static_losses_mode=True, # just to reach a line, mandatory_arcs=[], max_number_parallel_arcs={}, simplify_problem=True, ) assert ipp.has_peak_total_assessments() assert ipp.results["Problem"][0]["Number of constraints"] == 61 assert ipp.results["Problem"][0]["Number of variables"] == 53 assert ipp.results["Problem"][0]["Number of nonzeros"] == 143 # # ********************************************************************* # ********************************************************************* # validation # only the AE arc should be installed true_exp_arcs_selected = [True, False, False] for node_pair, true_arc_decision in zip(list_exp_arcs, true_exp_arcs_selected): assert ( true_arc_decision in ipp.networks["mynet"] .edges[(*node_pair, 0)][Network.KEY_ARC_TECH] 
.options_selected ) # only two arcs between A, B and C can be installed arcs_selected = tuple( 1 for node_pair in list_other_arcs if True in ipp.networks["mynet"] .edges[(*node_pair, 0)][Network.KEY_ARC_TECH] .options_selected ) assert sum(arcs_selected) == 2 # the network must be tree-shaped assert ipp.networks["mynet"].has_tree_topology() # capex assert math.isclose(pyo.value(ipp.instance.var_capex), 10.0, abs_tol=1e-3) # the objective function assert math.isclose(pyo.value(ipp.instance.obj_f), -(10+(-11.93236715+10)), abs_tol=1e-3) # ***************************************************************************** # *****************************************************************************