Newer
Older
7001
7002
7003
7004
7005
7006
7007
7008
7009
7010
7011
7012
7013
7014
7015
7016
7017
7018
7019
7020
7021
7022
7023
7024
7025
7026
7027
7028
7029
7030
7031
7032
7033
7034
7035
7036
7037
7038
7039
7040
7041
7042
7043
7044
7045
7046
7047
7048
7049
7050
7051
7052
7053
7054
7055
7056
7057
7058
7059
7060
7061
7062
7063
7064
7065
7066
7067
7068
7069
7070
7071
7072
7073
7074
7075
7076
7077
7078
7079
7080
7081
7082
7083
7084
7085
7086
7087
7088
7089
7090
7091
7092
7093
abs_tol=abs_tol,
)
# *************************************************************************
# *************************************************************************
def test_directed_arc_static_upstream_pre(self):
    """Check the flows in a network of preexisting arcs with a static loss.

    Two import nodes (prices 1 and 2) feed source/sink node B, whose base
    flow is 1.0. The first import node reaches B via waypoint node A and
    arc AB, which has a static loss of 0.1. The second import node is
    connected to B directly through an arc whose capacity is 0.1. The
    problem is solved with STATIC_LOSS_MODE_DEP.

    The test expects total imports of 1.1 in reporting period 0, no
    exports, and flows of 1.0 (I1A), 0.9 (AB) and 0.1 (I2B).
    """
    # time
    number_intervals = 1
    number_periods = 2
    # time frame
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    # 4 nodes: two import nodes, one waypoint node (A), one source/sink node (B)
    mynet = Network()
    # import nodes
    imp1_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp1_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    imp2_node_key = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_import_node(
        node_key=imp2_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=2, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    # other nodes
    node_A = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_waypoint_node(node_key=node_A)
    node_B = generate_pseudo_unique_key(mynet.nodes())
    mynet.add_source_sink_node(
        node_key=node_B,
        base_flow={
            (q, 0): 1.0,
        },
    )
    # add arcs
    # I1A
    mynet.add_preexisting_directed_arc(
        node_key_a=imp1_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1,
        capacity_is_instantaneous=False,
    )
    # I2B
    mynet.add_preexisting_directed_arc(
        node_key_a=imp2_node_key,
        node_key_b=node_B,
        efficiency=None,
        static_loss=None,
        capacity=0.1,
        capacity_is_instantaneous=False,
    )
    # AB: the only arc with a static loss
    mynet.add_preexisting_directed_arc(
        node_key_a=node_A,
        node_key_b=node_B,
        efficiency=None,
        static_loss={(0, q, 0): 0.1},
        capacity=1,
        capacity_is_instantaneous=False,
    )
    # identify node types
    mynet.identify_node_types()
    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver='cbc',  # TODO: make this work with other solvers
        solver_options={},
        plot_results=False,  # True,
        print_solver_output=False,
        time_frame=tf,
        networks={"mynet": mynet},
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_DEP,
        mandatory_arcs=[],
        max_number_parallel_arcs={}
    )
    # all arcs should be installed (they are not new)
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp1_node_key, node_A, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp2_node_key, node_B, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(node_A, node_B, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    # overview
    (imports_qpk,
     exports_qpk,
     balance_qpk,
     import_costs_qpk,
     export_revenue_qpk,
     ncf_qpk,
     aggregate_static_demand_qpk,
     aggregate_static_supply_qpk,
     aggregate_static_balance_qpk) = statistics(ipp)
    # one tolerance for every comparison below
    abs_tol = 1e-6
    # there should be imports (summed over reporting period 0 only)
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, 1.1, abs_tol=abs_tol)
    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    # not asserted on: the lookup alone fails if a revenue entry is missing
    export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)
    # interval 0: flow through I1A must be 1
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp1_node_key, node_A, 0, 0, 0)]),
        1,
        abs_tol=abs_tol,
    )
    # interval 0: flow through AB must be 0.9
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_B, 0, 0, 0)]),
        0.9,
        abs_tol=abs_tol,
    )
    # interval 0: flow through I2B must be 0.1
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp2_node_key, node_B, 0, 0, 0)]),
        0.1,
        abs_tol=abs_tol,
    )
# *************************************************************************
# *************************************************************************
def test_directed_arc_static_downstream_new(self):
    """Check the flows when a new arc complements a lossy preexisting arc.

    One import node (price 1.1) supplies source/sink node A, whose base
    flow is 1.0, through two parallel arcs: IA1, a preexisting arc with an
    efficiency of 0.9, a static loss of 0.1 and a capacity of 0.5; and
    IA2, a new arc with a single option (capacity 1.2, minimum cost 0.1).
    The problem is solved with STATIC_LOSS_MODE_ARR.

    The test expects both arcs to be selected, total imports of 1.1 in
    reporting period 0, no exports, and flows of 0.1 (IA1) and 1.0 (IA2).
    """
    # time
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    # must match the time frame above: one interval, two reporting periods
    number_intervals = 1
    number_periods = 2
    # 2 nodes: one import node, one source/sink node
    mynet = Network()
    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1 + 0.1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    # other nodes
    node_A = 'A'
    mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    # add arcs
    # IA1
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 0.9},
        static_loss={(q, 0, 0): 0.1},
        capacity=0.5,
        capacity_is_instantaneous=False,
    )
    # IA2
    arcs_ia2 = Arcs(
        name="IA2",
        efficiency=None,
        efficiency_reverse=None,
        static_loss=None,
        capacity=tuple([1.2]),
        minimum_cost=tuple([0.1]),
        specific_capacity_cost=0,
        capacity_is_instantaneous=False,
        validate=True,
    )
    mynet.add_directed_arc(node_key_a=imp_node_key, node_key_b=node_A, arcs=arcs_ia2)
    # identify node types
    mynet.identify_node_types()
    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver='cbc',  # TODO: make this work with other solvers
        solver_options={},
        plot_results=False,  # True,
        print_solver_output=False,
        networks={"mynet": mynet},
        time_frame=tf,
        # static_losses_mode=True,
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )
    # **************************************************************************
    # both arcs should be selected: IA1 (preexisting) and IA2 (new)
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp_node_key, node_A, 1)][Network.KEY_ARC_TECH]
        .options_selected
    )
    # overview
    (imports_qpk,
     exports_qpk,
     balance_qpk,
     import_costs_qpk,
     export_revenue_qpk,
     ncf_qpk,
     aggregate_static_demand_qpk,
     aggregate_static_supply_qpk,
     aggregate_static_balance_qpk) = statistics(ipp)
    # one tolerance for every comparison below
    abs_tol = 1e-6
    # there should be imports (summed over reporting period 0 only)
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, (1.0 + 0.1), abs_tol=abs_tol)
    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    # not asserted on: the lookup alone fails if a revenue entry is missing
    export_revenue_qp = sum(export_revenue_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)
    # flow through IA1 must be 0.1 (total imports of 1.1 minus 1.0 via IA2)
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, 0, 0)]),
        0.1,
        abs_tol=abs_tol,
    )
    # flow through IA2 must be 1.0
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 1, 0, 0)]),
        1.0,
        abs_tol=abs_tol,
    )
# *************************************************************************
# *************************************************************************
def test_directed_arc_static_downstream_pre(self):
    """Check the flows through two parallel preexisting arcs, one lossy.

    One import node (price 1.1) supplies source/sink node A, whose base
    flow is 1.0, through two parallel preexisting arcs: IA1, with an
    efficiency of 0.9, a static loss of 0.1 and a capacity of 0.5; and
    IA2, with no losses and a capacity of 1.2. The problem is solved with
    STATIC_LOSS_MODE_ARR.

    The test expects total imports of 1.1 in reporting period 0, no
    exports, and flows of 0.1 (IA1) and 1.0 (IA2).
    """
    # time
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0, 1)},
        reporting_period_durations={q: (365 * 24 * 3600, 365 * 24 * 3600)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    number_intervals = 1
    number_periods = 2
    # 2 nodes: one import node, one source/sink node
    mynet = Network()
    # import node
    imp_node_key = 'thatimpnode'
    mynet.add_import_node(
        node_key=imp_node_key,
        prices={
            (q, p, k): ResourcePrice(prices=1 + 0.1, volumes=None)
            for p in range(number_periods)
            for k in range(number_intervals)
        },
    )
    # other nodes
    node_A = 'A'
    mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
    # add arcs
    # IA1
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency={(q, 0): 0.9},
        static_loss={(q, 0, 0): 0.1},
        capacity=0.5,
        capacity_is_instantaneous=False,
    )
    # IA2
    mynet.add_preexisting_directed_arc(
        node_key_a=imp_node_key,
        node_key_b=node_A,
        efficiency=None,
        static_loss=None,
        capacity=1.2,
        capacity_is_instantaneous=False,
    )
    # identify node types
    mynet.identify_node_types()
    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver='cbc',  # TODO: make this work with other solvers
        solver_options={},
        plot_results=False,  # True,
        print_solver_output=False,
        networks={"mynet": mynet},
        time_frame=tf,
        # static_losses_mode=True,
        static_losses_mode=InfrastructurePlanningProblem.STATIC_LOSS_MODE_ARR,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
    )
    # **************************************************************************
    # all arcs should be installed (they are not new)
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp_node_key, node_A, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert (
        True
        in ipp.networks["mynet"]
        .edges[(imp_node_key, node_A, 1)][Network.KEY_ARC_TECH]
        .options_selected
    )
    # overview
    (imports_qpk,
     exports_qpk,
     balance_qpk,
     import_costs_qpk,
     export_revenue_qpk,
     ncf_qpk,
     aggregate_static_demand_qpk,
     aggregate_static_supply_qpk,
     aggregate_static_balance_qpk) = statistics(ipp)
    # one tolerance for every comparison below
    abs_tol = 1e-6
    # there should be imports (summed over reporting period 0 only)
    imports_qp = sum(imports_qpk[qpk] for qpk in tf.qpk() if qpk[1] == 0)
    assert math.isclose(imports_qp, (1.0 + 0.1), abs_tol=abs_tol)
    # there should be no exports
    exports_qp = sum(exports_qpk[(q, 0, k)] for k in tf.time_intervals[q])
    assert math.isclose(exports_qp, 0, abs_tol=abs_tol)
    # flow through IA1 must be 0.1
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 0, 0, 0)]),
        0.1,
        abs_tol=abs_tol,
    )
    # flow through IA2 must be 1.0
    assert math.isclose(
        pyo.value(ipp.instance.var_v_glljqk[("mynet", imp_node_key, node_A, 1, 0, 0)]),
        1.0,
        abs_tol=abs_tol,
    )
# *************************************************************************
# *************************************************************************
# def test_problem_converter_sink(self):
# # assessment
# q = 0
# tf = EconomicTimeFrame(
# discount_rate=3.5/100,
# reporting_periods={q: (0,)},
# reporting_period_durations={q: (365 * 24 * 3600,)},
# time_intervals={q: (0,1,2)},
# time_interval_durations={q: (1,1,1)},
# )
# # 2 nodes: one import, one regular
# mynet = Network()
# # import node
# node_IMP = generate_pseudo_unique_key(mynet.nodes())
# mynet.add_import_node(
# node_key=node_IMP,
# prices={
# qpk: ResourcePrice(prices=1.0, volumes=None)
# for qpk in tf.qpk()
# },
# )
# # other nodes
# node_A = generate_pseudo_unique_key(mynet.nodes())
# mynet.add_source_sink_node(
# node_key=node_A,
# base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
# )
# # arc IA
# arc_tech_IA = Arcs(
# name="any",
# # efficiency=[0.5, 0.5, 0.5],
# efficiency={(q, 0): 0.5, (q, 1): 0.5, (q, 2): 0.5},
# efficiency_reverse=None,
# static_loss=None,
# capacity=[3],
# minimum_cost=[2],
# specific_capacity_cost=1,
# capacity_is_instantaneous=False,
# validate=False,
# )
# mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# # identify node types
# mynet.identify_node_types()
# # converter
# a_nnk = {
# (0, 0, 0): 0.95,
# (0, 0, 1): 0.95,
# (0, 0, 2): 0.95,
# }
# b_nmk = {
# (0, 0, 0): 3,
# (0, 0, 1): 3,
# (0, 0, 2): 3
# }
# x_n0 = {0: 18}
# # get the signals
# inputs, states, outputs = get_two_node_model_signals(
# tf.number_time_intervals(q)
# )
# # create a discretised dynamic system from dictionaries
# dds = dynsys.DiscretisedDynamicSystem(
# a_nnk=a_nnk,
# b_nmk=b_nmk,
# x_n0=x_n0,
# time_frame=tf
# )
# # create a converter
# cvt = Converter(
# time_frame=tf,
# dds=dds,
# turn_key_cost=3,
# inputs=inputs,
# states=states,
# outputs=outputs,
# )
# # no sos, regular time intervals
# ipp = self.build_solve_ipp(
# solver_options={},
# perform_analysis=False,
# plot_results=False, # True,
# print_solver_output=False,
# time_frame=tf,
# networks={"mynet": mynet},
# converters={"mycvt": cvt},
# static_losses_mode=False,
# mandatory_arcs=[],
# max_number_parallel_arcs={},
# # init_aux_sets=init_aux_sets,
# simplify_problem=False,
# )
# assert not ipp.has_peak_total_assessments()
# assert ipp.results["Problem"][0]["Number of constraints"] == 24
# assert ipp.results["Problem"][0]["Number of variables"] == 22
# assert ipp.results["Problem"][0]["Number of nonzeros"] == 49
# # *********************************************************************
# # *********************************************************************
# # validation
# # if uC,M 1,q,0 = 0, then xC,N 1,q,1 = 17.1 # infeasible
# # if uC,M 1,q,0 = 1, then xC,N 1,q,1 = 20.1. # only feasible option
# # if uC,M 1,q,1 = 0, then xC,N 1,q,2 = 19.095 # only feasible option
# # if uC,M 1,q,1 = 1, then xC,N 1,q,2 = 22.095 # infeasible
# # if uC,M 1,q,2 = 0, then xC,N 1,q,3 = 18.14025 # feasible
# # if uC,M 1,q,2 = 1, then xC,N 1,q,3 = 21.14025 # feasible
# true_u_imqk = {
# ('mycvt', 0, q, 0): 1,
# ('mycvt', 0, q, 1): 0,
# ('mycvt', 0, q, 2): 0, # could also be 1
# }
# true_x_inqk = {
# ('mycvt', 0, q, 0): 20.1,
# ('mycvt', 0, q, 1): 19.095,
# ('mycvt', 0, q, 2): 18.14025, # could also be 21.14025
# }
# # check the inputs
# for imqk, u in true_u_imqk.items():
# assert math.isclose(
# pyo.value(ipp.instance.var_u_imqk[imqk]),
# u,
# abs_tol=1e-6,
# )
# # check the states
# for inqk, x in true_x_inqk.items():
# assert math.isclose(
# pyo.value(ipp.instance.var_x_inqk[inqk]),
# x,
# abs_tol=1e-6,
# )
# *************************************************************************
# *************************************************************************
# TODO: test non-simplifiable problems with time varying prices on select assessments
# TODO: test non-simplifiable problems with volume varying prices on select assessments
# *************************************************************************
# *************************************************************************
def test_problem_with_tree_network(self):
    """Solve a problem whose network is declared as NET_TYPE_TREE.

    One import node can be connected to three source/sink nodes (A, B and
    C, with base flows of 0.50, 0.25 and 1.25) through new arcs IA, IB and
    IC, whose specific capacity costs are 0, 1 and 2. New arcs are also
    available in both directions between every pair of A, B and C.

    The test expects only IA to be selected among the import arcs, exactly
    two arcs to be selected between A, B and C, a tree topology, a capex
    of 10.0 and an objective function value of -11.93236715.
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0,)},
        reporting_period_durations={q: (365 * 24 * 3600,)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    # 4 nodes: one import node, three source/sink nodes
    mynet = Network(network_type=Network.NET_TYPE_TREE)
    # import node
    node_IMP = "thatimpnode"
    mynet.add_import_node(
        node_key=node_IMP,
        prices={
            qpk: ResourcePrice(prices=1.0, volumes=None)
            for qpk in tf.qpk()
        },
    )
    # node A
    node_A = "thatnodea"
    mynet.add_source_sink_node(
        node_key=node_A,
        base_flow={(q, 0): 0.50},
    )
    # node B
    node_B = "thatnodeb"
    mynet.add_source_sink_node(
        node_key=node_B,
        base_flow={(q, 0): 0.25},
    )
    # node C
    node_C = "thatnodec"
    mynet.add_source_sink_node(
        node_key=node_C,
        base_flow={(q, 0): 1.25},
    )
    list_imp_arcs = [
        (node_IMP, node_A),  # IA
        (node_IMP, node_B),  # IB
        (node_IMP, node_C),  # IC
    ]
    for i, node_pair in enumerate(list_imp_arcs):
        # import arcs: IA, IB, IC (the specific capacity cost grows with i)
        new_arc = Arcs(
            name="IA",
            efficiency=None,
            efficiency_reverse=None,
            static_loss=None,
            capacity=[2],
            minimum_cost=[6],
            specific_capacity_cost=i,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(*node_pair, arcs=new_arc)
    # arcs: AB, BA, BC, CB, AC, CA
    list_other_arcs = [
        (node_A, node_B),  # AB
        (node_B, node_A),  # BA
        (node_B, node_C),  # BC
        (node_C, node_B),  # CB
        (node_A, node_C),  # AC
        (node_C, node_A),  # CA
    ]
    for node_pair in list_other_arcs:
        # arc
        new_arc_tech = Arcs(
            name="any",
            efficiency=None,
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=0,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(*node_pair, arcs=new_arc_tech)
    # identify node types
    mynet.identify_node_types()
    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver_options={},
        perform_analysis=False,
        plot_results=False,  # True,
        print_solver_output=False,
        time_frame=tf,
        networks={"mynet": mynet},
        static_losses_mode=True,  # just to reach a line,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
        simplify_problem=True,
    )
    assert ipp.has_peak_total_assessments()
    # problem size
    assert ipp.results["Problem"][0]["Number of constraints"] == 61
    assert ipp.results["Problem"][0]["Number of variables"] == 53
    assert ipp.results["Problem"][0]["Number of nonzeros"] == 143
    # *********************************************************************
    # *********************************************************************
    # validation
    # only the IA arc should be installed
    true_imp_arcs_selected = [True, False, False]
    for node_pair, true_arc_decision in zip(list_imp_arcs, true_imp_arcs_selected):
        assert (
            true_arc_decision
            in ipp.networks["mynet"]
            .edges[(*node_pair, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )
    # only two arcs between A, B and C can be installed
    arcs_selected = tuple(
        1
        for node_pair in list_other_arcs
        if True in ipp.networks["mynet"]
        .edges[(*node_pair, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert sum(arcs_selected) == 2
    # the network must be tree-shaped
    assert ipp.networks["mynet"].has_tree_topology()
    # capex
    assert math.isclose(pyo.value(ipp.instance.var_capex), 10.0, abs_tol=1e-3)
    # the objective function
    assert math.isclose(pyo.value(ipp.instance.obj_f), -1.193236715e+01, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_with_reverse_tree_network(self):
    """Solve a problem whose network is declared as NET_TYPE_REV_TREE.

    Three source/sink nodes (A, B and C, with base flows of -0.50, -0.25
    and -1.25) can be connected to one export node through new arcs AE, BE
    and CE, whose specific capacity costs are 0, 1 and 2. New arcs are
    also available in both directions between every pair of A, B and C.

    The test expects only AE to be selected among the export arcs, exactly
    two arcs to be selected between A, B and C, a tree topology, a capex
    of 10.0 and an objective function value of -(10+(-11.93236715+10)).
    """
    # assessment
    q = 0
    tf = EconomicTimeFrame(
        discount_rate=3.5/100,
        reporting_periods={q: (0,)},
        reporting_period_durations={q: (365 * 24 * 3600,)},
        time_intervals={q: (0,)},
        time_interval_durations={q: (1,)},
    )
    # 4 nodes: one export node, three source/sink nodes
    mynet = Network(network_type=Network.NET_TYPE_REV_TREE)
    # export node
    node_EXP = "thatexpnode"
    mynet.add_export_node(
        node_key=node_EXP,
        prices={
            qpk: ResourcePrice(prices=1.0, volumes=None)
            for qpk in tf.qpk()
        },
    )
    # node A
    node_A = "thatnodea"
    mynet.add_source_sink_node(
        node_key=node_A,
        base_flow={(q, 0): -0.50},
    )
    # node B
    node_B = "thatnodeb"
    mynet.add_source_sink_node(
        node_key=node_B,
        base_flow={(q, 0): -0.25},
    )
    # node C
    node_C = "thatnodec"
    mynet.add_source_sink_node(
        node_key=node_C,
        base_flow={(q, 0): -1.25},
    )
    list_exp_arcs = [
        (node_A, node_EXP),  # AE
        (node_B, node_EXP),  # BE
        (node_C, node_EXP),  # CE
    ]
    for i, node_pair in enumerate(list_exp_arcs):
        # export arcs: AE, BE, CE (the specific capacity cost grows with i)
        new_arc = Arcs(
            name="arc_"+str(node_pair),
            efficiency=None,
            efficiency_reverse=None,
            static_loss=None,
            capacity=[2],
            minimum_cost=[6],
            specific_capacity_cost=i,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(*node_pair, arcs=new_arc)
    # arcs: AB, BA, BC, CB, AC, CA
    list_other_arcs = [
        (node_A, node_B),  # AB
        (node_B, node_A),  # BA
        (node_B, node_C),  # BC
        (node_C, node_B),  # CB
        (node_A, node_C),  # AC
        (node_C, node_A),  # CA
    ]
    for node_pair in list_other_arcs:
        # arc
        new_arc_tech = Arcs(
            name="any",
            efficiency=None,
            efficiency_reverse=None,
            static_loss=None,
            capacity=[3],
            minimum_cost=[2],
            specific_capacity_cost=0,
            capacity_is_instantaneous=False,
            validate=False,
        )
        mynet.add_directed_arc(*node_pair, arcs=new_arc_tech)
    # identify node types
    mynet.identify_node_types()
    # no sos, regular time intervals
    ipp = self.build_solve_ipp(
        solver_options={},
        perform_analysis=False,
        plot_results=False,  # True,
        print_solver_output=False,
        time_frame=tf,
        networks={"mynet": mynet},
        static_losses_mode=True,  # just to reach a line,
        mandatory_arcs=[],
        max_number_parallel_arcs={},
        simplify_problem=True,
    )
    assert ipp.has_peak_total_assessments()
    # problem size
    assert ipp.results["Problem"][0]["Number of constraints"] == 61
    assert ipp.results["Problem"][0]["Number of variables"] == 53
    assert ipp.results["Problem"][0]["Number of nonzeros"] == 143
    # *********************************************************************
    # *********************************************************************
    # validation
    # only the AE arc should be installed
    true_exp_arcs_selected = [True, False, False]
    for node_pair, true_arc_decision in zip(list_exp_arcs, true_exp_arcs_selected):
        assert (
            true_arc_decision
            in ipp.networks["mynet"]
            .edges[(*node_pair, 0)][Network.KEY_ARC_TECH]
            .options_selected
        )
    # only two arcs between A, B and C can be installed
    arcs_selected = tuple(
        1
        for node_pair in list_other_arcs
        if True in ipp.networks["mynet"]
        .edges[(*node_pair, 0)][Network.KEY_ARC_TECH]
        .options_selected
    )
    assert sum(arcs_selected) == 2
    # the network must be tree-shaped
    assert ipp.networks["mynet"].has_tree_topology()
    # capex
    assert math.isclose(pyo.value(ipp.instance.var_capex), 10.0, abs_tol=1e-3)
    # the objective function
    assert math.isclose(pyo.value(ipp.instance.obj_f), -(10+(-11.93236715+10)), abs_tol=1e-3)
# *****************************************************************************
# *****************************************************************************