Coverage for wsimod\arcs\arcs.py: 29%
303 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-30 14:55 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-30 14:55 +0000
1# -*- coding: utf-8 -*-
2"""Created on Wed Apr 7 08:43:32 2021.
4@author: Barney
6Converted to totals on Thur Apr 21 2022
7"""
from typing import Any, Dict, Optional

from wsimod.core import constants
from wsimod.core.core import DecayObj, WSIObj
14# from wsimod.nodes import nodes #Complains about circular imports.
15# I don't think it should do..
class Arc(WSIObj):
    """Link between two nodes that carries push/pull requests and checks.

    An arc tracks the flow and VQIP that entered and left it during the current
    timestep and limits requests to its remaining capacity.
    """

    def __init__(
        self,
        name="",
        capacity=constants.UNBOUNDED_CAPACITY,
        preference=1,
        in_port=None,
        out_port=None,
        **kwargs,
    ):
        """Arc objects are the way for information to be passed between nodes in WSIMOD.
        They have an in_port (where a message comes from) and an out_port (where a
        message goes to).

        Args:
            name (str): Name of arc. Defaults to ''.
            capacity (float): Capacity of flow along an arc (vol/timestep).
                Defaults to constants.UNBOUNDED_CAPACITY.
            preference (float): Number used to prioritise or deprioritise use of an arc
                when flexibility exists
            in_port: A WSIMOD node object where the arc starts
            out_port: A WSIMOD node object where the arc ends
            **kwargs: Any further attributes, written directly onto the instance.
        """
        # Default essential parameters
        self.name = name
        self.in_port = in_port
        self.out_port = out_port
        self.capacity = capacity
        self.preference = preference

        # Update args
        WSIObj.__init__(self)
        self.__dict__.update(kwargs)

        # def all_subclasses(cls):
        #     return set(cls.__subclasses__()).union(
        #         [s for c in cls.__subclasses__() for s in all_subclasses(c)])
        # node_types = [x.__name__ for x in all_subclasses(nodes.Node)] + ['Node']

        # if self.name in node_types:
        #     print('Warning: arc name should not take a node class name')
        #     #TODO... not sure why... also currently commented for import issues..

        # Initialise states
        self.flow_in = 0
        self.flow_out = 0
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()

        # Update ports: register this arc on both of its end nodes
        self.in_port.out_arcs[self.name] = self
        self.out_port.in_arcs[self.name] = self

        out_type = self.out_port.__class__.__name__
        in_type = self.in_port.__class__.__name__

        # Where the node keeps a per-class index of arcs, register there as well
        if hasattr(self.in_port, "out_arcs_type"):
            self.in_port.out_arcs_type[out_type][self.name] = self

        if hasattr(self.out_port, "in_arcs_type"):
            self.out_port.in_arcs_type[in_type][self.name] = self

        # Mass balance checking
        self.mass_balance_in = [lambda: self.vqip_in]
        self.mass_balance_out = [lambda: self.vqip_out]
        self.mass_balance_ds = [lambda: self.empty_vqip()]

    def apply_overrides(self, overrides: Optional[Dict[str, Any]] = None) -> None:
        """Apply overrides to the arc.

        Recognised keys ('capacity', 'preference') are popped from the supplied
        dictionary; any keys left over are reported with a printed message.

        Args:
            overrides (dict, optional): Dictionary of overrides. Defaults to None,
                in which case nothing is changed.
        """
        if overrides is None:
            return

        self.capacity = overrides.pop("capacity", self.capacity)
        self.preference = overrides.pop("preference", self.preference)
        if len(overrides) > 0:
            print(f"No override behaviour defined for: {overrides.keys()}")

    def arc_mass_balance(self):
        """Checks mass balance for inflows/outflows/storage change in an arc.

        Returns:
            in_ (dict) Total vqip of vqip_in and other inputs in mass_balance_in
            ds_ (dict): Total vqip of change in arc in mass_balance_ds
            out_ (dict): Total vqip of vqip_out and other outputs in mass_balance_out

        Examples:
            arc_in, arc_ds, arc_out = my_arc.arc_mass_balance()
        """
        in_, ds_, out_ = self.mass_balance()
        return in_, ds_, out_

    def send_push_request(self, vqip, tag="default", force=False):
        """Function used to transmit a push request from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to push
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
            force (bool, optional): Argument used to cause function to ignore tank
                capacity of out_port, possibly resulting in pooling. Should not be
                used unless out_port is a tank object. Defaults to False.

        Returns:
            (dict): A VQIP amount of water that was not successfully pushed
        """
        # Work on a copy so the caller's VQIP is left untouched
        vqip = self.copy_vqip(vqip)

        # Apply pipe capacity
        if force:
            not_pushed = self.empty_vqip()
        else:
            excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
            not_pushed = self.v_change_vqip(
                vqip, max(vqip["volume"] - excess_in["volume"], 0)
            )

        # Don't attempt to send volume that exceeds capacity
        vqip = self.extract_vqip(vqip, not_pushed)

        # Set push
        reply = self.out_port.push_set(vqip, tag)

        # Update total amount successfully sent
        vqip = self.extract_vqip(vqip, reply)

        # Combine non-sent water
        reply = self.sum_vqip(reply, not_pushed)

        # Update mass balance (outflow is set equal to inflow for a plain arc)
        self.flow_in += vqip["volume"]
        self.flow_out = self.flow_in

        self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
        self.vqip_out = self.vqip_in

        return reply

    def send_pull_request(self, vqip, tag="default"):
        """Function used to transmit a pull request from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key is
                used)
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that was successfully pulled
        """
        volume = vqip["volume"]
        # Apply pipe capacity
        excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"]
        not_pulled = max(volume - excess_in, 0)
        volume -= not_pulled

        # NOTE: the scaling below edits the dict passed in by the caller in place
        if volume > 0:
            for pol in constants.ADDITIVE_POLLUTANTS:
                if pol in vqip.keys():
                    vqip[pol] *= volume / vqip["volume"]

        vqip["volume"] = volume

        # Make pull
        vqip = self.in_port.pull_set(vqip, tag)

        # Update mass balance (outflow is set equal to inflow for a plain arc)
        self.flow_in += vqip["volume"]
        self.flow_out = self.flow_in

        self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
        self.vqip_out = self.vqip_in

        return vqip

    def send_push_check(self, vqip=None, tag="default"):
        """Function used to transmit a push check from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to push that can be specified. Defaults to
                None, which returns maximum capacity to push.
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that could be pushed
        """
        return self.get_excess(direction="push", vqip=vqip, tag=tag)

    def send_pull_check(self, vqip=None, tag="default"):
        """Function used to transmit a pull check from one node (in_port) to another
        node (out_port).

        Args:
            vqip (dict): A dict VQIP of water to pull that can be specified (by default,
                only the 'volume' key is used). Defaults to None, which returns all
                available water to pull.
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.

        Returns:
            (dict): A VQIP amount of water that could be pulled
        """
        return self.get_excess(direction="pull", vqip=vqip, tag=tag)

    def get_excess(self, direction, vqip=None, tag="default"):
        """Calculate how much could be pushed/pulled along the arc by combining both
        arc capacity and the relevant node's check information.

        Args:
            direction (str): should be 'pull' or 'push'
            vqip (dict, optional): A VQIP amount to push/pull that can be
                specified. Defaults to None, which returns all available water to
                pull or maximum capacity to push (depending on 'direction').
            tag (str, optional): optional message to direct the node's query_handler
                which function to call. Defaults to 'default'.

        Raises:
            ValueError: If direction is neither 'push' nor 'pull'.

        Returns:
            (dict): A VQIP amount of water that could be pulled/pushed
        """
        # Pipe capacity left after what has already entered the arc
        pipe_excess = self.capacity - self.flow_in

        # Node capacity: pushes are checked at out_port, pulls at in_port
        if direction == "push":
            node_excess = self.out_port.push_check(vqip, tag)
        elif direction == "pull":
            node_excess = self.in_port.pull_check(vqip, tag)
        else:
            raise ValueError(f"direction must be 'push' or 'pull', got {direction!r}")
        excess = min(pipe_excess, node_excess["volume"])

        # TODO sensible to min(vqip, excess) here? (though it should be applied by node)

        return self.v_change_vqip(node_excess, excess)

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep)."""
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

    def reinit(self):
        """Reinitialise the arc by clearing its per-timestep state."""
        self.end_timestep()
class QueueArc(Arc):
    """Arc that holds each request in a queue for a travel time before delivery."""

    def __init__(self, number_of_timesteps=0, **kwargs):
        """A queue arc that stores each push or pull individually in the queue. Enables
        implementation of travel time. A fixed number of timesteps can be specified as a
        parameter, and additional number of timesteps can be specified when the requests
        are made.

        The queue is a list of requests, where their travel time is decremented
        by 1 each timestep. Any requests with a travel time of 0 will be sent
        onwards if the 'update_queue' function is called.

        Args:
            number_of_timesteps (int, optional): Fixed number of timesteps that
                it takes to traverse the arc. Defaults to 0.
            **kwargs: Passed on to Arc.__init__.
        """
        self.number_of_timesteps = number_of_timesteps
        self.queue = []
        super().__init__(**kwargs)

        # Current queue contents and the value saved at the last end_timestep
        self.queue_storage = self.empty_vqip()
        self.queue_storage_ = self.empty_vqip()

        self.mass_balance_ds.append(lambda: self.queue_arc_ds())

    def queue_arc_ds(self):
        """Calculate change in amount of water and other pollutants in the arc.

        Refreshes queue_storage from the queue and compares it with queue_storage_.

        Returns:
            (dict): A VQIP amount of change
        """
        self.queue_storage = self.queue_arc_sum()
        return self.extract_vqip(self.queue_storage, self.queue_storage_)

    def queue_arc_sum(self):
        """Sum the total water in the requests in the queue of the arc.

        Returns:
            (dict): A VQIP amount of water/pollutants in the arc
        """
        queue_storage = self.empty_vqip()
        for request in self.queue:
            queue_storage = self.sum_vqip(queue_storage, request["vqip"])
        return queue_storage

    def send_pull_request(self, vqip, tag="default", time=0):
        """Function used to transmit a pull request from one node (in_port) to another
        node (out_port). Any pulled water is immediately removed from the in_port and
        then takes the travel time to be received. This function has not been
        extensively tested.

        Args:
            vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key
                is used)
            tag (str, optional): optional message to direct the node's query_handler
                which function to call. Defaults to 'default'.
            time (int, optional): Travel time for request to spend in the arc (in
                addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.

        Returns:
            (dict): A VQIP amount of water that was successfully pulled.
        """
        volume = vqip["volume"]
        # Apply pipe capacity
        excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"]
        not_pulled = max(volume - excess_in, 0)
        volume -= not_pulled

        # Scale pollutants to the reduced volume; skip when the requested volume is
        # zero, where the ratio would be a division by zero
        if vqip["volume"] > 0:
            for pol in constants.ADDITIVE_POLLUTANTS:
                if pol in vqip.keys():
                    vqip[pol] *= volume / vqip["volume"]

        vqip["volume"] = volume

        # Make pull
        vqip = self.in_port.pull_set(vqip, tag)

        # Update to queue request
        request = {"time": time + self.number_of_timesteps, "vqip": vqip}

        # vqip enters arc as a request
        self.enter_queue(request, direction="pull", tag=tag)

        # Update request queue and return pulls from queue
        reply = self.update_queue(direction="pull")
        return reply

    def send_push_request(self, vqip_, tag="default", force=False, time=0):
        """Function used to transmit a push request from one node (in_port) to another
        node (out_port).

        Args:
            vqip_ (dict): A dict VQIP of water to push.
            tag (str, optional): optional message to direct the out_port's query_handler
                which function to call. Defaults to 'default'.
            force (bool, optional): Ignore the capacity of the arc (note does not
                currently, pass the force argument to the out_port). Defaults to False.
            time (int, optional): Travel time for request to spend in the arc (in
                addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.

        Returns:
            (dict): A VQIP amount of water that was not successfully pushed
        """
        vqip = self.copy_vqip(vqip_)

        # Nothing meaningful to push
        if vqip["volume"] < constants.FLOAT_ACCURACY:
            return self.empty_vqip()

        # Apply pipe capacity
        if force:
            not_pushed = self.empty_vqip()
        else:
            excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
            not_pushed = self.v_change_vqip(
                vqip, max(vqip["volume"] - excess_in["volume"], 0)
            )

        vqip = self.extract_vqip(vqip, not_pushed)

        # Update to queue request
        request = {"time": time + self.number_of_timesteps, "vqip": vqip}

        # vqip enters arc as a request
        self.enter_queue(request, direction="push", tag=tag)

        # Update request queue
        backflow = self.update_queue(direction="push")
        not_pushed = self.sum_vqip(not_pushed, backflow)

        if backflow["volume"] > vqip_["volume"]:
            print("more backflow than vqip...")

        # Backflow is taken back off the recorded inflow
        self.vqip_in = self.extract_vqip(self.vqip_in, backflow)

        return not_pushed

    def enter_arc(self, request, direction, tag):
        """Format a request into the form expected by the enter_queue function and
        record it as inflow to the arc.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str): message to direct the out_port's query_handler which
                function to call.

        Returns:
            (dict): The request dict with additional information entered for the queue.
        """
        # Volume is spread evenly over the travel time plus the current timestep
        request["average_flow"] = request["vqip"]["volume"] / (request["time"] + 1)
        request["direction"] = direction
        request["tag"] = tag

        self.flow_in += request["average_flow"]
        self.vqip_in = self.sum_vqip(self.vqip_in, request["vqip"])

        return request

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue list.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
        """
        # Update inflows and format request
        request = self.enter_arc(request, direction, tag)

        # Enter queue
        self.queue.append(request)

    def update_queue(self, direction=None, backflow_enabled=True):
        """Iterate over all requests in the queue, removing them if they have no volume.

        If a request is a push and has 0 travel time remaining then
        the push will be triggered at the out_port, if the out_port responds that
        it cannot receive the push, then this water will be returned as backflow
        (if enabled).

        If a request is a pull and has 0 travel time remaining then it is simply summed
        with other 0 travel time pull_requests and returned (since the pull is made at
        the in_port when the send_pull_request is made).

        Args:
            direction (str, optional): Direction of flow, can be 'push' or 'pull'.
                Defaults to None.
            backflow_enabled (bool, optional): Enable backflow, described above, if not
                enabled then the request will remain in the queue until all water has
                been received. Defaults to True.

        Returns:
            total_backflow (dict): In the case of a push direction, any backflow will be
                returned as a VQIP amount
            total_removed (dict): In the case of a pull direction, any pulled water will
                be returned as a VQIP amount
        """
        done_requests = []

        total_removed = self.empty_vqip()
        total_backflow = self.empty_vqip()
        # Iterate over requests
        for request in self.queue:
            if request["direction"] != direction:
                continue

            vqip = request["vqip"]

            if vqip["volume"] < constants.FLOAT_ACCURACY:
                # Add to queue for removal
                done_requests.append(request)
                continue

            if request["time"] != 0:
                # Still travelling
                continue

            if direction == "push":
                # Attempt to push request
                reply = self.out_port.push_set(vqip, request["tag"])
                removed = vqip["volume"] - reply["volume"]
            elif direction == "pull":
                # Water has already been pulled, so assume all received
                removed = vqip["volume"]
            else:
                # Cannot process a request without a valid direction; leave it queued
                print("No direction")
                continue

            # Update outflows
            self.flow_out += request["average_flow"] * removed / vqip["volume"]
            vqip_ = self.v_change_vqip(vqip, removed)
            total_removed = self.sum_vqip(total_removed, vqip_)

            # Assume that any water that cannot arrive at destination this
            # timestep is backflow
            rejected = self.v_change_vqip(vqip, vqip["volume"] - removed)

            if backflow_enabled or rejected["volume"] < constants.FLOAT_ACCURACY:
                total_backflow = self.sum_vqip(rejected, total_backflow)
                done_requests.append(request)
            else:
                # Keep only the undelivered water in the queue
                request["vqip"] = rejected

        self.vqip_out = self.sum_vqip(self.vqip_out, total_removed)

        # Remove done requests
        for request in done_requests:
            self.queue.remove(request)

        # return total_removed
        if direction == "pull":
            return total_removed
        elif direction == "push":
            return total_backflow
        else:
            print("No direction")

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update times of requests in the queue.
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        # Travel times count down but never go below zero
        for request in self.queue:
            request["time"] = max(request["time"] - 1, 0)

        # TODO - update_queue here?

    def reinit(self):
        """Reinitialise the arc: reset per-timestep state and empty the queue."""
        self.end_timestep()
        self.queue = []
class AltQueueArc(QueueArc):
    """Queue arc whose queue is a dict of VQIPs keyed by remaining travel time."""

    def __init__(self, **kwargs):
        """A simpler queue arc that has a queue that is a dict where each key is the
        travel time.

        Cannot be used if arc capacity is dynamic. Cannot be used for pulls.

        Args:
            **kwargs: Passed on to QueueArc.__init__.
        """
        # Replace the list-based sum with the dict-based one for this instance
        self.queue_arc_sum = self.alt_queue_arc_sum

        super().__init__(**kwargs)
        self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}
        self.max_travel = 1

    def alt_queue_arc_sum(self):
        """Sum the total water in the queue of the arc.

        Returns:
            (dict): A VQIP amount of water/pollutants in the arc
        """
        queue_storage = self.empty_vqip()
        for request in self.queue.values():
            queue_storage = self.sum_vqip(queue_storage, request)
        return queue_storage

    def enter_queue(self, request, direction="push", tag="default"):
        """Add a request into the arc's queue.

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' only. Defaults to 'push'
            tag (str, optional): Optional message for out_port's query handler, can be
                'default' only. Defaults to 'default'.
        """
        # Update inflows and format request
        request = self.enter_arc(request, direction, tag)

        # Sum into queue
        travel_time = request["time"]
        if travel_time in self.queue:
            self.queue[travel_time] = self.sum_vqip(
                self.queue[travel_time], request["vqip"]
            )
        else:
            self.queue[travel_time] = request["vqip"]
            self.max_travel = max(self.max_travel, travel_time)

    def update_queue(self, direction=None, backflow_enabled=True):
        """Trigger the push of water in the 0th key for the queue, if the out_port
        responds that it cannot receive the push, then this water will be returned as
        backflow (if enabled).

        Args:
            direction (str, optional): Not used by this implementation. Defaults to
                None.
            backflow_enabled (bool, optional): Enable backflow, described above, if not
                enabled then the request will remain in the queue until all water has
                been received. Defaults to True.

        Returns:
            backflow (dict): In the case of a push direction, any backflow will be
                returned as a VQIP amount
        """
        # TODO - can this work for pulls??

        total_removed = self.copy_vqip(self.queue[0])

        # Push 0 travel time water
        backflow = self.out_port.push_set(total_removed)

        if not backflow_enabled:
            # Rejected water stays in the arc and nothing is reported as backflow
            self.queue[0] = backflow
            backflow = self.empty_vqip()
        else:
            self.queue[0] = self.empty_vqip()

        total_removed = self.v_change_vqip(
            total_removed, total_removed["volume"] - backflow["volume"]
        )

        self.flow_out += total_removed["volume"]
        self.vqip_out = self.sum_vqip(self.vqip_out, total_removed)

        return backflow

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update timings in the queue.
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0
        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        # Shift every entry one step closer to arrival, reading from a snapshot
        queue_ = self.queue.copy()
        keys = self.queue.keys()
        for i in range(self.max_travel):
            if (i + 1) in keys:
                self.queue[i] = queue_[i + 1]
                self.queue[i + 1] = self.empty_vqip()

        # Water still waiting at time 0 is combined with the water arriving from 1
        self.queue[0] = self.sum_vqip(queue_[0], queue_[1])

    def reinit(self):
        """Reinitialise the arc to the queue state set up in __init__."""
        self.end_timestep()
        self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}
        # Keep max_travel consistent with the freshly reset two-entry queue
        self.max_travel = 1
class DecayArc(QueueArc, DecayObj):
    """QueueArc that applies decay to the water held in its queue."""

    def __init__(self, decays=None, **kwargs):
        """A QueueArc that applies decays from a DecayObj.

        Args:
            decays (dict, optional): A dict of dicts containing a key for each pollutant
                that decays and within that, a key for each parameter (a constant and
                exponent). Defaults to None, which is treated as an empty dict.
            **kwargs: Passed on to QueueArc.__init__.
        """
        # Create a fresh dict per instance rather than sharing one default object
        if decays is None:
            decays = {}
        self.decays = decays

        QueueArc.__init__(self, **kwargs)
        DecayObj.__init__(self, decays)

        self.mass_balance_out.append(lambda: self.total_decayed)

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue list. Apply the make_decay function (i.e.,
        the decay that occurs this timestep).

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str): Direction of flow, can be 'push' or 'pull'
            tag (str, optional): optional message to direct the out_port's
                query_handler which function to call. Defaults to 'default'.
        """
        # Update inflows and format
        request = self.enter_arc(request, direction, tag)

        # TODO - currently decay depends on temp at the in_port data object..
        # surely on vqip would be more sensible? (though this is true in many
        # places including WTW)

        # Decay on entry
        request["vqip"] = self.make_decay(request["vqip"])

        # Append to queue
        self.queue.append(request)

    def end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update times of requests in the queue. Apply the make_decay function (i.e., the
        decay that occurs in the following timestep).
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.total_decayed = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = self.empty_vqip()

        # Decay each queued request, then count its travel time down (floor at 0)
        for request in self.queue:
            request["vqip"] = self.make_decay(request["vqip"])
            request["time"] = max(request["time"] - 1, 0)
class DecayArcAlt(AltQueueArc, DecayObj):
    """AltQueueArc that applies decay to the water held in its queue."""

    def __init__(self, decays=None, **kwargs):
        """An AltQueueArc that applies decays from a DecayObj.

        Args:
            decays (dict, optional): A dict of dicts containing a key for each pollutant
                that decays and within that, a key for each parameter (a constant and
                exponent). Defaults to None, which is treated as an empty dict.
            **kwargs: Passed on to AltQueueArc.__init__.
        """
        # Create a fresh dict per instance rather than sharing one default object
        if decays is None:
            decays = {}
        self.decays = {}

        # super().__init__(**kwargs)
        AltQueueArc.__init__(self, **kwargs)
        DecayObj.__init__(self, decays)

        # Use the decaying end-of-timestep routine for this instance
        self.end_timestep = self._end_timestep

        self.mass_balance_out.append(lambda: self.total_decayed)

    def enter_queue(self, request, direction=None, tag="default"):
        """Add a request into the arc's queue. Apply the make_decay function (i.e., the
        decay that occurs this timestep).

        Args:
            request (dict): A dict with a VQIP under the 'vqip' key and the travel
                time under the 'time' key.
            direction (str, optional): Direction of flow, recorded on the request.
                Defaults to None.
            tag (str, optional): Optional message for out_port's query handler, can be
                'default' only. Defaults to 'default'.
        """
        # TODO- has no tags

        # Update inflows and format
        request = self.enter_arc(request, direction, tag)

        # Decay on entry
        request["vqip"] = self.make_decay(request["vqip"])

        # Sum into queue
        travel_time = request["time"]
        if travel_time in self.queue:
            self.queue[travel_time] = self.sum_vqip(
                self.queue[travel_time], request["vqip"]
            )
        else:
            self.queue[travel_time] = request["vqip"]
            self.max_travel = max(self.max_travel, travel_time)

    def _end_timestep(self):
        """End timestep in an arc, resetting flow/vqip in/out (which determine the
        capacity for that timestep).

        Update timings in the queue. Apply the make_decay function (i.e., the decay that
        occurs in the following timestep).
        """
        self.vqip_in = self.empty_vqip()
        self.vqip_out = self.empty_vqip()
        self.total_decayed = self.empty_vqip()
        self.flow_in = 0
        self.flow_out = 0

        self.queue_storage_ = self.copy_vqip(self.queue_storage)
        self.queue_storage = (
            self.empty_vqip()
        )  # TODO I don't think this (or any queue_storage= empty) is necessary

        # Shift every entry one step closer to arrival, decaying it on the way
        queue_ = self.queue.copy()
        keys = self.queue.keys()
        for i in range(self.max_travel):
            if (i + 1) in keys:
                self.queue[i] = self.make_decay(queue_[i + 1])
                self.queue[i + 1] = self.empty_vqip()

        # Add the (decayed) water that was still waiting at time 0
        self.queue[0] = self.sum_vqip(self.queue[0], self.make_decay(queue_[0]))
class PullArc(Arc):
    """Arc that accepts pulls only; every push request or check is refused."""

    def __init__(self, **kwargs):
        """Build an Arc whose push entry points are replaced by denying versions.

        Only pull requests/checks can be sent along the resulting arc, similar to a
        river abstraction.

        Args:
            **kwargs: Passed on to Arc.__init__.
        """
        super().__init__(**kwargs)
        # Swap the push entry points on this instance for the refusing versions
        self.send_push_check = self.send_push_check_deny
        self.send_push_request = self.send_push_deny

    def send_push_deny(self, vqip, tag="default", force=False):
        """Refuse a push request by handing the whole amount straight back.

        Args:
            vqip (dict): A dict VQIP of water to push
            tag (str, optional): Accepted for signature compatibility; not used.
                Defaults to 'default'.
            force (bool, optional): Accepted for signature compatibility; not used.
                Defaults to False.

        Returns:
            (dict): The VQIP that was passed in, i.e. nothing was pushed
        """
        return vqip

    def send_push_check_deny(self, vqip=None, tag="default"):
        """Refuse a push check by reporting that nothing can be pushed.

        Args:
            vqip (dict, optional): Accepted for signature compatibility; not used.
                Defaults to None.
            tag (str, optional): Accepted for signature compatibility; not used.
                Defaults to 'default'.

        Returns:
            (dict): An empty VQIP amount of water indicating no water can be pushed
        """
        return self.empty_vqip()
class PushArc(Arc):
    """Arc that accepts pushes only; every pull request or check is refused."""

    def __init__(self, **kwargs):
        """Build an Arc whose pull entry points are replaced by denying versions.

        Only push requests/checks can be sent along the resulting arc, similar to a
        CSO.

        Args:
            **kwargs: Passed on to Arc.__init__.
        """
        super().__init__(**kwargs)
        # Swap the pull entry points on this instance for the refusing versions
        self.send_pull_check = self.send_pull_check_deny
        self.send_pull_request = self.send_pull_deny

    def send_pull_deny(self, vqip, tag="default", force=False):
        """Refuse a pull request by returning an empty amount.

        Args:
            vqip (dict): A dict VQIP of water to pull; not used.
            tag (str, optional): Accepted for signature compatibility; not used.
                Defaults to 'default'.
            force (bool, optional): Accepted for signature compatibility; not used.
                Defaults to False.

        Returns:
            (dict): An empty VQIP amount of water, i.e. nothing was pulled
        """
        return self.empty_vqip()

    def send_pull_check_deny(self, vqip=None, tag="default"):
        """Refuse a pull check by reporting that nothing can be pulled.

        Args:
            vqip (dict, optional): Accepted for signature compatibility; not used.
                Defaults to None.
            tag (str, optional): Accepted for signature compatibility; not used.
                Defaults to 'default'.

        Returns:
            (dict): An empty VQIP amount of water indicating no water can be pulled
        """
        return self.empty_vqip()
class SewerArc(Arc):
    """Arc subclass that adds no behaviour of its own; it differs by type only."""
class WeirArc(SewerArc):
    """SewerArc subclass that adds no behaviour of its own; it differs by type only."""