Source code for fsquass.__init__

# coding: utf-8
"""
FsQuass is a filesystem query and traversal library: a pythonic jQuery for the filesystem.

Still a work in progress.
"""
# from weakref import WeakValueDictionary
import os
import gettext
import logging
import re
import sys

from fnmatch import fnmatch
from functools import partial
from itertools import chain, product
from os import path
from shutil import rmtree

# Version string of the package.
__version__ = '0.1.0'

# Identity function standing in for a gettext-style translator: every
# user-facing message in this module is passed through ``_()``.
_ = lambda x: x  # placeholder for translation
# Names exported by ``from fsquass import *``.
__all__ = ['Fs', 'Dir', 'File']


def escaped_split(delimiter, string):
    """
    Split *string* on every *delimiter* that is not escaped by a backslash.

    * *delimiter* is taken literally (regex metacharacters such as ``.``
      or ``|`` have no special meaning).
    * A delimiter preceded by a single backslash is escaped: it does not
      split, and the ``\\<delimiter>`` pair is unescaped to the bare
      delimiter in the returned pieces.
    * A delimiter preceded by two backslashes is *not* escaped (the
      backslash itself is considered escaped), so it does split.

    Returns a list of strings; an empty *string* yields ``['']``.
    """
    # The nested look-behind reads "not preceded by a backslash that is
    # itself not preceded by a backslash". Unlike a two-character
    # look-behind it also works when the escaping backslash is the very
    # first character of the string.
    splitter = r'(?<!(?<!\\)\\)' + re.escape(delimiter)
    escaped_delimiter = '\\' + delimiter
    return [piece.replace(escaped_delimiter, delimiter)
            for piece in re.split(splitter, string)]


try:
    _STRING_TYPES = basestring  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3


class Fs(set):
    """
    **Files set**. Is a :py:class:`set` of :py:class:`File` and
    :py:class:`Dir` instances with traversal methods. Besides the methods
    inherited from :py:class:`set`, it has some methods and properties
    specific to file systems.

    *nodes* can be a string or an iterable of :py:class:`File` and
    :py:class:`Dir` instances.

    If *nodes* is a string, it's treated differently depending on what it
    starts with:

    * ``/``, files are matched from those in the root directory and
      further, without scanning the whole filesystem.
    * ``./``, the next name will be searched inside the current directory,
      without recursive scanning.
    * ``~``, home folder will be opened
    * ``~/``, home folder will be opened, and its children will be
      matched, without recursive scanning.

    A space is treated like in CSS, a recursive search for descendants.
    E.g.

    .. code-block:: python

        Fs('/home/user tests/__init__.py')

    will

    * find ``/home/user``,
    * then recursively scan both for files and directories named
      ``tests``,
    * then will search for ``__init__.py`` inside those directories, but
      not deeper.

    Note: Recursive scans can be expensive; prefer explicit
    ``/``-separated paths over spaces where you can.

    Implementation note: on Python 3 the binary set operators (``|``,
    ``&``, ``-``) of a ``set`` subclass return a plain ``set``, so every
    method that combines sets wraps the result back into this class.
    """

    # TODO: Fs.has(), Fs.andSelf()

    def __init__(self, nodes=None):
        """
        Build the set from a query string or from an iterable of nodes.

        Raises :exc:`ValueError` for a query string that starts neither
        at the root, nor at ``.``, ``..`` or ``~``.
        """
        if isinstance(nodes, _STRING_TYPES):
            if nodes.strip() == '/':
                nodes = [Dir('/')]
            else:
                nodes = self._nodes_from_query(nodes)
        super(Fs, self).__init__(nodes or [])

    @classmethod
    def _nodes_from_query(cls, query):
        """
        Resolve a query string into a list of matching nodes.
        """
        found = []
        for patterns in cls._patterns(query):
            # If the query is 'path/to/folder path/to/another' (the part
            # after the space is a descendant, not necessarily a child of
            # 'folder') then patterns are
            # [['path', 'to', 'folder'], ['path', 'to', 'another']].
            head = patterns[0]
            if head[0] == '' and len(head) > 1:
                # '/something' becomes ['', 'something'], hence search
                # from the fs root.
                head.pop(0)
                start = Dir('/')
            elif head[0] in ('.', '~', '..'):
                start = Dir(head.pop(0))
            else:
                # Either '/ something' (space means scanning inside the fs
                # root), or 'something/...' (which is the same).
                raise ValueError(
                    _("Forbidden query '%s'. To do system-wide scan, use Fs('/').find('. <your query>')")
                    % query)
            local_nodes = [start]
            for depth, level in enumerate(patterns):
                # Only the space-separated groups after the first one are
                # searched recursively.
                local_nodes = Fs(local_nodes)._recursive_find(level, deeper=depth > 0)
            found.extend(local_nodes)
        return found

    def children(self, pattern=None):
        """
        Returns a set of children of all the set items filtered by
        *pattern*. An empty set returns itself.
        """
        if not self:
            return self
        result = self.__class__()
        for node in self:
            result |= node.children(pattern)
        return result

    def closest(self, pattern):
        """
        Finds ancestors of the set items that match *pattern*; items of
        the set itself are excluded from the result.
        """
        return self.__class__(self._get_ancestors(pattern) - self)

    def filter(self, pattern):
        """
        Filters the set items by *pattern*.

        Filtering a set of paths is equal to an intersection of the set
        and of a set found by *pattern*:

        .. code-block:: python

            dirs = Fs('/home/siberiano;/home;/tmp;/tmp/siberiano')
            dirs.filter('siberiano') == dirs & Fs('/').find('. siberiano')
        """
        return self.__class__(self._get_ancestors(pattern) & self)

    @staticmethod
    def _filter_children_generator(node):
        """
        Returns a ``get_children`` function for :py:func:`_recursive_find`
        that, instead of listing the real directory, only yields the next
        path component on the way down to *node*.
        """
        path_sections = escaped_split(path.sep, node.path)

        def child_getter(fs):
            # An empty set has nothing to descend from.
            item = next(iter(fs), None)
            if item is None:
                return []
            if item.path == '/':
                path_depth = 1
            else:
                path_depth = len(escaped_split(path.sep, item.path))
            if path_depth < len(path_sections):
                return [File(path.sep.join(path_sections[:path_depth + 1]))]
            return []

        return child_getter

    def find(self, pattern):
        """
        Searches by *pattern* inside the set items. Returns a new Fs
        instance.

        E.g. if we have a set ``fs`` of these paths::

            /home/user/
            /root

        ``fs.find('.bashrc')`` will probably output::

            /home/user/.bashrc
            /root/.bashrc

        If you need to find multiple paths, separate them with semicolon:

        .. code-block:: python

            Fs('/home/siberiano').find('.bashrc;Work/project/templates base.haml')

        Will search for ``.bashrc`` file in my homefolder (but not deeper)
        and inside ~/Work/project/templates will recursively search for
        ``base.haml`` files.

        To avoid accidental scanning of the entire filesystem, recursive
        search is made harder. Use dot and space in the beginning if you
        need it anyway:

        .. code-block:: python

            # scan the entire filesystem for 'siberiano'
            Fs('/').find('. siberiano')

            # scans for files & directories named 'project' inside Work
            Fs('/home/siberiano/Work').find('. project')

        Raises :exc:`ValueError` for an empty pattern, a pattern starting
        with a space or with ``/``, or a ``.`` used after a space.
        """
        if pattern.strip() == '':
            raise ValueError('Search pattern must be non-empty.')
        result = Fs()
        for patterns in self._patterns(pattern):
            if patterns[0] in ([''], ['', '']):
                raise ValueError(_("Can't start search query with space. To do filesystem scan, use dot-space: '. query'"))
            for i, p in enumerate(patterns):
                if p[0] == '':
                    raise ValueError(_("Can't search from root (/) inside an Fs ('%s').") % '/'.join(p))
                if i > 0 and '.' in p:
                    raise ValueError(_("Can't use '.' in descendants ('%s')") % '/'.join(p))
            local_result = self
            first_level = patterns.pop(0)
            if first_level != ['.']:
                # '. name' is the way to scan the filesystem
                local_result = Fs(local_result._recursive_find(first_level, deeper=False))
            for level in patterns:
                local_result = Fs(local_result._recursive_find(level))
            result |= local_result
        return result

    def first(self):
        """
        Returns an :py:class:`Fs` holding a single item of the set (sets
        are unordered, so which one is arbitrary), or an empty
        :py:class:`Fs` when the set is empty.
        """
        for node in self:
            # Wrap in a list: Fs(node) would iterate the node itself.
            return Fs([node])
        return Fs()

    def _get_ancestors(self, pattern):
        """
        For every item, walks from the filesystem root down along the
        item's own path and collects the nodes matching *pattern*.
        Returns the set itself when *pattern* is empty.
        """
        if not pattern:
            return self
        result = self.__class__()
        for node in self:
            child_getter = self._filter_children_generator(node)
            for patterns in self._patterns(pattern):
                local_result = Fs('/')
                for level in patterns:
                    local_result = Fs(local_result._recursive_find(level, child_getter))
                result |= local_result
        return result

    def exclude(self, pattern):
        """
        Exclude items that match pattern.

        *pattern* is either a query string or an :py:class:`Fs`;
        anything else raises :exc:`ValueError`.
        """
        if isinstance(pattern, _STRING_TYPES):
            return self.__class__(self - self.filter(pattern))
        if isinstance(pattern, Fs):
            return self.__class__(self - pattern)
        raise ValueError(_('pattern must be either a string or an Fs instance. Got %s instead.') % pattern)

    def _link(self, target, multiple_targets=False, name_callback=None, link_function=os.symlink):
        """
        Links every set item into the *target* directories using
        *link_function* (``os.symlink`` or ``os.link``).

        * *target*: an :py:class:`Fs` of directories, or anything
          :py:class:`Fs` accepts.
        * *multiple_targets*: when false only one target directory is
          used.
        * *name_callback*: optional ``callback(source, target)`` taking
          and returning two single-item :py:class:`Fs` instances.

        Raises :exc:`ValueError` if a target is not a directory or
        *link_function* is not callable.
        """
        if not isinstance(target, Fs):
            target = Fs(target)
        if target.filter(':dir') != target:
            raise ValueError(_("Target(s) is not a directory: %s") % target)
        if not callable(link_function):
            raise ValueError(_("Link function must be a callable."))
        # Must be narrowed before product() captures the targets.
        if not multiple_targets:
            target = target.first()
        for source, target_dir in product(self, target):
            s, t = Fs([source]), Fs([target_dir])
            if callable(name_callback):
                s, t = name_callback(s, t)
            source_path = s.pop().path
            link_path = t.pop().path
            # Like ``ln``: when the destination is an existing directory,
            # create the link inside it under the source's name.
            if path.isdir(link_path):
                link_path = path.join(link_path, path.basename(source_path))
            link_function(source_path, link_path)

    def linkTo(self, target, multiple_targets=False, name_callback=None):
        """
        Makes a hard link to all the set members in *target* folder.

        * target must be a set of 1 or more directories
          (:py:class:`Dir` instances).
        * if *multiple_targets* parameter is ``True``, links will be made
          in all the *target* folders. If *multiple_targets* is
          ``False``, then will link in the first *target* folder only.

        Optional *name_callback* should work like this:

        .. code-block:: python

            def name_callback(source, target):
                # source & target are Fs instances with 1 member each
                return source, target
        """
        self._link(target, multiple_targets, name_callback, os.link)

    def symlinkTo(self, target, multiple_targets=False, name_callback=None):
        """
        Makes a symbolic link in *target* folder like
        :py:func:`Fs.linkTo`
        """
        self._link(target, multiple_targets, name_callback, os.symlink)

    def parents(self):
        """
        Returns a set of parents of all the items, e.g. for

        .. code-block:: none

            /home/user/.bashrc
            /home/user/.hgrc
            /tmp/test
            /tmp

        parents will be

        .. code-block:: none

            /home/user
            /tmp
            /
        """
        return self.__class__(chain.from_iterable(i.parent for i in self))

    @property
    def paths(self):
        """
        A generator of paths of all the items.
        """
        return (i.path for i in self)

    @staticmethod
    def _pattern_level_match(pattern, item):
        """
        Match item's basename against pattern at one level, i.e. between
        slashes::

            .../siberiano/...
            /tmp/...

        *pattern* can contain subpatterns::

            {home,tmp}
            {etc,usr,var,tmp}
            {dropbox:ignorecase,Pictures}

        Returns ``True`` if any subpattern matches. Raises
        :exc:`ValueError` for stray ``{``, ``}`` or ``,`` characters.
        """
        find_match = re.match(r'^{([^{}]+)}$', pattern)
        # pattern may contain multiple subpatterns: /{home,tmp,*oot}/
        if find_match:
            subpatterns = escaped_split(',', find_match.groups()[0])
        elif re.findall(r'[^\\][\{\},]', pattern):
            raise ValueError(_('Path query string "%s" contains illegal characters') % pattern)
        else:
            subpatterns = [pattern]
        return any(Fs._subpattern_match(subpattern, item) for subpattern in subpatterns)

    @staticmethod
    def _patterns(pattern):
        """
        Parses a query string. Yields, for every ``;``-separated query, a
        list of space-separated groups, each being a list of path
        components.
        """
        for query in escaped_split(';', pattern):
            yield [escaped_split(path.sep, group) for group in escaped_split(' ', query)]

    def _recursive_find(self, pattern, get_children=None, deeper=True):
        """
        Searches recursively through filesystem, both for direct children
        and for descendants. This is a generator of matching nodes.

        * *pattern* is a list of strings (either of which may contain
          subpatterns)
        * *deeper* is a flag whether this path should be searched among
          children (fs scan)
        * *get_children* is an optional function for the purpose of
          filtering.

        Normally you call :py:func:`Fs.children` and match them with
        *pattern*. In case you want to filter, you don't need to scan the
        real filesystem. Search scope is already limited to a node path
        (/path/to/node), which means for '/path' get_children should
        return '/path/to' only, without its siblings from the real hard
        drive. Custom get_children function that is used in
        :py:func:`Fs.filter` does this.
        """
        get_children = get_children or (lambda s: s.children())
        yielder = []
        if pattern in ([], ['']):
            # Nothing left to match: the current nodes are the result.
            yielder.append(self)
        elif pattern[0] == '..':
            yielder.append(self.parents()._recursive_find(pattern[1:], get_children, False))
        else:
            for n in get_children(self):
                if deeper and isinstance(n, Dir):
                    # Descend with the whole pattern still to be matched.
                    yielder.append(Fs([n])._recursive_find(pattern, get_children))
                logging.debug([n, pattern])
                r = []
                if self._pattern_level_match(pattern[0], n):
                    r = [n]
                if r and len(pattern) > 1:
                    # First component matched; the rest must match
                    # direct children only.
                    r = Fs(r)._recursive_find(pattern[1:], get_children, False)
                yielder.append(r)
        for i in chain(*yielder):
            yield i

    def __repr__(self):
        return '%s([%s])' % (self.__class__.__name__, ', '.join(map(str, self)))

    def siblings(self, pattern=None):
        """
        Finds all the siblings of the files in set, filtered by
        *pattern*. The result will not include any files of the original
        set.
        """
        return self.__class__(self.parents().children(pattern) - self)

    @staticmethod
    def _subpattern_match(subpattern, item):
        """
        Matches ``item.basename`` against *subpattern*.

        * ``subpattern`` - string containing text or unix-like patterns
          ``?``, ``*``, optionally followed by ``:``-separated
          pseudo-classes ``ignorecase``, ``file`` and ``dir``.
        * ``item`` - a :py:class:`File` instance.
        """
        pseudo_classes = escaped_split(':', subpattern)
        subpattern = pseudo_classes.pop(0)
        basename = item.basename
        if 'ignorecase' in pseudo_classes:
            subpattern = subpattern.lower()
            basename = item.basename.lower()
        # An empty name part (e.g. ':dir') matches any basename.
        if subpattern and not fnmatch(basename, subpattern):
            return False
        if (('file' in pseudo_classes and isinstance(item, Dir)) or
                ('dir' in pseudo_classes and not isinstance(item, Dir))):
            return False
        return True
class File(object):
    """
    A file or a directory. Contains self.path, and if an object with the
    same absolute path is instantiated, an existing item is returned.

    Instantiating :py:class:`File` with the path of a directory returns a
    :py:class:`Dir` instance.

    If *full_path* is inaccessible, :exc:`OSError` (an
    :exc:`EnvironmentError`) is raised.
    """

    # TODO: chmod chown lchown makedirs readlink stat walk

    # Cache of instances keyed by normalized absolute path; shared by
    # File and its subclasses.
    _data = {}  # WeakValueDictionary()

    def __new__(cls, full_path):
        """
        Normalizes path and checks if an instance for this path already
        exists.
        """
        full_path = File._normalize(full_path)
        cls._assert_path(full_path)
        if path.isdir(full_path):
            cls = Dir
        if full_path not in cls._data:
            # object.__new__ accepts no extra arguments on Python 3.
            cls._data[full_path] = object.__new__(cls)
        return cls._data[full_path]

    def __init__(self, full_path):
        self.path = File._normalize(full_path)

    @staticmethod
    def _assert_path(full_path):
        """
        Checks if path is visible, which means full_path is either of
        these:

        * a file (or a working and accessible symlink)
        * a symlink, which may be broken or inaccessible

        Raises :exc:`OSError` otherwise.
        """
        if not (path.exists(full_path) or path.islink(full_path)):
            raise OSError(_('Path unreachable: %s') % full_path)

    @property
    def basename(self):
        """
        String basename of the file.
        """
        return path.basename(self.path)

    def children(self, pattern=None):
        """
        Returns an Fs of child nodes. Makes sense in Dir only, but put
        here for compatibility.
        """
        return Fs()

    def __contains__(self, other):
        """
        ``other in self``: true when *other* is this very path or lies
        below it. Raises :exc:`ValueError` if *other* is not a
        :py:class:`File`.
        """
        if not isinstance(other, File):
            raise ValueError('Tried to compare if {0} is in {1}. {0} must be an fsquass.File instance.'.format(other, self))
        if other.path == self.path:
            return True
        # Compare whole path components: '/home2' is not inside '/home'.
        # rstrip keeps the root ('/') from turning into '//'.
        prefix = self.path.rstrip(path.sep) + path.sep
        return other.path.startswith(prefix)

    def delete(self, sure=False):
        """
        Deletes the file if *sure* is ``True``. If you managed to call it
        like this, don't blame the library for any lost data.
        """
        # Deliberately strict: a merely truthy value must not delete.
        if sure == True:
            os.remove(self.path)

    def __iter__(self):
        # A plain file has no children; must be an iterator, not a list.
        return iter(())

    @staticmethod
    def _normalize(dirty_path):
        """
        Expands ``~``, collapses ``.``/``..`` and makes the path absolute.
        """
        return path.abspath(path.normpath(path.expanduser(dirty_path)))

    def open(self, *args, **kwargs):
        """
        Wrapper to Python ``open()``; returns the opened file object.
        """
        return open(self.path, *args, **kwargs)

    @property
    def parent(self):
        """
        Returns an Fs with the parent directory (empty for the
        filesystem root, which is its own parent).
        """
        par = path.normpath(path.join(self.path, path.pardir))
        return Fs([Dir(par)] if par != self.path else None)

    def __repr__(self):
        return "%s('%s')" % (self.__class__.__name__, self.path)
class Dir(File):
    """
    Directory. Returns its directories and files in children() method.

    * Is iterable:

      .. code-block:: python

          for i in Dir('/home/siberiano'):
              print(i)

      will print files and directories in the folder. This allows using
      such tricks as using a :py:class:`Dir` to get a :py:class:`Fs` of
      its children:

      .. code-block:: python

          >>> d = Dir('/')
          >>> Fs(d) == d.children()
          True

    * Can check if contains another :py:class:`File` or :py:class:`Dir`:

      .. code-block:: python

          >>> Dir('/home') in Dir('/')
          True
          >>> Dir('/tmp') in Dir('/home')
          False
    """

    def __new__(cls, full_path):
        """
        Validates that *full_path* is a directory *before* an instance is
        created and cached, so that a failed ``Dir(<regular file>)`` call
        cannot leave a ``Dir`` object in the shared path cache.

        Raises :exc:`OSError` for an unreachable path and
        :exc:`ValueError` for a path that is not a directory.
        """
        normalized = File._normalize(full_path)
        cls._assert_path(normalized)
        if not path.isdir(normalized):
            raise ValueError(_('path %s is not a directory') % normalized)
        return super(Dir, cls).__new__(cls, full_path)

    def __init__(self, full_path):
        super(Dir, self).__init__(full_path)
        if not path.isdir(self.path):
            raise ValueError(_('path %s is not a directory') % self.path)

    def children(self, pattern=None):
        """
        Lists the directory and returns :py:class:`Fs` of the files,
        filtered by *pattern*. An unreadable directory yields an empty
        :py:class:`Fs`.
        """
        try:
            filenames = os.listdir(self.path)
        except OSError:
            return Fs()
        nodes = []
        for name in filenames:
            try:
                nodes.append(File(path.join(self.path, name)))
            except OSError:
                # The entry vanished between listdir() and now; skip it,
                # in line with the best-effort handling above.
                continue
        return Fs(nodes).filter(pattern)

    def delete(self, sure=False):
        """
        Deletes the directory with all files and directories in it if
        *sure* is ``True``. Errors during removal are ignored. If you
        managed to call it like this, don't blame the library for any
        lost data.
        """
        # Deliberately strict: a merely truthy value must not delete.
        if sure == True:
            rmtree(self.path, ignore_errors=True)

    def __iter__(self):
        return iter(self.children())

    def open(self, *args, **kwargs):
        """
        Raises TypeError, since directories can't be opened like files.
        """
        raise TypeError(_("Directories can't be opened like files"))


# paths = Fs(__file__).parents().parents().find('./locale').paths
# _ = gettext.translation('fsquass', ''.join(paths)).ugettext
def main(args):
    """
    Command-line entry point: runs the query made of *args* joined by
    spaces and prints the path of every match, one per line.

    Always terminates through :func:`sys.exit`:

    * ``0`` on success, or when the reader of our output went away
      (broken pipe, e.g. quitting ``less``);
    * ``1`` when no arguments were given, on Ctrl-C, or on any error
      (which is logged; the traceback is printed when the root logger is
      at DEBUG level).
    """
    import errno
    import traceback

    if not args:
        print(_('Usage: fsquass "[search string]"\n(Enclose search string in quotes.)'))
        sys.exit(1)
    try:
        # Printing is inside the try block: a broken pipe happens while
        # writing the results, not while searching.
        for node in Fs(' '.join(args)):
            print(node.path)
    except KeyboardInterrupt:
        print(_('Interrupted by user.'))
        sys.exit(1)
    except Exception as exc:
        if isinstance(exc, EnvironmentError) and getattr(exc, 'errno', None) == errno.EPIPE:
            # Skip '[Errno 32] Broken pipe': often a cancelling of `less`.
            sys.exit(0)
        if not logging.root.handlers:
            logging.basicConfig()
        exc_info = sys.exc_info()
        frames = traceback.extract_tb(exc_info[2])
        if frames:
            # Report where the exception was actually raised.
            tb_path, tb_lineno, tb_func = frames[-1][:3]
            logging.error("%s (%s:%s in %s)", exc, tb_path, tb_lineno, tb_func)
        else:
            logging.error("%s", exc)
        if logging.getLogger().level <= logging.DEBUG:
            print('')
            traceback.print_exception(*exc_info)
        sys.exit(1)
    sys.exit(0)


if __name__ == '__main__':
    main(sys.argv[1:])

Project Versions