Coverage for wsimod\nodes\storage.py: 22%

371 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-30 14:52 +0000

1# -*- coding: utf-8 -*- 

2"""Created on Mon Nov 15 14:20:36 2021. 

3 

4@author: bdobson Converted to totals on 2022-05-03 

5""" 

6import warnings 

7from math import exp 

8from typing import Any, Dict 

9 

10from wsimod.core import constants 

11from wsimod.nodes.nodes import Node 

12from wsimod.nodes.tanks import DecayQueueTank, DecayTank, QueueTank, Tank 

13 

14 

15class Storage(Node): 

16 """""" 

17 

18 def __init__( 

19 self, 

20 name, 

21 capacity=0, 

22 area=0, 

23 datum=0, 

24 decays=None, 

25 initial_storage=0, 

26 **kwargs, 

27 ): 

28 """A Node wrapper for a Tank or DecayTank. 

29 

30 Args: 

31 name (str): node name capacity (float, optional): Tank capacity (see 

32 nodes.py/Tank). Defaults to 0. area (float, optional): Tank area (see 

33 nodes.py/Tank). Defaults to 0. datum (float, optional): Tank datum (see 

34 nodes.py/Tank). Defaults to 0. decays (dict, optional): Tank decays if 

35 needed, (see nodes.py/DecayTank). Defaults to None. initial_storage (float 

36 or dict, optional): Initial storage (see nodes.py/Tank). Defaults to 0. 

37 

38 Functions intended to call in orchestration: 

39 distribute (optional, depends on subclass) 

40 """ 

41 # Set parameters 

42 self.initial_storage = initial_storage 

43 self.capacity = capacity 

44 self.area = area 

45 self.datum = datum 

46 self.decays = decays 

47 super().__init__(name, **kwargs) 

48 

49 # Create tank 

50 if "initial_storage" not in dir(self): 

51 self.initial_storage = self.empty_vqip() 

52 

53 if self.decays is None: 

54 self.tank = Tank( 

55 capacity=self.capacity, 

56 area=self.area, 

57 datum=self.datum, 

58 initial_storage=self.initial_storage, 

59 ) 

60 else: 

61 self.tank = DecayTank( 

62 capacity=self.capacity, 

63 area=self.area, 

64 datum=self.datum, 

65 initial_storage=self.initial_storage, 

66 decays=self.decays, 

67 parent=self, 

68 ) 

69 # Update handlers 

70 self.push_set_handler["default"] = self.push_set_storage 

71 self.push_check_handler["default"] = self.tank.get_excess 

72 self.pull_set_handler["default"] = lambda vol: self.tank.pull_storage(vol) 

73 self.pull_check_handler["default"] = self.tank.get_avail 

74 

75 # Mass balance 

76 self.mass_balance_ds.append(lambda: self.tank.ds()) 

77 

78 def apply_overrides(self, overrides=Dict[str, Any]): 

79 """Override parameters. 

80 

81 Enables a user to override any of the following parameters: 

82 capacity, area, datum, decays. 

83 

84 Args: 

85 overrides (Dict[str, Any]): Dict describing which parameters should 

86 be overridden (keys) and new values (values). Defaults to {}. 

87 """ 

88 # not using pop as these items need to stay 

89 # in the overrides to be fed into the tank overrides 

90 if "capacity" in overrides.keys(): 

91 self.capacity = overrides["capacity"] 

92 if "area" in overrides.keys(): 

93 self.area = overrides["area"] 

94 if "datum" in overrides.keys(): 

95 self.datum = overrides["datum"] 

96 if "decays" in overrides.keys(): 

97 if self.decays is None: 

98 raise ValueError("Attempting to override decays on a node initialised without decays") 

99 self.decays.update(overrides["decays"]) 

100 # apply tank overrides 

101 self.tank.apply_overrides(overrides) 

102 super().apply_overrides(overrides) 

103 

104 def push_set_storage(self, vqip): 

105 """A node wrapper for the tank push_storage. 

106 

107 Args: 

108 vqip (dict): A VQIP amount to push to the tank 

109 

110 Returns: 

111 reply (dict): A VQIP amount that was not successfully pushed 

112 """ 

113 # Update tank 

114 reply = self.tank.push_storage(vqip) 

115 

116 return reply 

117 

118 def distribute(self): 

119 """Optional function that discharges all tank storage with push_distributed.""" 

120 # Distribute any active storage 

121 storage = self.tank.pull_storage(self.tank.get_avail()) 

122 retained = self.push_distributed(storage) 

123 _ = self.tank.push_storage(retained, force=True) 

124 if retained["volume"] > constants.FLOAT_ACCURACY: 

125 print("Storage unable to push") 

126 

127 def get_percent(self): 

128 """Function that returns the volume in the storage tank expressed as a percent 

129 of capacity.""" 

130 return self.tank.storage["volume"] / self.tank.capacity 

131 

132 def end_timestep(self): 

133 """Update tank states.""" 

134 self.tank.end_timestep() 

135 

136 def reinit(self): 

137 """Call tank reinit.""" 

138 # TODO Automate this better 

139 self.tank.reinit() 

140 self.tank.storage["volume"] = self.initial_storage 

141 self.tank.storage_["volume"] = self.initial_storage 

142 

143 

144class Groundwater(Storage): 

145 """""" 

146 

147 def __init__( 

148 self, 

149 residence_time=200, 

150 infiltration_threshold=1, 

151 infiltration_pct=0, 

152 data_input_dict={}, 

153 **kwargs, 

154 ): 

155 # TODO why isn't this using a ResidenceTank? 

156 """A storage with a residence time for groundwater. Can also infiltrate to 

157 sewers. 

158 

159 Args: 

160 residence_time (float, optional): Residence time (see nodes.py/ 

161 ResidenceTank). Defaults to 200. 

162 infiltration_threshold (float, optional): Proportion of storage capacity 

163 that must be exceeded to generate infiltration. Defaults to 1. 

164 infiltration_pct (float, optional): Proportion of storage above the 

165 threshold that is square rooted and infiltrated. Defaults to 0. 

166 data_input_dict (dict, optional): Dictionary of data inputs relevant for 

167 the node (though I don't think it is used). Defaults to {}. 

168 

169 Functions intended to call in orchestration: 

170 infiltrate (before sewers are discharged) 

171 

172 distribute 

173 

174 Key assumptions: 

175 - Conceptualises groundwater as a tank. 

176 - Baseflow is generated following a residence-time method. 

177 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or 

178 `waste.py/Waste` nodes. 

179 - Infiltration to `sewer.py/Sewer` nodes occurs when the storage 

180 in the tank is greater than a specified threshold, at a rate 

181 proportional to the sqrt of volume above the threshold. (Note, this 

182 behaviour is __not validated__ and a high uncertainty process in 

183 general) 

184 - If `decays` are provided to model water quality transformations, 

185 see `core.py/DecayObj`. 

186 

187 Input data and parameter requirements: 

188 - Groundwater tank `capacity`, `area`, and `datum`. 

189 _Units_: cubic metres, squared metres, metres 

190 - Infiltration behaviour determined by an `infiltration_threshold` 

191 and `infiltration_pct`. _Units_: proportion of capacity 

192 - Optional dictionary of decays with pollutants as keys and decay 

193 parameters (a constant and a temperature sensitivity exponent) as 

194 values. _Units_: - 

195 """ 

196 self.residence_time = residence_time 

197 self.infiltration_threshold = infiltration_threshold 

198 self.infiltration_pct = infiltration_pct 

199 # TODO not used data_input 

200 self.data_input_dict = data_input_dict 

201 super().__init__(**kwargs) 

202 

203 def apply_overrides(self, overrides=Dict[str, Any]): 

204 """Override parameters. 

205 

206 Enables a user to override any of the following parameters: 

207 residence_time, infiltration_threshold, infiltration_pct. 

208 

209 Args: 

210 overrides (Dict[str, Any]): Dict describing which parameters should 

211 be overridden (keys) and new values (values). Defaults to {}. 

212 """ 

213 self.residence_time = overrides.pop("residence_time", self.residence_time) 

214 self.infiltration_threshold = overrides.pop( 

215 "infiltration_threshold", self.infiltration_threshold 

216 ) 

217 self.infiltration_pct = overrides.pop("infiltration_pct", self.infiltration_pct) 

218 super().apply_overrides(overrides) 

219 

220 def distribute(self): 

221 """Calculate outflow with residence time and send to Nodes or Rivers.""" 

222 avail = self.tank.get_avail()["volume"] / self.residence_time 

223 to_send = self.tank.pull_storage({"volume": avail}) 

224 retained = self.push_distributed(to_send, of_type=["Node", "River", "Waste"]) 

225 _ = self.tank.push_storage(retained, force=True) 

226 if retained["volume"] > constants.FLOAT_ACCURACY: 

227 print("Storage unable to push") 

228 

229 def infiltrate(self): 

230 """Calculate amount of water available for infiltration and send to sewers.""" 

231 # Calculate infiltration 

232 avail = self.tank.get_avail()["volume"] 

233 avail = max(avail - self.tank.capacity * self.infiltration_threshold, 0) 

234 avail = (avail * self.infiltration_pct) ** 0.5 

235 

236 # Push to sewers 

237 to_send = self.tank.pull_storage({"volume": avail}) 

238 retained = self.push_distributed(to_send, of_type="Sewer") 

239 _ = self.tank.push_storage(retained, force=True) 

240 # Any not sent is left in tank 

241 if retained["volume"] > constants.FLOAT_ACCURACY: 

242 # print('unable to infiltrate') 

243 pass 

244 

245 

246class QueueGroundwater(Storage): 

247 """""" 

248 

249 # TODO - no infiltration as yet 

250 def __init__(self, timearea={0: 1}, data_input_dict={}, **kwargs): 

251 """Alternate formulation of Groundwater that uses a timearea property to enable 

252 more nonlinear time behaviour of baseflow routing. Uses the QueueTank or 

253 DecayQueueTank (see nodes.py/Tank subclassses). 

254 

255 NOTE: abstraction behaviour from this kind of node need careful checking 

256 

257 Args: 

258 timearea (dict, optional): Time area diagram that enables flows to 

259 take a range of different durations to 'traverse' the tank. The keys of 

260 the dict are the number of timesteps while the values are the proportion 

261 of flow. E.g., {0 : 0.7, 1 : 0.3} means 70% of flow takes 0 timesteps 

262 and 30% takes 1 timesteps. Defaults to {0 : 1}. 

263 data_input_dict (dict, optional): Dictionary of data inputs relevant for 

264 the node (though I don't think it is used). Defaults to {}. 

265 

266 Functions intended to call in orchestration: 

267 distribute 

268 

269 Key assumptions: 

270 - Conceptualises groundwater as a tank. 

271 - Baseflow is generated following a timearea method. 

272 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or 

273 `waste.py/Waste` nodes. 

274 - No infiltration to sewers is modelled. 

275 - If `decays` are provided to model water quality transformations, 

276 see `core.py/DecayObj`. 

277 

278 Input data and parameter requirements: 

279 - Groundwater tank `capacity`, `area`, and `datum`. 

280 _Units_: cubic metres, squared metres, metres 

281 - `timearea` is a dictionary containing the timearea diagram. 

282 _Units_: duration of flow (in timesteps) and proportion of flow 

283 - Optional dictionary of decays with pollutants as keys and decay 

284 parameters (a constant and a temperature sensitivity exponent) as 

285 values. _Units_: - 

286 """ 

287 self.timearea = timearea 

288 # TODO not used 

289 self.data_input_dict = data_input_dict 

290 super().__init__(**kwargs) 

291 # Label as Groundwater class so that other nodes treat it the same 

292 self.__class__.__name__ = "Groundwater" 

293 # Update handlers 

294 self.push_set_handler["default"] = self.push_set_timearea 

295 self.pull_set_handler["default"] = self.pull_set_active 

296 self.pull_check_handler["default"] = self.pull_check_active 

297 # Enable decay 

298 if self.decays is None: 

299 self.tank = QueueTank( 

300 capacity=self.capacity, 

301 area=self.area, 

302 datum=self.datum, 

303 initial_storage=self.initial_storage, 

304 ) 

305 else: 

306 self.tank = DecayQueueTank( 

307 capacity=self.capacity, 

308 area=self.area, 

309 datum=self.datum, 

310 decays=self.decays, 

311 parent=self, 

312 initial_storage=self.initial_storage, 

313 ) 

314 

315 def apply_overrides(self, overrides=Dict[str, Any]): 

316 """Override parameters. 

317 

318 Enables a user to override any of the following parameters: 

319 timearea. 

320 

321 Args: 

322 overrides (Dict[str, Any]): Dict describing which parameters should 

323 be overridden (keys) and new values (values). Defaults to {}. 

324 """ 

325 self.timearea = overrides.pop("timearea", self.timearea) 

326 super().apply_overrides(overrides) 

327 

328 def push_set_timearea(self, vqip): 

329 """Push setting that enables timearea behaviour, (see __init__ for 

330 description).Used to receive flow that is assumed to occur widely across some 

331 kind of catchment. 

332 

333 Args: 

334 vqip (dict): A VQIP that has been pushed 

335 

336 Returns: 

337 reply (dict): A VQIP amount that was not successfuly receivesd 

338 """ 

339 reply = self.empty_vqip() 

340 # Iterate over timearea diagram TODO timearea diagram behaviour be generalised 

341 # across nodes 

342 for time, normalised in self.timearea.items(): 

343 vqip_ = self.v_change_vqip(vqip, vqip["volume"] * normalised) 

344 reply_ = self.tank.push_storage(vqip_, time=time) 

345 reply = self.sum_vqip(reply, reply_) 

346 return reply 

347 

348 def distribute(self): 

349 """Update internal arc, push active_storage onwards, update tank.""" 

350 _ = self.tank.internal_arc.update_queue(direction="push") 

351 

352 remaining = self.push_distributed(self.tank.active_storage) 

353 

354 if remaining["volume"] > constants.FLOAT_ACCURACY: 

355 print("Groundwater couldnt push all") 

356 

357 # Update tank 

358 sent = self.tank.active_storage["volume"] - remaining["volume"] 

359 sent = self.v_change_vqip(self.tank.active_storage, sent) 

360 reply = self.tank.pull_storage(sent) 

361 if (reply["volume"] - sent["volume"]) > constants.FLOAT_ACCURACY: 

362 print("Miscalculated tank storage in discharge") 

363 

364 def infiltrate(self): 

365 """""" 

366 pass 

367 

368 def pull_check_active(self, vqip=None): 

369 """A pull check that returns the active storage. 

370 

371 Args: 

372 vqip (dict, optional): A VQIP that can be used to limit the volume in 

373 the return value (only volume key is used). Defaults to None. 

374 

375 Returns: 

376 (dict): A VQIP amount that is available to pull 

377 """ 

378 if vqip is None: 

379 return self.tank.active_storage 

380 else: 

381 reply = min(vqip["volume"], self.tank.active_storage["volume"]) 

382 return self.v_change_vqip(self.tank.active_storage, reply) 

383 

384 def pull_set_active(self, vqip): 

385 # TODO - this is quite weird behaviour, and inconsistent with pull_check_active 

386 """Pull proportionately from both the active storage and the queue. Adjudging 

387 groundwater abstractions to not be particularly sensitive to the within 

388 catchment travel time. 

389 

390 Args: 

391 vqip (dict): A VQIP amount to be pulled (only volume key is used) 

392 

393 Returns: 

394 pulled (dict): A VQIP amount that was successfully pulled 

395 """ 

396 # Calculate actual pull 

397 total_storage = self.tank.storage["volume"] 

398 total_pull = min(self.tank.storage["volume"], vqip["volume"]) 

399 

400 if total_pull < constants.FLOAT_ACCURACY: 

401 return self.empty_vqip() 

402 else: 

403 # Track total pull in pulled 

404 pulled = self.empty_vqip() 

405 # Iterate over queue 

406 if isinstance(self.tank.internal_arc.queue, dict): 

407 for t, v in self.tank.internal_arc.queue.items(): 

408 # Pull proportionately 

409 t_pulled = self.v_change_vqip( 

410 self.tank.internal_arc.queue[t], 

411 v["volume"] * total_pull / total_storage, 

412 ) 

413 # Reduce queue VQIPs 

414 self.tank.internal_arc.queue[t] = self.extract_vqip( 

415 self.tank.internal_arc.queue[t], t_pulled 

416 ) 

417 # Track pull 

418 pulled = self.sum_vqip(pulled, t_pulled) 

419 # Pull also from active storage 

420 a_pulled = self.v_change_vqip( 

421 self.tank.active_storage, 

422 self.tank.active_storage["volume"] * total_pull / total_storage, 

423 ) 

424 self.tank.active_storage = self.extract_vqip( 

425 self.tank.active_storage, a_pulled 

426 ) 

427 pulled = self.sum_vqip(pulled, a_pulled) 

428 

429 # Recalculate storage 

430 self.tank.storage = self.extract_vqip(self.tank.storage, pulled) 

431 return pulled 

432 # elif isinstance(self.tank.internal_arc.queue, list): for req in 

433 # self.tank.internal_arc.queue: t_pulled = req['vqtip']['volume'] * 

434 # total_pull / total_storage req['vqtip'] = 

435 # self.v_change_vqip(req['vqtip'], req['vqtip']['volume'] - 

436 # t_pulled) pulled += t_pulled a_pulled = 

437 # self.tank.active_storage['volume'] * total_pull / total_storage 

438 # self.tank.active_storage = 

439 # self.v_change_vqip(self.tank.active_storage, 

440 # self.tank.active_storage['volume'] - a_pulled) pulled += a_pulled 

441 

442 # #Recalculate storage - doing this differently causes numerical errors 

443 # new_v = sum([x['vqtip']['volume'] for x in 

444 # self.tank.internal_arc.queue])+ self.tank.active_storage['volume'] 

445 # self.tank.storage = self.v_change_vqip(self.tank.storage, new_v) 

446 

447 # return self.v_change_vqip(self.tank.storage, pulled) 

448 

449 

450class River(Storage): 

451 """""" 

452 

453 # TODO non-day timestep 

454 def __init__( 

455 self, 

456 depth=2, 

457 length=200, 

458 width=20, 

459 velocity=0.2 * constants.M_S_TO_M_DT, 

460 damp=0.1, 

461 mrf=0, 

462 **kwargs, 

463 ): 

464 """Node that contains extensive in-river biochemical processes. 

465 

466 Args: 

467 depth (float, optional): River tank depth. Defaults to 2. length (float, 

468 optional): River tank length. Defaults to 200. width (float, optional): 

469 River tank width. Defaults to 20. velocity (float, optional): River velocity 

470 (if someone wants to calculate 

471 this on the fly that would also work). Defaults to 

472 0.2*constants.M_S_TO_M_DT. 

473 damp (float, optional): Flow delay and attentuation parameter. Defaults 

474 to 0.1. 

475 mrf (float, optional): Minimum required flow in river (volume per timestep), 

476 can limit pulls made to the river. Defaults to 0. 

477 

478 Functions intended to call in orchestration: 

479 distribute 

480 

481 Key assumptions: 

482 - River is conceptualised as a water tank that receives flows from various 

483 sources (e.g., runoffs from urban and rural land, baseflow from 

484 groundwater), interacts with water infrastructure (e.g., abstraction for 

485 irrigation and domestic supply, sewage and treated effluent discharge), 

486 and discharges flows downstream. It has length and width as shape 

487 parameters, average velocity to indicate flow speed and capacity to 

488 indicate the maximum storage limit. 

489 - Flows from different sources into rivers will fully mix. River tank is 

490 assumed to 

491 have delay and attenuation effects when generate outflows. These effects 

492 are simulated based on the average velocity. 

493 - In-river biochemical processes are simulated as sources/sinks of 

494 nutrients 

495 in the river tank, including - denitrification (for nitrogen) - 

496 phytoplankton absorption/release (for nitrogen and phosphorus) - 

497 macrophyte uptake (for nitrogen and phosphorus) These processes are 

498 affected by river temperature. 

499 

500 Input data and parameter requirements: 

501 - depth, length, width 

502 _Units_: m 

503 - velocity 

504 _Units_: m/day 

505 - damping coefficient 

506 _Units_: - 

507 - minimum required flow 

508 _Units_: m3/day 

509 """ 

510 # Set parameters 

511 self.depth = depth 

512 if depth != 2: 

513 warnings.warn( 

514 "warning: the depth parameter is unused by River nodes because it is \ 

515 intended for capacity to be unbounded. It may be removed in a future version." 

516 ) 

517 self.length = length # [m] 

518 self.width = width # [m] 

519 self.velocity = velocity # [m/dt] 

520 self.damp = damp # [>=0] flow delay and attenuation 

521 self.mrf = mrf 

522 area = length * width # [m2] 

523 

524 capacity = ( 

525 constants.UNBOUNDED_CAPACITY 

526 ) # TODO might be depth * area if flood indunation is going to be simulated 

527 

528 # Required in cases where 'area' conflicts with length*width 

529 kwargs["area"] = area 

530 # Required in cases where 'capacity' conflicts with depth*area 

531 kwargs["capacity"] = capacity 

532 

533 super().__init__(**kwargs) 

534 

535 # TODO check units TODO Will a user want to change any of these? Wide variety of 

536 # river parameters (from HYPE) 

537 self.uptake_PNratio = 1 / 7.2 # [-] P:N during crop uptake 

538 self.bulk_density = 1300 # [kg/m3] soil density 

539 self.denpar_w = 0.0015 # 0.001, # [kg/m2/day] reference denitrification rate 

540 # in water course 

541 self.T_wdays = 5 # [days] weighting constant for river temperature calculation 

542 # (similar to moving average period) 

543 self.halfsatINwater = ( 

544 1.5 * constants.MG_L_TO_KG_M3 

545 ) # [kg/m3] half saturation parameter for denitrification in river 

546 self.T_10_days = [] # [degree C] average water temperature of 10 days 

547 self.T_20_days = [] # [degree C] average water temperature of 20 days 

548 self.TP_365_days = [] # [degree C] average water temperature of 20 days 

549 self.hsatTP = 0.05 * constants.MG_L_TO_KG_M3 # [kg/m3] 

550 self.limpppar = 0.1 * constants.MG_L_TO_KG_M3 # [kg/m3] 

551 self.prodNpar = 0.001 # [kg N/m3/day] nitrogen production/mineralisation rate 

552 self.prodPpar = ( 

553 0.0001 # [kg N/m3/day] phosphorus production/mineralisation rate 

554 ) 

555 self.muptNpar = 0.001 # [kg/m2/day] nitrogen macrophyte uptake rate 

556 self.muptPpar = 0.0001 # 0.01, # [kg/m2/day] phosphorus macrophyte uptake rate 

557 

558 self.max_temp_lag = 20 

559 self.lagged_temperatures = [] 

560 

561 self.max_phosphorus_lag = 365 

562 self.lagged_total_phosphorus = [] 

563 

564 self.din_components = ["ammonia", "nitrate"] 

565 # TODO need a cleaner way to do this depending on whether e.g., nitrite is 

566 # included 

567 

568 # Initialise paramters 

569 self.current_depth = 0 # [m] 

570 # self.river_temperature = 0 # [degree C] self.river_denitrification = 0 # 

571 # [kg/day] self.macrophyte_uptake_N = 0 # [kg/day] self.macrophyte_uptake_P = 0 

572 # # [kg/day] self.sediment_particulate_phosphorus_pool = 60000 # [kg] 

573 # self.sediment_pool = 1000000 # [kg] self.benthos_source_sink = 0 # [kg/day] 

574 # self.t_res = 0 # [day] self.outflow = self.empty_vqip() 

575 

576 # Update end_teimstep 

577 self.end_timestep = self.end_timestep_ 

578 

579 # Update handlers 

580 self.push_set_handler["default"] = self.push_set_river 

581 self.push_check_handler["default"] = self.push_check_accept 

582 

583 self.pull_check_handler["default"] = self.pull_check_river 

584 self.pull_set_handler["default"] = self.pull_set_river 

585 

586 # TODO - RiparianBuffer 

587 self.pull_check_handler[("RiparianBuffer", "volume")] = self.pull_check_fp 

588 

589 # Update mass balance 

590 self.bio_in = self.empty_vqip() 

591 self.bio_out = self.empty_vqip() 

592 

593 self.mass_balance_in.append(lambda: self.bio_in) 

594 self.mass_balance_out.append(lambda: self.bio_out) 

595 

596 # TODO something like this might be needed if you want sewers backing up from river 

597 # height... would need to incorporate expected river outflow def get_dt_excess(self, 

598 # vqip = None): reply = self.empty_vqip() reply['volume'] = 

599 # self.tank.get_excess()['volume'] + self.tank.get_avail()['volume'] * 

600 # self.get_riverrc() if vqip is not None: reply['volume'] = min(vqip['volume'], 

601 # reply['volume']) return reply 

602 

603 # def push_set_river(self, vqip): vqip_ = vqip.copy() vqip_ = 

604 # self.v_change_vqip(vqip_, min(vqip_['volume'], 

605 # self.get_dt_excess()['volume'])) _ = self.tank.push_storage(vqip_, force=True) 

606 # return self.extract_vqip(vqip, vqip_) 

607 

608 def apply_overrides(self, overrides=Dict[str, Any]): 

609 """Override parameters. 

610 

611 Enables a user to override any of the following parameters: 

612 timearea. 

613 

614 Args: 

615 overrides (Dict[str, Any]): Dict describing which parameters should 

616 be overridden (keys) and new values (values). Defaults to {}. 

617 """ 

618 overwrite_params = set( 

619 [ 

620 "length", 

621 "width", 

622 "velocity", 

623 "damp", 

624 "mrf", 

625 "uptake_PNratio", 

626 "bulk_density", 

627 "denpar_w", 

628 "T_wdays", 

629 "halfsatINwater", 

630 "hsatTP", 

631 "limpppar", 

632 "prodNpar", 

633 "prodPpar", 

634 "muptNpar", 

635 "muptPpar", 

636 "max_temp_lag", 

637 "max_phosphorus_lag", 

638 ] 

639 ) 

640 

641 for param in overwrite_params.intersection(overrides.keys()): 

642 setattr(self, param, overrides.pop(param)) 

643 

644 if "area" in overrides.keys(): 

645 warnings.warn( 

646 "WARNING: specifying area is depreciated in overrides \ 

647 for river, please specify length and width instead" 

648 ) 

649 overrides["area"] = self.length * self.width 

650 if "capacity" in overrides.keys(): 

651 warnings.warn( 

652 "ERROR: specifying capacity is depreciated in overrides \ 

653 for river, it is always set as unbounded capacity" 

654 ) 

655 overrides["capacity"] = constants.UNBOUNDED_CAPACITY 

656 super().apply_overrides(overrides) 

657 

658 def pull_check_river(self, vqip=None): 

659 """Check amount of water that can be pulled from river tank and upstream. 

660 

661 Args: 

662 vqip (dict, optional): Maximum water required (only 'volume' is used) 

663 

664 Returns: 

665 avail (dict): A VQIP amount that can be pulled 

666 """ 

667 # Get storage 

668 avail = self.tank.get_avail() 

669 

670 # Check incoming 

671 upstream = self.get_connected(direction="pull", of_type=["River", "Node"]) 

672 avail["volume"] += upstream["avail"] 

673 

674 # convert mrf from volume/timestep to discrete value 

675 mrf = self.mrf / self.get_riverrc() 

676 

677 # Apply mrf 

678 avail_vol = max(avail["volume"] - mrf, 0) 

679 if vqip is None: 

680 avail = self.v_change_vqip(avail, avail_vol) 

681 else: 

682 avail = self.v_change_vqip(avail, min(avail_vol, vqip["volume"])) 

683 

684 return avail 

685 

686 def pull_set_river(self, vqip): 

687 """Pull from river tank and upstream, acknowledging MRF with pull_check. 

688 

689 Args: 

690 vqip (dict): A VQIP amount to pull (only volume key used) 

691 

692 Returns: 

693 (dict): A VQIP amount that was pulled 

694 """ 

695 # Calculate available pull 

696 avail = self.pull_check_river(vqip) 

697 

698 # Take first from tank 

699 pulled = self.tank.pull_storage(avail) 

700 

701 # Take remaining from upstream 

702 to_pull = {"volume": avail["volume"] - pulled["volume"]} 

703 pulled_ = self.pull_distributed(to_pull, of_type=["River", "Node"]) 

704 

705 reply = self.sum_vqip(pulled, pulled_) 

706 

707 return reply 

708 

709 def push_set_river(self, vqip): 

710 """Push to river tank, currently forced. 

711 

712 Args: 

713 vqip (dict): A VQIP amount to push 

714 

715 Returns: 

716 (dict): A VQIP amount that was not successfully received 

717 """ 

718 _ = self.tank.push_storage(vqip, force=True) 

719 return self.empty_vqip() 

720 

721 def update_depth(self): 

722 """Recalculate depth.""" 

723 self.current_depth = self.tank.storage["volume"] / self.area 

724 

725 def get_din_pool(self): 

726 """Get total dissolved inorganic nitrogen from tank storage. 

727 

728 Returns: 

729 (float): total din 

730 """ 

731 return sum( 

732 [self.tank.storage[x] for x in self.din_components] 

733 ) # TODO + self.tank.storage['nitrite'] but nitrite might not be modelled... 

734 # need some ways to address this 

735 

736 def biochemical_processes(self): 

737 """Runs all biochemical processes and updates pollutant amounts. 

738 

739 Returns: 

740 in_ (dict): A VQIP amount that represents total gain in pollutant amounts 

741 out_ (dict): A VQIP amount that represents total loss in pollutant amounts 

742 """ 

743 # TODO make more modular 

744 self.update_depth() 

745 

746 self.tank.storage["temperature"] = (1 - 1 / self.T_wdays) * self.tank.storage[ 

747 "temperature" 

748 ] + (1 / self.T_wdays) * self.get_data_input("temperature") 

749 

750 # Update lagged temperatures 

751 if len(self.lagged_temperatures) > self.max_temp_lag: 

752 del self.lagged_temperatures[0] 

753 self.lagged_temperatures.append(self.tank.storage["temperature"]) 

754 

755 # Update lagged total phosphorus 

756 if len(self.lagged_total_phosphorus) > self.max_phosphorus_lag: 

757 del self.lagged_total_phosphorus[0] 

758 total_phosphorus = ( 

759 self.tank.storage["phosphate"] + self.tank.storage["org-phosphorus"] 

760 ) 

761 self.lagged_total_phosphorus.append(total_phosphorus) 

762 

763 # Check if any water 

764 if self.tank.storage["volume"] < constants.FLOAT_ACCURACY: 

765 # Assume these only do something when there is water 

766 return (self.empty_vqip(), self.empty_vqip()) 

767 

768 if self.tank.storage["temperature"] <= 0: 

769 # Seems that these things are only active when above freezing 

770 return (self.empty_vqip(), self.empty_vqip()) 

771 

772 # Denitrification 

773 tempfcn = 2 ** ((self.tank.storage["temperature"] - 20) / 10) 

774 if self.tank.storage["temperature"] < 5: 

775 tempfcn *= self.tank.storage["temperature"] / 5 

776 

777 din = self.get_din_pool() 

778 din_concentration = din / self.tank.storage["volume"] 

779 confcn = din_concentration / ( 

780 din_concentration + self.halfsatINwater 

781 ) # [kg/m3] 

782 denitri_water = ( 

783 self.denpar_w * self.area * tempfcn * confcn 

784 ) # [kg/day] #TODO convert to per DT 

785 

786 river_denitrification = min( 

787 denitri_water, 0.5 * din 

788 ) # [kg/day] max 50% kan be denitrified 

789 din_ = din - river_denitrification # [kg] 

790 

791 # Update mass balance 

792 in_ = self.empty_vqip() 

793 out_ = self.empty_vqip() 

794 if din > 0: 

795 for pol in self.din_components: 

796 # denitrification 

797 loss = (din - din_) / din * self.tank.storage[pol] 

798 out_[pol] += loss 

799 self.tank.storage[pol] -= loss 

800 

801 din = self.get_din_pool() 

802 

803 # Calculate moving averages TODO generalise 

804 temp_10_day = sum(self.lagged_temperatures[-10:]) / 10 

805 temp_20_day = sum(self.lagged_temperatures[-20:]) / 20 

806 total_phos_365_day = sum(self.lagged_total_phosphorus) / self.max_phosphorus_lag 

807 

808 # Calculate coefficients 

809 tempfcn = ( 

810 (self.tank.storage["temperature"]) / 20 * (temp_10_day - temp_20_day) / 5 

811 ) 

812 if (total_phos_365_day - self.limpppar + self.hsatTP) > 0: 

813 totalphosfcn = (total_phos_365_day - self.limpppar) / ( 

814 total_phos_365_day - self.limpppar + self.hsatTP 

815 ) 

816 else: 

817 totalphosfcn = 0 

818 

819 # Mineralisation/production TODO this feels like it could be much tidier 

820 minprodN = ( 

821 self.prodNpar * totalphosfcn * tempfcn * self.area * self.current_depth 

822 ) # [kg N/day] 

823 minprodP = ( 

824 self.prodPpar 

825 * totalphosfcn 

826 * tempfcn 

827 * self.area 

828 * self.current_depth 

829 * self.uptake_PNratio 

830 ) # [kg N/day] 

831 if minprodN > 0: 

832 # production (inorg -> org) 

833 minprodN = min( 

834 0.5 * din, minprodN 

835 ) # only half pool can be used for production 

836 minprodP = min( 

837 0.5 * self.tank.storage["phosphate"], minprodP 

838 ) # only half pool can be used for production 

839 

840 # Update mass balance 

841 out_["phosphate"] = minprodP 

842 self.tank.storage["phosphate"] -= minprodP 

843 in_["org-phosphorus"] = minprodP 

844 self.tank.storage["org-phosphorus"] += minprodP 

845 if din > 0: 

846 for pol in self.din_components: 

847 loss = minprodN * self.tank.storage[pol] / din 

848 out_[pol] += loss 

849 self.tank.storage[pol] -= loss 

850 

851 in_["org-nitrogen"] = minprodN 

852 self.tank.storage["org-nitrogen"] += minprodN 

853 

854 else: 

855 # mineralisation (org -> inorg) 

856 minprodN = min(0.5 * self.tank.storage["org-nitrogen"], -minprodN) 

857 minprodP = min(0.5 * self.tank.storage["org-phosphorus"], -minprodP) 

858 

859 # Update mass balance 

860 in_["phosphate"] = minprodP 

861 self.tank.storage["phosphate"] += minprodP 

862 out_["org-phosphorus"] = minprodP 

863 self.tank.storage["org-phosphorus"] -= minprodP 

864 if din > 0: 

865 for pol in self.din_components: 

866 gain = minprodN * self.tank.storage[pol] / din 

867 in_[pol] += gain 

868 self.tank.storage[pol] += gain 

869 

870 out_["org-nitrogen"] = minprodN 

871 self.tank.storage["org-nitrogen"] -= minprodN 

872 

873 din = self.get_din_pool() 

874 

875 # macrophyte uptake temperature dependence factor 

876 tempfcn1 = (max(0, self.tank.storage["temperature"]) / 20) ** 0.3 

877 tempfcn2 = (self.tank.storage["temperature"] - temp_20_day) / 5 

878 tempfcn = max(0, tempfcn1 * tempfcn2) 

879 

880 macrouptN = self.muptNpar * tempfcn * self.area # [kg/day] 

881 macrophyte_uptake_N = min(0.5 * din, macrouptN) 

882 if din > 0: 

883 for pol in self.din_components: 

884 loss = macrophyte_uptake_N * self.tank.storage[pol] / din 

885 out_[pol] += loss 

886 self.tank.storage[pol] -= loss 

887 

888 macrouptP = ( 

889 self.muptPpar * tempfcn * max(0, totalphosfcn) * self.area 

890 ) # [kg/day] 

891 macrophyte_uptake_P = min(0.5 * self.tank.storage["phosphate"], macrouptP) 

892 out_["phosphate"] += macrophyte_uptake_P 

893 self.tank.storage["phosphate"] -= macrophyte_uptake_P 

894 

895 # TODO source/sink for benthos sediment P suspension/resuspension 

896 return in_, out_ 

897 

898 def get_riverrc(self): 

899 """Get river outflow coefficient (i.e., how much water leaves the tank in this 

900 timestep). 

901 

902 Returns: 

903 riverrc (float): outflow coeffficient 

904 """ 

905 # Calculate travel time 

906 total_time = self.length / self.velocity 

907 # Apply damp 

908 kt = self.damp * total_time # [day] 

909 if kt != 0: 

910 riverrc = 1 - kt + kt * exp(-1 / kt) # [-] 

911 else: 

912 riverrc = 1 

913 return riverrc 

914 

915 def calculate_discharge(self): 

916 """""" 

917 if "nitrate" in constants.POLLUTANTS: 

918 # TODO clumsy Run biochemical processes 

919 in_, out_ = self.biochemical_processes() 

920 # Mass balance 

921 self.bio_in = in_ 

922 self.bio_out = out_ 

923 

924 def distribute(self): 

925 """Run biochemical processes, track mass balance, and distribute water.""" 

926 # self.calculate_discharge() Get outflow 

927 outflow = self.tank.pull_storage( 

928 {"volume": self.tank.storage["volume"] * self.get_riverrc()} 

929 ) 

930 # Distribute outflow 

931 reply = self.push_distributed(outflow, of_type=["River", "Node", "Waste"]) 

932 _ = self.tank.push_storage(reply, force=True) 

933 if reply["volume"] > constants.FLOAT_ACCURACY: 

934 print("river cant push: {0}".format(reply["volume"])) 

935 

936 def pull_check_fp(self, vqip=None): 

937 """ 

938 

939 Args: 

940 vqip: 

941 

942 Returns: 

943 

944 """ 

945 # TODO Pull checking for riparian buffer, needs updating update river depth 

946 self.update_depth() 

947 return self.current_depth, self.area, self.width, self.river_tank.storage 

948 

949 def end_timestep_(self): 

950 """Update state variables.""" 

951 self.tank.end_timestep() 

952 self.bio_in = self.empty_vqip() 

953 self.bio_out = self.empty_vqip() 

954 

955 

956class Reservoir(Storage): 

957 """""" 

958 

959 def __init__(self, **kwargs): 

960 """Storage node that makes abstractions by calling pull_distributed. 

961 

962 Functions intended to call in orchestration: 

963 make_abstractions (before any river routing) 

964 

965 Key assumptions: 

966 - Conceptualised as a `Tank`. 

967 - Recharged only via pumped abstractions. 

968 - Evaporation/precipitation onto surface area currently ignored. 

969 - If `decays` are provided to model water quality transformations, 

970 see `core.py/DecayObj`. 

971 

972 Input data and parameter requirements: 

973 - Tank `capacity`, `area`, and `datum`. 

974 _Units_: cubic metres, squared metres, metres 

975 - Optional dictionary of decays with pollutants as keys and decay 

976 parameters (a constant and a temperature sensitivity exponent) as 

977 values. _Units_: - 

978 """ 

979 super().__init__(**kwargs) 

980 

981 def make_abstractions(self): 

982 """Pulls water and updates tanks.""" 

983 reply = self.pull_distributed(self.tank.get_excess()) 

984 spill = self.tank.push_storage(reply) 

985 _ = self.tank.push_storage(spill, force=True) 

986 if spill["volume"] > constants.FLOAT_ACCURACY: 

987 print("Spill at reservoir by {0}".format(spill["volume"])) 

988 

989 

990class RiverReservoir(Reservoir): 

991 """""" 

992 

993 def __init__(self, environmental_flow=0, **kwargs): 

994 """A reservoir with a natural river inflow, includes an environmental downstream 

995 flow to satisfy. 

996 

997 Args: 

998 environmental_flow (float, optional): Downstream environmental flow. 

999 Defaults to 0. 

1000 

1001 Functions intended to call in orchestration: 

1002 make_abstractions (if any) 

1003 

1004 satisfy_environmental (before river routing.. possibly before 

1005 downstream river abstractions depending on licence) 

1006 

1007 Key assumptions: 

1008 - Conceptualised as a `Tank`. 

1009 - Recharged via pumped abstractions and receives water from 

1010 inflowing arcs. 

1011 - Reservoir aims to satisfy a static `environmental_flow`. 

1012 - If tank capacity is exceeded, reservoir spills downstream 

1013 towards `nodes.py/Node`, `storage.py/River` or `waste.py/Waste` nodes. 

1014 Spill counts towards `environmental_flow`. 

1015 - Evaporation/precipitation onto surface area currently ignored. 

1016 - Currently, if a reservoir satisfies a pull from a downstream 

1017 node, it does __not__ count towards `environmental_flow`. 

1018 - If `decays` are provided to model water quality transformations, 

1019 see `core.py/DecayObj`. 

1020 

1021 Input data and parameter requirements: 

1022 - Tank `capacity`, `area`, and `datum`. 

1023 _Units_: cubic metres, squared metres, metres 

1024 - `environmental_flow` 

1025 _Units_: cubic metres/timestep 

1026 - Optional dictionary of decays with pollutants as keys and decay 

1027 parameters (a constant and a temperature sensitivity exponent) as 

1028 values. _Units_: - 

1029 """ 

1030 # Parameters 

1031 self.environmental_flow = environmental_flow 

1032 super().__init__(**kwargs) 

1033 

1034 # State variables 

1035 self.total_environmental_satisfied = 0 

1036 

1037 self.push_set_handler["default"] = self.push_set_river_reservoir 

1038 self.push_check_handler["default"] = self.push_check_river_reservoir 

1039 self.end_timestep = self.end_timestep_ 

1040 

1041 self.__class__.__name__ = "Reservoir" 

1042 

1043 def apply_overrides(self, overrides=Dict[str, Any]): 

1044 """Override parameters. 

1045 

1046 Enables a user to override any of the following parameters: 

1047 environmental_flow. 

1048 

1049 Args: 

1050 overrides (Dict[str, Any]): Dict describing which parameters should 

1051 be overridden (keys) and new values (values). Defaults to {}. 

1052 """ 

1053 self.environmental_flow = overrides.pop( 

1054 "environmental_flow", self.environmental_flow 

1055 ) 

1056 super().apply_overrides(overrides) 

1057 

1058 def push_set_river_reservoir(self, vqip): 

1059 """Receive water. 

1060 

1061 Args: 

1062 vqip (dict): A VQIP amount to be received 

1063 

1064 Returns: 

1065 reply (dict): A VQIP amount that was not successfully received 

1066 """ 

1067 # Apply normal reservoir storage We do this under the assumption that spill is 

1068 # mixed in with the reservoir If the reservoir can't spill everything you'll get 

1069 # some weird numbers in reply, but if your reservoir can't spill as much as you 

1070 # like then you should probably be pushing the right amount through 

1071 # push_check_river_reservoir Some cunning could probably avoid this by checking 

1072 # vqip, but this is a serious edge case 

1073 _ = self.tank.push_storage(vqip, force=True) 

1074 spill = self.tank.pull_ponded() 

1075 

1076 # Send spill downstream 

1077 reply = self.push_distributed(spill, of_type=["Node", "River", "Waste"]) 

1078 

1079 # Use spill to satisfy downstream flow 

1080 self.total_environmental_satisfied += spill["volume"] - reply["volume"] 

1081 

1082 return reply 

1083 

1084 def push_check_river_reservoir(self, vqip=None): 

1085 """A push check to receive water, assumes spill may occur and checks downstream 

1086 capacity. 

1087 

1088 Args: 

1089 vqip (dict, optional): A VQIP that can be used to limit the volume in 

1090 the return value (only volume key is used). Defaults to None. 

1091 

1092 Returns: 

1093 excess (dict): A VQIP amount of water that cannot be received 

1094 """ 

1095 # Check downstream capacity (i.e., that would be spilled) 

1096 downstream_availability = self.get_connected( 

1097 direction="push", of_type=["Node", "River", "Waste"] 

1098 )["avail"] 

1099 # Check excess capacity in the reservoir 

1100 excess = self.tank.get_excess() 

1101 # Combine excess and downstream in response 

1102 new_v = excess["volume"] + downstream_availability 

1103 if vqip is not None: 

1104 new_v = min(vqip["volume"], new_v) 

1105 # Update to vqip 

1106 excess = self.v_change_vqip(excess, new_v) 

1107 

1108 return excess 

1109 

1110 def satisfy_environmental(self): 

1111 """Satisfy environmental flow downstream.""" 

1112 # Calculate how much environmental flow is yet to satisfy #(some may have been 

1113 # already if pull-and-take abstractions have taken place) 

1114 to_satisfy = max( 

1115 self.environmental_flow - self.total_environmental_satisfied, 0 

1116 ) 

1117 # Pull from tank 

1118 environmental = self.tank.pull_storage({"volume": to_satisfy}) 

1119 # Send downstream 

1120 reply = self.push_distributed(environmental) 

1121 _ = self.tank.push_storage(reply, force=True) 

1122 if reply["volume"] > constants.FLOAT_ACCURACY: 

1123 print("warning: environmental not able to push") 

1124 

1125 # Update satisfaction 

1126 self.total_environmental_satisfied += environmental["volume"] 

1127 

1128 def end_timestep_(self): 

1129 """Udpate state varibles.""" 

1130 self.tank.end_timestep() 

1131 self.total_environmental_satisfied = 0