Source code for coccigrep

# Copyright (C) 2011 Eric Leblond <eric@regit.org>
#
# You can copy, redistribute or modify this Program under the terms of
# the GNU General Public License version 3 as published by the Free
# Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 3 along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from os import unlink, path, listdir, getcwd
from sys import exit, stderr
from string import Template
from ConfigParser import RawConfigParser
import re

have_multiprocessing  = True
try:
    from multiprocessing import Process, Pipe
except:
    have_multiprocessing = False

have_pygments = True
try:
    from pygments import highlight
    from pygments.lexers import CLexer
    from pygments.filters import NameHighlightFilter
    from pygments.formatters import Terminal256Formatter, HtmlFormatter
except:
    have_pygments = False

[docs]class CocciException(Exception): """ Generic class for coccigrep exception """ def __init__(self, value): self.value = value def __str__(self): return self.value
[docs]class CocciConfigException(CocciException): """ Exception raised when configuration parameter are not correct. For example, it is returned if spatch command can not be found. """ def __init__(self, value): self.value = value def __str__(self): return self.value
[docs]class CocciRunException(CocciException): """ Exception raised when running parameters are not correct. For example, it is returned if a required argument is missing. """ def __init__(self, value): self.value = value def __str__(self): return self.value
[docs]class CocciGrepConfig: """ Configuration handling class This class parses configuration and can be used to access to configuration item via get operations. This is mainly a wrapper around configparser. """ def __init__(self): self.configbasename = 'coccigrep' self.config = RawConfigParser() self.global_config = RawConfigParser() self.parse_config()
[docs] def parse_config(self): """ Parse the hierarchy of configuration files """ paths = [ path.join('/etc', self.configbasename), path.join(path.expanduser('~'), '.%s' % self.configbasename), path.join(getcwd(), '.%s' % self.configbasename), ] for cpath in paths: if path.isfile(cpath): self.config.read(cpath) # Parse global configuration to have sane default cpath = path.join(path.dirname(__file__), '%s.cfg' % self.configbasename) if path.isfile(cpath): self.global_config.read(cpath) else: raise CocciException('No package config file: %s' % (cpath))
[docs] def get(self, section, value): """ Get value for a configuration item :param section: name of the section in the ini file :type section: str :param value: name of the value under the section :type value: str :return: value of option as a str """ try: return self.config.get(section, value) except: return self.global_config.get(section, value)
[docs] def getint(self, section, value): """ Get value for a configuration item returned as int :param section: name of the section in the ini file :type section: str :param value: name of the value under the section :type value: str :return: value of option as a int """ try: return self.config.getint(section, value) except: return self.global_config.getint(section, value)
[docs] def getboolean(self, section, value): """ Get value for a configuration item returned as boolean :param section: name of the section in the ini file :type section: str :param value: name of the value under the section :type value: str :return: value of option as a boolean """ try: return self.config.getboolean(section, value) except: return self.global_config.getboolean(section, value)
[docs]class CocciMatch: """ Store a match and take care of its display """ def __init__(self, mfile, mline, mcol, mlineend, mcolend): self.file = mfile self.line = int(mline) self.column = int(mcol) self.lineend = int(mlineend) self.columnend = int(mcolend)
[docs] def display(self, stype, mode='raw', oformat='term', before=0, after=0): """ Display output for a single match :param mode: display mode :type mode: str :param oformat: format of output for color (term, html) :type oformat: str :param before: number of lines to display before match :type before: int :param after: number of lines to display after match :type after: int """ f = open(self.file, 'r') lines = f.readlines() pmatch = lines[self.line -1][self.column:self.columnend] output = "" if mode == 'color': output += "%s: l.%s -%d, l.%s +%d, %s *%s\n" % (self.file, self.line, before, self.line, after, stype, pmatch) for i in range(int(self.line) - 1 - before, int(self.line) + after): if mode == 'color': output += lines[i] elif mode == 'vim': output += "%s|%s| (%s *%s): %s" % (self.file, self.line, stype, pmatch, lines[i]) elif mode == 'emacs': output += "%s:%s: (%s *%s): %s" % (self.file, self.line, stype, pmatch, lines[i]) else: output += "%s:%s (%s *%s): %s" % (self.file, self.line, stype, pmatch, lines[i]) f.close() if mode == 'color': if have_pygments: lexer = CLexer() lfilter = NameHighlightFilter(names=[pmatch]) lexer.add_filter(lfilter) if oformat == "term": return highlight(output, lexer, Terminal256Formatter()) elif oformat == "html": return highlight(output, lexer, HtmlFormatter(noclasses=True)) else: return output return output
[docs]class CocciProcess: """ Class used for running spatch command in the case of multiprocessing """ def __init__(self, cmd, verbose): self.process = Process(target=self.execute, args=(self, )) self.output, self.input = Pipe() self.cmd = cmd self.verbose = verbose
[docs] def execute(self, option=''): output = "" try: if self.verbose: stderr.write("Running: %s." % " ".join(self.cmd)) output = Popen(self.cmd, stdout=PIPE).communicate()[0] else: output = Popen(self.cmd, stdout=PIPE, stderr=PIPE).communicate()[0] except Exception, err: import pickle output = pickle.dumps(err) pass self.input.send(output) self.input.close()
[docs] def start(self): self.process.start()
[docs] def join(self): self.process.join()
[docs] def recv(self): return self.output.recv()
[docs]class CocciGrep: """ Core class of the module: setup and run. This class is the core of the module. It is responsible of initialisation and running of the request. """ spatch="spatch" cocci_python=""" @ script:python @ p1 << init.p1; @@ for p in p1: print "%s:%s:%s:%s:%s" % (p.file,p.line,p.column,p.line_end,p.column_end) """ def __init__(self): self.verbose = False self.spatch = CocciGrep.spatch self.ncpus = 1 self.operations = {} self.process = [] dirList = listdir(self.get_datadir()) for fname in dirList: op = path.split(fname)[-1].replace('.cocci','') self.operations[op] = path.join(self.get_datadir(), fname)
[docs] def setup(self, stype, attribut, operation): """ :param stype: structure name, used to replace '$type' in the cocci file :type stype: str :param attribut: basically attribut of the structure, used to replace '$attribut' in the cocci file :type attribut: str :param operation: search operation to do :type operation: str :raise: :class:`CocciRunException` """ if stype == None: raise CocciRunException("Can't use coccigrep without type to search") self.type = stype self.attribut = attribut self.operation = operation
[docs] def set_concurrency(self, ncpus): """ Set concurrency level (number of spatch command to run in parallel) :param ncpus: number of process to launch in parallel :type ncpus: int """ if have_multiprocessing: self.ncpus = ncpus return True else: return False
[docs] def set_spatch_cmd(self, cmd): """ Set path or command name for spatch :param cmd: Name of parth of the spatch command :type cmd: str """ self.spatch = cmd
[docs] def get_datadir(self): this_dir, this_filename = path.split(__file__) datadir = path.join(this_dir, "data") return datadir
[docs] def get_operations(self): """ Get list of available operations :return: list of operations in a list of str """ return self.operations.keys()
[docs] def get_operation_name(self, fname): return path.split(fname)[-1].replace('.cocci','')
[docs] def add_operations(self, new_ops): """ Add operation to the list of supported operations :param new_ops: list of filenames (ending by .cocci) :type new_ops: list of str """ if len(new_ops) == 0: return file_filter = re.compile('^[^\.].*\.cocci$') for fname in new_ops: # file need to end with cocci if file_filter.match(fname): op = path.split(fname)[-1].replace('.cocci','') self.operations[op] = fname
[docs] def set_verbose(self): """ Activate verbose mode """ self.verbose = True
[docs] def run(self, files): """ Run the search against the files given in argument This function is doing the main job. It will run spatch with the correct parameters by using subprocess or it will use multiprocessing if a concurrency level greater than 1 has been asked. :param files: list of filenames :type files: list of str :raise: :class:`CocciRunException` or :class:`CocciConfigException` """ if len(files) == 0: raise CocciRunException("Can't use coccigrep without files to search") for cfile in files: if not path.isfile(cfile): raise CocciRunException("'%s' is not a file, can't continue" % cfile) # create tmp cocci file: tmp_cocci_file = NamedTemporaryFile(suffix=".cocci", delete=False) tmp_cocci_file_name = tmp_cocci_file.name # open file with name matching operation cocci_file = open(self.operations[self.operation], 'r') # get the string and build template cocci_tmpl = cocci_file.read() cocci_smpl_tmpl = Template(cocci_tmpl) cocci_file.close() cocci_smpl = cocci_smpl_tmpl.substitute(type=self.type, attribut=self.attribut) cocci_grep = cocci_smpl + CocciGrep.cocci_python tmp_cocci_file.write(cocci_grep) tmp_cocci_file.close() # launch spatch output = "" if self.ncpus > 1 and len(files) > 1: fseq = [] splitsize = 1.0/self.ncpus*len(files) for i in range(self.ncpus): rfiles = files[int(round(i*splitsize)):int(round((i+1)*splitsize))] if len(rfiles) >= 1: fseq.append(rfiles) for sub_files in fseq: cmd = [self.spatch, "-sp_file", tmp_cocci_file.name] + sub_files sprocess = CocciProcess(cmd, self.verbose) sprocess.start() self.process.append(sprocess) for process in self.process: ret = process.recv() process.join() # CocciProcess return a serialized exception in case of exception if ret.startswith('cexceptions\n'): import pickle import errno err = pickle.loads(ret) unlink(tmp_cocci_file_name) if err.errno in (errno.ENOENT, errno.ENOEXEC): raise CocciConfigException("Unable to run spatch command '%s': %s." % (cmd[0], err.strerror)) else: raise CocciRunException("Unable to run '%s': %s." % (" ".join(cmd), err.strerror)) else: output += ret else: cmd = [self.spatch, "-sp_file", tmp_cocci_file.name] + files try: if self.verbose: stderr.write("Running: %s." % " ".join(cmd)) output = Popen(cmd, stdout=PIPE).communicate()[0] else: output = Popen(cmd, stdout=PIPE, stderr=PIPE).communicate()[0] except OSError, err: import errno unlink(tmp_cocci_file_name) if err.errno in (errno.ENOENT, errno.ENOEXEC): raise CocciConfigException("Unable to run spatch command '%s': %s." % (cmd[0], err.strerror)) else: raise CocciRunException("Unable to run '%s': %s." % (" ".join(cmd), err.strerror)) unlink(tmp_cocci_file_name) prevfile = None prevline = None self.matches = [] for ematch in output.split("\n"): try: (efile, eline, ecol,elinend, ecolend) = ematch.split(":") nmatch = CocciMatch(efile, eline, ecol,elinend, ecolend) # if there is equality then we will already display the line if (efile == prevfile) and (eline == prevline): continue else: prevfile = efile prevline = eline self.matches.append(nmatch) except ValueError: pass
[docs] def display(self, mode='raw', before=0, after=0, oformat='term'): """ Display output for complete request :param mode: display mode :type mode: str :param before: number of lines to display before match :type before: int :param after: number of lines to display after match :type after: int :param oformat: format of output for color (term, html) :type oformat: str :return: the result of the search as a str """ output = "" for match in self.matches: output += match.display(self.type, mode=mode, oformat=oformat, before=before, after=after) return output.rstrip()