Source code for code_aster.Supervis.ExecuteCommand

# coding=utf-8
# --------------------------------------------------------------------
# Copyright (C) 1991 - 2026 - EDF - www.code-aster.org
# This file is part of code_aster.
#
# code_aster is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# code_aster is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with code_aster.  If not, see <http://www.gnu.org/licenses/>.
# --------------------------------------------------------------------

"""
All Commands executors are subclasses of :class:`.ExecuteCommand`.
Commands are factories (:meth:`ExecuteCommand.run` *classmethod*) that
create an executor that is called to create a result object.

When a new command is added there are different levels of complexity:

- Commands that are automatically added just using their catalog.
  Only the commands that return no result can work with this method.
  Other commands will raise a *NotImplementedError* exception at runtime.
  Macro-commands do not need a specific executor. Their catalog and ``ops()``
  function are sufficient.

  .. note:: All Commands that are not explicitly imported by
    :mod:`code_aster.Commands.__init__` are automatically created using this
    method.

- Commands where the Fortran operator is directly called. Usually it is just
  necessary to define the type of result object.
  Example: :mod:`~code_aster.Commands.defi_compor`.

- Commands where keywords are used to create the output result.
  Example: :mod:`~code_aster.Commands.defi_group`.

- Commands where several steps must be adapted. Usually, the operator directly
  calls C++ Objects to do the job.


For all features that are not yet supported, please use utilities from
the :mod:`code_aster.Utilities` functions (example:
:func:`~code_aster.Utilities.compatibility.unsupported` or
:func:`~code_aster.Utilities.compatibility.required`) to identify them.


Base classes
============
"""

# For SDVERI:
# aslint: disable=C4014

import gc
import inspect
import linecache
import re
import sys
import time
import traceback
from contextlib import contextmanager

import aster_core
import libaster

from ..Cata import Commands
from ..Cata.Language.SyntaxObjects import _F
from ..Cata.SyntaxChecker import CheckerError, checkCommandSyntax
from ..Cata.SyntaxUtils import force_list, mixedcopy, remove_none, search_for
from ..Messages import UTMESS, MessageLog
from ..Objects import DataStructure, DataStructureDict, NamedTuple, PyDataStructure
from ..Utilities import (
    DEBUG,
    ExecutionParameter,
    Options,
    config,
    deprecated,
    haveMPI,
    import_object,
    logger,
    no_new_attributes,
)
from ..Utilities.outputs import command_text, decorate_name
from .code_file import Tracking
from .CommandSyntax import CommandSyntax
from .ctopy import check_ds_object
from .Serializer import FinalizeOptions, saveObjectsFromContext


[docs]@contextmanager def command_execution(): """Context manager used to simulate the execution of a legacy Fortran operator.""" libaster.cmd_ctxt_enter() yield if libaster.jeveux_status(): libaster.cmd_ctxt_exit()
[docs]class ExecuteCommand: """This class implements an executor of commands. Commands executors are defined by subclassing this class and overloading some of the methods. A user's Command is a factory of these classes. The :meth:`run` factory creates an instance of one of these classes and executes successively these steps: - :meth:`compat_syntax_` to eventually change the user's keywords to adapt the syntax from an older version **before** syntax checking. Subclasses **must not overload** this method. The compatibility changes must be defined in ``compat_syntax`` attribute in the catalog. Subclasses may define a :meth:`change_syntax` method for modifications on non-keywords arguments. This should be limited to very specific cases. - :meth:`check_syntax` to check the user's keywords conformance to to catalog definition. - :meth:`adapt_syntax` to eventually change the user's keywords to adapt the syntax **after** syntax checking. Does nothing by default. - :meth:`create_result` to create the *DataStructure* object. The default implementation only works if the operator creates no object. - :meth:`exec_` that is the main function of the Command. The default implementation calls the *legacy* Fortran operator. - :meth:`post_exec` that allows to execute additional code after the main function. Does nothing by default. If a :attr:`libaster.AsterError` exception (or a derivated) is raised during :meth:`exec_` the behaviour depends on the execution mode. - In a standard study, the results are saved and the execution is interrupted. The current result is available if it is validated by the command and the study can be restarted. - In the testcase mode (option ``--test`` used, see :class:`~code_aster.Utilities.ExecutionParameter.ExecutionParameter` or ``CODE`` in :func:`DEBUT`/:func:`POURSUITE`), the exception is raised and can be catched in the commands file. The current result will be available in the ``except`` statement if it is validated by the command. """ # class attributes command_name = command_op = command_cata = None level = 0 hook = [] _cata = _op = _result = _counter = _caller = _exc = None _tuplmode = None __setattr__ = no_new_attributes(object.__setattr__)
[docs] def __init__(self): """Initialization""" assert self.command_name, "'command_name' attribute is not defined!" self._cata = self.command_cata or getattr(Commands, self.command_name) self._op = self.command_op or self._cata.definition["op"] self._result = None # index of the command self._counter = 0 current_opt = ExecutionParameter().option self._tuplmode = current_opt & Options.UseLegacyMode == 0
[docs] @classmethod def run(cls, **kwargs): """Run the command (class method). Arguments: keywords (dict): User keywords """ cmd = cls() return cmd.run_(**kwargs)
[docs] def run_(self, **kwargs): """Run the command (worker). Arguments: keywords (dict): User keywords """ self._tuplmode = kwargs.pop("__use_namedtuple__", self._tuplmode) keywords = mixedcopy(kwargs) self.keep_caller_infos(keywords) timer = ExecutionParameter().timer if self.command_name not in ("DEBUT", "POURSUITE", "FIN"): check_jeveux() ExecuteCommand.level += 1 MessageLog.reset_command() self._counter = ExecutionParameter().incr_command_counter() if self.show_syntax() or self.command_name in ("DEBUT", "POURSUITE"): timer.Start(str(self._counter), name=self.command_name) timer.Start(" . check syntax", num=1.1e6) self.compat_syntax_(keywords) try: self.check_syntax(keywords) finally: timer.Stop(" . check syntax") self.adapt_syntax(keywords) if ExecutionParameter().option & Options.TestMode: Tracking.add("KWDS", self._cata, self.command_name, keywords) self.create_result(keywords) if not getattr(self._result, "userName", "n/a"): # attribute does exist but is not defined self._result.userName = get_user_name( self.command_name, self._caller["filename"], self._caller["lineno"] ) self.print_syntax(keywords) with ExceptHookManager.wraps(self): try: if self._op is None and self.command_name not in ("DEBUT", "POURSUITE"): # for commands that do not call any operator, do initializations libaster.call_oper_init() with command_execution(): self.exec_(keywords) except libaster.AsterError as exc: self._exc = exc self.publish_result() raise finally: self.post_exec_(keywords) ExecuteCommand.level -= 1 return self._result
[docs] def publish_result(self): """Publish the result in the caller context.""" if not hasattr(self._result, "userName"): return publish_in(self.caller_context, {self._result.userName: self._result})
@property def caller_context(self): """dict: The context from which the command has be called.""" return self._caller["context"] @property def exception(self): """*AsterError*: Exception raised during the execution, *None* if the execution was successful. """ return self._exc
[docs] @staticmethod def register_hook(func): """Register a hook for *post_exec* step. Arguments: func (function): Function with signature: *ExecuteCommand*, *dict* of keywords. """ ExecuteCommand.hook.append(func)
[docs] @classmethod def show_syntax(cls): """Tell if the current command syntax should be printed. Returns: bool: *True* if the syntax should be printed. """ opt = ExecutionParameter().option return opt & Options.ShowSyntax and ( ExecuteCommand.level <= 1 or opt & Options.ShowChildCmd )
[docs] def _call_oper(self, syntax): """Call fortran operator. Arguments: syntax (*CommandSyntax*): Syntax description with user keywords. """ return libaster.call_oper(syntax, 0)
@property def name(self): """Returns the command name.""" return self._cata.name def _add_deps_keywords(self, toVisit): name = self._result.getName() if type(toVisit) in (list, tuple): for value in toVisit: if isinstance(value, DataStructure): if name != value.getName(): self._result.addDependency(value) else: self._add_deps_keywords(value) elif type(toVisit) in (dict, _F): for value in toVisit.values(): self._add_deps_keywords(value) elif isinstance(toVisit, DataStructure): if name != toVisit.getName(): self._result.addDependency(toVisit)
[docs] def add_dependencies(self, keywords): """Register input *DataStructure* objects as dependencies. The default implementation registers all input objects as dependencies. Arguments: keywords (dict): User's keywords. """ self._add_deps_keywords(keywords)
[docs] def remove_dependencies(self, keywords, key1, key2=None): """Remove dependencies to the input *DataStructure* objects passed with given keywords. Arguments: keywords (dict): User's keywords. key1 (str/list[str]): One or more simple keywords, or factor keyword. key2 (str/list[str]): One or more simple keywords if `key1` is a factor keyword. """ if not key2: key1 = force_list(key1) for key in key1: value = keywords.get(key) if not value: continue for obj in force_list(value): self._result.removeDependency(obj) else: assert type(key1) is str, key1 fact = keywords.get(key1) if not fact: return key2 = force_list(key2) for occ in force_list(fact): for key in key2: value = occ.get(key) if not value: continue for obj in force_list(value): self._result.removeDependency(obj)
[docs] def compat_syntax_(self, keywords): """Call the compatibility function defined in catalog. As the keywords have not been checked, no assumptions can be done about the presence or the number of values for example. If the syntax is already valid, call :meth:`adapt_syntax` instead. Arguments: keywords (dict): Keywords arguments of user's keywords, changed in place. """ try: self._cata.get_compat_syntax()(keywords) except (AssertionError, KeyError, TypeError, ValueError, AttributeError) as exc: # in case of syntax error, show the syntax and raise the exception self.print_syntax(keywords) ExecuteCommand.level -= 1 msg = getattr(exc, "msg", str(exc)) if ExecutionParameter().option & Options.Debug: logger.error(msg) raise UTMESS("F", "DVP_10", valk=("compat_syntax", self.command_name, msg)) self.change_syntax(keywords)
[docs] def change_syntax(self, keywords): """Hook to change keywords before checking syntax. You must define `compat_syntax` in the command catalog to adapt the syntax from an old version or for compatibility. `compat_syntax` is also used by AsterStudy, `change_syntax` is only called during execution and limited to very specific cases. If the syntax is already valid, call :meth:`adapt_syntax` instead. Arguments: keywords (dict): Keywords arguments of user's keywords, changed in place. """
[docs] def adapt_syntax(self, keywords): """Hook to adapt syntax *after* syntax checking. As the syntax is valid, changes are easier: existence, number of values verify syntax description. Arguments: keywords (dict): Keywords arguments of user's keywords, changed in place. """
[docs] def print_syntax(self, keywords): """Print an echo of the Command. Arguments: keywords (dict): Keywords arguments of user's keywords. """ if not self.show_syntax(): return printed_args = mixedcopy(keywords) remove_none(printed_args) filename = self._caller["filename"] lineno = self._caller["lineno"] logger.info( "\n.. _stg{0}_{1}".format( ExecutionParameter().get_option("stage_number"), self._caller["identifier"] ) ) logger.info(command_separator()) logger.info(command_header(self._counter, filename, lineno)) max_print = ExecutionParameter().get_option("max_print") user_name = get_user_name(self.command_name, filename, lineno, strict=False) logger.info(command_text(self.name, printed_args, user_name, limit=max_print))
[docs] def print_result(self): """Print an echo of the result of the command.""" if not self.show_syntax(): return if self._result is not None: logger.info(command_result(self._counter, self.name, self._result)) if hasattr(self._result, "getDependencies"): deps = self._result.getDependencies() if deps: logger.info(MessageLog.GetText("I", "SUPERVIS2_80")) for dep in deps: logger.info("# - {0} {1}".format(*_get_object_repr(dep))) self._print_stats()
[docs] def _print_stats(self): """Print the memory and timer informations.""" # not called if not show_syntax() timer = ExecutionParameter().timer times = timer.StopAndGet(str(self._counter)) if ExecutionParameter().option & Options.TestMode: Tracking.add("PERF", self.name, *times) logger.info(command_memory(self.name)) logger.info(command_time(self._counter, *times)) logger.info(command_separator())
[docs] def check_syntax(self, keywords): """Check the syntax passed to the command. *keywords* will contain default keywords after executing this method. Arguments: keywords (dict): Keywords arguments of user's keywords, changed in place. """ try: self._cata.addDefaultKeywords(keywords) remove_none(keywords) logger.debug("checking syntax of %s...", self.name) max_check = ExecutionParameter().get_option("max_check") checkCommandSyntax(self._cata, keywords, add_default=False, max_check=max_check) except ( CheckerError, AssertionError, KeyError, TypeError, ValueError, AttributeError, ) as exc: # in case of syntax error, show the syntax and raise the exception self.print_syntax(keywords) ExecuteCommand.level -= 1 msg = getattr(exc, "msg", str(exc)) if ExecutionParameter().option & Options.Debug: logger.error(msg) raise UTMESS("F", "SUPERVIS_4", valk=(self.command_name, msg))
[docs] def create_result(self, keywords): """Create the result before calling the *exec* command function if needed. The result is stored in an internal attribute and will be built by *exec_*. Arguments: keywords (dict): Keywords arguments of user's keywords. """ sd_type = self._cata.get_type_sd_prod(**keywords) if sd_type is None: self._result = None else: raise NotImplementedError( "Method 'create_result' must be overridden for {0!r}.".format(self.name) )
@property def result(self): """misc: Attribute that holds the result of the Command.""" return self._result
[docs] def exec_(self, keywords): """Execute the command. Arguments: keywords (dict): User's keywords. """ syntax = CommandSyntax(self.name, self._cata) syntax.define(keywords, add_default=False) # set result and type names if self._result is None or type(self._result) is int: type_name = "" result_name = "" else: type_name = self._result.getType() result_name = self._result.getName() syntax.setResult(result_name, type_name) timer = ExecutionParameter().timer try: timer.Start(" . fortran", num=1.2e6) self._call_oper(syntax) finally: timer.Stop(" . fortran") syntax.free() # for special fortran operator that returns a int/float if syntax.getResultValue() is not None: self._result = syntax.getResultValue()
[docs] def post_exec_(self, keywords): """Post-treatments executed after the `exec_` function. Commands may override `post_exec` method to add specific actions. Arguments: keywords (dict): Keywords arguments of user's keywords. """ try: self.post_exec(keywords) if isinstance(self._result, DataStructure): self.add_dependencies(keywords) if not self._exc and ExecutionParameter().get_option("sdveri"): self.check_ds() for func in self.hook: func(self, keywords) finally: self.cleanup() self.print_result()
[docs] def post_exec(self, keywords): """Hook that allows to add post-treatments after the *exec* function. .. note:: If the Command executes the *op* fortran subroutine and if the result DataStructure references any inputs in a Jeveux object, pointers on these inputs must be explicitly references into the result. Arguments: keywords (dict): Keywords arguments of user's keywords. """
[docs] def cleanup(self): """Clean-up function."""
[docs] def check_ds(self): """Check a result created by the command. Arguments: result (*DataStructure*): Object to be checked. """ if not self.show_syntax(): return self.check_ds_result(self._result)
[docs] @staticmethod def check_ds_result(result): """Check a *DataStructure* object. Arguments: result (*DataStructure*): Object to be checked. """ if not isinstance(result, DataStructure): return timer = ExecutionParameter().timer timer.Start(" . sdveri", num=1.3e6) try: iret = check_ds_object(result.sdj) if iret != 0: if logger.getEffectiveLevel() == DEBUG: result.debugPrint() logger.error( "SDVERI ended with exit code {0} for {1!r} ({2!r}, {3!r})".format( iret, result.getName(), result.getType(), result.sdj.__class__.__name__ ) ) finally: timer.Stop(" . sdveri")
[docs] def keep_caller_infos(self, keywords, level=3): """Register the caller frame infos. Arguments: keywords (dict): Dict of keywords. level (Optional[int]): Number of frames to rewind to find the caller. Defaults to 2 (1: *here*, 2: *run_*, 3: *run*). """ self._caller = {"filename": "unknown", "lineno": 0, "context": {}, "identifier": ""} caller = inspect.currentframe() try: for _ in range(level): if not caller.f_back: break caller = caller.f_back self._caller["filename"] = caller.f_code.co_filename self._caller["lineno"] = caller.f_lineno self._caller["context"] = caller.f_globals finally: del caller try: identifier = "cmd{0}".format(keywords.pop("identifier")) except KeyError: identifier = "txt{0}".format(self._caller["lineno"]) self._caller["identifier"] = identifier
[docs] @classmethod def debugPrintAll(cls): """Call ``debugPrint()`` after each command. Use ``ExecuteCommand.debugPrintAll()`` in a testcase to make each command to dump its result content. """ def _debugPrintAll(cmd, dummy): if not isinstance(cmd.result, DataStructure): return cmd.result.debugPrint(unit=10000 + cmd._counter) cls.register_hook(_debugPrintAll)
[docs]def check_jeveux(): """Check that the memory manager (Jeveux) is up.""" if not libaster.jeveux_status(): raise RuntimeError("code_aster memory manager is not started. No command can be executed.")
[docs]class ExecuteMacro(ExecuteCommand): """This implements an executor of *legacy* macro-commands. The OPS function of *legacy* macro-commands returns a return code and declared results through ``self.register_result(...)``. Now the results must be directly returned by the OPS function. Long term: ``result = MACRO_COMMAND(**keywords)`` where ``result`` is a *DataStructure* object (for one result) or a *namedtuple-like* object (a code_aster *NamedTuple*) for several results). ``result`` is created by the *ops* function. For compatibility: :meth:`create_result` gets the names of the results. The *ops* function returns the "main" result and registers others with :meth:`register_result`. :meth:`exec_` adds it at first position of the *namedtuple* under the name "main". Attributes: _sdprods (list[:class:`CO`]): List of CO objects. _result_names (list[str]): List of expected results names. _add_results (dict): Dict of additional results. """ # class attributes _last_cleanup = 0 _last_cleanup_date = time.time() _check_freq = False _sdprods = _result_names = _add_results = None
[docs] def __init__(self): """Initialization""" super(ExecuteMacro, self).__init__() self._op = self.command_op or import_object(self._op)
[docs] def create_result(self, keywords): """Macro-commands create their results in the *exec_* method. Arguments: keywords (dict): Keywords arguments of user's keywords. """ def _predicate(value): if type(value) in (list, tuple): return all(isinstance(val, CO) for val in value) return isinstance(value, CO) self._sdprods = search_for(keywords, _predicate) if self._sdprods: names_i = ( (ii.getName() for ii in i) if (type(i) in (list, tuple)) else [i.getName()] for i in self._sdprods ) names = [ii for i in names_i for ii in i] self._result_names = names self._add_results = {}
[docs] def print_result(self): """Print an echo of the result of the command.""" if not self.show_syntax(): return if not self._sdprods and self._result: logger.info(command_result(self._counter, self.name, self._result)) if self._result_names: for name in self._result_names: logger.info(command_result(self._counter, self.name, self._add_results.get(name))) self._print_stats()
[docs] def cleanup(self): """Clean-up function. Force deletion of removed objects after each macro-command. Some macro-commands may directly call ``gc.collect()`` if they create a lot of objects or if there is a risk to recreate an object with the same name (example: NonLinearOperator). """ # extract 'max_check' once (suppose to optimize performance if it is small) if ExecuteMacro._last_cleanup == 0: ExecuteMacro._check_freq = ExecutionParameter().get_option("max_check") < 5 if ExecuteMacro._check_freq and time.time() - ExecuteMacro._last_cleanup_date < 1.0: return if ExecuteCommand.level > 1: if self._counter < ExecuteMacro._last_cleanup + 250: return ExecuteMacro._last_cleanup = self._counter ExecuteMacro._last_cleanup_date = time.time() timer = ExecutionParameter().timer timer.Start(" . cleanup", num=1.9e6) gc.collect() timer.Stop(" . cleanup")
[docs] def exec_(self, keywords): """Execute the command and fill the *_result* attribute. Arguments: keywords (dict): User's keywords. """ output = self._op(self, **keywords) assert not isinstance(output, int), "OPS must return results, not 'int'." if not self._tuplmode: self._result = output # re-assign the user variable name if hasattr(self._result, "userName"): user_name = get_user_name( self.command_name, self._caller["filename"], self._caller["lineno"] ) self._result.userName = user_name if self._add_results: publish_in(self.caller_context, self._add_results) return if not self._sdprods: self._result = output else: dres = dict(main=output) dres.update([(k, v) for k, v in self._add_results.items() if k in self._result_names]) missing = set(self._result_names).difference(list(dres.keys())) if missing: raise ValueError("Missing results: {0}".format(tuple(missing))) self._result = NamedTuple(dres)
[docs] def register_result(self, result, target): """Register an additional result. It must called **after** that the result has been created. Arguments: result (*DataStructure*): Result object to register. target (:class:`CO`): CO object. """ name = target.getName() orig = result.userName result.userName = name self._add_results[name] = result if ExecutionParameter().option & Options.ShowChildCmd: logger.info(MessageLog.GetText("I", "SUPERVIS2_69", valk=(orig, name)))
[docs] def check_ds(self): """Check all results created by the command. Arguments: result (*DataStructure*): Object to be checked. """ if not self.show_syntax(): return super().check_ds() if self._add_results: for additional in self._add_results.values(): self.check_ds_result(additional)
[docs] def add_dependencies(self, keywords): """Register input *DataStructure* objects as dependencies. Macro-commands should not have to add dependencies. This should be done by embedded commands. Arguments: keywords (dict): User's keywords. """
@property @deprecated(case=2) def reuse(self): return @property @deprecated(case=2) def sd(self): return @property @deprecated(case=3, help="Use the 'logger' object instead.") def cr(self): return logger @property def sdprods(self): """Attribute that holds the auxiliary results.""" return self._sdprods or []
[docs]def UserMacro(name, cata, ops): """Helper function that defines a user macro-command from its catalog description and its operator. Arguments: name (str): Name of the macro-command. cata (*PartOfSyntax*): Catalog description of the macro-command. ops (str): OPS function of the macro-command. Returns: *ExecuteCommand.run*: Executor function. """ class Macro(ExecuteMacro): """Execute legacy operator.""" command_name = name command_cata = cata @staticmethod def command_op(self, **kwargs): return ops(self, **kwargs) return Macro.run
[docs]class CO(PyDataStructure): """Object that identify an auxiliary result of a Macro-Command.""" _name = None @property def value_repr(self): """Returns the representation as a keyword value.""" return "CO('{0}')".format(self.userName)
[docs] def is_typco(self): """Tell if it is an auxiliary result.""" return True
[docs] def getType(self): """Return a type for syntax checking.""" return "CO"
[docs] def setType(self, typ): """Declare the future type.""" pass
# used in AsterStudy as settype settype = setType
[docs]def publish_in(context, dict_objects): """Publish some objects in a context. It supports list of results as declared in code_aster legacy: If an object is name ``xxx_n`` and that ``xxx`` exists and is a list in *context* and ``xxx[n]`` exists this item of the list is replaced by the new value. Arguments: context (dict): Destination context, changed in place. dict_objects (dict): Objects to be added into the context. """ re_id = re.compile("^(.*)_([0-9]+)$") for name, value in list(dict_objects.items()): indexed = re_id.search(name) if indexed: bas, idx = indexed.groups() idx = int(idx) seq = context.get(bas) if seq and isinstance(seq, list) and len(seq) > idx: seq[idx] = value continue context[name] = value
[docs]def get_user_name(command, filename, lineno, strict=True): """Parse the caller syntax to extract the name used by the user. Arguments: command (str): Command name. filename (str): Filename of the caller lineno (int): Line number in the file. strict (bool, optional): Tell if the name must be a valid Python variable name. Returns: str: Variable name used by the user, empty if not found. """ re_comment = re.compile(r"^\s*#.*") re_oper = re.compile(r"\b{0}\s*\(".format(command)) if strict: re_name = re.compile(r"^\s*(?P<name>\w+)\s*" r"=\s*{0}\s*\(".format(command)) else: re_name = re.compile(r"^\s*(?P<name>.+)\s*" r"=\s*{0}\s*\(".format(command)) while lineno > 0: line = linecache.getline(filename, lineno) lineno = lineno - 1 if re_comment.match(line): continue if re_oper.search(line): mat = re_name.search(line) if mat: # str() because of linecache in ipython, remove it in py3 return str(mat.group("name")).strip() break return ""
[docs]def command_separator(): """Return a separator line. Returns: str: A separator line. """ if not hasattr(command_separator, "cache"): command_separator.cache = MessageLog.GetText("I", "SUPERVIS2_70") return command_separator.cache
[docs]def command_header(counter, filename, lineno): """Return the command header. Arguments: counter (int): Number of the command. Returns: str: String representation. """ return MessageLog.GetText("I", "SUPERVIS2_71", vali=(counter, lineno), valk=filename)
def _get_object_repr(obj): typ = "" if hasattr(obj, "getName"): nam = decorate_name(obj.getName().strip()) if obj.userName: nam = "{0} ({1})".format(obj.userName.strip(), nam) typ = MessageLog.GetText("I", "SUPERVIS2_76", valk=type(obj).__name__) elif isinstance(obj, str): nam = decorate_name(obj) else: nam = str(obj) return nam, typ
[docs]def command_result(counter, command_name, result): """Return the command footer. Arguments: counter (int): Number of the command. command_name (str): Command name. result (~.code_aster.Objects.DataStructure|str|list[str]): Result object or name(s) of the result(s) of the command. Returns: str: String representation. """ show_name, show_type = _get_object_repr(result) dict_args = dict(vali=counter, valk=(command_name, show_name, show_type)) return MessageLog.GetText("I", "SUPERVIS2_72", **dict_args)
[docs]def command_memory(command_name): """Return a representation of the current memory consumption. Arguments: command_name (str): Command name. Returns: str: String representation. """ rval, iret = aster_core.get_mem_stat("VMPEAK", "VMSIZE", "CMAX_JV", "CMXU_JV") txt = "" if iret == 0: if ExecutionParameter().option & Options.TestMode: Tracking.add("MEM", command_name, *rval) if rval[0] > 0.0: txt = MessageLog.GetText("I", "SUPERVIS2_73", valr=rval) else: txt = MessageLog.GetText("I", "SUPERVIS2_74", valr=rval) return txt
[docs]def command_time(counter, cpu, system, elapsed): """Return a representation of elapsed times in a command. Arguments: cpu (float): User time. system (float): System time. elapsed (float): Elapsed time. Returns: str: String representation. """ return MessageLog.GetText("I", "SUPERVIS2_75", vali=counter, valr=(cpu, system, elapsed))
[docs]class loop_on_dsdict: r"""Decorator on ExecuteCommand that overloads 'run\_' method to loop on each result of a DataStructureDict. Arguments: mark (str): Possible paths to keywords that may hold the DataStructureDict object. Format: "mcf1/mcs1|mcf2/mcs2|mcs3|..." """
[docs] def __init__(dec, mark: str): dec._mark: str = mark
def __call__(dec, command): command._orig_run_ = command.run_ def run_(self, **kwargs): """Run the command.""" def path_(kwds: dict): """Return the path to the relevant keyword""" candidates = dec._mark.split("|") for key in candidates: store = kwds mcs = key.split("/") if len(mcs) < 2: mcf = None else: mcf = mcs.pop(0) if not kwds.get(mcf): mcf = None continue store = kwds[mcf] mcs = mcs.pop(0) if store.get(mcs): return mcf, mcs return None def extr_(kwds: dict, path: tuple[str]): """Return the value of the relevant keyword""" mcf, mcs = path if mcf: kwds = kwds[mcf] return kwds[mcs] def set_(kwds: dict, path: tuple[str], value: DataStructureDict): """Set the value of the relevant keyword""" mcf, mcs = path if mcf: kwds = kwds[mcf] kwds[mcs] = value self.keep_caller_infos(kwargs) caller = self._caller path = path_(kwargs) if not path or not isinstance(extr_(kwargs, path), DataStructureDict): output = command._orig_run_(self, **kwargs) if not getattr(output, "userName", "n/a"): # attribute does exist but is not defined output.userName = get_user_name( self.command_name, caller["filename"], caller["lineno"] ) return output keywords = mixedcopy(kwargs) input = extr_(keywords, path) in_place = "reuse" in keywords if in_place: output = keywords["reuse"] assert output is input, "unexpected object in reuse" else: self.create_result(keywords) output = self._result if output is not None and not isinstance(output, DataStructureDict): raise TypeError(f"Output should be a DataStructureDict or None, not {output}.") for key in input: set_(keywords, path, input[key]) if in_place: keywords["reuse"] = extr_(keywords, path) output[key] = command._orig_run_(self, **keywords) return output command.run_ = run_ return command
[docs]class ExceptHookManager: """Object to manager the definition of the *excepthook*.""" _current_cmd = None
[docs] @classmethod def enter(cls, command): """Assign the excepthook when entering in a command.""" # keep the outer command if not cls._current_cmd: cls._current_cmd = command
[docs] @classmethod def exit(cls, command): """Called when exiting a command.""" if command is cls._current_cmd: cls._current_cmd = None
[docs] @classmethod def init(cls): """Assign the basic hook.""" sys.excepthook = cls.excepthook
[docs] @classmethod def excepthook(cls, type, value, traceb): """Hook called on uncaught exceptions raised during the execution of a command. Args: command (ExecuteCommand): The current command. type (type): Exception type. value (BaseException): Exception instance. traceb (traceback): Exception traceback. """ opt = ExecutionParameter().option traceback.print_exception(type, value, traceb, file=sys.stderr) sys.stderr.flush() if config["ASTER_PLATFORM_MINGW"]: time.sleep(1) if ( sys.flags.interactive or opt & Options.InteractiveInterpreter or "__IPYTHON__" in globals() ): print("An exception occurred! Return to interactive session.") return if cls._current_cmd: options = FinalizeOptions.Set if haveMPI() and not opt & Options.HPCMode: options |= FinalizeOptions.OnlyProc0 if opt & Options.SaveBase: options |= FinalizeOptions.SaveBase print("\nException: Trying to close the database after an uncaught exception...\n") print( f"\nPublishing the result of the current command {cls._current_cmd.name}...\n", flush=True, ) cls._current_cmd.publish_result() saveObjectsFromContext(cls._current_cmd.caller_context, options=options) else: sys.__excepthook__(type, value, traceb)
[docs] @classmethod @contextmanager def wraps(cls, command): """Context manager used to enable a specific hook during a command execution.""" cls.enter(command) yield cls.exit(command)
ExceptHookManager.init()