Coverage for wsimod\nodes\ 22%
371 statements
« prev ^ index » next v7.6.1, created at 2024-10-30 14:52 +0000
« prev ^ index » next v7.6.1, created at 2024-10-30 14:52 +0000
1# -*- coding: utf-8 -*-
2"""Created on Mon Nov 15 14:20:36 2021.
4@author: bdobson Converted to totals on 2022-05-03
6import warnings
7from math import exp
8from typing import Any, Dict
10from wsimod.core import constants
11from wsimod.nodes.nodes import Node
12from wsimod.nodes.tanks import DecayQueueTank, DecayTank, QueueTank, Tank
15class Storage(Node):
16 """"""
18 def __init__(
19 self,
20 name,
21 capacity=0,
22 area=0,
23 datum=0,
24 decays=None,
25 initial_storage=0,
26 **kwargs,
27 ):
28 """A Node wrapper for a Tank or DecayTank.
30 Args:
31 name (str): node name capacity (float, optional): Tank capacity (see
32 Defaults to 0. area (float, optional): Tank area (see
33 Defaults to 0. datum (float, optional): Tank datum (see
34 Defaults to 0. decays (dict, optional): Tank decays if
35 needed, (see Defaults to None. initial_storage (float
36 or dict, optional): Initial storage (see Defaults to 0.
38 Functions intended to call in orchestration:
39 distribute (optional, depends on subclass)
40 """
41 # Set parameters
42 self.initial_storage = initial_storage
43 self.capacity = capacity
44 self.area = area
45 self.datum = datum
46 self.decays = decays
47 super().__init__(name, **kwargs)
49 # Create tank
50 if "initial_storage" not in dir(self):
51 self.initial_storage = self.empty_vqip()
53 if self.decays is None:
54 self.tank = Tank(
55 capacity=self.capacity,
56 area=self.area,
57 datum=self.datum,
58 initial_storage=self.initial_storage,
59 )
60 else:
61 self.tank = DecayTank(
62 capacity=self.capacity,
63 area=self.area,
64 datum=self.datum,
65 initial_storage=self.initial_storage,
66 decays=self.decays,
67 parent=self,
68 )
69 # Update handlers
70 self.push_set_handler["default"] = self.push_set_storage
71 self.push_check_handler["default"] = self.tank.get_excess
72 self.pull_set_handler["default"] = lambda vol: self.tank.pull_storage(vol)
73 self.pull_check_handler["default"] = self.tank.get_avail
75 # Mass balance
76 self.mass_balance_ds.append(lambda: self.tank.ds())
78 def apply_overrides(self, overrides=Dict[str, Any]):
79 """Override parameters.
81 Enables a user to override any of the following parameters:
82 capacity, area, datum, decays.
84 Args:
85 overrides (Dict[str, Any]): Dict describing which parameters should
86 be overridden (keys) and new values (values). Defaults to {}.
87 """
88 # not using pop as these items need to stay
89 # in the overrides to be fed into the tank overrides
90 if "capacity" in overrides.keys():
91 self.capacity = overrides["capacity"]
92 if "area" in overrides.keys():
93 self.area = overrides["area"]
94 if "datum" in overrides.keys():
95 self.datum = overrides["datum"]
96 if "decays" in overrides.keys():
97 if self.decays is None:
98 raise ValueError("Attempting to override decays on a node initialised without decays")
99 self.decays.update(overrides["decays"])
100 # apply tank overrides
101 self.tank.apply_overrides(overrides)
102 super().apply_overrides(overrides)
104 def push_set_storage(self, vqip):
105 """A node wrapper for the tank push_storage.
107 Args:
108 vqip (dict): A VQIP amount to push to the tank
110 Returns:
111 reply (dict): A VQIP amount that was not successfully pushed
112 """
113 # Update tank
114 reply = self.tank.push_storage(vqip)
116 return reply
118 def distribute(self):
119 """Optional function that discharges all tank storage with push_distributed."""
120 # Distribute any active storage
121 storage = self.tank.pull_storage(self.tank.get_avail())
122 retained = self.push_distributed(storage)
123 _ = self.tank.push_storage(retained, force=True)
124 if retained["volume"] > constants.FLOAT_ACCURACY:
125 print("Storage unable to push")
127 def get_percent(self):
128 """Function that returns the volume in the storage tank expressed as a percent
129 of capacity."""
130 return["volume"] / self.tank.capacity
132 def end_timestep(self):
133 """Update tank states."""
134 self.tank.end_timestep()
136 def reinit(self):
137 """Call tank reinit."""
138 # TODO Automate this better
139 self.tank.reinit()
140["volume"] = self.initial_storage
141 self.tank.storage_["volume"] = self.initial_storage
144class Groundwater(Storage):
145 """"""
147 def __init__(
148 self,
149 residence_time=200,
150 infiltration_threshold=1,
151 infiltration_pct=0,
152 data_input_dict={},
153 **kwargs,
154 ):
155 # TODO why isn't this using a ResidenceTank?
156 """A storage with a residence time for groundwater. Can also infiltrate to
157 sewers.
159 Args:
160 residence_time (float, optional): Residence time (see
161 ResidenceTank). Defaults to 200.
162 infiltration_threshold (float, optional): Proportion of storage capacity
163 that must be exceeded to generate infiltration. Defaults to 1.
164 infiltration_pct (float, optional): Proportion of storage above the
165 threshold that is square rooted and infiltrated. Defaults to 0.
166 data_input_dict (dict, optional): Dictionary of data inputs relevant for
167 the node (though I don't think it is used). Defaults to {}.
169 Functions intended to call in orchestration:
170 infiltrate (before sewers are discharged)
172 distribute
174 Key assumptions:
175 - Conceptualises groundwater as a tank.
176 - Baseflow is generated following a residence-time method.
177 - Baseflow is sent to ``, `` or
178 `` nodes.
179 - Infiltration to `` nodes occurs when the storage
180 in the tank is greater than a specified threshold, at a rate
181 proportional to the sqrt of volume above the threshold. (Note, this
182 behaviour is __not validated__ and a high uncertainty process in
183 general)
184 - If `decays` are provided to model water quality transformations,
185 see ``.
187 Input data and parameter requirements:
188 - Groundwater tank `capacity`, `area`, and `datum`.
189 _Units_: cubic metres, squared metres, metres
190 - Infiltration behaviour determined by an `infiltration_threshold`
191 and `infiltration_pct`. _Units_: proportion of capacity
192 - Optional dictionary of decays with pollutants as keys and decay
193 parameters (a constant and a temperature sensitivity exponent) as
194 values. _Units_: -
195 """
196 self.residence_time = residence_time
197 self.infiltration_threshold = infiltration_threshold
198 self.infiltration_pct = infiltration_pct
199 # TODO not used data_input
200 self.data_input_dict = data_input_dict
201 super().__init__(**kwargs)
203 def apply_overrides(self, overrides=Dict[str, Any]):
204 """Override parameters.
206 Enables a user to override any of the following parameters:
207 residence_time, infiltration_threshold, infiltration_pct.
209 Args:
210 overrides (Dict[str, Any]): Dict describing which parameters should
211 be overridden (keys) and new values (values). Defaults to {}.
212 """
213 self.residence_time = overrides.pop("residence_time", self.residence_time)
214 self.infiltration_threshold = overrides.pop(
215 "infiltration_threshold", self.infiltration_threshold
216 )
217 self.infiltration_pct = overrides.pop("infiltration_pct", self.infiltration_pct)
218 super().apply_overrides(overrides)
220 def distribute(self):
221 """Calculate outflow with residence time and send to Nodes or Rivers."""
222 avail = self.tank.get_avail()["volume"] / self.residence_time
223 to_send = self.tank.pull_storage({"volume": avail})
224 retained = self.push_distributed(to_send, of_type=["Node", "River", "Waste"])
225 _ = self.tank.push_storage(retained, force=True)
226 if retained["volume"] > constants.FLOAT_ACCURACY:
227 print("Storage unable to push")
229 def infiltrate(self):
230 """Calculate amount of water available for infiltration and send to sewers."""
231 # Calculate infiltration
232 avail = self.tank.get_avail()["volume"]
233 avail = max(avail - self.tank.capacity * self.infiltration_threshold, 0)
234 avail = (avail * self.infiltration_pct) ** 0.5
236 # Push to sewers
237 to_send = self.tank.pull_storage({"volume": avail})
238 retained = self.push_distributed(to_send, of_type="Sewer")
239 _ = self.tank.push_storage(retained, force=True)
240 # Any not sent is left in tank
241 if retained["volume"] > constants.FLOAT_ACCURACY:
242 # print('unable to infiltrate')
243 pass
246class QueueGroundwater(Storage):
247 """"""
249 # TODO - no infiltration as yet
250 def __init__(self, timearea={0: 1}, data_input_dict={}, **kwargs):
251 """Alternate formulation of Groundwater that uses a timearea property to enable
252 more nonlinear time behaviour of baseflow routing. Uses the QueueTank or
253 DecayQueueTank (see subclassses).
255 NOTE: abstraction behaviour from this kind of node need careful checking
257 Args:
258 timearea (dict, optional): Time area diagram that enables flows to
259 take a range of different durations to 'traverse' the tank. The keys of
260 the dict are the number of timesteps while the values are the proportion
261 of flow. E.g., {0 : 0.7, 1 : 0.3} means 70% of flow takes 0 timesteps
262 and 30% takes 1 timesteps. Defaults to {0 : 1}.
263 data_input_dict (dict, optional): Dictionary of data inputs relevant for
264 the node (though I don't think it is used). Defaults to {}.
266 Functions intended to call in orchestration:
267 distribute
269 Key assumptions:
270 - Conceptualises groundwater as a tank.
271 - Baseflow is generated following a timearea method.
272 - Baseflow is sent to ``, `` or
273 `` nodes.
274 - No infiltration to sewers is modelled.
275 - If `decays` are provided to model water quality transformations,
276 see ``.
278 Input data and parameter requirements:
279 - Groundwater tank `capacity`, `area`, and `datum`.
280 _Units_: cubic metres, squared metres, metres
281 - `timearea` is a dictionary containing the timearea diagram.
282 _Units_: duration of flow (in timesteps) and proportion of flow
283 - Optional dictionary of decays with pollutants as keys and decay
284 parameters (a constant and a temperature sensitivity exponent) as
285 values. _Units_: -
286 """
287 self.timearea = timearea
288 # TODO not used
289 self.data_input_dict = data_input_dict
290 super().__init__(**kwargs)
291 # Label as Groundwater class so that other nodes treat it the same
292 self.__class__.__name__ = "Groundwater"
293 # Update handlers
294 self.push_set_handler["default"] = self.push_set_timearea
295 self.pull_set_handler["default"] = self.pull_set_active
296 self.pull_check_handler["default"] = self.pull_check_active
297 # Enable decay
298 if self.decays is None:
299 self.tank = QueueTank(
300 capacity=self.capacity,
301 area=self.area,
302 datum=self.datum,
303 initial_storage=self.initial_storage,
304 )
305 else:
306 self.tank = DecayQueueTank(
307 capacity=self.capacity,
308 area=self.area,
309 datum=self.datum,
310 decays=self.decays,
311 parent=self,
312 initial_storage=self.initial_storage,
313 )
315 def apply_overrides(self, overrides=Dict[str, Any]):
316 """Override parameters.
318 Enables a user to override any of the following parameters:
319 timearea.
321 Args:
322 overrides (Dict[str, Any]): Dict describing which parameters should
323 be overridden (keys) and new values (values). Defaults to {}.
324 """
325 self.timearea = overrides.pop("timearea", self.timearea)
326 super().apply_overrides(overrides)
328 def push_set_timearea(self, vqip):
329 """Push setting that enables timearea behaviour, (see __init__ for
330 description).Used to receive flow that is assumed to occur widely across some
331 kind of catchment.
333 Args:
334 vqip (dict): A VQIP that has been pushed
336 Returns:
337 reply (dict): A VQIP amount that was not successfuly receivesd
338 """
339 reply = self.empty_vqip()
340 # Iterate over timearea diagram TODO timearea diagram behaviour be generalised
341 # across nodes
342 for time, normalised in self.timearea.items():
343 vqip_ = self.v_change_vqip(vqip, vqip["volume"] * normalised)
344 reply_ = self.tank.push_storage(vqip_, time=time)
345 reply = self.sum_vqip(reply, reply_)
346 return reply
348 def distribute(self):
349 """Update internal arc, push active_storage onwards, update tank."""
350 _ = self.tank.internal_arc.update_queue(direction="push")
352 remaining = self.push_distributed(self.tank.active_storage)
354 if remaining["volume"] > constants.FLOAT_ACCURACY:
355 print("Groundwater couldnt push all")
357 # Update tank
358 sent = self.tank.active_storage["volume"] - remaining["volume"]
359 sent = self.v_change_vqip(self.tank.active_storage, sent)
360 reply = self.tank.pull_storage(sent)
361 if (reply["volume"] - sent["volume"]) > constants.FLOAT_ACCURACY:
362 print("Miscalculated tank storage in discharge")
364 def infiltrate(self):
365 """"""
366 pass
368 def pull_check_active(self, vqip=None):
369 """A pull check that returns the active storage.
371 Args:
372 vqip (dict, optional): A VQIP that can be used to limit the volume in
373 the return value (only volume key is used). Defaults to None.
375 Returns:
376 (dict): A VQIP amount that is available to pull
377 """
378 if vqip is None:
379 return self.tank.active_storage
380 else:
381 reply = min(vqip["volume"], self.tank.active_storage["volume"])
382 return self.v_change_vqip(self.tank.active_storage, reply)
384 def pull_set_active(self, vqip):
385 # TODO - this is quite weird behaviour, and inconsistent with pull_check_active
386 """Pull proportionately from both the active storage and the queue. Adjudging
387 groundwater abstractions to not be particularly sensitive to the within
388 catchment travel time.
390 Args:
391 vqip (dict): A VQIP amount to be pulled (only volume key is used)
393 Returns:
394 pulled (dict): A VQIP amount that was successfully pulled
395 """
396 # Calculate actual pull
397 total_storage =["volume"]
398 total_pull = min(["volume"], vqip["volume"])
400 if total_pull < constants.FLOAT_ACCURACY:
401 return self.empty_vqip()
402 else:
403 # Track total pull in pulled
404 pulled = self.empty_vqip()
405 # Iterate over queue
406 if isinstance(self.tank.internal_arc.queue, dict):
407 for t, v in self.tank.internal_arc.queue.items():
408 # Pull proportionately
409 t_pulled = self.v_change_vqip(
410 self.tank.internal_arc.queue[t],
411 v["volume"] * total_pull / total_storage,
412 )
413 # Reduce queue VQIPs
414 self.tank.internal_arc.queue[t] = self.extract_vqip(
415 self.tank.internal_arc.queue[t], t_pulled
416 )
417 # Track pull
418 pulled = self.sum_vqip(pulled, t_pulled)
419 # Pull also from active storage
420 a_pulled = self.v_change_vqip(
421 self.tank.active_storage,
422 self.tank.active_storage["volume"] * total_pull / total_storage,
423 )
424 self.tank.active_storage = self.extract_vqip(
425 self.tank.active_storage, a_pulled
426 )
427 pulled = self.sum_vqip(pulled, a_pulled)
429 # Recalculate storage
430 = self.extract_vqip(, pulled)
431 return pulled
432 # elif isinstance(self.tank.internal_arc.queue, list): for req in
433 # self.tank.internal_arc.queue: t_pulled = req['vqtip']['volume'] *
434 # total_pull / total_storage req['vqtip'] =
435 # self.v_change_vqip(req['vqtip'], req['vqtip']['volume'] -
436 # t_pulled) pulled += t_pulled a_pulled =
437 # self.tank.active_storage['volume'] * total_pull / total_storage
438 # self.tank.active_storage =
439 # self.v_change_vqip(self.tank.active_storage,
440 # self.tank.active_storage['volume'] - a_pulled) pulled += a_pulled
442 # #Recalculate storage - doing this differently causes numerical errors
443 # new_v = sum([x['vqtip']['volume'] for x in
444 # self.tank.internal_arc.queue])+ self.tank.active_storage['volume']
445 # = self.v_change_vqip(, new_v)
447 # return self.v_change_vqip(, pulled)
450class River(Storage):
451 """"""
453 # TODO non-day timestep
454 def __init__(
455 self,
456 depth=2,
457 length=200,
458 width=20,
459 velocity=0.2 * constants.M_S_TO_M_DT,
460 damp=0.1,
461 mrf=0,
462 **kwargs,
463 ):
464 """Node that contains extensive in-river biochemical processes.
466 Args:
467 depth (float, optional): River tank depth. Defaults to 2. length (float,
468 optional): River tank length. Defaults to 200. width (float, optional):
469 River tank width. Defaults to 20. velocity (float, optional): River velocity
470 (if someone wants to calculate
471 this on the fly that would also work). Defaults to
472 0.2*constants.M_S_TO_M_DT.
473 damp (float, optional): Flow delay and attentuation parameter. Defaults
474 to 0.1.
475 mrf (float, optional): Minimum required flow in river (volume per timestep),
476 can limit pulls made to the river. Defaults to 0.
478 Functions intended to call in orchestration:
479 distribute
481 Key assumptions:
482 - River is conceptualised as a water tank that receives flows from various
483 sources (e.g., runoffs from urban and rural land, baseflow from
484 groundwater), interacts with water infrastructure (e.g., abstraction for
485 irrigation and domestic supply, sewage and treated effluent discharge),
486 and discharges flows downstream. It has length and width as shape
487 parameters, average velocity to indicate flow speed and capacity to
488 indicate the maximum storage limit.
489 - Flows from different sources into rivers will fully mix. River tank is
490 assumed to
491 have delay and attenuation effects when generate outflows. These effects
492 are simulated based on the average velocity.
493 - In-river biochemical processes are simulated as sources/sinks of
494 nutrients
495 in the river tank, including - denitrification (for nitrogen) -
496 phytoplankton absorption/release (for nitrogen and phosphorus) -
497 macrophyte uptake (for nitrogen and phosphorus) These processes are
498 affected by river temperature.
500 Input data and parameter requirements:
501 - depth, length, width
502 _Units_: m
503 - velocity
504 _Units_: m/day
505 - damping coefficient
506 _Units_: -
507 - minimum required flow
508 _Units_: m3/day
509 """
510 # Set parameters
511 self.depth = depth
512 if depth != 2:
513 warnings.warn(
514 "warning: the depth parameter is unused by River nodes because it is \
515 intended for capacity to be unbounded. It may be removed in a future version."
516 )
517 self.length = length # [m]
518 self.width = width # [m]
519 self.velocity = velocity # [m/dt]
520 self.damp = damp # [>=0] flow delay and attenuation
521 self.mrf = mrf
522 area = length * width # [m2]
524 capacity = (
526 ) # TODO might be depth * area if flood indunation is going to be simulated
528 # Required in cases where 'area' conflicts with length*width
529 kwargs["area"] = area
530 # Required in cases where 'capacity' conflicts with depth*area
531 kwargs["capacity"] = capacity
533 super().__init__(**kwargs)
535 # TODO check units TODO Will a user want to change any of these? Wide variety of
536 # river parameters (from HYPE)
537 self.uptake_PNratio = 1 / 7.2 # [-] P:N during crop uptake
538 self.bulk_density = 1300 # [kg/m3] soil density
539 self.denpar_w = 0.0015 # 0.001, # [kg/m2/day] reference denitrification rate
540 # in water course
541 self.T_wdays = 5 # [days] weighting constant for river temperature calculation
542 # (similar to moving average period)
543 self.halfsatINwater = (
544 1.5 * constants.MG_L_TO_KG_M3
545 ) # [kg/m3] half saturation parameter for denitrification in river
546 self.T_10_days = [] # [degree C] average water temperature of 10 days
547 self.T_20_days = [] # [degree C] average water temperature of 20 days
548 self.TP_365_days = [] # [degree C] average water temperature of 20 days
549 self.hsatTP = 0.05 * constants.MG_L_TO_KG_M3 # [kg/m3]
550 self.limpppar = 0.1 * constants.MG_L_TO_KG_M3 # [kg/m3]
551 self.prodNpar = 0.001 # [kg N/m3/day] nitrogen production/mineralisation rate
552 self.prodPpar = (
553 0.0001 # [kg N/m3/day] phosphorus production/mineralisation rate
554 )
555 self.muptNpar = 0.001 # [kg/m2/day] nitrogen macrophyte uptake rate
556 self.muptPpar = 0.0001 # 0.01, # [kg/m2/day] phosphorus macrophyte uptake rate
558 self.max_temp_lag = 20
559 self.lagged_temperatures = []
561 self.max_phosphorus_lag = 365
562 self.lagged_total_phosphorus = []
564 self.din_components = ["ammonia", "nitrate"]
565 # TODO need a cleaner way to do this depending on whether e.g., nitrite is
566 # included
568 # Initialise paramters
569 self.current_depth = 0 # [m]
570 # self.river_temperature = 0 # [degree C] self.river_denitrification = 0 #
571 # [kg/day] self.macrophyte_uptake_N = 0 # [kg/day] self.macrophyte_uptake_P = 0
572 # # [kg/day] self.sediment_particulate_phosphorus_pool = 60000 # [kg]
573 # self.sediment_pool = 1000000 # [kg] self.benthos_source_sink = 0 # [kg/day]
574 # self.t_res = 0 # [day] self.outflow = self.empty_vqip()
576 # Update end_teimstep
577 self.end_timestep = self.end_timestep_
579 # Update handlers
580 self.push_set_handler["default"] = self.push_set_river
581 self.push_check_handler["default"] = self.push_check_accept
583 self.pull_check_handler["default"] = self.pull_check_river
584 self.pull_set_handler["default"] = self.pull_set_river
586 # TODO - RiparianBuffer
587 self.pull_check_handler[("RiparianBuffer", "volume")] = self.pull_check_fp
589 # Update mass balance
590 self.bio_in = self.empty_vqip()
591 self.bio_out = self.empty_vqip()
593 self.mass_balance_in.append(lambda: self.bio_in)
594 self.mass_balance_out.append(lambda: self.bio_out)
596 # TODO something like this might be needed if you want sewers backing up from river
597 # height... would need to incorporate expected river outflow def get_dt_excess(self,
598 # vqip = None): reply = self.empty_vqip() reply['volume'] =
599 # self.tank.get_excess()['volume'] + self.tank.get_avail()['volume'] *
600 # self.get_riverrc() if vqip is not None: reply['volume'] = min(vqip['volume'],
601 # reply['volume']) return reply
603 # def push_set_river(self, vqip): vqip_ = vqip.copy() vqip_ =
604 # self.v_change_vqip(vqip_, min(vqip_['volume'],
605 # self.get_dt_excess()['volume'])) _ = self.tank.push_storage(vqip_, force=True)
606 # return self.extract_vqip(vqip, vqip_)
608 def apply_overrides(self, overrides=Dict[str, Any]):
609 """Override parameters.
611 Enables a user to override any of the following parameters:
612 timearea.
614 Args:
615 overrides (Dict[str, Any]): Dict describing which parameters should
616 be overridden (keys) and new values (values). Defaults to {}.
617 """
618 overwrite_params = set(
619 [
620 "length",
621 "width",
622 "velocity",
623 "damp",
624 "mrf",
625 "uptake_PNratio",
626 "bulk_density",
627 "denpar_w",
628 "T_wdays",
629 "halfsatINwater",
630 "hsatTP",
631 "limpppar",
632 "prodNpar",
633 "prodPpar",
634 "muptNpar",
635 "muptPpar",
636 "max_temp_lag",
637 "max_phosphorus_lag",
638 ]
639 )
641 for param in overwrite_params.intersection(overrides.keys()):
642 setattr(self, param, overrides.pop(param))
644 if "area" in overrides.keys():
645 warnings.warn(
646 "WARNING: specifying area is depreciated in overrides \
647 for river, please specify length and width instead"
648 )
649 overrides["area"] = self.length * self.width
650 if "capacity" in overrides.keys():
651 warnings.warn(
652 "ERROR: specifying capacity is depreciated in overrides \
653 for river, it is always set as unbounded capacity"
654 )
655 overrides["capacity"] = constants.UNBOUNDED_CAPACITY
656 super().apply_overrides(overrides)
658 def pull_check_river(self, vqip=None):
659 """Check amount of water that can be pulled from river tank and upstream.
661 Args:
662 vqip (dict, optional): Maximum water required (only 'volume' is used)
664 Returns:
665 avail (dict): A VQIP amount that can be pulled
666 """
667 # Get storage
668 avail = self.tank.get_avail()
670 # Check incoming
671 upstream = self.get_connected(direction="pull", of_type=["River", "Node"])
672 avail["volume"] += upstream["avail"]
674 # convert mrf from volume/timestep to discrete value
675 mrf = self.mrf / self.get_riverrc()
677 # Apply mrf
678 avail_vol = max(avail["volume"] - mrf, 0)
679 if vqip is None:
680 avail = self.v_change_vqip(avail, avail_vol)
681 else:
682 avail = self.v_change_vqip(avail, min(avail_vol, vqip["volume"]))
684 return avail
686 def pull_set_river(self, vqip):
687 """Pull from river tank and upstream, acknowledging MRF with pull_check.
689 Args:
690 vqip (dict): A VQIP amount to pull (only volume key used)
692 Returns:
693 (dict): A VQIP amount that was pulled
694 """
695 # Calculate available pull
696 avail = self.pull_check_river(vqip)
698 # Take first from tank
699 pulled = self.tank.pull_storage(avail)
701 # Take remaining from upstream
702 to_pull = {"volume": avail["volume"] - pulled["volume"]}
703 pulled_ = self.pull_distributed(to_pull, of_type=["River", "Node"])
705 reply = self.sum_vqip(pulled, pulled_)
707 return reply
709 def push_set_river(self, vqip):
710 """Push to river tank, currently forced.
712 Args:
713 vqip (dict): A VQIP amount to push
715 Returns:
716 (dict): A VQIP amount that was not successfully received
717 """
718 _ = self.tank.push_storage(vqip, force=True)
719 return self.empty_vqip()
721 def update_depth(self):
722 """Recalculate depth."""
723 self.current_depth =["volume"] / self.area
725 def get_din_pool(self):
726 """Get total dissolved inorganic nitrogen from tank storage.
728 Returns:
729 (float): total din
730 """
731 return sum(
732 [[x] for x in self.din_components]
733 ) # TODO +['nitrite'] but nitrite might not be modelled...
734 # need some ways to address this
736 def biochemical_processes(self):
737 """Runs all biochemical processes and updates pollutant amounts.
739 Returns:
740 in_ (dict): A VQIP amount that represents total gain in pollutant amounts
741 out_ (dict): A VQIP amount that represents total loss in pollutant amounts
742 """
743 # TODO make more modular
744 self.update_depth()
746["temperature"] = (1 - 1 / self.T_wdays) *[
747 "temperature"
748 ] + (1 / self.T_wdays) * self.get_data_input("temperature")
750 # Update lagged temperatures
751 if len(self.lagged_temperatures) > self.max_temp_lag:
752 del self.lagged_temperatures[0]
753 self.lagged_temperatures.append(["temperature"])
755 # Update lagged total phosphorus
756 if len(self.lagged_total_phosphorus) > self.max_phosphorus_lag:
757 del self.lagged_total_phosphorus[0]
758 total_phosphorus = (
759["phosphate"] +["org-phosphorus"]
760 )
761 self.lagged_total_phosphorus.append(total_phosphorus)
763 # Check if any water
764 if["volume"] < constants.FLOAT_ACCURACY:
765 # Assume these only do something when there is water
766 return (self.empty_vqip(), self.empty_vqip())
768 if["temperature"] <= 0:
769 # Seems that these things are only active when above freezing
770 return (self.empty_vqip(), self.empty_vqip())
772 # Denitrification
773 tempfcn = 2 ** ((["temperature"] - 20) / 10)
774 if["temperature"] < 5:
775 tempfcn *=["temperature"] / 5
777 din = self.get_din_pool()
778 din_concentration = din /["volume"]
779 confcn = din_concentration / (
780 din_concentration + self.halfsatINwater
781 ) # [kg/m3]
782 denitri_water = (
783 self.denpar_w * self.area * tempfcn * confcn
784 ) # [kg/day] #TODO convert to per DT
786 river_denitrification = min(
787 denitri_water, 0.5 * din
788 ) # [kg/day] max 50% kan be denitrified
789 din_ = din - river_denitrification # [kg]
791 # Update mass balance
792 in_ = self.empty_vqip()
793 out_ = self.empty_vqip()
794 if din > 0:
795 for pol in self.din_components:
796 # denitrification
797 loss = (din - din_) / din *[pol]
798 out_[pol] += loss
799[pol] -= loss
801 din = self.get_din_pool()
803 # Calculate moving averages TODO generalise
804 temp_10_day = sum(self.lagged_temperatures[-10:]) / 10
805 temp_20_day = sum(self.lagged_temperatures[-20:]) / 20
806 total_phos_365_day = sum(self.lagged_total_phosphorus) / self.max_phosphorus_lag
808 # Calculate coefficients
809 tempfcn = (
810 (["temperature"]) / 20 * (temp_10_day - temp_20_day) / 5
811 )
812 if (total_phos_365_day - self.limpppar + self.hsatTP) > 0:
813 totalphosfcn = (total_phos_365_day - self.limpppar) / (
814 total_phos_365_day - self.limpppar + self.hsatTP
815 )
816 else:
817 totalphosfcn = 0
819 # Mineralisation/production TODO this feels like it could be much tidier
820 minprodN = (
821 self.prodNpar * totalphosfcn * tempfcn * self.area * self.current_depth
822 ) # [kg N/day]
823 minprodP = (
824 self.prodPpar
825 * totalphosfcn
826 * tempfcn
827 * self.area
828 * self.current_depth
829 * self.uptake_PNratio
830 ) # [kg N/day]
831 if minprodN > 0:
832 # production (inorg -> org)
833 minprodN = min(
834 0.5 * din, minprodN
835 ) # only half pool can be used for production
836 minprodP = min(
837 0.5 *["phosphate"], minprodP
838 ) # only half pool can be used for production
840 # Update mass balance
841 out_["phosphate"] = minprodP
842["phosphate"] -= minprodP
843 in_["org-phosphorus"] = minprodP
844["org-phosphorus"] += minprodP
845 if din > 0:
846 for pol in self.din_components:
847 loss = minprodN *[pol] / din
848 out_[pol] += loss
849[pol] -= loss
851 in_["org-nitrogen"] = minprodN
852["org-nitrogen"] += minprodN
854 else:
855 # mineralisation (org -> inorg)
856 minprodN = min(0.5 *["org-nitrogen"], -minprodN)
857 minprodP = min(0.5 *["org-phosphorus"], -minprodP)
859 # Update mass balance
860 in_["phosphate"] = minprodP
861["phosphate"] += minprodP
862 out_["org-phosphorus"] = minprodP
863["org-phosphorus"] -= minprodP
864 if din > 0:
865 for pol in self.din_components:
866 gain = minprodN *[pol] / din
867 in_[pol] += gain
868[pol] += gain
870 out_["org-nitrogen"] = minprodN
871["org-nitrogen"] -= minprodN
873 din = self.get_din_pool()
875 # macrophyte uptake temperature dependence factor
876 tempfcn1 = (max(0,["temperature"]) / 20) ** 0.3
877 tempfcn2 = (["temperature"] - temp_20_day) / 5
878 tempfcn = max(0, tempfcn1 * tempfcn2)
880 macrouptN = self.muptNpar * tempfcn * self.area # [kg/day]
881 macrophyte_uptake_N = min(0.5 * din, macrouptN)
882 if din > 0:
883 for pol in self.din_components:
884 loss = macrophyte_uptake_N *[pol] / din
885 out_[pol] += loss
886[pol] -= loss
888 macrouptP = (
889 self.muptPpar * tempfcn * max(0, totalphosfcn) * self.area
890 ) # [kg/day]
891 macrophyte_uptake_P = min(0.5 *["phosphate"], macrouptP)
892 out_["phosphate"] += macrophyte_uptake_P
893["phosphate"] -= macrophyte_uptake_P
895 # TODO source/sink for benthos sediment P suspension/resuspension
896 return in_, out_
898 def get_riverrc(self):
899 """Get river outflow coefficient (i.e., how much water leaves the tank in this
900 timestep).
902 Returns:
903 riverrc (float): outflow coeffficient
904 """
905 # Calculate travel time
906 total_time = self.length / self.velocity
907 # Apply damp
908 kt = self.damp * total_time # [day]
909 if kt != 0:
910 riverrc = 1 - kt + kt * exp(-1 / kt) # [-]
911 else:
912 riverrc = 1
913 return riverrc
915 def calculate_discharge(self):
916 """"""
917 if "nitrate" in constants.POLLUTANTS:
918 # TODO clumsy Run biochemical processes
919 in_, out_ = self.biochemical_processes()
920 # Mass balance
921 self.bio_in = in_
922 self.bio_out = out_
924 def distribute(self):
925 """Run biochemical processes, track mass balance, and distribute water."""
926 # self.calculate_discharge() Get outflow
927 outflow = self.tank.pull_storage(
928 {"volume":["volume"] * self.get_riverrc()}
929 )
930 # Distribute outflow
931 reply = self.push_distributed(outflow, of_type=["River", "Node", "Waste"])
932 _ = self.tank.push_storage(reply, force=True)
933 if reply["volume"] > constants.FLOAT_ACCURACY:
934 print("river cant push: {0}".format(reply["volume"]))
936 def pull_check_fp(self, vqip=None):
937 """
939 Args:
940 vqip:
942 Returns:
944 """
945 # TODO Pull checking for riparian buffer, needs updating update river depth
946 self.update_depth()
947 return self.current_depth, self.area, self.width,
949 def end_timestep_(self):
950 """Update state variables."""
951 self.tank.end_timestep()
952 self.bio_in = self.empty_vqip()
953 self.bio_out = self.empty_vqip()
956class Reservoir(Storage):
957 """"""
959 def __init__(self, **kwargs):
960 """Storage node that makes abstractions by calling pull_distributed.
962 Functions intended to call in orchestration:
963 make_abstractions (before any river routing)
965 Key assumptions:
966 - Conceptualised as a `Tank`.
967 - Recharged only via pumped abstractions.
968 - Evaporation/precipitation onto surface area currently ignored.
969 - If `decays` are provided to model water quality transformations,
970 see ``.
972 Input data and parameter requirements:
973 - Tank `capacity`, `area`, and `datum`.
974 _Units_: cubic metres, squared metres, metres
975 - Optional dictionary of decays with pollutants as keys and decay
976 parameters (a constant and a temperature sensitivity exponent) as
977 values. _Units_: -
978 """
979 super().__init__(**kwargs)
981 def make_abstractions(self):
982 """Pulls water and updates tanks."""
983 reply = self.pull_distributed(self.tank.get_excess())
984 spill = self.tank.push_storage(reply)
985 _ = self.tank.push_storage(spill, force=True)
986 if spill["volume"] > constants.FLOAT_ACCURACY:
987 print("Spill at reservoir by {0}".format(spill["volume"]))
990class RiverReservoir(Reservoir):
991 """"""
993 def __init__(self, environmental_flow=0, **kwargs):
994 """A reservoir with a natural river inflow, includes an environmental downstream
995 flow to satisfy.
997 Args:
998 environmental_flow (float, optional): Downstream environmental flow.
999 Defaults to 0.
1001 Functions intended to call in orchestration:
1002 make_abstractions (if any)
1004 satisfy_environmental (before river routing.. possibly before
1005 downstream river abstractions depending on licence)
1007 Key assumptions:
1008 - Conceptualised as a `Tank`.
1009 - Recharged via pumped abstractions and receives water from
1010 inflowing arcs.
1011 - Reservoir aims to satisfy a static `environmental_flow`.
1012 - If tank capacity is exceeded, reservoir spills downstream
1013 towards ``, `` or `` nodes.
1014 Spill counts towards `environmental_flow`.
1015 - Evaporation/precipitation onto surface area currently ignored.
1016 - Currently, if a reservoir satisfies a pull from a downstream
1017 node, it does __not__ count towards `environmental_flow`.
1018 - If `decays` are provided to model water quality transformations,
1019 see ``.
1021 Input data and parameter requirements:
1022 - Tank `capacity`, `area`, and `datum`.
1023 _Units_: cubic metres, squared metres, metres
1024 - `environmental_flow`
1025 _Units_: cubic metres/timestep
1026 - Optional dictionary of decays with pollutants as keys and decay
1027 parameters (a constant and a temperature sensitivity exponent) as
1028 values. _Units_: -
1029 """
1030 # Parameters
1031 self.environmental_flow = environmental_flow
1032 super().__init__(**kwargs)
1034 # State variables
1035 self.total_environmental_satisfied = 0
1037 self.push_set_handler["default"] = self.push_set_river_reservoir
1038 self.push_check_handler["default"] = self.push_check_river_reservoir
1039 self.end_timestep = self.end_timestep_
1041 self.__class__.__name__ = "Reservoir"
1043 def apply_overrides(self, overrides=Dict[str, Any]):
1044 """Override parameters.
1046 Enables a user to override any of the following parameters:
1047 environmental_flow.
1049 Args:
1050 overrides (Dict[str, Any]): Dict describing which parameters should
1051 be overridden (keys) and new values (values). Defaults to {}.
1052 """
1053 self.environmental_flow = overrides.pop(
1054 "environmental_flow", self.environmental_flow
1055 )
1056 super().apply_overrides(overrides)
1058 def push_set_river_reservoir(self, vqip):
1059 """Receive water.
1061 Args:
1062 vqip (dict): A VQIP amount to be received
1064 Returns:
1065 reply (dict): A VQIP amount that was not successfully received
1066 """
1067 # Apply normal reservoir storage We do this under the assumption that spill is
1068 # mixed in with the reservoir If the reservoir can't spill everything you'll get
1069 # some weird numbers in reply, but if your reservoir can't spill as much as you
1070 # like then you should probably be pushing the right amount through
1071 # push_check_river_reservoir Some cunning could probably avoid this by checking
1072 # vqip, but this is a serious edge case
1073 _ = self.tank.push_storage(vqip, force=True)
1074 spill = self.tank.pull_ponded()
1076 # Send spill downstream
1077 reply = self.push_distributed(spill, of_type=["Node", "River", "Waste"])
1079 # Use spill to satisfy downstream flow
1080 self.total_environmental_satisfied += spill["volume"] - reply["volume"]
1082 return reply
1084 def push_check_river_reservoir(self, vqip=None):
1085 """A push check to receive water, assumes spill may occur and checks downstream
1086 capacity.
1088 Args:
1089 vqip (dict, optional): A VQIP that can be used to limit the volume in
1090 the return value (only volume key is used). Defaults to None.
1092 Returns:
1093 excess (dict): A VQIP amount of water that cannot be received
1094 """
1095 # Check downstream capacity (i.e., that would be spilled)
1096 downstream_availability = self.get_connected(
1097 direction="push", of_type=["Node", "River", "Waste"]
1098 )["avail"]
1099 # Check excess capacity in the reservoir
1100 excess = self.tank.get_excess()
1101 # Combine excess and downstream in response
1102 new_v = excess["volume"] + downstream_availability
1103 if vqip is not None:
1104 new_v = min(vqip["volume"], new_v)
1105 # Update to vqip
1106 excess = self.v_change_vqip(excess, new_v)
1108 return excess
1110 def satisfy_environmental(self):
1111 """Satisfy environmental flow downstream."""
1112 # Calculate how much environmental flow is yet to satisfy #(some may have been
1113 # already if pull-and-take abstractions have taken place)
1114 to_satisfy = max(
1115 self.environmental_flow - self.total_environmental_satisfied, 0
1116 )
1117 # Pull from tank
1118 environmental = self.tank.pull_storage({"volume": to_satisfy})
1119 # Send downstream
1120 reply = self.push_distributed(environmental)
1121 _ = self.tank.push_storage(reply, force=True)
1122 if reply["volume"] > constants.FLOAT_ACCURACY:
1123 print("warning: environmental not able to push")
1125 # Update satisfaction
1126 self.total_environmental_satisfied += environmental["volume"]
1128 def end_timestep_(self):
1129 """Udpate state varibles."""
1130 self.tank.end_timestep()
1131 self.total_environmental_satisfied = 0