# coding=utf-8
# --------------------------------------------------------------------
# Copyright (C) 1991 - 2026 - EDF - www.code-aster.org
# This file is part of code_aster.
#
# code_aster is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# code_aster is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with code_aster. If not, see <http://www.gnu.org/licenses/>.
# --------------------------------------------------------------------
"""
:py:mod:`run` --- Main classes for execution
--------------------------------------------
This module defines the objects that prepare the working directory, copy the
data files, execute code_aster and copy the result files.
"""
import os
import os.path as osp
import tempfile
from glob import glob
from pathlib import Path
from subprocess import PIPE, run
from .command_files import (
add_coding_line,
add_import_commands,
change_procdir,
file_changed,
stop_at_end,
)
from .config import CFG
from .logger import WARNING, logger
from .status import StateOptions, Status, get_status
from .timer import Timer
from .utils import (
RUNASTER_PLATFORM,
RUNASTER_ROOT,
PercTemplate,
cmd_abspath,
compress,
copy,
make_writable,
run_command,
uncompress,
)
# Name of the file that receives the exit code of the executed command: it is
# passed to `run_command` and written by the shell wrapper used in *tee* mode.
EXITCODE_FILE = "_exit_code_"
# Default file that receives the output of the execution when no `output`
# path is given to `RunAster`.
TMPMESS = "fort.6"
# Banner reporting the final diagnostic, `{state}` is replaced by `status.diag`.
FMT_DIAG = """
------------------------------------------------------------
--- DIAGNOSTIC JOB : {state}
------------------------------------------------------------
"""
def create_temporary_dir(dir=None):
    """Create a fresh temporary directory for an execution.

    Arguments:
        dir (str, optional): Parent directory. If it is not provided (or
            empty), the value of the ``ASTER_WORKDIR`` environment variable
            is used, or else the ``tmpdir`` configuration value.

    Returns:
        str: Path of the directory.
    """
    parent = dir
    if not parent:
        fallback = CFG.get("tmpdir")
        parent = os.environ.get("ASTER_WORKDIR", fallback)
    if parent:
        # 'mkdtemp' needs an existing parent directory
        os.makedirs(parent, exist_ok=True)
    return tempfile.mkdtemp(dir=parent, prefix="run_aster_")
class RunAster:
    """Execute code_aster from a `.export` file.

    Arguments:
        export (Export): Export object defining the calculation.
    """

    # tell if the content of the command file is written in the log
    _show_comm = True
    # tell if `change_procdir` is applied to the command file before execution
    _chg_procdir = False

    @classmethod
    def factory(
        cls,
        export,
        test=False,
        env=False,
        tee=False,
        output=None,
        interactive=False,
        exectool=None,
        savedb=None,
        proc0id=0,
    ):
        """Return a *RunAster* object from an *Export* object.

        Arguments:
            export (Export): Export object defining the calculation.
            test (bool): for a testcase,
            env (bool): to only prepare the working directory and show
                command lines to be run,
            tee (bool): to follow execution output,
            output (str): Path to redirect stdout.
            interactive (bool): to keep Python interpreter active.
            exectool (str): command that precedes code_aster command line.
            savedb (bool): tell if the database should be saved.
            proc0id (int): id of the process that replaces the proc #0.

        Returns:
            RunAster: a *RunOnlyEnv* object if *env* is enabled, a *RunAster*
            object otherwise.
        """
        class_ = RunAster
        if env:
            class_ = RunOnlyEnv
        return class_(export, test, tee, output, interactive, exectool, savedb, proc0id)

    def __init__(
        self,
        export,
        test=False,
        tee=False,
        output=None,
        interactive=False,
        exectool=None,
        savedb=None,
        proc0id=0,
    ):
        """Initialization, see :py:meth:`factory` for the arguments.

        The parameters `step` and `nbsteps` are added to *export* if they are
        not defined yet.
        """
        self.export = export
        self.jobnum = str(os.getpid())
        logger.debug("Export content: %s", self.export.filename)
        logger.debug("\n%s", self.export)
        self._parallel = CFG.get("parallel", 0)
        self._test = test
        self._tee = tee
        self._output = output or TMPMESS
        self._interact = interactive
        self._exectool = exectool
        if self.export.get("hide-command"):
            self._show_comm = False
        self._proc0id = proc0id
        procid = 0
        if self._parallel:
            procid = get_procid()
        # `get_procid` returns -1 when it is not run under mpiexec/srun
        procid = max(procid, 0)
        self._procid = procid
        # multi-steps study
        if not export.has_param("step"):
            export.set("step", 0)
        if not export.has_param("nbsteps"):
            export.set("nbsteps", len(self.export.commfiles))
        self._last = export.get("step") + 1 == export.get("nbsteps")
        # the database is also saved if a 'base' result is expected
        self._savdb = savedb or any(i.filetype == "base" for i in export.resultfiles)

    def execute(self, workdir):
        """Execution in a working directory.

        In parallel, each process works in its own `proc.<id>` subdirectory.

        Arguments:
            workdir (str): Working directory.

        Returns:
            Status: Status object.
        """
        if self._parallel:
            workdir = osp.join(workdir, f"proc.{self._procid}")
        os.makedirs(workdir, exist_ok=True)
        os.chdir(workdir)
        status = self._execute()
        return status

    def _execute(self):
        """Execution in the current working directory.

        Returns:
            Status: Status object.
        """
        timer = Timer()
        # timings are stored in a file to be accumulated between the steps
        timer.load("__timer__")
        logger.info("TITLE Execution of code_aster")
        timer.start("Preparation of environment")
        self.prepare_current_directory()
        timer.stop()
        timer.start("Execution of code_aster")
        status = self.execute_study()
        timer.stop()
        # results are copied after the last step or as soon as a step fails
        if self._last or not status.is_completed():
            timer.start("Copying results")
            self.ending_execution(status.results_saved())
            logger.info("TITLE Execution summary")
            logger.info(timer.report())
            if self._procid == self._proc0id:
                logger.info(FMT_DIAG.format(state=status.diag))
        timer.save("__timer__")
        return status

    def prepare_current_directory(self):
        """Prepare the working directory."""
        logger.info("TITLE Prepare environment in %s", os.getcwd())
        self.export.write_to(self.jobnum + ".export")
        os.makedirs("REPE_IN", exist_ok=True)
        os.makedirs("REPE_OUT", exist_ok=True)

    def execute_study(self):
        """Execute the study.

        Only the first command file of the export is executed.

        Returns:
            Status: Status object.
        """
        commfiles = [obj.path for obj in self.export.commfiles]
        if not commfiles:
            # NOTE(review): presumably `logger.error` aborts the execution,
            # otherwise `commfiles[0]` below raises IndexError - to be confirmed.
            logger.error("no .comm file found")
        if self.export.get("nbsteps") > 1:
            os.makedirs("BASE_PREC", exist_ok=True)
        timeout = self.export.get("time_limit", 86400) * 1.25 * self.export.get("ncpus", 1)
        status = Status()
        comm = commfiles[0]
        logger.info(
            "TITLE Command file #{0} / {1}".format(
                self.export.get("step") + 1, self.export.get("nbsteps")
            )
        )
        comm = self.change_comm_file(comm)
        status.update(self._exec_one(comm, timeout - status.times[-1]))
        self._coredump_analysis()
        return status

    def _exec_one(self, comm, timeout):
        """Execute one command file.

        Arguments:
            comm (str): Command file name.
            timeout (float): Remaining time.

        Returns:
            Status: Status object.
        """
        idx = self.export.get("step")
        logger.info("TITLE Command line #%d:", idx + 1)
        timeout = int(max(1, timeout))
        cmd = self._get_cmdline(idx, comm, timeout)
        logger.info(" %s", " ".join(cmd))
        set_num_threads(self.export.get("ncpus", 1))
        # use environment variable to make it works with ipython
        os.environ["PYTHONFAULTHANDLER"] = "1"
        exitcode = run_command(cmd, exitcode_file=EXITCODE_FILE)
        status = self._get_status(exitcode)
        # workaround on windows to write the output into the '.mess' file running by ctest
        if RUNASTER_PLATFORM == "win" and not self._tee:
            with open(self._output, "rb") as fobj:
                text = fobj.read().decode(errors="replace")
            logger.info(text)
        msg = f"\nEXECUTION_CODE_ASTER_EXIT_{self.jobnum}={status.exitcode}\n\n"
        logger.info(msg)
        self._log_mess(msg)
        if status.results_saved():
            if not self._last and status.is_completed():
                # intermediate step: keep the databases for the next step
                for vola in glob("vola.*"):
                    os.remove(vola)
                logger.info("saving result databases to 'BASE_PREC'...")
                for base in glob("glob.*") + glob("pick.*"):
                    copy(base, "BASE_PREC")
            msg = f"execution ended (command file #{idx + 1}): {status.diag}"
            logger.info(msg)
        else:
            logger.info("restoring result databases from 'BASE_PREC'...")
            for base in glob(osp.join("BASE_PREC", "*")):
                copy(base, os.getcwd())
            msg = f"execution failed (command file #{idx + 1}): {status.diag}"
            logger.warning(msg)
        if self._procid == self._proc0id:
            self._log_mess(FMT_DIAG.format(state=status.diag))
        return status

    def _get_cmdline_exec(self, commfile, idx):
        """Build the command line really executed, without redirection.

        Arguments:
            commfile (str): Command file name.
            idx (int): Index of execution.

        Returns:
            list[str]: List of command line arguments, without redirection.
        """
        cmd = []
        if self._exectool:
            cmd.append(self._exectool)
        wrapped = False
        python = CFG.get("python")
        if not self._interact:
            # absolute path is necessary to call the debugger
            cmd.append(cmd_abspath(python))
            if self._parallel:
                # see documentation of `mpi4py.run`
                cmd.extend(["-m", "mpi4py"])
        else:
            cmd.append(CFG.get("python_interactive", python))
            wrapped = CFG.get("python_interactive_is_wrapped")
            # 'python3 -m mpi4py -i' does not work
            cmd.append("-i")
        # To show executed lines with trace module:
        # import sys
        # ign = [sys.prefix, sys.exec_prefix, "$HOME/.local", os.getenv("PYTHONPATH")]
        # cmd.extend(["-m", "trace", "--trace",
        # "--ignore-dir=" + ":".join(ign)])
        cmd.append(f'"{commfile}"')
        if wrapped:
            cmd.append("--")
        # remaining arguments are treated by code_aster script
        if self._interact:
            cmd.append("--interactive_interpreter")
        if self._test:
            cmd.append("--test")
        if self._last:
            cmd.append("--last")
        if self._savdb:
            cmd.append("--save_db")
        # copy datafiles only the first time because all share the same workdir
        if idx == 0:
            for obj in self.export.datafiles:
                cmd.append(f'--link="{obj.as_argument}"')
        cmd.extend(self.export.args)
        # TODO add pid + mode to identify the process by asrun
        return cmd

    def _coredump_analysis(self):
        """Process the coredump file, if there is one.

        `gdb` is run in batch mode on the first `core*` file found in the
        current directory with the commands `where` and `quit`.
        """
        core = glob("core*")
        if not core:
            return
        logger.info("\ncoredump analysis...")
        python3 = cmd_abspath(CFG.get("python"))
        if not osp.isfile(python3):
            logger.warning("'python3' not found in PATH.")
            return
        tmpf = "cmd_gdb.sh"
        # the file is opened in text mode: "\n" is already translated into the
        # platform line separator, `os.linesep` must not be used here
        with open(tmpf, "w") as fobj:
            fobj.write("\n".join(["where", "quit", ""]))
        # may be required if gdb is linked against a different libpython
        bck = os.environ.pop("PYTHONHOME", None)
        cmd = ["gdb", "-batch", "-x", tmpf, "-e", python3, "-c", core[0]]
        try:
            run_command(cmd)
        finally:
            # always restore the environment, even if the command raised
            if bck is not None:
                os.environ["PYTHONHOME"] = bck

    def _get_cmdline(self, idx, commfile, timeout):
        """Build the command line.

        Arguments:
            idx (int): Index of execution.
            commfile (str): Command file name.
            timeout (float): Remaining time.

        Returns:
            list[str]: List of command line arguments.
        """
        cmd = self._get_cmdline_exec(commfile, idx)
        if self._interact:
            # no redirection in interactive mode, but the output file must exist
            if not Path(self._output).exists():
                Path(self._output).touch()
        elif self._tee:
            orig = " ".join(cmd)
            # the exit code is written into a file because the one of the
            # pipeline would be that of 'tee'
            if RUNASTER_PLATFORM == "linux":
                cmd = [f"( {orig} ; echo $? > {EXITCODE_FILE} )", "2>&1"]
            else:
                cmd = [f"( {orig} & echo %errorlevel% > {EXITCODE_FILE} )"]
            cmd.extend(["|", "tee", "-a", self._output])
        else:
            cmd.extend([">>", self._output, "2>&1"])
        if RUNASTER_PLATFORM == "linux":
            cmd.insert(0, f"ulimit -c unlimited ; ulimit -t {timeout:.0f} ;")
        return cmd

    def _get_status(self, exitcode):
        """Get the execution status.

        The status is reset to *Ok* if its diagnostic is one of those listed
        by the `expected_diag` parameter of the export.

        Arguments:
            exitcode (int): Return code.

        Returns:
            Status: Status object.
        """
        status = get_status(exitcode, self._output, test=self._test and self._last)
        expected = self.export.get("expected_diag", [])
        if status.diag in expected:
            logger.debug("Original status: %s reset to OK", status.diag)
            status.state = StateOptions.Ok
            status.exitcode = 0
        # else the status unchanged
        return status

    def ending_execution(self, results_saved):
        """Post execution phase : copying results, cleanup...

        Results are only copied by the process of id `proc0id`.

        Arguments:
            results_saved (bool): *True* if execution did not abort and may have
                created results, *False* otherwise.
        """
        logger.info("TITLE Content of %s after execution:", os.getcwd())
        logger.info(_ls(".", "REPE_OUT"))
        if self._procid != self._proc0id:
            return
        results = self.export.resultfiles
        if results:
            logger.info("TITLE Copying results")
            copy_resultfiles(results, results_saved, test=self._test)

    def change_comm_file(self, comm):
        """Change a command file.

        The modified content is written into `<basename>.changed.py` in the
        current directory. The original file is never modified.

        Arguments:
            comm (str): Command file name.

        Returns:
            str: Name of the file to be executed: *comm* itself if the content
            was not changed, the name of the new file otherwise.
        """
        with open(comm, "rb") as fobj:
            text_init = fobj.read().decode(errors="replace")
        text = text_init
        if self._chg_procdir:
            text = change_procdir(text)
        text = add_import_commands(text)
        if self._interact:
            text = stop_at_end(text, last=self._last)
        # compared before adding the coding line
        changed = text.strip() != text_init.strip()
        if changed:
            text = file_changed(text, comm)
        text = add_coding_line(text)
        if self._show_comm:
            logger.info("\nContent of the file to execute:\n%s\n", text)
        if not changed:
            return comm
        filename = osp.basename(comm) + ".changed.py"
        with open(filename, "wb") as fobj:
            fobj.write(text.encode())
        return filename

    def _log_mess(self, msg):
        """Log a message into the *message* file.

        Arguments:
            msg (str): Text appended to the output file, followed by a newline.
        """
        with open(self._output, "a") as fobj:
            fobj.write(msg + "\n")
class RunOnlyEnv(RunAster):
    """Prepare a working directory for a manual execution.

    Nothing is executed: the command lines to be run are written in the log.

    Arguments:
        export (Export): Export object defining the calculation.
    """

    _show_comm = False
    _chg_procdir = True

    def __init__(self, *args, **kwargs):
        """Initialization, same arguments as the parent class."""
        super().__init__(*args, **kwargs)
        on_main_proc = self._procid == self._proc0id
        if not on_main_proc:
            # only the main process prints the instructions
            logger.setLevel(WARNING)

    def prepare_current_directory(self):
        """Prepare the working directory (only done at the first step)."""
        first_step = self.export.get("step") == 0
        if first_step:
            super().prepare_current_directory()

    def execute_study(self):
        """Execute the study.

        At the first step, the instructions to set up the shell environment
        are written in the log.

        Returns:
            Status: Status object.
        """
        if self.export.get("step") != 0:
            return super().execute_study()

        logger.info("TITLE Copy/paste these command lines:")
        profile = osp.join(RUNASTER_ROOT, "share", "aster", "profile.sh")
        timeout = self.export.get("time_limit", 0) * 1.25
        rundir = os.getcwd()
        if self._parallel:
            # the commands are run from the parent of the 'proc.<id>' directory
            rundir = osp.dirname(rundir)
        logger.info(" cd %s", rundir)
        logger.info(" . %s", profile)
        logger.info(" ulimit -c unlimited")
        logger.info(" ulimit -t %.0f", timeout)
        return super().execute_study()

    def _exec_one(self, comm, timeout):
        """Show instructions for a command file.

        Arguments:
            comm (str): Command file name.
            timeout (float): Remaining time (unused here).

        Returns:
            Status: Status object, always *Ok*.
        """
        success = Status(StateOptions.Ok, exitcode=0)
        if self._procid != self._proc0id:
            return success

        step = self.export.get("step")
        cmdline = " ".join(self._get_cmdline_exec("proc.0/" + comm, step))
        if not self._parallel:
            logger.info(cmdline)
            return success

        nbthread = self.export.get("ncpus", 1)
        # script executed by each process
        script = f"cmd{step}.sh"
        _gen_cmd_file(script, [cmdline], nbthread)
        mpi_args = {
            "mpi_nbcpu": self.export.get("mpi_nbcpu", 1),
            "program": "proc.0/" + script,
        }
        mpi_cmd = CFG.get("mpiexec").format(**mpi_args)
        logger.info(f" {mpi_cmd}")
        # wrapper to start the same command under DDT
        ddt_script = f"ddt_cmd{step}.sh"
        _gen_ddt_template(ddt_script, mpi_cmd, step, nbthread)
        logger.info("Run with DDT using:")
        logger.info(f" proc.0/{ddt_script}")
        return success

    def ending_execution(self, _):
        """Nothing to do in this case."""
def get_procid():
    """Return the identifier of the current process.

    The shell command given by the `mpi_get_rank` configuration value is run
    and its standard output is converted to an integer.

    Returns:
        int: Process ID, -1 if a parallel version is not run under *mpiexec/srun*.
    """
    rank_cmd = CFG.get("mpi_get_rank")
    completed = run(rank_cmd, shell=True, stdout=PIPE, universal_newlines=True)
    try:
        return int(completed.stdout.strip())
    except ValueError:
        # the output is not a number
        return -1
def get_nbcores():
    """Return the number of available cores.

    Returns:
        int: Number of cores, 1 if it can not be determined.
    """
    # `os.cpu_count()` returns None when the number of CPUs is undetermined
    return os.cpu_count() or 1
def set_num_threads(value):
    """Define the number of threads to be used by OpenMP, MKL and OpenBLAS.

    Only the ``OMP_NUM_THREADS`` environment variable is set.

    Arguments:
        value (int): Maximum number of threads.
    """
    # openblas_set_num_threads does not work, maybe conflicts with numpy init
    # --numthreads option could be removed
    # mkl and openblas should use the same value if their own variables are
    # not defined.
    os.environ.update(OMP_NUM_THREADS=str(value))
def copy_datafiles(files, verbose=True):
    """Copy data files into the working directory.

    Files with a non-null unit are copied as `fort.<unit>`, files of type
    'nom' keep their basename, 'base' directories are copied then their
    content is moved into the current directory, 'repe' directories are
    copied to `REPE_IN`. Other files are ignored.

    Arguments:
        files (list[File]): List of File objects.
        verbose (bool): Verbosity.

    Raises:
        IndexError: if a file uses one of the reserved units 6 or 15.
    """
    for datafile in files:
        is_named = datafile.filetype == "nom"
        target = None
        if datafile.unit != 0 or is_named:
            # fort.*
            if datafile.unit in (6, 15):
                raise IndexError(
                    "Files fort.6 and fort.15 are reserved.\n"
                    "Please change unit number for: {}".format(datafile.path)
                )
            if is_named:
                target = osp.basename(datafile.path)
            else:
                target = "fort." + str(datafile.unit)
            # warning if file already exists
            if osp.exists(target):
                logger.warning("%r overwrites %r", datafile.path, target)
            if datafile.compr:
                target += ".gz"
        elif datafile.filetype == "base":
            # for directories
            target = osp.basename(datafile.path)
        elif datafile.filetype == "repe":
            target = "REPE_IN"

        if target is None:
            continue
        copy(datafile.path, target, verbose=verbose)
        if datafile.compr:
            target = uncompress(target)
        if datafile.filetype == "base":
            # move the bases in main directory
            for fname in glob(osp.join(target, "*")):
                os.rename(fname, osp.basename(fname))
        # force the file to be writable
        make_writable(target)
def copy_resultfiles(files, copybase, test=False):
    """Copy result files from the working directory.

    For a testcase, only the files of units 6 and 15 are copied.

    Arguments:
        files (list[File]): List of File objects.
        copybase (bool): Tell if result databases will be copied.
        test (bool, optional): *True* for a testcase, *False* for a study.
    """
    for result in files:
        if test and result.unit not in (6, 15):
            continue

        # names of the files to be copied from the working directory
        if result.unit != 0:
            # fort.*
            sources = ["fort." + str(result.unit)]
        elif result.filetype == "nom":
            sources = [osp.basename(result.path)]
        elif copybase and result.filetype == "base":
            # for directories
            sources = glob("glob.*") + glob("pick.*")
        elif result.filetype == "repe":
            sources = glob(osp.join("REPE_OUT", "*"))
        else:
            sources = []

        for src in sources:
            if not osp.exists(src):
                logger.warning("file not found: %s", src)
                continue
            if result.compr:
                src = compress(src)
            if result.isdir and not osp.exists(result.path):
                os.makedirs(result.path)
            if osp.isdir(src) and result.filetype != "nom":
                # a directory is copied inside the destination
                dst = osp.join(result.path, osp.basename(src))
            else:
                dst = result.path
            copy(src, dst, verbose=True)
def _ls(*paths):
    """Return the listing of some paths.

    `ls -l` is used on Linux, the `dir` shell command on other platforms.

    Arguments:
        paths (str): Paths to be listed.

    Returns:
        str: Output of the listing command.
    """
    targets = list(paths)
    options = dict(stdout=PIPE, universal_newlines=True)
    if RUNASTER_PLATFORM != "linux":
        return run(["dir"] + targets, shell=True, **options).stdout
    return run(["ls", "-l"] + targets, **options).stdout
def _gen_cmd_file(filename, lines, nbthread):
cmd = []
cmd.append("#!/bin/bash")
cmd.append(f"export OMP_NUM_THREADS={nbthread}")
cmd.append("\n".join(lines))
cmd.append("")
with open(filename, "w") as fobj:
fobj.write("\n".join(cmd))
os.chmod(filename, 0o755)
def _gen_ddt_template(filename, cmd, idx, nbthread):
    """Write an executable script from the `ddt_wrapper.tmpl` template.

    The template is read from `share/aster` under `RUNASTER_ROOT` and is
    filled with the keys `command`, `redirect_to` and `nbthread`.

    Arguments:
        filename (str): Path of the script to be created.
        cmd (str): Command line inserted into the template.
        idx (int): Index of execution: the redirection is ">" for the first
            one, ">>" for the following ones.
        nbthread (int): Number of threads inserted into the template.
    """
    template_path = Path(RUNASTER_ROOT) / "share" / "aster" / "ddt_wrapper.tmpl"
    values = {
        "command": cmd,
        "redirect_to": ">>" if idx != 0 else ">",
        "nbthread": nbthread,
    }
    with open(template_path) as ftmpl:
        template = PercTemplate(ftmpl.read())
    with open(filename, "w") as fobj:
        fobj.write(template.substitute(**values))
    os.chmod(filename, 0o755)