Source code for fluidsimfoam.output.log

"""Class for the ``sim.output.log`` object"""

import re

import matplotlib.pyplot as plt
import numpy as np

from fluidsim_core.output.remaining_clock_time import RemainingClockTime


[docs]def get_log_tail(path_file, nbytes=1000): log_size = path_file.stat().st_size with open(path_file, "r") as file: file.seek(max(0, log_size - nbytes)) return file.read()
[docs]def read_time_last(path_file): for nbytes in [1000, 10000]: text = get_log_tail(path_file, nbytes) index = text.rfind("\nTime = ") if index != -1: break if index == -1: print("'Time = ' not found in log file") return None text = text[index + 8 :] return float(text.split(None, 1)[0])
[docs]class Log(RemainingClockTime): _tag = "log" def __init__(self, output): self.output = output self._path_file = None def _load_times(self): """Load remaining time data. - equation_times - remaining_clock_times - clock_times_per_timestep - equation_time_start - full_clock_time """ text = self.text match = re.search(r"start_time = ([\d\.]+)\n", text) if match is not None: equation_time_start = float(match.groups()[0]) else: equation_time_start = 0.0 match = re.search(r"end_time = ([\d\.]+)\n", text) if match is not None: eq_time_end = float(match.groups()[0]) else: raise RuntimeError eq_times = re.findall(r"\nTime = ([\d\.]+e?[-\d]*)", text) clock_times = re.findall(r"\nExecutionTime = ([\d\.]+)", text) eq_times = eq_times[: len(clock_times)] eq_times = np.array([float(word) for word in eq_times]) clock_times = np.array([float(word) for word in clock_times]) estimation_clock_time_per_time_step = clock_times[-1] / len(clock_times) indices_time_step = np.arange(clock_times.size) # remove times with same clock time clock_times, indices_unique = np.unique(clock_times, return_index=True) eq_times = eq_times[indices_unique] indices_time_step = indices_time_step[indices_unique] # decimate if needed precision_clock = 0.01 step = round( min(8 * precision_clock, (clock_times.max() - clock_times.min()) / 2) / estimation_clock_time_per_time_step ) if step > 1: clock_times = clock_times[::step] eq_times = eq_times[::step] indices_time_step = indices_time_step[::step] delta_eq_times = np.diff(eq_times) eq_times = eq_times[:-1] remaining_eq_times = eq_time_end - eq_times delta_clock_times = np.diff(clock_times) remaining_clock_times = ( remaining_eq_times / delta_eq_times * delta_clock_times ) data = { "equation_times": eq_times, "remaining_clock_times": remaining_clock_times, "clock_times_per_timestep": delta_clock_times / np.diff(indices_time_step), "equation_time_start": equation_time_start, "full_clock_time": clock_times[-1], "remaining_eq_times": remaining_eq_times, "delta_eq_times": delta_eq_times, "delta_clock_times": delta_clock_times, } return data @property def path_file(self): output = self.output if output and not self._path_file: path_run = output.path_run logfiles = sorted(path_run.glob("log*.txt")) if logfiles: self._path_file = logfiles[-1] else: print("No log file found") return None return self._path_file.resolve() @path_file.setter def path_file(self, path_log_file): self._path_file = path_log_file @property def text(self): if self.path_file is None: return None with open(self.path_file) as file: return file.read()
[docs] def get_log_tail(self, nbytes=1000): if self.path_file is None: raise IOError(f"No log file found in {self.output.path_run}") return get_log_tail(self.path_file, nbytes)
@property def time_last(self): if self.path_file is None: return None return read_time_last(self.path_file) def _choose_variable_name(self, variable_name): if ( variable_name is not None and variable_name not in self.output.name_variables ): raise ValueError( f"variable_name has to be in {self.output.name_variables}" ) if variable_name is None: for key in ("p", "p_rbgh"): if key in self.output.name_variables: variable_name = key break return variable_name
[docs] def get_last_residual(self, variable_name=None): if self.path_file is None: return None text = self.get_log_tail(5000) variable_name = self._choose_variable_name(variable_name) matches = list( re.finditer( r"\nTime = (?P<time>[\d\.]+e?-?[\d]*)" rf"[\s\S]+?Solving for {variable_name}, " r"Initial residual = (?P<initial>[\d\.]+e?-?[\d]*)", text, ) ) if not matches: raise ValueError(f"No match found for {variable_name}, text=\n{text}") match = matches[-1] data = match.groupdict() time = float(data["time"]) residual = float(data["initial"]) return time, residual
[docs] def plot_residuals(self, variable_name=None, tmin=0.0): variable_name = self._choose_variable_name(variable_name) matches = list( re.finditer( r"\nTime = (?P<time>[\d\.]+e?-?[\d]*)" rf"[\s\S]+?Solving for {variable_name}, " r"Initial residual = (?P<initial>[\d\.]+e?-?[\d]*)", self.text, ) ) times = np.empty(len(matches)) residuals = np.empty_like(times) for index, match in enumerate(matches): data = match.groupdict() times[index] = data["time"] residuals[index] = data["initial"] fig, ax = plt.subplots() residuals = residuals[times > tmin] times = times[times > tmin] ax.plot(times, residuals) ax.set_xlabel("equation time") fig.suptitle(f"Initial residuals {variable_name}") return times, residuals