Coverage for wsimod/nodes/storage.py: 13%

340 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-01-11 16:39 +0000

1# -*- coding: utf-8 -*- 

2"""Created on Mon Nov 15 14:20:36 2021. 

3 

4@author: bdobson Converted to totals on 2022-05-03 

5""" 

6from math import exp 

7 

8from wsimod.core import constants 

9from wsimod.nodes.nodes import DecayQueueTank, DecayTank, Node, QueueTank, Tank 

10 

11 

12class Storage(Node): 

13 """""" 

14 

15 def __init__( 

16 self, 

17 name, 

18 capacity=0, 

19 area=0, 

20 datum=0, 

21 decays=None, 

22 initial_storage=0, 

23 **kwargs, 

24 ): 

25 """A Node wrapper for a Tank or DecayTank. 

26 

27 Args: 

28 name (str): node name capacity (float, optional): Tank capacity (see 

29 nodes.py/Tank). Defaults to 0. area (float, optional): Tank area (see 

30 nodes.py/Tank). Defaults to 0. datum (float, optional): Tank datum (see 

31 nodes.py/Tank). Defaults to 0. decays (dict, optional): Tank decays if 

32 needed, (see nodes.py/DecayTank). Defaults to None. initial_storage (float 

33 or dict, optional): Initial storage (see nodes.py/Tank). Defaults to 0. 

34 

35 Functions intended to call in orchestration: 

36 distribute (optional, depends on subclass) 

37 """ 

38 # Set parameters 

39 self.initial_storage = initial_storage 

40 self.capacity = capacity 

41 self.area = area 

42 self.datum = datum 

43 self.decays = decays 

44 super().__init__(name, **kwargs) 

45 

46 # Create tank 

47 if "initial_storage" not in dir(self): 

48 self.initial_storage = self.empty_vqip() 

49 

50 if self.decays is None: 

51 self.tank = Tank( 

52 capacity=self.capacity, 

53 area=self.area, 

54 datum=self.datum, 

55 initial_storage=self.initial_storage, 

56 ) 

57 else: 

58 self.tank = DecayTank( 

59 capacity=self.capacity, 

60 area=self.area, 

61 datum=self.datum, 

62 initial_storage=self.initial_storage, 

63 decays=self.decays, 

64 parent=self, 

65 ) 

66 # Update handlers 

67 self.push_set_handler["default"] = self.push_set_storage 

68 self.push_check_handler["default"] = self.tank.get_excess 

69 self.pull_set_handler["default"] = lambda vol: self.tank.pull_storage(vol) 

70 self.pull_check_handler["default"] = self.tank.get_avail 

71 

72 # Mass balance 

73 self.mass_balance_ds.append(lambda: self.tank.ds()) 

74 

75 def push_set_storage(self, vqip): 

76 """A node wrapper for the tank push_storage. 

77 

78 Args: 

79 vqip (dict): A VQIP amount to push to the tank 

80 

81 Returns: 

82 reply (dict): A VQIP amount that was not successfully pushed 

83 """ 

84 # Update tank 

85 reply = self.tank.push_storage(vqip) 

86 

87 return reply 

88 

89 def distribute(self): 

90 """Optional function that discharges all tank storage with push_distributed.""" 

91 # Distribute any active storage 

92 storage = self.tank.pull_storage(self.tank.get_avail()) 

93 retained = self.push_distributed(storage) 

94 _ = self.tank.push_storage(retained, force=True) 

95 if retained["volume"] > constants.FLOAT_ACCURACY: 

96 print("Storage unable to push") 

97 

98 def get_percent(self): 

99 """Function that returns the volume in the storage tank expressed as a percent 

100 of capacity.""" 

101 return self.tank.storage["volume"] / self.tank.capacity 

102 

103 def end_timestep(self): 

104 """Update tank states.""" 

105 self.tank.end_timestep() 

106 

107 def reinit(self): 

108 """Call tank reinit.""" 

109 # TODO Automate this better 

110 self.tank.reinit() 

111 self.tank.storage["volume"] = self.initial_storage 

112 self.tank.storage_["volume"] = self.initial_storage 

113 

114 

115class Groundwater(Storage): 

116 """""" 

117 

118 def __init__( 

119 self, 

120 residence_time=200, 

121 infiltration_threshold=1, 

122 infiltration_pct=0, 

123 data_input_dict={}, 

124 **kwargs, 

125 ): 

126 # TODO why isn't this using a ResidenceTank? 

127 """A storage with a residence time for groundwater. Can also infiltrate to 

128 sewers. 

129 

130 Args: 

131 residence_time (float, optional): Residence time (see nodes.py/ 

132 ResidenceTank). Defaults to 200. 

133 infiltration_threshold (float, optional): Proportion of storage capacity 

134 that must be exceeded to generate infiltration. Defaults to 1. 

135 infiltration_pct (float, optional): Proportion of storage above the 

136 threshold that is square rooted and infiltrated. Defaults to 0. 

137 data_input_dict (dict, optional): Dictionary of data inputs relevant for 

138 the node (though I don't think it is used). Defaults to {}. 

139 

140 Functions intended to call in orchestration: 

141 infiltrate (before sewers are discharged) 

142 

143 distribute 

144 

145 Key assumptions: 

146 - Conceptualises groundwater as a tank. 

147 - Baseflow is generated following a residence-time method. 

148 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or 

149 `waste.py/Waste` nodes. 

150 - Infiltration to `sewer.py/Sewer` nodes occurs when the storage 

151 in the tank is greater than a specified threshold, at a rate 

152 proportional to the sqrt of volume above the threshold. (Note, this 

153 behaviour is __not validated__ and a high uncertainty process in 

154 general) 

155 - If `decays` are provided to model water quality transformations, 

156 see `core.py/DecayObj`. 

157 

158 Input data and parameter requirements: 

159 - Groundwater tank `capacity`, `area`, and `datum`. 

160 _Units_: cubic metres, squared metres, metres 

161 - Infiltration behaviour determined by an `infiltration_threshold` 

162 and `infiltration_pct`. _Units_: proportion of capacity 

163 - Optional dictionary of decays with pollutants as keys and decay 

164 parameters (a constant and a temperature sensitivity exponent) as 

165 values. _Units_: - 

166 """ 

167 self.residence_time = residence_time 

168 self.infiltration_threshold = infiltration_threshold 

169 self.infiltration_pct = infiltration_pct 

170 # TODO not used data_input 

171 self.data_input_dict = data_input_dict 

172 super().__init__(**kwargs) 

173 

174 def distribute(self): 

175 """Calculate outflow with residence time and send to Nodes or Rivers.""" 

176 avail = self.tank.get_avail()["volume"] / self.residence_time 

177 to_send = self.tank.pull_storage({"volume": avail}) 

178 retained = self.push_distributed(to_send, of_type=["Node", "River", "Waste"]) 

179 _ = self.tank.push_storage(retained, force=True) 

180 if retained["volume"] > constants.FLOAT_ACCURACY: 

181 print("Storage unable to push") 

182 

183 def infiltrate(self): 

184 """Calculate amount of water available for infiltration and send to sewers.""" 

185 # Calculate infiltration 

186 avail = self.tank.get_avail()["volume"] 

187 avail = max(avail - self.tank.capacity * self.infiltration_threshold, 0) 

188 avail = (avail * self.infiltration_pct) ** 0.5 

189 

190 # Push to sewers 

191 to_send = self.tank.pull_storage({"volume": avail}) 

192 retained = self.push_distributed(to_send, of_type="Sewer") 

193 _ = self.tank.push_storage(retained, force=True) 

194 # Any not sent is left in tank 

195 if retained["volume"] > constants.FLOAT_ACCURACY: 

196 # print('unable to infiltrate') 

197 pass 

198 

199 

200class QueueGroundwater(Storage): 

201 """""" 

202 

203 # TODO - no infiltration as yet 

204 def __init__(self, timearea={0: 1}, data_input_dict={}, **kwargs): 

205 """Alternate formulation of Groundwater that uses a timearea property to enable 

206 more nonlinear time behaviour of baseflow routing. Uses the QueueTank or 

207 DecayQueueTank (see nodes.py/Tank subclassses). 

208 

209 NOTE: abstraction behaviour from this kind of node need careful checking 

210 

211 Args: 

212 timearea (dict, optional): Time area diagram that enables flows to 

213 take a range of different durations to 'traverse' the tank. The keys of 

214 the dict are the number of timesteps while the values are the proportion 

215 of flow. E.g., {0 : 0.7, 1 : 0.3} means 70% of flow takes 0 timesteps 

216 and 30% takes 1 timesteps. Defaults to {0 : 1}. 

217 data_input_dict (dict, optional): Dictionary of data inputs relevant for 

218 the node (though I don't think it is used). Defaults to {}. 

219 

220 Functions intended to call in orchestration: 

221 distribute 

222 

223 Key assumptions: 

224 - Conceptualises groundwater as a tank. 

225 - Baseflow is generated following a timearea method. 

226 - Baseflow is sent to `storage.py/River`, `nodes.py/Node` or 

227 `waste.py/Waste` nodes. 

228 - No infiltration to sewers is modelled. 

229 - If `decays` are provided to model water quality transformations, 

230 see `core.py/DecayObj`. 

231 

232 Input data and parameter requirements: 

233 - Groundwater tank `capacity`, `area`, and `datum`. 

234 _Units_: cubic metres, squared metres, metres 

235 - `timearea` is a dictionary containing the timearea diagram. 

236 _Units_: duration of flow (in timesteps) and proportion of flow 

237 - Optional dictionary of decays with pollutants as keys and decay 

238 parameters (a constant and a temperature sensitivity exponent) as 

239 values. _Units_: - 

240 """ 

241 self.timearea = timearea 

242 # TODO not used 

243 self.data_input_dict = data_input_dict 

244 super().__init__(**kwargs) 

245 # Label as Groundwater class so that other nodes treat it the same 

246 self.__class__.__name__ = "Groundwater" 

247 # Update handlers 

248 self.push_set_handler["default"] = self.push_set_timearea 

249 self.pull_set_handler["default"] = self.pull_set_active 

250 self.pull_check_handler["default"] = self.pull_check_active 

251 # Enable decay 

252 if self.decays is None: 

253 self.tank = QueueTank( 

254 capacity=self.capacity, 

255 area=self.area, 

256 datum=self.datum, 

257 initial_storage=self.initial_storage, 

258 ) 

259 else: 

260 self.tank = DecayQueueTank( 

261 capacity=self.capacity, 

262 area=self.area, 

263 datum=self.datum, 

264 decays=self.decays, 

265 parent=self, 

266 initial_storage=self.initial_storage, 

267 ) 

268 

269 def push_set_timearea(self, vqip): 

270 """Push setting that enables timearea behaviour, (see __init__ for 

271 description).Used to receive flow that is assumed to occur widely across some 

272 kind of catchment. 

273 

274 Args: 

275 vqip (dict): A VQIP that has been pushed 

276 

277 Returns: 

278 reply (dict): A VQIP amount that was not successfuly receivesd 

279 """ 

280 reply = self.empty_vqip() 

281 # Iterate over timearea diagram TODO timearea diagram behaviour be generalised 

282 # across nodes 

283 for time, normalised in self.timearea.items(): 

284 vqip_ = self.v_change_vqip(vqip, vqip["volume"] * normalised) 

285 reply_ = self.tank.push_storage(vqip_, time=time) 

286 reply = self.sum_vqip(reply, reply_) 

287 return reply 

288 

289 def distribute(self): 

290 """Update internal arc, push active_storage onwards, update tank.""" 

291 _ = self.tank.internal_arc.update_queue(direction="push") 

292 

293 remaining = self.push_distributed(self.tank.active_storage) 

294 

295 if remaining["volume"] > constants.FLOAT_ACCURACY: 

296 print("Groundwater couldnt push all") 

297 

298 # Update tank 

299 sent = self.tank.active_storage["volume"] - remaining["volume"] 

300 sent = self.v_change_vqip(self.tank.active_storage, sent) 

301 reply = self.tank.pull_storage(sent) 

302 if (reply["volume"] - sent["volume"]) > constants.FLOAT_ACCURACY: 

303 print("Miscalculated tank storage in discharge") 

304 

305 def infiltrate(self): 

306 """""" 

307 pass 

308 

309 def pull_check_active(self, vqip=None): 

310 """A pull check that returns the active storage. 

311 

312 Args: 

313 vqip (dict, optional): A VQIP that can be used to limit the volume in 

314 the return value (only volume key is used). Defaults to None. 

315 

316 Returns: 

317 (dict): A VQIP amount that is available to pull 

318 """ 

319 if vqip is None: 

320 return self.tank.active_storage 

321 else: 

322 reply = min(vqip["volume"], self.tank.active_storage["volume"]) 

323 return self.v_change_vqip(self.tank.active_storage, reply) 

324 

325 def pull_set_active(self, vqip): 

326 # TODO - this is quite weird behaviour, and inconsistent with pull_check_active 

327 """Pull proportionately from both the active storage and the queue. Adjudging 

328 groundwater abstractions to not be particularly sensitive to the within 

329 catchment travel time. 

330 

331 Args: 

332 vqip (dict): A VQIP amount to be pulled (only volume key is used) 

333 

334 Returns: 

335 pulled (dict): A VQIP amount that was successfully pulled 

336 """ 

337 # Calculate actual pull 

338 total_storage = self.tank.storage["volume"] 

339 total_pull = min(self.tank.storage["volume"], vqip["volume"]) 

340 

341 if total_pull < constants.FLOAT_ACCURACY: 

342 return self.empty_vqip() 

343 else: 

344 # Track total pull in pulled 

345 pulled = self.empty_vqip() 

346 # Iterate over queue 

347 if isinstance(self.tank.internal_arc.queue, dict): 

348 for t, v in self.tank.internal_arc.queue.items(): 

349 # Pull proportionately 

350 t_pulled = self.v_change_vqip( 

351 self.tank.internal_arc.queue[t], 

352 v["volume"] * total_pull / total_storage, 

353 ) 

354 # Reduce queue VQIPs 

355 self.tank.internal_arc.queue[t] = self.extract_vqip( 

356 self.tank.internal_arc.queue[t], t_pulled 

357 ) 

358 # Track pull 

359 pulled = self.sum_vqip(pulled, t_pulled) 

360 # Pull also from active storage 

361 a_pulled = self.v_change_vqip( 

362 self.tank.active_storage, 

363 self.tank.active_storage["volume"] * total_pull / total_storage, 

364 ) 

365 self.tank.active_storage = self.extract_vqip( 

366 self.tank.active_storage, a_pulled 

367 ) 

368 pulled = self.sum_vqip(pulled, a_pulled) 

369 

370 # Recalculate storage 

371 self.tank.storage = self.extract_vqip(self.tank.storage, pulled) 

372 return pulled 

373 # elif isinstance(self.tank.internal_arc.queue, list): for req in 

374 # self.tank.internal_arc.queue: t_pulled = req['vqtip']['volume'] * 

375 # total_pull / total_storage req['vqtip'] = 

376 # self.v_change_vqip(req['vqtip'], req['vqtip']['volume'] - 

377 # t_pulled) pulled += t_pulled a_pulled = 

378 # self.tank.active_storage['volume'] * total_pull / total_storage 

379 # self.tank.active_storage = 

380 # self.v_change_vqip(self.tank.active_storage, 

381 # self.tank.active_storage['volume'] - a_pulled) pulled += a_pulled 

382 

383 # #Recalculate storage - doing this differently causes numerical errors 

384 # new_v = sum([x['vqtip']['volume'] for x in 

385 # self.tank.internal_arc.queue])+ self.tank.active_storage['volume'] 

386 # self.tank.storage = self.v_change_vqip(self.tank.storage, new_v) 

387 

388 # return self.v_change_vqip(self.tank.storage, pulled) 

389 

390 

391class River(Storage): 

392 """""" 

393 

394 # TODO non-day timestep 

395 def __init__( 

396 self, 

397 depth=2, 

398 length=200, 

399 width=20, 

400 velocity=0.2 * constants.M_S_TO_M_DT, 

401 damp=0.1, 

402 mrf=0, 

403 **kwargs, 

404 ): 

405 """Node that contains extensive in-river biochemical processes. 

406 

407 Args: 

408 depth (float, optional): River tank depth. Defaults to 2. length (float, 

409 optional): River tank length. Defaults to 200. width (float, optional): 

410 River tank width. Defaults to 20. velocity (float, optional): River velocity 

411 (if someone wants to calculate 

412 this on the fly that would also work). Defaults to 

413 0.2*constants.M_S_TO_M_DT. 

414 damp (float, optional): Flow delay and attentuation parameter. Defaults 

415 to 0.1. 

416 mrf (float, optional): Minimum required flow in river (volume per timestep), 

417 can limit pulls made to the river. Defaults to 0. 

418 

419 Functions intended to call in orchestration: 

420 distribute 

421 

422 Key assumptions: 

423 - River is conceptualised as a water tank that receives flows from various 

424 sources (e.g., runoffs from urban and rural land, baseflow from 

425 groundwater), interacts with water infrastructure (e.g., abstraction for 

426 irrigation and domestic supply, sewage and treated effluent discharge), 

427 and discharges flows downstream. It has length and width as shape 

428 parameters, average velocity to indicate flow speed and capacity to 

429 indicate the maximum storage limit. 

430 - Flows from different sources into rivers will fully mix. River tank is 

431 assumed to 

432 have delay and attenuation effects when generate outflows. These effects 

433 are simulated based on the average velocity. 

434 - In-river biochemical processes are simulated as sources/sinks of 

435 nutrients 

436 in the river tank, including - denitrification (for nitrogen) - 

437 phytoplankton absorption/release (for nitrogen and phosphorus) - 

438 macrophyte uptake (for nitrogen and phosphorus) These processes are 

439 affected by river temperature. 

440 

441 Input data and parameter requirements: 

442 - depth, length, width 

443 _Units_: m 

444 - velocity 

445 _Units_: m/day 

446 - damping coefficient 

447 _Units_: - 

448 - minimum required flow 

449 _Units_: m3/day 

450 """ 

451 # Set parameters 

452 self.depth = depth # [m] 

453 self.length = length # [m] 

454 self.width = width # [m] 

455 self.velocity = velocity # [m/dt] 

456 self.damp = damp # [>=0] flow delay and attenuation 

457 self.mrf = mrf 

458 area = length * width # [m2] 

459 

460 capacity = ( 

461 constants.UNBOUNDED_CAPACITY 

462 ) # TODO might be depth * area if flood indunation is going to be simulated 

463 

464 # Required in cases where 'area' conflicts with length*width 

465 kwargs["area"] = area 

466 # Required in cases where 'capacity' conflicts with depth*area 

467 kwargs["capacity"] = capacity 

468 

469 super().__init__(**kwargs) 

470 

471 # TODO check units TODO Will a user want to change any of these? Wide variety of 

472 # river parameters (from HYPE) 

473 self.uptake_PNratio = 1 / 7.2 # [-] P:N during crop uptake 

474 self.bulk_density = 1300 # [kg/m3] soil density 

475 self.denpar_w = 0.0015 # 0.001, # [kg/m2/day] reference denitrification rate 

476 # in water course 

477 self.T_wdays = 5 # [days] weighting constant for river temperature calculation 

478 # (similar to moving average period) 

479 self.halfsatINwater = ( 

480 1.5 * constants.MG_L_TO_KG_M3 

481 ) # [kg/m3] half saturation parameter for denitrification in river 

482 self.T_10_days = [] # [degree C] average water temperature of 10 days 

483 self.T_20_days = [] # [degree C] average water temperature of 20 days 

484 self.TP_365_days = [] # [degree C] average water temperature of 20 days 

485 self.hsatTP = 0.05 * constants.MG_L_TO_KG_M3 # [kg/m3] 

486 self.limpppar = 0.1 * constants.MG_L_TO_KG_M3 # [kg/m3] 

487 self.prodNpar = 0.001 # [kg N/m3/day] nitrogen production/mineralisation rate 

488 self.prodPpar = ( 

489 0.0001 # [kg N/m3/day] phosphorus production/mineralisation rate 

490 ) 

491 self.muptNpar = 0.001 # [kg/m2/day] nitrogen macrophyte uptake rate 

492 self.muptPpar = 0.0001 # 0.01, # [kg/m2/day] phosphorus macrophyte uptake rate 

493 self.qbank_365_days = [1e6, 1e6] # [m3/day] store outflow in the previous year 

494 self.qbank = ( 

495 1e6 # [m3/day] bankfull flow = second largest outflow in the previous year 

496 ) 

497 self.qbankcorrpar = 0.001 # [-] correction coefficient for qbank flow 

498 self.sedexppar = 1 # [-] 

499 self.EPC0 = 0.05 * constants.MG_L_TO_KG_M3 # [mg/l] 

500 self.kd_s = 0 * constants.MG_L_TO_KG_M3 # 6 * 1e-6, # [kg/m3] 

501 self.kadsdes_s = 2 # 0.9, # [-] 

502 self.Dsed = 0.2 # [m] 

503 

504 self.max_temp_lag = 20 

505 self.lagged_temperatures = [] 

506 

507 self.max_phosphorus_lag = 365 

508 self.lagged_total_phosphorus = [] 

509 

510 self.din_components = ["ammonia", "nitrate"] 

511 # TODO need a cleaner way to do this depending on whether e.g., nitrite is 

512 # included 

513 

514 # Initialise paramters 

515 self.current_depth = 0 # [m] 

516 # self.river_temperature = 0 # [degree C] self.river_denitrification = 0 # 

517 # [kg/day] self.macrophyte_uptake_N = 0 # [kg/day] self.macrophyte_uptake_P = 0 

518 # # [kg/day] self.sediment_particulate_phosphorus_pool = 60000 # [kg] 

519 # self.sediment_pool = 1000000 # [kg] self.benthos_source_sink = 0 # [kg/day] 

520 # self.t_res = 0 # [day] self.outflow = self.empty_vqip() 

521 

522 # Update end_teimstep 

523 self.end_timestep = self.end_timestep_ 

524 

525 # Update handlers 

526 self.push_set_handler["default"] = self.push_set_river 

527 self.push_check_handler["default"] = self.push_check_accept 

528 

529 self.pull_check_handler["default"] = self.pull_check_river 

530 self.pull_set_handler["default"] = self.pull_set_river 

531 

532 # TODO - RiparianBuffer 

533 self.pull_check_handler[("RiparianBuffer", "volume")] = self.pull_check_fp 

534 

535 # Update mass balance 

536 self.bio_in = self.empty_vqip() 

537 self.bio_out = self.empty_vqip() 

538 

539 self.mass_balance_in.append(lambda: self.bio_in) 

540 self.mass_balance_out.append(lambda: self.bio_out) 

541 

542 # TODO something like this might be needed if you want sewers backing up from river 

543 # height... would need to incorporate expected river outflow def get_dt_excess(self, 

544 # vqip = None): reply = self.empty_vqip() reply['volume'] = 

545 # self.tank.get_excess()['volume'] + self.tank.get_avail()['volume'] * 

546 # self.get_riverrc() if vqip is not None: reply['volume'] = min(vqip['volume'], 

547 # reply['volume']) return reply 

548 

549 # def push_set_river(self, vqip): vqip_ = vqip.copy() vqip_ = 

550 # self.v_change_vqip(vqip_, min(vqip_['volume'], 

551 # self.get_dt_excess()['volume'])) _ = self.tank.push_storage(vqip_, force=True) 

552 # return self.extract_vqip(vqip, vqip_) 

553 

554 def pull_check_river(self, vqip=None): 

555 """Check amount of water that can be pulled from river tank and upstream. 

556 

557 Args: 

558 vqip (dict, optional): Maximum water required (only 'volume' is used) 

559 

560 Returns: 

561 avail (dict): A VQIP amount that can be pulled 

562 """ 

563 # Get storage 

564 avail = self.tank.get_avail() 

565 

566 # Check incoming 

567 upstream = self.get_connected(direction="pull", of_type=["River", "Node"]) 

568 avail["volume"] += upstream["avail"] 

569 

570 # convert mrf from volume/timestep to discrete value 

571 mrf = self.mrf / self.get_riverrc() 

572 

573 # Apply mrf 

574 avail_vol = max(avail["volume"] - mrf, 0) 

575 if vqip is None: 

576 avail = self.v_change_vqip(avail, avail_vol) 

577 else: 

578 avail = self.v_change_vqip(avail, min(avail_vol, vqip["volume"])) 

579 

580 return avail 

581 

582 def pull_set_river(self, vqip): 

583 """Pull from river tank and upstream, acknowledging MRF with pull_check. 

584 

585 Args: 

586 vqip (dict): A VQIP amount to pull (only volume key used) 

587 

588 Returns: 

589 (dict): A VQIP amount that was pulled 

590 """ 

591 # Calculate available pull 

592 avail = self.pull_check_river(vqip) 

593 

594 # Take first from tank 

595 pulled = self.tank.pull_storage(avail) 

596 

597 # Take remaining from upstream 

598 to_pull = {"volume": avail["volume"] - pulled["volume"]} 

599 pulled_ = self.pull_distributed(to_pull, of_type=["River", "Node"]) 

600 

601 reply = self.sum_vqip(pulled, pulled_) 

602 

603 return reply 

604 

605 def push_set_river(self, vqip): 

606 """Push to river tank, currently forced. 

607 

608 Args: 

609 vqip (dict): A VQIP amount to push 

610 

611 Returns: 

612 (dict): A VQIP amount that was not successfully received 

613 """ 

614 _ = self.tank.push_storage(vqip, force=True) 

615 return self.empty_vqip() 

616 

617 def update_depth(self): 

618 """Recalculate depth.""" 

619 self.current_depth = self.tank.storage["volume"] / self.area 

620 

621 def get_din_pool(self): 

622 """Get total dissolved inorganic nitrogen from tank storage. 

623 

624 Returns: 

625 (float): total din 

626 """ 

627 return sum( 

628 [self.tank.storage[x] for x in self.din_components] 

629 ) # TODO + self.tank.storage['nitrite'] but nitrite might not be modelled... 

630 # need some ways to address this 

631 

632 def biochemical_processes(self): 

633 """Runs all biochemical processes and updates pollutant amounts. 

634 

635 Returns: 

636 in_ (dict): A VQIP amount that represents total gain in pollutant amounts 

637 out_ (dict): A VQIP amount that represents total loss in pollutant amounts 

638 """ 

639 # TODO make more modular 

640 self.update_depth() 

641 

642 self.tank.storage["temperature"] = (1 - 1 / self.T_wdays) * self.tank.storage[ 

643 "temperature" 

644 ] + (1 / self.T_wdays) * self.get_data_input("temperature") 

645 

646 # Update lagged temperatures 

647 if len(self.lagged_temperatures) > self.max_temp_lag: 

648 del self.lagged_temperatures[0] 

649 self.lagged_temperatures.append(self.tank.storage["temperature"]) 

650 

651 # Update lagged total phosphorus 

652 if len(self.lagged_total_phosphorus) > self.max_phosphorus_lag: 

653 del self.lagged_total_phosphorus[0] 

654 total_phosphorus = ( 

655 self.tank.storage["phosphate"] + self.tank.storage["org-phosphorus"] 

656 ) 

657 self.lagged_total_phosphorus.append(total_phosphorus) 

658 

659 # Check if any water 

660 if self.tank.storage["volume"] < constants.FLOAT_ACCURACY: 

661 # Assume these only do something when there is water 

662 return (self.empty_vqip(), self.empty_vqip()) 

663 

664 if self.tank.storage["temperature"] <= 0: 

665 # Seems that these things are only active when above freezing 

666 return (self.empty_vqip(), self.empty_vqip()) 

667 

668 # Denitrification 

669 tempfcn = 2 ** ((self.tank.storage["temperature"] - 20) / 10) 

670 if self.tank.storage["temperature"] < 5: 

671 tempfcn *= self.tank.storage["temperature"] / 5 

672 

673 din = self.get_din_pool() 

674 din_concentration = din / self.tank.storage["volume"] 

675 confcn = din_concentration / ( 

676 din_concentration + self.halfsatINwater 

677 ) # [kg/m3] 

678 denitri_water = ( 

679 self.denpar_w * self.area * tempfcn * confcn 

680 ) # [kg/day] #TODO convert to per DT 

681 

682 river_denitrification = min( 

683 denitri_water, 0.5 * din 

684 ) # [kg/day] max 50% kan be denitrified 

685 din_ = din - river_denitrification # [kg] 

686 

687 # Update mass balance 

688 in_ = self.empty_vqip() 

689 out_ = self.empty_vqip() 

690 if din > 0: 

691 for pol in self.din_components: 

692 # denitrification 

693 loss = (din - din_) / din * self.tank.storage[pol] 

694 out_[pol] += loss 

695 self.tank.storage[pol] -= loss 

696 

697 din = self.get_din_pool() 

698 

699 # Calculate moving averages TODO generalise 

700 temp_10_day = sum(self.lagged_temperatures[-10:]) / 10 

701 temp_20_day = sum(self.lagged_temperatures[-20:]) / 20 

702 total_phos_365_day = sum(self.lagged_total_phosphorus) / self.max_phosphorus_lag 

703 

704 # Calculate coefficients 

705 tempfcn = ( 

706 (self.tank.storage["temperature"]) / 20 * (temp_10_day - temp_20_day) / 5 

707 ) 

708 if (total_phos_365_day - self.limpppar + self.hsatTP) > 0: 

709 totalphosfcn = (total_phos_365_day - self.limpppar) / ( 

710 total_phos_365_day - self.limpppar + self.hsatTP 

711 ) 

712 else: 

713 totalphosfcn = 0 

714 

715 # Mineralisation/production TODO this feels like it could be much tidier 

716 minprodN = ( 

717 self.prodNpar * totalphosfcn * tempfcn * self.area * self.current_depth 

718 ) # [kg N/day] 

719 minprodP = ( 

720 self.prodPpar 

721 * totalphosfcn 

722 * tempfcn 

723 * self.area 

724 * self.current_depth 

725 * self.uptake_PNratio 

726 ) # [kg N/day] 

727 if minprodN > 0: 

728 # production (inorg -> org) 

729 minprodN = min( 

730 0.5 * din, minprodN 

731 ) # only half pool can be used for production 

732 minprodP = min( 

733 0.5 * self.tank.storage["phosphate"], minprodP 

734 ) # only half pool can be used for production 

735 

736 # Update mass balance 

737 out_["phosphate"] = minprodP 

738 self.tank.storage["phosphate"] -= minprodP 

739 in_["org-phosphorus"] = minprodP 

740 self.tank.storage["org-phosphorus"] += minprodP 

741 if din > 0: 

742 for pol in self.din_components: 

743 loss = minprodN * self.tank.storage[pol] / din 

744 out_[pol] += loss 

745 self.tank.storage[pol] -= loss 

746 

747 in_["org-nitrogen"] = minprodN 

748 self.tank.storage["org-nitrogen"] += minprodN 

749 

750 else: 

751 # mineralisation (org -> inorg) 

752 minprodN = min(0.5 * self.tank.storage["org-nitrogen"], -minprodN) 

753 minprodP = min(0.5 * self.tank.storage["org-phosphorus"], -minprodP) 

754 

755 # Update mass balance 

756 in_["phosphate"] = minprodP 

757 self.tank.storage["phosphate"] += minprodP 

758 out_["org-phosphorus"] = minprodP 

759 self.tank.storage["org-phosphorus"] -= minprodP 

760 if din > 0: 

761 for pol in self.din_components: 

762 gain = minprodN * self.tank.storage[pol] / din 

763 in_[pol] += gain 

764 self.tank.storage[pol] += gain 

765 

766 out_["org-nitrogen"] = minprodN 

767 self.tank.storage["org-nitrogen"] -= minprodN 

768 

769 din = self.get_din_pool() 

770 

771 # macrophyte uptake temperature dependence factor 

772 tempfcn1 = (max(0, self.tank.storage["temperature"]) / 20) ** 0.3 

773 tempfcn2 = (self.tank.storage["temperature"] - temp_20_day) / 5 

774 tempfcn = max(0, tempfcn1 * tempfcn2) 

775 

776 macrouptN = self.muptNpar * tempfcn * self.area # [kg/day] 

777 macrophyte_uptake_N = min(0.5 * din, macrouptN) 

778 if din > 0: 

779 for pol in self.din_components: 

780 loss = macrophyte_uptake_N * self.tank.storage[pol] / din 

781 out_[pol] += loss 

782 self.tank.storage[pol] -= loss 

783 

784 macrouptP = ( 

785 self.muptPpar * tempfcn * max(0, totalphosfcn) * self.area 

786 ) # [kg/day] 

787 macrophyte_uptake_P = min(0.5 * self.tank.storage["phosphate"], macrouptP) 

788 out_["phosphate"] += macrophyte_uptake_P 

789 self.tank.storage["phosphate"] -= macrophyte_uptake_P 

790 

791 # TODO source/sink for benthos sediment P suspension/resuspension 

792 return in_, out_ 

793 

794 def get_riverrc(self): 

795 """Get river outflow coefficient (i.e., how much water leaves the tank in this 

796 timestep). 

797 

798 Returns: 

799 riverrc (float): outflow coeffficient 

800 """ 

801 # Calculate travel time 

802 total_time = self.length / self.velocity 

803 # Apply damp 

804 kt = self.damp * total_time # [day] 

805 if kt != 0: 

806 riverrc = 1 - kt + kt * exp(-1 / kt) # [-] 

807 else: 

808 riverrc = 1 

809 return riverrc 

810 

811 def calculate_discharge(self): 

812 """""" 

813 if "nitrate" in constants.POLLUTANTS: 

814 # TODO clumsy Run biochemical processes 

815 in_, out_ = self.biochemical_processes() 

816 # Mass balance 

817 self.bio_in = in_ 

818 self.bio_out = out_ 

819 

820 def distribute(self): 

821 """Run biochemical processes, track mass balance, and distribute water.""" 

822 # self.calculate_discharge() Get outflow 

823 outflow = self.tank.pull_storage( 

824 {"volume": self.tank.storage["volume"] * self.get_riverrc()} 

825 ) 

826 # Distribute outflow 

827 reply = self.push_distributed(outflow, of_type=["River", "Node", "Waste"]) 

828 _ = self.tank.push_storage(reply, force=True) 

829 if reply["volume"] > constants.FLOAT_ACCURACY: 

830 print("river cant push: {0}".format(reply["volume"])) 

831 

832 def pull_check_fp(self, vqip=None): 

833 """ 

834 

835 Args: 

836 vqip: 

837 

838 Returns: 

839 

840 """ 

841 # TODO Pull checking for riparian buffer, needs updating update river depth 

842 self.update_depth() 

843 return self.current_depth, self.area, self.width, self.river_tank.storage 

844 

845 def end_timestep_(self): 

846 """Update state variables.""" 

847 self.tank.end_timestep() 

848 self.bio_in = self.empty_vqip() 

849 self.bio_out = self.empty_vqip() 

850 

851 

852class Reservoir(Storage): 

853 """""" 

854 

855 def __init__(self, **kwargs): 

856 """Storage node that makes abstractions by calling pull_distributed. 

857 

858 Functions intended to call in orchestration: 

859 make_abstractions (before any river routing) 

860 

861 Key assumptions: 

862 - Conceptualised as a `Tank`. 

863 - Recharged only via pumped abstractions. 

864 - Evaporation/precipitation onto surface area currently ignored. 

865 - If `decays` are provided to model water quality transformations, 

866 see `core.py/DecayObj`. 

867 

868 Input data and parameter requirements: 

869 - Tank `capacity`, `area`, and `datum`. 

870 _Units_: cubic metres, squared metres, metres 

871 - Optional dictionary of decays with pollutants as keys and decay 

872 parameters (a constant and a temperature sensitivity exponent) as 

873 values. _Units_: - 

874 """ 

875 super().__init__(**kwargs) 

876 

877 def make_abstractions(self): 

878 """Pulls water and updates tanks.""" 

879 reply = self.pull_distributed(self.tank.get_excess()) 

880 spill = self.tank.push_storage(reply) 

881 _ = self.tank.push_storage(spill, force=True) 

882 if spill["volume"] > constants.FLOAT_ACCURACY: 

883 print("Spill at reservoir by {0}".format(spill["volume"])) 

884 

885 

886class RiverReservoir(Reservoir): 

887 """""" 

888 

889 def __init__(self, environmental_flow=0, **kwargs): 

890 """A reservoir with a natural river inflow, includes an environmental downstream 

891 flow to satisfy. 

892 

893 Args: 

894 environmental_flow (float, optional): Downstream environmental flow. 

895 Defaults to 0. 

896 

897 Functions intended to call in orchestration: 

898 make_abstractions (if any) 

899 

900 satisfy_environmental (before river routing.. possibly before 

901 downstream river abstractions depending on licence) 

902 

903 Key assumptions: 

904 - Conceptualised as a `Tank`. 

905 - Recharged via pumped abstractions and receives water from 

906 inflowing arcs. 

907 - Reservoir aims to satisfy a static `environmental_flow`. 

908 - If tank capacity is exceeded, reservoir spills downstream 

909 towards `nodes.py/Node`, `storage.py/River` or `waste.py/Waste` nodes. 

910 Spill counts towards `environmental_flow`. 

911 - Evaporation/precipitation onto surface area currently ignored. 

912 - Currently, if a reservoir satisfies a pull from a downstream 

913 node, it does __not__ count towards `environmental_flow`. 

914 - If `decays` are provided to model water quality transformations, 

915 see `core.py/DecayObj`. 

916 

917 Input data and parameter requirements: 

918 - Tank `capacity`, `area`, and `datum`. 

919 _Units_: cubic metres, squared metres, metres 

920 - `environmental_flow` 

921 _Units_: cubic metres/timestep 

922 - Optional dictionary of decays with pollutants as keys and decay 

923 parameters (a constant and a temperature sensitivity exponent) as 

924 values. _Units_: - 

925 """ 

926 # Parameters 

927 self.environmental_flow = environmental_flow 

928 super().__init__(**kwargs) 

929 

930 # State variables 

931 self.total_environmental_satisfied = 0 

932 

933 self.push_set_handler["default"] = self.push_set_river_reservoir 

934 self.push_check_handler["default"] = self.push_check_river_reservoir 

935 self.end_timestep = self.end_timestep_ 

936 

937 self.__class__.__name__ = "Reservoir" 

938 

939 def push_set_river_reservoir(self, vqip): 

940 """Receive water. 

941 

942 Args: 

943 vqip (dict): A VQIP amount to be received 

944 

945 Returns: 

946 reply (dict): A VQIP amount that was not successfully received 

947 """ 

948 # Apply normal reservoir storage We do this under the assumption that spill is 

949 # mixed in with the reservoir If the reservoir can't spill everything you'll get 

950 # some weird numbers in reply, but if your reservoir can't spill as much as you 

951 # like then you should probably be pushing the right amount through 

952 # push_check_river_reservoir Some cunning could probably avoid this by checking 

953 # vqip, but this is a serious edge case 

954 _ = self.tank.push_storage(vqip, force=True) 

955 spill = self.tank.pull_ponded() 

956 

957 # Send spill downstream 

958 reply = self.push_distributed(spill, of_type=["Node", "River", "Waste"]) 

959 

960 # Use spill to satisfy downstream flow 

961 self.total_environmental_satisfied += spill["volume"] - reply["volume"] 

962 

963 return reply 

964 

965 def push_check_river_reservoir(self, vqip=None): 

966 """A push check to receive water, assumes spill may occur and checks downstream 

967 capacity. 

968 

969 Args: 

970 vqip (dict, optional): A VQIP that can be used to limit the volume in 

971 the return value (only volume key is used). Defaults to None. 

972 

973 Returns: 

974 excess (dict): A VQIP amount of water that cannot be received 

975 """ 

976 # Check downstream capacity (i.e., that would be spilled) 

977 downstream_availability = self.get_connected( 

978 direction="push", of_type=["Node", "River", "Waste"] 

979 )["avail"] 

980 # Check excess capacity in the reservoir 

981 excess = self.tank.get_excess() 

982 # Combine excess and downstream in response 

983 new_v = excess["volume"] + downstream_availability 

984 if vqip is not None: 

985 new_v = min(vqip["volume"], new_v) 

986 # Update to vqip 

987 excess = self.v_change_vqip(excess, new_v) 

988 

989 return excess 

990 

991 def satisfy_environmental(self): 

992 """Satisfy environmental flow downstream.""" 

993 # Calculate how much environmental flow is yet to satisfy #(some may have been 

994 # already if pull-and-take abstractions have taken place) 

995 to_satisfy = max( 

996 self.environmental_flow - self.total_environmental_satisfied, 0 

997 ) 

998 # Pull from tank 

999 environmental = self.tank.pull_storage({"volume": to_satisfy}) 

1000 # Send downstream 

1001 reply = self.push_distributed(environmental) 

1002 _ = self.tank.push_storage(reply, force=True) 

1003 if reply["volume"] > constants.FLOAT_ACCURACY: 

1004 print("warning: environmental not able to push") 

1005 

1006 # Update satisfaction 

1007 self.total_environmental_satisfied += environmental["volume"] 

1008 

1009 def end_timestep_(self): 

1010 """Udpate state varibles.""" 

1011 self.tank.end_timestep() 

1012 self.total_environmental_satisfied = 0