Coverage for wsimod/arcs/arcs.py: 17%

298 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-01-11 16:39 +0000

1# -*- coding: utf-8 -*- 

2"""Created on Wed Apr 7 08:43:32 2021. 

3 

4@author: Barney 

5 

6Converted to totals on Thur Apr 21 2022 

7""" 

8 

9from wsimod.core import constants 

10from wsimod.core.core import DecayObj, WSIObj 

11 

12# from wsimod.nodes import nodes #Complains about circular imports. 

13# I don't think it should do.. 

14 

15 

class Arc(WSIObj):
    """Link between two nodes along which VQIP requests and checks are passed.

    An arc tracks the volume (``flow_in``/``flow_out``) and VQIP
    (``vqip_in``/``vqip_out``) that has passed along it since the last call to
    ``end_timestep``; ``flow_in`` is what is subtracted from ``capacity`` when
    working out how much more can travel along the arc.
    """

    def __init__(
        self,
        name="",
        capacity=constants.UNBOUNDED_CAPACITY,
        preference=1,
        in_port=None,
        out_port=None,
        **kwargs,
    ):
        """Arc objects are the way for information to be passed between nodes in WSIMOD.
        They have an in_port (where a message comes from) and an out_port (where a
        message goes to).

        On construction the arc registers itself (keyed by ``name``) in
        ``in_port.out_arcs`` and ``out_port.in_arcs`` and, when the ports have
        them, in the ``out_arcs_type``/``in_arcs_type`` dicts keyed by the class
        name of the node at the other end. Both ports must therefore be supplied.

        Args:
            name (str): Name of arc. Defaults to ''.
            capacity (float): Capacity of flow along an arc (vol/timestep).
                Defaults to constants.UNBOUNDED_CAPACITY.
            preference (float): Number used to prioritise or deprioritise use of an arc
                when flexibility exists
            in_port: A WSIMOD node object where the arc starts
            out_port: A WSIMOD node object where the arc ends
            **kwargs: Any further attributes; copied directly onto the instance
                (and so able to overwrite the attributes set above).
        """
        # Default essential parameters
        self.name = name
        self.in_port = in_port
        self.out_port = out_port
        self.capacity = capacity
        self.preference = preference

        # Update args
        WSIObj.__init__(self)
        self.__dict__.update(kwargs)

        # TODO: warn when the arc name matches a node class name. The check
        # needs wsimod.nodes, which cannot be imported here without a circular
        # import, so it is currently not performed.

        # Initialise states
        self.flow_in = 0
        self.flow_out = 0
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()

        # Update ports
        self.in_port.out_arcs[self.name] = self
        self.out_port.in_arcs[self.name] = self

        out_type = self.out_port.__class__.__name__
        in_type = self.in_port.__class__.__name__

        # The per-type lookups are optional on a port
        if hasattr(self.in_port, "out_arcs_type"):
            self.in_port.out_arcs_type[out_type][self.name] = self

        if hasattr(self.out_port, "in_arcs_type"):
            self.out_port.in_arcs_type[in_type][self.name] = self

        # Mass balance checking: lists of zero-argument callables, each
        # returning a VQIP (presumably summed by the inherited mass_balance()).
        self.mass_balance_in = [lambda: self.vqip_in]
        self.mass_balance_out = [lambda: self.vqip_out]
        self.mass_balance_ds = [lambda: self.empty_vqip()]

    def arc_mass_balance(self):
        """Checks mass balance for inflows/outflows/storage change in an arc.

        Returns:
            in_ (dict) Total vqip of vqip_in and other inputs in mass_balance_in
            ds_ (dict): Total vqip of change in arc in mass_balance_ds
            out_ (dict): Total vqip of vqip_out and other outputs in mass_balance_out

        Examples:
            arc_in, arc_ds, arc_out = my_arc.arc_mass_balance()
        """
        in_, ds_, out_ = self.mass_balance()
        return in_, ds_, out_

    def send_push_request(self, vqip, tag="default", force=False):
        """Function used to transmit a push request from one node (in_port) to another
        node (out_port).

        The supplied vqip is copied, so the caller's dict is not modified.

        Args:
            vqip (dict): A dict VQIP of water to push
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
            force (bool, optional): If True, skip the get_excess check (arc
                capacity combined with the out_port's push_check) and offer the
                whole vqip to the out_port's push_set. Defaults to False.

        Returns:
            (dict): A VQIP amount of water that was not successfully pushed
        """
        vqip = self.copy_vqip(vqip)

        # Apply pipe capacity
        if force:
            not_pushed = self.empty_vqip()
        else:
            excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
            not_pushed = self.v_change_vqip(
                vqip, max(vqip["volume"] - excess_in["volume"], 0)
            )

        # Don't attempt to send volume that exceeds capacity
        vqip = self.extract_vqip(vqip, not_pushed)

        # Set push
        reply = self.out_port.push_set(vqip, tag)

        # Update total amount successfully sent
        vqip = self.extract_vqip(vqip, reply)

        # Combine non-sent water
        reply = self.sum_vqip(reply, not_pushed)

        # Update mass balance (nothing is stored in a plain arc, so out == in)
        self.flow_in += vqip["volume"]
        self.flow_out = self.flow_in

        self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
        self.vqip_out = self.vqip_in

        return reply

    def send_pull_request(self, vqip, tag="default"):
        """Function used to transmit a pull request from one node (in_port) to another
        node (out_port).

        Note that the supplied vqip dict is modified in place (its 'volume' and
        any additive pollutant entries are scaled down to what the arc allows)
        before being passed to the in_port's pull_set.

        Args:
            vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key is
                used)
            tag (str, optional): optional message to direct the in_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that was successfully pulled
        """
        volume = vqip["volume"]
        # Apply pipe capacity
        excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"]
        not_pulled = max(volume - excess_in, 0)
        volume -= not_pulled

        # Scale additive pollutants by the same fraction as the volume
        if volume > 0:
            for pol in constants.ADDITIVE_POLLUTANTS:
                if pol in vqip:
                    vqip[pol] *= volume / vqip["volume"]

        vqip["volume"] = volume

        # Make pull
        vqip = self.in_port.pull_set(vqip, tag)

        # Update mass balance (nothing is stored in a plain arc, so out == in)
        self.flow_in += vqip["volume"]
        self.flow_out = self.flow_in

        self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
        self.vqip_out = self.vqip_in

        return vqip

    def send_push_check(self, vqip=None, tag="default"):
        """Function used to transmit a push check from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to push that can be specified. Defaults to
                None, which returns maximum capacity to push.
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that could be pushed
        """
        return self.get_excess(direction="push", vqip=vqip, tag=tag)

    def send_pull_check(self, vqip=None, tag="default"):
        """Function used to transmit a pull check from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to pull that can be specified (by default,
                only the 'volume' key is used). Defaults to None, which returns all
                available water to pull.
            tag (str, optional): optional message to direct the in_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that could be pulled
        """
        return self.get_excess(direction="pull", vqip=vqip, tag=tag)

    def get_excess(self, direction, vqip=None, tag="default"):
        """Calculate how much could be pushed/pulled along the arc by combining the
        arc's remaining capacity with the relevant node's check (the out_port's
        push_check for a push, the in_port's pull_check for a pull).

        Args:
            direction (str): should be 'pull' or 'push'
            vqip (dict, optional): A VQIP amount to push/pull that can be
                specified. Defaults to None, which returns all available water to
                pull or maximum capacity to push (depending on 'direction').
            tag (str, optional): optional message to direct the node's query_handler
                which function to call. Defaults to 'default'.

        Raises:
            ValueError: If direction is neither 'push' nor 'pull'.

        Returns:
            (dict): A VQIP amount of water that could be pulled/pushed
        """
        # Pipe capacity
        pipe_excess = self.capacity - self.flow_in

        # Node capacity
        if direction == "push":
            node_excess = self.out_port.push_check(vqip, tag)
        elif direction == "pull":
            node_excess = self.in_port.pull_check(vqip, tag)
        else:
            raise ValueError(
                f"direction must be 'push' or 'pull', got {direction!r}"
            )
        excess = min(pipe_excess, node_excess["volume"])

        # TODO sensible to min(vqip, excess) here? (though it should be applied by node)

        return self.v_change_vqip(node_excess, excess)

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep)."""
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

    def reinit(self):
        """Reinitialise the arc by resetting its per-timestep state."""
        self.end_timestep()

257 

258 

class QueueArc(Arc):
    """Arc that holds each push/pull as a request in a queue to model travel time."""

    def __init__(self, number_of_timesteps=0, **kwargs):
        """A queue arc that stores each push or pull individually in the queue. Enables
        implementation of travel time. A fixed number of timesteps can be specified as a
        parameter, and additional number of timesteps can be specified when the requests
        are made.

        The queue is a list of requests, where their travel time is decremented
        by 1 each timestep. Any requests with a travel time of 0 will be sent
        onwards if the 'update_queue' function is called.

        Args:
            number_of_timesteps (int, optional): Fixed number of timesteps that
                it takes to traverse the arc. Defaults to 0.
            **kwargs: Passed on to Arc.__init__.
        """
        self.number_of_timesteps = number_of_timesteps
        self.queue = []
        super().__init__(**kwargs)

        # queue_storage_ holds the storage recorded at the previous end_timestep
        self.queue_storage = self.empty_vqip()
        self.queue_storage_ = self.empty_vqip()

        self.mass_balance_ds.append(lambda: self.queue_arc_ds())

    def queue_arc_ds(self):
        """Calculate change in amount of water and other pollutants in the arc.

        Also refreshes ``queue_storage`` with the current queue total.

        Returns:
            (dict): A VQIP amount of change
        """
        self.queue_storage = self.queue_arc_sum()
        return self.extract_vqip(self.queue_storage, self.queue_storage_)

    def queue_arc_sum(self):
        """Sum the total water in the requests in the queue of the arc.

        Returns:
            (dict): A VQIP amount of water/pollutants in the arc
        """
        queue_storage = self.empty_vqip()
        for request in self.queue:
            queue_storage = self.sum_vqip(queue_storage, request["vqip"])
        return queue_storage

    def send_pull_request(self, vqip, tag="default", time=0):
        """Function used to transmit a pull request from one node (in_port) to another
        node (out_port). Any pulled water is immediately removed from the in_port and
        then takes the travel time to be received. This function has not been
        extensively tested.

        Note that the supplied vqip dict is modified in place (its 'volume' and
        any additive pollutant entries are scaled down to what the arc allows)
        before being passed to the in_port's pull_set.

        Args:
            vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key
                is used)
            tag (str, optional): optional message to direct the in_port's query_handler
                which function to call. Defaults to 'default'.
            time (int, optional): Travel time for request to spend in the arc (in
                addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.

        Returns:
            (dict): A VQIP amount of water that was successfully pulled.
        """
        volume = vqip["volume"]
        # Apply pipe capacity
        excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"]
        not_pulled = max(volume - excess_in, 0)
        volume -= not_pulled

        # Scale additive pollutants by the same fraction as the volume; a
        # zero-volume request has nothing to scale (and would divide by zero).
        if vqip["volume"] > 0:
            for pol in constants.ADDITIVE_POLLUTANTS:
                if pol in vqip:
                    vqip[pol] *= volume / vqip["volume"]

        vqip["volume"] = volume

        # Make pull
        vqip = self.in_port.pull_set(vqip, tag)

        # Update to queue request
        request = {"time": time + self.number_of_timesteps, "vqip": vqip}

        # vqtip enters arc as a request
        self.enter_queue(request, direction="pull", tag=tag)

        # Update request queue and return pulls from queue
        reply = self.update_queue(direction="pull")
        return reply

    def send_push_request(self, vqip_, tag="default", force=False, time=0):
        """Function used to transmit a push request from one node (in_port) to another
        node (out_port).

        Args:
            vqip_ (dict): A dict VQIP of water to push.
            tag (str, optional): optional message to direct the out_port's query_handler
                which function to call. Defaults to 'default'.
            force (bool, optional): Skip the get_excess capacity check when
                entering the arc (note does not currently pass the force argument
                to the out_port). Defaults to False.
            time (int, optional): Travel time for request to spend in the arc (in
                addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.

        Returns:
            (dict): A VQIP amount of water that was not successfully pushed
        """
        vqip = self.copy_vqip(vqip_)

        # Nothing meaningful to push
        if vqip["volume"] < constants.FLOAT_ACCURACY:
            return self.empty_vqip()

        # Apply pipe capacity
        if force:
            not_pushed = self.empty_vqip()
        else:
            excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
            not_pushed = self.v_change_vqip(
                vqip, max(vqip["volume"] - excess_in["volume"], 0)
            )

        vqip = self.extract_vqip(vqip, not_pushed)

        # Update to queue request
        request = {"time": time + self.number_of_timesteps, "vqip": vqip}

        # vqtip enters arc as a request
        self.enter_queue(request, direction="push", tag=tag)

        # Update request queue
        backflow = self.update_queue(direction="push")
        not_pushed = self.sum_vqip(not_pushed, backflow)

        if backflow["volume"] > vqip_["volume"]:
            print("more backflow than vqip...")

        # Backflow never really entered the arc, so take it back off the inflow
        self.vqip_in = self.extract_vqip(self.vqip_in, backflow)

        return not_pushed

    def enter_arc(self, request, direction, tag):
        """Function used to format a request into the format expected by the
        enter_queue function, and to record it as inflow to the arc.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str): message to direct the out_port's query_handler which
                function to call.

        Returns:
            (dict): The request dict with additional information entered for the queue.
        """
        # Volume spread evenly over the timesteps the request spends in the arc
        request["average_flow"] = request["vqip"]["volume"] / (request["time"] + 1)
        request["direction"] = direction
        request["tag"] = tag

        self.flow_in += request["average_flow"]
        self.vqip_in = self.sum_vqip(self.vqip_in, request["vqip"])

        return request

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue list.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
        """
        # Update inflows and format request
        request = self.enter_arc(request, direction, tag)

        # Enter queue
        self.queue.append(request)

    def update_queue(self, direction=None, backflow_enabled=True):
        """Iterate over all requests in the queue, removing them if they have no volume.

        If a request is a push and has 0 travel time remaining then
        the push will be triggered at the out_port, if the out_port responds that
        it cannot receive the push, then this water will be returned as backflow
        (if enabled).

        If a request is a pull and has 0 travel time remaining then it is simply summed
        with other 0 travel time pull_requests and returned (since the pull is made at
        the in_port when the send_pull_request is made).

        Only requests whose stored direction equals ``direction`` are processed.
        If ``direction`` is neither 'push' nor 'pull', "No direction" is printed,
        ready requests are left in the queue and None is returned.

        Args:
            direction (str, optional): Direction of flow, can be 'push' or 'pull'.
                Defaults to None.
            backflow_enabled (bool, optional): Enable backflow, described above, if not
                enabled then the request will remain in the queue until all water has
                been received. Defaults to True.

        Returns:
            total_backflow (dict): In the case of a push direction, any backflow will be
                returned as a VQIP amount
            total_removed (dict): In the case of a pull direction, any pulled water will
                be returned as a VQIP amount
        """
        done_requests = []

        total_removed = self.empty_vqip()
        total_backflow = self.empty_vqip()
        # Iterate over requests
        for request in self.queue:
            if request["direction"] != direction:
                continue

            vqip = request["vqip"]

            if vqip["volume"] < constants.FLOAT_ACCURACY:
                # Add to queue for removal
                done_requests.append(request)
                continue

            if request["time"] != 0:
                # Still travelling
                continue

            if direction == "push":
                # Attempt to push request
                reply = self.out_port.push_set(vqip, request["tag"])
                removed = vqip["volume"] - reply["volume"]
            elif direction == "pull":
                # Water has already been pulled, so assume all received
                removed = vqip["volume"]
            else:
                # Cannot be processed without a direction; leave it queued
                print("No direction")
                continue

            # Update outflows
            self.flow_out += request["average_flow"] * removed / vqip["volume"]
            vqip_ = self.v_change_vqip(vqip, removed)
            total_removed = self.sum_vqip(total_removed, vqip_)

            # Assume that any water that cannot arrive at destination this
            # timestep is backflow
            rejected = self.v_change_vqip(vqip, vqip["volume"] - removed)

            if backflow_enabled or rejected["volume"] < constants.FLOAT_ACCURACY:
                total_backflow = self.sum_vqip(rejected, total_backflow)
                done_requests.append(request)
            else:
                # Keep the undelivered part queued for a later attempt
                request["vqip"] = rejected

        self.vqip_out = self.sum_vqip(self.vqip_out, total_removed)

        # Remove done requests
        for request in done_requests:
            self.queue.remove(request)

        # return total_removed
        if direction == "pull":
            return total_removed
        elif direction == "push":
            return total_backflow
        else:
            print("No direction")

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update times of requests in the queue.
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        # Remember this timestep's storage for the next storage-change calculation
        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        for request in self.queue:
            request["time"] = max(request["time"] - 1, 0)

        # TODO - update_queue here?

    def reinit(self):
        """Reinitialise the arc: reset per-timestep state and empty the queue."""
        self.end_timestep()
        self.queue = []

540 

541 

class AltQueueArc(QueueArc):
    """Queue arc whose queue is a dict of VQIPs keyed by remaining travel time."""

    def __init__(self, **kwargs):
        """A simpler queue arc that has a queue that is a dict where each key is the
        travel time.

        Cannot be used if arc capacity is dynamic. Cannot be used for pulls.

        Args:
            **kwargs: Passed on to QueueArc.__init__.
        """
        # Use the dict-based sum for queue storage; set before the parent
        # constructor runs.
        self.queue_arc_sum = self.alt_queue_arc_sum

        super().__init__(**kwargs)
        self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}
        self.max_travel = 1

    def alt_queue_arc_sum(self):
        """Sum the total water in the queue of the arc.

        Returns:
            (dict): A VQIP amount of water/pollutants in the arc
        """
        queue_storage = self.empty_vqip()
        for request in self.queue.values():
            queue_storage = self.sum_vqip(queue_storage, request)
        return queue_storage

    def enter_queue(self, request, direction="push", tag="default"):
        """Add a request into the arc's queue.

        The request's VQIP is summed into the queue entry for its travel time.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' only. Defaults to 'push'
            tag (str, optional): Optional message for out_port's query handler, can be
                'default' only. Defaults to 'default'.
        """
        # Update inflows and format request
        request = self.enter_arc(request, direction, tag)

        # Sum into queue
        travel_time = request["time"]
        if travel_time in self.queue:
            self.queue[travel_time] = self.sum_vqip(
                self.queue[travel_time], request["vqip"]
            )
        else:
            self.queue[travel_time] = request["vqip"]
            self.max_travel = max(self.max_travel, travel_time)

    def update_queue(self, direction=None, backflow_enabled=True):
        """Trigger the push of water in the 0th key for the queue, if the out_port
        responds that it cannot receive the push, then this water will be returned as
        backflow (if enabled).

        Args:
            direction (str, optional): Not used; this arc only handles pushes.
                Defaults to None.
            backflow_enabled (bool, optional): Enable backflow, described above, if not
                enabled then the rejected water remains in the 0th queue entry
                and an empty VQIP is returned. Defaults to True.

        Returns:
            backflow (dict): In the case of a push direction, any backflow will be
                returned as a VQIP amount
        """
        # TODO - can this work for pulls??

        ready = self.copy_vqip(self.queue[0])

        # Push 0 travel time water; the reply is what the node did not accept
        rejected = self.out_port.push_set(ready)

        # Only what the node accepted has left the arc, whether the rejected
        # part is handed back as backflow or kept in the queue.
        delivered = self.v_change_vqip(ready, ready["volume"] - rejected["volume"])

        if backflow_enabled:
            self.queue[0] = self.empty_vqip()
            backflow = rejected
        else:
            self.queue[0] = rejected
            backflow = self.empty_vqip()

        self.flow_out += delivered["volume"]
        self.vqip_out = self.sum_vqip(self.vqip_out, delivered)

        return backflow

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update timings in the queue.
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0
        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        # Shift every entry one step closer to arrival, reading from a snapshot
        queue_ = self.queue.copy()
        for i in range(self.max_travel):
            if (i + 1) in self.queue:
                self.queue[i] = queue_[i + 1]
                self.queue[i + 1] = self.empty_vqip()

        # Water already waiting at 0 stays there, joined by what was at 1
        self.queue[0] = self.sum_vqip(queue_[0], queue_[1])

    def reinit(self):
        """Reinitialise the arc: reset per-timestep state and empty the queue."""
        self.end_timestep()
        self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}

653 

654 

class DecayArc(QueueArc, DecayObj):
    """QueueArc that applies decay to water as it enters and while it is queued."""

    def __init__(self, decays=None, **kwargs):
        """A QueueArc that applies decays from a DecayObj.

        Args:
            decays (dict, optional): A dict of dicts containing a key for each pollutant
                that decays and within that, a key for each parameter (a constant and
                exponent). Defaults to None, which is treated as an empty dict.
            **kwargs: Passed on to QueueArc.__init__.
        """
        # A fresh dict per instance; a literal {} default would be shared
        # between every arc created without an explicit argument.
        if decays is None:
            decays = {}
        self.decays = decays

        QueueArc.__init__(self, **kwargs)
        DecayObj.__init__(self, decays)

        # Decayed mass counts as an output in the mass balance
        self.mass_balance_out.append(lambda: self.total_decayed)

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue list. Apply the make_decay function (i.e.,
        the decay that occurs this timestep).

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
        """
        # Update inflows and format
        request = self.enter_arc(request, direction, tag)

        # TODO - currently decay depends on temp at the in_port data object..
        # surely on vqip would be more sensible? (though this is true in many
        # places including WTW)

        # Decay on entry
        request["vqip"] = self.make_decay(request["vqip"])

        # Append to queue
        self.queue.append(request)

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update times of requests in the queue. Apply the make_decay function (i.e., the
        decay that occurs in the following timestep).
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.total_decayed = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        for request in self.queue:
            request["vqip"] = self.make_decay(request["vqip"])
            request["time"] = max(request["time"] - 1, 0)

716 

717 

class DecayArcAlt(AltQueueArc, DecayObj):
    """AltQueueArc that applies decay to water as it enters and while it is queued."""

    def __init__(self, decays=None, **kwargs):
        """An AltQueueArc that applies decays from a DecayObj.

        Args:
            decays (dict, optional): A dict of dicts containing a key for each pollutant
                that decays and within that, a key for each parameter (a constant and
                exponent). Defaults to None, which is treated as an empty dict.
            **kwargs: Passed on to AltQueueArc.__init__.
        """
        # A fresh dict per instance; a literal {} default would be shared
        # between every arc created without an explicit argument.
        if decays is None:
            decays = {}

        # Placeholder while the parent constructor runs; the supplied decays
        # are handed to DecayObj.__init__ below.
        self.decays = {}

        # super().__init__(**kwargs)
        AltQueueArc.__init__(self, **kwargs)
        DecayObj.__init__(self, decays)

        # Use the decaying version of end_timestep on this instance
        self.end_timestep = self._end_timestep

        # Decayed mass counts as an output in the mass balance
        self.mass_balance_out.append(lambda: self.total_decayed)

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue. Apply the make_decay function (i.e., the
        decay that occurs this timestep).

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str, optional): Direction of flow, can be 'push' only.
                Defaults to None.
            tag (str, optional): Optional message for out_port's query handler, can be
                'default' only. Defaults to 'default'.
        """
        # TODO- has no tags

        # Update inflows and format
        request = self.enter_arc(request, direction, tag)

        # Decay on entry
        request["vqip"] = self.make_decay(request["vqip"])

        # Sum into queue
        travel_time = request["time"]
        if travel_time in self.queue:
            self.queue[travel_time] = self.sum_vqip(
                self.queue[travel_time], request["vqip"]
            )
        else:
            self.queue[travel_time] = request["vqip"]
            self.max_travel = max(self.max_travel, travel_time)

    def _end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update timings in the queue. Apply the make_decay function (i.e., the decay that
        occurs in the following timestep).
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.total_decayed = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = (
            self.empty_vqip()
        )  # TODO I don't think this (or any queue_storage= empty) is necessary

        # Shift every entry one step closer to arrival, decaying it on the
        # way, reading from a snapshot of the queue
        queue_ = self.queue.copy()
        for i in range(self.max_travel):
            if (i + 1) in self.queue:
                self.queue[i] = self.make_decay(queue_[i + 1])
                self.queue[i + 1] = self.empty_vqip()

        # Water that was already waiting at 0 is decayed and kept at 0
        self.queue[0] = self.sum_vqip(self.queue[0], self.make_decay(queue_[0]))

793 

794 

class PullArc(Arc):
    """Arc along which only pulls are possible; pushes are always refused."""

    def __init__(self, **kwargs):
        """Build a regular Arc, then replace its push methods with refusals.

        The result is an Arc that only carries pull requests/checks, similar to
        a river abstraction.

        Args:
            **kwargs: Passed on to Arc.__init__.
        """
        super().__init__(**kwargs)
        # Shadow the inherited push API on this instance
        self.send_push_check = self.send_push_check_deny
        self.send_push_request = self.send_push_deny

    def send_push_deny(self, vqip, tag="default", force=False):
        """Refuse a push request by handing the whole amount straight back.

        Args:
            vqip (dict): A dict VQIP of water to push
            tag (str, optional): Accepted to match the signature of
                send_push_request; not used. Defaults to 'default'.
            force (bool, optional): Accepted to match the signature of
                send_push_request; not used. Defaults to False.

        Returns:
            (dict): The vqip that was passed in, i.e. nothing was pushed
        """
        return vqip

    def send_push_check_deny(self, vqip=None, tag="default"):
        """Refuse a push check by reporting that nothing can be pushed.

        Args:
            vqip (dict, optional): Accepted to match the signature of
                send_push_check; not used. Defaults to None.
            tag (str, optional): Accepted to match the signature of
                send_push_check; not used. Defaults to 'default'.

        Returns:
            (dict): An empty VQIP amount of water indicating no water can be pushed
        """
        nothing = self.empty_vqip()
        return nothing

837 

838 

class PushArc(Arc):
    """Arc along which only pushes are possible; pulls are always refused."""

    def __init__(self, **kwargs):
        """Build a regular Arc, then replace its pull methods with refusals.

        The result is an Arc that only carries push requests/checks, similar to
        a CSO.

        Args:
            **kwargs: Passed on to Arc.__init__.
        """
        super().__init__(**kwargs)
        # Shadow the inherited pull API on this instance
        self.send_pull_check = self.send_pull_check_deny
        self.send_pull_request = self.send_pull_deny

    def send_pull_deny(self, vqip, tag="default", force=False):
        """Refuse a pull request by reporting that nothing was pulled.

        Args:
            vqip (dict): A dict VQIP of water to pull; not used.
            tag (str, optional): Accepted for signature compatibility; not used.
                Defaults to 'default'.
            force (bool, optional): Accepted for signature compatibility; not
                used. Defaults to False.

        Returns:
            (dict): An empty VQIP amount of water, i.e. nothing was pulled
        """
        nothing = self.empty_vqip()
        return nothing

    def send_pull_check_deny(self, vqip=None, tag="default"):
        """Refuse a pull check by reporting that nothing can be pulled.

        Args:
            vqip (dict, optional): Accepted to match the signature of
                send_pull_check; not used. Defaults to None.
            tag (str, optional): Accepted to match the signature of
                send_pull_check; not used. Defaults to 'default'.

        Returns:
            (dict): An empty VQIP amount of water indicating no water can be pulled
        """
        nothing = self.empty_vqip()
        return nothing

881 

882 

class SewerArc(Arc):
    """Arc subclass that adds no attributes or methods of its own.

    It behaves exactly like Arc and only provides a distinct class name.
    """

887 

888 

class WeirArc(SewerArc):
    """SewerArc subclass that adds no attributes or methods of its own.

    It behaves exactly like SewerArc and only provides a distinct class name.
    """