# coding=utf-8
# --------------------------------------------------------------------
# Copyright (C) 1991 - 2026 - EDF - www.code-aster.org
# This file is part of code_aster.
#
# code_aster is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# code_aster is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with code_aster. If not, see <http://www.gnu.org/licenses/>.
# --------------------------------------------------------------------
"""
:py:mod:`debugging` --- Debugging utilities
*******************************************
This module defines some convenient utilities that **are not intended to
be used in production**.
Check for dependency between datastructures
===========================================
Use the ``--hook_post_exec`` command line argument to enable a hook called
after each command.
For example: use in the ``.export`` file:
.. code-block:: none
A args --hook_post_exec=code_aster.Helpers.debugging.check_dependencies
"""
import os
import pickle
import time
from contextlib import contextmanager
from functools import wraps
import numpy as np
from ..Cata.Language.SyntaxUtils import force_list
from ..Objects import DataStructure
from ..Utilities import get_caller_context
from .LogicalUnit import LogicalUnitFile
SKIPPED = object()
[docs]class DataStructureFilter:
"""This object store the path to *DataStructure* objects that are
referenced in a dump.
Arguments:
current (str): Name of the current result.
dump (str): Text dump of a *DataStructure*.
"""
[docs] def __init__(self, current, dump):
"""Initialization"""
self._parent_context = []
self._current = current
self._dump = dump
self._keep = None
self._path = []
self.paths = []
[docs] def set_parent_context(self, context):
"""Set the parent context.
If the checker is directly called on a keyword (without checking the
command itself) the values defined upper may be required to evaluate
blocks conditions but the parent context has not been filled.
This parent context could be an optional argument in visit* functions.
"""
self._parent_context.append(context)
[docs] def visitCommand(self, step, userDict=None):
"""Visit a Command object"""
self._parent_context.append(userDict)
self._visitComposite(step, userDict)
self._parent_context.pop()
[docs] def visitMacro(self, step, userDict=None):
"""Visit a Macro object"""
self.visitCommand(step, userDict)
[docs] def visitBloc(self, step, userDict=None):
"""Visit a Bloc object"""
pass
[docs] def visitFactorKeyword(self, step, userDict=None):
"""Visit a FactorKeyword object"""
# debug_message2("checking factor with", userDict)
self._visitComposite(step, userDict)
[docs] def visitSimpleKeyword(self, step, skwValue):
"""Visit a SimpleKeyword object"""
islist = type(skwValue) in (list, tuple)
skwValue = force_list(skwValue)
keep = []
for i in skwValue:
if (
isinstance(i, DataStructure)
and i.getName() != self._current
and i.getName() in self._dump
):
keep.append(i)
if keep and not islist:
keep = keep[0]
self._keep = keep or SKIPPED
[docs] def _visitComposite(self, step, userDict=None):
"""Visit a composite object (containing BLOC, FACT and SIMP objects)"""
if isinstance(userDict, dict):
userDict = [userDict]
# loop on occurrences filled by the user
for userOcc in userDict:
userOrig = userOcc.copy()
ctxt = self._parent_context[-1] if self._parent_context else {}
# loop on keywords provided by the user
for key, value in userOcc.items():
self._path.append(key)
# NB: block conditions are evaluated with the original values
kwd = step.getKeyword(key, userOrig, ctxt)
if not kwd:
continue
kwd.accept(self, value)
if self._keep is not None:
if self._keep is not SKIPPED:
self.paths.append(self._path[:])
self._keep = None
self._path.pop(-1)
[docs]def dump_datastructure(obj):
"""Return a dump (IMPR_CO) of a *DataStructure*.
Arguments:
obj (~code_aster.Objects.DataStructure): Object to be dumped.
Returns:
str: Output of IMPR_CO/debugPrint.
"""
filename = "dump-27406.txt"
dumpfile = LogicalUnitFile.open(filename)
obj.debugPrint(dumpfile.unit)
dumpfile.release()
with open(filename, "rb") as fobj:
dump = fobj.read().decode("ascii", errors="replace")
os.remove(filename)
return dump
[docs]def track_dependencies(inst, keywords):
"""Hook that tracks commands dependencies.
Arguments:
inst (function): the *ExecuteCommand* instance.
keywords (dict): User keywords.
"""
cata = inst._cata
result = inst._result
if not isinstance(result, DataStructure):
return
dump = dump_datastructure(result)
visitor = DataStructureFilter(result.getName(), dump)
cata.accept(visitor, keywords)
if not visitor.paths:
return
paths = ["/".join(path) for path in visitor.paths]
print("#27406:", inst.command_name, " ".join(paths))
[docs]def check_dependencies(inst, _):
"""Hook that check dependencies
Arguments:
inst (function): the *ExecuteCommand* instance.
"""
if not hasattr(check_dependencies, "_storage"):
check_dependencies._storage = {}
if inst.command_name == "POURSUITE":
context = get_caller_context(5)
register_context(context, check_dependencies._storage)
result = inst._result
if not isinstance(result, DataStructure):
return
# keep all created DS
# TODO check if they still exist?
name = result.getName()
typ = result.getType()
if not check_dependencies._storage.get(name):
check_dependencies._storage[name] = typ
def _check_deps(stack, obj):
"""Push dependencies of `obj` in `stack` recursively."""
for dep in obj.getDependencies():
if dep not in stack:
stack.append(dep)
_check_deps(stack, dep)
deps = []
_check_deps(deps, result)
# print("#27406:", name, typ, len(deps))
all_deps = [i.getName() for i in deps]
dump = dump_datastructure(result)
dump_deps = [i for i in check_dependencies._storage.keys() if i != name and i in dump]
direct_deps = [i.getName() for i in result.getDependencies()]
printed = False
missed = sorted(list(set(dump_deps).difference(direct_deps)))
if missed:
missed_all = sorted(list(set(dump_deps).difference(all_deps)))
missed = sorted(list(set(missed).difference(missed_all)))
printed = True
if missed:
print("#27406:", name, typ, "missing direct dep to", missed, "current:", direct_deps)
if missed_all:
print("#27406:", name, typ, "ERROR missing recursive dep to", missed_all)
# else:
# print("#27406:", name, typ, "INFO direct deps:", direct_deps)
nodirect = sorted(list(set(direct_deps).difference(dump_deps)))
if nodirect:
printed = True
print("#27406:", name, typ, "unnecessary direct dep to", nodirect)
if not printed:
print("#27406:", name, typ)
[docs]def register_context(ctxt, storage):
"""Register context as existing objects.
Arguments:
ctxt (dict): Context/dict containing objects to be registered.
storage (dict): Dict that stores the type of each object.
"""
for result in ctxt.values():
if not isinstance(result, DataStructure):
continue
name = result.getName()
typ = result.getType()
if not storage.get(name):
storage[name] = typ
# used to force to show children commands
# from ..Utilities import ExecutionParameter, Options
# ExecutionParameter().enable(Options.ShowChildCmd)
[docs]class DebugArgs:
"""Debugging helper"""
#: bool: to be raised only once
raised = False
#: name of the pickle file
filename = "debug_trace.pick"
[docs] @classmethod
def pickle_on_error(cls, method):
"""Decorator to pickle the args in case of error."""
@wraps(method)
def wrapper(inst, *args, **kwds):
"""wrapper"""
try:
arg0 = inst.copy()
retvalue = method(inst, *args, **kwds)
except Exception:
if not cls.raised:
print(f"pickling traces into {cls.filename}")
with open(cls.filename, "wb") as pick:
print(f"# --- trace arguments of '{method.__name__}':")
print(repr(arg0))
pickle.dump(arg0, pick)
print("--- changed ---")
print(repr(inst))
pickle.dump(inst, pick)
for obj in args:
pickle.dump(obj, pick)
print(repr(obj))
cls.raised = True
raise
return retvalue
return wrapper
[docs] @classmethod
def reset(cls):
"""Reset state"""
cls.raised = False
[docs]class DebugChrono:
"""Helper to measure elapsed time."""
data = []
[docs] @classmethod
@contextmanager
def measure(cls, title):
"""Measure elapsed time."""
t0 = time.time()
yield
elapsed = time.time() - t0
cls.data.append([title, elapsed])
print(f"elapsed time: {title}: {elapsed:.6f}", flush=True)
[docs] @classmethod
def save(cls, filename):
"""Save data into a pickle file."""
with open(filename, "wb") as pick:
pickle.dump(cls.data, pick)