Skip to content
Snippets Groups Projects
test_esipp_prices.py 39.2 KiB
Newer Older
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
# imports

# standard
import math
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo

Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
# from src.topupopt.problems.esipp.utils import compute_cost_volume_metrics
from src.topupopt.problems.esipp.utils import statistics
from src.topupopt.problems.esipp.time import EconomicTimeFrame
# from src.topupopt.problems.esipp.converter import Converter
from src.topupopt.problems.esipp.blocks.prices import NODE_PRICE_OTHER, NODE_PRICE_DELTA, NODE_PRICE_LAMBDA
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

# *****************************************************************************
# *****************************************************************************

# TODO: test time-varying tariffs (convex/non-convex)
# TODO: check problem sizes

Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
class TestESIPPProblem:

    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_increasing_imp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        
        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = 'I'
        prices = [1.0, 2.0]
        volumes = [0.5, None] if node_price_model == NODE_PRICE_OTHER else [0.5, 1e5]
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=prices, volumes=volumes)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )

        assert not ipp.has_peak_total_assessments()
        # print('hey')
        # print((use_prices_block, node_price_model))
        # print(ipp.results["Problem"][0])
        if (use_prices_block, node_price_model) == (True, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 10
            assert ipp.results["Problem"][0]["Number of variables"] == 11
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 10
            assert ipp.results["Problem"][0]["Number of variables"] == 11
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
        elif (use_prices_block, node_price_model) == (True, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 11
            assert ipp.results["Problem"][0]["Number of variables"] == 12
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 22
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 15
            assert ipp.results["Problem"][0]["Number of variables"] == 14
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 30
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -3.5
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
        
    # *************************************************************************
    # *************************************************************************
    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
        )
    def test_problem_decreasing_imp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        
        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = 'I'
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, 3.0])
                for qpk in tf.qpk()
            },
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # no sos, regular time intervals
            solver='scip' if node_price_model == NODE_PRICE_LAMBDA else 'glpk',
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False, 
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )

        assert not ipp.has_peak_total_assessments()
        # print('hey')
        # print((use_prices_block, node_price_model))
        # print(ipp.results["Problem"][0])
        if (use_prices_block, node_price_model) == (True, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 14
            assert ipp.results["Problem"][0]["Number of variables"] == 13
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 14
            assert ipp.results["Problem"][0]["Number of variables"] == 13
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28
        elif (use_prices_block, node_price_model) == (True, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 15
            assert ipp.results["Problem"][0]["Number of variables"] == 14
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 30
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 15
            assert ipp.results["Problem"][0]["Number of variables"] == 14
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 30
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -2.5
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -6.5, abs_tol=1e-3)
        
    # *************************************************************************
    # *************************************************************************
    
    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)
         ]
        )
    def test_problem_decreasing_imp_prices2(self, use_prices_block, node_price_model):
        
        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = 'I'
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, 3.0])
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 0.25})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # no sos, regular time intervals
        ipp = build_solve_ipp(
            solver='scip' if node_price_model == NODE_PRICE_LAMBDA else 'glpk',
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False, 
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
        )

        assert not ipp.has_peak_total_assessments()
        # print('hey')
        # # print((use_prices_block, node_price_model))
        # print(ipp.results["Problem"][0])
        # # capex should be four
        # print(pyo.value(ipp.instance.var_capex))
        # print(pyo.value(ipp.instance.var_sdncf_q[q]))
        # print(pyo.value(ipp.instance.obj_f))
        if (use_prices_block, node_price_model) == (True, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 14
            assert ipp.results["Problem"][0]["Number of variables"] == 13
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_OTHER):
            assert ipp.results["Problem"][0]["Number of constraints"] == 14
            assert ipp.results["Problem"][0]["Number of variables"] == 13
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 28
        elif (use_prices_block, node_price_model) == (True, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 15
            assert ipp.results["Problem"][0]["Number of variables"] == 14
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 30
        elif (use_prices_block, node_price_model) == (False, NODE_PRICE_DELTA):
            assert ipp.results["Problem"][0]["Number of constraints"] == 15
            assert ipp.results["Problem"][0]["Number of variables"] == 14
            assert ipp.results["Problem"][0]["Number of nonzeros"] == 30
        
        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 0.5
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            0.5,
            abs_tol=1e-6,
        )

        # arc amplitude should be 0.5
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            0.5,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 2.5, abs_tol=1e-3)

        # sdncf should be -2.5
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -1.0, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -3.5, abs_tol=1e-3)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                
    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_decreasing_imp_prices_infinite_capacity(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        
        # assessment
        q = 0

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = 'I'
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                qpk: ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
        
        # trigger the error
        error_raised = False
        try:
            # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
                simplify_problem=False,
            )
        except Exception:
            error_raised = True
        assert error_raised

    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_decreasing_exp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # assessment
        q = 0
        # time
        number_intervals = 1
        # periods
        number_periods = 1

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one export, one regular
        mynet = Network()

        # import node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        prices = [2.0, 1.0]
        volumes = [0.5, None] if node_price_model == NODE_PRICE_OTHER else [0.5, 1e5]
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=prices, volumes=volumes)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)

        # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )

        assert not ipp.has_peak_total_assessments()

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)

        # sdncf should be 1.0
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
        
    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_increasing_exp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # assessment
        q = 0
        # time
        number_intervals = 1
        # periods
        number_periods = 1

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one export, one regular
        mynet = Network()

        # import node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.25, 3.0])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)

        # no sos, regular time intervals
            solver='scip' if node_price_model == NODE_PRICE_LAMBDA else 'glpk',
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        assert not ipp.has_peak_total_assessments()

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)

        # sdncf should be 0.75
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 0.75, abs_tol=1e-3)

        # the objective function should be -2.25
        assert math.isclose(pyo.value(ipp.instance.obj_f), -2.25, abs_tol=1e-3)
                
    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_increasing_exp_prices_infinite_capacity(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # assessment
        q = 0
        # time
        number_intervals = 1
        # periods
        number_periods = 1

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one export, one regular
        mynet = Network()

        # import node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.25, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
        
        # trigger the error
        error_raised = False
        try:
            # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                solver_options={},
                perform_analysis=False,
                plot_results=False,  # True,
                print_solver_output=False,
                time_frame=tf,
                networks={"mynet": mynet},
                static_losses_mode=True,  # just to reach a line,
                mandatory_arcs=[],
                max_number_parallel_arcs={},
                simplify_problem=False,
                use_prices_block=use_prices_block,
                node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            )
        except Exception:
            error_raised = True
        assert error_raised

    # *************************************************************************
    # *************************************************************************

    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_problem_increasing_imp_decreasing_exp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # scenario
        q = 0
        # time
        number_intervals = 2
        # periods
        number_periods = 1

        tf = EconomicTimeFrame(
            discount_rate=0.0,
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,1)},
            time_interval_durations={q: (1,1)},
        )

        # 3 nodes: one import, one export, one regular
        mynet = Network()

        # import node
        node_IMP = 'I'
        prices = [1.0, 2.0]
        volumes = [0.5, None] if node_price_model == NODE_PRICE_OTHER else [0.5, 1e5]
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                (q, p, k): ResourcePrice(prices=prices, volumes=volumes)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # export node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        prices = [2.0, 1.0]
        volumes = [0.5, None] if node_price_model == NODE_PRICE_OTHER else [0.5, 1e5]
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=prices, volumes=volumes)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = 'A'
        mynet.add_source_sink_node(
            node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # arc AE
        arc_tech_AE = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)

        # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            # discount_rates={0: (0.0,)},
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )

        assert not ipp.has_peak_total_assessments()

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )
        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # interval 0: import only
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            0.0,
            abs_tol=1e-6,
        )
        # interval 1: export only
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
            1.0,
            abs_tol=1e-6,
        )

        # IA amplitude
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )
        # AE amplitude
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be 7.0: 4+3
        assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)

        # sdncf should be -2.5: -3.5+1.0
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)

        # the objective function should be -9.5: -7.5-2.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)

            
    # *************************************************************************
    # *************************************************************************
    @pytest.mark.parametrize(
        "use_prices_block, node_price_model", 
        [(True, NODE_PRICE_OTHER),
         (True, NODE_PRICE_DELTA),
         (True, NODE_PRICE_LAMBDA),
         (False, NODE_PRICE_OTHER),
         (False, NODE_PRICE_DELTA),
         (False, NODE_PRICE_LAMBDA)]
        )
    def test_direct_imp_exp_network_higher_exp_prices(self, use_prices_block, node_price_model):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        
        # time frame
        q = 0
        tf = EconomicTimeFrame(
            discount_rate=3.5/100,
            reporting_periods={q: (0,1)},
            reporting_period_durations={q: (365 * 24 * 3600,365 * 24 * 3600)},
            time_intervals={q: (0,1)},
            time_interval_durations={q: (1,1)},
        )    
        
        # 4 nodes: one import, one export, two supply/demand nodes
        mynet = Network()
    
        # import node
        imp_node_key = 'thatimpnode'
        imp_prices = {
            qpk: ResourcePrice(
                prices=0.5,
                volumes=None if node_price_model == NODE_PRICE_OTHER else 1e4,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            )
            for qpk in tf.qpk()
            }
        mynet.add_import_node(
            node_key=imp_node_key,
            prices=imp_prices
        )
    
        # export node
        exp_node_key = 'thatexpnode'
        exp_prices = {
            qpk: ResourcePrice(
                prices=1.5,
                volumes=None if node_price_model == NODE_PRICE_OTHER else 1e4,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            )
            for qpk in tf.qpk()
            }
        mynet.add_export_node(
            node_key=exp_node_key,
            prices=exp_prices,
        )
        
        # add arc without fixed losses from import node to export
        arc_tech_IE = Arcs(
            name="IE",
            # efficiency=[1, 1, 1, 1],
            efficiency={(0, 0): 1, (0, 1): 1, (0, 2): 1, (0, 3): 1},
            efficiency_reverse=None,
            static_loss=None,
            validate=False,
            capacity=[0.5, 1.0, 2.0],
            minimum_cost=[5, 5.1, 5.2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
        )
        mynet.add_directed_arc(
            node_key_a=imp_node_key, node_key_b=exp_node_key, arcs=arc_tech_IE
        )
    
        # no sos, regular time intervals
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            networks={"mynet": mynet},
            time_frame=tf,
            static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
            mandatory_arcs=[],
            use_prices_block=use_prices_block,
            node_price_model=node_price_model
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        )
    
        # export prices are higher: it makes sense to install the arc since the
        # revenue (@ max. cap.) exceeds the cost of installing the arc

        assert (
            True
            in ipp.networks["mynet"]
            .edges[(imp_node_key, exp_node_key, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        # overview
        (imports_qpk, 
         exports_qpk, 
         balance_qpk, 
         import_costs_qpk, 
         export_revenue_qpk, 
         ncf_qpk, 
         aggregate_static_demand_qpk,
         aggregate_static_supply_qpk,
         aggregate_static_balance_qpk) = statistics(ipp)

        # there should be no imports

        abs_tol = 1e-6
        
        abs_tol = 1e-3
        imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert imports_qp > 0.0 - abs_tol

        abs_tol = 1e-3
        import_costs_qp = sum(import_costs_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
        assert import_costs_qp > 0.0 - abs_tol

        # there should be no exports

        abs_tol = 1e-2

        exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q])
        assert exports_qp > 0.0 - abs_tol
        assert export_revenue_qp > 0.0 - abs_tol

        # the revenue should exceed the costs

        abs_tol = 1e-2

        assert (
            export_revenue_qp > import_costs_qp - abs_tol
        )

        # the capex should be positive

        abs_tol = 1e-6

        assert pyo.value(ipp.instance.var_capex) > 0 - abs_tol
        
    # *************************************************************************
    # *************************************************************************

# *****************************************************************************
# *****************************************************************************