Skip to content
Snippets Groups Projects
test_esipp_prices.py 37 KiB
Newer Older
  • Learn to ignore specific revisions
  • Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
    # imports
    
    # standard
    import math
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
    
    # local
    # import numpy as np
    # import networkx as nx
    import pyomo.environ as pyo
    
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
    # import src.topupopt.problems.esipp.utils as utils
    from src.topupopt.data.misc.utils import generate_pseudo_unique_key
    from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
    from src.topupopt.problems.esipp.network import Arcs, Network
    from src.topupopt.problems.esipp.resource import ResourcePrice
    # from src.topupopt.problems.esipp.utils import compute_cost_volume_metrics
    from src.topupopt.problems.esipp.utils import statistics
    from src.topupopt.problems.esipp.time import EconomicTimeFrame
    # from src.topupopt.problems.esipp.converter import Converter
    
    # *****************************************************************************
    # *****************************************************************************
    
    class TestESIPPProblem:
        
        solver = 'glpk'
        # solver = 'scip'
        # solver = 'cbc'
        
        def build_solve_ipp(
            self,
            solver: str = None,
            solver_options: dict = None,
            use_sos_arcs: bool = False,
            arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
            arc_use_real_variables_if_possible: bool = False,
            use_sos_sense: bool = False,
            sense_sos_weight_key: int = (
                InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
            ),
            sense_use_real_variables_if_possible: bool = False,
            sense_use_arc_interfaces: bool = False,
            perform_analysis: bool = False,
            plot_results: bool = False,
            print_solver_output: bool = False,
            time_frame: EconomicTimeFrame = None,
            networks: dict = None,
            converters: dict = None,
            static_losses_mode=None,
            mandatory_arcs: list = None,
            max_number_parallel_arcs: dict = None,
            arc_groups_dict: dict = None,
            init_aux_sets: bool = False,
            # discount_rates: dict = None,
            assessment_weights: dict = None,
            simplify_problem: bool = False,
    
            use_prices_block: bool = False
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
        ):
            if type(solver) == type(None):
                solver = self.solver
            
            if type(assessment_weights) != dict:
                assessment_weights = {}  # default
    
            if type(converters) != dict:
                converters = {}
                
            # time weights
    
            # relative weight of time period
    
            # one interval twice as long as the average is worth twice
            # one interval half as long as the average is worth half
    
            # time_weights = [
            #     [time_period_duration/average_time_interval_duration
            #       for time_period_duration in intraperiod_time_interval_duration]
            #     for p in range(number_periods)]
    
            time_weights = None  # nothing yet
    
            normalised_time_interval_duration = None  # nothing yet
    
            # create problem object
    
            ipp = InfrastructurePlanningProblem(
                # discount_rates=discount_rates,
                time_frame=time_frame,
                # reporting_periods=time_frame.reporting_periods,
                # time_intervals=time_frame.time_interval_durations,
                time_weights=time_weights,
                normalised_time_interval_duration=normalised_time_interval_duration,
                assessment_weights=assessment_weights,
    
                use_prices_block=use_prices_block
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            )
    
            # add networks and systems
    
            for netkey, net in networks.items():
                ipp.add_network(network_key=netkey, network=net)
    
            # add converters
    
            for cvtkey, cvt in converters.items():
                ipp.add_converter(converter_key=cvtkey, converter=cvt)
    
            # define arcs as mandatory
    
            if type(mandatory_arcs) == list:
                for full_arc_key in mandatory_arcs:
                    ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])
    
            # if make_all_arcs_mandatory:
    
            #     for network_key in ipp.networks:
    
            #         for arc_key in ipp.networks[network_key].edges(keys=True):
    
            #             # preexisting arcs are no good
    
            #             if ipp.networks[network_key].edges[arc_key][
            #                     Network.KEY_ARC_TECH].has_been_selected():
    
            #                 continue
    
            #             ipp.make_arc_mandatory(network_key, arc_key)
    
            # set up the use of sos for arc selection
    
            if use_sos_arcs:
                for network_key in ipp.networks:
                    for arc_key in ipp.networks[network_key].edges(keys=True):
                        if (
                            ipp.networks[network_key]
                            .edges[arc_key][Network.KEY_ARC_TECH]
                            .has_been_selected()
                        ):
                            continue
    
                        ipp.use_sos1_for_arc_selection(
                            network_key,
                            arc_key,
                            use_real_variables_if_possible=(
                                arc_use_real_variables_if_possible
                            ),
                            sos1_weight_method=arc_sos_weight_key,
                        )
    
            # set up the use of sos for flow sense determination
    
            if use_sos_sense:
                for network_key in ipp.networks:
                    for arc_key in ipp.networks[network_key].edges(keys=True):
                        if not ipp.networks[network_key].edges[arc_key][
                            Network.KEY_ARC_UND
                        ]:
                            continue
    
                        ipp.use_sos1_for_flow_senses(
                            network_key,
                            arc_key,
                            use_real_variables_if_possible=(
                                sense_use_real_variables_if_possible
                            ),
                            use_interface_variables=sense_use_arc_interfaces,
                            sos1_weight_method=sense_sos_weight_key,
                        )
    
            elif sense_use_arc_interfaces:  # set up the use of arc interfaces w/o sos1
                for network_key in ipp.networks:
                    for arc_key in ipp.networks[network_key].edges(keys=True):
                        if (
                            ipp.networks[network_key]
                            .edges[arc_key][Network.KEY_ARC_TECH]
                            .has_been_selected()
                        ):
                            continue
    
                        ipp.use_interface_variables_for_arc_selection(network_key, arc_key)
    
            # static losses
    
            if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
                ipp.place_static_losses_arrival_node()
    
            elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
                ipp.place_static_losses_departure_node()
    
            elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
                ipp.place_static_losses_upstream()
    
            elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
                ipp.place_static_losses_downstream()
    
            else:
                raise ValueError("Unknown static loss modelling mode.")
    
            # *********************************************************************
    
            # groups
    
            if type(arc_groups_dict) != type(None):
                for key in arc_groups_dict:
                    ipp.create_arc_group(arc_groups_dict[key])
    
            # *********************************************************************
    
            # maximum number of parallel arcs
    
            for key in max_number_parallel_arcs:
                ipp.set_maximum_number_parallel_arcs(
                    network_key=key[0],
                    node_a=key[1],
                    node_b=key[2],
                    limit=max_number_parallel_arcs[key],
                )
    
            # *********************************************************************
    
            if simplify_problem:
                ipp.simplify_peak_total_assessments()
    
            # *********************************************************************
            
            # instantiate (disable the default case v-a-v fixed losses)
    
            # ipp.instantiate(place_fixed_losses_upstream_if_possible=False)
    
            ipp.instantiate(initialise_ancillary_sets=init_aux_sets)
            # ipp.instance.pprint()
            # optimise
            ipp.optimise(
                solver_name=solver,
                solver_options=solver_options,
                output_options={},
                print_solver_output=print_solver_output,
            )
            # ipp.instance.pprint()
            # return the problem object
            return ipp
    
            # *********************************************************************
            # *********************************************************************
    
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_increasing_imp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            
            # assessment
            q = 0
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one import, one regular
            mynet = Network()
    
            # import node
            node_IMP = 'I'
            mynet.add_import_node(
                node_key=node_IMP,
                prices={
                    qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                    for qpk in tf.qpk()
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
    
            # no sos, regular time intervals
            ipp = self.build_solve_ipp(
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
    
                simplify_problem=False,
                use_prices_block=use_prices_block
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            )
    
            assert not ipp.has_peak_total_assessments()
            assert ipp.results["Problem"][0]["Number of constraints"] == 10
            assert ipp.results["Problem"][0]["Number of variables"] == 11
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
    
            # *********************************************************************
            # *********************************************************************
    
            # validation
    
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
    
            # the flows should be 1.0, 0.0 and 2.0
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
                2.0,
                abs_tol=1e-6,
            )
    
            # arc amplitude should be two
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
                2.0,
                abs_tol=0.01,
            )
    
            # capex should be four
            assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
    
            # sdncf should be -3.5
            assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)
    
            # the objective function should be -7.5
            assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
            
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_decreasing_imp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            
            # assessment
            q = 0
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one import, one regular
            mynet = Network()
    
            # import node
            node_IMP = 'I'
            mynet.add_import_node(
                node_key=node_IMP,
                prices={
                    qpk: ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, 3.0])
                    for qpk in tf.qpk()
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
    
            # no sos, regular time intervals
            ipp = self.build_solve_ipp(
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False, 
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
    
                simplify_problem=False,
                use_prices_block=use_prices_block
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            )
    
            assert not ipp.has_peak_total_assessments()
            assert ipp.results["Problem"][0]["Number of constraints"] == 14 # 10 prior to nonconvex block
            assert ipp.results["Problem"][0]["Number of variables"] == 13 # 11 prior to nonconvex block
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28 # 20 prior to nonconvex block
    
            # *********************************************************************
            # *********************************************************************
    
            # validation
    
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
    
            # the flows should be 1.0, 0.0 and 2.0
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
                2.0,
                abs_tol=1e-6,
            )
    
            # arc amplitude should be two
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
                2.0,
                abs_tol=0.01,
            )
    
            # capex should be four
            assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
    
            # sdncf should be -2.5
            assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)
    
            # the objective function should be -7.5
            assert math.isclose(pyo.value(ipp.instance.obj_f), -6.5, abs_tol=1e-3)
                    
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_decreasing_imp_prices_infinite_capacity(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            
            # assessment
            q = 0
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one import, one regular
            mynet = Network()
    
            # import node
            node_IMP = 'I'
            mynet.add_import_node(
                node_key=node_IMP,
                prices={
                    qpk: ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                    for qpk in tf.qpk()
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
            
            # trigger the error
            error_raised = False
            try:
                # no sos, regular time intervals
                self.build_solve_ipp(
                    solver_options={},
                    perform_analysis=False,
                    plot_results=False,  # True,
                    print_solver_output=False,
                    time_frame=tf,
                    networks={"mynet": mynet},
                    static_losses_mode=True,  # just to reach a line,
                    mandatory_arcs=[],
                    max_number_parallel_arcs={},
                    simplify_problem=False,
                )
            except Exception:
                error_raised = True
            assert error_raised
    
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_decreasing_exp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            # assessment
            q = 0
            # time
            number_intervals = 1
            # periods
            number_periods = 1
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one export, one regular
            mynet = Network()
    
            # import node
            node_EXP = generate_pseudo_unique_key(mynet.nodes())
            mynet.add_export_node(
                node_key=node_EXP,
                prices={
                    (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                    for p in range(number_periods)
                    for k in range(number_intervals)
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
    
            # no sos, regular time intervals
            ipp = self.build_solve_ipp(
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
                simplify_problem=False,
            )
    
            assert not ipp.has_peak_total_assessments()
            assert ipp.results["Problem"][0]["Number of constraints"] == 10
            assert ipp.results["Problem"][0]["Number of variables"] == 11
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
    
            # *********************************************************************
            # *********************************************************************
    
            # validation
    
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
    
            # the flows should be 1.0, 0.0 and 2.0
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
                1.0,
                abs_tol=1e-6,
            )
    
            # arc amplitude should be two
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
                1.0,
                abs_tol=0.01,
            )
    
            # capex should be four
            assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
    
            # sdncf should be 1.0
            assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)
    
            # the objective function should be -7.5
            assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
            
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_increasing_exp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            # assessment
            q = 0
            # time
            number_intervals = 1
            # periods
            number_periods = 1
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one export, one regular
            mynet = Network()
    
            # import node
            node_EXP = generate_pseudo_unique_key(mynet.nodes())
            mynet.add_export_node(
                node_key=node_EXP,
                prices={
                    (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.25, 3.0])
                    for p in range(number_periods)
                    for k in range(number_intervals)
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
    
            # no sos, regular time intervals
            ipp = self.build_solve_ipp(
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
                simplify_problem=False,
            )
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            assert not ipp.has_peak_total_assessments()
            assert ipp.results["Problem"][0]["Number of constraints"] == 14 # 10 before nonconvex block
            assert ipp.results["Problem"][0]["Number of variables"] == 13 # 11 before nonconvex block
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28 # 20 before nonconvex block
    
            # *********************************************************************
            # *********************************************************************
    
            # validation
    
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
    
            # the flows should be 1.0, 0.0 and 2.0
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
                1.0,
                abs_tol=1e-6,
            )
    
            # arc amplitude should be two
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
                1.0,
                abs_tol=0.01,
            )
    
            # capex should be four
            assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
    
            # sdncf should be 0.75
            assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 0.75, abs_tol=1e-3)
    
            # the objective function should be -2.25
            assert math.isclose(pyo.value(ipp.instance.obj_f), -2.25, abs_tol=1e-3)
                    
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_increasing_exp_prices_infinite_capacity(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            # assessment
            q = 0
            # time
            number_intervals = 1
            # periods
            number_periods = 1
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,)},
                time_interval_durations={q: (1,)},
            )
    
            # 2 nodes: one export, one regular
            mynet = Network()
    
            # import node
            node_EXP = generate_pseudo_unique_key(mynet.nodes())
            mynet.add_export_node(
                node_key=node_EXP,
                prices={
                    (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.25, None])
                    for p in range(number_periods)
                    for k in range(number_intervals)
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
            
            # trigger the error
            error_raised = False
            try:
                # no sos, regular time intervals
                self.build_solve_ipp(
                    solver_options={},
                    perform_analysis=False,
                    plot_results=False,  # True,
                    print_solver_output=False,
                    time_frame=tf,
                    networks={"mynet": mynet},
                    static_losses_mode=True,  # just to reach a line,
                    mandatory_arcs=[],
                    max_number_parallel_arcs={},
                    simplify_problem=False,
    
                    use_prices_block=use_prices_block
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
                )
            except Exception:
                error_raised = True
            assert error_raised
    
        # *************************************************************************
        # *************************************************************************
    
    
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_problem_increasing_imp_decreasing_exp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            # scenario
            q = 0
            # time
            number_intervals = 2
            # periods
            number_periods = 1
    
            tf = EconomicTimeFrame(
                discount_rate=0.0,
                reporting_periods={q: (0,)},
                reporting_period_durations={q: (365 * 24 * 3600,)},
                time_intervals={q: (0,1)},
                time_interval_durations={q: (1,1)},
            )
    
            # 3 nodes: one import, one export, one regular
            mynet = Network()
    
            # import node
            node_IMP = 'I'
            mynet.add_import_node(
                node_key=node_IMP,
                prices={
                    (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                    for p in range(number_periods)
                    for k in range(number_intervals)
                },
            )
    
            # export node
            node_EXP = generate_pseudo_unique_key(mynet.nodes())
            mynet.add_export_node(
                node_key=node_EXP,
                prices={
                    (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                    for p in range(number_periods)
                    for k in range(number_intervals)
                },
            )
    
            # other nodes
            node_A = 'A'
            mynet.add_source_sink_node(
                node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
            )
    
            # arc IA
            arc_tech_IA = Arcs(
                name="any",
                efficiency={(q, 0): 0.5, (q, 1): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
    
            # arc AE
            arc_tech_AE = Arcs(
                name="any",
                efficiency={(q, 0): 0.5, (q, 1): 0.5},
                efficiency_reverse=None,
                static_loss=None,
                capacity=[3],
                minimum_cost=[2],
                specific_capacity_cost=1,
                capacity_is_instantaneous=False,
                validate=False,
            )
            mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)
    
            # no sos, regular time intervals
            ipp = self.build_solve_ipp(
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
                simplify_problem=False,
                # discount_rates={0: (0.0,)},
    
                use_prices_block=use_prices_block
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            )
    
            assert not ipp.has_peak_total_assessments()
            assert ipp.results["Problem"][0]["Number of constraints"] == 23
            assert ipp.results["Problem"][0]["Number of variables"] == 26
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 57
    
            # *********************************************************************
            # *********************************************************************
    
            # validation
    
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
            # the arc should be installed since it is required for feasibility
            assert (
                True
                in ipp.networks["mynet"]
                .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
                .options_selected
            )
    
            # interval 0: import only
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
                2.0,
                abs_tol=1e-6,
            )
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
                0.0,
                abs_tol=1e-6,
            )
            # interval 1: export only
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
                0.0,
                abs_tol=1e-6,
            )
            assert math.isclose(
                pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
                1.0,
                abs_tol=1e-6,
            )
    
            # IA amplitude
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
                2.0,
                abs_tol=0.01,
            )
            # AE amplitude
            assert math.isclose(
                pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
                1.0,
                abs_tol=0.01,
            )
    
            # capex should be 7.0: 4+3
            assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)
    
            # sdncf should be -2.5: -3.5+1.0
            assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)
    
            # the objective function should be -9.5: -7.5-2.5
            assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)
    
                
        # *************************************************************************
        # *************************************************************************
    
        
        @pytest.mark.parametrize("use_prices_block", [True, False])
        def test_direct_imp_exp_network_higher_exp_prices(self, use_prices_block):
    
    Pedro L. Magalhães's avatar
    Pedro L. Magalhães committed
            
            # time frame
            q = 0
            tf = EconomicTimeFrame(
                discount_rate=3.5/100,
                reporting_periods={q: (0,1)},
                reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)},
                time_intervals={q: (0,1)},
                time_interval_durations={q: (1,1)},
            )    
            
            # 4 nodes: one import, one export, two supply/demand nodes
            mynet = Network()
        
            # import node
            imp_node_key = 'thatimpnode'
            imp_prices = {
                qpk: ResourcePrice(
                    prices=0.5,
                    volumes=None,