Source code for logilab.common.proc

# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of logilab-common.
#
# logilab-common is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option) any
# later version.
#
# logilab-common is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with logilab-common.  If not, see <http://www.gnu.org/licenses/>.
"""module providing:
* process information (linux specific: rely on /proc)
* a class for resource control (memory / time / cpu time)

This module doesn't work on windows platforms (only tested on linux)

:organization: Logilab



"""
__docformat__ = "restructuredtext en"

import os
import stat
from resource import getrlimit, setrlimit, RLIMIT_CPU, RLIMIT_AS
from signal import signal, SIGXCPU, SIGKILL, SIGUSR2, SIGUSR1
from threading import Timer, currentThread, Thread, Event
from time import time

from logilab.common.tree import Node


class NoSuchProcess(Exception):
    """Raised when a pid has no matching entry under /proc."""
def proc_exists(pid):
    """Check that *pid* is registered in /proc.

    Returns ``None`` when ``/proc/<pid>`` exists.

    :raise NoSuchProcess: if ``/proc/<pid>`` does not exist
    """
    proc_dir = f"/proc/{pid}"
    if os.path.exists(proc_dir):
        return
    raise NoSuchProcess()
# Zero-based positions of the fields of interest in the whitespace-separated
# content of /proc/<pid>/stat (see proc(5) for the full field list).
PPID = 3      # parent process id
UTIME = 13    # time scheduled in user mode
STIME = 14    # time scheduled in kernel mode
CUTIME = 15   # user-mode time of waited-for children
CSTIME = 16   # kernel-mode time of waited-for children
VSIZE = 22    # virtual memory size
class ProcInfo(Node):
    """Provide access to process information found in /proc.

    Every accessor re-reads ``/proc/<pid>/stat``, so values reflect the
    state of the process at call time. Instances are tree nodes: the
    children of a process are the ``ProcInfo`` objects appended to it.
    """

    def __init__(self, pid):
        """Bind to process *pid* and read its parent pid.

        :param pid: process id (anything ``int()`` accepts)
        :raise NoSuchProcess: if ``/proc/<pid>`` does not exist
        """
        self.pid = int(pid)
        Node.__init__(self, self.pid)
        proc_exists(self.pid)
        self.file = f"/proc/{self.pid}/stat"
        self.ppid = int(self.status()[PPID])

    def memory_usage(self):
        """Return the ``vsize`` field of the stat file as an int.

        proc(5) documents this field as the virtual memory size in bytes.
        Returns 0 when the stat file cannot be read.
        """
        try:
            return int(self.status()[VSIZE])
        except OSError:
            return 0

    def lineage_memory_usage(self):
        """Return the memory usage of this process plus all its descendants."""
        return self.memory_usage() + sum(
            child.lineage_memory_usage() for child in self.children
        )

    def time(self, children=0):
        """Return the number of jiffies that this process has been scheduled
        in user and kernel mode.

        :param children: when true, also add the user and kernel time of
            the process' children (``cutime`` + ``cstime`` fields)
        """
        fields = self.status()
        total = int(fields[UTIME]) + int(fields[STIME])
        if children:
            total += int(fields[CUTIME]) + int(fields[CSTIME])
        return total

    def status(self):
        """Return the list of fields found in /proc/<pid>/stat.

        The second field (the command name) is wrapped in parentheses and
        may itself contain spaces or parentheses, so it is isolated using
        the first ``(`` and the last ``)`` before the remaining fields are
        split on whitespace. This keeps the numeric field positions stable
        whatever the process is called.
        """
        # errors="replace": a command name may hold bytes that are invalid
        # in the default text encoding; the numeric fields are plain ASCII.
        with open(self.file, errors="replace") as stream:
            content = stream.read()
        opening = content.find("(")
        closing = content.rfind(")")
        if opening == -1 or closing < opening:
            # Unexpected layout: fall back to a plain whitespace split.
            return content.split()
        head = content[:opening].split()
        comm = content[opening:closing + 1]
        tail = content[closing + 1:].split()
        return head + [comm] + tail

    def name(self):
        """Return the process name found in /proc/<pid>/stat.

        Only the single pair of parentheses wrapping the field is removed,
        so parentheses that are part of the name are preserved.
        """
        comm = self.status()[1]
        if len(comm) >= 2 and comm[0] == "(" and comm[-1] == ")":
            return comm[1:-1]
        return comm.strip("()")

    def age(self):
        """Return the modification time (``ST_MTIME``) of the stat file.

        NOTE(review): this is a timestamp in seconds since the epoch, not
        an elapsed duration -- callers wanting an age must subtract it from
        the current time.
        """
        return os.stat(self.file)[stat.ST_MTIME]
class ProcInfoLoader:
    """Manage process information.

    Keeps one ``ProcInfo`` per pid in an internal cache and can link the
    loaded processes into a parent/child tree.
    """

    def __init__(self):
        # pid (int) -> ProcInfo already built for that pid
        self._loaded = {}

    def list_pids(self):
        """Yield the ids of the existing processes.

        This is a generator: /proc is listed when iteration starts, and
        every all-digit entry is yielded as an int.
        """
        for entry in os.listdir("/proc"):
            if entry.isdigit():
                yield int(entry)

    def load(self, pid):
        """Get a ProcInfo object for a given pid.

        The object is created on first request and cached afterwards.

        :raise NoSuchProcess: if the pid is not cached and has no entry
            in /proc
        """
        pid = int(pid)
        try:
            return self._loaded[pid]
        except KeyError:
            pass
        # Built outside the ``except`` clause so a failure here is not
        # reported as happening "during handling of" the KeyError.
        procinfo = ProcInfo(pid)
        procinfo.manager = self
        self._loaded[pid] = procinfo
        return procinfo

    def load_all(self):
        """Load all processes information.

        Every process without a parent node yet is appended to the
        ProcInfo of its parent pid. Processes that disappear while being
        loaded are skipped.
        """
        for pid in self.list_pids():
            try:
                procinfo = self.load(pid)
                if procinfo.parent is None and procinfo.ppid:
                    pprocinfo = self.load(procinfo.ppid)
                    pprocinfo.append(procinfo)
            except (NoSuchProcess, OSError):
                # OSError: the process exited after its /proc directory was
                # seen but before its stat file could be read.
                continue
class ResourceError(Exception):
    """Error raised when a resource limit is reached."""

    # Human-readable name of the limit that was hit.
    limit = "Unknown Resource Limit"
class XCPUError(ResourceError):
    """Error raised when the CPU time limit is reached."""

    # Human-readable name of the limit that was hit.
    limit = "CPU Time"
class LineageMemoryError(ResourceError):
    """Error raised when the total amount of memory used by a process and
    its children reaches the limit."""

    # Human-readable name of the limit that was hit.
    limit = "Lineage total Memory"
class TimeoutError(ResourceError):
    """Error raised when the process has been running for too much time.

    NOTE: within this module the name shadows the built-in ``TimeoutError``.
    """

    # Human-readable name of the limit that was hit.
    limit = "Real Time"
# The built-in MemoryError cannot be expressed as a ResourceError subclass,
# so both types are grouped in a tuple that can be given to ``except``.
RESOURCE_LIMIT_EXCEPTION = (ResourceError, MemoryError)
class MemorySentinel(Thread):
    """Daemon thread checking that a process does not use too much memory.

    Every *interval* seconds the lineage memory usage of process *gpid* is
    compared with *memory_limit*; once the limit is reached, ``SIGUSR1`` is
    sent to process group *gpid*.

    :param interval: delay in seconds between two checks
    :param memory_limit: threshold compared against
        ``ProcInfo.lineage_memory_usage()``
    :param gpid: id of the process to watch and of the process group to
        signal; defaults to the current pid
    """

    def __init__(self, interval, memory_limit, gpid=None):
        Thread.__init__(self, target=self._run, name="Test.Sentinel")
        self.memory_limit = memory_limit
        # Deliberately not called ``_stop``: CPython 3 releases that define
        # an internal ``Thread._stop()`` call it from ``join()`` and
        # ``is_alive()``, and an Event stored under that name makes them
        # fail with "'Event' object is not callable".
        self._stop_event = Event()
        self.interval = interval
        self.daemon = True
        self.gpid = gpid if gpid is not None else os.getpid()

    def stop(self):
        """Ask the sentinel loop to terminate."""
        self._stop_event.set()

    def _run(self):
        """Thread body: poll memory usage until ``stop()`` is called."""
        pil = ProcInfoLoader()
        # NOTE(review): ``pil.load()`` caches a single ProcInfo and nothing
        # here calls ``load_all()``, so child processes look uncounted
        # unless they are attached elsewhere -- confirm this is intended.
        while not self._stop_event.is_set():
            if self.memory_limit <= pil.load(self.gpid).lineage_memory_usage():
                os.killpg(self.gpid, SIGUSR1)
            # Sleeps for ``interval`` seconds but wakes up early on stop().
            self._stop_event.wait(self.interval)
class ResourceController:
    """Install and remove wall-clock, CPU-time and memory limits on the
    current process.

    ``setup_limit()`` and ``clean_limit()`` are meant to be called in
    pairs and may be nested: limits are installed by the outermost
    ``setup_limit()`` and removed by the matching outermost
    ``clean_limit()``.

    :param max_cpu_time: soft ``RLIMIT_CPU`` value, or None for no limit
    :param max_time: wall-clock limit in seconds, or None for no limit
    :param max_memory: soft ``RLIMIT_AS`` value, also used as the threshold
        of the memory sentinel, or None for no limit
    :param max_reprieve: number of times a reached limit is reported with
        an exception before the process group is killed with ``SIGKILL``
    :raise RuntimeError: if ``SIGXCPU`` is -1 on this platform
    """

    def __init__(self, max_cpu_time=None, max_time=None, max_memory=None, max_reprieve=60):
        if SIGXCPU == -1:
            raise RuntimeError("Unsupported platform")
        self.max_time = max_time
        self.max_memory = max_memory
        self.max_cpu_time = max_cpu_time
        self._reprieve = max_reprieve
        self._timer = None
        self._msentinel = None
        # Previous rlimits and signal handlers, restored by clean_limit().
        self._old_max_memory = None
        self._old_usr1_hdlr = None
        self._old_max_cpu_time = None
        self._old_usr2_hdlr = None
        self._old_sigxcpu_hdlr = None
        # Nesting depth of setup_limit() calls.
        self._limit_set = 0
        # Number of limit notifications already delivered.
        self._abort_try = 0
        self._start_time = None
        # Wall-clock seconds consumed by previous setup/clean cycles.
        self._elapse_time = 0

    def _hangle_sig_timeout(self, sig, frame):
        """SIGUSR2 handler: report that the wall-clock limit was reached."""
        raise TimeoutError()

    def _hangle_sig_memory(self, sig, frame):
        """SIGUSR1 handler: report the memory limit, or kill the process
        group once the reprieve count is exhausted."""
        if self._abort_try < self._reprieve:
            self._abort_try += 1
            raise LineageMemoryError("Memory limit reached")
        else:
            os.killpg(os.getpid(), SIGKILL)

    def _handle_sigxcpu(self, sig, frame):
        """SIGXCPU handler: report the CPU time limit, or kill the process
        group once the reprieve count is exhausted."""
        if self._abort_try < self._reprieve:
            self._abort_try += 1
            raise XCPUError("Soft CPU time limit reached")
        else:
            os.killpg(os.getpid(), SIGKILL)

    def _time_out(self):
        """Timer callback: send SIGUSR2 to the process group.

        While limits are still installed, the timer is re-armed to fire
        again one second later; once the reprieve count is exhausted the
        process group is killed instead.
        """
        if self._abort_try < self._reprieve:
            self._abort_try += 1
            os.killpg(os.getpid(), SIGUSR2)
            if self._limit_set > 0:
                self._timer = Timer(1, self._time_out)
                self._timer.start()
        else:
            os.killpg(os.getpid(), SIGKILL)

    def setup_limit(self):
        """Set up the process limits.

        Must be called from the main thread. The process is made the
        leader of its own process group, because limit notifications are
        delivered with ``os.killpg(os.getpid(), ...)``. Nested calls only
        increase the nesting depth.
        """
        from threading import current_thread

        assert current_thread().name == "MainThread"
        # setpgrp() fails with EPERM for a session leader, which is by
        # definition already the leader of its own process group.
        if os.getpgrp() != os.getpid():
            os.setpgrp()
        if self._limit_set <= 0:
            if self.max_time is not None:
                self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
                # Time spent in earlier cycles counts against the budget,
                # but the timer never fires in less than one second.
                remaining = max(1, int(self.max_time) - self._elapse_time)
                self._timer = Timer(remaining, self._time_out)
                self._start_time = int(time())
                self._timer.start()
            if self.max_cpu_time is not None:
                self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
                # Only the soft limit changes; the hard limit is kept.
                cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
                self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
                setrlimit(RLIMIT_CPU, cpu_limit)
            if self.max_memory is not None:
                self._msentinel = MemorySentinel(1, int(self.max_memory))
                self._old_max_memory = getrlimit(RLIMIT_AS)
                self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
                as_limit = (int(self.max_memory), self._old_max_memory[1])
                setrlimit(RLIMIT_AS, as_limit)
                self._msentinel.start()
        self._limit_set += 1

    def clean_limit(self):
        """Reinstall the old process limits.

        Each call undoes one ``setup_limit()``. The previous rlimits and
        signal handlers are only restored when the outermost call is
        undone; a call without a matching ``setup_limit()`` does nothing.
        """
        if self._limit_set <= 0:
            return
        self._limit_set -= 1
        if self._limit_set > 0:
            # Inner call of a nested pair: the limits stay installed.
            return
        if self.max_time is not None:
            if self._timer is not None:
                self._timer.cancel()
                self._elapse_time += int(time()) - self._start_time
                self._timer = None
            signal(SIGUSR2, self._old_usr2_hdlr)
        if self.max_cpu_time is not None:
            setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
            signal(SIGXCPU, self._old_sigxcpu_hdlr)
        if self.max_memory is not None:
            if self._msentinel is not None:
                self._msentinel.stop()
                self._msentinel = None
            setrlimit(RLIMIT_AS, self._old_max_memory)
            signal(SIGUSR1, self._old_usr1_hdlr)