Skip to content

API Reference - Nodes

This section of the documentation provides a reference for the API of the nodes.nodes module.

Created on Wed Apr 7 08:43:32 2021.

@author: Barney

Converted to totals on Thur Apr 21 2022

Node

Bases: WSIObj

Source code in wsimod/nodes/nodes.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
class Node(WSIObj):
    """"""

    def __init_subclass__(cls, **kwargs):
        """Adds all subclasses to the nodes registry."""
        super().__init_subclass__(**kwargs)
        if cls.__name__ in NODES_REGISTRY:
            logging.warning(f"Overwriting {cls.__name__} in NODES_REGISTRY with {cls}")

        NODES_REGISTRY[cls.__name__] = cls

    def __init__(self, name, data_input_dict=None):
        """Base class for CWSD nodes. Constructs all the necessary attributes for the
        node object.

        Args:
            name (str): Name of node
            data_input_dict (dict, optional): Dictionary of data inputs relevant for
                the node. Keys are tuples where first value is the name of the
                variable to read from the dict and the second value is the time.
                Defaults to None.

        Examples:
            >>> my_node = nodes.Node(name = 'london_river_junction')

        Key assumptions:
            - No physical processes represented, can be used as a junction.

        Input data and parameter requirements:
            - All nodes require a `name`
        """
        node_types = list(NODES_REGISTRY.keys())

        # Default essential parameters
        # Dictionary of arcs
        self.in_arcs = {}
        self.out_arcs = {}
        self.in_arcs_type = {x: {} for x in node_types}
        self.out_arcs_type = {x: {} for x in node_types}

        # Set parameters
        self.name = name
        self.t = None
        self.data_input_dict = data_input_dict

        # Initiailise default handlers
        self.pull_set_handler = {"default": self.pull_distributed}
        self.push_set_handler = {
            "default": lambda x: self.push_distributed(
                x, of_type=["Node", "River", "Waste", "Reservoir"]
            )
        }
        self.pull_check_handler = {"default": self.pull_check_basic}
        self.push_check_handler = {
            "default": lambda x: self.push_check_basic(
                x, of_type=["Node", "River", "Waste", "Reservoir"]
            )
        }
        super().__init__()

        # Mass balance checking
        self.mass_balance_in = [self.total_in]
        self.mass_balance_out = [self.total_out]
        self.mass_balance_ds = [lambda: self.empty_vqip()]

    def apply_overrides(self, overrides: Dict[str, Any] = {}) -> None:
        """Apply overrides to the node.

        The Node does not have any overwriteable parameters. So if any
        overrides are passed up to the node, this means that there are unused
        parameters from the Node subclass, which is flagged.

        Args:
            overrides (dict, optional): Dictionary of overrides. Defaults to {}.
        """
        # overrides data_input_dict
        from wsimod.orchestration.model import read_csv

        content = overrides.pop("filename", None)
        if isinstance(content, str):
            self.data_input_dict = read_csv(content)
        elif not content:
            pass
        else:
            raise RuntimeError("Not recognised format for data_input_dict")

        if len(overrides) > 0:
            print(f"No override behaviour defined for: {overrides.keys()}")

    def total_in(self):
        """Sum flow and pollutant amounts entering a node via in_arcs.

        Returns:
            in_ (dict): Summed VQIP of in_arcs

        Examples:
            >>> node_inflow = my_node.total_in()
        """
        in_ = self.empty_vqip()
        for arc in self.in_arcs.values():
            in_ = self.sum_vqip(in_, arc.vqip_out)

        return in_

    def total_out(self):
        """Sum flow and pollutant amounts leaving a node via out_arcs.

        Returns:
            out_ (dict): Summed VQIP of out_arcs

        Examples:
            >>> node_outflow = my_node.total_out()
        """
        out_ = self.empty_vqip()
        for arc in self.out_arcs.values():
            out_ = self.sum_vqip(out_, arc.vqip_in)

        return out_

    def node_mass_balance(self):
        """Wrapper for core.py/WSIObj/mass_balance. Tracks change in mass balance.

        Returns:
            in_ (dict): A VQIP of the total from mass_balance_in functions
            ds_ (dict): A VQIP of the total from mass_balance_ds functions
            out_ (dict): A VQIP of the total from mass_balance_out functions

        Examples:
            >>> node_in, node_out, node_ds = my_node.node_mass_balance()
        """
        in_, ds_, out_ = self.mass_balance()
        return in_, ds_, out_

    def pull_set(self, vqip, tag="default"):
        """Receives pull set requests from arcs and passes request to query handler.

        Args:
            vqip (dict): the VQIP pull request (by default, only the 'volume' key is
                needed).
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            (dict): VQIP received from query_handler

        Examples:
            >>> water_received = my_node.pull_set({'volume' : 10})
        """
        return self.query_handler(self.pull_set_handler, vqip, tag)

    def push_set(self, vqip, tag="default"):
        """Receives push set requests from arcs and passes request to query handler.

        Args:
            vqip (_type_): the VQIP push request
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            (dict): VQIP not received from query_handler

        Examples:
            water_not_pushed = my_node.push_set(wastewater_vqip)
        """
        return self.query_handler(self.push_set_handler, vqip, tag)

    def pull_check(self, vqip=None, tag="default"):
        """Receives pull check requests from arcs and passes request to query handler.

        Args:
            vqip (dict, optional): the VQIP pull check (by default, only the
                'volume' key is used). Defaults to None, which returns all available
                water to pull.
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            (dict): VQIP available from query_handler

        Examples:
            >>> water_available = my_node.pull_check({'volume' : 10})
            >>> total_water_available = my_node.pull_check()
        """
        return self.query_handler(self.pull_check_handler, vqip, tag)

    def push_check(self, vqip=None, tag="default"):
        """Receives push check requests from arcs and passes request to query handler.

        Args:
            vqip (dict, optional): the VQIP push check. Defaults to None, which
                returns all available capacity to push
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'

        Returns:
            (dict): VQIP available to push from query_handler

        Examples:
            >>> total_available_push_capacity = my_node.push_check()
            >>> available_push_capacity = my_node.push_check(wastewater_vqip)
        """
        return self.query_handler(self.push_check_handler, vqip, tag)

    def get_direction_arcs(self, direction, of_type=None):
        """Identify arcs to/from all attached nodes in a given direction.

        Args:
            direction (str): can be either 'pull' or 'push' to send checks to
                receiving or contributing nodes
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)

        Returns:
            f (str): Either 'send_pull_check' or 'send_push_check' depending on
                direction
            arcs (list): List of arc objects

        Raises:
            Message if no direction is specified

        Examples:
            >>> arcs_to_push_to = my_node.get_direction_arcs('push')
            >>> arcs_to_pull_from = my_node.get_direction_arcs('pull')
            >>> arcs_from_reservoirs = my_node.get_direction_arcs('pull', of_type =
                'Reservoir')
        """
        if of_type is None:
            # Return all arcs
            if direction == "pull":
                arcs = list(self.in_arcs.values())
                f = "send_pull_check"
            elif direction == "push":
                arcs = list(self.out_arcs.values())
                f = "send_push_check"
            else:
                print("No direction")

        else:
            if isinstance(of_type, str):
                of_type = [of_type]

            # Assign arcs/function based on parameters
            arcs = []
            if direction == "pull":
                for type_ in of_type:
                    arcs += list(self.in_arcs_type[type_].values())
                f = "send_pull_check"
            elif direction == "push":
                for type_ in of_type:
                    arcs += list(self.out_arcs_type[type_].values())
                f = "send_push_check"
            else:
                print("No direction")

        return f, arcs

    def get_connected(self, direction="pull", of_type=None, tag="default"):
        """Send push/pull checks to all attached arcs in a given direction.

        Args:
            direction (str, optional): The type of check to send to all attached
                nodes. Can be 'push' or 'pull'. The default is 'pull'.
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            connected (dict) :
                Dictionary containing keys:
                'avail': (float) - total available volume for push/pull
                'priority': (float) - total (availability * preference)
                                    of attached arcs
                'allocation': (dict) - contains all attached arcs in specified
                                direction and respective (availability * preference)

        Examples:
            >>> vqip_available_to_pull = my_node.get_direction_arcs()
            >>> vqip_available_to_push = my_node.get_direction_arcs('push')
            >>> avail_reservoir_vqip = my_node.get_direction_arcs('pull',
                                                          of_type = 'Reservoir')
            >>> avail_sewer_push_to_sewers = my_node.get_direction_arcs('push',
                                                                of_type = 'Sewer',
                                                                tag = 'Sewer')
        """
        # Initialise connected dict
        connected = {"avail": 0, "priority": 0, "allocation": {}, "capacity": {}}

        # Get arcs
        f, arcs = self.get_direction_arcs(direction, of_type)

        # Iterate over arcs, updating connected dict
        for arc in arcs:
            avail = getattr(arc, f)(tag=tag)["volume"]
            if avail < constants.FLOAT_ACCURACY:
                avail = 0  # Improves convergence
            connected["avail"] += avail
            preference = arc.preference
            connected["priority"] += avail * preference
            connected["allocation"][arc.name] = avail * preference
            connected["capacity"][arc.name] = avail

        return connected

    def query_handler(self, handler, ip, tag):
        """Sends all push/pull requests/checks using the handler (i.e., ensures the
        correct function is used that lines up with 'tag').

        Args:
            handler (dict): contains all push/pull requests for various tags
            ip (vqip): the vqip request
            tag (str): describes what type of push/pull request should be called

        Returns:
            (dict): the VQIP reply from push/pull request

        Raises:
            Message if no functions are defined for tag and if request/check
            function fails
        """
        try:
            return handler[tag](ip)
        except Exception:
            if tag not in handler.keys():
                print("No functions defined for " + tag)
                return handler[tag](ip)
            else:
                print("Some other error")
                return handler[tag](ip)

    def pull_distributed(self, vqip, of_type=None, tag="default"):
        """Send pull requests to all (or specified by type) nodes connecting to self.
        Iterate until request is met or maximum iterations are hit. Streamlines if only
        one in_arc exists.

        Args:
            vqip (dict): Total amount to pull (by default, only the
                'volume' key is used)
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            pulled (dict): VQIP of combined pulled water
        """
        if len(self.in_arcs) == 1:
            # If only one in_arc, just pull from that
            if of_type is None:
                pulled = next(iter(self.in_arcs.values())).send_pull_request(
                    vqip, tag=tag
                )
            elif any(
                [x in of_type for x, y in self.in_arcs_type.items() if len(y) > 0]
            ):
                pulled = next(iter(self.in_arcs.values())).send_pull_request(
                    vqip, tag=tag
                )
            else:
                # No viable out arcs
                pulled = self.empty_vqip()
        else:
            # Pull in proportion from connected by priority

            # Initialise pulled, deficit, connected, iter_
            pulled = self.empty_vqip()
            deficit = vqip["volume"]
            connected = self.get_connected(direction="pull", of_type=of_type, tag=tag)
            iter_ = 0

            # Iterate over sending nodes until deficit met
            while (
                (deficit > constants.FLOAT_ACCURACY)
                & (connected["avail"] > constants.FLOAT_ACCURACY)
            ) & (iter_ < constants.MAXITER):
                # Pull from connected
                for key, allocation in connected["allocation"].items():
                    received = self.in_arcs[key].send_pull_request(
                        {"volume": deficit * allocation / connected["priority"]},
                        tag=tag,
                    )
                    pulled = self.sum_vqip(pulled, received)

                # Update deficit, connected and iter_
                deficit = vqip["volume"] - pulled["volume"]
                connected = self.get_connected(
                    direction="pull", of_type=of_type, tag=tag
                )
                iter_ += 1

            if iter_ == constants.MAXITER:
                print("Maxiter reached in {0} at {1}".format(self.name, self.t))
        return pulled

    def push_distributed(self, vqip, of_type=None, tag="default"):
        """Send push requests to all (or specified by type) nodes connecting to self.
        Iterate until request is met or maximum iterations are hit. Streamlines if only
        one in_arc exists.

        Args:
            vqip (dict): Total amount to push
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            not_pushed_ (dict): VQIP of water that cannot be pushed
        """
        if len(self.out_arcs) == 1:
            # If only one out_arc, just send the water down that
            if of_type is None:
                not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
                    vqip, tag=tag
                )
            elif any(
                [x in of_type for x, y in self.out_arcs_type.items() if len(y) > 0]
            ):
                not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
                    vqip, tag=tag
                )
            else:
                # No viable out arcs
                not_pushed_ = vqip
        else:
            # Push in proportion to connected by priority
            # Initialise pushed, deficit, connected, iter_
            not_pushed = vqip["volume"]
            not_pushed_ = self.copy_vqip(vqip)
            connected = self.get_connected(direction="push", of_type=of_type, tag=tag)
            iter_ = 0
            if not_pushed > connected["avail"]:
                # If more water than can be pushed, ignore preference and allocate all
                #   available based on capacity
                connected["priority"] = connected["avail"]
                connected["allocation"] = connected["capacity"]

            # Iterate over receiving nodes until sent
            while (
                (not_pushed > constants.FLOAT_ACCURACY)
                & (connected["avail"] > constants.FLOAT_ACCURACY)
                & (iter_ < constants.MAXITER)
            ):
                # Push to connected
                amount_to_push = min(connected["avail"], not_pushed)

                for key, allocation in connected["allocation"].items():
                    to_send = amount_to_push * allocation / connected["priority"]
                    to_send = self.v_change_vqip(not_pushed_, to_send)
                    reply = self.out_arcs[key].send_push_request(to_send, tag=tag)

                    sent = self.extract_vqip(to_send, reply)
                    not_pushed_ = self.extract_vqip(not_pushed_, sent)

                not_pushed = not_pushed_["volume"]
                connected = self.get_connected(
                    direction="push", of_type=of_type, tag=tag
                )
                iter_ += 1

            if iter_ == constants.MAXITER:
                print("Maxiter reached in {0} at {1}".format(self.name, self.t))

        return not_pushed_

    def check_basic(self, direction, vqip=None, of_type=None, tag="default"):
        """Generic function that conveys a pull or push check onwards to connected
        nodes. It is the default behaviour that treats a node like a junction.

        Args:
            direction (str): can be either 'pull' or 'push' to send checks to
                receiving or contributing nodes
            vqip (dict, optional): The VQIP to check. Defaults to None (if pulling
                this will return available water to pull, if pushing then available
                capacity to push).
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            avail (dict): VQIP responses summed over all requests
        """
        f, arcs = self.get_direction_arcs(direction, of_type)

        # Iterate over arcs, updating total
        avail = self.empty_vqip()
        for arc in arcs:
            avail = self.sum_vqip(avail, getattr(arc, f)(tag=tag))

        if vqip is not None:
            avail = self.v_change_vqip(avail, min(avail["volume"], vqip["volume"]))

        return avail

    def pull_check_basic(self, vqip=None, of_type=None, tag="default"):
        """Default node check behaviour that treats a node like a junction. Water
        available to pull is just the water available to pull from upstream connected
        nodes.

        Args:
            vqip (dict, optional): VQIP from handler of amount to pull check
                (by default, only the 'volume' key is used). Defaults to None (which
                returns all availalbe water to pull).
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            (dict): VQIP check response of upstream nodes
        """
        return self.check_basic("pull", vqip, of_type, tag)

    def push_check_basic(self, vqip=None, of_type=None, tag="default"):
        """Default node check behaviour that treats a node like a junction. Water
        available to push is just the water available to push to downstream connected
        nodes.

        Args:
            vqip (dict, optional): VQIP from handler of amount to push check.
                Defaults to None (which returns all available capacity to push).
            of_type (str or list) : optional, can be specified to send checks only
                to nodes of a given type (must be a subclass in nodes.py)
            tag (str, optional): optional message to direct query_handler which pull
                function to call. Defaults to 'default'.

        Returns:
            (dict): VQIP check response of downstream nodes
        """
        return self.check_basic("push", vqip, of_type, tag)

    def pull_set_deny(self, vqip):
        """Responds that no water is available to pull from a request.

        Args:
            vqip (dict): A VQIP amount of water requested (ignored)

        Returns:
            (dict): An empty VQIP indicated no water was pulled

        Raises:
            Message when called, since it would usually occur if a model is
            improperly connected
        """
        print("Attempted pull set from deny")
        return self.empty_vqip()

    def pull_check_deny(self, vqip=None):
        """Responds that no water is available to pull from a check.

        Args:
            vqip (dict): A VQIP amount of water requested (ignored)

        Returns:
            (dict): An empty VQIP indicated no water was pulled

        Raises:
            Message when called, since it would usually occur if a model is
            improperly connected
        """
        print("Attempted pull check from deny")
        return self.empty_vqip()

    def push_set_deny(self, vqip):
        """Responds that no water is available to push in a request.

        Args:
            vqip (dict): A VQIP amount of water to push

        Returns:
            vqip (dict): Returns the request indicating no water was pushed

        Raises:
            Message when called, since it would usually occur if a model is
            improperly connected
        """
        print("Attempted push set to deny")
        return vqip

    def push_check_deny(self, vqip=None):
        """Responds that no water is available to push in a check.

        Args:
            vqip (dict): A VQIP amount of water to push check (ignored)

        Returns:
            (dict): An empty VQIP indicated no capacity for pushes exists

        Raises:
            Message when called, since it would usually occur if a model is
            improperly connected
        """
        print("Attempted push check to deny")
        return self.empty_vqip()

    def push_check_accept(self, vqip=None):
        """Push check function that accepts all water.

        Args:
            vqip (dict, optional): A VQIP that has been pushed (ignored)

        Returns:
            (dict): VQIP or an unbounded capacity, indicating all water can be received
        """
        if not vqip:
            vqip = self.empty_vqip()
            vqip["volume"] = constants.UNBOUNDED_CAPACITY
        return vqip

    def get_data_input(self, var):
        """Read data from data_input_dict. Keys are tuples with the first entry as the
        variable to read and second entry the time.

        Args:
            var (str): Name of variable

        Returns:
            Data read
        """
        return self.data_input_dict[(var, self.t)]

    def end_timestep(self):
        """Empty function intended to be called at the end of every timestep.

        Subclasses will overwrite this functions.
        """
        pass

    def reinit(self):
        """Empty function to be written if reinitialisation capability is added."""
        pass

__init__(name, data_input_dict=None)

Base class for CWSD nodes. Constructs all the necessary attributes for the node object.

Parameters:

Name Type Description Default
name str

Name of node

required
data_input_dict dict

Dictionary of data inputs relevant for the node. Keys are tuples where first value is the name of the variable to read from the dict and the second value is the time. Defaults to None.

None

Examples:

>>> my_node = nodes.Node(name = 'london_river_junction')
Key assumptions
  • No physical processes represented, can be used as a junction.
Input data and parameter requirements
  • All nodes require a name
Source code in wsimod/nodes/nodes.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(self, name, data_input_dict=None):
    """Base class for CWSD nodes. Constructs all the necessary attributes for the
    node object.

    Args:
        name (str): Name of node
        data_input_dict (dict, optional): Dictionary of data inputs relevant for
            the node. Keys are tuples where first value is the name of the
            variable to read from the dict and the second value is the time.
            Defaults to None.

    Examples:
        >>> my_node = nodes.Node(name = 'london_river_junction')

    Key assumptions:
        - No physical processes represented, can be used as a junction.

    Input data and parameter requirements:
        - All nodes require a `name`
    """
    node_types = list(NODES_REGISTRY.keys())

    # Default essential parameters
    # Dictionary of arcs
    self.in_arcs = {}
    self.out_arcs = {}
    self.in_arcs_type = {x: {} for x in node_types}
    self.out_arcs_type = {x: {} for x in node_types}

    # Set parameters
    self.name = name
    self.t = None
    self.data_input_dict = data_input_dict

    # Initiailise default handlers
    self.pull_set_handler = {"default": self.pull_distributed}
    self.push_set_handler = {
        "default": lambda x: self.push_distributed(
            x, of_type=["Node", "River", "Waste", "Reservoir"]
        )
    }
    self.pull_check_handler = {"default": self.pull_check_basic}
    self.push_check_handler = {
        "default": lambda x: self.push_check_basic(
            x, of_type=["Node", "River", "Waste", "Reservoir"]
        )
    }
    super().__init__()

    # Mass balance checking
    self.mass_balance_in = [self.total_in]
    self.mass_balance_out = [self.total_out]
    self.mass_balance_ds = [lambda: self.empty_vqip()]

__init_subclass__(**kwargs)

Adds all subclasses to the nodes registry.

Source code in wsimod/nodes/nodes.py
18
19
20
21
22
23
24
def __init_subclass__(cls, **kwargs):
    """Adds all subclasses to the nodes registry."""
    super().__init_subclass__(**kwargs)
    if cls.__name__ in NODES_REGISTRY:
        logging.warning(f"Overwriting {cls.__name__} in NODES_REGISTRY with {cls}")

    NODES_REGISTRY[cls.__name__] = cls

apply_overrides(overrides={})

Apply overrides to the node.

The Node does not have any overwriteable parameters. So if any overrides are passed up to the node, this means that there are unused parameters from the Node subclass, which is flagged.

Parameters:

Name Type Description Default
overrides dict

Dictionary of overrides. Defaults to {}.

{}
Source code in wsimod/nodes/nodes.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def apply_overrides(self, overrides: Dict[str, Any] = {}) -> None:
    """Apply overrides to the node.

    The Node does not have any overwriteable parameters. So if any
    overrides are passed up to the node, this means that there are unused
    parameters from the Node subclass, which is flagged.

    Args:
        overrides (dict, optional): Dictionary of overrides. Defaults to {}.
    """
    # overrides data_input_dict
    from wsimod.orchestration.model import read_csv

    content = overrides.pop("filename", None)
    if isinstance(content, str):
        self.data_input_dict = read_csv(content)
    elif not content:
        pass
    else:
        raise RuntimeError("Not recognised format for data_input_dict")

    if len(overrides) > 0:
        print(f"No override behaviour defined for: {overrides.keys()}")

check_basic(direction, vqip=None, of_type=None, tag='default')

Generic function that conveys a pull or push check onwards to connected nodes. It is the default behaviour that treats a node like a junction.

Parameters:

Name Type Description Default
direction str

can be either 'pull' or 'push' to send checks to receiving or contributing nodes

required
vqip dict

The VQIP to check. Defaults to None (if pulling this will return available water to pull, if pushing then available capacity to push).

None
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Name Type Description
avail dict

VQIP responses summed over all requests

Source code in wsimod/nodes/nodes.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def check_basic(self, direction, vqip=None, of_type=None, tag="default"):
    """Generic function that conveys a pull or push check onwards to connected
    nodes. It is the default behaviour that treats a node like a junction.

    Args:
        direction (str): can be either 'pull' or 'push' to send checks to
            receiving or contributing nodes
        vqip (dict, optional): The VQIP to check. Defaults to None (if pulling
            this will return available water to pull, if pushing then available
            capacity to push).
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        avail (dict): VQIP responses summed over all requests
    """
    f, arcs = self.get_direction_arcs(direction, of_type)

    # Iterate over arcs, updating total
    avail = self.empty_vqip()
    for arc in arcs:
        avail = self.sum_vqip(avail, getattr(arc, f)(tag=tag))

    if vqip is not None:
        avail = self.v_change_vqip(avail, min(avail["volume"], vqip["volume"]))

    return avail

end_timestep()

Empty function intended to be called at the end of every timestep.

Subclasses will overwrite this functions.

Source code in wsimod/nodes/nodes.py
637
638
639
640
641
642
def end_timestep(self):
    """Empty function intended to be called at the end of every timestep.

    Subclasses will overwrite this functions.
    """
    pass

get_connected(direction='pull', of_type=None, tag='default')

Send push/pull checks to all attached arcs in a given direction.

Parameters:

Name Type Description Default
direction str

The type of check to send to all attached nodes. Can be 'push' or 'pull'. The default is 'pull'.

'pull'
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description

connected (dict) : Dictionary containing keys: 'avail': (float) - total available volume for push/pull 'priority': (float) - total (availability * preference) of attached arcs 'allocation': (dict) - contains all attached arcs in specified direction and respective (availability * preference)

Examples:

>>> vqip_available_to_pull = my_node.get_direction_arcs()
>>> vqip_available_to_push = my_node.get_direction_arcs('push')
>>> avail_reservoir_vqip = my_node.get_direction_arcs('pull',
                                              of_type = 'Reservoir')
>>> avail_sewer_push_to_sewers = my_node.get_direction_arcs('push',
                                                    of_type = 'Sewer',
                                                    tag = 'Sewer')
Source code in wsimod/nodes/nodes.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def get_connected(self, direction="pull", of_type=None, tag="default"):
    """Send push/pull checks to all attached arcs in a given direction.

    Args:
        direction (str, optional): The type of check to send to all attached
            nodes. Can be 'push' or 'pull'. The default is 'pull'.
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        connected (dict) :
            Dictionary containing keys:
            'avail': (float) - total available volume for push/pull
            'priority': (float) - total (availability * preference)
                                of attached arcs
            'allocation': (dict) - contains all attached arcs in specified
                            direction and respective (availability * preference)

    Examples:
        >>> vqip_available_to_pull = my_node.get_direction_arcs()
        >>> vqip_available_to_push = my_node.get_direction_arcs('push')
        >>> avail_reservoir_vqip = my_node.get_direction_arcs('pull',
                                                      of_type = 'Reservoir')
        >>> avail_sewer_push_to_sewers = my_node.get_direction_arcs('push',
                                                            of_type = 'Sewer',
                                                            tag = 'Sewer')
    """
    # Initialise connected dict
    connected = {"avail": 0, "priority": 0, "allocation": {}, "capacity": {}}

    # Get arcs
    f, arcs = self.get_direction_arcs(direction, of_type)

    # Iterate over arcs, updating connected dict
    for arc in arcs:
        avail = getattr(arc, f)(tag=tag)["volume"]
        if avail < constants.FLOAT_ACCURACY:
            avail = 0  # Improves convergence
        connected["avail"] += avail
        preference = arc.preference
        connected["priority"] += avail * preference
        connected["allocation"][arc.name] = avail * preference
        connected["capacity"][arc.name] = avail

    return connected

get_data_input(var)

Read data from data_input_dict. Keys are tuples with the first entry as the variable to read and second entry the time.

Parameters:

Name Type Description Default
var str

Name of variable

required

Returns:

Type Description

Data read

Source code in wsimod/nodes/nodes.py
625
626
627
628
629
630
631
632
633
634
635
def get_data_input(self, var):
    """Read data from data_input_dict. Keys are tuples with the first entry as the
    variable to read and second entry the time.

    Args:
        var (str): Name of variable

    Returns:
        Data read
    """
    return self.data_input_dict[(var, self.t)]

get_direction_arcs(direction, of_type=None)

Identify arcs to/from all attached nodes in a given direction.

Parameters:

Name Type Description Default
direction str

can be either 'pull' or 'push' to send checks to receiving or contributing nodes

required
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None

Returns:

Name Type Description
f str

Either 'send_pull_check' or 'send_push_check' depending on direction

arcs list

List of arc objects

Examples:

>>> arcs_to_push_to = my_node.get_direction_arcs('push')
>>> arcs_to_pull_from = my_node.get_direction_arcs('pull')
>>> arcs_from_reservoirs = my_node.get_direction_arcs('pull', of_type =
    'Reservoir')
Source code in wsimod/nodes/nodes.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_direction_arcs(self, direction, of_type=None):
    """Identify arcs to/from all attached nodes in a given direction.

    Args:
        direction (str): can be either 'pull' or 'push' to send checks to
            receiving or contributing nodes
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)

    Returns:
        f (str): Either 'send_pull_check' or 'send_push_check' depending on
            direction
        arcs (list): List of arc objects

    Raises:
        Message if no direction is specified

    Examples:
        >>> arcs_to_push_to = my_node.get_direction_arcs('push')
        >>> arcs_to_pull_from = my_node.get_direction_arcs('pull')
        >>> arcs_from_reservoirs = my_node.get_direction_arcs('pull', of_type =
            'Reservoir')
    """
    if of_type is None:
        # Return all arcs
        if direction == "pull":
            arcs = list(self.in_arcs.values())
            f = "send_pull_check"
        elif direction == "push":
            arcs = list(self.out_arcs.values())
            f = "send_push_check"
        else:
            print("No direction")

    else:
        if isinstance(of_type, str):
            of_type = [of_type]

        # Assign arcs/function based on parameters
        arcs = []
        if direction == "pull":
            for type_ in of_type:
                arcs += list(self.in_arcs_type[type_].values())
            f = "send_pull_check"
        elif direction == "push":
            for type_ in of_type:
                arcs += list(self.out_arcs_type[type_].values())
            f = "send_push_check"
        else:
            print("No direction")

    return f, arcs

node_mass_balance()

Wrapper for core.py/WSIObj/mass_balance. Tracks change in mass balance.

Returns:

Name Type Description
in_ dict

A VQIP of the total from mass_balance_in functions

ds_ dict

A VQIP of the total from mass_balance_ds functions

out_ dict

A VQIP of the total from mass_balance_out functions

Examples:

>>> node_in, node_out, node_ds = my_node.node_mass_balance()
Source code in wsimod/nodes/nodes.py
134
135
136
137
138
139
140
141
142
143
144
145
146
def node_mass_balance(self):
    """Wrapper for core.py/WSIObj/mass_balance. Tracks change in mass balance.

    Returns:
        in_ (dict): A VQIP of the total from mass_balance_in functions
        ds_ (dict): A VQIP of the total from mass_balance_ds functions
        out_ (dict): A VQIP of the total from mass_balance_out functions

    Examples:
        >>> node_in, node_out, node_ds = my_node.node_mass_balance()
    """
    in_, ds_, out_ = self.mass_balance()
    return in_, ds_, out_

pull_check(vqip=None, tag='default')

Receives pull check requests from arcs and passes request to query handler.

Parameters:

Name Type Description Default
vqip dict

the VQIP pull check (by default, only the 'volume' key is used). Defaults to None, which returns all available water to pull.

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description
dict

VQIP available from query_handler

Examples:

>>> water_available = my_node.pull_check({'volume' : 10})
>>> total_water_available = my_node.pull_check()
Source code in wsimod/nodes/nodes.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def pull_check(self, vqip=None, tag="default"):
    """Receives pull check requests from arcs and passes request to query handler.

    Args:
        vqip (dict, optional): the VQIP pull check (by default, only the
            'volume' key is used). Defaults to None, which returns all available
            water to pull.
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        (dict): VQIP available from query_handler

    Examples:
        >>> water_available = my_node.pull_check({'volume' : 10})
        >>> total_water_available = my_node.pull_check()
    """
    return self.query_handler(self.pull_check_handler, vqip, tag)

pull_check_basic(vqip=None, of_type=None, tag='default')

Default node check behaviour that treats a node like a junction. Water available to pull is just the water available to pull from upstream connected nodes.

Parameters:

Name Type Description Default
vqip dict

VQIP from handler of amount to pull check (by default, only the 'volume' key is used). Defaults to None (which returns all availalbe water to pull).

None
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description
dict

VQIP check response of upstream nodes

Source code in wsimod/nodes/nodes.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def pull_check_basic(self, vqip=None, of_type=None, tag="default"):
    """Default node check behaviour that treats a node like a junction. Water
    available to pull is just the water available to pull from upstream connected
    nodes.

    Args:
        vqip (dict, optional): VQIP from handler of amount to pull check
            (by default, only the 'volume' key is used). Defaults to None (which
            returns all availalbe water to pull).
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        (dict): VQIP check response of upstream nodes
    """
    return self.check_basic("pull", vqip, of_type, tag)

pull_check_deny(vqip=None)

Responds that no water is available to pull from a check.

Parameters:

Name Type Description Default
vqip dict

A VQIP amount of water requested (ignored)

None

Returns:

Type Description
dict

An empty VQIP indicated no water was pulled

Source code in wsimod/nodes/nodes.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
def pull_check_deny(self, vqip=None):
    """Responds that no water is available to pull from a check.

    Args:
        vqip (dict): A VQIP amount of water requested (ignored)

    Returns:
        (dict): An empty VQIP indicated no water was pulled

    Raises:
        Message when called, since it would usually occur if a model is
        improperly connected
    """
    print("Attempted pull check from deny")
    return self.empty_vqip()

pull_distributed(vqip, of_type=None, tag='default')

Send pull requests to all (or specified by type) nodes connecting to self. Iterate until request is met or maximum iterations are hit. Streamlines if only one in_arc exists.

Parameters:

Name Type Description Default
vqip dict

Total amount to pull (by default, only the 'volume' key is used)

required
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Name Type Description
pulled dict

VQIP of combined pulled water

Source code in wsimod/nodes/nodes.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def pull_distributed(self, vqip, of_type=None, tag="default"):
    """Send pull requests to all (or specified by type) nodes connecting to self.
    Iterate until request is met or maximum iterations are hit. Streamlines if only
    one in_arc exists.

    Args:
        vqip (dict): Total amount to pull (by default, only the
            'volume' key is used)
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        pulled (dict): VQIP of combined pulled water
    """
    if len(self.in_arcs) == 1:
        # If only one in_arc, just pull from that
        if of_type is None:
            pulled = next(iter(self.in_arcs.values())).send_pull_request(
                vqip, tag=tag
            )
        elif any(
            [x in of_type for x, y in self.in_arcs_type.items() if len(y) > 0]
        ):
            pulled = next(iter(self.in_arcs.values())).send_pull_request(
                vqip, tag=tag
            )
        else:
            # No viable out arcs
            pulled = self.empty_vqip()
    else:
        # Pull in proportion from connected by priority

        # Initialise pulled, deficit, connected, iter_
        pulled = self.empty_vqip()
        deficit = vqip["volume"]
        connected = self.get_connected(direction="pull", of_type=of_type, tag=tag)
        iter_ = 0

        # Iterate over sending nodes until deficit met
        while (
            (deficit > constants.FLOAT_ACCURACY)
            & (connected["avail"] > constants.FLOAT_ACCURACY)
        ) & (iter_ < constants.MAXITER):
            # Pull from connected
            for key, allocation in connected["allocation"].items():
                received = self.in_arcs[key].send_pull_request(
                    {"volume": deficit * allocation / connected["priority"]},
                    tag=tag,
                )
                pulled = self.sum_vqip(pulled, received)

            # Update deficit, connected and iter_
            deficit = vqip["volume"] - pulled["volume"]
            connected = self.get_connected(
                direction="pull", of_type=of_type, tag=tag
            )
            iter_ += 1

        if iter_ == constants.MAXITER:
            print("Maxiter reached in {0} at {1}".format(self.name, self.t))
    return pulled

pull_set(vqip, tag='default')

Receives pull set requests from arcs and passes request to query handler.

Parameters:

Name Type Description Default
vqip dict

the VQIP pull request (by default, only the 'volume' key is needed).

required
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description
dict

VQIP received from query_handler

Examples:

>>> water_received = my_node.pull_set({'volume' : 10})
Source code in wsimod/nodes/nodes.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def pull_set(self, vqip, tag="default"):
    """Receives pull set requests from arcs and passes request to query handler.

    Args:
        vqip (dict): the VQIP pull request (by default, only the 'volume' key is
            needed).
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        (dict): VQIP received from query_handler

    Examples:
        >>> water_received = my_node.pull_set({'volume' : 10})
    """
    return self.query_handler(self.pull_set_handler, vqip, tag)

pull_set_deny(vqip)

Responds that no water is available to pull from a request.

Parameters:

Name Type Description Default
vqip dict

A VQIP amount of water requested (ignored)

required

Returns:

Type Description
dict

An empty VQIP indicated no water was pulled

Source code in wsimod/nodes/nodes.py
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def pull_set_deny(self, vqip):
    """Responds that no water is available to pull from a request.

    Args:
        vqip (dict): A VQIP amount of water requested (ignored)

    Returns:
        (dict): An empty VQIP indicated no water was pulled

    Raises:
        Message when called, since it would usually occur if a model is
        improperly connected
    """
    print("Attempted pull set from deny")
    return self.empty_vqip()

push_check(vqip=None, tag='default')

Receives push check requests from arcs and passes request to query handler.

Parameters:

Name Type Description Default
vqip dict

the VQIP push check. Defaults to None, which returns all available capacity to push

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'

'default'

Returns:

Type Description
dict

VQIP available to push from query_handler

Examples:

>>> total_available_push_capacity = my_node.push_check()
>>> available_push_capacity = my_node.push_check(wastewater_vqip)
Source code in wsimod/nodes/nodes.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def push_check(self, vqip=None, tag="default"):
    """Receives push check requests from arcs and passes request to query handler.

    Args:
        vqip (dict, optional): the VQIP push check. Defaults to None, which
            returns all available capacity to push
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'

    Returns:
        (dict): VQIP available to push from query_handler

    Examples:
        >>> total_available_push_capacity = my_node.push_check()
        >>> available_push_capacity = my_node.push_check(wastewater_vqip)
    """
    return self.query_handler(self.push_check_handler, vqip, tag)

push_check_accept(vqip=None)

Push check function that accepts all water.

Parameters:

Name Type Description Default
vqip dict

A VQIP that has been pushed (ignored)

None

Returns:

Type Description
dict

VQIP or an unbounded capacity, indicating all water can be received

Source code in wsimod/nodes/nodes.py
611
612
613
614
615
616
617
618
619
620
621
622
623
def push_check_accept(self, vqip=None):
    """Push check function that accepts all water.

    Args:
        vqip (dict, optional): A VQIP that has been pushed (ignored)

    Returns:
        (dict): VQIP or an unbounded capacity, indicating all water can be received
    """
    if not vqip:
        vqip = self.empty_vqip()
        vqip["volume"] = constants.UNBOUNDED_CAPACITY
    return vqip

push_check_basic(vqip=None, of_type=None, tag='default')

Default node check behaviour that treats a node like a junction. Water available to push is just the water available to push to downstream connected nodes.

Parameters:

Name Type Description Default
vqip dict

VQIP from handler of amount to push check. Defaults to None (which returns all available capacity to push).

None
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description
dict

VQIP check response of downstream nodes

Source code in wsimod/nodes/nodes.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
def push_check_basic(self, vqip=None, of_type=None, tag="default"):
    """Default node check behaviour that treats a node like a junction. Water
    available to push is just the water available to push to downstream connected
    nodes.

    Args:
        vqip (dict, optional): VQIP from handler of amount to push check.
            Defaults to None (which returns all available capacity to push).
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        (dict): VQIP check response of downstream nodes
    """
    return self.check_basic("push", vqip, of_type, tag)

push_check_deny(vqip=None)

Responds that no water is available to push in a check.

Parameters:

Name Type Description Default
vqip dict

A VQIP amount of water to push check (ignored)

None

Returns:

Type Description
dict

An empty VQIP indicated no capacity for pushes exists

Source code in wsimod/nodes/nodes.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
def push_check_deny(self, vqip=None):
    """Responds that no water is available to push in a check.

    Args:
        vqip (dict): A VQIP amount of water to push check (ignored)

    Returns:
        (dict): An empty VQIP indicated no capacity for pushes exists

    Raises:
        Message when called, since it would usually occur if a model is
        improperly connected
    """
    print("Attempted push check to deny")
    return self.empty_vqip()

push_distributed(vqip, of_type=None, tag='default')

Send push requests to all (or specified by type) nodes connecting to self. Iterate until request is met or maximum iterations are hit. Streamlines if only one in_arc exists.

Parameters:

Name Type Description Default
vqip dict

Total amount to push

required
of_type str or list)

optional, can be specified to send checks only to nodes of a given type (must be a subclass in nodes.py)

None
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Name Type Description
not_pushed_ dict

VQIP of water that cannot be pushed

Source code in wsimod/nodes/nodes.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def push_distributed(self, vqip, of_type=None, tag="default"):
    """Send push requests to all (or specified by type) nodes connecting to self.
    Iterate until request is met or maximum iterations are hit. Streamlines if only
    one in_arc exists.

    Args:
        vqip (dict): Total amount to push
        of_type (str or list) : optional, can be specified to send checks only
            to nodes of a given type (must be a subclass in nodes.py)
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        not_pushed_ (dict): VQIP of water that cannot be pushed
    """
    if len(self.out_arcs) == 1:
        # If only one out_arc, just send the water down that
        if of_type is None:
            not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
                vqip, tag=tag
            )
        elif any(
            [x in of_type for x, y in self.out_arcs_type.items() if len(y) > 0]
        ):
            not_pushed_ = next(iter(self.out_arcs.values())).send_push_request(
                vqip, tag=tag
            )
        else:
            # No viable out arcs
            not_pushed_ = vqip
    else:
        # Push in proportion to connected by priority
        # Initialise pushed, deficit, connected, iter_
        not_pushed = vqip["volume"]
        not_pushed_ = self.copy_vqip(vqip)
        connected = self.get_connected(direction="push", of_type=of_type, tag=tag)
        iter_ = 0
        if not_pushed > connected["avail"]:
            # If more water than can be pushed, ignore preference and allocate all
            #   available based on capacity
            connected["priority"] = connected["avail"]
            connected["allocation"] = connected["capacity"]

        # Iterate over receiving nodes until sent
        while (
            (not_pushed > constants.FLOAT_ACCURACY)
            & (connected["avail"] > constants.FLOAT_ACCURACY)
            & (iter_ < constants.MAXITER)
        ):
            # Push to connected
            amount_to_push = min(connected["avail"], not_pushed)

            for key, allocation in connected["allocation"].items():
                to_send = amount_to_push * allocation / connected["priority"]
                to_send = self.v_change_vqip(not_pushed_, to_send)
                reply = self.out_arcs[key].send_push_request(to_send, tag=tag)

                sent = self.extract_vqip(to_send, reply)
                not_pushed_ = self.extract_vqip(not_pushed_, sent)

            not_pushed = not_pushed_["volume"]
            connected = self.get_connected(
                direction="push", of_type=of_type, tag=tag
            )
            iter_ += 1

        if iter_ == constants.MAXITER:
            print("Maxiter reached in {0} at {1}".format(self.name, self.t))

    return not_pushed_

push_set(vqip, tag='default')

Receives push set requests from arcs and passes request to query handler.

Parameters:

Name Type Description Default
vqip _type_

the VQIP push request

required
tag str

optional message to direct query_handler which pull function to call. Defaults to 'default'.

'default'

Returns:

Type Description
dict

VQIP not received from query_handler

Examples:

water_not_pushed = my_node.push_set(wastewater_vqip)

Source code in wsimod/nodes/nodes.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def push_set(self, vqip, tag="default"):
    """Receives push set requests from arcs and passes request to query handler.

    Args:
        vqip (_type_): the VQIP push request
        tag (str, optional): optional message to direct query_handler which pull
            function to call. Defaults to 'default'.

    Returns:
        (dict): VQIP not received from query_handler

    Examples:
        water_not_pushed = my_node.push_set(wastewater_vqip)
    """
    return self.query_handler(self.push_set_handler, vqip, tag)

push_set_deny(vqip)

Responds that no water is available to push in a request.

Parameters:

Name Type Description Default
vqip dict

A VQIP amount of water to push

required

Returns:

Name Type Description
vqip dict

Returns the request indicating no water was pushed

Source code in wsimod/nodes/nodes.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
def push_set_deny(self, vqip):
    """Responds that no water is available to push in a request.

    Args:
        vqip (dict): A VQIP amount of water to push

    Returns:
        vqip (dict): Returns the request indicating no water was pushed

    Raises:
        Message when called, since it would usually occur if a model is
        improperly connected
    """
    print("Attempted push set to deny")
    return vqip

query_handler(handler, ip, tag)

Sends all push/pull requests/checks using the handler (i.e., ensures the correct function is used that lines up with 'tag').

Parameters:

Name Type Description Default
handler dict

contains all push/pull requests for various tags

required
ip vqip

the vqip request

required
tag str

describes what type of push/pull request should be called

required

Returns:

Type Description
dict

the VQIP reply from push/pull request

Source code in wsimod/nodes/nodes.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def query_handler(self, handler, ip, tag):
    """Sends all push/pull requests/checks using the handler (i.e., ensures the
    correct function is used that lines up with 'tag').

    Args:
        handler (dict): contains all push/pull requests for various tags
        ip (vqip): the vqip request
        tag (str): describes what type of push/pull request should be called

    Returns:
        (dict): the VQIP reply from push/pull request

    Raises:
        Message if no functions are defined for tag and if request/check
        function fails
    """
    try:
        return handler[tag](ip)
    except Exception:
        if tag not in handler.keys():
            print("No functions defined for " + tag)
            return handler[tag](ip)
        else:
            print("Some other error")
            return handler[tag](ip)

reinit()

Empty function to be written if reinitialisation capability is added.

Source code in wsimod/nodes/nodes.py
644
645
646
def reinit(self):
    """Empty function to be written if reinitialisation capability is added."""
    pass

total_in()

Sum flow and pollutant amounts entering a node via in_arcs.

Returns:

Name Type Description
in_ dict

Summed VQIP of in_arcs

Examples:

>>> node_inflow = my_node.total_in()
Source code in wsimod/nodes/nodes.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def total_in(self):
    """Sum flow and pollutant amounts entering a node via in_arcs.

    Returns:
        in_ (dict): Summed VQIP of in_arcs

    Examples:
        >>> node_inflow = my_node.total_in()
    """
    in_ = self.empty_vqip()
    for arc in self.in_arcs.values():
        in_ = self.sum_vqip(in_, arc.vqip_out)

    return in_

total_out()

Sum flow and pollutant amounts leaving a node via out_arcs.

Returns:

Name Type Description
out_ dict

Summed VQIP of out_arcs

Examples:

>>> node_outflow = my_node.total_out()
Source code in wsimod/nodes/nodes.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def total_out(self):
    """Sum flow and pollutant amounts leaving a node via out_arcs.

    Returns:
        out_ (dict): Summed VQIP of out_arcs

    Examples:
        >>> node_outflow = my_node.total_out()
    """
    out_ = self.empty_vqip()
    for arc in self.out_arcs.values():
        out_ = self.sum_vqip(out_, arc.vqip_in)

    return out_