Coverage for wsimod/arcs/arcs.py: 17%
298 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
1# -*- coding: utf-8 -*-
2"""Created on Wed Apr 7 08:43:32 2021.
4@author: Barney
6Converted to totals on Thur Apr 21 2022
7"""
9from wsimod.core import constants
10from wsimod.core.core import DecayObj, WSIObj
12# from wsimod.nodes import nodes #Complains about circular imports.
13# I don't think it should do..
16class Arc(WSIObj):
17 """"""
19 def __init__(
20 self,
21 name="",
22 capacity=constants.UNBOUNDED_CAPACITY,
23 preference=1,
24 in_port=None,
25 out_port=None,
26 **kwargs,
27 ):
28 """Arc objects are the way for information to be passed between nodes in WSIMOD.
29 They have an in_port (where a message comes from) and an out_port (where a
30 message goes to).
32 Returns:
33 name (str): Name of arc. Defaults to ''.
34 capacity (float): Capacity of flow along an arc (vol/timestep).
35 Defaults to constants.UNBOUNDED_CAPACITY.
36 preference (float): Number used to prioritise or deprioritise use of an arc
37 when flexibility exists
38 in_port: A WSIMOD node object where the arc starts
39 out_port: A WSIMOD node object where the arc ends
40 """
41 # Default essential parameters
42 self.name = name
43 self.in_port = in_port
44 self.out_port = out_port
45 self.capacity = capacity
46 self.preference = preference
48 # Update args
49 WSIObj.__init__(self)
50 self.__dict__.update(kwargs)
52 # def all_subclasses(cls):
53 # return set(cls.__subclasses__()).union(
54 # [s for c in cls.__subclasses__() for s in all_subclasses(c)])
55 # node_types = [x.__name__ for x in all_subclasses(nodes.Node)] + ['Node']
57 # if self.name in node_types:
58 # print('Warning: arc name should not take a node class name')
59 # #TODO... not sure why... also currently commented for import issues..
61 # Initialise states
62 self.flow_in = 0
63 self.flow_out = 0
64 self.vqip_in = self.empty_vqip()
65 self.vqip_out = self.empty_vqip()
67 # Update ports
68 self.in_port.out_arcs[self.name] = self
69 self.out_port.in_arcs[self.name] = self
71 out_type = self.out_port.__class__.__name__
72 in_type = self.in_port.__class__.__name__
74 if hasattr(self.in_port, "out_arcs_type"):
75 self.in_port.out_arcs_type[out_type][self.name] = self
77 if hasattr(self.out_port, "in_arcs_type"):
78 self.out_port.in_arcs_type[in_type][self.name] = self
80 # Mass balance checking
81 self.mass_balance_in = [lambda: self.vqip_in]
82 self.mass_balance_out = [lambda: self.vqip_out]
83 self.mass_balance_ds = [lambda: self.empty_vqip()]
85 def arc_mass_balance(self):
86 """Checks mass balance for inflows/outflows/storage change in an arc.
88 Returns:
89 in_ (dict) Total vqip of vqip_in and other inputs in mass_balance_in
90 ds_ (dict): Total vqip of change in arc in mass_balance_ds
91 out_ (dict): Total vqip of vqip_out and other outputs in mass_balance_out
93 Examples:
94 arc_in, arc_out, arc_ds = my_arc.arc_mass_balance()
95 """
96 in_, ds_, out_ = self.mass_balance()
97 return in_, ds_, out_
99 def send_push_request(self, vqip, tag="default", force=False):
100 """Function used to transmit a push request from one node (in_port) to another
101 node (out_port).
103 Args:
104 vqip (dict): A dict VQIP of water to push
105 tag (str, optional): optional message to direct the out_port's query_
106 handler which function to call. Defaults to 'default'.
107 force (bool, optional): Argument used to cause function to ignore tank
108 capacity of out_port, possibly resulting in pooling. Should not be used
109 unless
110 out_port is a tank object. Defaults to False.
112 Returns:
113 (dict): A VQIP amount of water that was not successfully pushed
114 """
115 vqip = self.copy_vqip(vqip)
117 # Apply pipe capacity
118 if force:
119 not_pushed = self.empty_vqip()
120 else:
121 excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
122 not_pushed = self.v_change_vqip(
123 vqip, max(vqip["volume"] - excess_in["volume"], 0)
124 )
126 # Don't attempt to send volume that exceeds capacity
127 vqip = self.extract_vqip(vqip, not_pushed)
129 # Set push
130 reply = self.out_port.push_set(vqip, tag)
132 # Update total amount successfully sent
133 vqip = self.extract_vqip(vqip, reply)
135 # Combine non-sent water
136 reply = self.sum_vqip(reply, not_pushed)
138 # Update mass balance
139 self.flow_in += vqip["volume"]
140 self.flow_out = self.flow_in
142 self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
143 self.vqip_out = self.vqip_in
145 return reply
147 def send_pull_request(self, vqip, tag="default"):
148 """Function used to transmit a pull request from one node (in_port) to another
149 node (out_port).
151 Args:
152 vqip (dict): A dict VQIP of water to pull (by default, only 'volume' key is
153 used)
154 tag (str, optional): optional message to direct the out_port's query_handler
155 which
156 function to call. Defaults to 'default'.
158 Returns:
159 (dict): A VQIP amount of water that was successfully pulled
160 """
161 volume = vqip["volume"]
162 # Apply pipe capacity
163 excess_in = self.get_excess(direction="pull", vqip=vqip, tag=tag)["volume"]
164 not_pulled = max(volume - excess_in, 0)
165 volume -= not_pulled
167 if volume > 0:
168 for pol in constants.ADDITIVE_POLLUTANTS:
169 if pol in vqip.keys():
170 vqip[pol] *= volume / vqip["volume"]
172 vqip["volume"] = volume
174 # Make pull
175 vqip = self.in_port.pull_set(vqip, tag)
177 # Update mass balance
178 self.flow_in += vqip["volume"]
179 self.flow_out = self.flow_in
181 self.vqip_in = self.sum_vqip(self.vqip_in, vqip)
182 self.vqip_out = self.vqip_in
184 return vqip
186 def send_push_check(self, vqip=None, tag="default"):
187 """Function used to transmit a push check from one node (in_port) to another
188 node (out_port).
190 Args:
191 vqip (dict): A dict VQIP of water to push that can be specified. Defaults to
192 None, which returns maximum capacity to push.
193 tag (str, optional): optional message to direct the out_port's
194 query_handler which function to call. Defaults to 'default'.
196 Returns:
197 (dict): A VQIP amount of water that could be pushed
198 """
199 return self.get_excess(direction="push", vqip=vqip, tag=tag)
201 def send_pull_check(self, vqip=None, tag="default"):
202 """Function used to transmit a pull check from one node (in_port) to another
203 node (out_port).
205 Args:
206 vqip (dict): A dict VQIP of water to pull that can be specified (by default,
207 only the 'volume' key is used). Defaults to None, which returns all
208 available water to pull.
209 tag (str, optional): optional message to direct the out_port's
210 query_handler which function to call. Defaults to 'default'.
212 Returns:
213 (dict): A VQIP amount of water that could be pulled
214 """
215 return self.get_excess(direction="pull", vqip=vqip, tag=tag)
217 def get_excess(self, direction, vqip=None, tag="default"):
218 """Calculate how much could be pull/pulled along the arc by combining both arc
219 capacity and out_port check information.
221 Args:
222 direction (str): should be 'pull' or 'push'
223 vqip (dict, optional): A VQIP amount to push/pull that can be
224 specified. Defaults to None, which returns all available water to
225 pull or maximum capacity to push (depending on 'direction').
226 tag (str, optional): optional message to direct the out_port's query_handler
227 which function to call. Defaults to 'default'.
229 Returns:
230 (dict): A VQIP amount of water that could be pulled/pushed
231 """
232 # Pipe capacity
233 pipe_excess = self.capacity - self.flow_in
235 # Node capacity
236 if direction == "push":
237 node_excess = self.out_port.push_check(vqip, tag)
238 elif direction == "pull":
239 node_excess = self.in_port.pull_check(vqip, tag)
240 excess = min(pipe_excess, node_excess["volume"])
242 # TODO sensible to min(vqip, excess) here? (though it should be applied by node)
244 return self.v_change_vqip(node_excess, excess)
246 def end_timestep(self):
247 """End timestep in an arc, resetting flow/vqip in/out (which determine) the
248 capacity for that timestep."""
249 self.vqip_in = self.empty_vqip()
250 self.vqip_out = self.empty_vqip()
251 self.flow_in = 0
252 self.flow_out = 0
254 def reinit(self):
255 """Reinitiatilise."""
256 self.end_timestep()
259class QueueArc(Arc):
260 """"""
262 def __init__(self, number_of_timesteps=0, **kwargs):
263 """A queue arc that stores each push or pull individually in the queue. Enables
264 implementation of travel time. A fixed number of timesteps can be specified as a
265 parameter, and additional number of timesteps can be specified when the requests
266 are made.
268 The queue is a list of requests, where their travel time is decremented
269 by 1 each timestep. Any requests with a travel time of 0 will be sent
270 onwards if the 'update_queue' function is called.
272 Args:
273 number_of_timesteps (int, optional): Fixed number of timesteps that
274 it takes to traverse the arc. Defaults to 0.
275 """
276 self.number_of_timesteps = number_of_timesteps
277 self.queue = []
278 super().__init__(**kwargs)
280 self.queue_storage = self.empty_vqip()
281 self.queue_storage_ = self.empty_vqip()
283 self.mass_balance_ds.append(lambda: self.queue_arc_ds())
285 def queue_arc_ds(self):
286 """Calculate change in amount of water and other pollutants in the arc.
288 Returns:
289 (dict): A VQIP amount of change
290 """
291 self.queue_storage = self.queue_arc_sum()
292 return self.extract_vqip(self.queue_storage, self.queue_storage_)
294 def queue_arc_sum(self):
295 """Sum the total water in the requests in the queue of the arc.
297 Returns:
298 (dict): A VQIP amount of water/pollutants in the arc
299 """
300 queue_storage = self.empty_vqip()
301 for request in self.queue:
302 queue_storage = self.sum_vqip(queue_storage, request["vqip"])
303 return queue_storage
305 def send_pull_request(self, vqip, tag="default", time=0):
306 """Function used to transmit a pull request from one node (in_port) to another
307 node (out_port). Any pulled water is immediately removed from the out_port and
308 then takes the travel time to be received. This function has not been
309 extensively tested.
311 Args:
312 vqip (_type_): A dict VQIP of water to pull (by default, only 'volume' key
313 is used)
314 tag (str, optional): optional message to direct the out_port's query_handler
315 which function to call. Defaults to 'default'.
316 time (int, optional): Travel time for request to spend in the arc (in
317 addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.
319 Returns:
320 (dict): A VQIP amount of water that was successfully pulled.
321 """
322 volume = vqip["volume"]
323 # Apply pipe capacity
324 excess_in = self.get_excess(direction="pull", vqip=vqip)["volume"]
325 not_pulled = max(volume - excess_in, 0)
326 volume -= not_pulled
328 for pol in constants.ADDITIVE_POLLUTANTS:
329 if pol in vqip.keys():
330 vqip[pol] *= volume / vqip["volume"]
332 vqip["volume"] = volume
334 # Make pull
335 vqip = self.in_port.pull_set(vqip)
337 # Update to queue request
338 request = {"time": time + self.number_of_timesteps, "vqip": vqip}
340 # vqtip enters arc as a request
341 self.enter_queue(request, direction="pull")
343 # Update request queue and return pulls from queue
344 reply = self.update_queue(direction="pull")
345 return reply
347 def send_push_request(self, vqip_, tag="default", force=False, time=0):
348 """Function used to transmit a push request from one node (in_port) to another
349 node (out_port).
351 Args:
352 vqip_ (dict): A dict VQIP of water to push.
353 tag (str, optional): optional message to direct the out_port's query_handler
354 which function to call. Defaults to 'default'.
355 force (bool, optional): Ignore the capacity of the arc (note does not
356 currently, pass the force argument to the out_port). Defaults to False.
357 time (int, optional): Travel time for request to spend in the arc (in
358 addition to the arc's 'number_of_timesteps' parameter). Defaults to 0.
360 Returns:
361 (dict): A VQIP amount of water that was not successfully pushed
362 """
363 vqip = self.copy_vqip(vqip_)
365 if vqip["volume"] < constants.FLOAT_ACCURACY:
366 return self.empty_vqip()
368 # Apply pipe capacity
369 if force:
370 not_pushed = self.empty_vqip()
371 else:
372 excess_in = self.get_excess(direction="push", vqip=vqip, tag=tag)
373 not_pushed = self.v_change_vqip(
374 vqip, max(vqip["volume"] - excess_in["volume"], 0)
375 )
377 vqip = self.extract_vqip(vqip, not_pushed)
379 # Update to queue request
380 request = {"time": time + self.number_of_timesteps, "vqip": vqip}
382 # vqtip enters arc as a request
383 self.enter_queue(request, direction="push", tag=tag)
385 # Update request queue
386 backflow = self.update_queue(direction="push")
387 not_pushed = self.sum_vqip(not_pushed, backflow)
389 if backflow["volume"] > vqip_["volume"]:
390 print("more backflow than vqip...")
392 self.vqip_in = self.extract_vqip(self.vqip_in, backflow)
394 return not_pushed
396 def enter_arc(self, request, direction, tag):
397 """Function used to cause format a request into the format expected by the
398 enter_queue function.
400 Args:
401 request (dict): A dict with a VQIP under the 'vqip' key and the travel
402 time under the 'time' key.
403 direction (str): Direction of flow, can be 'push' or 'pull
404 tag (str, optional): optional message to direct the out_port's
405 query_handler which function to call. Defaults to 'default'.
407 Returns:
408 (dict): The request dict with additional information entered for the queue.
409 """
410 request["average_flow"] = request["vqip"]["volume"] / (request["time"] + 1)
411 request["direction"] = direction
412 request["tag"] = tag
414 self.flow_in += request["average_flow"]
415 self.vqip_in = self.sum_vqip(self.vqip_in, request["vqip"])
417 return request
419 def enter_queue(self, request, direction=None, tag="default"):
420 """Add a request into the arc's queue list.
422 Args:
423 request (dict): A dict with a VQIP under the 'vqip' key and the travel
424 time under the 'time' key.
425 direction (str): Direction of flow, can be 'push' or 'pull
426 tag (str, optional): optional message to direct the out_port's
427 query_handler which function to call. Defaults to 'default'.
428 """
429 # Update inflows and format request
430 request = self.enter_arc(request, direction, tag)
432 # Enter queue
433 self.queue.append(request)
435 def update_queue(self, direction=None, backflow_enabled=True):
436 """Iterate over all requests in the queue, removing them if they have no volume.
438 If a request is a push and has 0 travel time remaining then
439 the push will be triggered at the out_port, if the out_port responds that
440 it cannot receive the push, then this water will be returned as backflow
441 (if enabled).
443 If a request is a pull and has 0 travel time remaining then it is simply summed
444 with other 0 travel time pull_requests and returned (since the pull is made at
445 the out_port when the send_pull_request is made).
448 Args:
449 direction (str, optional): Direction of flow, can be 'push' or 'pull.
450 Defaults to None.
451 backflow_enabled (bool, optional): Enable backflow, described above, if not
452 enabled then the request will remain in the queue until all water has
453 been received. Defaults to True.
455 Returns:
456 total_backflow (dict): In the case of a push direction, any backflow will be
457 returned as a VQIP amount
458 total_removed (dict): In the case of a pull direction, any pulled water will
459 be returned as a VQIP amount
460 """
461 done_requests = []
463 total_removed = self.empty_vqip()
464 total_backflow = self.empty_vqip()
465 # Iterate over requests
466 for request in self.queue:
467 if request["direction"] == direction:
468 vqip = request["vqip"]
470 if vqip["volume"] < constants.FLOAT_ACCURACY:
471 # Add to queue for removal
472 done_requests.append(request)
473 elif request["time"] == 0:
474 if direction == "push":
475 # Attempt to push request
476 reply = self.out_port.push_set(vqip, request["tag"])
477 removed = vqip["volume"] - reply["volume"]
479 elif direction == "pull":
480 # Water has already been pulled, so assume all received
481 removed = vqip["volume"]
483 else:
484 print("No direction")
486 # Update outflows
487 self.flow_out += request["average_flow"] * removed / vqip["volume"]
488 vqip_ = self.v_change_vqip(vqip, removed)
489 total_removed = self.sum_vqip(total_removed, vqip_)
491 # Assume that any water that cannot arrive at destination this
492 # timestep is backflow
493 rejected = self.v_change_vqip(vqip, vqip["volume"] - removed)
495 if backflow_enabled | (
496 rejected["volume"] < constants.FLOAT_ACCURACY
497 ):
498 total_backflow = self.sum_vqip(rejected, total_backflow)
499 done_requests.append(request)
500 else:
501 request["vqip"] = rejected
503 self.vqip_out = self.sum_vqip(self.vqip_out, total_removed)
505 # Remove done requests
506 for request in done_requests:
507 self.queue.remove(request)
509 # return total_removed
510 if direction == "pull":
511 return total_removed
512 elif direction == "push":
513 return total_backflow
514 else:
515 print("No direction")
517 def end_timestep(self):
518 """End timestep in an arc, resetting flow/vqip in/out (which determine) the
519 capacity for that timestep.
521 Update times of requests in the queue.
522 """
523 self.vqip_in = self.empty_vqip()
524 self.vqip_out = self.empty_vqip()
525 self.flow_in = 0
526 self.flow_out = 0
528 self.queue_storage_ = self.copy_vqip(self.queue_storage)
529 self.queue_storage = self.empty_vqip()
531 for request in self.queue:
532 request["time"] = max(request["time"] - 1, 0)
534 # TODO - update_queue here?
536 def reinit(self):
537 """"""
538 self.end_timestep()
539 self.queue = []
542class AltQueueArc(QueueArc):
543 """"""
545 def __init__(self, **kwargs):
546 """A simpler queue arc that has a queue that is a dict where each key is the
547 travel time.
549 Cannot be used if arc capacity is dynamic. Cannot be used for pulls.
550 """
551 self.queue_arc_sum = self.alt_queue_arc_sum
553 super().__init__(**kwargs)
554 self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}
555 self.max_travel = 1
557 def alt_queue_arc_sum(self):
558 """Sum the total water in the queue of the arc.
560 Returns:
561 (dict): A VQIP amount of water/pollutants in the arc
562 """
563 queue_storage = self.empty_vqip()
564 for request in self.queue.values():
565 queue_storage = self.sum_vqip(queue_storage, request)
566 return queue_storage
568 def enter_queue(self, request, direction="push", tag="default"):
569 """Add a request into the arc's queue.
571 Args:
572 request (dict): A dict with a VQIP under the 'vqip' key and the travel
573 time under the 'time' key.
574 direction (str): Direction of flow, can be 'push' only. Defaults to 'push'
575 tag (str, optional): Optional message for out_port's query handler, can be
576 'default' only. Defaults to 'default'.
577 """
578 # Update inflows and format request
579 request = self.enter_arc(request, direction, tag)
581 # Sum into queue
582 if request["time"] in self.queue.keys():
583 self.queue[request["time"]] = self.sum_vqip(
584 self.queue[request["time"]], request["vqip"]
585 )
586 else:
587 self.queue[request["time"]] = request["vqip"]
588 self.max_travel = max(self.max_travel, request["time"])
590 def update_queue(self, direction=None, backflow_enabled=True):
591 """Trigger the push of water in the 0th key for the queue, if the out_port
592 responds that it cannot receive the push, then this water will be returned as
593 backflow (if enabled).
595 Args:
596 direction (str): Direction of flow, can be 'push' only. Defaults to 'push'
597 backflow_enabled (bool, optional): Enable backflow, described above, if not
598 enabled then the request will remain in the queue until all water has
599 been received. Defaults to True.
601 Returns:
602 backflow (dict): In the case of a push direction, any backflow will be
603 returned as a VQIP amount
604 """
605 # TODO - can this work for pulls??
607 total_removed = self.copy_vqip(self.queue[0])
609 # Push 0 travel time water
610 backflow = self.out_port.push_set(total_removed)
612 if not backflow_enabled:
613 self.queue[0] = backflow
614 backflow = self.empty_vqip()
615 else:
616 self.queue[0] = self.empty_vqip()
618 total_removed = self.v_change_vqip(
619 total_removed, total_removed["volume"] - backflow["volume"]
620 )
622 self.flow_out += total_removed["volume"]
623 self.vqip_out = self.sum_vqip(self.vqip_out, total_removed)
625 return backflow
627 def end_timestep(self):
628 """End timestep in an arc, resetting flow/vqip in/out (which determine) the
629 capacity for that timestep.
631 Update timings in the queue.
632 """
633 self.vqip_in = self.empty_vqip()
634 self.vqip_out = self.empty_vqip()
635 self.flow_in = 0
636 self.flow_out = 0
637 self.queue_storage_ = self.copy_vqip(self.queue_storage)
638 self.queue_storage = self.empty_vqip()
640 queue_ = self.queue.copy()
641 keys = self.queue.keys()
642 for i in range(self.max_travel):
643 if (i + 1) in keys:
644 self.queue[i] = queue_[i + 1]
645 self.queue[i + 1] = self.empty_vqip()
647 self.queue[0] = self.sum_vqip(queue_[0], queue_[1])
649 def reinit(self):
650 """"""
651 self.end_timestep()
652 self.queue = {0: self.empty_vqip(), 1: self.empty_vqip()}
655class DecayArc(QueueArc, DecayObj):
656 """"""
658 def __init__(self, decays={}, **kwargs):
659 """A QueueArc that applies decays from a DecayObj.
661 Args:
662 decays (dict, optional): A dict of dicts containing a key for each pollutant
663 that decays and within that, a key for each parameter (a constant and
664 exponent). Defaults to {}.
665 """
666 self.decays = decays
668 QueueArc.__init__(self, **kwargs)
669 DecayObj.__init__(self, decays)
671 self.mass_balance_out.append(lambda: self.total_decayed)
673 def enter_queue(self, request, direction=None, tag="default"):
674 """Add a request into the arc's queue list. Apply the make_decay function (i.e.,
675 the decay that occur's this timestep).
677 Args:
678 request (dict): A dict with a VQIP under the 'vqip' key and the travel
679 time under the 'time' key.
680 direction (str): Direction of flow, can be 'push' or 'pull
681 tag (str, optional): optional message to direct the out_port's
682 query_handler which function to call. Defaults to 'default'.
683 """
684 # Update inflows and format
685 request = self.enter_arc(request, direction, tag)
687 # TODO - currently decay depends on temp at the in_port data object..
688 # surely on vqip would be more sensible? (though this is true in many
689 # places including WTW)
691 # Decay on entry
692 request["vqip"] = self.make_decay(request["vqip"])
694 # Append to queue
695 self.queue.append(request)
697 def end_timestep(self):
698 """End timestep in an arc, resetting flow/vqip in/out (which determine) the
699 capacity for that timestep.
701 Update times of requests in the queue. Apply the make_decay function (i.e., the
702 decay that occurs in the following timestep).
703 """
704 self.vqip_in = self.empty_vqip()
705 self.vqip_out = self.empty_vqip()
706 self.total_decayed = self.empty_vqip()
707 self.flow_in = 0
708 self.flow_out = 0
710 self.queue_storage_ = self.copy_vqip(self.queue_storage)
711 self.queue_storage = self.empty_vqip()
713 for request in self.queue:
714 request["vqip"] = self.make_decay(request["vqip"])
715 request["time"] = max(request["time"] - 1, 0)
718class DecayArcAlt(AltQueueArc, DecayObj):
719 """"""
721 def __init__(self, decays={}, **kwargs):
722 """An AltQueueArc that applies decays from a DecayObj.
724 Args:
725 decays (dict, optional): A dict of dicts containing a key for each pollutant
726 that decays and within that, a key for each parameter (a constant and
727 exponent). Defaults to {}.
728 """
729 self.decays = {}
731 # super().__init__(**kwargs)
732 AltQueueArc.__init__(self, **kwargs)
733 DecayObj.__init__(self, decays)
735 self.end_timestep = self._end_timestep
737 self.mass_balance_out.append(lambda: self.total_decayed)
739 def enter_queue(self, request, direction=None, tag="default"):
740 """Add a request into the arc's queue. Apply the make_decay function (i.e., the
741 decay that occur's this timestep).
743 Args:
744 request (dict): A dict with a VQIP under the 'vqip' key and the travel
745 time under the 'time' key.
746 direction (str): Direction of flow, can be 'push' only. Defaults to 'push'
747 tag (str, optional): Optional message for out_port's query handler, can be
748 'default' only. Defaults to 'default'.
749 """
750 # TODO- has no tags
752 # Update inflows and format
753 request = self.enter_arc(request, direction, tag)
755 # Decay on entry
756 request["vqip"] = self.make_decay(request["vqip"])
758 # Sum into queue
759 if request["time"] in self.queue.keys():
760 self.queue[request["time"]] = self.sum_vqip(
761 self.queue[request["time"]], request["vqip"]
762 )
763 else:
764 self.queue[request["time"]] = request["vqip"]
765 self.max_travel = max(self.max_travel, request["time"])
767 def _end_timestep(self):
768 """End timestep in an arc, resetting flow/vqip in/out (which determine) the
769 capacity for that timestep.
771 Update timings in the queue. Apply the make_decay function (i.e., the decay that
772 occurs in the following timestep).
773 """
774 self.vqip_in = self.empty_vqip()
775 self.vqip_out = self.empty_vqip()
776 self.total_decayed = self.empty_vqip()
777 self.flow_in = 0
778 self.flow_out = 0
780 self.queue_storage_ = self.copy_vqip(self.queue_storage)
781 self.queue_storage = (
782 self.empty_vqip()
783 ) # TODO I don't think this (or any queue_storage= empty) is necessary
785 queue_ = self.queue.copy()
786 keys = self.queue.keys()
787 for i in range(self.max_travel):
788 if (i + 1) in keys:
789 self.queue[i] = self.make_decay(queue_[i + 1])
790 self.queue[i + 1] = self.empty_vqip()
792 self.queue[0] = self.sum_vqip(self.queue[0], self.make_decay(queue_[0]))
795class PullArc(Arc):
796 """"""
798 def __init__(self, **kwargs):
799 """Subclass of Arc where pushes return no availability to push.
801 This creates an Arc where only pull requests/checks can be sent, similar to a
802 river abstraction.
803 """
804 super().__init__(**kwargs)
805 self.send_push_request = self.send_push_deny
806 self.send_push_check = self.send_push_check_deny
808 def send_push_deny(self, vqip, tag="default", force=False):
809 """Function used to deny any push requests.
811 Args:
812 vqip (dict): A dict VQIP of water to push
813 tag (str, optional): optional message to direct the out_port's
814 query_handler which function to call. Defaults to 'default'.
815 force (bool, optional): Argument used to cause function to ignore tank
816 capacity of out_port, possibly resulting in pooling. Should not be used
817 unless out_port is a tank object. Defaults to False.
819 Returns:
820 (dict): A VQIP amount of water that was not successfully pushed
821 """
822 return vqip
824 def send_push_check_deny(self, vqip=None, tag="default"):
825 """Function used to deny any push checks.
827 Args:
828 vqip (dict): A dict VQIP of water to push that can be specified. Defaults to
829 None, which returns maximum capacity to push.
830 tag (str, optional): optional message to direct the out_port's
831 query_handler which function to call. Defaults to 'default'.
833 Returns:
834 (dict): An empty VQIP amount of water indicating no water can be pushed
835 """
836 return self.empty_vqip()
839class PushArc(Arc):
840 """"""
842 def __init__(self, **kwargs):
843 """Subclass of Arc where pushes return no availability to pull.
845 This creates an Arc where only push requests/checks can be sent, similar to a
846 CSO.
847 """
848 super().__init__(**kwargs)
849 self.send_pull_request = self.send_pull_deny
850 self.send_pull_check = self.send_pull_check_deny
852 def send_pull_deny(self, vqip, tag="default", force=False):
853 """Function used to deny any pull requests.
855 Args:
856 vqip (dict): A dict VQIP of water to pull
857 tag (str, optional): optional message to direct the out_port's
858 query_handler which function to call. Defaults to 'default'.
859 force (bool, optional): Argument used to cause function to ignore tank
860 capacity of out_port, possibly resulting in pooling. Should not be used
861 unless out_port is a tank object. Defaults to False.
863 Returns:
864 (dict): A VQIP amount of water that was successfully pulled
865 """
866 return self.empty_vqip()
868 def send_pull_check_deny(self, vqip=None, tag="default"):
869 """Function used to deny any pull checks.
871 Args:
872 vqip (dict): A dict VQIP of water to pull that can be specified. Defaults to
873 None, which returns maximum capacity to pull.
874 tag (str, optional): optional message to direct the out_port's
875 query_handler which function to call. Defaults to 'default'.
877 Returns:
878 (dict): An empty VQIP amount of water indicating no water can be pulled
879 """
880 return self.empty_vqip()
883class SewerArc(Arc):
884 """"""
886 pass
889class WeirArc(SewerArc):
890 """"""
892 pass