# test_esipp_problem.py
# imports

# standard
import math
from statistics import mean

# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo
# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
from src.topupopt.problems.esipp.problem import simplify_peak_total_problem
from src.topupopt.problems.esipp.problem import is_peak_total_problem
from src.topupopt.problems.esipp.time import TimeFrame

# *****************************************************************************
# *****************************************************************************

class TestESIPPProblem:
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        self,
        solver: str = "glpk",
        solver_options: dict = None,
        use_sos_arcs: bool = False,
        arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
        arc_use_real_variables_if_possible: bool = False,
        use_sos_sense: bool = False,
        sense_sos_weight_key: int = (
            InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
        ),
        sense_use_real_variables_if_possible: bool = False,
        sense_use_arc_interfaces: bool = False,
        perform_analysis: bool = False,
        plot_results: bool = False,
        print_solver_output: bool = False,
        time_frame: TimeFrame = None,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        networks: dict = None,
        converters: dict = None,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        static_losses_mode=None,
        mandatory_arcs: list = None,
        max_number_parallel_arcs: dict = None,
        arc_groups_dict: dict = None,
        init_aux_sets: bool = False,
        discount_rates: dict = None,
        assessment_weights: dict = None,
        simplify_problem: bool = False,
    ):
        if type(assessment_weights) != dict:
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            assessment_weights = {}  # default

        if type(converters) != dict:
            converters = {}
        # one interval twice as long as the average is worth twice
        # one interval half as long as the average is worth half
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        #     [time_period_duration/average_time_interval_duration
        #       for time_period_duration in intraperiod_time_interval_duration]
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

        time_weights = None  # nothing yet

        normalised_time_interval_duration = None  # nothing yet

        ipp = InfrastructurePlanningProblem(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            discount_rates=discount_rates,
            time_frame=time_frame,
            # reporting_periods=time_frame.reporting_periods,
            # time_intervals=time_frame.time_interval_durations,
            time_weights=time_weights,
            normalised_time_interval_duration=normalised_time_interval_duration,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            assessment_weights=assessment_weights,
        )

        for netkey, net in networks.items():
            ipp.add_network(network_key=netkey, network=net)
        # add converters

        for cvtkey, cvt in converters.items():
            ipp.add_converter(converter_key=cvtkey, converter=cvt)

        if type(mandatory_arcs) == list:
            for full_arc_key in mandatory_arcs:
                ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])
        #         for arc_key in ipp.networks[network_key].edges(keys=True):
        #             if ipp.networks[network_key].edges[arc_key][
        #                     Network.KEY_ARC_TECH].has_been_selected():
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

        #                 continue

        #             ipp.make_arc_mandatory(network_key, arc_key)
        # set up the use of sos for arc selection
        if use_sos_arcs:
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                    if (
                        ipp.networks[network_key]
                        .edges[arc_key][Network.KEY_ARC_TECH]
                        .has_been_selected()
                    ):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                            arc_use_real_variables_if_possible
                        ),
                        sos1_weight_method=arc_sos_weight_key,
                    )

        # set up the use of sos for flow sense determination
        if use_sos_sense:
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
                    if not ipp.networks[network_key].edges[arc_key][
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                        Network.KEY_ARC_UND
                    ]:
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                        network_key,
                        arc_key,
                        use_real_variables_if_possible=(
                            sense_use_real_variables_if_possible
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                        ),
                        use_interface_variables=sense_use_arc_interfaces,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                        sos1_weight_method=sense_sos_weight_key,
                    )

        elif sense_use_arc_interfaces:  # set up the use of arc interfaces w/o sos1
            for network_key in ipp.networks:
                for arc_key in ipp.networks[network_key].edges(keys=True):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                    if (
                        ipp.networks[network_key]
                        .edges[arc_key][Network.KEY_ARC_TECH]
                        .has_been_selected()
                    ):
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

                    ipp.use_interface_variables_for_arc_selection(network_key, arc_key)

        if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
            ipp.place_static_losses_arrival_node()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
            ipp.place_static_losses_departure_node()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
            ipp.place_static_losses_upstream()
        elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
            ipp.place_static_losses_downstream()
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            raise ValueError("Unknown static loss modelling mode.")

        # *********************************************************************
        if type(arc_groups_dict) != type(None):
            for key in arc_groups_dict:
                ipp.create_arc_group(arc_groups_dict[key])
        # *********************************************************************
        for key in max_number_parallel_arcs:
            ipp.set_maximum_number_parallel_arcs(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                network_key=key[0],
                node_a=key[1],
                node_b=key[2],
                limit=max_number_parallel_arcs[key],
            )

        # *********************************************************************
        if simplify_problem:
            ipp = simplify_peak_total_problem(ipp)
        # *********************************************************************
        # instantiate (disable the default case v-a-v fixed losses)
        # ipp.instantiate(place_fixed_losses_upstream_if_possible=False)
        ipp.instantiate(initialise_ancillary_sets=init_aux_sets)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

        ipp.optimise(
            solver_name=solver,
            solver_options=solver_options,
            output_options={},
            print_solver_output=print_solver_output,
        )

        # *********************************************************************
        # *********************************************************************
    # *************************************************************************
    # *************************************************************************
    def test_single_network_single_arc_problem(self):
        """Solve a two-node, single-arc problem over three time intervals.

        One import node feeds one demand node through a directed arc with
        50% efficiency. The test checks the reported problem size, that the
        arc is installed, the arc flows and amplitude, the capex, the sdncf
        and the objective function value.
        """
        # scenario
        q = 0

        # time: two reporting periods, three intervals
        tf = TimeFrame(
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            # NOTE(review): the keyword name below was lost from the source
            # and has been reconstructed; confirm against
            # Network.add_import_node.
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            # NOTE(review): restored to match the identical arcs built by
            # the other tests in this class; confirm.
            capacity_is_instantaneous=False,
            validate=False,
        )

        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            discount_rates={0: (0.035, 0.035)},
            simplify_problem=False,
        )

        # problem size
        assert ipp.results["Problem"][0]["Number of constraints"] == 24
        assert ipp.results["Problem"][0]["Number of variables"] == 22
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 49

        # *********************************************************************
        # *********************************************************************

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        for k, expected_flow in enumerate((1.0, 0.0, 2.0)):
            assert math.isclose(
                pyo.value(
                    ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, k)]
                ),
                expected_flow,
                abs_tol=1e-6,
            )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
    # *************************************************************************
    # *************************************************************************
    def test_single_network_single_arc_problem_simpler(self):
        """Solve the single-arc problem again with problem simplification.

        Same set-up as the single-arc test but with fixed node keys and
        ``simplify_problem=True``. Besides the problem size, arc selection,
        capex and objective, the test compares the pretty-printed import
        flow cost and export flow revenue constraints against the expected
        text.
        """
        # scenario
        q = 0

        # time: two reporting periods, three intervals
        tf = TimeFrame(
            reporting_periods={q: (0, 1)},
            reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
            time_intervals={q: (0, 1, 2)},
            time_interval_durations={q: (1, 1, 1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node (fixed key, since it shows up in the expected output)
        node_IMP = "thatimpnode"
        mynet.add_import_node(
            node_key=node_IMP,
            # NOTE(review): the keyword name below was lost from the source
            # and has been reconstructed; confirm against
            # Network.add_import_node.
            prices={
                qpk: ResourcePrice(prices=1.0, volumes=None)
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = "thatnodea"
        mynet.add_source_sink_node(
            node_key=node_A,
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={qk: 0.5 for qk in tf.qk()},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            # NOTE(review): restored to match the identical arcs built by
            # the other tests in this class; confirm.
            capacity_is_instantaneous=False,
            validate=False,
        )

        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            discount_rates={0: (0.035, 0.035)},
            simplify_problem=True,
        )

        # problem size
        assert ipp.results["Problem"][0]["Number of constraints"] == 20
        assert ipp.results["Problem"][0]["Number of variables"] == 19
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 36

        # *********************************************************************
        # *********************************************************************

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

        # TODO: create method to automate getting data from the command line
        import io
        from contextlib import redirect_stdout

        # redirect_stdout restores whatever stream was active beforehand
        # (e.g. the test runner's capture), even if pprint() raises
        expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n    Key                                     : Lower : Body                                                                                      : Upper : Active\n     ('mynet', 'thatimpnode', 'peak', 0, 0) :   0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] :   0.0 :   True\n     ('mynet', 'thatimpnode', 'peak', 1, 0) :   0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] :   0.0 :   True\n    ('mynet', 'thatimpnode', 'total', 0, 0) :   0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] :   0.0 :   True\n    ('mynet', 'thatimpnode', 'total', 1, 0) :   0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] :   0.0 :   True\n"""
        cmd_output = io.StringIO()
        with redirect_stdout(cmd_output):
            ipp.instance.constr_imp_flow_cost.pprint()
        assert cmd_output.getvalue() == expected_string

        # there are no export nodes, so the revenue constraint is empty
        expected_string = """constr_exp_flow_revenue : Size=0, Index=constr_exp_flow_revenue_index, Active=True\n    Key : Lower : Body : Upper : Active\n"""
        f = io.StringIO()
        with redirect_stdout(f):
            ipp.instance.constr_exp_flow_revenue.pprint()
        assert f.getvalue() == expected_string
    # *************************************************************************
    # *************************************************************************
    def test_problem_increasing_imp_prices(self):
        """Solve a single-arc import problem with increasing price segments.

        The import node has two price segments (1.0 for the first 0.5 units
        of volume, 2.0 afterwards). The test checks that the problem is not
        a peak-total problem, the reported problem size, that the arc is
        installed, the arc flow and amplitude, the capex, the sdncf and the
        objective function value.
        """
        # scenario
        q = 0

        # time: one reporting period, one interval
        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            # NOTE(review): the keyword name below was lost from the source
            # and has been reconstructed; confirm against
            # Network.add_import_node.
            prices={
                qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                for qpk in tf.qpk()
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)

        # problem size
        assert ipp.results["Problem"][0]["Number of constraints"] == 10
        assert ipp.results["Problem"][0]["Number of variables"] == 11
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 20

        # *********************************************************************
        # *********************************************************************

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flow should be 2.0 (1.0 of demand at 50% efficiency), which
        # is consistent with the capex and sdncf values asserted below
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -3.5
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)

        # the objective function should be -7.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
    # *************************************************************************
    # *************************************************************************
    def test_problem_decreasing_exp_prices(self):
        # scenario
        q = 0
        # time
        number_intervals = 1
        # periods
        number_periods = 1
        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,)},
            time_interval_durations={q: (1,)},
        # 2 nodes: one export, one regular
        mynet = Network()
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

        # import node
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            node_key=node_EXP,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
                (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})

        # arc IA
        arc_tech_IA = Arcs(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            name="any",
            efficiency={(q, 0): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)

        # identify node types
        mynet.identify_node_types()
        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            # solver=solver,
            solver_options={},
            # use_sos_arcs=use_sos_arcs,
            # arc_sos_weight_key=sos_weight_key,
            # arc_use_real_variables_if_possible=use_real_variables_if_possible,
            # use_sos_sense=use_sos_sense,
            # sense_sos_weight_key=sense_sos_weight_key,
            # sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
            # sense_use_arc_interfaces=use_arc_interfaces,
            perform_analysis=False,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            # init_aux_sets=init_aux_sets,
            simplify_problem=False,
            # reporting_periods={0: (0,)},
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        assert ipp.results["Problem"][0]["Number of constraints"] == 10
        assert ipp.results["Problem"][0]["Number of variables"] == 11
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 20

        # *********************************************************************
        # *********************************************************************
        # the arc should be installed since it is required for feasibility
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
            abs_tol=0.01,
        )

        # capex should be three
        assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
        # sdncf should be 1.0
Pedro L. Magalhães's avatar
Pedro L. Magalhães committed
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)

        # the objective function should be -2.0: 1.0-3.0
        assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
    # *************************************************************************
    # *************************************************************************
    def test_problem_increasing_imp_decreasing_exp_prices(self):
        """Check a three-node problem with tiered import and export prices.

        The network is IMP -> A -> EXP, with one directed arc per hop and an
        efficiency of 0.5 on both arcs. Node A has a base flow of 1.0 in
        interval 0 and of -1.0 in interval 1. Import prices go from 1.0 to 2.0
        beyond a volume of 0.5, whereas export prices go from 2.0 to 1.0
        beyond a volume of 0.5.

        NOTE(review): the expected values of the flow and amplitude checks
        were re-derived from the capex/sdncf comments in this test (4+3 and
        -3.5+1.0) -- confirm them against the upstream repository.
        """
        # scenario
        q = 0
        # time
        number_intervals = 2
        # periods
        number_periods = 1

        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0, 1)},
            time_interval_durations={q: (1, 1)},
        )

        # 3 nodes: one import, one export, one regular
        mynet = Network()

        # import node: the price increases with the volume
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # export node: the price decreases with the volume
        node_EXP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_export_node(
            node_key=node_EXP,
            prices={
                (q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes
        node_A = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_source_sink_node(
            node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
        )

        # arc IA
        arc_tech_IA = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # arc AE
        arc_tech_AE = Arcs(
            name="any",
            efficiency={(q, 0): 0.5, (q, 1): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)

        # identify node types
        mynet.identify_node_types()

        # no sos, regular time intervals
        ipp = self.build_solve_ipp(
            solver_options={},
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            simplify_problem=False,
            discount_rates={0: (0.0,)},
        )

        assert not is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 23
        assert ipp.results["Problem"][0]["Number of variables"] == 26
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 57

        # *********************************************************************
        # *********************************************************************

        # the IA arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )
        # the AE arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # interval 0: import only (1.0 needed at A, efficiency 0.5 => 2.0)
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            2.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
            0.0,
            abs_tol=1e-6,
        )
        # interval 1: export only (1.0 supplied at A leaves through arc AE)
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
            1.0,
            abs_tol=1e-6,
        )

        # IA amplitude: largest IA flow
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )
        # AE amplitude: largest AE flow
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
            1.0,
            abs_tol=0.01,
        )

        # capex should be 7.0: 4+3
        assert math.isclose(pyo.value(ipp.instance.var_capex), 7.0, abs_tol=1e-3)

        # sdncf should be -2.5: -3.5+1.0
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -2.5, abs_tol=1e-3)

        # the objective function should be -9.5: -7.0-2.5
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.5, abs_tol=1e-3)
    # *************************************************************************
    # *************************************************************************
    def test_problem_converter_sink(self):
        """Solve a two-node problem that is also handed a converter.

        The network has one import node and one source/sink node joined by a
        single directed arc (IMP -> A). The solved problem is then checked
        against fixed values: problem size, arc selection, flows, amplitude,
        capex, sdncf and the objective function.

        NOTE(review): as written, this method uses several names that it
        never binds (a, b, c, d, x0, cvt) and others that are not among the
        imports visible at the top of the file (get_two_node_model_signals,
        dynsys, cvn). Confirm where these are meant to come from before
        relying on this test.
        """
        # scenario
        q = 0
        # time
        number_intervals = 3
        # periods
        number_periods = 1

        tf = TimeFrame(
            reporting_periods={q: (0,)},
            reporting_period_durations={q: (365 * 24 * 3600,)},
            time_intervals={q: (0,1,2)},
            time_interval_durations={q: (1,1,1)},
        )

        # 2 nodes: one import, one regular
        mynet = Network()

        # import node
        node_IMP = generate_pseudo_unique_key(mynet.nodes())
        mynet.add_import_node(
            node_key=node_IMP,
            prices={
                (q, p, k): ResourcePrice(prices=1.0, volumes=None)
                for p in range(number_periods)
                for k in range(number_intervals)
            },
        )

        # other nodes

        node_A = generate_pseudo_unique_key(mynet.nodes())

        mynet.add_source_sink_node(
            node_key=node_A,
            # base_flow=[0.5, 0.0, 1.0],
            base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
        )

        # arc IA

        arc_tech_IA = Arcs(
            name="any",
            # efficiency=[0.5, 0.5, 0.5],
            efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5},
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=1,
            capacity_is_instantaneous=False,
            validate=False,
        )

        mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)

        # identify node types

        mynet.identify_node_types()

        # converters

        # number of samples
        time_step_durations = [1, 1, 1]
        number_time_steps = len(time_step_durations)

        # get the coefficients
        # NOTE(review): np is not used anywhere else in this method
        import numpy as np

        # NOTE(review): the coefficient dictionaries below (a_innk, b_inmk,
        # c_irnk, d_irmk, e_x_ink, e_y_irk) are built but never passed to
        # anything further down in this method

        # a_innk
        a_innk = {
            ("cvt1", 0, 0, 0): 0.95,
            ("cvt1", 0, 0, 1): 0.95,
            ("cvt1", 0, 0, 2): 0.95,
        }

        # b_inmk
        b_inmk = {("cvt1", 0, 0, 0): 3, ("cvt1", 0, 0, 1): 3, ("cvt1", 0, 0, 2): 3}

        # c_irnk
        c_irnk = {}
        # d_irmk
        d_irmk = {}
        # e_x_ink: depends on fixed signals
        e_x_ink = {}
        # e_y_irk: depends on fixed signals
        e_y_irk = {}

        # get the signals
        # NOTE(review): get_two_node_model_signals is not imported in the
        # visible file header -- confirm it is defined elsewhere in the module
        inputs, states, outputs = get_two_node_model_signals(number_time_steps)

        # create a dynamic system
        # NOTE(review): dynsys is not imported in the visible file header and
        # a, b, c, d are never assigned in this method
        ds = dynsys.DynamicSystem(
            time_interval_durations=time_step_durations, A=a, B=b, C=c, D=d
        )

        # create a converter
        # NOTE(review): cvn is not imported in the visible file header and x0
        # is never assigned in this method
        cvn1 = cvn.Converter(
            "cvn1",
            sys=ds,
            initial_states=x0,
            turn_key_cost=3,
            inputs=inputs,
            states=states,
            outputs=outputs,
        )

        # no sos, regular time intervals

        ipp = self.build_solve_ipp(
            # solver=solver,
            solver_options={},
            # use_sos_arcs=use_sos_arcs,
            # arc_sos_weight_key=sos_weight_key,
            # arc_use_real_variables_if_possible=use_real_variables_if_possible,
            # use_sos_sense=use_sos_sense,
            # sense_sos_weight_key=sense_sos_weight_key,
            # sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
            # sense_use_arc_interfaces=use_arc_interfaces,
            perform_analysis=False,
            plot_results=False,  # True,
            print_solver_output=False,
            time_frame=tf,
            networks={"mynet": mynet},
            # NOTE(review): cvt is not assigned in this method; the converter
            # created above is bound to cvn1 -- confirm which one is intended
            converters={"mycvt": cvt},
            static_losses_mode=True,  # just to reach a line,
            mandatory_arcs=[],
            max_number_parallel_arcs={},
            # init_aux_sets=init_aux_sets,
            simplify_problem=False,
        )

        # problem classification and size as reported in the solver results
        assert is_peak_total_problem(ipp)
        assert ipp.results["Problem"][0]["Number of constraints"] == 24
        assert ipp.results["Problem"][0]["Number of variables"] == 22
        assert ipp.results["Problem"][0]["Number of nonzeros"] == 49

        # *********************************************************************
        # *********************************************************************

        # validation

        # the arc should be installed since it is required for feasibility
        assert (
            True
            in ipp.networks["mynet"]
            .edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )

        # the flows should be 1.0, 0.0 and 2.0
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
            1.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
            0.0,
            abs_tol=1e-6,
        )
        assert math.isclose(
            pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
            2.0,
            abs_tol=1e-6,
        )

        # arc amplitude should be two
        assert math.isclose(
            pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
            2.0,
            abs_tol=0.01,
        )

        # capex should be four
        assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)

        # sdncf should be -5.7
        assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)

        # the objective function should be -9.7
        assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)

Pedro L. Magalhães's avatar
Pedro L. Magalhães committed

# *****************************************************************************
# *****************************************************************************