Coverage for wsimod/nodes/nodes.py: 20%
334 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
1# -*- coding: utf-8 -*-
2"""Created on Wed Apr 7 08:43:32 2021.
4@author: Barney
6Converted to totals on Thur Apr 21 2022
7"""
8from wsimod.arcs.arcs import AltQueueArc, DecayArcAlt
9from wsimod.core import constants
10from wsimod.core.core import DecayObj, WSIObj
11from wsimod.nodes import nodes
14class Node(WSIObj):
15 """"""
17 def __init__(self, name, data_input_dict=None):
18 """Base class for CWSD nodes. Constructs all the necessary attributes for the
19 node object.
21 Args:
22 name (str): Name of node
23 data_input_dict (dict, optional): Dictionary of data inputs relevant for
24 the node. Keys are tuples where first value is the name of the
25 variable to read from the dict and the second value is the time.
26 Defaults to None.
28 Examples:
29 >>> my_node = nodes.Node(name = 'london_river_junction')
31 Key assumptions:
32 - No physical processes represented, can be used as a junction.
34 Input data and parameter requirements:
35 - All nodes require a `name`
36 """
38 # Get node types
39 def all_subclasses(cls):
40 """
42 Args:
43 cls:
45 Returns:
47 """
48 return set(cls.__subclasses__()).union(
49 [s for c in cls.__subclasses__() for s in all_subclasses(c)]
50 )
52 node_types = [x.__name__ for x in all_subclasses(nodes.Node)] + ["Node"]
54 # Default essential parameters
55 # Dictionary of arcs
56 self.in_arcs = {}
57 self.out_arcs = {}
58 self.in_arcs_type = {x: {} for x in node_types}
59 self.out_arcs_type = {x: {} for x in node_types}
61 # Set parameters
62 self.name = name
63 self.t = None
64 self.data_input_dict = data_input_dict
66 # Initiailise default handlers
67 self.pull_set_handler = {"default": self.pull_distributed}
68 self.push_set_handler = {
69 "default": lambda x: self.push_distributed(
70 x, of_type=["Node", "River", "Waste", "Reservoir"]
71 )
72 }
73 self.pull_check_handler = {"default": self.pull_check_basic}
74 self.push_check_handler = {
75 "default": lambda x: self.push_check_basic(
76 x, of_type=["Node", "River", "Waste", "Reservoir"]
77 )
78 }
80 super().__init__()
82 # Mass balance checking
83 self.mass_balance_in = [self.total_in]
84 self.mass_balance_out = [self.total_out]
85 self.mass_balance_ds = [lambda: self.empty_vqip()]
87 def total_in(self):
88 """Sum flow and pollutant amounts entering a node via in_arcs.
90 Returns:
91 in_ (dict): Summed VQIP of in_arcs
93 Examples:
94 >>> node_inflow = my_node.total_in()
95 """
96 in_ = self.empty_vqip()
97 for arc in self.in_arcs.values():
98 in_ = self.sum_vqip(in_, arc.vqip_out)
100 return in_
102 def total_out(self):
103 """Sum flow and pollutant amounts leaving a node via out_arcs.
105 Returns:
106 out_ (dict): Summed VQIP of out_arcs
108 Examples:
109 >>> node_outflow = my_node.total_out()
110 """
111 out_ = self.empty_vqip()
112 for arc in self.out_arcs.values():
113 out_ = self.sum_vqip(out_, arc.vqip_in)
115 return out_
117 def node_mass_balance(self):
118 """Wrapper for core.py/WSIObj/mass_balance. Tracks change in mass balance.
120 Returns:
121 in_ (dict): A VQIP of the total from mass_balance_in functions
122 ds_ (dict): A VQIP of the total from mass_balance_ds functions
123 out_ (dict): A VQIP of the total from mass_balance_out functions
125 Examples:
126 >>> node_in, node_out, node_ds = my_node.node_mass_balance()
127 """
128 in_, ds_, out_ = self.mass_balance()
129 return in_, ds_, out_
131 def pull_set(self, vqip, tag="default"):
132 """Receives pull set requests from arcs and passes request to query handler.
134 Args:
135 vqip (dict): the VQIP pull request (by default, only the 'volume' key is
136 needed).
137 tag (str, optional): optional message to direct query_handler which pull
138 function to call. Defaults to 'default'.
140 Returns:
141 (dict): VQIP received from query_handler
143 Examples:
144 >>> water_received = my_node.pull_set({'volume' : 10})
145 """
146 return self.query_handler(self.pull_set_handler, vqip, tag)
148 def push_set(self, vqip, tag="default"):
149 """Receives push set requests from arcs and passes request to query handler.
151 Args:
152 vqip (_type_): the VQIP push request
153 tag (str, optional): optional message to direct query_handler which pull
154 function to call. Defaults to 'default'.
156 Returns:
157 (dict): VQIP not received from query_handler
159 Examples:
160 water_not_pushed = my_node.push_set(wastewater_vqip)
161 """
162 return self.query_handler(self.push_set_handler, vqip, tag)
164 def pull_check(self, vqip=None, tag="default"):
165 """Receives pull check requests from arcs and passes request to query handler.
167 Args:
168 vqip (dict, optional): the VQIP pull check (by default, only the
169 'volume' key is used). Defaults to None, which returns all available
170 water to pull.
171 tag (str, optional): optional message to direct query_handler which pull
172 function to call. Defaults to 'default'.
174 Returns:
175 (dict): VQIP available from query_handler
177 Examples:
178 >>> water_available = my_node.pull_check({'volume' : 10})
179 >>> total_water_available = my_node.pull_check()
180 """
181 return self.query_handler(self.pull_check_handler, vqip, tag)
183 def push_check(self, vqip=None, tag="default"):
184 """Receives push check requests from arcs and passes request to query handler.
186 Args:
187 vqip (dict, optional): the VQIP push check. Defaults to None, which
188 returns all available capacity to push
189 tag (str, optional): optional message to direct query_handler which pull
190 function to call. Defaults to 'default'
192 Returns:
193 (dict): VQIP available to push from query_handler
195 Examples:
196 >>> total_available_push_capacity = my_node.push_check()
197 >>> available_push_capacity = my_node.push_check(wastewater_vqip)
198 """
199 return self.query_handler(self.push_check_handler, vqip, tag)
201 def get_direction_arcs(self, direction, of_type=None):
202 """Identify arcs to/from all attached nodes in a given direction.
204 Args:
205 direction (str): can be either 'pull' or 'push' to send checks to
206 receiving or contributing nodes
207 of_type (str or list) : optional, can be specified to send checks only
208 to nodes of a given type (must be a subclass in nodes.py)
210 Returns:
211 f (str): Either 'send_pull_check' or 'send_push_check' depending on
212 direction
213 arcs (list): List of arc objects
215 Raises:
216 Message if no direction is specified
218 Examples:
219 >>> arcs_to_push_to = my_node.get_direction_arcs('push')
220 >>> arcs_to_pull_from = my_node.get_direction_arcs('pull')
221 >>> arcs_from_reservoirs = my_node.get_direction_arcs('pull', of_type =
222 'Reservoir')
223 """
224 if of_type is None:
225 # Return all arcs
226 if direction == "pull":
227 arcs = list(self.in_arcs.values())
228 f = "send_pull_check"
229 elif direction == "push":
230 arcs = list(self.out_arcs.values())
231 f = "send_push_check"
232 else:
233 print("No direction")
235 else:
236 if isinstance(of_type, str):
237 of_type = [of_type]
239 # Assign arcs/function based on parameters
240 arcs = []
241 if direction == "pull":
242 for type_ in of_type:
243 arcs += list(self.in_arcs_type[type_].values())
244 f = "send_pull_check"
245 elif direction == "push":
246 for type_ in of_type:
247 arcs += list(self.out_arcs_type[type_].values())
248 f = "send_push_check"
249 else:
250 print("No direction")
252 return f, arcs
254 def get_connected(self, direction="pull", of_type=None, tag="default"):
255 """Send push/pull checks to all attached arcs in a given direction.
257 Args:
258 direction (str, optional): The type of check to send to all attached
259 nodes. Can be 'push' or 'pull'. The default is 'pull'.
260 of_type (str or list) : optional, can be specified to send checks only
261 to nodes of a given type (must be a subclass in nodes.py)
262 tag (str, optional): optional message to direct query_handler which pull
263 function to call. Defaults to 'default'.
265 Returns:
266 connected (dict) :
267 Dictionary containing keys:
268 'avail': (float) - total available volume for push/pull
269 'priority': (float) - total (availability * preference)
270 of attached arcs
271 'allocation': (dict) - contains all attached arcs in specified
272 direction and respective (availability * preference)
274 Examples:
275 >>> vqip_available_to_pull = my_node.get_direction_arcs()
276 >>> vqip_available_to_push = my_node.get_direction_arcs('push')
277 >>> avail_reservoir_vqip = my_node.get_direction_arcs('pull',
278 of_type = 'Reservoir')
279 >>> avail_sewer_push_to_sewers = my_node.get_direction_arcs('push',
280 of_type = 'Sewer',
281 tag = 'Sewer')
282 """
283 # Initialise connected dict
284 connected = {"avail": 0, "priority": 0, "allocation": {}, "capacity": {}}
286 # Get arcs
287 f, arcs = self.get_direction_arcs(direction, of_type)
289 # Iterate over arcs, updating connected dict
290 for arc in arcs:
291 avail = getattr(arc, f)(tag=tag)["volume"]
292 if avail < constants.FLOAT_ACCURACY:
293 avail = 0 # Improves convergence
294 connected["avail"] += avail
295 preference = arc.preference
296 connected["priority"] += avail * preference
297 connected["allocation"][arc.name] = avail * preference
298 connected["capacity"][arc.name] = avail
300 return connected
302 def query_handler(self, handler, ip, tag):
303 """Sends all push/pull requests/checks using the handler (i.e., ensures the
304 correct function is used that lines up with 'tag').
306 Args:
307 handler (dict): contains all push/pull requests for various tags
308 ip (vqip): the vqip request
309 tag (str): describes what type of push/pull request should be called
311 Returns:
312 (dict): the VQIP reply from push/pull request
314 Raises:
315 Message if no functions are defined for tag and if request/check
316 function fails
317 """
318 try:
319 return handler[tag](ip)
320 except Exception:
321 if tag not in handler.keys():
322 print("No functions defined for " + tag)
323 return handler[tag](ip)
324 else:
325 print("Some other error")
326 return handler[tag](ip)
328 def pull_distributed(self, vqip, of_type=None, tag="default"):
329 """Send pull requests to all (or specified by type) nodes connecting to self.
330 Iterate until request is met or maximum iterations are hit. Streamlines if only
331 one in_arc exists.
333 Args:
334 vqip (dict): Total amount to pull (by default, only the
335 'volume' key is used)
336 of_type (str or list) : optional, can be specified to send checks only
337 to nodes of a given type (must be a subclass in nodes.py)
338 tag (str, optional): optional message to direct query_handler which pull
339 function to call. Defaults to 'default'.
341 Returns:
342 pulled (dict): VQIP of combined pulled water
343 """
344 if len(self.in_arcs) == 1:
345 # If only one in_arc, just pull from that
346 if of_type is None:
347 pulled = next(iter(self.in_arcs.values())).send_pull_request(
348 vqip, tag=tag
349 )
350 elif any(
351 [x in of_type for x, y in self.in_arcs_type.items() if len(y) > 0]
352 ):
353 pulled = next(iter(self.in_arcs.values())).send_pull_request(
354 vqip, tag=tag
355 )
356 else:
357 # No viable out arcs
358 pulled = self.empty_vqip()
359 else:
360 # Pull in proportion from connected by priority
362 # Initialise pulled, deficit, connected, iter_
363 pulled = self.empty_vqip()
364 deficit = vqip["volume"]
365 connected = self.get_connected(direction="pull", of_type=of_type, tag=tag)
366 iter_ = 0
368 # Iterate over sending nodes until deficit met
369 while (
370 (deficit > constants.FLOAT_ACCURACY)
371 & (connected["avail"] > constants.FLOAT_ACCURACY)
372 ) & (iter_ < constants.MAXITER):
373 # Pull from connected
374 for key, allocation in connected["allocation"].items():
375 received = self.in_arcs[key].send_pull_request(
376 {"volume": deficit * allocation / connected["priority"]},
377 tag=tag,
378 )
379 pulled = self.sum_vqip(pulled, received)
381 # Update deficit, connected and iter_
382 deficit = vqip["volume"] - pulled["volume"]
383 connected = self.get_connected(
384 direction="pull", of_type=of_type, tag=tag
385 )
386 iter_ += 1
388 if iter_ == constants.MAXITER:
389 print("Maxiter reached in {0} at {1}".format(self.name, self.t))
390 return pulled
392 def push_distributed(self, vqip, of_type=None, tag="default"):
393 """Send push requests to all (or specified by type) nodes connecting to self.
394 Iterate until request is met or maximum iterations are hit. Streamlines if only
395 one in_arc exists.
397 Args:
398 vqip (dict): Total amount to push
399 of_type (str or list) : optional, can be specified to send checks only
400 to nodes of a given type (must be a subclass in nodes.py)
401 tag (str, optional): optional message to direct query_handler which pull
402 function to call. Defaults to 'default'.
404 Returns:
405 not_pushed_ (dict): VQIP of water that cannot be pushed
406 """
407 if len(self.out_arcs) == 1:
408 # If only one out_arc, just send the water down that
409 if of_type is None:
410 not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
411 vqip, tag=tag
412 )
413 elif any(
414 [x in of_type for x, y in self.out_arcs_type.items() if len(y) > 0]
415 ):
416 not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
417 vqip, tag=tag
418 )
419 else:
420 # No viable out arcs
421 not_pushed_ = vqip
422 else:
423 # Push in proportion to connected by priority
424 # Initialise pushed, deficit, connected, iter_
425 not_pushed = vqip["volume"]
426 not_pushed_ = self.copy_vqip(vqip)
427 connected = self.get_connected(direction="push", of_type=of_type, tag=tag)
428 iter_ = 0
429 if not_pushed > connected["avail"]:
430 # If more water than can be pushed, ignore preference and allocate all
431 # available based on capacity
432 connected["priority"] = connected["avail"]
433 connected["allocation"] = connected["capacity"]
435 # Iterate over receiving nodes until sent
436 while (
437 (not_pushed > constants.FLOAT_ACCURACY)
438 & (connected["avail"] > constants.FLOAT_ACCURACY)
439 & (iter_ < constants.MAXITER)
440 ):
441 # Push to connected
442 amount_to_push = min(connected["avail"], not_pushed)
444 for key, allocation in connected["allocation"].items():
445 to_send = amount_to_push * allocation / connected["priority"]
446 to_send = self.v_change_vqip(not_pushed_, to_send)
447 reply = self.out_arcs[key].send_push_request(to_send, tag=tag)
449 sent = self.extract_vqip(to_send, reply)
450 not_pushed_ = self.extract_vqip(not_pushed_, sent)
452 not_pushed = not_pushed_["volume"]
453 connected = self.get_connected(
454 direction="push", of_type=of_type, tag=tag
455 )
456 iter_ += 1
458 if iter_ == constants.MAXITER:
459 print("Maxiter reached in {0} at {1}".format(self.name, self.t))
461 return not_pushed_
463 def check_basic(self, direction, vqip=None, of_type=None, tag="default"):
464 """Generic function that conveys a pull or push check onwards to connected
465 nodes. It is the default behaviour that treats a node like a junction.
467 Args:
468 direction (str): can be either 'pull' or 'push' to send checks to
469 receiving or contributing nodes
470 vqip (dict, optional): The VQIP to check. Defaults to None (if pulling
471 this will return available water to pull, if pushing then available
472 capacity to push).
473 of_type (str or list) : optional, can be specified to send checks only
474 to nodes of a given type (must be a subclass in nodes.py)
475 tag (str, optional): optional message to direct query_handler which pull
476 function to call. Defaults to 'default'.
478 Returns:
479 avail (dict): VQIP responses summed over all requests
480 """
481 f, arcs = self.get_direction_arcs(direction, of_type)
483 # Iterate over arcs, updating total
484 avail = self.empty_vqip()
485 for arc in arcs:
486 avail = self.sum_vqip(avail, getattr(arc, f)(tag=tag))
488 if vqip is not None:
489 avail = self.v_change_vqip(avail, min(avail["volume"], vqip["volume"]))
491 return avail
493 def pull_check_basic(self, vqip=None, of_type=None, tag="default"):
494 """Default node check behaviour that treats a node like a junction. Water
495 available to pull is just the water available to pull from upstream connected
496 nodes.
498 Args:
499 vqip (dict, optional): VQIP from handler of amount to pull check
500 (by default, only the 'volume' key is used). Defaults to None (which
501 returns all availalbe water to pull).
502 of_type (str or list) : optional, can be specified to send checks only
503 to nodes of a given type (must be a subclass in nodes.py)
504 tag (str, optional): optional message to direct query_handler which pull
505 function to call. Defaults to 'default'.
507 Returns:
508 (dict): VQIP check response of upstream nodes
509 """
510 return self.check_basic("pull", vqip, of_type, tag)
512 def push_check_basic(self, vqip=None, of_type=None, tag="default"):
513 """Default node check behaviour that treats a node like a junction. Water
514 available to push is just the water available to push to downstream connected
515 nodes.
517 Args:
518 vqip (dict, optional): VQIP from handler of amount to push check.
519 Defaults to None (which returns all available capacity to push).
520 of_type (str or list) : optional, can be specified to send checks only
521 to nodes of a given type (must be a subclass in nodes.py)
522 tag (str, optional): optional message to direct query_handler which pull
523 function to call. Defaults to 'default'.
525 Returns:
526 (dict): VQIP check response of downstream nodes
527 """
528 return self.check_basic("push", vqip, of_type, tag)
530 def pull_set_deny(self, vqip):
531 """Responds that no water is available to pull from a request.
533 Args:
534 vqip (dict): A VQIP amount of water requested (ignored)
536 Returns:
537 (dict): An empty VQIP indicated no water was pulled
539 Raises:
540 Message when called, since it would usually occur if a model is
541 improperly connected
542 """
543 print("Attempted pull set from deny")
544 return self.empty_vqip()
546 def pull_check_deny(self, vqip=None):
547 """Responds that no water is available to pull from a check.
549 Args:
550 vqip (dict): A VQIP amount of water requested (ignored)
552 Returns:
553 (dict): An empty VQIP indicated no water was pulled
555 Raises:
556 Message when called, since it would usually occur if a model is
557 improperly connected
558 """
559 print("Attempted pull check from deny")
560 return self.empty_vqip()
562 def push_set_deny(self, vqip):
563 """Responds that no water is available to push in a request.
565 Args:
566 vqip (dict): A VQIP amount of water to push
568 Returns:
569 vqip (dict): Returns the request indicating no water was pushed
571 Raises:
572 Message when called, since it would usually occur if a model is
573 improperly connected
574 """
575 print("Attempted push set to deny")
576 return vqip
578 def push_check_deny(self, vqip=None):
579 """Responds that no water is available to push in a check.
581 Args:
582 vqip (dict): A VQIP amount of water to push check (ignored)
584 Returns:
585 (dict): An empty VQIP indicated no capacity for pushes exists
587 Raises:
588 Message when called, since it would usually occur if a model is
589 improperly connected
590 """
591 print("Attempted push check to deny")
592 return self.empty_vqip()
594 def push_check_accept(self, vqip=None):
595 """Push check function that accepts all water.
597 Args:
598 vqip (dict, optional): A VQIP that has been pushed (ignored)
600 Returns:
601 (dict): VQIP or an unbounded capacity, indicating all water can be received
602 """
603 if not vqip:
604 vqip = self.empty_vqip()
605 vqip["volume"] = constants.UNBOUNDED_CAPACITY
606 return vqip
608 def get_data_input(self, var):
609 """Read data from data_input_dict. Keys are tuples with the first entry as the
610 variable to read and second entry the time.
612 Args:
613 var (str): Name of variable
615 Returns:
616 Data read
617 """
618 return self.data_input_dict[(var, self.t)]
620 def end_timestep(self):
621 """Empty function intended to be called at the end of every timestep.
623 Subclasses will overwrite this functions.
624 """
625 pass
627 def reinit(self):
628 """Empty function to be written if reinitialisation capability is added."""
629 pass
632"""
633 This is an attempt to generalise the behaviour of pull/push_distributed
634 It doesn't yet work...
636 def general_distribute(self, vqip, of_type = None, tag = 'default', direction =
637 None):
638 if direction == 'push':
639 arcs = self.out_arcs
640 arcs_type = self.out_arcs_type
641 tracker = self.copy_vqip(vqip)
642 requests = {x.name : lambda y : x.send_push_request(y, tag) for x in arcs.
643 values()}
644 elif direction == 'pull':
645 arcs = self.in_arcs
646 arcs_type = self.in_arcs_type
647 tracker = self.empty_vqip()
648 requests = {x.name : lambda y : x.send_pull_request(y, tag) for x in arcs.
649 values()}
650 else:
651 print('No direction')
653 if len(arcs) == 1:
654 if (of_type == None) | any([x in of_type for x, y in arcs_type.items() if
655 len(y) > 0]):
656 arc = next(iter(arcs.keys()))
657 return requests[arc](vqip)
658 else:
659 #No viable arcs
660 return tracker
662 connected = self.get_connected(direction = direction,
663 of_type = of_type,
664 tag = tag)
666 iter_ = 0
668 target = self.copy_vqip(vqip)
669 #Iterate over sending nodes until deficit met
670 while (((target['volume'] > constants.FLOAT_ACCURACY) &
671 (connected['avail'] > constants.FLOAT_ACCURACY)) &
672 (iter_ < constants.MAXITER)):
674 amount = min(connected['avail'], target['volume']) #Deficit or amount
675 still to push
676 replies = self.empty_vqip()
678 for key, allocation in connected['allocation'].items():
679 to_request = amount * allocation / connected['priority']
680 to_request = self.v_change_vqip(target, to_request)
681 reply = requests[key](to_request)
682 replies = self.sum_vqip(replies, reply)
684 if direction == 'pull':
685 target = self.extract_vqip(target, replies)
686 elif direction == 'push':
687 target = replies
689 connected = self.get_connected(direction = direction,
690 of_type = of_type,
691 tag = tag)
692 iter_ += 1
694 if iter_ == constants.MAXITER:
695 print('Maxiter reached')
696 return target"""
698# def replace(self, newnode):
699# #TODO - doesn't work because the bound methods (e.g., pull_set_handler) get
700# #associated with newnode and can't (as far as I can tell) be moved to self
701#
702# """
703# Replace node with new node, maintaining any existing references to original node
705# Example
706# -------
707# print(my_node.name)
708# my_node.update(Node(name = 'new_node'))
709# print(my_node.name)
710# """
713# #You could use the below code to move the arcs_type references to arcs
714# #that are attached to this node. However, if you were creating a new
715# #subclass of Node then you would need to include this key in all
716# #arcs_type dictionaries in all nodes... which would get complicated
717# #probably safest to leave the arcs_type keys that are associated with
718# #this arc the same (for now...) -therefore - needs the same name
720# # for arc in self.in_arcs.values():
721# # _ = arc.in_port.out_arcs_type[self.__class__.__name__].pop(arc.name)
722# # arc.in_port.out_arcs_type[newnode.__class__.__name__][arc.name] = arc
723# # for arc in self.out_arcs.values():
724# # _ = arc.out_port.in_arcs_type[self.__class__.__name__].pop(arc.name)
725# # arc.out_port.in_arcs_type[newnode.__class__.__name__][arc.name] = arc
728# #Replace class
729# self.__class__ = newnode.__class__
731# #Replace object information (keeping some old information such as in_arcs)
732# for key in ['in_arcs',
733# 'out_arcs',
734# 'in_arcs_type',
735# 'out_arcs_type']:
736# newnode.__dict__[key] = self.__dict__[key]
738# self.__dict__.clear()
739# self.__dict__.update(newnode.__dict__)
742class Tank(WSIObj):
743 """"""
745 def __init__(self, capacity=0, area=1, datum=10, initial_storage=0):
746 """A standard storage object.
748 Args:
749 capacity (float, optional): Volumetric tank capacity. Defaults to 0.
750 area (float, optional): Area of tank. Defaults to 1.
751 datum (float, optional): Datum of tank base (not currently used in any
752 functions). Defaults to 10.
753 initial_storage (optional): Initial storage for tank.
754 float: Tank will be initialised with zero pollutants and the float
755 as volume
756 dict: Tank will be initialised with this VQIP
757 Defaults to 0 (i.e., no volume, no pollutants).
758 """
759 # Set parameters
760 self.capacity = capacity
761 self.area = area
762 self.datum = datum
763 self.initial_storage = initial_storage
765 WSIObj.__init__(self) # Not sure why I do this rather than super()
767 # TODO I don't think the outer if statement is needed
768 if "initial_storage" in dir(self):
769 if isinstance(self.initial_storage, dict):
770 # Assume dict is VQIP describing storage
771 self.storage = self.copy_vqip(self.initial_storage)
772 self.storage_ = self.copy_vqip(
773 self.initial_storage
774 ) # Lagged storage for mass balance
775 else:
776 # Assume number describes initial stroage
777 self.storage = self.v_change_vqip(
778 self.empty_vqip(), self.initial_storage
779 )
780 self.storage_ = self.v_change_vqip(
781 self.empty_vqip(), self.initial_storage
782 ) # Lagged storage for mass balance
783 else:
784 self.storage = self.empty_vqip()
785 self.storage_ = self.empty_vqip() # Lagged storage for mass balance
787 def ds(self):
788 """Should be called by parent object to get change in storage.
790 Returns:
791 (dict): Change in storage
792 """
793 return self.ds_vqip(self.storage, self.storage_)
795 def pull_ponded(self):
796 """Pull any volume that is above the tank's capacity.
798 Returns:
799 ponded (vqip): Amount of ponded water that has been removed from the
800 tank
802 Examples:
803 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
804 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 10,
805 'phosphate' : 0.2})
806 >>> print(my_tank.storage)
807 {'volume' : 10, 'phosphate' : 0.2}
808 >>> print(my_tank.pull_ponded())
809 {'volume' : 1, 'phosphate' : 0.02}
810 >>> print(my_tank.storage)
811 {'volume' : 9, 'phosphate' : 0.18}
812 """
813 # Get amount
814 ponded = max(self.storage["volume"] - self.capacity, 0)
815 # Pull from tank
816 ponded = self.pull_storage({"volume": ponded})
817 return ponded
819 def get_avail(self, vqip=None):
820 """Get minimum of the amount of water in storage and vqip (if provided).
822 Args:
823 vqip (dict, optional): Maximum water required (only 'volume' is used).
824 Defaults to None.
826 Returns:
827 reply (dict): Water available
829 Examples:
830 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
831 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 10,
832 'phosphate' : 0.2})
833 >>> print(my_tank.storage)
834 {'volume' : 10, 'phosphate' : 0.2}
835 >>> print(my_tank.get_avail())
836 {'volume' : 10, 'phosphate' : 0.2}
837 >>> print(my_tank.get_avail({'volume' : 1}))
838 {'volume' : 1, 'phosphate' : 0.02}
839 """
840 reply = self.copy_vqip(self.storage)
841 if vqip is None:
842 # Return storage
843 return reply
844 else:
845 # Adjust storage pollutants to match volume in vqip
846 reply = self.v_change_vqip(reply, min(reply["volume"], vqip["volume"]))
847 return reply
849 def get_excess(self, vqip=None):
850 """Get difference between current storage and tank capacity.
852 Args:
853 vqip (dict, optional): Maximum capacity required (only 'volume' is
854 used). Defaults to None.
856 Returns:
857 (dict): Difference available
859 Examples:
860 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
861 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 5,
862 'phosphate' : 0.2})
863 >>> print(my_tank.get_excess())
864 {'volume' : 4, 'phosphate' : 0.16}
865 >>> print(my_tank.get_excess({'volume' : 2}))
866 {'volume' : 2, 'phosphate' : 0.08}
867 """
868 vol = max(self.capacity - self.storage["volume"], 0)
869 if vqip is not None:
870 vol = min(vqip["volume"], vol)
872 # Adjust storage pollutants to match volume in vqip
873 # TODO the v_change_vqip in the reply here is a weird default (if a VQIP is not
874 # provided)
875 return self.v_change_vqip(self.storage, vol)
877 def push_storage(self, vqip, force=False):
878 """Push water into tank, updating the storage VQIP. Force argument can be used
879 to ignore tank capacity.
881 Args:
882 vqip (dict): VQIP amount to be pushed
883 force (bool, optional): Argument used to cause function to ignore tank
884 capacity, possibly resulting in pooling. Defaults to False.
886 Returns:
887 reply (dict): A VQIP of water not successfully pushed to the tank
889 Examples:
890 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
891 >>> constants.POLLUTANTS = ['phosphate']
892 >>> constants.NON_ADDITIVE_POLLUTANTS = []
893 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 5,
894 'phosphate' : 0.2})
895 >>> my_push = {'volume' : 10, 'phosphate' : 0.5}
896 >>> reply = my_tank.push_storage(my_push)
897 >>> print(reply)
898 {'volume' : 6, 'phosphate' : 0.3}
899 >>> print(my_tank.storage)
900 {'volume': 9.0, 'phosphate': 0.4}
901 >>> print(my_tank.push_storage(reply, force = True))
902 {'phosphate': 0, 'volume': 0}
903 >>> print(my_tank.storage)
904 {'volume': 15.0, 'phosphate': 0.7}
905 """
906 if force:
907 # Directly add request to storage
908 self.storage = self.sum_vqip(self.storage, vqip)
909 return self.empty_vqip()
911 # Check whether request can be met
912 excess = self.get_excess()["volume"]
914 # Adjust accordingly
915 reply = max(vqip["volume"] - excess, 0)
916 reply = self.v_change_vqip(vqip, reply)
917 entered = self.v_change_vqip(vqip, vqip["volume"] - reply["volume"])
919 # Update storage
920 self.storage = self.sum_vqip(self.storage, entered)
922 return reply
924 def pull_storage(self, vqip):
925 """Pull water from tank, updating the storage VQIP. Pollutants are removed from
926 tank in proportion to 'volume' in vqip (pollutant values in vqip are ignored).
928 Args:
929 vqip (dict): VQIP amount to be pulled, (only 'volume' key is needed)
931 Returns:
932 reply (dict): A VQIP water successfully pulled from the tank
934 Examples:
935 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
936 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 5,
937 'phosphate' : 0.2})
938 >>> print(my_tank.pull_storage({'volume' : 6}))
939 {'volume': 5.0, 'phosphate': 0.2}
940 >>> print(my_tank.storage)
941 {'volume': 0, 'phosphate': 0}
942 """
943 # Pull from Tank by volume (taking pollutants in proportion)
944 if self.storage["volume"] == 0:
945 return self.empty_vqip()
947 # Adjust based on available volume
948 reply = min(vqip["volume"], self.storage["volume"])
950 # Update reply to vqip (in proportion to concentration in storage)
951 reply = self.v_change_vqip(self.storage, reply)
953 # Extract from storage
954 self.storage = self.extract_vqip(self.storage, reply)
956 return reply
958 def pull_pollutants(self, vqip):
959 """Pull water from tank, updating the storage VQIP. Pollutants are removed from
960 tank in according to their values in vqip.
962 Args:
963 vqip (dict): VQIP amount to be pulled
965 Returns:
966 vqip (dict): A VQIP water successfully pulled from the tank
968 Examples:
969 >>> constants.ADDITIVE_POLLUTANTS = ['phosphate']
970 >>> my_tank = Tank(capacity = 9, initial_storage = {'volume' : 5,
971 'phosphate' : 0.2})
972 >>> print(my_tank.pull_pollutants({'volume' : 2, 'phosphate' : 0.15}))
973 {'volume': 2.0, 'phosphate': 0.15}
974 >>> print(my_tank.storage)
975 {'volume': 3, 'phosphate': 0.05}
976 """
977 # Adjust based on available mass
978 for pol in constants.ADDITIVE_POLLUTANTS + ["volume"]:
979 vqip[pol] = min(self.storage[pol], vqip[pol])
981 # Extract from storage
982 self.storage = self.extract_vqip(self.storage, vqip)
983 return vqip
985 def get_head(self, datum=None, non_head_storage=0):
986 """Area volume calculation for head calcuations. Datum and storage that does not
987 contribute to head can be specified.
989 Args:
990 datum (float, optional): Value to add to pressure head in tank.
991 Defaults to None.
992 non_head_storage (float, optional): Amount of storage that does
993 not contribute to generation of head. The tank must exceed
994 this value to generate any pressure head. Defaults to 0.
996 Returns:
997 head (float): Total head in tank
999 Examples:
1000 >>> my_tank = Tank(datum = 10, initial_storage = 5, capacity = 10, area = 2)
1001 >>> print(my_tank.get_head())
1002 12.5
1003 >>> print(my_tank.get_head(non_head_storage = 1))
1004 12
1005 >>> print(my_tank.get_head(non_head_storage = 1, datum = 0))
1006 2
1007 """
1008 # If datum not provided use object datum
1009 if datum is None:
1010 datum = self.datum
1012 # Calculate pressure head generating storage
1013 head_storage = max(self.storage["volume"] - non_head_storage, 0)
1015 # Perform head calculation
1016 head = head_storage / self.area + datum
1018 return head
1020 def evaporate(self, evap):
1021 """Wrapper for v_distill_vqip to apply a volumetric subtraction from tank
1022 storage. Volume removed from storage and no change in pollutant values.
1024 Args:
1025 evap (float): Volume to evaporate
1027 Returns:
1028 evap (float): Volumetric amount of evaporation successfully removed
1029 """
1030 avail = self.get_avail()["volume"]
1032 evap = min(evap, avail)
1033 self.storage = self.v_distill_vqip(self.storage, evap)
1034 return evap
1036 ##Old function no longer needed (check it is not used anywhere and remove)
1037 def push_total(self, vqip):
1038 """
1040 Args:
1041 vqip:
1043 Returns:
1045 """
1046 self.storage = self.sum_vqip(self.storage, vqip)
1047 return self.empty_vqip()
1049 ##Old function no longer needed (check it is not used anywhere and remove)
1050 def push_total_c(self, vqip):
1051 """
1053 Args:
1054 vqip:
1056 Returns:
1058 """
1059 # Push vqip to storage where pollutants are given as a concentration rather
1060 # than storage
1061 vqip = self.concentration_to_total(self.vqip)
1062 self.storage = self.sum_vqip(self.storage, vqip)
1063 return self.empty_vqip()
1065 def end_timestep(self):
1066 """Function to be called by parent object, tracks previously timestep's
1067 storage."""
1068 self.storage_ = self.copy_vqip(self.storage)
1070 def reinit(self):
1071 """Set storage to an empty VQIP."""
1072 self.storage = self.empty_vqip()
1073 self.storage_ = self.empty_vqip()
1076class ResidenceTank(Tank):
1077 """"""
1079 def __init__(self, residence_time=2, **kwargs):
1080 """A tank that has a residence time property that limits storage pulled from the
1081 'pull_outflow' function.
1083 Args:
1084 residence_time (float, optional): Residence time, in theory given
1085 in timesteps, in practice it just means that storage /
1086 residence time can be pulled each time pull_outflow is called.
1087 Defaults to 2.
1088 """
1089 self.residence_time = residence_time
1090 super().__init__(**kwargs)
1092 def pull_outflow(self):
1093 """Pull storage by residence time from the tank, updating tank storage.
1095 Returns:
1096 outflow (dict): A VQIP with volume of pulled volume and pollutants
1097 proportionate to the tank's pollutants
1098 """
1099 # Calculate outflow
1100 outflow = self.storage["volume"] / self.residence_time
1101 # Update pollutant amounts
1102 outflow = self.v_change_vqip(self.storage, outflow)
1103 # Remove from tank
1104 outflow = self.pull_storage(outflow)
1105 return outflow
1108class DecayTank(Tank, DecayObj):
1109 """"""
1111 def __init__(self, decays={}, parent=None, **kwargs):
1112 """A tank that has DecayObj functions. Decay occurs in end_timestep, after
1113 updating state variables. In this sense, decay is occurring at the very
1114 beginning of the timestep.
1116 Args:
1117 decays (dict): A dict of dicts containing a key for each pollutant that
1118 decays and, within that, a key for each parameter (a constant and
1119 exponent)
1120 parent (object): An object that can be used to read temperature data from
1121 """
1122 # Store parameters
1123 self.parent = parent
1125 # Initialise Tank
1126 Tank.__init__(self, **kwargs)
1128 # Initialise decay object
1129 DecayObj.__init__(self, decays)
1131 # Update timestep and ds functions
1132 self.end_timestep = self.end_timestep_decay
1133 self.ds = self.decay_ds
1135 def end_timestep_decay(self):
1136 """Update state variables and call make_decay."""
1137 self.total_decayed = self.empty_vqip()
1138 self.storage_ = self.copy_vqip(self.storage)
1140 self.storage = self.make_decay(self.storage)
1142 def decay_ds(self):
1143 """Track storage and amount decayed.
1145 Returns:
1146 ds (dict): A VQIP of change in storage and total decayed
1147 """
1148 ds = self.ds_vqip(self.storage, self.storage_)
1149 ds = self.sum_vqip(ds, self.total_decayed)
1150 return ds
1153class QueueTank(Tank):
1154 """"""
1156 def __init__(self, number_of_timesteps=0, **kwargs):
1157 """A tank with an internal queue arc, whose queue must be completed before
1158 storage is available for use. The storage that has completed the queue is under
1159 the 'active_storage' property.
1161 Args:
1162 number_of_timesteps (int, optional): Built in delay for the internal
1163 queue - it is always added to the queue time, although delay can be
1164 provided with pushes only. Defaults to 0.
1165 """
1166 # Set parameters
1167 self.number_of_timesteps = number_of_timesteps
1169 super().__init__(**kwargs)
1170 self.end_timestep = self._end_timestep
1171 self.active_storage = self.copy_vqip(self.storage)
1173 # TODO enable queue to be initialised not empty
1174 self.out_arcs = {}
1175 self.in_arcs = {}
1176 # Create internal queue arc
1177 self.internal_arc = AltQueueArc(
1178 in_port=self, out_port=self, number_of_timesteps=self.number_of_timesteps
1179 )
1180 # TODO should mass balance call internal arc (is this arc called in arc mass
1181 # balance?)
1183 def get_avail(self):
1184 """Return the active_storage of the tank.
1186 Returns:
1187 (dict): VQIP of active_storage
1188 """
1189 return self.copy_vqip(self.active_storage)
1191 def push_storage(self, vqip, time=0, force=False):
1192 """Push storage into QueueTank, applying travel time, unless forced.
1194 Args:
1195 vqip (dict): A VQIP of the amount to push
1196 time (int, optional): Number of timesteps to spend in queue, in addition
1197 to number_of_timesteps property of internal_arc. Defaults to 0.
1198 force (bool, optional): Force property that will ignore tank capacity
1199 and ignore travel time. Defaults to False.
1201 Returns:
1202 reply (dict): A VQIP of water that could not be received by the tank
1203 """
1204 if force:
1205 # Directly add request to storage, skipping queue
1206 self.storage = self.sum_vqip(self.storage, vqip)
1207 self.active_storage = self.sum_vqip(self.active_storage, vqip)
1208 return self.empty_vqip()
1210 # Push to QueueTank
1211 reply = self.internal_arc.send_push_request(vqip, force=force, time=time)
1212 # Update storage
1213 # TODO storage won't be accurately tracking temperature..
1214 self.storage = self.sum_vqip(
1215 self.storage, self.v_change_vqip(vqip, vqip["volume"] - reply["volume"])
1216 )
1217 return reply
1219 def pull_storage(self, vqip):
1220 """Pull storage from the QueueTank, only water in active_storage is available.
1221 Returning water pulled and updating tank states. Pollutants are removed from
1222 tank in proportion to 'volume' in vqip (pollutant values in vqip are ignored).
1224 Args:
1225 vqip (dict): VQIP amount to pull, only 'volume' property is used
1227 Returns:
1228 reply (dict): VQIP amount that was pulled
1229 """
1230 # Adjust based on available volume
1231 reply = min(vqip["volume"], self.active_storage["volume"])
1233 # Update reply to vqip
1234 reply = self.v_change_vqip(self.active_storage, reply)
1236 # Extract from active_storage
1237 self.active_storage = self.extract_vqip(self.active_storage, reply)
1239 # Extract from storage
1240 self.storage = self.extract_vqip(self.storage, reply)
1242 return reply
1244 def pull_storage_exact(self, vqip):
1245 """Pull storage from the QueueTank, only water in active_storage is available.
1246 Pollutants are removed from tank in according to their values in vqip.
1248 Args:
1249 vqip (dict): A VQIP amount to pull
1251 Returns:
1252 reply (dict): A VQIP amount successfully pulled
1253 """
1254 # Adjust based on available
1255 reply = self.copy_vqip(vqip)
1256 for pol in ["volume"] + constants.ADDITIVE_POLLUTANTS:
1257 reply[pol] = min(reply[pol], self.active_storage[pol])
1259 # Pull from QueueTank
1260 self.active_storage = self.extract_vqip(self.active_storage, reply)
1262 # Extract from storage
1263 self.storage = self.extract_vqip(self.storage, reply)
1264 return reply
1266 def push_check(self, vqip=None, tag="default"):
1267 """Wrapper for get_excess but applies comparison to volume in VQIP.
1268 Needed to enable use of internal_arc, which assumes it is connecting nodes .
1269 rather than tanks.
1270 NOTE: this is intended only for use with the internal_arc. Pushing to
1271 QueueTanks should use 'push_storage'.
1273 Args:
1274 vqip (dict, optional): VQIP amount to push. Defaults to None.
1275 tag (str, optional): Tag, see Node, don't think it should actually be
1276 used for a QueueTank since there are no handlers. Defaults to
1277 'default'.
1279 Returns:
1280 excess (dict): a VQIP amount of excess capacity
1281 """
1282 # TODO does behaviour for volume = None need to be defined?
1283 excess = self.get_excess()
1284 if vqip is not None:
1285 excess["volume"] = min(vqip["volume"], excess["volume"])
1286 return excess
1288 def push_set(self, vqip, tag="default"):
1289 """Behaves differently from normal push setting, it assumes sufficient tank
1290 capacity and receives VQIPs that have reached the END of the internal_arc.
1291 NOTE: this is intended only for use with the internal_arc. Pushing to
1292 QueueTanks should use 'push_storage'.
1294 Args:
1295 vqip (dict): VQIP amount to push
1296 tag (str, optional): Tag, see Node, don't think it should actually be
1297 used for a QueueTank since there are no handlers. Defaults to
1298 'default'.
1300 Returns:
1301 (dict): Returns empty VQIP, indicating all water received (since it
1302 assumes capacity was checked before entering the internal arc)
1303 """
1304 # Update active_storage (since it has reached the end of the internal_arc)
1305 self.active_storage = self.sum_vqip(self.active_storage, vqip)
1307 return self.empty_vqip()
1309 def _end_timestep(self):
1310 """Wrapper for end_timestep that also ends the timestep in the internal_arc."""
1311 self.internal_arc.end_timestep()
1312 self.internal_arc.update_queue()
1313 self.storage_ = self.copy_vqip(self.storage)
1315 def reinit(self):
1316 """Zeros storages and arc."""
1317 self.internal_arc.reinit()
1318 self.storage = self.empty_vqip()
1319 self.storage_ = self.empty_vqip()
1320 self.active_storage = self.empty_vqip()
1323class DecayQueueTank(QueueTank):
1324 """"""
1326 def __init__(self, decays={}, parent=None, number_of_timesteps=1, **kwargs):
1327 """Adds a DecayAltArc in QueueTank to enable decay to occur within the
1328 internal_arc queue.
1330 Args:
1331 decays (dict): A dict of dicts containing a key for each pollutant and,
1332 within that, a key for each parameter (a constant and exponent)
1333 parent (object): An object that can be used to read temperature data from
1334 number_of_timesteps (int, optional): Built in delay for the internal
1335 queue - it is always added to the queue time, although delay can be
1336 provided with pushes only. Defaults to 0.
1337 """
1338 # Initialise QueueTank
1339 super().__init__(number_of_timesteps=number_of_timesteps, **kwargs)
1340 # Replace internal_arc with a DecayArcAlt
1341 self.internal_arc = DecayArcAlt(
1342 in_port=self,
1343 out_port=self,
1344 number_of_timesteps=number_of_timesteps,
1345 parent=parent,
1346 decays=decays,
1347 )
1349 self.end_timestep = self._end_timestep
1351 def _end_timestep(self):
1352 """End timestep wrapper that removes decayed pollutants and calls internal
1353 arc."""
1354 # TODO Should the active storage decay if decays are given (probably.. though
1355 # that sounds like a nightmare)?
1356 self.storage = self.extract_vqip(self.storage, self.internal_arc.total_decayed)
1357 self.storage_ = self.copy_vqip(self.storage)
1358 self.internal_arc.end_timestep()