Source code for openalea.metafspm.scene_wrapper

# Public packages
import os, shutil, psutil
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import random
import time


### metafspm zone
[docs] def play_Orchestra(scene_name, output_folder, plant_models: list, plant_scenarios: list, soil_model=None, soil_scenario: dict = {"parameters": {}, "input_tables": {}}, light_model = None, light_scenario: dict = {}, translator_path: str = "", logger_class = None, log_settings: dict = {}, heavy_log_period: int = 24, n_iterations = 2500, time_step=3600, scene_xrange=1, scene_yrange=1, sowing_density=250, row_spacing=0.15, max_depth=1.3, voxel_widht=0.01, voxel_height=0.01, record_performance=False): """ Orchestrator function launching in parallel plant models and then environment models --- TODO : Scene orientation regarding an angle relative to North """ # Settings to avoid processes concurrency os.environ.update({ "OMP_NUM_THREADS": "1", "MKL_NUM_THREADS": "1", "OPENBLAS_NUM_THREADS": "1", "NUMEXPR_NUM_THREADS": "1", "MKL_DYNAMIC": "FALSE", }) clean_exit = True # Specific output structure for scenes not managed by per process loggers if not os.path.exists(output_folder): os.mkdir(output_folder) scene_folder = os.path.join(output_folder, scene_name) if os.path.exists(scene_folder): shutil.rmtree(scene_folder) os.mkdir(scene_folder) # Compute the placement of individual plants in the scene and for each position get the information on how to initialize the plant model at that location scene_xrange, scene_yrange, planting_sequence = stand_initialization(scene_name=scene_name, xrange=scene_xrange, yrange=scene_yrange, sowing_density=sowing_density, sowing_depth=[0.025], row_spacing=row_spacing, plant_models=plant_models, plant_scenarios=plant_scenarios, plant_model_frequency=[1.]) cpu_assignments = plan_affinity(len(planting_sequence), 1) # TODO : only 1 cpu per plant as for now, see if we need to adapt this if we start leveraging intense vectorization with numba # Queues to perform synchronization and data sharing of the processes queues_soil_to_plants = {pid: mp.Queue() for pid in planting_sequence.keys()} queue_plants_to_soil = mp.Queue() if light_model is not None: queues_light_to_plants = {pid: mp.Queue() for pid in planting_sequence.keys()} queue_plants_to_light = mp.Queue() else: queues_light_to_plants=None queue_plants_to_light=None stop_event = mp.Event() stop_file = os.path.join(output_folder, scene_name, "Delete_to_Stop") open(stop_file, "w").close() handshake_size = 35 # Then we start workers which namely take the barriers as input so that even when execution is parallel, the resolution loop is synchronized processes = [] sharememories = [] cpu_set = 0 try: for plant_id, init_info in planting_sequence.items(): a = np.empty((handshake_size, 20000), dtype=np.float64) shm = SharedMemory(create=True, name=plant_id, size=a.nbytes) b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) b[:] = a[:] shm.close() sharememories.append(shm) p = mp.Process( target=plant_worker, kwargs=dict(queues_soil_to_plants=queues_soil_to_plants, queue_plants_to_soil=queue_plants_to_soil, queues_light_to_plants=queues_light_to_plants, queue_plants_to_light=queue_plants_to_light, cpu_ids=cpu_assignments[cpu_set], stop_event=stop_event, plant_model=init_info["model"], plant_id=plant_id, translator_path=translator_path, output_dirpath=os.path.join(output_folder, scene_name, plant_id), n_iterations=n_iterations, time_step=time_step, coordinates=init_info["coordinates"], rotation=init_info["rotation"], scenario=init_info["scenario"], logger_class=logger_class, log_settings=log_settings, heavy_log_period=heavy_log_period, record_performance=record_performance) ) processes.append(p) p.start() cpu_set += 1 if soil_model is not None: p = mp.Process( target=soil_worker, kwargs=dict(queues_soil_to_plants=queues_soil_to_plants, queue_plants_to_soil=queue_plants_to_soil, stop_event=stop_event, soil_model=soil_model, scene_xrange=scene_xrange, scene_yrange=scene_yrange, translator_path=translator_path, output_dirpath=os.path.join(output_folder, scene_name, 'Soil'), n_iterations=n_iterations, time_step=time_step, scenario=soil_scenario, logger_class=logger_class, log_settings=log_settings, heavy_log_period=heavy_log_period) ) processes.append(p) p.start() if light_model is not None: p = mp.Process( target=light_worker, kwargs=dict(queues_light_to_plants=queues_light_to_plants, queue_plants_to_light=queue_plants_to_light, stop_event=stop_event, light_model=light_model, scene_xrange=scene_xrange, scene_yrange=scene_yrange, output_dirpath=os.path.join(output_folder, scene_name, 'Light'), n_iterations=n_iterations, time_step=time_step, scenario=plant_scenarios[0])) processes.append(p) p.start() while not stop_event.is_set(): if not os.path.exists(stop_file): stop_event.set() clean_exit = False time.sleep(10) except: clean_exit = False finally: # Wait for all processes to exit. for p in processes: p.join() del b # Delete any remaining nympy handle used at creation for shm in sharememories: shm.close() shm.unlink() # Only if the exit was clean we close the shared array to avoid damaging processes during tests free_cpu(cpu_assignments) # NOTE : For now, each model iteration will log its data in its own data folder (1 per plant + 1 for soil + 1 for Light) return clean_exit
[docs] def stand_initialization(scene_name, xrange, yrange, sowing_density, sowing_depth, row_spacing, plant_models, plant_scenarios, plant_model_frequency, row_alternance=None, exact=False): # TODO : In the current state, field orientation relative to south cannot be chosen unique_plant_ID = 0 n_rows = int(xrange / row_spacing) actual_xrange = n_rows * row_spacing # Reccomputed to make sure the scene size is adapted to symetry number_per_row = max(int(yrange * xrange * sowing_density / n_rows), 1) intra_row_distance = yrange / number_per_row print(f"Launching scene with {n_rows} rows, {number_per_row} plant per rows, which represents {n_rows * number_per_row} plants") current_model_index = -1 planting_sequence = {} for x in range(n_rows): if exact: row_random_shear = lambda x: 0 else: row_random_shear = lambda x: (random.random()-0.5) * x / 2. for y in range(number_per_row): model_picker = random.random() low_bound = 0 for i, frequency in enumerate(plant_model_frequency): if low_bound < model_picker and model_picker <= frequency: current_model_index = i low_bound += frequency plant_ID=f"{plant_models[current_model_index].__name__}_{unique_plant_ID}_{scene_name}" planting_sequence[plant_ID] = dict( model=plant_models[current_model_index], scenario=plant_scenarios[current_model_index], coordinates=[(row_spacing / 2) + x * row_spacing, (intra_row_distance/2) + y * intra_row_distance + row_random_shear(intra_row_distance), - sowing_depth[current_model_index]], rotation=random.uniform(0, 360)) unique_plant_ID += 1 return actual_xrange, yrange, planting_sequence
[docs] def plant_worker(queues_soil_to_plants, queue_plants_to_soil, queues_light_to_plants, queue_plants_to_light, cpu_ids, stop_event, plant_model, plant_id, translator_path, output_dirpath, n_iterations, time_step, coordinates, rotation, scenario, logger_class, log_settings, heavy_log_period, record_performance: bool = False): # Pin to a specific set of cpus to avoid concurrency psutil.Process().cpu_affinity(cpu_ids) # Each process creates its local instance (which includes the unique properties). instance = plant_model(queues_soil_to_plants=queues_soil_to_plants, queue_plants_to_soil=queue_plants_to_soil, queues_light_to_plants=queues_light_to_plants, queue_plants_to_light=queue_plants_to_light, name=plant_id, time_step=time_step, coordinates=coordinates, rotation=rotation, translator_path=translator_path, **scenario) logger = logger_class(model_instance=instance, components=instance.components, outputs_dirpath=output_dirpath, time_step_in_hours=1, logging_period_in_hours=heavy_log_period, echo=False, **log_settings) iteration = 0 while not stop_event.is_set() and iteration < n_iterations: # Run plant time step if record_performance: logger.run_and_monitor_model_step() else: logger() instance.run() iteration += 1 print("Plant stopped") stop_event.set() logger.stop() os._exit(0)
[docs] def soil_worker(queues_soil_to_plants, queue_plants_to_soil, stop_event, soil_model, scene_xrange, scene_yrange, translator_path, output_dirpath, n_iterations, time_step, scenario, logger_class, log_settings, heavy_log_period): # Each process creates its local instance (which includes the unique properties). instance = soil_model(queues_soil_to_plants=queues_soil_to_plants, queue_plants_to_soil=queue_plants_to_soil, time_step=time_step, scene_xrange=scene_xrange, scene_yrange=scene_yrange, translator_path=translator_path, **scenario) logger = logger_class(model_instance=instance, components=instance.components, outputs_dirpath=output_dirpath, time_step_in_hours=1, logging_period_in_hours=heavy_log_period, echo=True, **log_settings) iteration = 0 while not stop_event.is_set() and iteration < n_iterations: # Run time step logger() instance.run() iteration += 1 print("Soil stopped") stop_event.set() logger.stop() os._exit(0)
[docs] def light_worker(queues_light_to_plants, queue_plants_to_light, stop_event, light_model, scene_xrange, scene_yrange, output_dirpath, n_iterations, time_step, scenario): # Maybe a little bit too specific here, since we used only Caribu we didn't use a metafspm utility to create the light model class import pandas as pd meteo = pd.read_csv(os.path.join("inputs", "meteo_Ljutovac2002.csv"), index_col='t') instance = light_model(scene_xrange=scene_xrange, scene_yrange=scene_yrange, meteo=meteo, **scenario) # Here no logging of the interception is performed as shoot models already log the energy they captured iteration = 0 while not stop_event.is_set() and iteration < n_iterations: # Run time step instance.run(queues_light_to_plants=queues_light_to_plants, queue_plants_to_light=queue_plants_to_light) iteration += 1 print("Light stopped") stop_event.set()
[docs] def plan_affinity(n_workers: int, threads_per_worker: int = 1, ids=None): ids = sorted(ids or psutil.Process().cpu_affinity()) lock_file = "outputs/lock" sync_file = "outputs/cpu_availability" while os.path.exists(lock_file): print("Waiting for cpu attribution to be unlocked") time.sleep(1) open(lock_file, "w").close() with open(sync_file, "a+") as f: f.seek(0) cpu_string = f.read() if len(cpu_string) == 0: cpu_string = ('0;' * len(ids))[:-1] if len(cpu_string) != (len(ids) * 2) - 1: print("[WARNING] Saved file for cpu repartition is corrupted. Starting from new attribution.") cpu_string = ('0;' * len(ids))[:-1] cpu_availability = [int(k) for k in cpu_string.split(";")] free_ids = [i for k, i in enumerate(ids) if cpu_availability[k] == 0] need = n_workers * threads_per_worker if len(free_ids) <= need: raise OverflowError("Launched simulations requiered more CPU cores than available") free_ids = free_ids[:need] assigned = [free_ids[i*threads_per_worker:(i+1)*threads_per_worker] for i in range(n_workers)] for cpu_idxs in assigned: for cpu_idx in cpu_idxs: cpu_availability[ids.index(cpu_idx)] = 1 output_cpu_string = '' for idx in cpu_availability: output_cpu_string += str(idx) + ';' output_cpu_string = output_cpu_string[:-1] # Remove old content and write result f.seek(0) f.truncate(0) f.write(output_cpu_string) f.close() os.remove(lock_file) return assigned
[docs] def free_cpu(cpu_list): ids = sorted(psutil.Process().cpu_affinity()) ids_idxs = [] for sublist in cpu_list: for i in sublist: ids_idxs.append(ids.index(i)) try: lock_file = "outputs/lock" sync_file = "outputs/cpu_availability" while os.path.exists(lock_file): print("Waiting for cpu attribution to be unlocked") time.sleep(1) open(lock_file, "w").close() with open(sync_file, "a+") as f: f.seek(0) cpu_string = f.read() if len(cpu_string) != (len(ids) * 2) - 1: last_process = True print("[WARNING] CPUs could not be freed because the file was deleted too early") else: cpu_availability = [int(k) for k in cpu_string.split(";")] for idx in ids_idxs: cpu_availability[idx] = 0 last_process = 1 not in cpu_availability output_cpu_string = '' for idx in cpu_availability: output_cpu_string += str(idx) + ';' output_cpu_string = output_cpu_string[:-1] # Remove old content and write result f.seek(0) f.truncate(0) f.write(output_cpu_string) f.close() if last_process: os.remove(sync_file) os.remove(lock_file) except FileNotFoundError: # Has already been terminated print("[WARNING] CPUs could not be freed because the file was deleted too early")