Source code for openalea.metafspm.composite_wrapper

import yaml
from dataclasses import fields
from importlib import import_module, reload
from openalea.metafspm.utils import ArrayDict


[docs] def recursive_reload(module): blacklist = ["openalea.mtg.mtg", "openalea.mtg.tree", "collections", "functools"] reload(module) for child_module in vars(module).values(): if isinstance(child_module, type) and module.__name__ != child_module.__module__ and child_module.__module__ not in blacklist: module_to_reload = import_module(name=child_module.__module__) recursive_reload(module_to_reload)
[docs] class CompositeModel:
[docs] def get_documentation(self, filters: dict, models: list): """ Documentation of the RootCyNAPS parameters :return: documentation text """ to_print = "" for model in models: to_print += "MODEL DOCUMENTATION : \n" if model.__doc__ == None: to_print += " no documentation"+ "\n\n" else: to_print += model.__doc__ + "\n\n" to_print += "MODEL OUTPUT VARIABLES : \n" docu = fields(model) first = True for f in docu: if first: headers = f.metadata.keys() max_format = "{:.30} " width_format = "{:<31}" to_print += width_format.format(max_format.format("name")) + " | " for header in headers: if header == "description": max_format = "{:.90} " width_format = "{:<91}" else: max_format = "{:.30} " width_format = "{:<31}" to_print += width_format.format(max_format.format(header)) + " | " to_print += "\n\n" first = False filtering = [f.metadata[k] in v for k, v in filters.items()] if False not in filtering or len(filtering) == 0: to_print += width_format.format(max_format.format(f.name)) + " | " values = list(f.metadata.values()) for value in values: if values.index(value) == 2: max_format = "{:.90} " width_format = "{:<91}" if len(value) > 90: value = value.replace(value[87:], "...") else: max_format = "{:.30} " width_format = "{:<31}" to_print += width_format.format(max_format.format(value)) + " | " to_print += "\n" return to_print
@property def documentation(self): return self.get_documentation(filters={}, models=self.components) @property def inputs(self): return self.get_documentation(filters=dict(variable_type=["input"]), models=self.components) def declare_data(self, shoot=None, root=None, atmosphere=None, soil=None): self.data_structures = {} if shoot: self.data_structures["shoot"] = shoot if root: self.data_structures["root"] = root if atmosphere: self.data_structures["atmosphere"] = atmosphere if soil: self.data_structures["soil"] = soil
[docs] def couple_components(self, *args, translator_path: str = ""): """ Description : linker function that will enable properties sharing through MTG. Parameters : :param translator: list matrix containing translator dictionnaries for each model pair :param components: inistances of components that should be coupled as indicated by the coupling_translator.yaml Note : The whole property is transfered, so if only the collar value of a spatial property is needed, it will be accessed through the first vertice with the [1] indice. Not spatialized properties like xylem pressure or single point properties like collar flows are only stored in the indice [1] vertice. """ self.components = [component for component in args] translator = self.open_or_create_translator(translator_path) soil_name = "SoilModel" # TODO : find a way to generalize this self.plant_side_soil_inputs = ["vertex_index", "x1", "x2", "y1", "y2", "z1", "z2"] for v in translator[soil_name].values(): for t in v.values(): for name in t.keys(): self.plant_side_soil_inputs.append(name) self.soil_inputs, self.soil_outputs = self.get_component_inputs_outputs(translator=translator, components_names=[c.__class__.__name__ for c in self.components], target_name=soil_name, names_for_others=False) props = self.data_structures["root"].properties() # Soil did not initialted its properties in the MTG itself since we stopped pickling it, so we do it here for name in self.soil_outputs: props[name] = ArrayDict(dict(zip(props["struct_mass"].keys(), [0. for k in range(len(props["struct_mass"]))])), dtype=float) for receiver in self.components: self.couple_current_with_components_list(receiver=receiver, components=[c.__class__.__name__ for c in self.components] + [soil_name], translator=translator, common_props=props)
def open_or_create_translator(self, translator_path): try: with open(translator_path + "/coupling_translator.yaml", "r") as f: translator = yaml.safe_load(f) except FileNotFoundError: print("NOTE : You will now have to provide information about shared variables between the modules composing this model :\n") translator = self.translator_matrix_builder() with open(translator_path + "/coupling_translator.yaml", "w") as f: yaml.dump(translator, f) return translator def couple_current_with_components_list(self, receiver, components, translator, common_props=None, subcategory=None): if not hasattr(receiver, "pullable_inputs"): receiver.pullable_inputs = {} if subcategory is not None and subcategory in receiver.pullable_inputs.keys(): pass else: if subcategory is not None: receiver.pullable_inputs[subcategory] = {} for applier in components: if receiver.__class__.__name__ != applier: linker = translator[receiver.__class__.__name__][applier] # If a model has been targeted on this position if len(linker.keys()) > 0: # We set properties with getter method only to retrieve the values dynamically from inputs for name, source_variables in linker.items(): if len(source_variables.keys()) > 0: # Handling exception where operations are put in the coupling translator for unit conversion for source_name, unit_conversion in source_variables.items(): if isinstance(unit_conversion, str): source_variables[source_name] = eval(unit_conversion) if len(source_variables.keys()) == 1: for source_name, unit_conversion in source_variables.items(): # If there is only one variable to associate if source_name == name: # Do nothing the coupling should already be done during initialization continue else: # If only the name is different, just create an alias in the dictionnary and then recreate the pointer of the receiver class to this alias. if unit_conversion == 1. and common_props is not None: # If not created yet, for example in case of secondary soil initialization, we set default and suppose it will be modified later if source_name not in common_props.keys(): common_props[source_name] = {} common_props[name] = common_props[source_name] else: # NOTE TODO : We will probably need to switch only the the second option later if subcategory is None: receiver.pullable_inputs[name] = {source_name: unit_conversion} else: receiver.pullable_inputs[subcategory][name] = {source_name: unit_conversion} else: if subcategory is None: receiver.pullable_inputs[name] = source_variables else: receiver.pullable_inputs[subcategory][name] = {source_name: unit_conversion}
[docs] def translator_matrix_builder(self): """ Translator matrix builder utility, to be used if no translator dictionay is available on modules' directory :param components: inistances of components that should be coupled as indicated by the coupling_translator.yaml """ L = len(self.components) translator = {self.components[i].__class__.__name__:{self.components[k].__class__.__name__:{} for k in range(L)} for i in range(L)} for receiver_model in range(L): inputs = [f for f in fields(self.components[receiver_model]) if f.metadata["variable_type"] == "input"] needed_models = list(set([f.metadata["by"] for f in inputs])) needed_models.sort() for name in needed_models: print([(model + 1, self.components[model].__class__.__name__) for model in range(len(self.components))]) which = int(input(f"[for {self.components[receiver_model].__class__.__name__}] Which is {name}? (0 for None): ")) - 1 needed_inputs = [f.name for f in inputs if f.metadata["by"] == name] if 0 <= which < L: available = self.get_documentation(filters=dict(variable_type=["state_variable", "plant_scale_state"]), models=[self.components[which]]) print(available) for var in needed_inputs: selected = input(f"For {var}, Nothing for same name / enter target names * conversion factor / Separate by ; -> ").split(";") com_dict = {} for expression in selected: if "*" in expression: l = expression.split("*") com_dict[l[0].replace(" ", "")] = float(l[1]) elif expression.strip() == "": com_dict[var] = 1. else: com_dict[expression.replace(" ", "")] = 1. translator[self.components[receiver_model].__class__.__name__][self.components[which].__class__.__name__][var] = com_dict return translator
def declare_data_and_couple_components(self, shoot=None, root=None, atmosphere=None, soil=None, translator_path: str = "", components: tuple = ()): self.declare_data(shoot=shoot, root=root, atmosphere=atmosphere, soil=soil) self.couple_components(translator_path=translator_path, *components) def apply_input_tables(self, tables: dict, to: tuple, when: float): if tables is not None: if not hasattr(self, "models_data_required"): all_available_state_variables = [] for model in to: all_available_state_variables += model.state_variables self.models_data_required = [[var for var in tables.keys() if ( # Either the input is not provided by another coupled module (var in model.inputs) and (var not in all_available_state_variables)) or ( # Or the considered model provides the variable but gets it from input data only var in model.state_variables)] for model in to] for model in range(len(to)): for var in self.models_data_required[model]: if hasattr(to[model], "voxels"): # supposed True : if isinstance(getattr(to[model].voxels, var), np.ndarray): to[model].voxels[var].fill(tables[var][when]) elif hasattr(to[model], "props"): to[model].props[var].update({1: tables[var][when]}) else: raise TypeError("Unknown data structure to apply input data to") def get_component_inputs_outputs(self, translator, components_names, target_name, names_for_others=True): expected_inputs = [] expected_outputs = [] target_component = translator[target_name] for component in components_names: if component != target_name: # Get outputs from all others input_components = translator[component] for provider, source_variables in input_components.items(): # Among inputs if the target is found if provider == target_name: if names_for_others: expected_outputs += list(source_variables.keys()) else: for _, translation in source_variables.items(): expected_outputs += list(translation.keys()) # Get inputs from all for target component if names_for_others: for _, translation in target_component[component].items(): expected_inputs += list(translation.keys()) else: expected_inputs += list(target_component[component].keys()) expected_inputs = list(set(expected_inputs)) expected_outputs = list(set(expected_outputs)) return expected_inputs, expected_outputs