Coverage for wsimod\orchestration\model.py: 26%
610 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-30 14:55 +0000
1# -*- coding: utf-8 -*-
2"""Created on Mon Jul 4 16:01:48 2022.
4@author: bdobson
5"""
6import csv
7import gzip
8import inspect
9import os
10import sys
11from datetime import datetime
12from math import log10
14import dill as pickle
15import yaml
16from tqdm import tqdm
18from wsimod.arcs import arcs as arcs_mod
19from wsimod.core import constants
20from wsimod.core.core import WSIObj
21from wsimod.nodes.land import ImperviousSurface
22from wsimod.nodes.nodes import NODES_REGISTRY
23from wsimod.nodes.tanks import QueueTank, ResidenceTank, Tank
25os.environ["USE_PYGEOS"] = "0"
class to_datetime:
    """Simple date wrapper exposing the date properties used in WSIMOD components.

    The wrapped value is a ``datetime`` for full dates and the original
    ``'YYYY-MM'`` string for monthly periods (as produced by ``to_period``).
    ``day``, ``dayofyear``, ``year``, ``month`` and ``is_leap_year`` are only
    available on full dates.
    """

    # TODO document and make better
    def __init__(self, date_string):
        """Wrap a date string.

        Args:
            date_string (str): A string containing the date, expected in
                format %Y-%m-%d %H:%M:%S, %Y-%m-%d or %Y-%m.

        Raises:
            ValueError: If the string matches none of the supported formats.
        """
        self._date = self._parse_date(date_string)

    def __str__(self):
        # Monthly periods are stored as plain strings and have no strftime.
        if isinstance(self._date, str):
            return self._date
        return self._date.strftime("%Y-%m-%d")

    def __repr__(self):
        return str(self)

    @property
    def dayofyear(self):
        """Day of the year, starting at 1.

        Returns:
            (int): day of the year of the wrapped date
        """
        return self._date.timetuple().tm_yday

    @property
    def day(self):
        """Day of the month.

        Returns:
            (int): day of the month of the wrapped date
        """
        return self._date.day

    @property
    def year(self):
        """Calendar year.

        Returns:
            (int): year of the wrapped date
        """
        return self._date.year

    @property
    def month(self):
        """Calendar month.

        Returns:
            (int): month of the wrapped date
        """
        return self._date.month

    def to_period(self, args="M"):
        """Return the monthly period ('YYYY-MM') that contains this date.

        Args:
            args: Unused; kept so existing calls such as ``to_period("M")``
                keep working.

        Returns:
            (to_datetime): wrapper around the 'YYYY-MM' string
        """
        if isinstance(self._date, str):
            # Already a monthly period.
            return to_datetime(self._date)
        return to_datetime(f"{self._date.year:04d}-{self._date.month:02d}")

    def is_leap_year(self):
        """Check whether the wrapped date falls in a leap year.

        Returns:
            (bool): True for leap years
        """
        year = self._date.year
        return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)

    def _parse_date(self, date_string, date_format="%Y-%m-%d %H:%M:%S"):
        """Parse a date string into a datetime, or validate a 'YYYY-MM' period.

        Args:
            date_string (str): The string to parse
            date_format (str, optional): First format to try.
                Defaults to "%Y-%m-%d %H:%M:%S".

        Returns:
            (datetime | str): a datetime for full dates, or the unchanged
                'YYYY-MM' string for monthly periods

        Raises:
            ValueError: If the string matches none of the supported formats.
        """
        for fmt in (date_format, "%Y-%m-%d"):
            try:
                return datetime.strptime(date_string, fmt)
            except ValueError:
                continue

        # Fall back to a monthly period, which is kept as a string.
        parts = date_string.split("-")
        if (
            len(parts) == 2
            and len(parts[0]) == 4
            and len(parts[1]) == 2
            and parts[0].isdigit()
            and parts[1].isdigit()
            and 1 <= int(parts[1]) <= 12
        ):
            return date_string
        raise ValueError(
            f"Cannot parse date {date_string!r}; expected "
            "'%Y-%m-%d %H:%M:%S', '%Y-%m-%d' or '%Y-%m'"
        )

    def __eq__(self, other):
        if isinstance(other, to_datetime):
            return self._date == other._date
        return False

    def __hash__(self):
        return hash(self._date)
129class Model(WSIObj):
130 """"""
def __init__(self):
    """Create an empty model holding nodes and arcs with a default orchestration.

    The orchestration is a list of single-entry dicts mapping a node type to
    the name of the method called on every node of that type, in order, each
    timestep.

    Returns:
        Model: An empty model object
    """
    super().__init__()
    self.arcs = {}
    # self.arcs_type = {} #not sure that this would be necessary
    self.nodes = {}
    self.nodes_type = {}
    self.extensions = []
    self.river_discharge_order = []

    # Default orchestration, as (node type, method name) pairs in call order
    default_sequence = (
        ("FWTW", "treat_water"),
        ("Demand", "create_demand"),
        ("Land", "run"),
        ("Groundwater", "infiltrate"),
        ("Sewer", "make_discharge"),
        ("Foul", "make_discharge"),
        ("WWTW", "calculate_discharge"),
        ("Groundwater", "distribute"),
        ("River", "calculate_discharge"),
        ("Reservoir", "make_abstractions"),
        ("Land", "apply_irrigation"),
        ("WWTW", "make_discharge"),
        ("Catchment", "route"),
    )
    self.orchestration = [
        {node_type: function} for node_type, function in default_sequence
    ]
def get_init_args(self, cls):
    """Collect the __init__ argument names of a class and all its superclasses.

    Args:
        cls (type): Class whose method resolution order is walked

    Returns:
        (list): argument names (excluding the first, i.e. self), in MRO order.
            Names repeat when a class inherits its __init__ unchanged.
    """
    return [
        arg_name
        for klass in cls.__mro__
        for arg_name in inspect.getfullargspec(klass.__init__).args[1:]
    ]
def load(self, address, config_name="config.yml", overrides=None):
    """Load a model from a yaml config file and the csv input files it references.

    Any existing state of this object is replaced by that of a fresh Model
    before the config is applied. The pollutant lists and float accuracy in
    ``constants`` are updated from the config when it provides them.

    Args:
        address (str): Path to the directory containing the config file and
            the input data files named in it
        config_name (str, optional): Name of the yaml model file.
            Defaults to 'config.yml'
        overrides (dict, optional): Top-level config entries that replace
            those read from the file. Defaults to None (no replacements).

    Raises:
        ValueError: If the config contains no 'nodes' entry.
    """
    from ..extensions import apply_patches

    if overrides is None:
        overrides = {}

    with open(os.path.join(address, config_name), "r") as file:
        data: dict = yaml.safe_load(file)

    for key, item in overrides.items():
        data[key] = item

    constants.POLLUTANTS = data.get("pollutants", constants.POLLUTANTS)
    constants.ADDITIVE_POLLUTANTS = data.get(
        "additive_pollutants", constants.ADDITIVE_POLLUTANTS
    )
    constants.NON_ADDITIVE_POLLUTANTS = data.get(
        "non_additive_pollutants", constants.NON_ADDITIVE_POLLUTANTS
    )
    constants.FLOAT_ACCURACY = float(
        data.get("float_accuracy", constants.FLOAT_ACCURACY)
    )
    # Reset to the state of a freshly constructed model.
    self.__dict__.update(Model().__dict__)

    # FLAG: e.g. addition for new orchestration
    load_extension_files(data.get("extensions", []))
    self.extensions = data.get("extensions", [])

    if "orchestration" in data:
        # Update orchestration
        self.orchestration = data["orchestration"]

    if "nodes" not in data:
        raise ValueError("No nodes found in the config")

    nodes = data["nodes"]

    # Swap every 'filename' entry for the data read from that file.
    for name, node in nodes.items():
        if "filename" in node:
            node["data_input_dict"] = read_csv(
                os.path.join(address, node["filename"])
            )
            del node["filename"]
        if "surfaces" in node:
            for key, surface in node["surfaces"].items():
                if "filename" in surface:
                    node["surfaces"][key]["data_input_dict"] = read_csv(
                        os.path.join(address, surface["filename"])
                    )
                    del surface["filename"]
            node["surfaces"] = list(node["surfaces"].values())
    arcs = data.get("arcs", {})
    self.add_nodes(list(nodes.values()))
    self.add_arcs(list(arcs.values()))

    self.add_overrides(data.get("overrides", {}))

    if "dates" in data:
        self.dates = [to_datetime(x) for x in data["dates"]]

    apply_patches(self)
def save(self, address, config_name="config.yml", compress=False):
    """Save the model object to a yaml file and input data to csv files in the
    directory specified.

    Args:
        address (str): Path to a directory; created, including any missing
            parent directories, if it does not exist
        config_name (str, optional): Name of yaml model file.
            Defaults to 'config.yml'
        compress (bool, optional): Write input data as gzip-compressed
            'csv.gz' files instead of 'csv'. Defaults to False.

    Raises:
        ValueError: If a property cannot be coerced into a value that yaml
            can dump.
    """
    if not os.path.exists(address):
        os.makedirs(address)
    nodes = {}

    file_type = "csv.gz" if compress else "csv"
    # Arguments that are rebuilt or written separately rather than dumped.
    node_special_args = {"surfaces", "parent", "data_input_dict"}
    for node in self.nodes.values():
        init_args = self.get_init_args(node.__class__)

        node_props = {
            x: getattr(node, x) for x in set(init_args).difference(node_special_args)
        }
        node_props["type_"] = node.__class__.__name__
        node_props["node_type_override"] = (
            repr(node.__class__).split(".")[-1].replace("'>", "")
        )

        if "surfaces" in init_args:
            surfaces = {}
            for surface in node.surfaces:
                surface_args = self.get_init_args(surface.__class__)
                surface_props = {
                    x: getattr(surface, x)
                    for x in set(surface_args).difference(node_special_args)
                }
                surface_props["type_"] = surface.__class__.__name__

                # Exceptions...
                # TODO I need a better way to do this
                surface_props.pop("capacity", None)
                if {"rooting_depth", "pore_depth"}.intersection(surface_args):
                    surface_props.pop("depth", None)
                if "data_input_dict" in surface_args and surface.data_input_dict:
                    filename = (
                        "{0}-{1}-inputs.{2}".format(
                            node.name, surface.surface, file_type
                        )
                        .replace("(", "_")
                        .replace(")", "_")
                        .replace("/", "_")
                        .replace(" ", "_")
                    )
                    write_csv(
                        surface.data_input_dict,
                        {"node": node.name, "surface": surface.surface},
                        os.path.join(address, filename),
                        compress=compress,
                    )
                    surface_props["filename"] = filename
                surfaces[surface_props["surface"]] = surface_props
            node_props["surfaces"] = surfaces

        if "data_input_dict" in init_args and node.data_input_dict:
            filename = "{0}-inputs.{1}".format(node.name, file_type)
            write_csv(
                node.data_input_dict,
                {"node": node.name},
                os.path.join(address, filename),
                compress=compress,
            )
            node_props["filename"] = filename

        nodes[node.name] = node_props

    arcs = {}
    # Ports are stored by node name instead of as objects.
    arc_special_args = {"in_port", "out_port"}
    for arc in self.arcs.values():
        init_args = self.get_init_args(arc.__class__)
        arc_props = {
            x: getattr(arc, x) for x in set(init_args).difference(arc_special_args)
        }
        arc_props["type_"] = arc.__class__.__name__
        arc_props["in_port"] = arc.in_port.name
        arc_props["out_port"] = arc.out_port.name
        arcs[arc.name] = arc_props

    data = {
        "nodes": nodes,
        "arcs": arcs,
        "orchestration": self.orchestration,
        "pollutants": constants.POLLUTANTS,
        "additive_pollutants": constants.ADDITIVE_POLLUTANTS,
        "non_additive_pollutants": constants.NON_ADDITIVE_POLLUTANTS,
        "float_accuracy": constants.FLOAT_ACCURACY,
        "extensions": self.extensions,
        "river_discharge_order": self.river_discharge_order,
    }
    if hasattr(self, "dates"):
        data["dates"] = [str(x) for x in self.dates]

    def coerce_value(value):
        """Convert a value to the first builtin type it supports that yaml dumps.

        Args:
            value: The value to convert

        Returns:
            The converted value

        Raises:
            ValueError: If no conversion applies, or the first applicable
                conversion fails or is not dumpable.
        """
        conversion_options = {
            "__float__": float,
            "__iter__": list,
            "__int__": int,
            "__str__": str,
            "__bool__": bool,
        }
        for dunder, func in conversion_options.items():
            if not hasattr(value, dunder):
                continue
            try:
                # Convert once: a second call would exhaust one-shot iterators.
                coerced = func(value)
                yaml.safe_dump(coerced)
            except Exception as err:
                raise ValueError(
                    f"Cannot dump: {value} of type {type(value)}"
                ) from err
            return coerced
        raise ValueError(f"Cannot dump: {value} of type {type(value)}")

    def check_and_coerce_dict(data_dict):
        """Recursively replace values yaml cannot dump with coerced versions.

        Args:
            data_dict (dict): Dictionary to check, modified in place
        """
        for key, value in data_dict.items():
            if isinstance(value, dict):
                check_and_coerce_dict(value)
                continue
            try:
                yaml.safe_dump(value)
            except yaml.representer.RepresenterError:
                if hasattr(value, "__iter__"):
                    for idx, val in enumerate(value):
                        if isinstance(val, dict):
                            check_and_coerce_dict(val)
                        else:
                            value[idx] = coerce_value(val)
                data_dict[key] = coerce_value(value)

    check_and_coerce_dict(data)

    write_yaml(address, config_name, data)
def load_pickle(self, fid):
    """Load a model object from a pickle file, including the model states.

    The loaded object is returned; this object itself is not modified.

    Args:
        fid (str): File address to load the pickled model from

    Returns:
        model (obj): loaded model

    Example:
        >>> # Load and run your model
        >>> my_model.load(model_dir,config_name = 'config.yml')
        >>> _ = my_model.run()
        >>>
        >>> # Save it including its different states
        >>> my_model.save_pickle('model_at_end_of_run.pkl')
        >>>
        >>> # Load it at another time to resume the model from the end
        >>> # of the previous run
        >>> new_model = Model()
        >>> new_model = new_model.load_pickle('model_at_end_of_run.pkl')
    """
    # NOTE(review): unpickling can execute arbitrary code - only load files
    # from a trusted source.
    with open(fid, "rb") as file:
        return pickle.load(file)
def save_pickle(self, fid):
    """Save model object to a pickle file, including saving the model states.

    Args:
        fid (str): File address to save the pickled model to

    Returns:
        None
    """
    # The context manager closes the file even if pickling fails.
    with open(fid, "wb") as file:
        pickle.dump(self, file)
def add_nodes(self, nodelist):
    """Add nodes to the model object from a list of dicts, where each dict contains
    all of the parameters for a node. Intended to be called before add_arcs.

    The dicts in nodelist are not modified.

    Args:
        nodelist (list): List of dicts, where a dict is a node

    Raises:
        ValueError: If a node's type is not in NODES_REGISTRY.
    """
    for data in nodelist:
        # Work on a copy so the caller's dicts can be reused.
        params = dict(data)
        name = params["name"]
        type_ = params.pop("type_")
        # The class to instantiate; the grouping type defaults to the same.
        node_type = params.pop("node_type_override", type_)
        if "foul" in name:
            # Absolute hack to enable foul sewers to be treated separate from storm
            type_ = "Foul"
        params.pop("geometry", None)

        if node_type not in NODES_REGISTRY:
            raise ValueError(f"Node type {node_type} not recognised")

        node = NODES_REGISTRY[node_type](**params)
        self.nodes_type.setdefault(type_, {})[name] = node
        self.nodes[name] = node
    self.nodelist = list(self.nodes.values())
def add_instantiated_nodes(self, nodelist):
    """Register already instantiated node objects with the model.

    Replaces the model's node list and name lookup, and files each node under
    its class name in nodes_type. Intended to be called before add_arcs.

    Args:
        nodelist (list): list of objects that are nodes
    """
    self.nodelist = nodelist
    self.nodes = {node.name: node for node in nodelist}
    for node in nodelist:
        class_name = node.__class__.__name__
        self.nodes_type.setdefault(class_name, {})[node.name] = node
def add_arcs(self, arclist):
    """Add arcs to the model object from a list of dicts, where each dict contains
    all of the parameters for an arc.

    Also rebuilds river_discharge_order: River nodes sorted from most upstream
    to most downstream, traced upstream from the Waste nodes along arcs whose
    two ends are both River, Node, Waste or Reservoir objects.

    The dicts in arclist are not modified.

    Args:
        arclist (list): list of dicts, where a dict is an arc
    """
    river_types = ("River", "Node", "Waste", "Reservoir")
    river_arcs = {}
    for arc in arclist:
        # Work on a copy so the caller's dicts can be reused.
        params = dict(arc)
        name = params["name"]
        type_ = params.pop("type_")
        # Ports are given by node name in the dict; swap in the node objects.
        params["in_port"] = self.nodes[params["in_port"]]
        params["out_port"] = self.nodes[params["out_port"]]
        self.arcs[name] = getattr(arcs_mod, type_)(**params)

        if (
            params["in_port"].__class__.__name__ in river_types
            and params["out_port"].__class__.__name__ in river_types
        ):
            river_arcs[name] = self.arcs[name]

    self.river_discharge_order = []
    if not river_arcs:
        return
    upstreamness = (
        {x: 0 for x in self.nodes_type["Waste"].keys()}
        if "Waste" in self.nodes_type
        else {}
    )
    upstreamness = self.assign_upstream(river_arcs, upstreamness)

    if "River" in self.nodes_type:
        for node in sorted(
            upstreamness.items(), key=lambda item: item[1], reverse=True
        ):
            if node[0] in self.nodes_type["River"]:
                self.river_discharge_order.append(node[0])
def add_instantiated_arcs(self, arclist):
    """Add arcs to the model object from a list of objects, where each object is an
    already instantiated arc object.

    Also rebuilds river_discharge_order: River nodes sorted from most upstream
    to most downstream, traced upstream from the Waste nodes along arcs whose
    two ends are both River, Node, Waste or Reservoir objects. The order is
    left empty when there are no such arcs.

    Args:
        arclist (list): list of objects that are arcs.
    """
    self.arclist = arclist
    self.arcs = {x.name: x for x in arclist}

    river_types = ("River", "Node", "Waste", "Reservoir")
    river_arcs = {
        arc.name: arc
        for arc in arclist
        if arc.in_port.__class__.__name__ in river_types
        and arc.out_port.__class__.__name__ in river_types
    }

    # Reset before the early return so a stale order never survives.
    self.river_discharge_order = []
    if not river_arcs:
        return

    # Waste nodes seed the trace; a model may have none.
    upstreamness = {x: 0 for x in self.nodes_type.get("Waste", {})}
    upstreamness = self.assign_upstream(river_arcs, upstreamness)

    rivers = self.nodes_type.get("River", {})
    for name, _ in sorted(
        upstreamness.items(), key=lambda item: item[1], reverse=True
    ):
        if name in rivers:
            self.river_discharge_order.append(name)
def assign_upstream(self, arcs, upstreamness):
    """Trace upstream along arcs to determine which nodes are the most upstream.

    Starting from the nodes already in upstreamness, every node that feeds a
    known node through an arc is given the next higher number, level by level,
    until no new nodes are found. Implemented iteratively so that long
    networks cannot exceed the recursion limit.

    Args:
        arcs (dict): arcs keyed by name; each arc must expose in_port.name
            and out_port.name
        upstreamness (dict): dictionary contain nodes in
            arcs as keys and a number representing upstreamness
            (higher numbers = more upstream). Updated in place.

    Returns:
        upstreamness (dict): final version of upstreamness

    Raises:
        ValueError: If upstreamness is empty, since there is then nothing to
            trace upstream from.
    """
    if not upstreamness:
        raise ValueError(
            "Cannot assign upstreamness: no starting nodes were provided"
        )

    # Map each node to the nodes that discharge into it.
    feeders = {}
    for arc in arcs.values():
        feeders.setdefault(arc.out_port.name, []).append(arc.in_port.name)

    level = max(upstreamness.values())
    frontier = list(upstreamness)
    while frontier:
        level += 1
        discovered = []
        for name in frontier:
            for candidate in feeders.get(name, ()):
                if candidate not in upstreamness:
                    upstreamness[candidate] = level
                    discovered.append(candidate)
        frontier = discovered
    return upstreamness
def add_overrides(self, config: dict):
    """Apply parameter overrides to nodes and arcs already in the model.

    Each entry under 'nodes' and 'arcs' must contain 'name' and 'type_'; the
    remaining entries are passed to the apply_overrides method of the matching
    node or arc. The dicts in config are not modified.

    Args:
        config (dict): Overrides, optionally containing 'nodes' and/or 'arcs',
            each a dict of override dicts

    Raises:
        ValueError: If a node type, node name or arc name is not in the model.
    """
    for node in config.get("nodes", {}).values():
        # Copy so that popping the identifiers leaves the caller's dict intact.
        node = dict(node)
        type_ = node.pop("type_")
        name = node.pop("name")

        if type_ not in self.nodes_type:
            raise ValueError(f"Node type {type_} not recognised")

        if name not in self.nodes_type[type_]:
            raise ValueError(f"Node {name} not recognised")

        self.nodes_type[type_][name].apply_overrides(node)

    for arc in config.get("arcs", {}).values():
        arc = dict(arc)
        name = arc.pop("name")
        # 'type_' is required but only removed here; arcs are found by name.
        arc.pop("type_")

        if name not in self.arcs:
            raise ValueError(f"Arc {name} not recognised")

        self.arcs[name].apply_overrides(arc)
def debug_node_mb(self):
    """Call the mass balance function of every node in nodelist, in order.

    The values returned by the nodes are discarded.
    """
    for current in self.nodelist:
        current.node_mass_balance()
def default_settings(self):
    """Incomplete function giving the default specification of results storage.

    Returns:
        (dict): default settings
    """
    settings = {}
    settings["arcs"] = {"flows": True, "pollutants": True}
    settings["tanks"] = {"storages": True, "pollutants": True}
    settings["mass_balance"] = False
    return settings
def change_runoff_coefficient(self, relative_change, nodes=None):
    """Clunky way to change the runoff coefficient of a land node.

    Multiplies the area and capacity of the 'Impervious' surface by
    relative_change and gives the difference in area to the 'Grass' surface,
    scaling the grass capacity, storage and nutrient pools by the ratio of new
    to old grass area. Nodes without an 'Impervious' surface are skipped.

    Args:
        relative_change (float | int | dict): amount that the impervious area
            in the land node is multiplied by (grass area is changed in
            compensation). A single number applies to every node; a dict
            gives the value per node object.
        nodes (list, optional): list of land nodes to change the parameters of.
            Defaults to None, which applies the change to all land nodes.

    Raises:
        ZeroDivisionError: If a node being changed has a grass area of zero.
    """
    if nodes is None:
        nodes = self.nodes_type["Land"].values()

    # A single number (int as well as float) is applied to every node.
    if isinstance(relative_change, (int, float)):
        relative_change = {x: relative_change for x in nodes}

    for node in nodes:
        surface_dict = {x.surface: x for x in node.surfaces}
        if "Impervious" not in surface_dict:
            continue
        impervious = surface_dict["Impervious"]
        grass = surface_dict["Grass"]
        impervious_area = impervious.area
        grass_area = grass.area

        new_impervious_area = impervious_area * relative_change[node]
        new_grass_area = grass_area + (impervious_area - new_impervious_area)
        if new_grass_area < 0:
            print("not enough grass")
            # NOTE(review): this also stops processing the remaining nodes.
            break
        impervious.area = new_impervious_area
        impervious.capacity *= relative_change[node]

        grass_ratio = new_grass_area / grass_area
        grass.area = new_grass_area
        grass.capacity *= grass_ratio
        for pol in constants.ADDITIVE_POLLUTANTS + ["volume"]:
            grass.storage[pol] *= grass_ratio
        for pool in grass.nutrient_pool.pools:
            for nutrient in pool.storage.keys():
                pool.storage[nutrient] *= grass_ratio
def run(
    self,
    dates=None,
    settings=None,
    record_arcs=None,
    record_tanks=None,
    record_surfaces=None,
    verbose=True,
    record_all=True,
    objectives=None,
):
    """Run the model object with the default orchestration.

    For every date: sets the time on all nodes, calls the orchestration
    functions, distributes rivers in river_discharge_order, checks the system
    mass balance, stores results and ends the timestep on all nodes and arcs.
    Lists passed in by the caller are not modified.

    Args:
        dates (list, optional): Dates to simulate. Defaults to None, which
            simulates all dates that the model has data for.
        settings (dict, optional): Dict to specify what results are stored,
            not currently used. Defaults to None.
        record_arcs (list, optional): List of arcs to store result for.
            Defaults to None, which stores all arcs if record_all is True.
        record_tanks (list, optional): List of nodes with water stores to
            store results for. Defaults to None.
        record_surfaces (list, optional): List of tuples of
            (land node, surface) to store results for. Defaults to None.
        verbose (bool, optional): Prints updates on simulation if true.
            Defaults to True.
        record_all (bool, optional): Specifies to store all results.
            Defaults to True.
        objectives (list, optional): A list of dicts with objectives to
            calculate (see examples). Defaults to None (no objectives).

    Returns:
        flows: simulated flows in a list of dicts
        tanks: simulated tanks storages in a list of dicts
        objective_results: list of values based on objectives list (None for
            an objective whose element_type is not recognised)
        surfaces: simulated surface storages of land nodes in a list of dicts

    Examples:
        # Run a model without storing any results but calculating objectives
        import statistics as stats
        objectives = [{'element_type' : 'flows',
                       'name' : 'my_river',
                       'function' : lambda x, _: stats.mean(
                           [y['phosphate'] for y in x])
                       },
                      {'element_type' : 'tanks',
                       'name' : 'my_reservoir',
                       'function' : lambda x, model: sum(
                           [y['storage'] < (model.nodes
                           ['my_reservoir'].tank.capacity / 2) for y in x])
                       }]
        _, _, results, _ = my_model.run(record_all = False, objectives = objectives)
    """
    # Copy caller-supplied lists: objectives are appended to them below.
    if record_arcs is None:
        record_arcs = list(self.arcs.keys()) if record_all else []
    else:
        record_arcs = list(record_arcs)
    record_tanks = [] if record_tanks is None else list(record_tanks)
    record_surfaces = [] if record_surfaces is None else list(record_surfaces)
    if objectives is None:
        objectives = []

    if settings is None:
        settings = self.default_settings()

    if dates is None:
        dates = self.dates

    def surface_record(node_name, surface, date):
        """Build the result row of one land surface for one timestep."""
        if isinstance(surface, ImperviousSurface):
            record = {
                "node": node_name,
                "surface": surface.surface,
                "storage": surface.storage["volume"],
                "evaporation": surface.evaporation["volume"],
                "precipitation": surface.precipitation["volume"],
                "capacity": surface.capacity,
                "time": date,
            }
        else:
            record = {
                "node": node_name,
                "surface": surface.surface,
                "percolation": surface.percolation["volume"],
                "subsurface_r": surface.subsurface_flow["volume"],
                "surface_r": surface.infiltration_excess["volume"],
                "storage": surface.storage["volume"],
                "evaporation": surface.evaporation["volume"],
                "precipitation": surface.precipitation["volume"],
                "tank_recharge": surface.tank_recharge,
                "capacity": surface.capacity,
                "time": date,
                "et0_coef": surface.et0_coefficient,
                # 'crop_factor' : surface.crop_factor
            }
        for pol in constants.POLLUTANTS:
            record[pol] = surface.storage[pol]
        return record

    # Silence printing when not verbose; restored in the finally block below
    # even if the simulation raises.
    original_stdout = sys.stdout
    devnull = None
    if not verbose:
        devnull = open(os.devnull, "w")
        sys.stdout = devnull

    try:
        # Make sure everything an objective needs is recorded.
        for objective in objectives:
            if objective["element_type"] == "tanks":
                record_tanks.append(objective["name"])
            elif objective["element_type"] == "flows":
                record_arcs.append(objective["name"])
            elif objective["element_type"] == "surfaces":
                record_surfaces.append((objective["name"], objective["surface"]))
            else:
                print("element_type not recorded")

        balance_keys = constants.ADDITIVE_POLLUTANTS + ["volume"]

        flows = []
        tanks = []
        surfaces = []
        for date in tqdm(dates, disable=(not verbose)):
            for node in self.nodelist:
                node.t = date
                node.monthyear = date.to_period("M")

            # Iterate over orchestration
            for timestep_item in self.orchestration:
                for node_type, function in timestep_item.items():
                    for node in self.nodes_type.get(node_type, {}).values():
                        getattr(node, function)()

            # river
            for node_name in self.river_discharge_order:
                self.nodes[node_name].distribute()

            # mass balance checking
            # nodes/system
            sys_in = self.empty_vqip()
            sys_out = self.empty_vqip()
            sys_ds = self.empty_vqip()

            # arcs
            for arc in self.arcs.values():
                in_, ds_, out_ = arc.arc_mass_balance()
                for v in balance_keys:
                    sys_in[v] += in_[v]
                    sys_out[v] += out_[v]
                    sys_ds[v] += ds_[v]
            for node in self.nodelist:
                in_, ds_, out_ = node.node_mass_balance()
                for v in balance_keys:
                    sys_in[v] += in_[v]
                    sys_out[v] += out_[v]
                    sys_ds[v] += ds_[v]

            for v in balance_keys:
                # Find the largest value of in_, out_, ds_
                largest = max(sys_in[v], sys_out[v], sys_ds[v])

                if largest > constants.FLOAT_ACCURACY:
                    # Perform the comparison at the magnitude of the largest
                    # value
                    magnitude = 10 ** int(log10(largest))
                    in_10 = sys_in[v] / magnitude
                    out_10 = sys_out[v] / magnitude
                    ds_10 = sys_ds[v] / magnitude
                else:
                    in_10 = sys_in[v]
                    ds_10 = sys_ds[v]
                    out_10 = sys_out[v]

                # Report an imbalance in either direction.
                if abs(in_10 - ds_10 - out_10) > constants.FLOAT_ACCURACY:
                    print(
                        "system mass balance error for "
                        + v
                        + " of "
                        + str(sys_in[v] - sys_ds[v] - sys_out[v])
                    )

            # Store results
            for arc_name in record_arcs:
                arc = self.arcs[arc_name]
                flows.append(
                    {"arc": arc.name, "flow": arc.vqip_out["volume"], "time": date}
                )
                for pol in constants.POLLUTANTS:
                    flows[-1][pol] = arc.vqip_out[pol]

            for node_name in record_tanks:
                node = self.nodes[node_name]
                tanks.append(
                    {
                        "node": node.name,
                        "storage": node.tank.storage["volume"],
                        "time": date,
                    }
                )

            for node_name, surface_name in record_surfaces:
                node = self.nodes[node_name]
                surface = node.get_surface(surface_name)
                surfaces.append(surface_record(node.name, surface, date))

            if record_all:
                for node in self.nodes.values():
                    for prop_ in dir(node):
                        prop = node.__getattribute__(prop_)
                        # Exact class match: subclasses are not recorded.
                        if prop.__class__ in [QueueTank, Tank, ResidenceTank]:
                            tanks.append(
                                {
                                    "node": node.name,
                                    "time": date,
                                    "storage": prop.storage["volume"],
                                    "prop": prop_,
                                }
                            )
                            for pol in constants.POLLUTANTS:
                                tanks[-1][pol] = prop.storage[pol]

                for name, node in self.nodes_type.get("Land", {}).items():
                    for surface in node.surfaces:
                        surfaces.append(surface_record(name, surface, date))

            for node in self.nodes.values():
                node.end_timestep()

            for arc in self.arcs.values():
                arc.end_timestep()

        objective_results = []
        for objective in objectives:
            element_type = objective["element_type"]
            if element_type == "tanks":
                selected = [x for x in tanks if x["node"] == objective["name"]]
            elif element_type == "flows":
                selected = [x for x in flows if x["arc"] == objective["name"]]
            elif element_type == "surfaces":
                selected = [
                    x
                    for x in surfaces
                    if x["node"] == objective["name"]
                    and x["surface"] == objective["surface"]
                ]
            else:
                # Nothing was recorded for this objective; keep the results
                # aligned with the objectives list.
                objective_results.append(None)
                continue
            objective_results.append(objective["function"](selected, self))
        return flows, tanks, objective_results, surfaces
    finally:
        if devnull is not None:
            sys.stdout = original_stdout
            devnull.close()
994 def reinit(self):
995 """Reinitialise by ending all node/arc timesteps and calling reinit function in
996 all nodes (generally zero-ing their storage values)."""
997 for node in self.nodes.values():
998 node.end_timestep()
999 for prop in dir(node):
1000 prop = node.__getattribute__(prop)
1001 for prop_ in dir(prop):
1002 if prop_ == "reinit":
1003 prop_ = node.__getattribute__(prop_)
1004 prop_()
1006 for arc in self.arcs.values():
1007 arc.end_timestep()
def write_yaml(address, config_name, data):
    """Write data to a yaml file using the safe dumper.

    Keys keep their existing order and block style is used throughout.

    Args:
        address (str): Path to the directory to write into
        config_name (str): Name of the yaml file
        data: The data to dump
    """
    target = os.path.join(address, config_name)
    with open(target, "w") as stream:
        yaml.safe_dump(data, stream, default_flow_style=False, sort_keys=False)
def open_func(file_path, mode):
    """Open a file, using gzip when reading text from a '.gz' file.

    Args:
        file_path (str): Path of the file to open
        mode (str): Mode to open the file in

    Returns:
        The opened file object
    """
    use_gzip = mode == "rt" and file_path.endswith(".gz")
    opener = gzip.open if use_gzip else open
    return opener(file_path, mode)
def read_csv(file_path, delimiter=","):
    """Read an input data file with 'variable', 'time' and 'value' columns.

    Args:
        file_path (str): Path of the csv (or csv.gz) file
        delimiter (str, optional): Column delimiter. Defaults to ','.

    Returns:
        (dict): values as floats, keyed by (variable, to_datetime(time))
    """
    with open_func(file_path, "rt") as handle:
        return {
            (row["variable"], to_datetime(row["time"])): float(row["value"])
            for row in csv.DictReader(handle, delimiter=delimiter)
        }
def write_csv(data, fixed_data=None, filename="", compress=False):
    """Write input data to a csv file with 'variable', 'time' and 'value' columns.

    Args:
        data (dict): Values keyed by (variable, time)
        fixed_data (dict, optional): Column name -> value pairs written in
            front of every row. Defaults to None (no extra columns).
        filename (str): Path of the file to write
        compress (bool, optional): Write the file gzip-compressed.
            Defaults to False.
    """
    if fixed_data is None:
        fixed_data = {}
    # Named 'opener' so the module-level open_func is not shadowed.
    if compress:
        opener, mode = gzip.open, "wt"
    else:
        opener, mode = open, "w"
    with opener(filename, mode, newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(list(fixed_data.keys()) + ["variable", "time", "value"])
        fixed_data_values = list(fixed_data.values())
        for key, value in data.items():
            writer.writerow(fixed_data_values + list(key) + [str(value)])
def flatten_dict(d, parent_key="", sep="-"):
    """Flatten a nested dictionary into a single level.

    Keys of nested dictionaries are joined to their parent key with sep.
    Top-level keys are kept as they are.

    Args:
        d (dict): Dictionary to flatten
        parent_key (optional): Key prefix for this level. Defaults to ''.
        sep (str, optional): Separator between key levels. Defaults to '-'.

    Returns:
        (dict): the flattened dictionary
    """
    flattened = {}
    for key, value in d.items():
        if parent_key:
            path = str(parent_key) + sep + str(key)
        else:
            path = key
        if not isinstance(value, dict):
            flattened[path] = value
            continue
        # Nested dictionary: flatten it below the current path.
        flattened.update(flatten_dict(value, path, sep))
    return flattened
def check_and_convert_string(value):
    """Convert a value to an int, failing that a float, failing that None for 'None'.

    Args:
        value: The value (typically a string) to convert

    Returns:
        (int | float | None): the converted value, or value itself if no
            conversion applies
    """
    for caster in (int, float):
        try:
            return caster(value)
        except Exception:
            continue
    return None if value == "None" else value
def unflatten_dict(d, sep=":"):
    """Rebuild a nested dictionary from one whose keys join levels with sep.

    Args:
        d (dict): Flat dictionary with string keys
        sep (str, optional): Separator between key levels. Defaults to ':'.

    Returns:
        (dict): the nested dictionary
    """
    nested = {}
    for flat_key, value in d.items():
        *parents, leaf = flat_key.split(sep)
        level = nested
        for parent in parents:
            level = level.setdefault(parent, {})
        level[leaf] = value
    return nested
def convert_keys(d):
    """Recursively convert dictionary keys with check_and_convert_string.

    Args:
        d: A dictionary, or any other value

    Returns:
        A new dictionary with converted keys at every level, or d itself if
        it is not a dictionary
    """
    if isinstance(d, dict):
        return {
            check_and_convert_string(key): convert_keys(value)
            for key, value in d.items()
        }
    return d
def csv2yaml(address, config_name="config_csv.yml", csv_folder_name="csv"):
    """Build a yaml config file from a folder of csv files.

    A file with 'Dates' in its path provides the dates (first row skipped).
    A file with 'Sim_params' in its path provides top-level entries, taken
    from each row's 'value'. Every other file provides objects that are
    stored, by 'name', under the entry given by their 'label' column.

    Args:
        address (str): Directory containing the csv folder; the yaml file is
            written here
        config_name (str, optional): Name of the yaml file to write.
            Defaults to 'config_csv.yml'.
        csv_folder_name (str, optional): Name of the folder of csv files.
            Defaults to 'csv'.
    """
    csv_path = os.path.join(address, csv_folder_name)
    csv_list = []
    for entry in os.listdir(csv_path):
        candidate = os.path.join(csv_path, entry)
        if os.path.isfile(candidate):
            csv_list.append(candidate)

    objs_type = {"nodes": {}, "arcs": {}}
    for fid in csv_list:
        is_sim_params = "Sim_params" in fid
        with open(fid, "rt") as f:
            if "Dates" in fid:
                first_column = [row[0] for row in csv.reader(f, delimiter=",")]
                # Drop the header row
                objs_type["dates"] = first_column[1:]
                continue

            data = {}
            for row in csv.DictReader(f, delimiter=","):
                formatted_row = {}
                for key, value in row.items():
                    # Empty cells are left out
                    if not value:
                        continue
                    if "[" in value and "]" in value:
                        # Convert lists: remove the brackets and quotes, then
                        # split by comma
                        items = value.strip("[]").replace("'", "").split(", ")
                        value = [check_and_convert_string(x) for x in items]
                    else:
                        # Convert ints, floats and strings
                        value = check_and_convert_string(value)
                    formatted_row[key] = value

                if not is_sim_params:
                    label = formatted_row.pop("label")

                # Convert to nested dicts with converted keys
                formatted_row = convert_keys(unflatten_dict(formatted_row))
                data[row["name"]] = formatted_row

            if is_sim_params:
                objs_type = {
                    **objs_type,
                    **{x: y["value"] for x, y in data.items()},
                }
            else:
                objs_type[label] = {**objs_type[label], **data}
    write_yaml(address, config_name, objs_type)
def yaml2csv(address, config_name="config.yml", csv_folder_name="csv"):
    """Export a YAML config file as a folder of CSV files.

    The config is read with ``yaml.safe_load``. Entries of its ``nodes`` and
    ``arcs`` sections are grouped by type (``node_type_override`` if present,
    otherwise ``type_``); entries that are not dictionaries or have no type
    are left out. Nested dictionaries are flattened into ``:``-separated
    columns and a ``label`` column records the section the entry came from.

    One ``<type>.csv`` is written per type, plus ``Dates.csv`` when the config
    has a ``dates`` entry and ``Sim_params.csv`` with all remaining top-level
    entries as ``name``/``value`` rows.

    Args:
        address (str): Directory that contains the config file and in which
            the CSV folder is created.
        config_name (str): File name of the YAML config inside address.
        csv_folder_name (str): Name of the folder, inside address, that
            receives the CSV files. It is created if it does not exist.
    """
    with open(os.path.join(address, config_name), "r") as file:
        data = yaml.safe_load(file)

    # Group objects by type, one flat row per object
    objs_type = {}
    for object_label in ("nodes", "arcs"):
        # A missing or empty section is treated as having no objects
        objects = data.pop(object_label, None) or {}
        for key, value in objects.items():
            if not isinstance(value, dict):
                continue

            # Identify node type
            if "node_type_override" in value:
                type_ = value["node_type_override"]
            elif "type_" in value:
                type_ = value["type_"]
            else:
                type_ = None
            if not type_:
                continue

            # Scalar entries keep their order; flattened entries of nested
            # dictionaries are appended after them
            flat_row = {k: v for k, v in value.items() if not isinstance(v, dict)}
            for k, v in value.items():
                if isinstance(v, dict):
                    flat_row.update(flatten_dict(v, k, ":"))
            flat_row["label"] = object_label
            objs_type.setdefault(type_, {})[key] = flat_row

    if "dates" in data:
        objs_type["Dates"] = data.pop("dates")

    # Whatever is left at the top level is a simulation parameter
    objs_type["Sim_params"] = {x: {"name": x, "value": y} for x, y in data.items()}

    csv_dir = os.path.join(address, csv_folder_name)
    os.makedirs(csv_dir, exist_ok=True)

    for key, value in objs_type.items():
        if key == "Sim_params":
            fields = ["name", "value"]
        elif key == "Dates":
            fields = ["date"]
        else:
            # Union of all columns used by objects of this type, in first-seen
            # order, with "name" moved to the front
            columns = {}
            for value_ in value.values():
                columns.update(value_)
            del columns["name"]
            fields = ["name"] + list(columns)

        with open(os.path.join(csv_dir, f"{key}.csv"), "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(fields)
            if key == "Dates":
                writer.writerows([date] for date in value)
            else:
                for value_ in value.values():
                    # Columns an object does not have are left empty
                    writer.writerow(
                        [str(value_[x]) if x in value_ else None for x in fields]
                    )
def load_extension_files(files: list[str]) -> None:
    """Load extension files from a list of files.

    Files are checked and executed one at a time, in list order, so files
    listed before an invalid entry have already been executed when the error
    is raised. Each file is executed as a module; the module object itself is
    not kept.

    Args:
        files (list[str]): List of file paths to load

    Raises:
        ValueError: If file is not a .py file
        FileNotFoundError: If file does not exist
    """
    # importlib.util is a submodule: it has to be imported explicitly, a bare
    # "import importlib" does not guarantee that the attribute is available
    import importlib.util
    from pathlib import Path

    for file in files:
        # str() so that path-like entries are accepted as well as strings
        if not str(file).endswith(".py"):
            raise ValueError(f"Only .py files are supported. Invalid file: {file}")
        if not Path(file).exists():
            raise FileNotFoundError(f"File {file} does not exist")

        spec = importlib.util.spec_from_file_location("module.name", file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)