import glob
import inspect
import logging
import re
from importlib.util import find_spec, module_from_spec

from zope.interface import Attribute, Interface, implementer
from zope.interface.exceptions import BrokenImplementation, Invalid
from zope.interface.verify import verifyObject

from detectem.settings import PLUGIN_PACKAGES
logger = logging.getLogger("detectem")

# Tags a plugin may declare.  They are grouped by kind only for readability;
# the plugin loader validates against the combined ``PLUGIN_TAGS`` list.
#
# NOTE(review): "jekill" and "google code prettyfy" look like misspellings of
# "jekyll" and "google code prettify".  They are kept verbatim because
# existing plugins may declare these exact strings -- confirm before renaming.

# Programming languages and runtimes.
LANGUAGE_TAGS = [
    "php",
    "python",
    "ruby",
    "perl",
    "node.js",
    "javascript",
    "asp.net",
    "java",
    "go",
    "ruby on rails",
    "cfml",
]

# Application and front-end frameworks.
FRAMEWORK_TAGS = [
    "django",
    "angular",
    "backbone",
    "react",
    "symfony",
    "bootstrap",
    "vue",
    "laravel",
    "woltlab",
    "knockout",
    "ember",
]

# Concrete products.  The second "mysql" entry of the original list was a
# duplicate and has been dropped; membership is unchanged.
PRODUCT_TAGS = [
    "wordpress",
    "mysql",
    "jquery",
    "mootools",
    "apache",
    "iis",
    "nginx",
    "ssl",
    "joomla!",
    "drupal",
    "underscore.js",
    "marionette.js",
    "moment timezone",
    "moment.js",
    "devtools",
    "teamcity",
    "google code prettyfy",
    "solr",
    "postgresql",
    "octopress",
    "k2",
    "sobi 2",
    "sobipro",
    "virtuemart",
    "tomcat",
    "coldfusion",
    "jekill",
    "less",
    "windows server",
    "waf",
    "webpack",
]

# Functional categories.  The repeated "wiki" and "accounting" entries of the
# original list were duplicates and have been dropped.  "angular" also appears
# in FRAMEWORK_TAGS; it is kept here so this list's membership does not change.
CATEGORY_TAGS = [
    "cms",
    "seo",
    "blog",
    "advertising networks",
    "analytics",
    "wiki",
    "document management system",
    "miscellaneous",
    "message board",
    "angular",
    "js framework",
    "web framework",
    "visualization",
    "graphics",
    "web server",
    "editor",
    "ecommerce",
    "accounting",
    "database manager",
    "photo gallery",
    "issue tracker",
    "mobile framework",
    "slider",
    "programming language",
    "hosting panel",
    "lms",
    "js graphic",
    "exhibit",
    "marketing automation",
    "search engine",
    "documentation tool",
    "database",
    "template engine",
    "module bundler",
]

# Hardware device kinds.
HARDWARE_TAGS = ["router", "hmi"]

# Every accepted tag, in group order.
PLUGIN_TAGS = (
    LANGUAGE_TAGS + FRAMEWORK_TAGS + PRODUCT_TAGS + CATEGORY_TAGS + HARDWARE_TAGS
)
class PluginCollection(object):
    """Registry of plugin instances keyed by their ``name`` attribute."""

    def __init__(self):
        # Maps plugin name -> plugin instance.
        self._plugins = {}

    def __len__(self):
        """Return the number of registered plugins."""
        return len(self._plugins)

    def add(self, ins):
        """Register `ins` under ``ins.name``, replacing any previous entry."""
        self._plugins.update({ins.name: ins})

    def get(self, name):
        """Return the plugin registered as `name`, or ``None`` if absent."""
        try:
            return self._plugins[name]
        except KeyError:
            return None

    def get_all(self):
        """Return a view over every registered plugin instance."""
        return self._plugins.values()

    def with_version_matchers(self):
        """Return the list of plugins whose ``is_version`` is truthy."""
        return list(filter(lambda plugin: plugin.is_version, self._plugins.values()))

    def with_dom_matchers(self):
        """Return the list of plugins whose ``is_dom`` is truthy."""
        return list(filter(lambda plugin: plugin.is_dom, self._plugins.values()))

    def with_generic_matchers(self):
        """Return the list of plugins whose ``is_generic`` is truthy."""
        return list(filter(lambda plugin: plugin.is_generic, self._plugins.values()))
class _PluginLoader:
    """Discover plugin classes in a package, validate them and register them.

    Accepted plugin instances are stored in ``self.plugins``.
    """

    def __init__(self):
        self.plugins = PluginCollection()

    def _full_class_name(self, ins):
        """Return the dotted ``module.ClassName`` path of the class of `ins`."""
        return "{}.{}".format(ins.__class__.__module__, ins.__class__.__name__)

    def _get_plugin_module_paths(self, plugin_dir):
        """ Return a list of every module in `plugin_dir`.

        Each entry is the dotted module path relative to `plugin_dir`
        (``sub/foo.py`` becomes ``"sub.foo"``).  Files whose path ends in
        ``__init__.py`` are skipped.
        """
        # Plain string prefix/suffix handling: the directory must never be
        # interpreted as a regular expression (it may contain ``+``, ``.``,
        # ``(`` ...) and only the trailing extension may be removed.
        prefix = plugin_dir.rstrip("/") + "/"
        extension = ".py"

        module_paths = []
        for fp in glob.glob("{}/**/*.py".format(plugin_dir), recursive=True):
            if fp.endswith("__init__.py"):
                continue
            rel_path = fp[len(prefix):] if fp.startswith(prefix) else fp
            module_paths.append(rel_path[: -len(extension)].replace("/", "."))
        return module_paths

    def _is_plugin_ok(self, instance):
        """Return `True` if:
        1. Plugin meets plugin interface.
        2. Is not already registered in the plugin collection.
        3. Have accepted tags.
        Otherwise, return `False` and log warnings.
        """
        try:
            verifyObject(IPlugin, instance)
        except Invalid:
            # ``Invalid`` is the common base of every verification failure
            # (missing attribute, interface not declared, several failures at
            # once), so any non-conforming class is rejected instead of
            # aborting the whole load.
            logger.warning(
                "Plugin '%(name)s' doesn't provide the plugin interface",
                {"name": self._full_class_name(instance)},
            )
            return False

        # Check if the plugin is already registered
        reg = self.plugins.get(instance.name)
        if reg is not None:
            logger.warning(
                "Plugin '%(name)s' by '%(instance)s' is already provided by '%(reg)s'",
                {
                    "name": instance.name,
                    "instance": self._full_class_name(instance),
                    "reg": self._full_class_name(reg),
                },
            )
            return False

        for tag in instance.tags:
            if tag not in PLUGIN_TAGS:
                logger.warning(
                    "Invalid tag '%(tag)s' in '%(instance)s'",
                    {"tag": tag, "instance": self._full_class_name(instance)},
                )
                return False

        return True

    def load_plugins(self, plugins_package):
        """ Load plugins from `plugins_package` module.

        Every module found under the package directory is executed, and each
        class defined in that module whose name ends in ``Plugin`` is
        instantiated and, if it passes `_is_plugin_ok`, added to
        ``self.plugins``.  If the package cannot be resolved to a directory an
        error is logged and nothing is loaded.
        """
        try:
            package_spec = find_spec(plugins_package)
        except (ImportError, ValueError):
            package_spec = None

        # `find_spec` returns None for an unknown top-level package, and a
        # plain module has no search locations; both mean "not a package".
        locations = getattr(package_spec, "submodule_search_locations", None)
        if not locations:
            logger.error(
                "Could not load plugins package '%(pkg)s'", {"pkg": plugins_package}
            )
            return

        # Resolve directory in the filesystem
        plugin_dir = locations[0]

        for module_path in self._get_plugin_module_paths(plugin_dir):
            # Load the module dynamically
            module_name = "{}.{}".format(plugins_package, module_path)
            spec = find_spec(module_name)
            if spec is None or spec.loader is None:
                logger.warning(
                    "Could not load plugin module '%(module)s'",
                    {"module": module_name},
                )
                continue

            m = module_from_spec(spec)
            spec.loader.exec_module(m)

            # Get classes from module and extract the plugin classes
            for _, klass in inspect.getmembers(m, predicate=inspect.isclass):
                # Avoid imports processing
                if klass.__module__ != spec.name:
                    continue

                # Avoid classes not ending in Plugin
                if not klass.__name__.endswith("Plugin"):
                    continue

                instance = klass()
                if self._is_plugin_ok(instance):
                    self.plugins.add(instance)
def load_plugins():
    """Load the plugins of every package listed in ``PLUGIN_PACKAGES``.

    Returns the loader's plugin collection once all packages were processed.
    """
    plugin_loader = _PluginLoader()

    for package_name in PLUGIN_PACKAGES:
        plugin_loader.load_plugins(package_name)

    return plugin_loader.plugins
class IPlugin(Interface):
    """Interface listing the attributes every plugin object must provide."""

    name = Attribute(""" Name to identify the plugin. """)
    homepage = Attribute(""" Plugin homepage. """)
    tags = Attribute(""" Tags to categorize plugins """)
    matchers = Attribute(""" List of matchers """)
@implementer(IPlugin)
class Plugin:
    """Class used by normal plugins.

    It implements :class:`~IPlugin`.

    This class reads ``self.matchers`` but does not define it; subclasses are
    presumably expected to supply the interface attributes.
    """

    ptype = "normal"

    def get_matchers(self, matcher_type):
        """Return the `matcher_type` entry of every matcher that defines it."""
        return [m[matcher_type] for m in self.matchers if matcher_type in m]

    def get_grouped_matchers(self):
        """Return dictionary of matchers (not empty ones)
        with matcher type as key and matcher list as value.
        """
        data = {}
        for matcher_type in ["url", "body", "header", "xpath", "dom"]:
            matcher_list = self.get_matchers(matcher_type)
            if matcher_list:
                data[matcher_type] = matcher_list
        return data

    @property
    def is_version(self):
        """`True` when the plugin type is ``"normal"``."""
        return self.ptype == "normal"

    @property
    def is_dom(self):
        """`True` when at least one matcher has a ``"dom"`` entry."""
        # Generator form: stops at the first hit and builds no throwaway list.
        return any("dom" in m for m in self.matchers)

    @property
    def is_generic(self):
        """`True` when the plugin type is ``"generic"``."""
        return self.ptype == "generic"
class GenericPlugin(Plugin):
    """ Class used by generic plugins. """

    ptype = "generic"

    def get_information(self, entry):
        """Hook to be overridden by subclasses; the base version always raises.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()