Coverage for wsimod/orchestration/model.py: 11%
588 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-01-11 16:39 +0000
1# -*- coding: utf-8 -*-
2"""Created on Mon Jul 4 16:01:48 2022.
4@author: bdobson
5"""
6import csv
7import gzip
8import inspect
9import os
10import sys
11from datetime import datetime
12from math import log10
14import dill as pickle
15import yaml
16from tqdm import tqdm
18from wsimod import nodes
19from wsimod.arcs import arcs as arcs_mod
20from wsimod.core import constants
21from wsimod.core.core import WSIObj
22from wsimod.nodes.land import ImperviousSurface
23from wsimod.nodes.nodes import Node, QueueTank, ResidenceTank, Tank
25os.environ["USE_PYGEOS"] = "0"
class to_datetime:
    """Lightweight date wrapper exposing the date properties used by WSIMOD.

    A full date ('%Y-%m-%d', optionally followed by ' %H:%M:%S') is stored
    internally as a ``datetime``. A month period ('%Y-%m') is stored as the
    original string, which is what equality and hashing are based on.
    """

    # TODO document and make better
    def __init__(self, date_string):
        """Simple datetime wrapper that has key properties used in WSIMOD components.

        Args:
            date_string (str): A string containing the date, expected in
                format %Y-%m-%d or %Y-%m.

        Raises:
            ValueError: If ``date_string`` matches none of the accepted formats.
        """
        self._date = self._parse_date(date_string)

    def __str__(self):
        # Month periods are kept as their 'YYYY-MM' string and have no strftime
        if isinstance(self._date, str):
            return self._date
        return self._date.strftime("%Y-%m-%d")

    def __repr__(self):
        return str(self)

    @property
    def dayofyear(self):
        """Day of the year (1-366). Only available for full dates.

        Returns:
            (int): day of the year
        """
        return self._date.timetuple().tm_yday

    @property
    def day(self):
        """Day of the month. Only available for full dates.

        Returns:
            (int): day of the month
        """
        return self._date.day

    @property
    def year(self):
        """Calendar year of the date or month period.

        Returns:
            (int): year
        """
        if isinstance(self._date, str):
            return int(self._date.split("-")[0])
        return self._date.year

    @property
    def month(self):
        """Calendar month (1-12) of the date or month period.

        Returns:
            (int): month
        """
        if isinstance(self._date, str):
            return int(self._date.split("-")[1])
        return self._date.month

    def to_period(self, args="M"):
        """Return the month period ('YYYY-MM') that contains this date.

        Args:
            args (str, optional): Accepted for call compatibility; it is not
                used, the period is always monthly. Defaults to 'M'.

        Returns:
            (to_datetime): wrapper around the 'YYYY-MM' period
        """
        return to_datetime(f"{self.year:04d}-{self.month:02d}")

    def is_leap_year(self):
        """Check whether the year of this date is a leap year.

        Returns:
            (bool): True if the year is a leap year
        """
        year = self.year
        return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)

    def _parse_date(self, date_string, date_format="%Y-%m-%d %H:%M:%S"):
        """Convert a date string to the internal representation.

        Args:
            date_string (str): String to parse
            date_format (str, optional): First format that is attempted.
                Defaults to '%Y-%m-%d %H:%M:%S'.

        Returns:
            (datetime or str): ``datetime`` for full dates, or the unchanged
                string for a 'YYYY-MM' month period

        Raises:
            ValueError: If the string matches none of the accepted formats
        """
        for candidate_format in (date_format, "%Y-%m-%d"):
            try:
                return datetime.strptime(date_string, candidate_format)
            except ValueError:
                continue

        # Fall back to a 'YYYY-MM' month period, which is kept as a string
        parts = date_string.split("-")
        if len(parts) == 2 and len(parts[0]) == 4 and len(parts[1]) == 2:
            try:
                int(parts[0])
                int(parts[1])
            except ValueError:
                pass
            else:
                return date_string
        raise ValueError(
            f"Cannot parse date {date_string!r}, expected %Y-%m-%d or %Y-%m"
        )

    def __eq__(self, other):
        if isinstance(other, to_datetime):
            return self._date == other._date
        return False

    def __hash__(self):
        return hash(self._date)
129class Model(WSIObj):
130 """"""
132 def __init__(self):
133 """Object to contain nodes and arcs that provides a default orchestration.
135 Returns:
136 Model: An empty model object
137 """
138 super().__init__()
139 self.arcs = {}
140 # self.arcs_type = {} #not sure that this would be necessary
141 self.nodes = {}
142 self.nodes_type = {}
144 def all_subclasses(cls):
145 """
147 Args:
148 cls:
150 Returns:
152 """
153 return set(cls.__subclasses__()).union(
154 [s for c in cls.__subclasses__() for s in all_subclasses(c)]
155 )
157 self.nodes_type = [x.__name__ for x in all_subclasses(Node)] + ["Node"]
158 self.nodes_type = set(
159 getattr(nodes, x)(name="").__class__.__name__ for x in self.nodes_type
160 ).union(["Foul"])
161 self.nodes_type = {x: {} for x in self.nodes_type}
163 def get_init_args(self, cls):
164 """Get the arguments of the __init__ method for a class and its superclasses."""
165 init_args = []
166 for c in cls.__mro__:
167 # Get the arguments of the __init__ method
168 args = inspect.getfullargspec(c.__init__).args[1:]
169 init_args.extend(args)
170 return init_args
172 def load(self, address, config_name="config.yml", overrides={}):
173 """
175 Args:
176 address:
177 config_name:
178 overrides:
179 """
180 with open(os.path.join(address, config_name), "r") as file:
181 data = yaml.safe_load(file)
183 for key, item in overrides.items():
184 data[key] = item
186 constants.POLLUTANTS = data["pollutants"]
187 constants.ADDITIVE_POLLUTANTS = data["additive_pollutants"]
188 constants.NON_ADDITIVE_POLLUTANTS = data["non_additive_pollutants"]
189 constants.FLOAT_ACCURACY = float(data["float_accuracy"])
190 self.__dict__.update(Model().__dict__)
192 nodes = data["nodes"]
194 for name, node in nodes.items():
195 if "filename" in node.keys():
196 node["data_input_dict"] = read_csv(
197 os.path.join(address, node["filename"])
198 )
199 del node["filename"]
200 if "surfaces" in node.keys():
201 for key, surface in node["surfaces"].items():
202 if "filename" in surface.keys():
203 node["surfaces"][key]["data_input_dict"] = read_csv(
204 os.path.join(address, surface["filename"])
205 )
206 del surface["filename"]
207 node["surfaces"] = list(node["surfaces"].values())
208 arcs = data["arcs"]
209 self.add_nodes(list(nodes.values()))
210 self.add_arcs(list(arcs.values()))
211 if "dates" in data.keys():
212 self.dates = [to_datetime(x) for x in data["dates"]]
214 def save(self, address, config_name="config.yml", compress=False):
215 """Save the model object to a yaml file and input data to csv.gz format in the
216 directory specified.
218 Args:
219 address (str): Path to a directory
220 config_name (str, optional): Name of yaml model file.
221 Defaults to 'model.yml'
222 """
223 if not os.path.exists(address):
224 os.mkdir(address)
225 nodes = {}
227 if compress:
228 file_type = "csv.gz"
229 else:
230 file_type = "csv"
231 for node in self.nodes.values():
232 init_args = self.get_init_args(node.__class__)
233 special_args = set(["surfaces", "parent", "data_input_dict"])
235 node_props = {
236 x: getattr(node, x) for x in set(init_args).difference(special_args)
237 }
238 node_props["type_"] = node.__class__.__name__
239 node_props["node_type_override"] = (
240 repr(node.__class__).split(".")[-1].replace("'>", "")
241 )
243 if "surfaces" in init_args:
244 surfaces = {}
245 for surface in node.surfaces:
246 surface_args = self.get_init_args(surface.__class__)
247 surface_props = {
248 x: getattr(surface, x)
249 for x in set(surface_args).difference(special_args)
250 }
251 surface_props["type_"] = surface.__class__.__name__
253 # Exceptions...
254 # TODO I need a better way to do this
255 del surface_props["capacity"]
256 if set(["rooting_depth", "pore_depth"]).intersection(surface_args):
257 del surface_props["depth"]
258 if "data_input_dict" in surface_args:
259 if surface.data_input_dict:
260 filename = (
261 "{0}-{1}-inputs.{2}".format(
262 node.name, surface.surface, file_type
263 )
264 .replace("(", "_")
265 .replace(")", "_")
266 .replace("/", "_")
267 .replace(" ", "_")
268 )
269 write_csv(
270 surface.data_input_dict,
271 {"node": node.name, "surface": surface.surface},
272 os.path.join(address, filename),
273 compress=compress,
274 )
275 surface_props["filename"] = filename
276 surfaces[surface_props["surface"]] = surface_props
277 node_props["surfaces"] = surfaces
279 if "data_input_dict" in init_args:
280 if node.data_input_dict:
281 filename = "{0}-inputs.{1}".format(node.name, file_type)
282 write_csv(
283 node.data_input_dict,
284 {"node": node.name},
285 os.path.join(address, filename),
286 compress=compress,
287 )
288 node_props["filename"] = filename
290 nodes[node.name] = node_props
292 arcs = {}
293 for arc in self.arcs.values():
294 init_args = self.get_init_args(arc.__class__)
295 special_args = set(["in_port", "out_port"])
296 arc_props = {
297 x: getattr(arc, x) for x in set(init_args).difference(special_args)
298 }
299 arc_props["type_"] = arc.__class__.__name__
300 arc_props["in_port"] = arc.in_port.name
301 arc_props["out_port"] = arc.out_port.name
302 arcs[arc.name] = arc_props
304 data = {
305 "nodes": nodes,
306 "arcs": arcs,
307 "pollutants": constants.POLLUTANTS,
308 "additive_pollutants": constants.ADDITIVE_POLLUTANTS,
309 "non_additive_pollutants": constants.NON_ADDITIVE_POLLUTANTS,
310 "float_accuracy": constants.FLOAT_ACCURACY,
311 }
312 if hasattr(self, "dates"):
313 data["dates"] = [str(x) for x in self.dates]
315 def coerce_value(value):
316 """
318 Args:
319 value:
321 Returns:
323 """
324 conversion_options = {
325 "__float__": float,
326 "__iter__": list,
327 "__int__": int,
328 "__str__": str,
329 "__bool__": bool,
330 }
331 converted = False
332 for property, func in conversion_options.items():
333 if hasattr(value, property):
334 try:
335 yaml.safe_dump(func(value))
336 value = func(value)
337 converted = True
338 break
339 except Exception:
340 raise ValueError(f"Cannot dump: {value} of type {type(value)}")
341 if not converted:
342 raise ValueError(f"Cannot dump: {value} of type {type(value)}")
344 return value
346 def check_and_coerce_dict(data_dict):
347 """
349 Args:
350 data_dict:
351 """
352 for key, value in data_dict.items():
353 if isinstance(value, dict):
354 check_and_coerce_dict(value)
355 else:
356 try:
357 yaml.safe_dump(value)
358 except yaml.representer.RepresenterError:
359 if hasattr(value, "__iter__"):
360 for idx, val in enumerate(value):
361 if isinstance(val, dict):
362 check_and_coerce_dict(val)
363 else:
364 value[idx] = coerce_value(val)
365 data_dict[key] = coerce_value(value)
367 check_and_coerce_dict(data)
369 write_yaml(address, config_name, data)
371 def load_pickle(self, fid):
372 """Load model object to a pickle file, including the model states.
374 Args:
375 fid (str): File address to load the pickled model from
377 Returns:
378 model (obj): loaded model
380 Example:
381 >>> # Load and run your model
382 >>> my_model.load(model_dir,config_name = 'config.yml')
383 >>> _ = my_model.run()
384 >>>
385 >>> # Save it including its different states
386 >>> my_model.save_pickle('model_at_end_of_run.pkl')
387 >>>
388 >>> # Load it at another time to resume the model from the end
389 >>> # of the previous run
390 >>> new_model = Model()
391 >>> new_model = new_model.load_pickle('model_at_end_of_run.pkl')
392 """
393 file = open(fid, "rb")
394 return pickle.load(file)
396 def save_pickle(self, fid):
397 """Save model object to a pickle file, including saving the model states.
399 Args:
400 fid (str): File address to save the pickled model to
402 Returns:
403 message (str): Exit message of pickle dump
404 """
405 file = open(fid, "wb")
406 pickle.dump(self, file)
407 return file.close()
409 def add_nodes(self, nodelist):
410 """Add nodes to the model object from a list of dicts, where each dict contains
411 all of the parameters for a node. Intended to be called before add_arcs.
413 Args:
414 nodelist (list): List of dicts, where a dict is a node
415 """
417 def all_subclasses(cls):
418 """
420 Args:
421 cls:
423 Returns:
425 """
426 return set(cls.__subclasses__()).union(
427 [s for c in cls.__subclasses__() for s in all_subclasses(c)]
428 )
430 for data in nodelist:
431 name = data["name"]
432 type_ = data["type_"]
433 if "node_type_override" in data.keys():
434 node_type = data["node_type_override"]
435 del data["node_type_override"]
436 else:
437 node_type = type_
438 if "foul" in name:
439 # Absolute hack to enable foul sewers to be treated separate from storm
440 type_ = "Foul"
441 if "geometry" in data.keys():
442 del data["geometry"]
443 del data["type_"]
444 self.nodes_type[type_][name] = getattr(nodes, node_type)(**dict(data))
445 self.nodes[name] = self.nodes_type[type_][name]
446 self.nodelist = [x for x in self.nodes.values()]
448 def add_instantiated_nodes(self, nodelist):
449 """Add nodes to the model object from a list of objects, where each object is an
450 already instantiated node object. Intended to be called before add_arcs.
452 Args:
453 nodelist (list): list of objects that are nodes
454 """
455 self.nodelist = nodelist
456 self.nodes = {x.name: x for x in nodelist}
457 for x in nodelist:
458 self.nodes_type[x.__class__.__name__][x.name] = x
460 def add_arcs(self, arclist):
461 """Add nodes to the model object from a list of dicts, where each dict contains
462 all of the parameters for an arc.
464 Args:
465 arclist (list): list of dicts, where a dict is an arc
466 """
467 river_arcs = {}
468 for arc in arclist:
469 name = arc["name"]
470 type_ = arc["type_"]
471 del arc["type_"]
472 arc["in_port"] = self.nodes[arc["in_port"]]
473 arc["out_port"] = self.nodes[arc["out_port"]]
474 self.arcs[name] = getattr(arcs_mod, type_)(**dict(arc))
476 if arc["in_port"].__class__.__name__ in [
477 "River",
478 "Node",
479 "Waste",
480 "Reservoir",
481 ]:
482 if arc["out_port"].__class__.__name__ in [
483 "River",
484 "Node",
485 "Waste",
486 "Reservoir",
487 ]:
488 river_arcs[name] = self.arcs[name]
490 if any(river_arcs):
491 upstreamness = {x: 0 for x in self.nodes_type["Waste"].keys()}
492 upstreamness = self.assign_upstream(river_arcs, upstreamness)
494 self.river_discharge_order = []
495 for node in sorted(
496 upstreamness.items(), key=lambda item: item[1], reverse=True
497 ):
498 if node[0] in self.nodes_type["River"].keys():
499 self.river_discharge_order.append(node[0])
501 def add_instantiated_arcs(self, arclist):
502 """Add arcs to the model object from a list of objects, where each object is an
503 already instantiated arc object.
505 Args:
506 arclist (list): list of objects that are arcs.
507 """
508 self.arclist = arclist
509 self.arcs = {x.name: x for x in arclist}
510 river_arcs = {}
511 for arc in arclist:
512 if arc.in_port.__class__.__name__ in [
513 "River",
514 "Node",
515 "Waste",
516 "Reservoir",
517 ]:
518 if arc.out_port.__class__.__name__ in [
519 "River",
520 "Node",
521 "Waste",
522 "Reservoir",
523 ]:
524 river_arcs[arc.name] = arc
525 upstreamness = {x: 0 for x in self.nodes_type["Waste"].keys()}
527 upstreamness = self.assign_upstream(river_arcs, upstreamness)
529 self.river_discharge_order = []
530 for node in sorted(
531 upstreamness.items(), key=lambda item: item[1], reverse=True
532 ):
533 if node[0] in self.nodes_type["River"].keys():
534 self.river_discharge_order.append(node[0])
536 def assign_upstream(self, arcs, upstreamness):
537 """Recursive function to trace upstream up arcs to determine which are the most
538 upstream.
540 Args:
541 arcs (list): list of dicts where dicts are arcs
542 upstreamness (dict): dictionary contain nodes in
543 arcs as keys and a number representing upstreamness
544 (higher numbers = more upstream)
546 Returns:
547 upstreamness (dict): final version of upstreamness
548 """
549 upstreamness_ = upstreamness.copy()
550 in_nodes = [
551 x.in_port.name
552 for x in arcs.values()
553 if x.out_port.name in upstreamness.keys()
554 ]
555 ind = max(list(upstreamness_.values())) + 1
556 in_nodes = list(set(in_nodes).difference(upstreamness.keys()))
557 for node in in_nodes:
558 upstreamness[node] = ind
559 if upstreamness == upstreamness_:
560 return upstreamness
561 else:
562 upstreamness = self.assign_upstream(arcs, upstreamness)
563 return upstreamness
565 def debug_node_mb(self):
566 """Simple function that iterates over nodes calling their mass balance
567 function."""
568 for node in self.nodelist:
569 _ = node.node_mass_balance()
571 def default_settings(self):
572 """Incomplete function that enables easy specification of results storage.
574 Returns:
575 (dict): default settings
576 """
577 return {
578 "arcs": {"flows": True, "pollutants": True},
579 "tanks": {"storages": True, "pollutants": True},
580 "mass_balance": False,
581 }
583 def change_runoff_coefficient(self, relative_change, nodes=None):
584 """Clunky way to change the runoff coefficient of a land node.
586 Args:
587 relative_change (float): amount that the impervious area in the land
588 node is multiplied by (grass area is changed in compensation)
589 nodes (list, optional): list of land nodes to change the parameters of.
590 Defaults to None, which applies the change to all land nodes.
591 """
592 # Multiplies impervious area by relative change and adjusts grassland
593 # accordingly
594 if nodes is None:
595 nodes = self.nodes_type["Land"].values()
597 if isinstance(relative_change, float):
598 relative_change = {x: relative_change for x in nodes}
600 for node in nodes:
601 surface_dict = {x.surface: x for x in node.surfaces}
602 if "Impervious" in surface_dict.keys():
603 impervious_area = surface_dict["Impervious"].area
604 grass_area = surface_dict["Grass"].area
606 new_impervious_area = impervious_area * relative_change[node]
607 new_grass_area = grass_area + (impervious_area - new_impervious_area)
608 if new_grass_area < 0:
609 print("not enough grass")
610 break
611 surface_dict["Impervious"].area = new_impervious_area
612 surface_dict["Impervious"].capacity *= relative_change[node]
614 surface_dict["Grass"].area = new_grass_area
615 surface_dict["Grass"].capacity *= new_grass_area / grass_area
616 for pol in constants.ADDITIVE_POLLUTANTS + ["volume"]:
617 surface_dict["Grass"].storage[pol] *= new_grass_area / grass_area
618 for pool in surface_dict["Grass"].nutrient_pool.pools:
619 for nutrient in pool.storage.keys():
620 pool.storage[nutrient] *= new_grass_area / grass_area
622 def run(
623 self,
624 dates=None,
625 settings=None,
626 record_arcs=None,
627 record_tanks=None,
628 record_surfaces=None,
629 verbose=True,
630 record_all=True,
631 objectives=[],
632 ):
633 """Run the model object with the default orchestration.
635 Args:
636 dates (list, optional): Dates to simulate. Defaults to None, which
637 simulates all dates that the model has data for.
638 settings (dict, optional): Dict to specify what results are stored,
639 not currently used. Defaults to None.
640 record_arcs (list, optional): List of arcs to store result for.
641 Defaults to None.
642 record_tanks (list, optional): List of nodes with water stores to
643 store results for. Defaults to None.
644 record_surfaces (list, optional): List of tuples of
645 (land node, surface) to store results for. Defaults to None.
646 verbose (bool, optional): Prints updates on simulation if true.
647 Defaults to True.
648 record_all (bool, optional): Specifies to store all results.
649 Defaults to True.
650 objectives (list, optional): A list of dicts with objectives to
651 calculate (see examples). Defaults to [].
653 Returns:
654 flows: simulated flows in a list of dicts
655 tanks: simulated tanks storages in a list of dicts
656 objective_results: list of values based on objectives list
657 surfaces: simulated surface storages of land nodes in a list of dicts
659 Examples:
660 # Run a model without storing any results but calculating objectives
661 import statistics as stats
662 objectives = [{'element_type' : 'flows',
663 'name' : 'my_river',
664 'function' : @ (x, _) stats.mean([y['phosphate'] for y in x])
665 },
666 {'element_type' : 'tanks',
667 'name' : 'my_reservoir',
668 'function' : @ (x, model) sum([y['storage'] < (model.nodes
669 ['my_reservoir'].tank.capacity / 2) for y in x])
670 }]
671 _, _, results, _ = my_model.run(record_all = False, objectives = objectives)
672 """
673 if record_arcs is None:
674 record_arcs = []
675 if record_all:
676 record_arcs = list(self.arcs.keys())
677 if record_tanks is None:
678 record_tanks = []
680 if record_surfaces is None:
681 record_surfaces = []
683 if settings is None:
684 settings = self.default_settings()
686 def blockPrint():
687 """
689 Returns:
691 """
692 stdout = sys.stdout
693 sys.stdout = open(os.devnull, "w")
694 return stdout
696 def enablePrint(stdout):
697 """
699 Args:
700 stdout:
701 """
702 sys.stdout = stdout
704 if not verbose:
705 stdout = blockPrint()
706 if dates is None:
707 dates = self.dates
709 for objective in objectives:
710 if objective["element_type"] == "tanks":
711 record_tanks.append(objective["name"])
712 elif objective["element_type"] == "flows":
713 record_arcs.append(objective["name"])
714 elif objective["element_type"] == "surfaces":
715 record_surfaces.append((objective["name"], objective["surface"]))
716 else:
717 print("element_type not recorded")
719 flows = []
720 tanks = []
721 surfaces = []
722 for date in tqdm(dates, disable=(not verbose)):
723 # for date in dates:
724 for node in self.nodelist:
725 node.t = date
726 node.monthyear = date.to_period("M")
728 # Run FWTW
729 for node in self.nodes_type["FWTW"].values():
730 node.treat_water()
732 # Create demand (gets pushed to sewers)
733 for node in self.nodes_type["Demand"].values():
734 node.create_demand()
736 # Create runoff (impervious gets pushed to sewers, pervious to groundwater)
737 for node in self.nodes_type["Land"].values():
738 node.run()
740 # Infiltrate GW
741 for node in self.nodes_type["Groundwater"].values():
742 node.infiltrate()
744 # Discharge sewers (pushed to other sewers or WWTW)
745 for node in self.nodes_type["Sewer"].values():
746 node.make_discharge()
748 # Foul second so that it can discharge any misconnection
749 for node in self.nodes_type["Foul"].values():
750 node.make_discharge()
752 # Discharge WWTW
753 for node in self.nodes_type["WWTW"].values():
754 node.calculate_discharge()
756 # Discharge GW
757 for node in self.nodes_type["Groundwater"].values():
758 node.distribute()
760 # river
761 for node in self.nodes_type["River"].values():
762 node.calculate_discharge()
764 # Abstract
765 for node in self.nodes_type["Reservoir"].values():
766 node.make_abstractions()
768 for node in self.nodes_type["Land"].values():
769 node.apply_irrigation()
771 for node in self.nodes_type["WWTW"].values():
772 node.make_discharge()
774 # Catchment routing
775 for node in self.nodes_type["Catchment"].values():
776 node.route()
778 # river
779 for node_name in self.river_discharge_order:
780 self.nodes[node_name].distribute()
782 # mass balance checking
783 # nodes/system
784 sys_in = self.empty_vqip()
785 sys_out = self.empty_vqip()
786 sys_ds = self.empty_vqip()
788 # arcs
789 for arc in self.arcs.values():
790 in_, ds_, out_ = arc.arc_mass_balance()
791 for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
792 sys_in[v] += in_[v]
793 sys_out[v] += out_[v]
794 sys_ds[v] += ds_[v]
795 for node in self.nodelist:
796 # print(node.name)
797 in_, ds_, out_ = node.node_mass_balance()
799 # temp = {'name' : node.name,
800 # 'time' : date}
801 # for lab, dict_ in zip(['in','ds','out'], [in_, ds_, out_]):
802 # for key, value in dict_.items():
803 # temp[(lab, key)] = value
804 # node_mb.append(temp)
806 for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
807 sys_in[v] += in_[v]
808 sys_out[v] += out_[v]
809 sys_ds[v] += ds_[v]
811 for v in constants.ADDITIVE_POLLUTANTS + ["volume"]:
812 # Find the largest value of in_, out_, ds_
813 largest = max(sys_in[v], sys_in[v], sys_in[v])
815 if largest > constants.FLOAT_ACCURACY:
816 # Convert perform comparison in a magnitude to match the largest
817 # value
818 magnitude = 10 ** int(log10(largest))
819 in_10 = sys_in[v] / magnitude
820 out_10 = sys_in[v] / magnitude
821 ds_10 = sys_in[v] / magnitude
822 else:
823 in_10 = sys_in[v]
824 ds_10 = sys_in[v]
825 out_10 = sys_in[v]
827 if (in_10 - ds_10 - out_10) > constants.FLOAT_ACCURACY:
828 print(
829 "system mass balance error for "
830 + v
831 + " of "
832 + str(sys_in[v] - sys_ds[v] - sys_out[v])
833 )
835 # Store results
836 for arc in record_arcs:
837 arc = self.arcs[arc]
838 flows.append(
839 {"arc": arc.name, "flow": arc.vqip_out["volume"], "time": date}
840 )
841 for pol in constants.POLLUTANTS:
842 flows[-1][pol] = arc.vqip_out[pol]
844 for node in record_tanks:
845 node = self.nodes[node]
846 tanks.append(
847 {
848 "node": node.name,
849 "storage": node.tank.storage["volume"],
850 "time": date,
851 }
852 )
854 for node, surface in record_surfaces:
855 node = self.nodes[node]
856 name = node.name
857 surface = node.get_surface(surface)
858 if not isinstance(surface, ImperviousSurface):
859 surfaces.append(
860 {
861 "node": name,
862 "surface": surface.surface,
863 "percolation": surface.percolation["volume"],
864 "subsurface_r": surface.subsurface_flow["volume"],
865 "surface_r": surface.infiltration_excess["volume"],
866 "storage": surface.storage["volume"],
867 "evaporation": surface.evaporation["volume"],
868 "precipitation": surface.precipitation["volume"],
869 "tank_recharge": surface.tank_recharge,
870 "capacity": surface.capacity,
871 "time": date,
872 "et0_coef": surface.et0_coefficient,
873 # 'crop_factor' : surface.crop_factor
874 }
875 )
876 for pol in constants.POLLUTANTS:
877 surfaces[-1][pol] = surface.storage[pol]
878 else:
879 surfaces.append(
880 {
881 "node": name,
882 "surface": surface.surface,
883 "storage": surface.storage["volume"],
884 "evaporation": surface.evaporation["volume"],
885 "precipitation": surface.precipitation["volume"],
886 "capacity": surface.capacity,
887 "time": date,
888 }
889 )
890 for pol in constants.POLLUTANTS:
891 surfaces[-1][pol] = surface.storage[pol]
892 if record_all:
893 for node in self.nodes.values():
894 for prop_ in dir(node):
895 prop = node.__getattribute__(prop_)
896 if prop.__class__ in [QueueTank, Tank, ResidenceTank]:
897 tanks.append(
898 {
899 "node": node.name,
900 "time": date,
901 "storage": prop.storage["volume"],
902 "prop": prop_,
903 }
904 )
905 for pol in constants.POLLUTANTS:
906 tanks[-1][pol] = prop.storage[pol]
908 for name, node in self.nodes_type["Land"].items():
909 for surface in node.surfaces:
910 if not isinstance(surface, ImperviousSurface):
911 surfaces.append(
912 {
913 "node": name,
914 "surface": surface.surface,
915 "percolation": surface.percolation["volume"],
916 "subsurface_r": surface.subsurface_flow["volume"],
917 "surface_r": surface.infiltration_excess["volume"],
918 "storage": surface.storage["volume"],
919 "evaporation": surface.evaporation["volume"],
920 "precipitation": surface.precipitation["volume"],
921 "tank_recharge": surface.tank_recharge,
922 "capacity": surface.capacity,
923 "time": date,
924 "et0_coef": surface.et0_coefficient,
925 # 'crop_factor' : surface.crop_factor
926 }
927 )
928 for pol in constants.POLLUTANTS:
929 surfaces[-1][pol] = surface.storage[pol]
930 else:
931 surfaces.append(
932 {
933 "node": name,
934 "surface": surface.surface,
935 "storage": surface.storage["volume"],
936 "evaporation": surface.evaporation["volume"],
937 "precipitation": surface.precipitation["volume"],
938 "capacity": surface.capacity,
939 "time": date,
940 }
941 )
942 for pol in constants.POLLUTANTS:
943 surfaces[-1][pol] = surface.storage[pol]
945 for node in self.nodes.values():
946 node.end_timestep()
948 for arc in self.arcs.values():
949 arc.end_timestep()
950 objective_results = []
951 for objective in objectives:
952 if objective["element_type"] == "tanks":
953 val = objective["function"](
954 [x for x in tanks if x["node"] == objective["name"]], self
955 )
956 elif objective["element_type"] == "flows":
957 val = objective["function"](
958 [x for x in flows if x["arc"] == objective["name"]], self
959 )
960 elif objective["element_type"] == "surfaces":
961 val = objective["function"](
962 [
963 x
964 for x in surfaces
965 if (x["node"] == objective["name"])
966 & (x["surface"] == objective["surface"])
967 ],
968 self,
969 )
970 objective_results.append(val)
971 if not verbose:
972 enablePrint(stdout)
973 return flows, tanks, objective_results, surfaces
975 def reinit(self):
976 """Reinitialise by ending all node/arc timesteps and calling reinit function in
977 all nodes (generally zero-ing their storage values)."""
978 for node in self.nodes.values():
979 node.end_timestep()
980 for prop in dir(node):
981 prop = node.__getattribute__(prop)
982 for prop_ in dir(prop):
983 if prop_ == "reinit":
984 prop_ = node.__getattribute__(prop_)
985 prop_()
987 for arc in self.arcs.values():
988 arc.end_timestep()
def write_yaml(address, config_name, data):
    """Write data to a yaml file using the safe dumper.

    The file is written in block style with keys kept in their existing order.

    Args:
        address (str): Path to the directory to write to
        config_name (str): Name of the yaml file
        data (dict): Content to write, restricted to what the safe dumper
            can represent
    """
    target = os.path.join(address, config_name)
    dump_options = {
        "default_flow_style": False,
        "sort_keys": False,
        "Dumper": yaml.SafeDumper,
    }
    with open(target, "w") as handle:
        yaml.dump(data, handle, **dump_options)
def open_func(file_path, mode):
    """Open a file, transparently decompressing '.gz' files read in text mode.

    Args:
        file_path (str): Path of the file to open
        mode (str): Mode to open the file in. gzip is only used when this is
            exactly 'rt' and the path ends with '.gz'

    Returns:
        An open file object
    """
    use_gzip = mode == "rt" and file_path.endswith(".gz")
    opener = gzip.open if use_gzip else open
    return opener(file_path, mode)
def read_csv(file_path, delimiter=","):
    """Read input data from a csv (or csv.gz) file with a header row.

    The file must have 'variable', 'time' and 'value' columns; any other
    columns are ignored. If a (variable, time) pair occurs more than once,
    the last value is kept.

    Args:
        file_path (str): Path of the file to read
        delimiter (str, optional): Column delimiter. Defaults to ','

    Returns:
        (dict): values as floats, keyed by (variable, to_datetime(time)) tuples
    """
    with open_func(file_path, "rt") as handle:
        return {
            (record["variable"], to_datetime(record["time"])): float(record["value"])
            for record in csv.DictReader(handle, delimiter=delimiter)
        }
def write_csv(data, fixed_data=None, filename="", compress=False):
    """Write input data to a csv file, optionally gzip-compressed.

    The header consists of the keys of ``fixed_data`` followed by 'variable',
    'time' and 'value'. One row is written per entry in ``data``.

    Args:
        data (dict): Values keyed by (variable, time) tuples
        fixed_data (dict, optional): Column name/value pairs that are repeated
            at the start of every row. Defaults to None (no extra columns)
        filename (str, optional): Path of the file to write. Defaults to ''
        compress (bool, optional): If True the file is written with gzip.
            Defaults to False
    """
    if fixed_data is None:
        fixed_data = {}

    # Named opener (not open_func) to avoid shadowing the module-level function
    if compress:
        opener = gzip.open
        mode = "wt"
    else:
        opener = open
        mode = "w"

    with opener(filename, mode, newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(list(fixed_data.keys()) + ["variable", "time", "value"])
        fixed_data_values = list(fixed_data.values())
        for key, value in data.items():
            writer.writerow(fixed_data_values + list(key) + [str(value)])
def flatten_dict(d, parent_key="", sep="-"):
    """Flatten a nested dictionary into a single level.

    Keys of nested dictionaries are joined to their parent key with ``sep``.
    A falsy parent key (e.g. '' or 0) is not prepended. Empty nested
    dictionaries produce no entries.

    Args:
        d (dict): Dictionary to flatten
        parent_key (str, optional): Key that is prepended to all keys.
            Defaults to ''
        sep (str, optional): Separator placed between joined keys.
            Defaults to '-'

    Returns:
        (dict): the flattened dictionary
    """
    flattened = {}
    for key, item in d.items():
        if parent_key:
            full_key = str(parent_key) + sep + str(key)
        else:
            full_key = key

        if not isinstance(item, dict):
            flattened[full_key] = item
            continue

        # Nested dictionary: merge in its flattened content
        for nested_key, nested_item in flatten_dict(item, full_key, sep).items():
            flattened[nested_key] = nested_item
    return flattened
def check_and_convert_string(value):
    """Convert a value to an int, float or None where possible.

    Conversion to int is tried first, then to float. If both fail, the
    string 'None' becomes None and anything else is returned unchanged.

    Args:
        value: Value to convert, typically a string

    Returns:
        The converted value
    """
    for caster in (int, float):
        try:
            return caster(value)
        except Exception:
            continue
    return None if value == "None" else value
def unflatten_dict(d, sep=":"):
    """Rebuild a nested dictionary from a flat one with joined keys.

    Args:
        d (dict): Flat dictionary whose string keys contain the nesting
            levels joined by ``sep``
        sep (str, optional): Separator between nesting levels.
            Defaults to ':'

    Returns:
        (dict): the nested dictionary
    """
    nested = {}
    for flat_key, leaf in d.items():
        *parents, last = flat_key.split(sep)
        cursor = nested
        for part in parents:
            # Descend, creating intermediate levels as needed
            cursor = cursor.setdefault(part, {})
        cursor[last] = leaf
    return nested
def convert_keys(d):
    """Recursively convert the keys of a (nested) dictionary.

    Every key is passed through check_and_convert_string. Values that are
    not dictionaries are returned unchanged.

    Args:
        d: Dictionary to convert, or any other value

    Returns:
        A new dictionary with converted keys, or ``d`` itself if it is not a
        dictionary
    """
    if isinstance(d, dict):
        return {
            check_and_convert_string(key): convert_keys(item)
            for key, item in d.items()
        }
    return d
def csv2yaml(address, config_name="config_csv.yml", csv_folder_name="csv"):
    """Convert a folder of csv files into a yaml model config file.

    Files whose path contains 'Dates' provide the simulation dates, files
    whose path contains 'Sim_params' provide top-level name/value entries, and
    rows of all other files are added under the entry named by their 'label'
    column ('nodes' or 'arcs'). Empty cells are skipped.

    Args:
        address (str): Path to the directory that contains the csv folder and
            that the yaml file is written to
        config_name (str, optional): Name of the yaml file to write.
            Defaults to 'config_csv.yml'
        csv_folder_name (str, optional): Name of the folder in ``address``
            containing the csv files. Defaults to 'csv'
    """
    csv_path = os.path.join(address, csv_folder_name)
    csv_list = []
    for entry in os.listdir(csv_path):
        candidate = os.path.join(csv_path, entry)
        if os.path.isfile(candidate):
            csv_list.append(candidate)

    objs_type = {"nodes": {}, "arcs": {}}
    for fid in csv_list:
        with open(fid, "rt") as f:
            if "Dates" in fid:
                first_column = [row[0] for row in csv.reader(f, delimiter=",")]
                # The first row is the header
                objs_type["dates"] = first_column[1:]
                continue

            is_sim_params = "Sim_params" in fid
            data = {}
            for row in csv.DictReader(f, delimiter=","):
                formatted_row = {}
                for key, value in row.items():
                    if not value:
                        continue
                    if ("[" in value) & ("]" in value):
                        # Convert lists
                        items = value.strip("[]")  # Remove the brackets
                        items = items.replace("'", "")  # Remove the string bits
                        items = items.split(", ")  # Split by comma
                        value = [check_and_convert_string(x) for x in items]
                    else:
                        # Convert ints, floats and strings
                        value = check_and_convert_string(value)
                    formatted_row[key] = value

                if not is_sim_params:
                    label = formatted_row["label"]
                    del formatted_row["label"]

                # Convert to nested dicts with converted keys
                formatted_row = convert_keys(unflatten_dict(formatted_row))
                data[row["name"]] = formatted_row

            if is_sim_params:
                objs_type = {
                    **objs_type,
                    **{x: y["value"] for x, y in data.items()},
                }
            else:
                objs_type[label] = {**objs_type[label], **data}

    write_yaml(address, config_name, objs_type)
def yaml2csv(address, config_name="config.yml", csv_folder_name="csv"):
    """Convert a yaml model config file into a folder of csv files.

    One csv file is written per node/arc type, with nested dictionaries
    flattened into columns whose names are joined by ':'. Dates (if present)
    go to 'Dates.csv' and all remaining top-level entries to 'Sim_params.csv'.

    Args:
        address (str): Path to the directory that contains the yaml file and
            in which the csv folder is created
        config_name (str, optional): Name of the yaml file to read.
            Defaults to 'config.yml'
        csv_folder_name (str, optional): Name of the folder in ``address`` to
            write the csv files to. Defaults to 'csv'
    """
    with open(os.path.join(address, config_name), "r") as file:
        data = yaml.safe_load(file)

    # Format to easy format to write to database
    objs_type = {}
    for object_label in ["nodes", "arcs"]:
        for key, value in data[object_label].items():
            if not isinstance(value, dict):
                continue

            # Identify node type
            if "node_type_override" in value.keys():
                type_ = value["node_type_override"]
            elif "type_" in value.keys():
                type_ = value["type_"]
            else:
                type_ = False
            if not type_:
                continue

            if type_ not in objs_type.keys():
                objs_type[type_] = {}

            # Flatten dictionaries
            flattened = {
                key_: flatten_dict(value_, key_, ":")
                for key_, value_ in value.items()
                if isinstance(value_, dict)
            }
            # Swap every nested entry for its flattened columns
            for key_, flat in flattened.items():
                del value[key_]
                value = {**value, **flat}
            value["label"] = object_label
            objs_type[type_][key] = value

    del data["nodes"]
    del data["arcs"]
    if "dates" in data.keys():
        objs_type["Dates"] = data["dates"]
        del data["dates"]

    # Whatever is left are simulation parameters
    objs_type["Sim_params"] = {x: {"name": x, "value": y} for x, y in data.items()}

    csv_dir = os.path.join(address, csv_folder_name)

    if not os.path.exists(csv_dir):
        os.mkdir(csv_dir)

    for key, value in objs_type.items():
        if key == "Sim_params":
            fields = ["name", "value"]
        elif key == "Dates":
            fields = ["date"]
        else:
            # Union of the columns of all objects of this type, 'name' first
            columns = {}
            for value_ in value.values():
                columns = {**columns, **value_}

            del columns["name"]
            fields = ["name"] + list(columns.keys())

        csv_file_path = os.path.join(csv_dir, "{0}.csv".format(key))
        with open(csv_file_path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(fields)
            if key == "Dates":
                for date in value:
                    writer.writerow([date])
            else:
                for value_ in value.values():
                    writer.writerow(
                        [str(value_[x]) if x in value_.keys() else None for x in fields]
                    )