Coverage for wsimod\nodes\wtw.py: 18%
211 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-24 11:16 +0100
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-24 11:16 +0100
1# -*- coding: utf-8 -*-
2"""Created on Mon Nov 15 14:20:36 2021.
4@author: bdobson
5Converted to totals on 2022-05-03
6"""
7from typing import Any, Dict
9from wsimod.core import constants
10from wsimod.nodes.nodes import Node
11from wsimod.nodes.tanks import Tank
14class WTW(Node):
15 """A generic Water Treatment Works (WTW) node.
17 This class is a generic water treatment works node. It is intended to be
18 subclassed into freshwater and wastewater treatment works (FWTW and WWTW
19 respectively).
20 """
22 def __init__(
23 self,
24 name,
25 treatment_throughput_capacity=10,
26 process_parameters={},
27 liquor_multiplier={},
28 percent_solids=0.0002,
29 ):
30 """Generic treatment processes that apply temperature a sensitive transform of
31 pollutants into liquor and solids (behaviour depends on subclass). Push requests
32 are stored in the current_input state variable, but treatment must be triggered
33 with treat_current_input. This treated water is stored in the discharge_holder
34 state variable, which will be sent different depending on FWTW/WWTW.
36 Args:
37 name (str): Node name
38 treatment_throughput_capacity (float, optional): Amount of volume per
39 timestep of water that can be treated. Defaults to 10.
40 process_parameters (dict, optional): Dict of dicts for each pollutant.
41 Top level key describes pollutant. Next level key describes the
42 constant portion of the transform and the temperature sensitive
43 exponent portion (see core.py/DecayObj for more detailed
44 explanation). Defaults to {}.
45 liquor_multiplier (dict, optional): Keys for each pollutant that
46 describes how much influent becomes liquor. Defaults to {}.
47 percent_solids (float, optional): Proportion of volume that becomes solids.
48 All pollutants that do not become effluent or liquor become solids.
49 Defaults to 0.0002.
51 Functions intended to call in orchestration:
52 None (use FWTW or WWTW subclass)
54 Key assumptions:
55 - Throughput can be modelled entirely with a set capacity.
56 - Pollutant reduction for the entire treatment process can be modelled
57 primarily with a single (temperature sensitive) transformation for
58 each pollutant.
59 - Liquor and solids are tracked and calculated with proportional
60 multiplier parameters.
62 Input data and parameter requirements:
63 - `treatment_throughput_capacity`
64 _Units_: cubic metres/timestep
65 - `process_parameters` contains the constant (non temperature
66 sensitive) and exponent (temperature sensitive) transformations
67 applied to treated water for each pollutant.
68 _Units_: -
69 - `liquor_multiplier` and `percent_solids` describe the proportion of
70 throughput that goes to liquor/solids.
71 """
72 # Set/Default parameters
73 self.treatment_throughput_capacity = treatment_throughput_capacity
74 if len(process_parameters) > 0:
75 self.process_parameters = process_parameters
76 else:
77 self.process_parameters = {
78 x: {"constant": 0.01, "exponent": 1.001}
79 for x in constants.ADDITIVE_POLLUTANTS
80 }
81 if len(liquor_multiplier) > 0:
82 self._liquor_multiplier = liquor_multiplier
83 else:
84 self._liquor_multiplier = {x: 0.7 for x in constants.ADDITIVE_POLLUTANTS}
85 self._liquor_multiplier["volume"] = 0.03
87 self._percent_solids = percent_solids
89 # Update args
90 super().__init__(name)
92 self.process_parameters["volume"] = {"constant": self.calculate_volume()}
94 # Update handlers
95 self.push_set_handler["default"] = self.push_set_deny
96 self.push_check_handler["default"] = self.push_check_deny
98 # Initialise parameters
99 self.current_input = self.empty_vqip()
100 self.treated = self.empty_vqip()
101 self.liquor = self.empty_vqip()
102 self.solids = self.empty_vqip()
104 def calculate_volume(self):
105 """Calculate the volume proportion of treated water.
107 Returns:
108 (float): Volume of treated water
109 """
110 return 1 - self._percent_solids - self._liquor_multiplier["volume"]
112 @property
113 def percent_solids(self):
114 return self._percent_solids
116 @percent_solids.setter
117 def percent_solids(self, value):
118 self._percent_solids = value
119 self.process_parameters["volume"]["constant"] = self.calculate_volume()
121 @property
122 def liquor_multiplier(self):
123 return self._liquor_multiplier
125 @liquor_multiplier.setter
126 def liquor_multiplier(self, value):
127 self._liquor_multiplier.update(value)
128 self.process_parameters["volume"]["constant"] = self.calculate_volume()
130 def apply_overrides(self, overrides=Dict[str, Any]):
131 """Override parameters.
133 Enables a user to override any of the following parameters:
134 percent_solids, treatment_throughput_capacity, process_parameters (the
135 entire dict does not need to be redefined, only changed values need to
136 be included), liquor_multiplier (as with process_parameters).
138 Args:
139 overrides (Dict[str, Any]): Dict describing which parameters should
140 be overridden (keys) and new values (values). Defaults to {}.
141 """
142 self.percent_solids = overrides.pop("percent_solids", self._percent_solids)
143 self.liquor_multiplier = overrides.pop(
144 "liquor_multiplier", self._liquor_multiplier
145 )
146 process_parameters = overrides.pop("process_parameters", {})
147 for key, value in process_parameters.items():
148 self.process_parameters[key].update(value)
150 self.treatment_throughput_capacity = overrides.pop(
151 "treatment_throughput_capacity", self.treatment_throughput_capacity
152 )
153 super().apply_overrides(overrides)
155 def get_excess_throughput(self):
156 """How much excess treatment capacity is there.
158 Returns:
159 (float): Amount of volume that can still be treated this timestep
160 """
161 return max(self.treatment_throughput_capacity - self.current_input["volume"], 0)
163 def treat_current_input(self):
164 """Run treatment processes this timestep, including temperature sensitive
165 transforms, liquoring, solids."""
166 # Treat current input
167 influent = self.copy_vqip(self.current_input)
169 # Calculate effluent, liquor and solids
170 discharge_holder = self.empty_vqip()
172 # Assume non-additive pollutants are unchanged in discharge and are
173 # proportionately mixed in liquor
174 for key in constants.NON_ADDITIVE_POLLUTANTS:
175 discharge_holder[key] = influent[key]
176 self.liquor[key] = (
177 self.liquor[key] * self.liquor["volume"]
178 + influent[key] * influent["volume"] * self.liquor_multiplier["volume"]
179 ) / (
180 self.liquor["volume"]
181 + influent["volume"] * self.liquor_multiplier["volume"]
182 )
184 # TODO this should probably just be for process_parameters.keys() to avoid
185 # having to declare non changing parameters
186 # TODO should the liquoring be temperature sensitive too? As it is the solids
187 # will take the brunt of the temperature variability which maybe isn't sensible
188 for key in constants.ADDITIVE_POLLUTANTS + ["volume"]:
189 if key != "volume":
190 # Temperature sensitive transform
191 temp_factor = self.process_parameters[key]["exponent"] ** (
192 constants.DECAY_REFERENCE_TEMPERATURE - influent["temperature"]
193 )
194 else:
195 temp_factor = 1
196 # Calculate discharge
197 discharge_holder[key] = (
198 influent[key] * self.process_parameters[key]["constant"] * temp_factor
199 )
200 # Calculate liquor
201 self.liquor[key] = influent[key] * self.liquor_multiplier[key]
203 # Calculate solids volume
204 self.solids["volume"] = influent["volume"] * self.percent_solids
206 # All remaining pollutants go to solids
207 for key in constants.ADDITIVE_POLLUTANTS:
208 self.solids[key] = influent[key] - discharge_holder[key] - self.liquor[key]
210 # Blend with any existing discharge
211 self.treated = self.sum_vqip(self.treated, discharge_holder)
213 if self.treated["volume"] > self.current_input["volume"]:
214 print("more treated than input")
216 def end_timestep(self):
217 """"""
218 # Reset state variables
219 self.current_input = self.empty_vqip()
220 self.treated = self.empty_vqip()
223class WWTW(WTW):
224 """Wastewater Treatment Works (WWTW) node."""
226 def __init__(
227 self,
228 stormwater_storage_capacity=10,
229 stormwater_storage_area=1,
230 stormwater_storage_elevation=10,
231 **kwargs,
232 ):
233 """A wastewater treatment works wrapper for WTW. Contains a temporary stormwater
234 storage tank. Liquor is combined with current_effluent and re- treated while
235 solids leave the model.
237 Args:
238 stormwater_storage_capacity (float, optional): Capacity of stormwater tank.
239 Defaults to 10.
240 stormwater_storage_area (float, optional): Area of stormwater tank.
241 Defaults to 1.
242 stormwater_storage_elevation (float, optional): Datum of stormwater tank.
243 Defaults to 10.
245 Functions intended to call in orchestration:
246 calculate_discharge
248 make_discharge
250 Key assumptions:
251 - See `wtw.py/WTW` for treatment.
252 - When `treatment_throughput_capacity` is exceeded, water is first sent
253 to a stormwater storage tank before denying pushes. Leftover water
254 in this tank aims to be treated in subsequent timesteps.
255 - Can be pulled from to simulate active wastewater effluent use.
257 Input data and parameter requirements:
258 - See `wtw.py/WTW` for treatment.
259 - Stormwater tank `capacity`, `area`, and `datum`.
260 _Units_: cubic metres, squared metres, metres
261 """
262 # Set parameters
263 self.stormwater_storage_capacity = stormwater_storage_capacity
264 self.stormwater_storage_area = stormwater_storage_area
265 self.stormwater_storage_elevation = stormwater_storage_elevation
267 # Update args
268 super().__init__(**kwargs)
270 self.end_timestep = self.end_timestep_
272 # Update handlers
273 self.pull_set_handler["default"] = self.pull_set_reuse
274 self.pull_check_handler["default"] = self.pull_check_reuse
275 self.push_set_handler["Sewer"] = self.push_set_sewer
276 self.push_check_handler["Sewer"] = self.push_check_sewer
277 self.push_check_handler["default"] = self.push_check_sewer
278 self.push_set_handler["default"] = self.push_set_sewer
280 # Create tank
281 self.stormwater_tank = Tank(
282 capacity=self.stormwater_storage_capacity,
283 area=self.stormwater_storage_area,
284 datum=self.stormwater_storage_elevation,
285 )
287 # Initialise states
288 self.liquor_ = self.empty_vqip()
289 self.previous_input = self.empty_vqip()
290 self.current_input = self.empty_vqip() # TODO is this not done in WTW?
292 # Mass balance
293 self.mass_balance_out.append(lambda: self.solids) # Assume these go to landfill
294 self.mass_balance_ds.append(lambda: self.stormwater_tank.ds())
295 self.mass_balance_ds.append(
296 lambda: self.ds_vqip(self.liquor, self.liquor_)
297 ) # Change in liquor
299 def apply_overrides(self, overrides=Dict[str, Any]):
300 """Apply overrides to the stormwater tank and WWTW.
302 Enables a user to override any parameter of the stormwater tank, and
303 then calls any overrides in WTW.
305 Args:
306 overrides (Dict[str, Any]): Dict describing which parameters should
307 be overridden (keys) and new values (values). Defaults to {}.
308 """
309 self.stormwater_storage_capacity = overrides.pop(
310 "stormwater_storage_capacity", self.stormwater_storage_capacity
311 )
312 self.stormwater_storage_area = overrides.pop(
313 "stormwater_storage_area", self.stormwater_storage_area
314 )
315 self.stormwater_storage_elevation = overrides.pop(
316 "stormwater_storage_elevation", self.stormwater_storage_elevation
317 )
318 self.stormwater_tank.area = self.stormwater_storage_area
319 self.stormwater_tank.capacity = self.stormwater_storage_capacity
320 self.stormwater_tank.datum = self.stormwater_storage_elevation
321 super().apply_overrides(overrides)
323 def calculate_discharge(self):
324 """Clear stormwater tank if possible, and call treat_current_input."""
325 # Run WWTW model
327 # Try to clear stormwater
328 # TODO (probably more tidy to use push_set_sewer? though maybe less
329 # computationally efficient)
330 excess = self.get_excess_throughput()
331 if (self.stormwater_tank.get_avail()["volume"] > constants.FLOAT_ACCURACY) & (
332 excess > constants.FLOAT_ACCURACY
333 ):
334 to_pull = min(excess, self.stormwater_tank.get_avail()["volume"])
335 to_pull = self.v_change_vqip(self.stormwater_tank.storage, to_pull)
336 cleared_stormwater = self.stormwater_tank.pull_storage(to_pull)
337 self.current_input = self.sum_vqip(self.current_input, cleared_stormwater)
339 # Run processes
340 self.current_input = self.sum_vqip(self.current_input, self.liquor)
341 self.treat_current_input()
343 def make_discharge(self):
344 """Discharge treated effluent."""
345 reply = self.push_distributed(self.treated)
346 self.treated = self.empty_vqip()
347 if reply["volume"] > constants.FLOAT_ACCURACY:
348 _ = self.stormwater_tank.push_storage(reply, force=True)
349 print("WWTW couldnt push")
351 def push_check_sewer(self, vqip=None):
352 """Check throughput and stormwater tank capacity.
354 Args:
355 vqip (dict, optional): A VQIP that can be used to limit the volume in
356 the return value (only volume key is used). Defaults to None.
358 Returns:
359 (dict): excess
360 """
361 # Get excess
362 excess_throughput = self.get_excess_throughput()
363 excess_tank = self.stormwater_tank.get_excess()
364 # Combine tank and throughput
365 vol = excess_tank["volume"] + excess_throughput
366 # Update volume
367 if vqip is None:
368 vqip = self.empty_vqip()
369 else:
370 vol = min(vol, vqip["volume"])
372 return self.v_change_vqip(vqip, vol)
374 def push_set_sewer(self, vqip):
375 """Receive water, first try to update current_input, and then stormwater tank.
377 Args:
378 vqip (dict): A VQIP amount to be treated and then stored
380 Returns:
381 (dict): A VQIP amount of water that was not treated
382 """
383 # Receive water from sewers
384 vqip = self.copy_vqip(vqip)
385 # Check if can directly be treated
386 sent_direct = self.get_excess_throughput()
388 sent_direct = min(sent_direct, vqip["volume"])
390 sent_direct = self.v_change_vqip(vqip, sent_direct)
392 self.current_input = self.sum_vqip(self.current_input, sent_direct)
394 if sent_direct["volume"] == vqip["volume"]:
395 # If all added to input, no problem
396 return self.empty_vqip()
398 # Next try temporary storage
399 vqip = self.v_change_vqip(vqip, vqip["volume"] - sent_direct["volume"])
401 vqip = self.stormwater_tank.push_storage(vqip)
403 if vqip["volume"] < constants.FLOAT_ACCURACY:
404 return self.empty_vqip()
405 else:
406 # TODO what to do here ???
407 return vqip
409 def pull_set_reuse(self, vqip):
410 """Enables WWTW to receive pulls of the treated water (i.e., for wastewater
411 reuse or satisfaction of environmental flows). Intended to be called in between
412 calculate_discharge and make_discharge.
414 Args:
415 vqip (dict): A VQIP amount to be pulled (only 'volume' key is used)
417 Returns:
418 reply (dict): Amount of water that has been pulled
419 """
420 # Satisfy request with treated (volume)
421 reply_vol = min(vqip["volume"], self.treated["volume"])
422 # Update pollutants
423 reply = self.v_change_vqip(self.treated, reply_vol)
424 # Update treated
425 self.treated = self.v_change_vqip(
426 self.treated, self.treated["volume"] - reply_vol
427 )
428 return reply
430 def pull_check_reuse(self, vqip=None):
431 """Pull check available water. Simply returns the previous timestep's treated
432 throughput. This is of course inaccurate (which may lead to slightly longer
433 calulcations), but it is much more flexible. This hasn't been recently tested so
434 it might be that returning treated would be fine (and more accurate!).
436 Args:
437 vqip (dict, optional): A VQIP that can be used to limit the volume in
438 the return value (only volume key is used). Defaults to None.
440 Returns:
441 (dict): A VQIP amount of water available. Currently just the previous
442 timestep's treated throughput
443 """
444 # Respond to request of water for reuse/MRF
445 return self.copy_vqip(self.treated)
447 def end_timestep_(self):
448 """End timestep function to update state variables."""
449 self.liquor_ = self.copy_vqip(self.liquor)
450 self.previous_input = self.copy_vqip(self.current_input)
451 self.current_input = self.empty_vqip()
452 self.solids = self.empty_vqip()
453 self.stormwater_tank.end_timestep()
456class FWTW(WTW):
457 """"""
459 def __init__(
460 self,
461 service_reservoir_storage_capacity=10,
462 service_reservoir_storage_area=1,
463 service_reservoir_storage_elevation=10,
464 service_reservoir_initial_storage=0,
465 data_input_dict={},
466 **kwargs,
467 ):
468 """A freshwater treatment works wrapper for WTW. Contains service reservoirs
469 that treated water is released to and pulled from. Cannot allow deficit (thus
470 any deficit is satisfied by water entering the model 'via other means'). Liquor
471 and solids are sent to sewers.
473 Args:
474 service_reservoir_storage_capacity (float, optional): Capacity of service
475 reservoirs. Defaults to 10.
476 service_reservoir_storage_area (float, optional): Area of service
477 reservoirs. Defaults to 1.
478 service_reservoir_storage_elevation (float, optional): Datum of service
479 reservoirs. Defaults to 10.
480 service_reservoir_initial_storage (float or dict, optional): initial
481 storage of service reservoirs (see nodes.py/Tank for details).
482 Defaults to 0.
483 data_input_dict (dict, optional): Dictionary of data inputs relevant for
484 the node (though I don't think it is used). Defaults to {}.
486 Functions intended to call in orchestration:
487 treat_water
489 Key assumptions:
490 - See `wtw.py/WTW` for treatment.
491 - Stores treated water in a service reservoir tank, with a single tank
492 per `FWTW` node.
493 - Aims to satisfy a throughput that would top up the service reservoirs
494 until full.
495 - Currently, will not allow a deficit, thus introducing water from
496 'other measures' if pulls cannot fulfil demand. Behaviour under a
497 deficit should be determined and validated before introducing.
499 Input data and parameter requirements:
500 - See `wtw.py/WTW` for treatment.
501 - Service reservoir tank `capacity`, `area`, and `datum`.
502 _Units_: cubic metres, squared metres, metres
503 """
504 # Default parameters
505 self.service_reservoir_storage_capacity = service_reservoir_storage_capacity
506 self.service_reservoir_storage_area = service_reservoir_storage_area
507 self.service_reservoir_storage_elevation = service_reservoir_storage_elevation
508 self.service_reservoir_initial_storage = service_reservoir_initial_storage
509 # TODO don't think data_input_dict is used
510 self.data_input_dict = data_input_dict
512 # Update args
513 super().__init__(**kwargs)
514 self.end_timestep = self.end_timestep_
516 # Update handlers
517 self.pull_set_handler["default"] = self.pull_set_fwtw
518 self.pull_check_handler["default"] = self.pull_check_fwtw
520 self.push_set_handler["default"] = self.push_set_deny
521 self.push_check_handler["default"] = self.push_check_deny
523 # Initialise parameters
524 self.total_deficit = self.empty_vqip()
525 self.total_pulled = self.empty_vqip()
526 self.previous_pulled = self.empty_vqip()
527 self.unpushed_sludge = self.empty_vqip()
529 # Create tanks
530 self.service_reservoir_tank = Tank(
531 capacity=self.service_reservoir_storage_capacity,
532 area=self.service_reservoir_storage_area,
533 datum=self.service_reservoir_storage_elevation,
534 initial_storage=self.service_reservoir_initial_storage,
535 )
536 # self.service_reservoir_tank.storage['volume'] =
537 # self.service_reservoir_inital_storage
538 # self.service_reservoir_tank.storage_['volume'] =
539 # self.service_reservoir_inital_storage
541 # Mass balance
542 self.mass_balance_in.append(lambda: self.total_deficit)
543 self.mass_balance_ds.append(lambda: self.service_reservoir_tank.ds())
544 self.mass_balance_out.append(lambda: self.unpushed_sludge)
546 def apply_overrides(self, overrides=Dict[str, Any]):
547 """Apply overrides to the service reservoir tank and FWTW.
549 Enables a user to override any parameter of the service reservoir tank, and
550 then calls any overrides in WTW.
552 Args:
553 overrides (Dict[str, Any]): Dict describing which parameters should
554 be overridden (keys) and new values (values). Defaults to {}.
555 """
556 self.service_reservoir_storage_capacity = overrides.pop(
557 "service_reservoir_storage_capacity",
558 self.service_reservoir_storage_capacity,
559 )
560 self.service_reservoir_storage_area = overrides.pop(
561 "service_reservoir_storage_area", self.service_reservoir_storage_area
562 )
563 self.service_reservoir_storage_elevation = overrides.pop(
564 "service_reservoir_storage_elevation",
565 self.service_reservoir_storage_elevation,
566 )
568 self.service_reservoir_tank.capacity = self.service_reservoir_storage_capacity
569 self.service_reservoir_tank.area = self.service_reservoir_storage_area
570 self.service_reservoir_tank.datum = self.service_reservoir_storage_elevation
571 super().apply_overrides(overrides)
573 def treat_water(self):
574 """Pulls water, aiming to fill service reservoirs, calls WTW
575 treat_current_input, avoids deficit, sends liquor and solids to sewers."""
576 # Calculate how much water is needed
577 target_throughput = self.service_reservoir_tank.get_excess()
578 target_throughput = min(
579 target_throughput["volume"], self.treatment_throughput_capacity
580 )
582 # Pull water
583 throughput = self.pull_distributed({"volume": target_throughput})
585 # Calculate deficit (assume is equal to difference between previous treated
586 # throughput and current throughput)
587 # TODO think about this a bit more
588 deficit = max(target_throughput - throughput["volume"], 0)
589 # deficit = max(self.previous_pulled['volume'] - throughput['volume'], 0)
590 deficit = self.v_change_vqip(self.previous_pulled, deficit)
592 # Introduce deficit
593 self.current_input = self.sum_vqip(throughput, deficit)
595 # Track deficit
596 self.total_deficit = self.sum_vqip(self.total_deficit, deficit)
598 if self.total_deficit["volume"] > constants.FLOAT_ACCURACY:
599 print(
600 "Service reservoirs not filled at {0} on {1}".format(self.name, self.t)
601 )
603 # Run treatment processes
604 self.treat_current_input()
606 # Discharge liquor and solids to sewers
607 push_back = self.sum_vqip(self.liquor, self.solids)
608 rejected = self.push_distributed(push_back, of_type="Sewer")
609 self.unpushed_sludge = self.sum_vqip(self.unpushed_sludge, rejected)
610 if rejected["volume"] > constants.FLOAT_ACCURACY:
611 print("nowhere for sludge to go")
613 # Send water to service reservoirs
614 excess = self.service_reservoir_tank.push_storage(self.treated)
615 _ = self.service_reservoir_tank.push_storage(excess, force=True)
616 if excess["volume"] > 0:
617 print("excess treated water")
619 def pull_check_fwtw(self, vqip=None):
620 """Pull checks query service reservoirs.
622 Args:
623 vqip (dict, optional): A VQIP that can be used to limit the volume in
624 the return value (only volume key is used). Defaults to None.
626 Returns:
627 (dict): A VQIP of availability in service reservoirs
628 """
629 return self.service_reservoir_tank.get_avail(vqip)
631 def pull_set_fwtw(self, vqip):
632 """Pull treated water from service reservoirs.
634 Args:
635 vqip (dict): a VQIP amount to pull
637 Returns:
638 pulled (dict): A VQIP amount that was successfully pulled
639 """
640 # Pull
641 pulled = self.service_reservoir_tank.pull_storage(vqip)
642 # Update total_pulled this timestep
643 self.total_pulled = self.sum_vqip(self.total_pulled, pulled)
644 return pulled
646 def end_timestep_(self):
647 """Update state variables."""
648 self.service_reservoir_tank.end_timestep()
649 self.total_deficit = self.empty_vqip()
650 self.previous_pulled = self.copy_vqip(self.total_pulled)
651 self.total_pulled = self.empty_vqip()
652 self.treated = self.empty_vqip()
653 self.unpushed_sludge = self.empty_vqip()
655 def reinit(self):
656 """Call tank reinit."""
657 self.service_reservoir_tank.reinit()