# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of logilab-common.
#
# logilab-common is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option) any
# later version.
#
# logilab-common is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with logilab-common. If not, see <http://www.gnu.org/licenses/>.
"""Classes to handle advanced configuration in simple to complex applications.
Allows to load the configuration from a file or from command line
options, to generate a sample configuration file or to display
program's usage. Fills the gap between optik/optparse and ConfigParser
by adding data types (which are also available as a standalone optik
extension in the `optik_ext` module).
Quick start: simplest usage
---------------------------
.. ::
>>> import sys
>>> from logilab.common.configuration import Configuration
>>> options = [('dothis', {'type':'yn', 'default': True, 'metavar': '<y or n>'}),
... ('value', {'type': 'string', 'metavar': '<string>'}),
... ('multiple', {'type': 'csv', 'default': ('yop',),
... 'metavar': '<comma separated values>',
... 'help': 'you can also document the option'}),
... ('number', {'type': 'int', 'default':2, 'metavar':'<int>'}),
... ]
>>> config = Configuration(options=options, name='My config')
>>> print config['dothis']
True
>>> print config['value']
None
>>> print config['multiple']
('yop',)
>>> print config['number']
2
>>> print config.help()
Usage: [options]
Options:
-h, --help show this help message and exit
--dothis=<y or n>
--value=<string>
--multiple=<comma separated values>
you can also document the option [current: none]
--number=<int>
>>> f = open('myconfig.ini', 'w')
>>> f.write('''[MY CONFIG]
... number = 3
... dothis = no
... multiple = 1,2,3
... ''')
>>> f.close()
>>> config.load_file_configuration('myconfig.ini')
>>> print config['dothis']
False
>>> print config['value']
None
>>> print config['multiple']
['1', '2', '3']
>>> print config['number']
3
>>> sys.argv = ['mon prog', '--value', 'bacon', '--multiple', '4,5,6',
... 'nonoptionargument']
>>> print config.load_command_line_configuration()
['nonoptionargument']
>>> print config['value']
bacon
>>> config.generate_config()
# class for simple configurations which don't need the
# manager / providers model and prefer delegation to inheritance
#
# configuration values are accessible through a dict like interface
#
[MY CONFIG]
dothis=no
value=bacon
# you can also document the option
multiple=4,5,6
number=3
Note : starting with Python 2.7 ConfigParser is able to take into
account the order of occurrences of the options into a file (by
using an OrderedDict). If you have two options changing some common
state, like a 'disable-all-stuff' and a 'enable-some-stuff-a', their
order of appearance will be significant : the last specified in the
file wins. For earlier version of python and logilab.common newer
than 0.61 the behaviour is unspecified.
"""
__docformat__ = "restructuredtext en"
__all__ = (
"OptionsManagerMixIn",
"OptionsProviderMixIn",
"ConfigurationMixIn",
"Configuration",
"OptionsManager2ConfigurationAdapter",
)
import os
import sys
import re
from difflib import get_close_matches
from os.path import exists, expanduser
from optparse import OptionGroup
from copy import copy
from _io import StringIO, TextIOWrapper
from typing import Any, Optional, Union, Dict, List, Tuple, Iterator, Callable
import configparser as cp
from logilab.common.types import OptionParser, Option, attrdict
from logilab.common.textutils import normalize_text, unquote
from logilab.common import optik_ext
OptionError = optik_ext.OptionError
REQUIRED: List = []
class UnsupportedAction(Exception):
"""raised by set_option when it doesn't know what to do for an action"""
def _get_encoding(encoding: Optional[str], stream: Union[StringIO, TextIOWrapper]) -> str:
encoding = encoding or getattr(stream, "encoding", None)
if not encoding:
import locale
encoding = locale.getpreferredencoding()
return encoding
_ValueType = Union[List[str], Tuple[str, ...], str]
# validation functions ########################################################
# validators will return the validated value or raise optparse.OptionValueError
# XXX add to documentation
def choice_validator(optdict: Dict[str, Any], name: str, value: str) -> str:
"""validate and return a converted value for option of type 'choice'"""
if value not in optdict["choices"]:
msg = "option %s: invalid value: %r, should be in %s"
raise optik_ext.OptionValueError(msg % (name, value, optdict["choices"]))
return value
def multiple_choice_validator(optdict: Dict[str, Any], name: str, value: _ValueType) -> _ValueType:
"""validate and return a converted value for option of type 'choice'"""
choices = optdict["choices"]
values = optik_ext.check_csv(None, name, value)
for value in values:
if value not in choices:
msg = "option %s: invalid value: %r, should be in %s"
raise optik_ext.OptionValueError(msg % (name, value, choices))
return values
def csv_validator(optdict: Dict[str, Any], name: str, value: _ValueType) -> _ValueType:
"""validate and return a converted value for option of type 'csv'"""
return optik_ext.check_csv(None, name, value)
def yn_validator(optdict: Dict[str, Any], name: str, value: Union[bool, str]) -> bool:
"""validate and return a converted value for option of type 'yn'"""
return optik_ext.check_yn(None, name, value)
def named_validator(
optdict: Dict[str, Any], name: str, value: Union[Dict[str, str], str]
) -> Dict[str, str]:
"""validate and return a converted value for option of type 'named'"""
return optik_ext.check_named(None, name, value)
def file_validator(optdict, name, value):
"""validate and return a filepath for option of type 'file'"""
return optik_ext.check_file(None, name, value)
def color_validator(optdict, name, value):
"""validate and return a valid color for option of type 'color'"""
return optik_ext.check_color(None, name, value)
def password_validator(optdict, name, value):
"""validate and return a string for option of type 'password'"""
return optik_ext.check_password(None, name, value)
def date_validator(optdict, name, value):
"""validate and return a mx DateTime object for option of type 'date'"""
return optik_ext.check_date(None, name, value)
def time_validator(optdict, name, value):
"""validate and return a time object for option of type 'time'"""
return optik_ext.check_time(None, name, value)
def bytes_validator(optdict: Dict[str, str], name: str, value: Union[int, str]) -> int:
"""validate and return an integer for option of type 'bytes'"""
return optik_ext.check_bytes(None, name, value)
VALIDATORS: Dict[str, Callable] = {
"string": unquote,
"int": int,
"float": float,
"file": file_validator,
"font": unquote,
"color": color_validator,
"regexp": re.compile,
"csv": csv_validator,
"yn": yn_validator,
"bool": yn_validator,
"named": named_validator,
"password": password_validator,
"date": date_validator,
"time": time_validator,
"bytes": bytes_validator,
"choice": choice_validator,
"multiple_choice": multiple_choice_validator,
}
def _call_validator(
opttype: str, optdict: Dict[str, Any], option: str, value: Union[List[str], int, str]
) -> Union[List[str], int, str]:
if opttype not in VALIDATORS:
all_validators = "\n - ".join(sorted(VALIDATORS.keys()))
raise Exception(f'Unsupported type "{opttype}", supported types are:\n - {all_validators}')
try:
return VALIDATORS[opttype](optdict, option, value)
except TypeError:
try:
return VALIDATORS[opttype](value)
except optik_ext.OptionValueError:
raise
except Exception:
raise optik_ext.OptionValueError(
f"{option} value ({value!r}) should be of type {opttype}"
)
# user input functions ########################################################
# user input functions will ask the user for input on stdin then validate
# the result and return the validated value or raise optparse.OptionValueError
# XXX add to documentation
def input_password(optdict, question="password:"):
from getpass import getpass
while True:
value = getpass(question)
value2 = getpass("confirm: ")
if value == value2:
return value
print("password mismatch, try again")
def input_string(optdict, question):
value = input(question).strip()
return value or None
def _make_input_function(opttype):
def input_validator(optdict, question):
while True:
value = input(question)
if not value.strip():
return None
try:
return _call_validator(opttype, optdict, None, value)
except optik_ext.OptionValueError as ex:
msg = str(ex).split(":", 1)[-1].strip()
print(f"bad value: {msg}")
return input_validator
INPUT_FUNCTIONS: Dict[str, Callable] = {
"string": input_string,
"password": input_password,
}
for opttype in VALIDATORS.keys():
INPUT_FUNCTIONS.setdefault(opttype, _make_input_function(opttype))
# utility functions ############################################################
def expand_default(self, option):
"""monkey patch OptionParser.expand_default since we have a particular
way to handle defaults to avoid overriding values in the configuration
file
"""
if self.parser is None or not self.default_tag:
return option.help
optname = option._long_opts[0][2:]
try:
provider = self.parser.options_manager._all_options[optname]
except KeyError:
value = None
else:
optdict = provider.get_option_def(optname)
optname = provider.option_attrname(optname, optdict)
value = getattr(provider.config, optname, optdict)
value = format_option_value(optdict, value)
if value is optik_ext.NO_DEFAULT or not value:
value = self.NO_DEFAULT_VALUE
return option.help.replace(self.default_tag, str(value))
def _validate(
value: Union[List[str], int, str], optdict: Dict[str, Any], name: str = ""
) -> Union[List[str], int, str]:
"""return a validated value for an option according to its type
optional argument name is only used for error message formatting
"""
try:
_type = optdict["type"]
except KeyError:
# FIXME
return value
return _call_validator(_type, optdict, name, value)
# format and output functions ##################################################
def comment(string):
"""return string as a comment"""
lines = [line.strip() for line in string.splitlines()]
return "# " + f"{os.linesep}# ".join(lines)
def format_time(value):
if not value:
return "0"
if value != int(value):
return f"{value:.2f}s"
value = int(value)
nbmin, nbsec = divmod(value, 60)
if nbsec:
return f"{value}s"
nbhour, nbmin_ = divmod(nbmin, 60)
if nbmin_:
return f"{nbmin}min"
nbday, nbhour_ = divmod(nbhour, 24)
if nbhour_:
return f"{nbhour}h"
return f"{nbday}d"
def format_bytes(value: int) -> str:
if not value:
return "0"
if value != int(value):
return f"{value:.2f}B"
value = int(value)
prevunit = "B"
for unit in ("KB", "MB", "GB", "TB"):
next, remain = divmod(value, 1024)
if remain:
return f"{value}{prevunit}"
prevunit = unit
value = next
return f"{value}{unit}"
def format_option_value(optdict: Dict[str, Any], value: Any) -> Union[None, int, str]:
"""return the user input's value from a 'compiled' value"""
if isinstance(value, (list, tuple)):
value = ",".join(value)
elif isinstance(value, dict):
value = ",".join([f"{k}:{v}" for k, v in value.items()])
elif hasattr(value, "match"): # optdict.get('type') == 'regexp'
# compiled regexp
value = value.pattern
elif optdict.get("type") == "yn":
value = value and "yes" or "no"
elif isinstance(value, str) and value.isspace():
value = f"'{value}'"
elif optdict.get("type") == "time" and isinstance(value, (float, int)):
value = format_time(value)
elif optdict.get("type") == "bytes" and hasattr(value, "__int__"):
value = format_bytes(value)
return value
def ini_format_section(
stream: Union[StringIO, TextIOWrapper],
section: str,
options: Any,
encoding: str = None,
doc: Optional[Any] = None,
) -> None:
"""format an options section using the INI format"""
encoding = _get_encoding(encoding, stream)
if doc:
print(str(comment(doc), encoding), file=stream)
print(f"[{section}]", file=stream)
ini_format(stream, options, encoding)
def ini_format(stream: Union[StringIO, TextIOWrapper], options: Any, encoding: str) -> None:
"""format options using the INI format"""
for optname, optdict, value in options:
value = format_option_value(optdict, value)
help = optdict.get("help")
if help:
help = normalize_text(help, line_len=79, indent="# ")
print(file=stream)
print(str(help), file=stream)
else:
print(file=stream)
if value is None:
print(f"#{optname}=", file=stream)
else:
value = str(value).strip()
if optdict.get("type") == "string" and "\n" in value:
prefix = "\n "
value = prefix + prefix.join(value.split("\n"))
print(f"{optname}={value}", file=stream)
format_section = ini_format_section
def rest_format_section(stream, section, options, encoding=None, doc=None):
"""format an options section using as ReST formatted output"""
encoding = _get_encoding(encoding, stream)
if section:
print("%s\n%s" % (section, "'" * len(section)), file=stream)
if doc:
print(str(normalize_text(doc, line_len=79, indent="")), file=stream)
print(file=stream)
for optname, optdict, value in options:
help = optdict.get("help")
print(f":{optname}:", file=stream)
if help:
help = normalize_text(help, line_len=79, indent=" ")
print(str(help), file=stream)
if value:
value = str(format_option_value(optdict, value))
print(file=stream)
print(f" Default: ``{value.replace('`` ', '```` ``')}``", file=stream)
# Options Manager ##############################################################
[docs]class OptionsManagerMixIn:
"""MixIn to handle a configuration from both a configuration file and
command line options
"""
def __init__(
self,
usage: Optional[str],
config_file: Optional[Any] = None,
version: Optional[Any] = None,
quiet: int = 0,
) -> None:
self.config_file = config_file
self.reset_parsers(usage, version=version)
# list of registered options providers
self.options_providers: List[ConfigurationMixIn] = []
# dictionary associating option name to checker
self._all_options: Dict[str, ConfigurationMixIn] = {}
self._short_options: Dict[str, str] = {}
self._nocallback_options: Dict[ConfigurationMixIn, str] = {}
self._mygroups: Dict[str, optik_ext.OptionGroup] = {}
# verbosity
self.quiet = quiet
self._maxlevel = 0
[docs] def reset_parsers(self, usage: Optional[str] = "", version: Optional[Any] = None) -> None:
# configuration file parser
self.cfgfile_parser = cp.ConfigParser()
# command line parser
self.cmdline_parser = optik_ext.OptionParser(usage=usage, version=version)
# mypy: "OptionParser" has no attribute "options_manager"
# dynamic attribute?
self.cmdline_parser.options_manager = self # type: ignore
self._optik_option_attrs = set(self.cmdline_parser.option_class.ATTRS)
[docs] def register_options_provider(
self, provider: "ConfigurationMixIn", own_group: bool = True
) -> None:
"""register an options provider"""
assert provider.priority <= 0, "provider's priority can't be >= 0"
for i in range(len(self.options_providers)):
if provider.priority > self.options_providers[i].priority:
self.options_providers.insert(i, provider)
break
else:
self.options_providers.append(provider)
# mypy: Need type annotation for 'option'
# you can't type variable of a list comprehension, right?
non_group_spec_options: List = [
option for option in provider.options if "group" not in option[1] # type: ignore
] # type: ignore
groups = getattr(provider, "option_groups", ())
if own_group and non_group_spec_options:
self.add_option_group(
provider.name.upper(), provider.__doc__, non_group_spec_options, provider
)
else:
for opt, optdict in non_group_spec_options:
self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
for gname, gdoc in groups:
gname = gname.upper()
# mypy: Need type annotation for 'option'
# you can't type variable of a list comprehension, right?
goptions: List = [
option
for option in provider.options # type: ignore
if option[1].get("group", "").upper() == gname
] # type: ignore
self.add_option_group(gname, gdoc, goptions, provider)
[docs] def add_option_group(
self,
group_name: str,
doc: Optional[str],
options: Union[List[Tuple[str, Dict[str, Any]]], List[Tuple[str, Dict[str, str]]]],
provider: "ConfigurationMixIn",
) -> None:
"""add an option group including the listed options"""
assert options
# add option group to the command line parser
if group_name in self._mygroups:
group = self._mygroups[group_name]
else:
group = optik_ext.OptionGroup(self.cmdline_parser, title=group_name.capitalize())
self.cmdline_parser.add_option_group(group)
# mypy: "OptionGroup" has no attribute "level"
# dynamic attribute
group.level = provider.level # type: ignore
self._mygroups[group_name] = group
# add section to the config file
if group_name != "DEFAULT":
self.cfgfile_parser.add_section(group_name)
# add provider's specific options
for opt, optdict in options:
self.add_optik_option(provider, group, opt, optdict)
[docs] def add_optik_option(
self,
provider: "ConfigurationMixIn",
optikcontainer: Union[OptionParser, OptionGroup],
opt: str,
optdict: Dict[str, Any],
) -> None:
if "inputlevel" in optdict:
raise ValueError(
"optdict dictionary argument shouldn't contain the key 'inputlevel', rename it to "
f"'level' instead: {optdict}"
)
args, optdict = self.optik_option(provider, opt, optdict)
option = optikcontainer.add_option(*args, **optdict)
self._all_options[opt] = provider
self._maxlevel = max(self._maxlevel, option.level or 0)
[docs] def optik_option(
self, provider: "ConfigurationMixIn", opt: str, optdict: Dict[str, Any]
) -> Tuple[List[str], Dict[str, Any]]:
"""get our personal option definition and return a suitable form for
use with optik/optparse
"""
optdict = copy(optdict)
if "action" in optdict:
self._nocallback_options[provider] = opt
else:
optdict["action"] = "callback"
optdict["callback"] = self.cb_set_provider_option
# default is handled here and *must not* be given to optik if you
# want the whole machinery to work
if "default" in optdict:
if (
"help" in optdict
and optdict.get("default") is not None
and not optdict["action"] in ("store_true", "store_false")
):
optdict["help"] += " [current: %default]"
del optdict["default"]
args = ["--" + str(opt)]
if "short" in optdict:
self._short_options[optdict["short"]] = opt
args.append("-" + optdict["short"])
del optdict["short"]
# cleanup option definition dict before giving it to optik
for key in list(optdict.keys()):
if key not in self._optik_option_attrs:
optdict.pop(key)
return args, optdict
[docs] def cb_set_provider_option(
self, option: "Option", opt: str, value: Union[List[str], int, str], parser: "OptionParser"
) -> None:
"""optik callback for option setting"""
if opt.startswith("--"):
# remove -- on long option
opt = opt[2:]
else:
# short option, get its long equivalent
opt = self._short_options[opt[1:]]
# trick since we can't set action='store_true' on options
if value is None:
value = 1
self.global_set_option(opt, value)
[docs] def global_set_option(self, opt: str, value: Union[List[str], int, str]) -> None:
"""set option on the correct option provider"""
self._all_options[opt].set_option(opt, value)
[docs] def generate_config(
self,
stream: Union[StringIO, TextIOWrapper] = None,
skipsections: Tuple[()] = (),
encoding: Optional[Any] = None,
header_message: Optional[str] = None,
) -> None:
"""write a configuration file according to the current configuration
into the given stream or stdout
"""
options_by_section: Dict[Any, List] = {}
sections = []
for provider in self.options_providers:
for section, options in provider.options_by_section():
if section is None:
section = provider.name
if section in skipsections:
continue
options = [(n, d, v) for (n, d, v) in options if d.get("type") is not None]
if not options:
continue
if section not in sections:
sections.append(section)
alloptions = options_by_section.setdefault(section, [])
alloptions += options
stream = stream or sys.stdout
encoding = _get_encoding(encoding, stream)
printed = False
if header_message is not None:
# Make sure all the lines in header_message begin with '# '
# This is done by matching all the lines that do not start
# with '#' (possibly with whitespaces) and prefix them by '# '
exp = re.compile(r"^\s*(?=[^#])", re.MULTILINE)
commented_header = exp.sub("# ", header_message)
print(commented_header, file=stream)
for section in sections:
if printed:
print("\n", file=stream)
format_section(stream, section.upper(), options_by_section[section], encoding)
printed = True
[docs] def generate_manpage(
self, pkginfo: attrdict, section: int = 1, stream: StringIO = None
) -> None:
"""write a man page for the current configuration into the given
stream or stdout
"""
self._monkeypatch_expand_default()
try:
optik_ext.generate_manpage(
self.cmdline_parser,
pkginfo,
section,
stream=stream or sys.stdout,
level=self._maxlevel,
)
finally:
self._unmonkeypatch_expand_default()
# initialization methods ##################################################
[docs] def load_provider_defaults(self) -> None:
"""initialize configuration using default values"""
for provider in self.options_providers:
provider.load_defaults()
[docs] def load_file_configuration(self, config_file: str = None) -> None:
"""load the configuration from file"""
self.read_config_file(config_file)
self.load_config_file()
[docs] def read_config_file(self, config_file: str = None) -> None:
"""read the configuration file but do not load it (i.e. dispatching
values to each options provider)
"""
helplevel = 1
while helplevel <= self._maxlevel:
opt = "-".join(["long"] * helplevel) + "-help"
if opt in self._all_options:
break # already processed
def helpfunc(option, opt, val, p, level=helplevel):
print(self.help(level))
sys.exit(0)
helpmsg = f"{' '.join(['more'] * helplevel)} verbose help."
optdict = {"action": "callback", "callback": helpfunc, "help": helpmsg}
provider = self.options_providers[0]
self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
provider.options += ((opt, optdict),)
helplevel += 1
if config_file is None:
config_file = self.config_file
if config_file is not None:
config_file = expanduser(config_file)
if config_file and exists(config_file):
parser = self.cfgfile_parser
parser.read([config_file])
# normalize sections'title
# mypy: "ConfigParser" has no attribute "_sections"
# dynamic attribute?
for sect, values in list(parser._sections.items()): # type: ignore
if not sect.isupper() and values:
parser._sections[sect.upper()] = values # type: ignore
elif not self.quiet:
msg = "No config file found, using default configuration"
print(msg, file=sys.stderr)
return
[docs] def load_config_file(self) -> None:
"""dispatch values previously read from a configuration file to each
options provider)
"""
parser = self.cfgfile_parser
for section in parser.sections():
for option, value in parser.items(section):
try:
self.global_set_option(option, value)
except (KeyError, OptionError):
# TODO handle here undeclared options appearing in the config file
continue
[docs] def load_configuration(self, **kwargs: Any) -> None:
"""override configuration according to given parameters"""
for opt, opt_value in kwargs.items():
opt = opt.replace("_", "-")
provider = self._all_options[opt]
provider.set_option(opt, opt_value)
[docs] def load_command_line_configuration(self, args: List[str] = None) -> List[str]:
"""override configuration according to command line parameters
return additional arguments
"""
self._monkeypatch_expand_default()
try:
if args is None:
args = sys.argv[1:]
else:
args = list(args)
(options, args) = self.cmdline_parser.parse_args(args=args)
for provider in self._nocallback_options.keys():
config = provider.config
for attr in config.__dict__.keys():
value = getattr(options, attr, None)
if value is None:
continue
setattr(config, attr, value)
return args
finally:
self._unmonkeypatch_expand_default()
# help methods ############################################################
[docs] def add_help_section(self, title: str, description: str, level: int = 0) -> None:
"""add a dummy option section for help purpose"""
group = optik_ext.OptionGroup(
self.cmdline_parser, title=title.capitalize(), description=description
)
# mypy: "OptionGroup" has no attribute "level"
# it does, it is set in the optik_ext module
group.level = level # type: ignore
self._maxlevel = max(self._maxlevel, level)
self.cmdline_parser.add_option_group(group)
def _monkeypatch_expand_default(self) -> None:
# monkey patch optik_ext to deal with our default values
try:
self.__expand_default_backup = optik_ext.HelpFormatter.expand_default
# mypy: Cannot assign to a method
# it's dirty but you can
optik_ext.HelpFormatter.expand_default = expand_default # type: ignore
except AttributeError:
# python < 2.4: nothing to be done
pass
def _unmonkeypatch_expand_default(self) -> None:
# remove monkey patch
if hasattr(optik_ext.HelpFormatter, "expand_default"):
# mypy: Cannot assign to a method
# it's dirty but you can
# unpatch optik_ext to avoid side effects
optik_ext.HelpFormatter.expand_default = self.__expand_default_backup # type: ignore
[docs] def help(self, level: int = 0) -> str:
"""return the usage string for available options"""
# mypy: "HelpFormatter" has no attribute "output_level"
# set in optik_ext
self.cmdline_parser.formatter.output_level = level # type: ignore
self._monkeypatch_expand_default()
try:
return self.cmdline_parser.format_help()
finally:
self._unmonkeypatch_expand_default()
class Method:
"""used to ease late binding of default method (so you can define options
on the class using default methods on the configuration instance)
"""
def __init__(self, methname):
self.method = methname
self._inst = None
def bind(self, instance: "Configuration") -> None:
"""bind the method to its instance"""
if self._inst is None:
self._inst = instance
def __call__(self, *args: Any, **kwargs: Any) -> Dict[str, str]:
assert self._inst, "unbound method"
return getattr(self._inst, self.method)(*args, **kwargs)
# Options Provider #############################################################
[docs]class OptionsProviderMixIn:
"""Mixin to provide options to an OptionsManager"""
# those attributes should be overridden
priority = -1
name = "default"
options: Tuple = ()
level = 0
def __init__(self) -> None:
self.config = optik_ext.Values()
for option_tuple in self.options:
try:
option, optdict = option_tuple
except ValueError:
raise Exception(f"Bad option: {str(option_tuple)}")
if isinstance(optdict.get("default"), Method):
optdict["default"].bind(self)
elif isinstance(optdict.get("callback"), Method):
optdict["callback"].bind(self)
self.load_defaults()
[docs] def load_defaults(self) -> None:
"""initialize the provider using default values"""
for opt, optdict in self.options:
action = optdict.get("action")
if action != "callback":
# callback action have no default
default = self.option_default(opt, optdict)
if default is REQUIRED:
continue
self.set_option(opt, default, action, optdict)
[docs] def option_default(self, opt, optdict=None):
"""return the default value for an option"""
if optdict is None:
optdict = self.get_option_def(opt)
default = optdict.get("default")
if callable(default):
default = default()
return default
[docs] def option_attrname(self, opt, optdict=None):
"""get the config attribute corresponding to opt"""
if optdict is None:
optdict = self.get_option_def(opt)
return optdict.get("dest", opt.replace("-", "_"))
[docs] def option_value(self, opt):
"""get the current value for the given option"""
return getattr(self.config, self.option_attrname(opt), None)
[docs] def set_option(self, opt, value, action=None, optdict=None):
"""method called to set an option (registered in the options list)"""
if optdict is None:
optdict = self.get_option_def(opt)
if value is not None:
value = _validate(value, optdict, opt)
if action is None:
action = optdict.get("action", "store")
if optdict.get("type") == "named": # XXX need specific handling
optname = self.option_attrname(opt, optdict)
currentvalue = getattr(self.config, optname, None)
if currentvalue:
currentvalue.update(value)
value = currentvalue
if action == "store":
setattr(self.config, self.option_attrname(opt, optdict), value)
elif action in ("store_true", "count"):
setattr(self.config, self.option_attrname(opt, optdict), 0)
elif action == "store_false":
setattr(self.config, self.option_attrname(opt, optdict), 1)
elif action == "append":
opt = self.option_attrname(opt, optdict)
_list = getattr(self.config, opt, None)
if _list is None:
if isinstance(value, (list, tuple)):
_list = value
elif value is not None:
_list = []
_list.append(value)
setattr(self.config, opt, _list)
elif isinstance(_list, tuple):
setattr(self.config, opt, _list + (value,))
else:
_list.append(value)
elif action == "callback":
optdict["callback"](None, opt, value, None)
else:
raise UnsupportedAction(action)
[docs] def get_option_def(self, opt):
"""return the dictionary defining an option given it's name"""
assert self.options
for option in self.options:
if option[0] == opt:
return option[1]
# mypy: Argument 2 to "OptionError" has incompatible type "str"; expected "Option"
# seems to be working?
similar_options = get_close_matches(opt, (str(x[0]) for x in self.options), n=10)
if similar_options:
raise OptionError(
"no such option %s in section %r.\n\nOptions with a similar name are:\n * %s"
% (opt, self.name, "\n * ".join(similar_options)),
opt,
) # type: ignore
else:
raise OptionError(
"no such option %s in section %r.\n\nNo option with a similar name, "
"all available options:\n * %s"
% (opt, self.name, "\n * ".join(str(x[0]) for x in self.options)),
opt,
) # type: ignore
[docs] def all_options(self):
"""return an iterator on available options for this provider
option are actually described by a 3-uple:
(section, option name, option dictionary)
"""
for section, options in self.options_by_section():
if section is None:
if self.name is None:
continue
section = self.name.upper()
for option, optiondict, value in options:
yield section, option, optiondict
[docs] def options_by_section(self) -> Iterator[Any]:
"""return an iterator on options grouped by section
(section, [list of (optname, optdict, optvalue)])
"""
sections: Dict[str, List[Tuple[str, Dict[str, Any], Any]]] = {}
for optname, optdict in self.options:
sections.setdefault(optdict.get("group"), []).append(
(optname, optdict, self.option_value(optname))
)
if None in sections:
# mypy: No overload variant of "pop" of "MutableMapping" matches argument type "None"
# it actually works
yield None, sections.pop(None) # type: ignore
for section, options in sorted(sections.items()):
yield section.upper(), options
[docs] def options_and_values(self, options=None):
if options is None:
options = self.options
for optname, optdict in options:
yield (optname, optdict, self.option_value(optname))
# configuration ################################################################
[docs]class ConfigurationMixIn(OptionsManagerMixIn, OptionsProviderMixIn):
"""basic mixin for simple configurations which don't need the
manager / providers model
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
if not args:
kwargs.setdefault("usage", "")
kwargs.setdefault("quiet", 1)
OptionsManagerMixIn.__init__(self, *args, **kwargs)
OptionsProviderMixIn.__init__(self)
if not getattr(self, "option_groups", None):
self.option_groups: List[Tuple[Any, str]] = []
for option, optdict in self.options:
try:
gdef = (optdict["group"].upper(), "")
except KeyError:
continue
if gdef not in self.option_groups:
self.option_groups.append(gdef)
self.register_options_provider(self, own_group=False)
[docs] def register_options(self, options):
"""add some options to the configuration"""
options_by_group = {}
for optname, optdict in options:
options_by_group.setdefault(optdict.get("group", self.name.upper()), []).append(
(optname, optdict)
)
for group, group_options in options_by_group.items():
self.add_option_group(group, None, group_options, self)
self.options += tuple(options)
[docs] def load_defaults(self):
OptionsProviderMixIn.load_defaults(self)
def __iter__(self):
return iter(self.config.__dict__.items())
def __getitem__(self, key):
try:
return getattr(self.config, self.option_attrname(key))
except (optik_ext.OptionValueError, AttributeError):
raise KeyError(key)
def __setitem__(self, key, value):
self.set_option(key, value)
[docs] def get(self, key, default=None):
try:
return self[key]
except (OptionError, KeyError):
return default
[docs]class Configuration(ConfigurationMixIn):
"""class for simple configurations which don't need the
manager / providers model and prefer delegation to inheritance
configuration values are accessible through a dict like interface
"""
def __init__(
self, config_file=None, options=None, name=None, usage=None, doc=None, version=None
):
if options is not None:
self.options = options
if name is not None:
self.name = name
if doc is not None:
self.__doc__ = doc
super(Configuration, self).__init__(config_file=config_file, usage=usage, version=version)
[docs]class OptionsManager2ConfigurationAdapter:
"""Adapt an option manager to behave like a
`logilab.common.configuration.Configuration` instance
"""
def __init__(self, provider):
self.config = provider
def __getattr__(self, key):
return getattr(self.config, key)
def __getitem__(self, key):
provider = self.config._all_options[key]
try:
return getattr(provider.config, provider.option_attrname(key))
except AttributeError:
raise KeyError(key)
def __setitem__(self, key, value):
self.config.global_set_option(self.config.option_attrname(key), value)
[docs] def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
# other functions ##############################################################
def read_old_config(newconfig, changes, configfile):
"""initialize newconfig from a deprecated configuration file
possible changes:
* ('renamed', oldname, newname)
* ('moved', option, oldgroup, newgroup)
* ('typechanged', option, oldtype, newvalue)
"""
# build an index of changes
changesindex = {}
for action in changes:
if action[0] == "moved":
option, oldgroup, newgroup = action[1:]
changesindex.setdefault(option, []).append((action[0], oldgroup, newgroup))
continue
if action[0] == "renamed":
oldname, newname = action[1:]
changesindex.setdefault(newname, []).append((action[0], oldname))
continue
if action[0] == "typechanged":
option, oldtype, newvalue = action[1:]
changesindex.setdefault(option, []).append((action[0], oldtype, newvalue))
continue
if action[0] in ("added", "removed"):
continue # nothing to do here
raise Exception(f"unknown change {action[0]}")
# build a config object able to read the old config
options = []
for optname, optdef in newconfig.options:
for action in changesindex.pop(optname, ()):
if action[0] == "moved":
oldgroup, newgroup = action[1:]
optdef = optdef.copy()
optdef["group"] = oldgroup
elif action[0] == "renamed":
optname = action[1]
elif action[0] == "typechanged":
oldtype = action[1]
optdef = optdef.copy()
optdef["type"] = oldtype
options.append((optname, optdef))
if changesindex:
raise Exception(f"unapplied changes: {changesindex}")
oldconfig = Configuration(options=options, name=newconfig.name)
# read the old config
oldconfig.load_file_configuration(configfile)
# apply values reverting changes
changes.reverse()
done = set()
for action in changes:
if action[0] == "renamed":
oldname, newname = action[1:]
newconfig[newname] = oldconfig[oldname]
done.add(newname)
elif action[0] == "typechanged":
optname, oldtype, newvalue = action[1:]
newconfig[optname] = newvalue
done.add(optname)
for optname, optdef in newconfig.options:
if optdef.get("type") and optname not in done:
newconfig.set_option(optname, oldconfig[optname], optdict=optdef)
def merge_options(options, optgroup=None):
"""preprocess a list of options and remove duplicates, returning a new list
(tuple actually) of options.
Options dictionaries are copied to avoid later side-effect. Also, if
`otpgroup` argument is specified, ensure all options are in the given group.
"""
alloptions = {}
options = list(options)
for i in range(len(options) - 1, -1, -1):
optname, optdict = options[i]
if optname in alloptions:
options.pop(i)
alloptions[optname].update(optdict)
else:
optdict = optdict.copy()
options[i] = (optname, optdict)
alloptions[optname] = optdict
if optgroup is not None:
alloptions[optname]["group"] = optgroup
return tuple(options)