Coverage for wsimod/nodes/storage.py: 13%
340 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
1# -*- coding: utf-8 -*-
2"""Created on Mon Nov 15 14:20:36 2021.
4@author: bdobson Converted to totals on 2022-05-03
5"""
6from math import exp
8from wsimod.core import constants
9from wsimod.nodes.nodes import DecayQueueTank, DecayTank, Node, QueueTank, Tank
12class Storage(Node):
13 """"""
15 def __init__(
16 self,
17 name,
18 capacity=0,
19 area=0,
20 datum=0,
21 decays=None,
22 initial_storage=0,
23 **kwargs,
24 ):
25 """A Node wrapper for a Tank or DecayTank.
27 Args:
28 name (str): node name capacity (float, optional): Tank capacity (see
29 nodes.py/Tank). Defaults to 0. area (float, optional): Tank area (see
30 nodes.py/Tank). Defaults to 0. datum (float, optional): Tank datum (see
31 nodes.py/Tank). Defaults to 0. decays (dict, optional): Tank decays if
32 needed, (see nodes.py/DecayTank). Defaults to None. initial_storage (float
33 or dict, optional): Initial storage (see nodes.py/Tank). Defaults to 0.
35 Functions intended to call in orchestration:
36 distribute (optional, depends on subclass)
37 """
38 # Set parameters
39 self.initial_storage = initial_storage
40 self.capacity = capacity
41 self.area = area
42 self.datum = datum
43 self.decays = decays
44 super().__init__(name, **kwargs)
46 # Create tank
47 if "initial_storage" not in dir(self):
48 self.initial_storage = self.empty_vqip()
50 if self.decays is None:
51 self.tank = Tank(
52 capacity=self.capacity,
53 area=self.area,
54 datum=self.datum,
55 initial_storage=self.initial_storage,
56 )
57 else:
58 self.tank = DecayTank(
59 capacity=self.capacity,
60 area=self.area,
61 datum=self.datum,
62 initial_storage=self.initial_storage,
63 decays=self.decays,
64 parent=self,
65 )
66 # Update handlers
67 self.push_set_handler["default"] = self.push_set_storage
68 self.push_check_handler["default"] = self.tank.get_excess
69 self.pull_set_handler["default"] = lambda vol: self.tank.pull_storage(vol)
70 self.pull_check_handler["default"] = self.tank.get_avail
72 # Mass balance
73 self.mass_balance_ds.append(lambda: self.tank.ds())
75 def push_set_storage(self, vqip):
76 """A node wrapper for the tank push_storage.
78 Args:
79 vqip (dict): A VQIP amount to push to the tank
81 Returns:
82 reply (dict): A VQIP amount that was not successfully pushed
83 """
84 # Update tank
85 reply = self.tank.push_storage(vqip)
87 return reply
89 def distribute(self):
90 """Optional function that discharges all tank storage with push_distributed."""
91 # Distribute any active storage
92 storage = self.tank.pull_storage(self.tank.get_avail())
93 retained = self.push_distributed(storage)
94 _ = self.tank.push_storage(retained, force=True)
95 if retained["volume"] > constants.FLOAT_ACCURACY:
96 print("Storage unable to push")
98 def get_percent(self):
99 """Function that returns the volume in the storage tank expressed as a percent
100 of capacity."""
101 return self.tank.storage["volume"] / self.tank.capacity
103 def end_timestep(self):
104 """Update tank states."""
105 self.tank.end_timestep()
107 def reinit(self):
108 """Call tank reinit."""
109 # TODO Automate this better
110 self.tank.reinit()
111 self.tank.storage["volume"] = self.initial_storage
112 self.tank.storage_["volume"] = self.initial_storage
115class Groundwater(Storage):
116 """"""
118 def __init__(
119 self,
120 residence_time=200,
121 infiltration_threshold=1,
122 infiltration_pct=0,
123 data_input_dict={},
124 **kwargs,
125 ):
126 # TODO why isn't this using a ResidenceTank?
127 """A storage with a residence time for groundwater. Can also infiltrate to
128 sewers.
130 Args:
131 residence_time (float, optional): Residence time (see nodes.py/
132 ResidenceTank). Defaults to 200.
133 infiltration_threshold (float, optional): Proportion of storage capacity
134 that must be exceeded to generate infiltration. Defaults to 1.
135 infiltration_pct (float, optional): Proportion of storage above the
136 threshold that is square rooted and infiltrated. Defaults to 0.
137 data_input_dict (dict, optional): Dictionary of data inputs relevant for
138 the node (though I don't think it is used). Defaults to {}.
140 Functions intended to call in orchestration:
141 infiltrate (before sewers are discharged)
143 distribute
145 Key assumptions:
146 - Conceptualises groundwater as a tank.
147 - Baseflow is generated following a residence-time method.
148 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or
149 `waste.py/Waste` nodes.
150 - Infiltration to `sewer.py/Sewer` nodes occurs when the storage
151 in the tank is greater than a specified threshold, at a rate
152 proportional to the sqrt of volume above the threshold. (Note, this
153 behaviour is __not validated__ and a high uncertainty process in
154 general)
155 - If `decays` are provided to model water quality transformations,
156 see `core.py/DecayObj`.
158 Input data and parameter requirements:
159 - Groundwater tank `capacity`, `area`, and `datum`.
160 _Units_: cubic metres, squared metres, metres
161 - Infiltration behaviour determined by an `infiltration_threshold`
162 and `infiltration_pct`. _Units_: proportion of capacity
163 - Optional dictionary of decays with pollutants as keys and decay
164 parameters (a constant and a temperature sensitivity exponent) as
165 values. _Units_: -
166 """
167 self.residence_time = residence_time
168 self.infiltration_threshold = infiltration_threshold
169 self.infiltration_pct = infiltration_pct
170 # TODO not used data_input
171 self.data_input_dict = data_input_dict
172 super().__init__(**kwargs)
174 def distribute(self):
175 """Calculate outflow with residence time and send to Nodes or Rivers."""
176 avail = self.tank.get_avail()["volume"] / self.residence_time
177 to_send = self.tank.pull_storage({"volume": avail})
178 retained = self.push_distributed(to_send, of_type=["Node", "River", "Waste"])
179 _ = self.tank.push_storage(retained, force=True)
180 if retained["volume"] > constants.FLOAT_ACCURACY:
181 print("Storage unable to push")
183 def infiltrate(self):
184 """Calculate amount of water available for infiltration and send to sewers."""
185 # Calculate infiltration
186 avail = self.tank.get_avail()["volume"]
187 avail = max(avail - self.tank.capacity * self.infiltration_threshold, 0)
188 avail = (avail * self.infiltration_pct) ** 0.5
190 # Push to sewers
191 to_send = self.tank.pull_storage({"volume": avail})
192 retained = self.push_distributed(to_send, of_type="Sewer")
193 _ = self.tank.push_storage(retained, force=True)
194 # Any not sent is left in tank
195 if retained["volume"] > constants.FLOAT_ACCURACY:
196 # print('unable to infiltrate')
197 pass
200class QueueGroundwater(Storage):
201 """"""
203 # TODO - no infiltration as yet
204 def __init__(self, timearea={0: 1}, data_input_dict={}, **kwargs):
205 """Alternate formulation of Groundwater that uses a timearea property to enable
206 more nonlinear time behaviour of baseflow routing. Uses the QueueTank or
207 DecayQueueTank (see nodes.py/Tank subclassses).
209 NOTE: abstraction behaviour from this kind of node need careful checking
211 Args:
212 timearea (dict, optional): Time area diagram that enables flows to
213 take a range of different durations to 'traverse' the tank. The keys of
214 the dict are the number of timesteps while the values are the proportion
215 of flow. E.g., {0 : 0.7, 1 : 0.3} means 70% of flow takes 0 timesteps
216 and 30% takes 1 timesteps. Defaults to {0 : 1}.
217 data_input_dict (dict, optional): Dictionary of data inputs relevant for
218 the node (though I don't think it is used). Defaults to {}.
220 Functions intended to call in orchestration:
221 distribute
223 Key assumptions:
224 - Conceptualises groundwater as a tank.
225 - Baseflow is generated following a timearea method.
226 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or
227 `waste.py/Waste` nodes.
228 - No infiltration to sewers is modelled.
229 - If `decays` are provided to model water quality transformations,
230 see `core.py/DecayObj`.
232 Input data and parameter requirements:
233 - Groundwater tank `capacity`, `area`, and `datum`.
234 _Units_: cubic metres, squared metres, metres
235 - `timearea` is a dictionary containing the timearea diagram.
236 _Units_: duration of flow (in timesteps) and proportion of flow
237 - Optional dictionary of decays with pollutants as keys and decay
238 parameters (a constant and a temperature sensitivity exponent) as
239 values. _Units_: -
240 """
241 self.timearea = timearea
242 # TODO not used
243 self.data_input_dict = data_input_dict
244 super().__init__(**kwargs)
245 # Label as Groundwater class so that other nodes treat it the same
246 self.__class__.__name__ = "Groundwater"
247 # Update handlers
248 self.push_set_handler["default"] = self.push_set_timearea
249 self.pull_set_handler["default"] = self.pull_set_active
250 self.pull_check_handler["default"] = self.pull_check_active
251 # Enable decay
252 if self.decays is None:
253 self.tank = QueueTank(
254 capacity=self.capacity,
255 area=self.area,
256 datum=self.datum,
257 initial_storage=self.initial_storage,
258 )
259 else:
260 self.tank = DecayQueueTank(
261 capacity=self.capacity,
262 area=self.area,
263 datum=self.datum,
264 decays=self.decays,
265 parent=self,
266 initial_storage=self.initial_storage,
267 )
269 def push_set_timearea(self, vqip):
270 """Push setting that enables timearea behaviour, (see __init__ for
271 description).Used to receive flow that is assumed to occur widely across some
272 kind of catchment.
274 Args:
275 vqip (dict): A VQIP that has been pushed
277 Returns:
278 reply (dict): A VQIP amount that was not successfuly receivesd
279 """
280 reply = self.empty_vqip()
281 # Iterate over timearea diagram TODO timearea diagram behaviour be generalised
282 # across nodes
283 for time, normalised in self.timearea.items():
284 vqip_ = self.v_change_vqip(vqip, vqip["volume"] * normalised)
285 reply_ = self.tank.push_storage(vqip_, time=time)
286 reply = self.sum_vqip(reply, reply_)
287 return reply
289 def distribute(self):
290 """Update internal arc, push active_storage onwards, update tank."""
291 _ = self.tank.internal_arc.update_queue(direction="push")
293 remaining = self.push_distributed(self.tank.active_storage)
295 if remaining["volume"] > constants.FLOAT_ACCURACY:
296 print("Groundwater couldnt push all")
298 # Update tank
299 sent = self.tank.active_storage["volume"] - remaining["volume"]
300 sent = self.v_change_vqip(self.tank.active_storage, sent)
301 reply = self.tank.pull_storage(sent)
302 if (reply["volume"] - sent["volume"]) > constants.FLOAT_ACCURACY:
303 print("Miscalculated tank storage in discharge")
305 def infiltrate(self):
306 """"""
307 pass
309 def pull_check_active(self, vqip=None):
310 """A pull check that returns the active storage.
312 Args:
313 vqip (dict, optional): A VQIP that can be used to limit the volume in
314 the return value (only volume key is used). Defaults to None.
316 Returns:
317 (dict): A VQIP amount that is available to pull
318 """
319 if vqip is None:
320 return self.tank.active_storage
321 else:
322 reply = min(vqip["volume"], self.tank.active_storage["volume"])
323 return self.v_change_vqip(self.tank.active_storage, reply)
325 def pull_set_active(self, vqip):
326 # TODO - this is quite weird behaviour, and inconsistent with pull_check_active
327 """Pull proportionately from both the active storage and the queue. Adjudging
328 groundwater abstractions to not be particularly sensitive to the within
329 catchment travel time.
331 Args:
332 vqip (dict): A VQIP amount to be pulled (only volume key is used)
334 Returns:
335 pulled (dict): A VQIP amount that was successfully pulled
336 """
337 # Calculate actual pull
338 total_storage = self.tank.storage["volume"]
339 total_pull = min(self.tank.storage["volume"], vqip["volume"])
341 if total_pull < constants.FLOAT_ACCURACY:
342 return self.empty_vqip()
343 else:
344 # Track total pull in pulled
345 pulled = self.empty_vqip()
346 # Iterate over queue
347 if isinstance(self.tank.internal_arc.queue, dict):
348 for t, v in self.tank.internal_arc.queue.items():
349 # Pull proportionately
350 t_pulled = self.v_change_vqip(
351 self.tank.internal_arc.queue[t],
352 v["volume"] * total_pull / total_storage,
353 )
354 # Reduce queue VQIPs
355 self.tank.internal_arc.queue[t] = self.extract_vqip(
356 self.tank.internal_arc.queue[t], t_pulled
357 )
358 # Track pull
359 pulled = self.sum_vqip(pulled, t_pulled)
360 # Pull also from active storage
361 a_pulled = self.v_change_vqip(
362 self.tank.active_storage,
363 self.tank.active_storage["volume"] * total_pull / total_storage,
364 )
365 self.tank.active_storage = self.extract_vqip(
366 self.tank.active_storage, a_pulled
367 )
368 pulled = self.sum_vqip(pulled, a_pulled)
370 # Recalculate storage
371 self.tank.storage = self.extract_vqip(self.tank.storage, pulled)
372 return pulled
373 # elif isinstance(self.tank.internal_arc.queue, list): for req in
374 # self.tank.internal_arc.queue: t_pulled = req['vqtip']['volume'] *
375 # total_pull / total_storage req['vqtip'] =
376 # self.v_change_vqip(req['vqtip'], req['vqtip']['volume'] -
377 # t_pulled) pulled += t_pulled a_pulled =
378 # self.tank.active_storage['volume'] * total_pull / total_storage
379 # self.tank.active_storage =
380 # self.v_change_vqip(self.tank.active_storage,
381 # self.tank.active_storage['volume'] - a_pulled) pulled += a_pulled
383 # #Recalculate storage - doing this differently causes numerical errors
384 # new_v = sum([x['vqtip']['volume'] for x in
385 # self.tank.internal_arc.queue])+ self.tank.active_storage['volume']
386 # self.tank.storage = self.v_change_vqip(self.tank.storage, new_v)
388 # return self.v_change_vqip(self.tank.storage, pulled)
391class River(Storage):
392 """"""
394 # TODO non-day timestep
395 def __init__(
396 self,
397 depth=2,
398 length=200,
399 width=20,
400 velocity=0.2 * constants.M_S_TO_M_DT,
401 damp=0.1,
402 mrf=0,
403 **kwargs,
404 ):
405 """Node that contains extensive in-river biochemical processes.
407 Args:
408 depth (float, optional): River tank depth. Defaults to 2. length (float,
409 optional): River tank length. Defaults to 200. width (float, optional):
410 River tank width. Defaults to 20. velocity (float, optional): River velocity
411 (if someone wants to calculate
412 this on the fly that would also work). Defaults to
413 0.2*constants.M_S_TO_M_DT.
414 damp (float, optional): Flow delay and attentuation parameter. Defaults
415 to 0.1.
416 mrf (float, optional): Minimum required flow in river (volume per timestep),
417 can limit pulls made to the river. Defaults to 0.
419 Functions intended to call in orchestration:
420 distribute
422 Key assumptions:
423 - River is conceptualised as a water tank that receives flows from various
424 sources (e.g., runoffs from urban and rural land, baseflow from
425 groundwater), interacts with water infrastructure (e.g., abstraction for
426 irrigation and domestic supply, sewage and treated effluent discharge),
427 and discharges flows downstream. It has length and width as shape
428 parameters, average velocity to indicate flow speed and capacity to
429 indicate the maximum storage limit.
430 - Flows from different sources into rivers will fully mix. River tank is
431 assumed to
432 have delay and attenuation effects when generate outflows. These effects
433 are simulated based on the average velocity.
434 - In-river biochemical processes are simulated as sources/sinks of
435 nutrients
436 in the river tank, including - denitrification (for nitrogen) -
437 phytoplankton absorption/release (for nitrogen and phosphorus) -
438 macrophyte uptake (for nitrogen and phosphorus) These processes are
439 affected by river temperature.
441 Input data and parameter requirements:
442 - depth, length, width
443 _Units_: m
444 - velocity
445 _Units_: m/day
446 - damping coefficient
447 _Units_: -
448 - minimum required flow
449 _Units_: m3/day
450 """
451 # Set parameters
452 self.depth = depth # [m]
453 self.length = length # [m]
454 self.width = width # [m]
455 self.velocity = velocity # [m/dt]
456 self.damp = damp # [>=0] flow delay and attenuation
457 self.mrf = mrf
458 area = length * width # [m2]
460 capacity = (
461 constants.UNBOUNDED_CAPACITY
462 ) # TODO might be depth * area if flood indunation is going to be simulated
464 # Required in cases where 'area' conflicts with length*width
465 kwargs["area"] = area
466 # Required in cases where 'capacity' conflicts with depth*area
467 kwargs["capacity"] = capacity
469 super().__init__(**kwargs)
471 # TODO check units TODO Will a user want to change any of these? Wide variety of
472 # river parameters (from HYPE)
473 self.uptake_PNratio = 1 / 7.2 # [-] P:N during crop uptake
474 self.bulk_density = 1300 # [kg/m3] soil density
475 self.denpar_w = 0.0015 # 0.001, # [kg/m2/day] reference denitrification rate
476 # in water course
477 self.T_wdays = 5 # [days] weighting constant for river temperature calculation
478 # (similar to moving average period)
479 self.halfsatINwater = (
480 1.5 * constants.MG_L_TO_KG_M3
481 ) # [kg/m3] half saturation parameter for denitrification in river
482 self.T_10_days = [] # [degree C] average water temperature of 10 days
483 self.T_20_days = [] # [degree C] average water temperature of 20 days
484 self.TP_365_days = [] # [degree C] average water temperature of 20 days
485 self.hsatTP = 0.05 * constants.MG_L_TO_KG_M3 # [kg/m3]
486 self.limpppar = 0.1 * constants.MG_L_TO_KG_M3 # [kg/m3]
487 self.prodNpar = 0.001 # [kg N/m3/day] nitrogen production/mineralisation rate
488 self.prodPpar = (
489 0.0001 # [kg N/m3/day] phosphorus production/mineralisation rate
490 )
491 self.muptNpar = 0.001 # [kg/m2/day] nitrogen macrophyte uptake rate
492 self.muptPpar = 0.0001 # 0.01, # [kg/m2/day] phosphorus macrophyte uptake rate
493 self.qbank_365_days = [1e6, 1e6] # [m3/day] store outflow in the previous year
494 self.qbank = (
495 1e6 # [m3/day] bankfull flow = second largest outflow in the previous year
496 )
497 self.qbankcorrpar = 0.001 # [-] correction coefficient for qbank flow
498 self.sedexppar = 1 # [-]
499 self.EPC0 = 0.05 * constants.MG_L_TO_KG_M3 # [mg/l]
500 self.kd_s = 0 * constants.MG_L_TO_KG_M3 # 6 * 1e-6, # [kg/m3]
501 self.kadsdes_s = 2 # 0.9, # [-]
502 self.Dsed = 0.2 # [m]
504 self.max_temp_lag = 20
505 self.lagged_temperatures = []
507 self.max_phosphorus_lag = 365
508 self.lagged_total_phosphorus = []
510 self.din_components = ["ammonia", "nitrate"]
511 # TODO need a cleaner way to do this depending on whether e.g., nitrite is
512 # included
514 # Initialise paramters
515 self.current_depth = 0 # [m]
516 # self.river_temperature = 0 # [degree C] self.river_denitrification = 0 #
517 # [kg/day] self.macrophyte_uptake_N = 0 # [kg/day] self.macrophyte_uptake_P = 0
518 # # [kg/day] self.sediment_particulate_phosphorus_pool = 60000 # [kg]
519 # self.sediment_pool = 1000000 # [kg] self.benthos_source_sink = 0 # [kg/day]
520 # self.t_res = 0 # [day] self.outflow = self.empty_vqip()
522 # Update end_teimstep
523 self.end_timestep = self.end_timestep_
525 # Update handlers
526 self.push_set_handler["default"] = self.push_set_river
527 self.push_check_handler["default"] = self.push_check_accept
529 self.pull_check_handler["default"] = self.pull_check_river
530 self.pull_set_handler["default"] = self.pull_set_river
532 # TODO - RiparianBuffer
533 self.pull_check_handler[("RiparianBuffer", "volume")] = self.pull_check_fp
535 # Update mass balance
536 self.bio_in = self.empty_vqip()
537 self.bio_out = self.empty_vqip()
539 self.mass_balance_in.append(lambda: self.bio_in)
540 self.mass_balance_out.append(lambda: self.bio_out)
542 # TODO something like this might be needed if you want sewers backing up from river
543 # height... would need to incorporate expected river outflow def get_dt_excess(self,
544 # vqip = None): reply = self.empty_vqip() reply['volume'] =
545 # self.tank.get_excess()['volume'] + self.tank.get_avail()['volume'] *
546 # self.get_riverrc() if vqip is not None: reply['volume'] = min(vqip['volume'],
547 # reply['volume']) return reply
549 # def push_set_river(self, vqip): vqip_ = vqip.copy() vqip_ =
550 # self.v_change_vqip(vqip_, min(vqip_['volume'],
551 # self.get_dt_excess()['volume'])) _ = self.tank.push_storage(vqip_, force=True)
552 # return self.extract_vqip(vqip, vqip_)
554 def pull_check_river(self, vqip=None):
555 """Check amount of water that can be pulled from river tank and upstream.
557 Args:
558 vqip (dict, optional): Maximum water required (only 'volume' is used)
560 Returns:
561 avail (dict): A VQIP amount that can be pulled
562 """
563 # Get storage
564 avail = self.tank.get_avail()
566 # Check incoming
567 upstream = self.get_connected(direction="pull", of_type=["River", "Node"])
568 avail["volume"] += upstream["avail"]
570 # convert mrf from volume/timestep to discrete value
571 mrf = self.mrf / self.get_riverrc()
573 # Apply mrf
574 avail_vol = max(avail["volume"] - mrf, 0)
575 if vqip is None:
576 avail = self.v_change_vqip(avail, avail_vol)
577 else:
578 avail = self.v_change_vqip(avail, min(avail_vol, vqip["volume"]))
580 return avail
582 def pull_set_river(self, vqip):
583 """Pull from river tank and upstream, acknowledging MRF with pull_check.
585 Args:
586 vqip (dict): A VQIP amount to pull (only volume key used)
588 Returns:
589 (dict): A VQIP amount that was pulled
590 """
591 # Calculate available pull
592 avail = self.pull_check_river(vqip)
594 # Take first from tank
595 pulled = self.tank.pull_storage(avail)
597 # Take remaining from upstream
598 to_pull = {"volume": avail["volume"] - pulled["volume"]}
599 pulled_ = self.pull_distributed(to_pull, of_type=["River", "Node"])
601 reply = self.sum_vqip(pulled, pulled_)
603 return reply
605 def push_set_river(self, vqip):
606 """Push to river tank, currently forced.
608 Args:
609 vqip (dict): A VQIP amount to push
611 Returns:
612 (dict): A VQIP amount that was not successfully received
613 """
614 _ = self.tank.push_storage(vqip, force=True)
615 return self.empty_vqip()
617 def update_depth(self):
618 """Recalculate depth."""
619 self.current_depth = self.tank.storage["volume"] / self.area
621 def get_din_pool(self):
622 """Get total dissolved inorganic nitrogen from tank storage.
624 Returns:
625 (float): total din
626 """
627 return sum(
628 [self.tank.storage[x] for x in self.din_components]
629 ) # TODO + self.tank.storage['nitrite'] but nitrite might not be modelled...
630 # need some ways to address this
632 def biochemical_processes(self):
633 """Runs all biochemical processes and updates pollutant amounts.
635 Returns:
636 in_ (dict): A VQIP amount that represents total gain in pollutant amounts
637 out_ (dict): A VQIP amount that represents total loss in pollutant amounts
638 """
639 # TODO make more modular
640 self.update_depth()
642 self.tank.storage["temperature"] = (1 - 1 / self.T_wdays) * self.tank.storage[
643 "temperature"
644 ] + (1 / self.T_wdays) * self.get_data_input("temperature")
646 # Update lagged temperatures
647 if len(self.lagged_temperatures) > self.max_temp_lag:
648 del self.lagged_temperatures[0]
649 self.lagged_temperatures.append(self.tank.storage["temperature"])
651 # Update lagged total phosphorus
652 if len(self.lagged_total_phosphorus) > self.max_phosphorus_lag:
653 del self.lagged_total_phosphorus[0]
654 total_phosphorus = (
655 self.tank.storage["phosphate"] + self.tank.storage["org-phosphorus"]
656 )
657 self.lagged_total_phosphorus.append(total_phosphorus)
659 # Check if any water
660 if self.tank.storage["volume"] < constants.FLOAT_ACCURACY:
661 # Assume these only do something when there is water
662 return (self.empty_vqip(), self.empty_vqip())
664 if self.tank.storage["temperature"] <= 0:
665 # Seems that these things are only active when above freezing
666 return (self.empty_vqip(), self.empty_vqip())
668 # Denitrification
669 tempfcn = 2 ** ((self.tank.storage["temperature"] - 20) / 10)
670 if self.tank.storage["temperature"] < 5:
671 tempfcn *= self.tank.storage["temperature"] / 5
673 din = self.get_din_pool()
674 din_concentration = din / self.tank.storage["volume"]
675 confcn = din_concentration / (
676 din_concentration + self.halfsatINwater
677 ) # [kg/m3]
678 denitri_water = (
679 self.denpar_w * self.area * tempfcn * confcn
680 ) # [kg/day] #TODO convert to per DT
682 river_denitrification = min(
683 denitri_water, 0.5 * din
684 ) # [kg/day] max 50% kan be denitrified
685 din_ = din - river_denitrification # [kg]
687 # Update mass balance
688 in_ = self.empty_vqip()
689 out_ = self.empty_vqip()
690 if din > 0:
691 for pol in self.din_components:
692 # denitrification
693 loss = (din - din_) / din * self.tank.storage[pol]
694 out_[pol] += loss
695 self.tank.storage[pol] -= loss
697 din = self.get_din_pool()
699 # Calculate moving averages TODO generalise
700 temp_10_day = sum(self.lagged_temperatures[-10:]) / 10
701 temp_20_day = sum(self.lagged_temperatures[-20:]) / 20
702 total_phos_365_day = sum(self.lagged_total_phosphorus) / self.max_phosphorus_lag
704 # Calculate coefficients
705 tempfcn = (
706 (self.tank.storage["temperature"]) / 20 * (temp_10_day - temp_20_day) / 5
707 )
708 if (total_phos_365_day - self.limpppar + self.hsatTP) > 0:
709 totalphosfcn = (total_phos_365_day - self.limpppar) / (
710 total_phos_365_day - self.limpppar + self.hsatTP
711 )
712 else:
713 totalphosfcn = 0
715 # Mineralisation/production TODO this feels like it could be much tidier
716 minprodN = (
717 self.prodNpar * totalphosfcn * tempfcn * self.area * self.current_depth
718 ) # [kg N/day]
719 minprodP = (
720 self.prodPpar
721 * totalphosfcn
722 * tempfcn
723 * self.area
724 * self.current_depth
725 * self.uptake_PNratio
726 ) # [kg N/day]
727 if minprodN > 0:
728 # production (inorg -> org)
729 minprodN = min(
730 0.5 * din, minprodN
731 ) # only half pool can be used for production
732 minprodP = min(
733 0.5 * self.tank.storage["phosphate"], minprodP
734 ) # only half pool can be used for production
736 # Update mass balance
737 out_["phosphate"] = minprodP
738 self.tank.storage["phosphate"] -= minprodP
739 in_["org-phosphorus"] = minprodP
740 self.tank.storage["org-phosphorus"] += minprodP
741 if din > 0:
742 for pol in self.din_components:
743 loss = minprodN * self.tank.storage[pol] / din
744 out_[pol] += loss
745 self.tank.storage[pol] -= loss
747 in_["org-nitrogen"] = minprodN
748 self.tank.storage["org-nitrogen"] += minprodN
750 else:
751 # mineralisation (org -> inorg)
752 minprodN = min(0.5 * self.tank.storage["org-nitrogen"], -minprodN)
753 minprodP = min(0.5 * self.tank.storage["org-phosphorus"], -minprodP)
755 # Update mass balance
756 in_["phosphate"] = minprodP
757 self.tank.storage["phosphate"] += minprodP
758 out_["org-phosphorus"] = minprodP
759 self.tank.storage["org-phosphorus"] -= minprodP
760 if din > 0:
761 for pol in self.din_components:
762 gain = minprodN * self.tank.storage[pol] / din
763 in_[pol] += gain
764 self.tank.storage[pol] += gain
766 out_["org-nitrogen"] = minprodN
767 self.tank.storage["org-nitrogen"] -= minprodN
769 din = self.get_din_pool()
771 # macrophyte uptake temperature dependence factor
772 tempfcn1 = (max(0, self.tank.storage["temperature"]) / 20) ** 0.3
773 tempfcn2 = (self.tank.storage["temperature"] - temp_20_day) / 5
774 tempfcn = max(0, tempfcn1 * tempfcn2)
776 macrouptN = self.muptNpar * tempfcn * self.area # [kg/day]
777 macrophyte_uptake_N = min(0.5 * din, macrouptN)
778 if din > 0:
779 for pol in self.din_components:
780 loss = macrophyte_uptake_N * self.tank.storage[pol] / din
781 out_[pol] += loss
782 self.tank.storage[pol] -= loss
784 macrouptP = (
785 self.muptPpar * tempfcn * max(0, totalphosfcn) * self.area
786 ) # [kg/day]
787 macrophyte_uptake_P = min(0.5 * self.tank.storage["phosphate"], macrouptP)
788 out_["phosphate"] += macrophyte_uptake_P
789 self.tank.storage["phosphate"] -= macrophyte_uptake_P
791 # TODO source/sink for benthos sediment P suspension/resuspension
792 return in_, out_
794 def get_riverrc(self):
795 """Get river outflow coefficient (i.e., how much water leaves the tank in this
796 timestep).
798 Returns:
799 riverrc (float): outflow coeffficient
800 """
801 # Calculate travel time
802 total_time = self.length / self.velocity
803 # Apply damp
804 kt = self.damp * total_time # [day]
805 if kt != 0:
806 riverrc = 1 - kt + kt * exp(-1 / kt) # [-]
807 else:
808 riverrc = 1
809 return riverrc
811 def calculate_discharge(self):
812 """"""
813 if "nitrate" in constants.POLLUTANTS:
814 # TODO clumsy Run biochemical processes
815 in_, out_ = self.biochemical_processes()
816 # Mass balance
817 self.bio_in = in_
818 self.bio_out = out_
820 def distribute(self):
821 """Run biochemical processes, track mass balance, and distribute water."""
822 # self.calculate_discharge() Get outflow
823 outflow = self.tank.pull_storage(
824 {"volume": self.tank.storage["volume"] * self.get_riverrc()}
825 )
826 # Distribute outflow
827 reply = self.push_distributed(outflow, of_type=["River", "Node", "Waste"])
828 _ = self.tank.push_storage(reply, force=True)
829 if reply["volume"] > constants.FLOAT_ACCURACY:
830 print("river cant push: {0}".format(reply["volume"]))
832 def pull_check_fp(self, vqip=None):
833 """
835 Args:
836 vqip:
838 Returns:
840 """
841 # TODO Pull checking for riparian buffer, needs updating update river depth
842 self.update_depth()
843 return self.current_depth, self.area, self.width, self.river_tank.storage
845 def end_timestep_(self):
846 """Update state variables."""
847 self.tank.end_timestep()
848 self.bio_in = self.empty_vqip()
849 self.bio_out = self.empty_vqip()
852class Reservoir(Storage):
853 """"""
855 def __init__(self, **kwargs):
856 """Storage node that makes abstractions by calling pull_distributed.
858 Functions intended to call in orchestration:
859 make_abstractions (before any river routing)
861 Key assumptions:
862 - Conceptualised as a `Tank`.
863 - Recharged only via pumped abstractions.
864 - Evaporation/precipitation onto surface area currently ignored.
865 - If `decays` are provided to model water quality transformations,
866 see `core.py/DecayObj`.
868 Input data and parameter requirements:
869 - Tank `capacity`, `area`, and `datum`.
870 _Units_: cubic metres, squared metres, metres
871 - Optional dictionary of decays with pollutants as keys and decay
872 parameters (a constant and a temperature sensitivity exponent) as
873 values. _Units_: -
874 """
875 super().__init__(**kwargs)
877 def make_abstractions(self):
878 """Pulls water and updates tanks."""
879 reply = self.pull_distributed(self.tank.get_excess())
880 spill = self.tank.push_storage(reply)
881 _ = self.tank.push_storage(spill, force=True)
882 if spill["volume"] > constants.FLOAT_ACCURACY:
883 print("Spill at reservoir by {0}".format(spill["volume"]))
886class RiverReservoir(Reservoir):
887 """"""
889 def __init__(self, environmental_flow=0, **kwargs):
890 """A reservoir with a natural river inflow, includes an environmental downstream
891 flow to satisfy.
893 Args:
894 environmental_flow (float, optional): Downstream environmental flow.
895 Defaults to 0.
897 Functions intended to call in orchestration:
898 make_abstractions (if any)
900 satisfy_environmental (before river routing.. possibly before
901 downstream river abstractions depending on licence)
903 Key assumptions:
904 - Conceptualised as a `Tank`.
905 - Recharged via pumped abstractions and receives water from
906 inflowing arcs.
907 - Reservoir aims to satisfy a static `environmental_flow`.
908 - If tank capacity is exceeded, reservoir spills downstream
909 towards `nodes.py/Node`, `storage.py/River` or `waste.py/Waste` nodes.
910 Spill counts towards `environmental_flow`.
911 - Evaporation/precipitation onto surface area currently ignored.
912 - Currently, if a reservoir satisfies a pull from a downstream
913 node, it does __not__ count towards `environmental_flow`.
914 - If `decays` are provided to model water quality transformations,
915 see `core.py/DecayObj`.
917 Input data and parameter requirements:
918 - Tank `capacity`, `area`, and `datum`.
919 _Units_: cubic metres, squared metres, metres
920 - `environmental_flow`
921 _Units_: cubic metres/timestep
922 - Optional dictionary of decays with pollutants as keys and decay
923 parameters (a constant and a temperature sensitivity exponent) as
924 values. _Units_: -
925 """
926 # Parameters
927 self.environmental_flow = environmental_flow
928 super().__init__(**kwargs)
930 # State variables
931 self.total_environmental_satisfied = 0
933 self.push_set_handler["default"] = self.push_set_river_reservoir
934 self.push_check_handler["default"] = self.push_check_river_reservoir
935 self.end_timestep = self.end_timestep_
937 self.__class__.__name__ = "Reservoir"
939 def push_set_river_reservoir(self, vqip):
940 """Receive water.
942 Args:
943 vqip (dict): A VQIP amount to be received
945 Returns:
946 reply (dict): A VQIP amount that was not successfully received
947 """
948 # Apply normal reservoir storage We do this under the assumption that spill is
949 # mixed in with the reservoir If the reservoir can't spill everything you'll get
950 # some weird numbers in reply, but if your reservoir can't spill as much as you
951 # like then you should probably be pushing the right amount through
952 # push_check_river_reservoir Some cunning could probably avoid this by checking
953 # vqip, but this is a serious edge case
954 _ = self.tank.push_storage(vqip, force=True)
955 spill = self.tank.pull_ponded()
957 # Send spill downstream
958 reply = self.push_distributed(spill, of_type=["Node", "River", "Waste"])
960 # Use spill to satisfy downstream flow
961 self.total_environmental_satisfied += spill["volume"] - reply["volume"]
963 return reply
965 def push_check_river_reservoir(self, vqip=None):
966 """A push check to receive water, assumes spill may occur and checks downstream
967 capacity.
969 Args:
970 vqip (dict, optional): A VQIP that can be used to limit the volume in
971 the return value (only volume key is used). Defaults to None.
973 Returns:
974 excess (dict): A VQIP amount of water that cannot be received
975 """
976 # Check downstream capacity (i.e., that would be spilled)
977 downstream_availability = self.get_connected(
978 direction="push", of_type=["Node", "River", "Waste"]
979 )["avail"]
980 # Check excess capacity in the reservoir
981 excess = self.tank.get_excess()
982 # Combine excess and downstream in response
983 new_v = excess["volume"] + downstream_availability
984 if vqip is not None:
985 new_v = min(vqip["volume"], new_v)
986 # Update to vqip
987 excess = self.v_change_vqip(excess, new_v)
989 return excess
991 def satisfy_environmental(self):
992 """Satisfy environmental flow downstream."""
993 # Calculate how much environmental flow is yet to satisfy #(some may have been
994 # already if pull-and-take abstractions have taken place)
995 to_satisfy = max(
996 self.environmental_flow - self.total_environmental_satisfied, 0
997 )
998 # Pull from tank
999 environmental = self.tank.pull_storage({"volume": to_satisfy})
1000 # Send downstream
1001 reply = self.push_distributed(environmental)
1002 _ = self.tank.push_storage(reply, force=True)
1003 if reply["volume"] > constants.FLOAT_ACCURACY:
1004 print("warning: environmental not able to push")
1006 # Update satisfaction
1007 self.total_environmental_satisfied += environmental["volume"]
1009 def end_timestep_(self):
1010 """Udpate state varibles."""
1011 self.tank.end_timestep()
1012 self.total_environmental_satisfied = 0