Coverage for wsimod\arcs\arcs.py: 29%

303 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-30 14:55 +0000

1# -*- coding: utf-8 -*- 

2"""Created on Wed Apr 7 08:43:32 2021. 

3 

4@author: Barney 

5 

6Converted to totals on Thur Apr 21 2022 

7""" 

8 

9from typing import Any, Dict 

10 

11from wsimod.core import constants 

12from wsimod.core.core import DecayObj, WSIObj 

13 

14# from wsimod.nodes import nodes #Complains about circular imports. 

15# I don't think it should do.. 

16 

17 

18class Arc(WSIObj): 

19 """""" 

20 

21 def __init__( 

22 self, 

23 name="", 

24 capacity=constants.UNBOUNDED_CAPACITY, 

25 preference=1, 

26 in_port=None, 

27 out_port=None, 

28 **kwargs, 

29 ): 

30 """Arc objects are the way for information to be passed between nodes in WSIMOD. 

31 They have an in_port (where a message comes from) and an out_port (where a 

32 message goes to). 

33 

34 Returns: 

35 name (str): Name of arc. Defaults to ''. 

36 capacity (float): Capacity of flow along an arc (vol/timestep). 

37 Defaults to constants.UNBOUNDED_CAPACITY. 

38 preference (float): Number used to prioritise or deprioritise use of an arc 

39 when flexibility exists 

40 in_port: A WSIMOD node object where the arc starts 

41 out_port: A WSIMOD node object where the arc ends 

42 """ 

43 # Default essential parameters 

44 self.name = name 

45 self.in_port = in_port 

46 self.out_port = out_port 

47 self.capacity = capacity 

48 self.preference = preference 

49 

50 # Update args 

51 WSIObj.__init__(self) 

52 self.__dict__.update(kwargs) 

53 

54 # def all_subclasses(cls): 

55 # return set(cls.__subclasses__()).union( 

56 # [s for c in cls.__subclasses__() for s in all_subclasses(c)]) 

57 # node_types = [x.__name__ for x in all_subclasses(nodes.Node)] + ['Node'] 

58 

59 # if self.name in node_types: 

60 # print('Warning: arc name should not take a node class name') 

61 # #TODO... not sure why... also currently commented for import issues.. 

62 

63 # Initialise states 

64 self.flow_in = 0 

65 self.flow_out = 0 

66 self.vqip_in = self.empty_vqip() 

67 self.vqip_out = self.empty_vqip() 

68 

69 # Update ports 

70 self.in_port.out_arcs[self.name] = self 

71 self.out_port.in_arcs[self.name] = self 

72 

73 out_type = self.out_port.__class__.__name__ 

74 in_type = self.in_port.__class__.__name__ 

75 

76 if hasattr(self.in_port, "out_arcs_type"): 

77 self.in_port.out_arcs_type[out_type][self.name] = self 

78 

79 if hasattr(self.out_port, "in_arcs_type"): 

80 self.out_port.in_arcs_type[in_type][self.name] = self 

81 

82 # Mass balance checking 

83 self.mass_balance_in = [lambda: self.vqip_in] 

84 self.mass_balance_out = [lambda: self.vqip_out] 

85 self.mass_balance_ds = [lambda: self.empty_vqip()] 

86 

87 def apply_overrides(self, overrides: Dict[str, Any] = {}) -> None: 

88 """Apply overrides to the node. 

89 

90 Args: 

91 overrides (dict, optional): Dictionary of overrides. Defaults to {}. 

92 """ 

93 self.capacity = overrides.pop("capacity", self.capacity) 

94 self.preference = overrides.pop("preference", self.preference) 

95 if len(overrides) > 0: 

96 print(f"No override behaviour defined for: {overrides.keys()}") 

97 

98 def arc_mass_balance(self): 

99 """Checks mass balance for inflows/outflows/storage change in an arc. 

100 

101 Returns: 

102 in_ (dict) Total vqip of vqip_in and other inputs in mass_balance_in 

103 ds_ (dict): Total vqip of change in arc in mass_balance_ds 

104 out_ (dict): Total vqip of vqip_out and other outputs in mass_balance_out 

105 

106 Examples: 

107 arc_in, arc_out, arc_ds = my_arc.arc_mass_balance() 

108 """ 

109 in_, ds_, out_ = self.mass_balance() 

110 return in_, ds_, out_ 

111 

112 def send_push_request(self, vqip, tag="default", force=False): 

113 """Function used to transmit a push request from one node (in_port) to another 

114 node (out_port). 

115 

116 Args: 

117 vqip (dict): A dict VQIP of water to push 

118 tag (str, optional): optional message to direct the out_port's query_ 

119 handler which function to call. Defaults to 'default'. 

120 force (bool, optional): Argument used to cause function to ignore tank 

121 capacity of out_port, possibly resulting in pooling. Should not be used 

122 unless 

123 out_port is a tank object. Defaults to False. 

124 

125 Returns: 

126 (dict): A VQIP amount of water that was not successfully pushed 

127 """ 

128 vqip = self.copy_vqip(vqip) 

129 

130 # Apply pipe capacity 

131 if force: 

132 not_pushed = self.empty_vqip() 

133 else: 

134 excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag) 

135 not_pushed = self.v_change_vqip( 

136 vqip, max(vqip["volume"] - excess_in["volume"], 0) 

137 ) 

138 

139 # Don't attempt to send volume that exceeds capacity 

140 vqip = self.extract_vqip(vqip, not_pushed) 

141 

142 # Set push 

143 reply = self.out_port.push_set(vqip, tag) 

144 

145 # Update total amount successfully sent 

146 vqip = self.extract_vqip(vqip, reply) 

147 

148 # Combine non-sent water 

149 reply = self.sum_vqip(reply, not_pushed) 

150 

151 # Update mass balance 

152 self.flow_in += vqip["volume"] 

153 self.flow_out = self.flow_in 

154 

155 self.vqip_in = self.sum_vqip(self.vqip_in, vqip) 

156 self.vqip_out = self.vqip_in 

157 

158 return reply 

159 

160 def send_pull_request(self, vqip, tag="default"): 

161 """Function used to transmit a pull request from one node (in_port) to another 

162 node (out_port). 

163 

164 Args: 

165 vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key is 

166 used) 

167 tag (str, optional): optional message to direct the out_port's query_handler 

168 which 

169 function to call. Defaults to 'default'. 

170 

171 Returns: 

172 (dict): A VQIP amount of water that was successfully pulled 

173 """ 

174 volume = vqip["volume"] 

175 # Apply pipe capacity 

176 excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"] 

177 not_pulled = max(volume - excess_in, 0) 

178 volume -= not_pulled 

179 

180 if volume > 0: 

181 for pol in constants.ADDITIVE_POLLUTANTS: 

182 if pol in vqip.keys(): 

183 vqip[pol] *= volume / vqip["volume"] 

184 

185 vqip["volume"] = volume 

186 

187 # Make pull 

188 vqip = self.in_port.pull_set(vqip, tag) 

189 

190 # Update mass balance 

191 self.flow_in += vqip["volume"] 

192 self.flow_out = self.flow_in 

193 

194 self.vqip_in = self.sum_vqip(self.vqip_in, vqip) 

195 self.vqip_out = self.vqip_in 

196 

197 return vqip 

198 

199 def send_push_check(self, vqip=None, tag="default"): 

200 """Function used to transmit a push check from one node (in_port) to another 

201 node (out_port). 

202 

203 Args: 

204 vqip (dict): A dict VQIP of water to push that can be specified. Defaults to 

205 None, which returns maximum capacity to push. 

206 tag (str, optional): optional message to direct the out_port's 

207 query_handler which function to call. Defaults to 'default'. 

208 

209 Returns: 

210 (dict): A VQIP amount of water that could be pushed 

211 """ 

212 return self.get_excess(direction="push", vqip=vqip, tag=tag) 

213 

214 def send_pull_check(self, vqip=None, tag="default"): 

215 """Function used to transmit a pull check from one node (in_port) to another 

216 node (out_port). 

217 

218 Args: 

219 vqip (dict): A dict VQIP of water to pull that can be specified (by default, 

220 only the 'volume' key is used). Defaults to None, which returns all 

221 available water to pull. 

222 tag (str, optional): optional message to direct the out_port's 

223 query_handler which function to call. Defaults to 'default'. 

224 

225 Returns: 

226 (dict): A VQIP amount of water that could be pulled 

227 """ 

228 return self.get_excess(direction="pull", vqip=vqip, tag=tag) 

229 

230 def get_excess(self, direction, vqip=None, tag="default"): 

231 """Calculate how much could be pull/pulled along the arc by combining both arc 

232 capacity and out_port check information. 

233 

234 Args: 

235 direction (str): should be 'pull' or 'push' 

236 vqip (dict, optional): A VQIP amount to push/pull that can be 

237 specified. Defaults to None, which returns all available water to 

238 pull or maximum capacity to push (depending on 'direction'). 

239 tag (str, optional): optional message to direct the out_port's query_handler 

240 which function to call. Defaults to 'default'. 

241 

242 Returns: 

243 (dict): A VQIP amount of water that could be pulled/pushed 

244 """ 

245 # Pipe capacity 

246 pipe_excess = self.capacity - self.flow_in 

247 

248 # Node capacity 

249 if direction == "push": 

250 node_excess = self.out_port.push_check(vqip, tag) 

251 elif direction == "pull": 

252 node_excess = self.in_port.pull_check(vqip, tag) 

253 excess = min(pipe_excess, node_excess["volume"]) 

254 

255 # TODO sensible to min(vqip, excess) here? (though it should be applied by node) 

256 

257 return self.v_change_vqip(node_excess, excess) 

258 

259 def end_timestep(self): 

260 """End timestep in an arc, resetting flow/vqip in/out (which determine) the 

261 capacity for that timestep.""" 

262 self.vqip_in = self.empty_vqip() 

263 self.vqip_out = self.empty_vqip() 

264 self.flow_in = 0 

265 self.flow_out = 0 

266 

267 def reinit(self): 

268 """Reinitiatilise.""" 

269 self.end_timestep() 

270 

271 

272class QueueArc(Arc): 

273 """""" 

274 

275 def __init__(self, number_of_timesteps=0, **kwargs): 

276 """A queue arc that stores each push or pull individually in the queue. Enables 

277 implementation of travel time. A fixed number of timesteps can be specified as a 

278 parameter, and additional number of timesteps can be specified when the requests 

279 are made. 

280 

281 The queue is a list of requests, where their travel time is decremented 

282 by 1 each timestep. Any requests with a travel time of 0 will be sent 

283 onwards if the 'update_queue' function is called. 

284 

285 Args: 

286 number_of_timesteps (int, optional): Fixed number of timesteps that 

287 it takes to traverse the arc. Defaults to 0. 

288 """ 

289 self.number_of_timesteps = number_of_timesteps 

290 self.queue = [] 

291 super().__init__(**kwargs) 

292 

293 self.queue_storage = self.empty_vqip() 

294 self.queue_storage_ = self.empty_vqip() 

295 

296 self.mass_balance_ds.append(lambda: self.queue_arc_ds()) 

297 

298 def queue_arc_ds(self): 

299 """Calculate change in amount of water and other pollutants in the arc. 

300 

301 Returns: 

302 (dict): A VQIP amount of change 

303 """ 

304 self.queue_storage = self.queue_arc_sum() 

305 return self.extract_vqip(self.queue_storage, self.queue_storage_) 

306 

307 def queue_arc_sum(self): 

308 """Sum the total water in the requests in the queue of the arc. 

309 

310 Returns: 

311 (dict): A VQIP amount of water/pollutants in the arc 

312 """ 

313 queue_storage = self.empty_vqip() 

314 for request in self.queue: 

315 queue_storage = self.sum_vqip(queue_storage, request["vqip"]) 

316 return queue_storage 

317 

318 def send_pull_request(self, vqip, tag="default", time=0): 

319 """Function used to transmit a pull request from one node (in_port) to another 

320 node (out_port). Any pulled water is immediately removed from the out_port and 

321 then takes the travel time to be received. This function has not been 

322 extensively tested. 

323 

324 Args: 

325 vqip (_type_): A dict VQIP of water to pull (by default, only 'volume' key 

326 is used) 

327 tag (str, optional): optional message to direct the out_port's query_handler 

328 which function to call. Defaults to 'default'. 

329 time (int, optional): Travel time for request to spend in the arc (in 

330 addition to the arc's 'number_of_timesteps' parameter). Defaults to 0. 

331 

332 Returns: 

333 (dict): A VQIP amount of water that was successfully pulled. 

334 """ 

335 volume = vqip["volume"] 

336 # Apply pipe capacity 

337 excess_in = self.get_excess(direction="pull", vqip=vqip)["volume"] 

338 not_pulled = max(volume - excess_in, 0) 

339 volume -= not_pulled 

340 

341 for pol in constants.ADDITIVE_POLLUTANTS: 

342 if pol in vqip.keys(): 

343 vqip[pol] *= volume / vqip["volume"] 

344 

345 vqip["volume"] = volume 

346 

347 # Make pull 

348 vqip = self.in_port.pull_set(vqip) 

349 

350 # Update to queue request 

351 request = {"time": time + self.number_of_timesteps, "vqip": vqip} 

352 

353 # vqtip enters arc as a request 

354 self.enter_queue(request, direction="pull") 

355 

356 # Update request queue and return pulls from queue 

357 reply = self.update_queue(direction="pull") 

358 return reply 

359 

360 def send_push_request(self, vqip_, tag="default", force=False, time=0): 

361 """Function used to transmit a push request from one node (in_port) to another 

362 node (out_port). 

363 

364 Args: 

365 vqip_ (dict): A dict VQIP of water to push. 

366 tag (str, optional): optional message to direct the out_port's query_handler 

367 which function to call. Defaults to 'default'. 

368 force (bool, optional): Ignore the capacity of the arc (note does not 

369 currently, pass the force argument to the out_port). Defaults to False. 

370 time (int, optional): Travel time for request to spend in the arc (in 

371 addition to the arc's 'number_of_timesteps' parameter). Defaults to 0. 

372 

373 Returns: 

374 (dict): A VQIP amount of water that was not successfully pushed 

375 """ 

376 vqip = self.copy_vqip(vqip_) 

377 

378 if vqip["volume"] < constants.FLOAT_ACCURACY: 

379 return self.empty_vqip() 

380 

381 # Apply pipe capacity 

382 if force: 

383 not_pushed = self.empty_vqip() 

384 else: 

385 excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag) 

386 not_pushed = self.v_change_vqip( 

387 vqip, max(vqip["volume"] - excess_in["volume"], 0) 

388 ) 

389 

390 vqip = self.extract_vqip(vqip, not_pushed) 

391 

392 # Update to queue request 

393 request = {"time": time + self.number_of_timesteps, "vqip": vqip} 

394 

395 # vqtip enters arc as a request 

396 self.enter_queue(request, direction="push", tag=tag) 

397 

398 # Update request queue 

399 backflow = self.update_queue(direction="push") 

400 not_pushed = self.sum_vqip(not_pushed, backflow) 

401 

402 if backflow["volume"] > vqip_["volume"]: 

403 print("more backflow than vqip...") 

404 

405 self.vqip_in = self.extract_vqip(self.vqip_in, backflow) 

406 

407 return not_pushed 

408 

409 def enter_arc(self, request, direction, tag): 

410 """Function used to cause format a request into the format expected by the 

411 enter_queue function. 

412 

413 Args: 

414 request (dict): A dict with a VQIP under the 'vqip' key and the travel 

415 time under the 'time' key. 

416 direction (str): Direction of flow, can be 'push' or 'pull 

417 tag (str, optional): optional message to direct the out_port's 

418 query_handler which function to call. Defaults to 'default'. 

419 

420 Returns: 

421 (dict): The request dict with additional information entered for the queue. 

422 """ 

423 request["average_flow"] = request["vqip"]["volume"] / (request["time"] + 1) 

424 request["direction"] = direction 

425 request["tag"] = tag 

426 

427 self.flow_in += request["average_flow"] 

428 self.vqip_in = self.sum_vqip(self.vqip_in, request["vqip"]) 

429 

430 return request 

431 

432 def enter_queue(self, request, direction=None, tag="default"): 

433 """Add a request into the arc's queue list. 

434 

435 Args: 

436 request (dict): A dict with a VQIP under the 'vqip' key and the travel 

437 time under the 'time' key. 

438 direction (str): Direction of flow, can be 'push' or 'pull 

439 tag (str, optional): optional message to direct the out_port's 

440 query_handler which function to call. Defaults to 'default'. 

441 """ 

442 # Update inflows and format request 

443 request = self.enter_arc(request, direction, tag) 

444 

445 # Enter queue 

446 self.queue.append(request) 

447 

448 def update_queue(self, direction=None, backflow_enabled=True): 

449 """Iterate over all requests in the queue, removing them if they have no volume. 

450 

451 If a request is a push and has 0 travel time remaining then 

452 the push will be triggered at the out_port, if the out_port responds that 

453 it cannot receive the push, then this water will be returned as backflow 

454 (if enabled). 

455 

456 If a request is a pull and has 0 travel time remaining then it is simply summed 

457 with other 0 travel time pull_requests and returned (since the pull is made at 

458 the out_port when the send_pull_request is made). 

459 

460 

461 Args: 

462 direction (str, optional): Direction of flow, can be 'push' or 'pull. 

463 Defaults to None. 

464 backflow_enabled (bool, optional): Enable backflow, described above, if not 

465 enabled then the request will remain in the queue until all water has 

466 been received. Defaults to True. 

467 

468 Returns: 

469 total_backflow (dict): In the case of a push direction, any backflow will be 

470 returned as a VQIP amount 

471 total_removed (dict): In the case of a pull direction, any pulled water will 

472 be returned as a VQIP amount 

473 """ 

474 done_requests = [] 

475 

476 total_removed = self.empty_vqip() 

477 total_backflow = self.empty_vqip() 

478 # Iterate over requests 

479 for request in self.queue: 

480 if request["direction"] == direction: 

481 vqip = request["vqip"] 

482 

483 if vqip["volume"] < constants.FLOAT_ACCURACY: 

484 # Add to queue for removal 

485 done_requests.append(request) 

486 elif request["time"] == 0: 

487 if direction == "push": 

488 # Attempt to push request 

489 reply = self.out_port.push_set(vqip, request["tag"]) 

490 removed = vqip["volume"] - reply["volume"] 

491 

492 elif direction == "pull": 

493 # Water has already been pulled, so assume all received 

494 removed = vqip["volume"] 

495 

496 else: 

497 print("No direction") 

498 

499 # Update outflows 

500 self.flow_out += request["average_flow"] * removed / vqip["volume"] 

501 vqip_ = self.v_change_vqip(vqip, removed) 

502 total_removed = self.sum_vqip(total_removed, vqip_) 

503 

504 # Assume that any water that cannot arrive at destination this 

505 # timestep is backflow 

506 rejected = self.v_change_vqip(vqip, vqip["volume"] - removed) 

507 

508 if backflow_enabled | ( 

509 rejected["volume"] < constants.FLOAT_ACCURACY 

510 ): 

511 total_backflow = self.sum_vqip(rejected, total_backflow) 

512 done_requests.append(request) 

513 else: 

514 request["vqip"] = rejected 

515 

516 self.vqip_out = self.sum_vqip(self.vqip_out, total_removed) 

517 

518 # Remove done requests 

519 for request in done_requests: 

520 self.queue.remove(request) 

521 

522 # return total_removed 

523 if direction == "pull": 

524 return total_removed 

525 elif direction == "push": 

526 return total_backflow 

527 else: 

528 print("No direction") 

529 

530 def end_timestep(self): 

531 """End timestep in an arc, resetting flow/vqip in/out (which determine) the 

532 capacity for that timestep. 

533 

534 Update times of requests in the queue. 

535 """ 

536 self.vqip_in = self.empty_vqip() 

537 self.vqip_out = self.empty_vqip() 

538 self.flow_in = 0 

539 self.flow_out = 0 

540 

541 self.queue_storage_ = self.copy_vqip(self.queue_storage) 

542 self.queue_storage = self.empty_vqip() 

543 

544 for request in self.queue: 

545 request["time"] = max(request["time"] - 1, 0) 

546 

547 # TODO - update_queue here? 

548 

549 def reinit(self): 

550 """""" 

551 self.end_timestep() 

552 self.queue = [] 

553 

554 

555class AltQueueArc(QueueArc): 

556 """""" 

557 

558 def __init__(self, **kwargs): 

559 """A simpler queue arc that has a queue that is a dict where each key is the 

560 travel time. 

561 

562 Cannot be used if arc capacity is dynamic. Cannot be used for pulls. 

563 """ 

564 self.queue_arc_sum = self.alt_queue_arc_sum 

565 

566 super().__init__(**kwargs) 

567 self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()} 

568 self.max_travel = 1 

569 

570 def alt_queue_arc_sum(self): 

571 """Sum the total water in the queue of the arc. 

572 

573 Returns: 

574 (dict): A VQIP amount of water/pollutants in the arc 

575 """ 

576 queue_storage = self.empty_vqip() 

577 for request in self.queue.values(): 

578 queue_storage = self.sum_vqip(queue_storage, request) 

579 return queue_storage 

580 

581 def enter_queue(self, request, direction="push", tag="default"): 

582 """Add a request into the arc's queue. 

583 

584 Args: 

585 request (dict): A dict with a VQIP under the 'vqip' key and the travel 

586 time under the 'time' key. 

587 direction (str): Direction of flow, can be 'push' only. Defaults to 'push' 

588 tag (str, optional): Optional message for out_port's query handler, can be 

589 'default' only. Defaults to 'default'. 

590 """ 

591 # Update inflows and format request 

592 request = self.enter_arc(request, direction, tag) 

593 

594 # Sum into queue 

595 if request["time"] in self.queue.keys(): 

596 self.queue[request["time"]] = self.sum_vqip( 

597 self.queue[request["time"]], request["vqip"] 

598 ) 

599 else: 

600 self.queue[request["time"]] = request["vqip"] 

601 self.max_travel = max(self.max_travel, request["time"]) 

602 

603 def update_queue(self, direction=None, backflow_enabled=True): 

604 """Trigger the push of water in the 0th key for the queue, if the out_port 

605 responds that it cannot receive the push, then this water will be returned as 

606 backflow (if enabled). 

607 

608 Args: 

609 direction (str): Direction of flow, can be 'push' only. Defaults to 'push' 

610 backflow_enabled (bool, optional): Enable backflow, described above, if not 

611 enabled then the request will remain in the queue until all water has 

612 been received. Defaults to True. 

613 

614 Returns: 

615 backflow (dict): In the case of a push direction, any backflow will be 

616 returned as a VQIP amount 

617 """ 

618 # TODO - can this work for pulls?? 

619 

620 total_removed = self.copy_vqip(self.queue[0]) 

621 

622 # Push 0 travel time water 

623 backflow = self.out_port.push_set(total_removed) 

624 

625 if not backflow_enabled: 

626 self.queue[0] = backflow 

627 backflow = self.empty_vqip() 

628 else: 

629 self.queue[0] = self.empty_vqip() 

630 

631 total_removed = self.v_change_vqip( 

632 total_removed, total_removed["volume"] - backflow["volume"] 

633 ) 

634 

635 self.flow_out += total_removed["volume"] 

636 self.vqip_out = self.sum_vqip(self.vqip_out, total_removed) 

637 

638 return backflow 

639 

640 def end_timestep(self): 

641 """End timestep in an arc, resetting flow/vqip in/out (which determine) the 

642 capacity for that timestep. 

643 

644 Update timings in the queue. 

645 """ 

646 self.vqip_in = self.empty_vqip() 

647 self.vqip_out = self.empty_vqip() 

648 self.flow_in = 0 

649 self.flow_out = 0 

650 self.queue_storage_ = self.copy_vqip(self.queue_storage) 

651 self.queue_storage = self.empty_vqip() 

652 

653 queue_ = self.queue.copy() 

654 keys = self.queue.keys() 

655 for i in range(self.max_travel): 

656 if (i + 1) in keys: 

657 self.queue[i] = queue_[i + 1] 

658 self.queue[i + 1] = self.empty_vqip() 

659 

660 self.queue[0] = self.sum_vqip(queue_[0], queue_[1]) 

661 

662 def reinit(self): 

663 """""" 

664 self.end_timestep() 

665 self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()} 

666 

667 

668class DecayArc(QueueArc, DecayObj): 

669 """""" 

670 

671 def __init__(self, decays={}, **kwargs): 

672 """A QueueArc that applies decays from a DecayObj. 

673 

674 Args: 

675 decays (dict, optional): A dict of dicts containing a key for each pollutant 

676 that decays and within that, a key for each parameter (a constant and 

677 exponent). Defaults to {}. 

678 """ 

679 self.decays = decays 

680 

681 QueueArc.__init__(self, **kwargs) 

682 DecayObj.__init__(self, decays) 

683 

684 self.mass_balance_out.append(lambda: self.total_decayed) 

685 

686 def enter_queue(self, request, direction=None, tag="default"): 

687 """Add a request into the arc's queue list. Apply the make_decay function (i.e., 

688 the decay that occur's this timestep). 

689 

690 Args: 

691 request (dict): A dict with a VQIP under the 'vqip' key and the travel 

692 time under the 'time' key. 

693 direction (str): Direction of flow, can be 'push' or 'pull 

694 tag (str, optional): optional message to direct the out_port's 

695 query_handler which function to call. Defaults to 'default'. 

696 """ 

697 # Update inflows and format 

698 request = self.enter_arc(request, direction, tag) 

699 

700 # TODO - currently decay depends on temp at the in_port data object.. 

701 # surely on vqip would be more sensible? (though this is true in many 

702 # places including WTW) 

703 

704 # Decay on entry 

705 request["vqip"] = self.make_decay(request["vqip"]) 

706 

707 # Append to queue 

708 self.queue.append(request) 

709 

710 def end_timestep(self): 

711 """End timestep in an arc, resetting flow/vqip in/out (which determine) the 

712 capacity for that timestep. 

713 

714 Update times of requests in the queue. Apply the make_decay function (i.e., the 

715 decay that occurs in the following timestep). 

716 """ 

717 self.vqip_in = self.empty_vqip() 

718 self.vqip_out = self.empty_vqip() 

719 self.total_decayed = self.empty_vqip() 

720 self.flow_in = 0 

721 self.flow_out = 0 

722 

723 self.queue_storage_ = self.copy_vqip(self.queue_storage) 

724 self.queue_storage = self.empty_vqip() 

725 

726 for request in self.queue: 

727 request["vqip"] = self.make_decay(request["vqip"]) 

728 request["time"] = max(request["time"] - 1, 0) 

729 

730 

731class DecayArcAlt(AltQueueArc, DecayObj): 

732 """""" 

733 

734 def __init__(self, decays={}, **kwargs): 

735 """An AltQueueArc that applies decays from a DecayObj. 

736 

737 Args: 

738 decays (dict, optional): A dict of dicts containing a key for each pollutant 

739 that decays and within that, a key for each parameter (a constant and 

740 exponent). Defaults to {}. 

741 """ 

742 self.decays = {} 

743 

744 # super().__init__(**kwargs) 

745 AltQueueArc.__init__(self, **kwargs) 

746 DecayObj.__init__(self, decays) 

747 

748 self.end_timestep = self._end_timestep 

749 

750 self.mass_balance_out.append(lambda: self.total_decayed) 

751 

752 def enter_queue(self, request, direction=None, tag="default"): 

753 """Add a request into the arc's queue. Apply the make_decay function (i.e., the 

754 decay that occur's this timestep). 

755 

756 Args: 

757 request (dict): A dict with a VQIP under the 'vqip' key and the travel 

758 time under the 'time' key. 

759 direction (str): Direction of flow, can be 'push' only. Defaults to 'push' 

760 tag (str, optional): Optional message for out_port's query handler, can be 

761 'default' only. Defaults to 'default'. 

762 """ 

763 # TODO- has no tags 

764 

765 # Update inflows and format 

766 request = self.enter_arc(request, direction, tag) 

767 

768 # Decay on entry 

769 request["vqip"] = self.make_decay(request["vqip"]) 

770 

771 # Sum into queue 

772 if request["time"] in self.queue.keys(): 

773 self.queue[request["time"]] = self.sum_vqip( 

774 self.queue[request["time"]], request["vqip"] 

775 ) 

776 else: 

777 self.queue[request["time"]] = request["vqip"] 

778 self.max_travel = max(self.max_travel, request["time"]) 

779 

780 def _end_timestep(self): 

781 """End timestep in an arc, resetting flow/vqip in/out (which determine) the 

782 capacity for that timestep. 

783 

784 Update timings in the queue. Apply the make_decay function (i.e., the decay that 

785 occurs in the following timestep). 

786 """ 

787 self.vqip_in = self.empty_vqip() 

788 self.vqip_out = self.empty_vqip() 

789 self.total_decayed = self.empty_vqip() 

790 self.flow_in = 0 

791 self.flow_out = 0 

792 

793 self.queue_storage_ = self.copy_vqip(self.queue_storage) 

794 self.queue_storage = ( 

795 self.empty_vqip() 

796 ) # TODO I don't think this (or any queue_storage= empty) is necessary 

797 

798 queue_ = self.queue.copy() 

799 keys = self.queue.keys() 

800 for i in range(self.max_travel): 

801 if (i + 1) in keys: 

802 self.queue[i] = self.make_decay(queue_[i + 1]) 

803 self.queue[i + 1] = self.empty_vqip() 

804 

805 self.queue[0] = self.sum_vqip(self.queue[0], self.make_decay(queue_[0])) 

806 

807 

808class PullArc(Arc): 

809 """""" 

810 

811 def __init__(self, **kwargs): 

812 """Subclass of Arc where pushes return no availability to push. 

813 

814 This creates an Arc where only pull requests/checks can be sent, similar to a 

815 river abstraction. 

816 """ 

817 super().__init__(**kwargs) 

818 self.send_push_request = self.send_push_deny 

819 self.send_push_check = self.send_push_check_deny 

820 

821 def send_push_deny(self, vqip, tag="default", force=False): 

822 """Function used to deny any push requests. 

823 

824 Args: 

825 vqip (dict): A dict VQIP of water to push 

826 tag (str, optional): optional message to direct the out_port's 

827 query_handler which function to call. Defaults to 'default'. 

828 force (bool, optional): Argument used to cause function to ignore tank 

829 capacity of out_port, possibly resulting in pooling. Should not be used 

830 unless out_port is a tank object. Defaults to False. 

831 

832 Returns: 

833 (dict): A VQIP amount of water that was not successfully pushed 

834 """ 

835 return vqip 

836 

837 def send_push_check_deny(self, vqip=None, tag="default"): 

838 """Function used to deny any push checks. 

839 

840 Args: 

841 vqip (dict): A dict VQIP of water to push that can be specified. Defaults to 

842 None, which returns maximum capacity to push. 

843 tag (str, optional): optional message to direct the out_port's 

844 query_handler which function to call. Defaults to 'default'. 

845 

846 Returns: 

847 (dict): An empty VQIP amount of water indicating no water can be pushed 

848 """ 

849 return self.empty_vqip() 

850 

851 

852class PushArc(Arc): 

853 """""" 

854 

855 def __init__(self, **kwargs): 

856 """Subclass of Arc where pushes return no availability to pull. 

857 

858 This creates an Arc where only push requests/checks can be sent, similar to a 

859 CSO. 

860 """ 

861 super().__init__(**kwargs) 

862 self.send_pull_request = self.send_pull_deny 

863 self.send_pull_check = self.send_pull_check_deny 

864 

865 def send_pull_deny(self, vqip, tag="default", force=False): 

866 """Function used to deny any pull requests. 

867 

868 Args: 

869 vqip (dict): A dict VQIP of water to pull 

870 tag (str, optional): optional message to direct the out_port's 

871 query_handler which function to call. Defaults to 'default'. 

872 force (bool, optional): Argument used to cause function to ignore tank 

873 capacity of out_port, possibly resulting in pooling. Should not be used 

874 unless out_port is a tank object. Defaults to False. 

875 

876 Returns: 

877 (dict): A VQIP amount of water that was successfully pulled 

878 """ 

879 return self.empty_vqip() 

880 

881 def send_pull_check_deny(self, vqip=None, tag="default"): 

882 """Function used to deny any pull checks. 

883 

884 Args: 

885 vqip (dict): A dict VQIP of water to pull that can be specified. Defaults to 

886 None, which returns maximum capacity to pull. 

887 tag (str, optional): optional message to direct the out_port's 

888 query_handler which function to call. Defaults to 'default'. 

889 

890 Returns: 

891 (dict): An empty VQIP amount of water indicating no water can be pulled 

892 """ 

893 return self.empty_vqip() 

894 

895 

896class SewerArc(Arc): 

897 """""" 

898 

899 pass 

900 

901 

902class WeirArc(SewerArc): 

903 """""" 

904 

905 pass