Coverage for wsimod\orchestration\model.py: 26%

610 statements  

coverage.py v7.6.1, created at 2024-10-30 14:55 +0000

# -*- coding: utf-8 -*-
"""Created on Mon Jul 4 16:01:48 2022.

@author: bdobson
"""
import csv
import gzip
import inspect
import os
import sys
from datetime import datetime
from math import log10

import dill as pickle
import yaml
from tqdm import tqdm

from wsimod.arcs import arcs as arcs_mod
from wsimod.core import constants
from wsimod.core.core import WSIObj
from wsimod.nodes.land import ImperviousSurface
from wsimod.nodes.nodes import NODES_REGISTRY
from wsimod.nodes.tanks import QueueTank, ResidenceTank, Tank

os.environ["USE_PYGEOS"] = "0"



class to_datetime:
    """Simple datetime wrapper with the key date properties used in WSIMOD.
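
    Example:
        >>> date = to_datetime("2000-01-15")
        >>> (date.month, date.day, date.dayofyear)
        (1, 15, 15)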

    """

    # TODO document and make better

    def __init__(self, date_string):
        """Simple datetime wrapper that has key properties used in WSIMOD components.

        Args:
            date_string (str): A string containing the date, expected in
                format %Y-%m-%d or %Y-%m.
        """
        self._date = self._parse_date(date_string)


    def __str__(self):
        return self._date.strftime("%Y-%m-%d")

    def __repr__(self):
        return self._date.strftime("%Y-%m-%d")

    @property
    def dayofyear(self):
        """Ordinal day of the year (1-366).

        Returns:
            (int): Day of the year.
        """
        return self._date.timetuple().tm_yday

    @property
    def day(self):
        """Day of the month.

        Returns:
            (int): Day of the month.
        """
        return self._date.day

    @property
    def year(self):
        """Year.

        Returns:
            (int): Year.
        """
        return self._date.year

    @property
    def month(self):
        """Month of the year (1-12).

        Returns:
            (int): Month.
        """
        return self._date.month

    def to_period(self, args="M"):
        """Convert to a monthly period.

        Args:
            args (str): Period alias; only monthly ('M') is supported.

        Returns:
            (to_datetime): A to_datetime object for the '%Y-%m' month string.
        """
        return to_datetime(f"{self._date.year}-{str(self._date.month).zfill(2)}")

    def is_leap_year(self):
        """Whether the year is a leap year.

        Returns:
            (bool): True if the year is a leap year.
        """
        year = self._date.year
        return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)


    def _parse_date(self, date_string, date_format="%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(date_string, date_format)
        except ValueError:
            try:
                return datetime.strptime(date_string, "%Y-%m-%d")
            except ValueError:
                # Check if valid 'YYYY-MM' format; monthly dates are
                # deliberately kept as strings rather than datetime objects
                try:
                    year, month = date_string.split("-")[:2]
                    if len(year) == 4 and len(month) == 2:
                        int(year)
                        int(month)
                        return date_string
                    raise ValueError
                except (ValueError, IndexError):
                    raise ValueError(f"Could not parse date: {date_string}")


    def __eq__(self, other):
        if isinstance(other, to_datetime):
            return self._date == other._date
        return False

    def __hash__(self):
        return hash(self._date)



class Model(WSIObj):
    """Object to contain nodes and arcs that provides a default orchestration.
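
    Example:
        A typical workflow (illustrative; ``model_dir`` stands in for a
        directory containing a config.yml):

        >>> my_model = Model()
        >>> my_model.load(model_dir)  # doctest: +SKIP
        >>> flows, tanks, objectives, surfaces = my_model.run()  # doctest: +SKIP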

    """

    def __init__(self):
        """Initialise an empty model object with the default orchestration.

        Returns:
            Model: An empty model object
        """

        super().__init__()
        self.arcs = {}
        # self.arcs_type = {} #not sure that this would be necessary
        self.nodes = {}
        self.nodes_type = {}
        self.extensions = []
        self.river_discharge_order = []

        # Default orchestration
        self.orchestration = [
            {"FWTW": "treat_water"},
            {"Demand": "create_demand"},
            {"Land": "run"},
            {"Groundwater": "infiltrate"},
            {"Sewer": "make_discharge"},
            {"Foul": "make_discharge"},
            {"WWTW": "calculate_discharge"},
            {"Groundwater": "distribute"},
            {"River": "calculate_discharge"},
            {"Reservoir": "make_abstractions"},
            {"Land": "apply_irrigation"},
            {"WWTW": "make_discharge"},
            {"Catchment": "route"},
        ]


    def get_init_args(self, cls):
        """Get the arguments of the __init__ method for a class and its superclasses."""
        init_args = []
        for c in cls.__mro__:
            # Get the arguments of the __init__ method
            args = inspect.getfullargspec(c.__init__).args[1:]
            init_args.extend(args)
        return init_args


    def load(self, address, config_name="config.yml", overrides={}):
        """Load and build a model from a config file.

        Args:
            address (str): Path to a directory containing the config file
                and any input data files referenced by it.
            config_name (str, optional): Name of the config file.
                Defaults to 'config.yml'.
            overrides (dict, optional): Top-level config entries to override.
                Defaults to {}.
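
        Example:
            Load a model from a directory containing a config.yml (the path
            is a placeholder):

            >>> my_model = Model()
            >>> my_model.load("/path/to/model_dir")  # doctest: +SKIP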

        """
        from ..extensions import apply_patches

        with open(os.path.join(address, config_name), "r") as file:
            data: dict = yaml.safe_load(file)

        for key, item in overrides.items():
            data[key] = item

        constants.POLLUTANTS = data.get("pollutants", constants.POLLUTANTS)
        constants.ADDITIVE_POLLUTANTS = data.get(
            "additive_pollutants", constants.ADDITIVE_POLLUTANTS
        )
        constants.NON_ADDITIVE_POLLUTANTS = data.get(
            "non_additive_pollutants", constants.NON_ADDITIVE_POLLUTANTS
        )
        constants.FLOAT_ACCURACY = float(
            data.get("float_accuracy", constants.FLOAT_ACCURACY)
        )
        self.__dict__.update(Model().__dict__)


        # FLAG: e.g. addition for new orchestration
        load_extension_files(data.get("extensions", []))

        self.extensions = data.get("extensions", [])

        if "orchestration" in data.keys():
            # Update orchestration
            self.orchestration = data["orchestration"]

        if "nodes" not in data.keys():
            raise ValueError("No nodes found in the config")

        nodes = data["nodes"]

        for name, node in nodes.items():
            if "filename" in node.keys():
                node["data_input_dict"] = read_csv(
                    os.path.join(address, node["filename"])
                )
                del node["filename"]
            if "surfaces" in node.keys():
                for key, surface in node["surfaces"].items():
                    if "filename" in surface.keys():
                        node["surfaces"][key]["data_input_dict"] = read_csv(
                            os.path.join(address, surface["filename"])
                        )
                        del surface["filename"]
                node["surfaces"] = list(node["surfaces"].values())
        arcs = data.get("arcs", {})
        self.add_nodes(list(nodes.values()))
        self.add_arcs(list(arcs.values()))

        self.add_overrides(data.get("overrides", {}))

        if "dates" in data.keys():
            self.dates = [to_datetime(x) for x in data["dates"]]

        apply_patches(self)


    def save(self, address, config_name="config.yml", compress=False):
        """Save the model object to a yaml file and input data to csv.gz format in the
        directory specified.

        Args:
            address (str): Path to a directory
            config_name (str, optional): Name of yaml model file.
                Defaults to 'config.yml'.
            compress (bool, optional): Write input data as csv.gz rather
                than csv. Defaults to False.
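
        Example:
            Save the current model alongside gzipped input data (the path is
            a placeholder):

            >>> my_model.save("/path/to/output_dir", compress=True)  # doctest: +SKIP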

        """
        if not os.path.exists(address):
            os.mkdir(address)
        nodes = {}

        if compress:
            file_type = "csv.gz"
        else:
            file_type = "csv"
        for node in self.nodes.values():
            init_args = self.get_init_args(node.__class__)
            special_args = set(["surfaces", "parent", "data_input_dict"])

            node_props = {
                x: getattr(node, x) for x in set(init_args).difference(special_args)
            }
            node_props["type_"] = node.__class__.__name__
            node_props["node_type_override"] = (
                repr(node.__class__).split(".")[-1].replace("'>", "")
            )

            if "surfaces" in init_args:
                surfaces = {}
                for surface in node.surfaces:
                    surface_args = self.get_init_args(surface.__class__)
                    surface_props = {
                        x: getattr(surface, x)
                        for x in set(surface_args).difference(special_args)
                    }
                    surface_props["type_"] = surface.__class__.__name__

                    # Exceptions...
                    # TODO I need a better way to do this
                    del surface_props["capacity"]
                    if set(["rooting_depth", "pore_depth"]).intersection(surface_args):
                        del surface_props["depth"]
                    if "data_input_dict" in surface_args:
                        if surface.data_input_dict:
                            filename = (
                                "{0}-{1}-inputs.{2}".format(
                                    node.name, surface.surface, file_type
                                )
                                .replace("(", "_")
                                .replace(")", "_")
                                .replace("/", "_")
                                .replace(" ", "_")
                            )
                            write_csv(
                                surface.data_input_dict,
                                {"node": node.name, "surface": surface.surface},
                                os.path.join(address, filename),
                                compress=compress,
                            )
                            surface_props["filename"] = filename
                    surfaces[surface_props["surface"]] = surface_props
                node_props["surfaces"] = surfaces

            if "data_input_dict" in init_args:
                if node.data_input_dict:
                    filename = "{0}-inputs.{1}".format(node.name, file_type)
                    write_csv(
                        node.data_input_dict,
                        {"node": node.name},
                        os.path.join(address, filename),
                        compress=compress,
                    )
                    node_props["filename"] = filename

            nodes[node.name] = node_props


        arcs = {}
        for arc in self.arcs.values():
            init_args = self.get_init_args(arc.__class__)
            special_args = set(["in_port", "out_port"])
            arc_props = {
                x: getattr(arc, x) for x in set(init_args).difference(special_args)
            }
            arc_props["type_"] = arc.__class__.__name__
            arc_props["in_port"] = arc.in_port.name
            arc_props["out_port"] = arc.out_port.name
            arcs[arc.name] = arc_props

        data = {
            "nodes": nodes,
            "arcs": arcs,
            "orchestration": self.orchestration,
            "pollutants": constants.POLLUTANTS,
            "additive_pollutants": constants.ADDITIVE_POLLUTANTS,
            "non_additive_pollutants": constants.NON_ADDITIVE_POLLUTANTS,
            "float_accuracy": constants.FLOAT_ACCURACY,
            "extensions": self.extensions,
            "river_discharge_order": self.river_discharge_order,
        }
        if hasattr(self, "dates"):
            data["dates"] = [str(x) for x in self.dates]


        def coerce_value(value):
            """Coerce a value to a yaml-serialisable type.

            Args:
                value: Value to coerce.

            Returns:
                The coerced value.
            """
            conversion_options = {
                "__float__": float,
                "__iter__": list,
                "__int__": int,
                "__str__": str,
                "__bool__": bool,
            }
            converted = False
            for property, func in conversion_options.items():
                if hasattr(value, property):
                    try:
                        yaml.safe_dump(func(value))
                        value = func(value)
                        converted = True
                        break
                    except Exception:
                        raise ValueError(f"Cannot dump: {value} of type {type(value)}")
            if not converted:
                raise ValueError(f"Cannot dump: {value} of type {type(value)}")

            return value


        def check_and_coerce_dict(data_dict):
            """Recursively coerce all values in a dict to yaml-serialisable types.

            Args:
                data_dict (dict): Dict whose values are checked (in place).
            """
            for key, value in data_dict.items():
                if isinstance(value, dict):
                    check_and_coerce_dict(value)
                else:
                    try:
                        yaml.safe_dump(value)
                    except yaml.representer.RepresenterError:
                        if hasattr(value, "__iter__"):
                            for idx, val in enumerate(value):
                                if isinstance(val, dict):
                                    check_and_coerce_dict(val)
                                else:
                                    value[idx] = coerce_value(val)
                        data_dict[key] = coerce_value(value)

        check_and_coerce_dict(data)


        write_yaml(address, config_name, data)


    def load_pickle(self, fid):
        """Load a model object, including its saved states, from a pickle file.

        Args:
            fid (str): File address to load the pickled model from

        Returns:
            model (obj): loaded model

        Example:
            >>> # Load and run your model
            >>> my_model.load(model_dir, config_name='config.yml')
            >>> _ = my_model.run()
            >>>
            >>> # Save it including its different states
            >>> my_model.save_pickle('model_at_end_of_run.pkl')
            >>>
            >>> # Load it at another time to resume the model from the end
            >>> # of the previous run
            >>> new_model = Model()
            >>> new_model = new_model.load_pickle('model_at_end_of_run.pkl')
        """
        with open(fid, "rb") as file:
            return pickle.load(file)


    def save_pickle(self, fid):
        """Save the model object to a pickle file, including the model states.

        Args:
            fid (str): File address to save the pickled model to
        """
        with open(fid, "wb") as file:
            pickle.dump(self, file)


    def add_nodes(self, nodelist):
        """Add nodes to the model object from a list of dicts, where each dict contains
        all of the parameters for a node. Intended to be called before add_arcs.

        Args:
            nodelist (list): List of dicts, where a dict is a node
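
        Example:
            A minimal illustrative node dict; parameters beyond ``name`` and
            ``type_`` depend on the node type:

            >>> my_model.add_nodes([{"name": "my_waste", "type_": "Waste"}])  # doctest: +SKIP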

        """

        for data in nodelist:
            name = data["name"]
            type_ = data["type_"]
            if "node_type_override" in data.keys():
                node_type = data["node_type_override"]
                del data["node_type_override"]
            else:
                node_type = type_
            if "foul" in name:
                # Absolute hack to enable foul sewers to be treated separate from storm
                type_ = "Foul"
            if "geometry" in data.keys():
                del data["geometry"]
            del data["type_"]

            if node_type not in NODES_REGISTRY.keys():
                raise ValueError(f"Node type {node_type} not recognised")

            if type_ not in self.nodes_type.keys():
                self.nodes_type[type_] = {}

            self.nodes_type[type_][name] = NODES_REGISTRY[node_type](**dict(data))
            self.nodes[name] = self.nodes_type[type_][name]
        self.nodelist = [x for x in self.nodes.values()]


    def add_instantiated_nodes(self, nodelist):
        """Add nodes to the model object from a list of objects, where each object is
        an already instantiated node object. Intended to be called before add_arcs.

        Args:
            nodelist (list): list of objects that are nodes
        """
        self.nodelist = nodelist
        self.nodes = {x.name: x for x in nodelist}
        for x in nodelist:
            type_ = x.__class__.__name__
            if type_ not in self.nodes_type.keys():
                self.nodes_type[type_] = {}
            self.nodes_type[type_][x.name] = x


    def add_arcs(self, arclist):
        """Add arcs to the model object from a list of dicts, where each dict contains
        all of the parameters for an arc.

        Args:
            arclist (list): list of dicts, where a dict is an arc
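
        Example:
            An illustrative arc dict; ``in_port`` and ``out_port`` must name
            existing nodes, and 'Arc' is assumed here to be the base arc type
            in wsimod.arcs:

            >>> my_model.add_arcs([{"name": "my_arc",
            ...                     "type_": "Arc",
            ...                     "in_port": "my_land",
            ...                     "out_port": "my_river"}])  # doctest: +SKIP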

        """
        river_arcs = {}
        for arc in arclist:
            name = arc["name"]
            type_ = arc["type_"]
            del arc["type_"]
            arc["in_port"] = self.nodes[arc["in_port"]]
            arc["out_port"] = self.nodes[arc["out_port"]]
            self.arcs[name] = getattr(arcs_mod, type_)(**dict(arc))

            if arc["in_port"].__class__.__name__ in [
                "River",
                "Node",
                "Waste",
                "Reservoir",
            ]:
                if arc["out_port"].__class__.__name__ in [
                    "River",
                    "Node",
                    "Waste",
                    "Reservoir",
                ]:
                    river_arcs[name] = self.arcs[name]

        self.river_discharge_order = []
        if not any(river_arcs):
            return
        upstreamness = (
            {x: 0 for x in self.nodes_type["Waste"].keys()}
            if "Waste" in self.nodes_type
            else {}
        )
        upstreamness = self.assign_upstream(river_arcs, upstreamness)

        if "River" in self.nodes_type:
            for node in sorted(
                upstreamness.items(), key=lambda item: item[1], reverse=True
            ):
                if node[0] in self.nodes_type["River"]:
                    self.river_discharge_order.append(node[0])


    def add_instantiated_arcs(self, arclist):
        """Add arcs to the model object from a list of objects, where each object is
        an already instantiated arc object.

        Args:
            arclist (list): list of objects that are arcs.
        """
        self.arclist = arclist
        self.arcs = {x.name: x for x in arclist}
        river_arcs = {}
        for arc in arclist:
            if arc.in_port.__class__.__name__ in [
                "River",
                "Node",
                "Waste",
                "Reservoir",
            ]:
                if arc.out_port.__class__.__name__ in [
                    "River",
                    "Node",
                    "Waste",
                    "Reservoir",
                ]:
                    river_arcs[arc.name] = arc
        if not any(river_arcs):
            return
        upstreamness = (
            {x: 0 for x in self.nodes_type["Waste"].keys()}
            if "Waste" in self.nodes_type
            else {}
        )

        upstreamness = self.assign_upstream(river_arcs, upstreamness)

        self.river_discharge_order = []
        if "River" in self.nodes_type:
            for node in sorted(
                upstreamness.items(), key=lambda item: item[1], reverse=True
            ):
                if node[0] in self.nodes_type["River"]:
                    self.river_discharge_order.append(node[0])


    def assign_upstream(self, arcs, upstreamness):
        """Recursive function to trace upstream up arcs to determine which are the
        most upstream.

        Args:
            arcs (dict): dict of arc objects keyed by name
            upstreamness (dict): dictionary containing nodes in
                arcs as keys and a number representing upstreamness
                (higher numbers = more upstream)

        Returns:
            upstreamness (dict): final version of upstreamness
        """
        upstreamness_ = upstreamness.copy()
        in_nodes = [
            x.in_port.name
            for x in arcs.values()
            if x.out_port.name in upstreamness.keys()
        ]
        ind = max(list(upstreamness_.values())) + 1
        in_nodes = list(set(in_nodes).difference(upstreamness.keys()))
        for node in in_nodes:
            upstreamness[node] = ind
        if upstreamness == upstreamness_:
            return upstreamness
        else:
            upstreamness = self.assign_upstream(arcs, upstreamness)
            return upstreamness


    def add_overrides(self, config: dict):
        """Apply parameter overrides to nodes and arcs already in the model.

        Args:
            config (dict): Dict with optional 'nodes' and 'arcs' keys, each
                mapping dicts of overrides that include 'name' and 'type_'.
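
        Example:
            An illustrative overrides dict; 'capacity' stands in for whatever
            parameter the target arc accepts:

            >>> my_model.add_overrides(
            ...     {"arcs": {"my_arc": {"name": "my_arc",
            ...                          "type_": "Arc",
            ...                          "capacity": 10}}})  # doctest: +SKIP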

        """
        for node in config.get("nodes", {}).values():
            type_ = node.pop("type_")
            name = node.pop("name")

            if type_ not in self.nodes_type.keys():
                raise ValueError(f"Node type {type_} not recognised")

            if name not in self.nodes_type[type_].keys():
                raise ValueError(f"Node {name} not recognised")

            self.nodes_type[type_][name].apply_overrides(node)

        for arc in config.get("arcs", {}).values():
            name = arc.pop("name")
            type_ = arc.pop("type_")

            if name not in self.arcs.keys():
                raise ValueError(f"Arc {name} not recognised")

            self.arcs[name].apply_overrides(arc)


    def debug_node_mb(self):
        """Simple function that iterates over nodes calling their mass balance
        function."""
        for node in self.nodelist:
            _ = node.node_mass_balance()


    def default_settings(self):
        """Incomplete function that enables easy specification of results storage.

        Returns:
            (dict): default settings
        """
        return {
            "arcs": {"flows": True, "pollutants": True},
            "tanks": {"storages": True, "pollutants": True},
            "mass_balance": False,
        }


    def change_runoff_coefficient(self, relative_change, nodes=None):
        """Clunky way to change the runoff coefficient of a land node.

        Args:
            relative_change (float): amount that the impervious area in the land
                node is multiplied by (grass area is changed in compensation)
            nodes (list, optional): list of land nodes to change the parameters of.
                Defaults to None, which applies the change to all land nodes.
        """
        # Multiplies impervious area by relative change and adjusts grassland
        # accordingly
        if nodes is None:
            nodes = self.nodes_type["Land"].values()

        if isinstance(relative_change, float):
            relative_change = {x: relative_change for x in nodes}

        for node in nodes:
            surface_dict = {x.surface: x for x in node.surfaces}
            if "Impervious" in surface_dict.keys():
                impervious_area = surface_dict["Impervious"].area
                grass_area = surface_dict["Grass"].area

                new_impervious_area = impervious_area * relative_change[node]
                new_grass_area = grass_area + (impervious_area - new_impervious_area)
                if new_grass_area < 0:
                    print("not enough grass")
                    break
                surface_dict["Impervious"].area = new_impervious_area
                surface_dict["Impervious"].capacity *= relative_change[node]

                surface_dict["Grass"].area = new_grass_area
                surface_dict["Grass"].capacity *= new_grass_area / grass_area
                for pol in constants.ADDITIVE_POLLUTANTS + ["volume"]:
                    surface_dict["Grass"].storage[pol] *= new_grass_area / grass_area
                for pool in surface_dict["Grass"].nutrient_pool.pools:
                    for nutrient in pool.storage.keys():
                        pool.storage[nutrient] *= new_grass_area / grass_area


    def run(
        self,
        dates=None,
        settings=None,
        record_arcs=None,
        record_tanks=None,
        record_surfaces=None,
        verbose=True,
        record_all=True,
        objectives=[],
    ):
        """Run the model object with the default orchestration.

        Args:
            dates (list, optional): Dates to simulate. Defaults to None, which
                simulates all dates that the model has data for.
            settings (dict, optional): Dict to specify what results are stored,
                not currently used. Defaults to None.
            record_arcs (list, optional): List of arcs to store results for.
                Defaults to None.
            record_tanks (list, optional): List of nodes with water stores to
                store results for. Defaults to None.
            record_surfaces (list, optional): List of tuples of
                (land node, surface) to store results for. Defaults to None.
            verbose (bool, optional): Prints updates on simulation if true.
                Defaults to True.
            record_all (bool, optional): Specifies to store all results.
                Defaults to True.
            objectives (list, optional): A list of dicts with objectives to
                calculate (see examples). Defaults to [].

        Returns:
            flows: simulated flows in a list of dicts
            tanks: simulated tanks storages in a list of dicts
            objective_results: list of values based on objectives list
            surfaces: simulated surface storages of land nodes in a list of dicts


        Examples:
            Run a model without storing any results but calculating objectives:

            import statistics as stats
            objectives = [{'element_type': 'flows',
                           'name': 'my_river',
                           'function': lambda x, _: stats.mean(
                               [y['phosphate'] for y in x])
                           },
                          {'element_type': 'tanks',
                           'name': 'my_reservoir',
                           'function': lambda x, model: sum(
                               [y['storage'] < (model.nodes['my_reservoir']
                                                .tank.capacity / 2) for y in x])
                           }]
            _, _, results, _ = my_model.run(record_all=False, objectives=objectives)
        """

        if record_arcs is None:
            record_arcs = []
        if record_all:
            record_arcs = list(self.arcs.keys())
        if record_tanks is None:
            record_tanks = []

        if record_surfaces is None:
            record_surfaces = []

        if settings is None:
            settings = self.default_settings()

        def blockPrint():
            """Redirect stdout to devnull to suppress printing.

            Returns:
                stdout: The original stdout, so that it can be restored.
            """
            stdout = sys.stdout
            sys.stdout = open(os.devnull, "w")
            return stdout

        def enablePrint(stdout):
            """Restore the given stdout, re-enabling printing.

            Args:
                stdout: The stdout to restore.
            """
            sys.stdout = stdout


        if not verbose:
            stdout = blockPrint()
        if dates is None:
            dates = self.dates

        for objective in objectives:
            if objective["element_type"] == "tanks":
                record_tanks.append(objective["name"])
            elif objective["element_type"] == "flows":
                record_arcs.append(objective["name"])
            elif objective["element_type"] == "surfaces":
                record_surfaces.append((objective["name"], objective["surface"]))
            else:
                print("element_type not recorded")


        flows = []
        tanks = []
        surfaces = []
        for date in tqdm(dates, disable=(not verbose)):
            # for date in dates:
            for node in self.nodelist:
                node.t = date
                node.monthyear = date.to_period("M")

            # Iterate over orchestration
            for timestep_item in self.orchestration:
                for node_type, function in timestep_item.items():
                    for node in self.nodes_type.get(node_type, {}).values():
                        getattr(node, function)()

            # river
            for node_name in self.river_discharge_order:
                self.nodes[node_name].distribute()

            # mass balance checking
            # nodes/system
            sys_in = self.empty_vqip()
            sys_out = self.empty_vqip()
            sys_ds = self.empty_vqip()

            # arcs
            for arc in self.arcs.values():
                in_, ds_, out_ = arc.arc_mass_balance()
                for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
                    sys_in[v] += in_[v]
                    sys_out[v] += out_[v]
                    sys_ds[v] += ds_[v]
            for node in self.nodelist:
                # print(node.name)
                in_, ds_, out_ = node.node_mass_balance()

                # temp = {'name' : node.name,
                #         'time' : date}
                # for lab, dict_ in zip(['in','ds','out'], [in_, ds_, out_]):
                #     for key, value in dict_.items():
                #         temp[(lab, key)] = value
                # node_mb.append(temp)

                for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
                    sys_in[v] += in_[v]
                    sys_out[v] += out_[v]
                    sys_ds[v] += ds_[v]


            for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
                # Find the largest value of in_, out_, ds_
                largest = max(sys_in[v], sys_out[v], sys_ds[v])

                if largest > constants.FLOAT_ACCURACY:
                    # Perform the comparison at a magnitude matching the
                    # largest value
                    magnitude = 10 ** int(log10(largest))
                    in_10 = sys_in[v] / magnitude
                    out_10 = sys_out[v] / magnitude
                    ds_10 = sys_ds[v] / magnitude
                else:
                    in_10 = sys_in[v]
                    out_10 = sys_out[v]
                    ds_10 = sys_ds[v]

                if (in_10 - ds_10 - out_10) > constants.FLOAT_ACCURACY:
                    print(
                        "system mass balance error for "
                        + v
                        + " of "
                        + str(sys_in[v] - sys_ds[v] - sys_out[v])
                    )


            # Store results
            for arc in record_arcs:
                arc = self.arcs[arc]
                flows.append(
                    {"arc": arc.name, "flow": arc.vqip_out["volume"], "time": date}
                )
                for pol in constants.POLLUTANTS:
                    flows[-1][pol] = arc.vqip_out[pol]

            for node in record_tanks:
                node = self.nodes[node]
                tanks.append(
                    {
                        "node": node.name,
                        "storage": node.tank.storage["volume"],
                        "time": date,
                    }
                )

            for node, surface in record_surfaces:
                node = self.nodes[node]
                name = node.name
                surface = node.get_surface(surface)
                if not isinstance(surface, ImperviousSurface):
                    surfaces.append(
                        {
                            "node": name,
                            "surface": surface.surface,
                            "percolation": surface.percolation["volume"],
                            "subsurface_r": surface.subsurface_flow["volume"],
                            "surface_r": surface.infiltration_excess["volume"],
                            "storage": surface.storage["volume"],
                            "evaporation": surface.evaporation["volume"],
                            "precipitation": surface.precipitation["volume"],
                            "tank_recharge": surface.tank_recharge,
                            "capacity": surface.capacity,
                            "time": date,
                            "et0_coef": surface.et0_coefficient,
                            # 'crop_factor' : surface.crop_factor
                        }
                    )
                    for pol in constants.POLLUTANTS:
                        surfaces[-1][pol] = surface.storage[pol]
                else:
                    surfaces.append(
                        {
                            "node": name,
                            "surface": surface.surface,
                            "storage": surface.storage["volume"],
                            "evaporation": surface.evaporation["volume"],
                            "precipitation": surface.precipitation["volume"],
                            "capacity": surface.capacity,
                            "time": date,
                        }
                    )
                    for pol in constants.POLLUTANTS:
                        surfaces[-1][pol] = surface.storage[pol]
            if record_all:
                for node in self.nodes.values():
                    for prop_ in dir(node):
                        prop = node.__getattribute__(prop_)
                        if prop.__class__ in [QueueTank, Tank, ResidenceTank]:
                            tanks.append(
                                {
                                    "node": node.name,
                                    "time": date,
                                    "storage": prop.storage["volume"],
                                    "prop": prop_,
                                }
                            )
                            for pol in constants.POLLUTANTS:
                                tanks[-1][pol] = prop.storage[pol]


                for name, node in self.nodes_type.get("Land", {}).items():
                    for surface in node.surfaces:
                        if not isinstance(surface, ImperviousSurface):
                            surfaces.append(
                                {
                                    "node": name,
                                    "surface": surface.surface,
                                    "percolation": surface.percolation["volume"],
                                    "subsurface_r": surface.subsurface_flow["volume"],
                                    "surface_r": surface.infiltration_excess["volume"],
                                    "storage": surface.storage["volume"],
                                    "evaporation": surface.evaporation["volume"],
                                    "precipitation": surface.precipitation["volume"],
                                    "tank_recharge": surface.tank_recharge,
                                    "capacity": surface.capacity,
                                    "time": date,
                                    "et0_coef": surface.et0_coefficient,
                                    # 'crop_factor' : surface.crop_factor
                                }
                            )
                            for pol in constants.POLLUTANTS:
                                surfaces[-1][pol] = surface.storage[pol]
                        else:
                            surfaces.append(
                                {
                                    "node": name,
                                    "surface": surface.surface,
                                    "storage": surface.storage["volume"],
                                    "evaporation": surface.evaporation["volume"],
                                    "precipitation": surface.precipitation["volume"],
                                    "capacity": surface.capacity,
                                    "time": date,
                                }
                            )
                            for pol in constants.POLLUTANTS:
                                surfaces[-1][pol] = surface.storage[pol]

            for node in self.nodes.values():
                node.end_timestep()

            for arc in self.arcs.values():
                arc.end_timestep()

        objective_results = []
        for objective in objectives:
            if objective["element_type"] == "tanks":
                val = objective["function"](
                    [x for x in tanks if x["node"] == objective["name"]], self
                )
            elif objective["element_type"] == "flows":
                val = objective["function"](
                    [x for x in flows if x["arc"] == objective["name"]], self
                )
            elif objective["element_type"] == "surfaces":
                val = objective["function"](
                    [
                        x
                        for x in surfaces
                        if (x["node"] == objective["name"])
                        & (x["surface"] == objective["surface"])
                    ],
                    self,
                )
            objective_results.append(val)
        if not verbose:
            enablePrint(stdout)
        return flows, tanks, objective_results, surfaces


    def reinit(self):
        """Reinitialise by ending all node/arc timesteps and calling reinit function
        in all nodes (generally zero-ing their storage values)."""
        for node in self.nodes.values():
            node.end_timestep()
            for prop_name in dir(node):
                prop = node.__getattribute__(prop_name)
                # Call reinit on the attribute (e.g. a tank), not on the node
                if hasattr(prop, "reinit"):
                    prop.reinit()

        for arc in self.arcs.values():
            arc.end_timestep()



def write_yaml(address, config_name, data):
    """Dump a dict to a yaml file using the safe dumper.

    Args:
        address (str): Path to a directory.
        config_name (str): Name of the yaml file to write.
        data (dict): Data to dump.
    """
    with open(os.path.join(address, config_name), "w") as file:
        yaml.dump(
            data,
            file,
            default_flow_style=False,
            sort_keys=False,
            Dumper=yaml.SafeDumper,
        )



def open_func(file_path, mode):
    """Open a file, transparently handling gzipped files when reading.

    Args:
        file_path (str): Path to the file.
        mode (str): Mode to open the file in.

    Returns:
        A file object.
    """
    if mode == "rt" and file_path.endswith(".gz"):
        return gzip.open(file_path, mode)
    else:
        return open(file_path, mode)



def read_csv(file_path, delimiter=","):
    """Read a WSIMOD input data csv (columns: variable, time, value) into a dict.

    Args:
        file_path (str): Path to the csv (or csv.gz) file.
        delimiter (str, optional): Column delimiter. Defaults to ','.

    Returns:
        (dict): Data keyed by (variable, to_datetime(time)) tuples.
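
    Example:
        An illustrative lookup; the file and variable names are placeholders:

        >>> data = read_csv("my_node-inputs.csv")  # doctest: +SKIP
        >>> data[("precipitation", to_datetime("2000-01-01"))]  # doctest: +SKIP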

    """
    with open_func(file_path, "rt") as f:
        reader = csv.DictReader(f, delimiter=delimiter)
        data = {}
        for row in reader:
            key = (row["variable"], to_datetime(row["time"]))
            value = float(row["value"])
            data[key] = value
        return data



def write_csv(data, fixed_data={}, filename="", compress=False):
    """Write a dict of (variable, time) -> value data to an (optionally gzipped) csv.

    Args:
        data (dict): Data keyed by (variable, time) tuples.
        fixed_data (dict, optional): Constant columns (e.g. node name) written
            on every row. Defaults to {}.
        filename (str, optional): Path of the file to write. Defaults to ''.
        compress (bool, optional): Write a gzipped csv. Defaults to False.
    """
    if compress:
        open_func = gzip.open
        mode = "wt"
    else:
        open_func = open
        mode = "w"
    with open_func(filename, mode, newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(list(fixed_data.keys()) + ["variable", "time", "value"])
        fixed_data_values = list(fixed_data.values())
        for key, value in data.items():
            writer.writerow(fixed_data_values + list(key) + [str(value)])



def flatten_dict(d, parent_key="", sep="-"):
    """Flatten a nested dict by joining nested keys with a separator.

    Args:
        d (dict): Dict to flatten.
        parent_key (str, optional): Prefix for all keys. Defaults to ''.
        sep (str, optional): Separator used to join keys. Defaults to '-'.

    Returns:
        (dict): Flattened dict.
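
    Example:
        >>> flatten_dict({"a": {"b": 1}, "c": 2})
        {'a-b': 1, 'c': 2}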

    """
    # Initialize an empty dictionary
    flat_dict = {}
    # Loop through each key-value pair in the input dictionary
    for k, v in d.items():
        # Construct a new key by appending the parent key and separator
        new_key = str(parent_key) + sep + str(k) if parent_key else k
        # If the value is another dictionary, call the function recursively
        if isinstance(v, dict):
            flat_dict.update(flatten_dict(v, new_key, sep))
        # Otherwise, add the key-value pair to the flat dictionary
        else:
            flat_dict[new_key] = v
    # Return the flattened dictionary
    return flat_dict



def check_and_convert_string(value):
    """Convert a string to an int, float or None where possible.

    Args:
        value (str): String to convert.

    Returns:
        The converted value, or the original string if no conversion applies.
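
    Example:
        >>> [check_and_convert_string(x) for x in ["1", "1.5", "None", "text"]]
        [1, 1.5, None, 'text']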

    """
    try:
        return int(value)
    except Exception:
        try:
            return float(value)
        except Exception:
            if value == "None":
                return None
            else:
                return value



def unflatten_dict(d, sep=":"):
    """Unflatten a dict whose keys encode nesting with a separator.

    Args:
        d (dict): Flat dict to unflatten.
        sep (str, optional): Separator that encodes nesting. Defaults to ':'.

    Returns:
        (dict): Nested dict.
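
    Example:
        >>> unflatten_dict({"a:b": 1, "c": 2})
        {'a': {'b': 1}, 'c': 2}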

    """
    result = {}
    for k, v in d.items():
        keys = k.split(sep)
        current = result
        for key in keys[:-1]:
            current = current.setdefault(key, {})
        current[keys[-1]] = v
    return result



def convert_keys(d):
    """Recursively convert dict keys from strings to ints/floats where possible.

    Args:
        d: Dict (or value) whose keys should be converted.

    Returns:
        The dict with converted keys (or the value unchanged).
    """
    # base case: if d is not a dict, return d
    if not isinstance(d, dict):
        return d
    # recursive case: create a new dict with converted keys and values
    new_d = {}
    for k, v in d.items():
        new_d[check_and_convert_string(k)] = convert_keys(v)
    return new_d



def csv2yaml(address, config_name="config_csv.yml", csv_folder_name="csv"):
    """Convert a folder of csv files (e.g. as written by yaml2csv) into a yaml config.

    Args:
        address (str): Path to a directory containing the csv folder.
        config_name (str, optional): Name of the yaml file to write.
            Defaults to 'config_csv.yml'.
        csv_folder_name (str, optional): Name of the folder containing the
            csv files. Defaults to 'csv'.

    """
    csv_path = os.path.join(address, csv_folder_name)
    csv_list = [
        os.path.join(csv_path, f)
        for f in os.listdir(csv_path)
        if os.path.isfile(os.path.join(csv_path, f))
    ]
    objs_type = {"nodes": {}, "arcs": {}}
    for fid in csv_list:
        with open(fid, "rt") as f:
            if "Dates" in fid:
                reader = csv.reader(f, delimiter=",")
                dates = []
                for row in reader:
                    dates.append(row[0])
                objs_type["dates"] = dates[1:]
            else:
                reader = csv.DictReader(f, delimiter=",")
                data = {}
                for row in reader:
                    formatted_row = {}
                    for key, value in row.items():
                        if value:
                            if ("[" in value) & ("]" in value):
                                # Convert lists
                                value = value.strip("[]")  # Remove the brackets
                                value = value.replace("'", "")  # Remove the string bits
                                value = value.split(", ")  # Split by comma
                                value = [check_and_convert_string(x) for x in value]
                            else:
                                # Convert ints, floats and strings
                                value = check_and_convert_string(value)

                        # Convert key and store converted values
                        formatted_row[key] = value
                    if "Sim_params" not in fid:
                        label = formatted_row["label"]
                        del formatted_row["label"]

                    # Convert nested dicts
                    formatted_row = unflatten_dict(formatted_row)
                    formatted_row = convert_keys(formatted_row)

                    data[row["name"]] = formatted_row
                if "Sim_params" in fid:
                    objs_type = {
                        **objs_type,
                        **{x: y["value"] for x, y in data.items()},
                    }
                else:
                    objs_type[label] = {**objs_type[label], **data}
    write_yaml(address, config_name, objs_type)



def yaml2csv(address, config_name="config.yml", csv_folder_name="csv"):
    """Convert a yaml config file into a folder of csv files (one per object type).

    Args:
        address (str): Path to a directory containing the config file.
        config_name (str, optional): Name of the yaml file to read.
            Defaults to 'config.yml'.
        csv_folder_name (str, optional): Name of the folder to write the
            csv files to. Defaults to 'csv'.
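
    Example:
        An illustrative round trip between the two formats (the path is a
        placeholder):

        >>> yaml2csv("/path/to/model_dir")  # doctest: +SKIP
        >>> csv2yaml("/path/to/model_dir")  # doctest: +SKIP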

    """
    with open(os.path.join(address, config_name), "r") as file:
        data = yaml.safe_load(file)

    # Reformat to a flat structure that is easy to write to csv
    objs_type = {}
    for objects, object_label in zip([data["nodes"], data["arcs"]], ["nodes", "arcs"]):
        for key, value in objects.items():
            if isinstance(value, dict):
                # Identify node type
                if "node_type_override" in value.keys():
                    type_ = value["node_type_override"]
                elif "type_" in value.keys():
                    type_ = value["type_"]
                else:
                    type_ = False

                if type_:
                    # Flatten dictionaries
                    new_dict = {}
                    if type_ not in objs_type.keys():
                        objs_type[type_] = {}

                    for key_, value_ in value.items():
                        if isinstance(value_, dict):
                            new_dict[key_] = flatten_dict(value_, key_, ":")

                    for key_, value_ in new_dict.items():
                        del value[key_]
                        value = {**value, **value_}
                    value["label"] = object_label
                    objs_type[type_][key] = value

    del data["nodes"]
    del data["arcs"]
    if "dates" in data.keys():
        objs_type["Dates"] = data["dates"]
        del data["dates"]

    objs_type["Sim_params"] = {x: {"name": x, "value": y} for x, y in data.items()}

    csv_dir = os.path.join(address, csv_folder_name)

    if not os.path.exists(csv_dir):
        os.mkdir(csv_dir)

    for key, value in objs_type.items():
        if key == "Sim_params":
            fields = ["name", "value"]
        elif key == "Dates":
            fields = ["date"]
        else:
            fields = {}
            for value_ in value.values():
                fields = {**fields, **value_}

            del fields["name"]
            fields = ["name"] + list(fields.keys())

        with open(
            os.path.join(csv_dir, "{0}.csv".format(key)), "w", newline=""
        ) as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(fields)
            if key == "Dates":
                for date in value:
                    writer.writerow([date])
            else:
                for key_, value_ in value.items():
                    writer.writerow(
                        [str(value_[x]) if x in value_.keys() else None for x in fields]
                    )



def load_extension_files(files: list[str]) -> None:
    """Load extension files from a list of files.

    Args:
        files (list[str]): List of file paths to load

    Raises:
        ValueError: If file is not a .py file
        FileNotFoundError: If file does not exist
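
    Example:
        Load a custom extension module before building the model (the path
        is a placeholder):

        >>> load_extension_files(["/path/to/my_extension.py"])  # doctest: +SKIP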

    """
    # importlib.util must be imported explicitly; `import importlib` alone
    # does not guarantee the submodule is available
    import importlib.util
    from pathlib import Path

    for file in files:
        if not file.endswith(".py"):
            raise ValueError(f"Only .py files are supported. Invalid file: {file}")
        if not Path(file).exists():
            raise FileNotFoundError(f"File {file} does not exist")

        spec = importlib.util.spec_from_file_location("module.name", file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)