Commit c6b0e334 authored by Romain Bignon's avatar Romain Bignon

plugins management

parent 64786695
from .ass2m import Ass2m, NotWorkingDir
from .ass2m import Ass2m
__all__ = ['Ass2m', 'NotWorkingDir']
__all__ = ['Ass2m']
......@@ -17,31 +17,71 @@
import os
from storage import Storage
import re
class NotWorkingDir(Exception): pass
from .storage import Storage
from .plugin import Plugin
class Ass2m(object):
DIRNAME = '.ass2m'
def __init__(self, path):
def __init__(self, path, parser=None):
if isinstance(path, Storage):
storage = path
elif not path:
raise NotWorkingDir()
else:
elif path:
try:
while not self.DIRNAME in os.listdir(path) and path != os.path.dirname(path):
path = os.path.dirname(path)
except OSError:
raise NotWorkingDir()
if path == os.path.dirname(path):
raise NotWorkingDir()
path = None
if path and path != os.path.dirname(path):
storage = Storage(os.path.join(path, self.DIRNAME))
else:
storage = None
storage = Storage(os.path.join(path, self.DIRNAME))
self.parser = parser
self.storage = storage
self.root = os.path.realpath(os.path.join(storage.path, os.path.pardir))
if self.storage:
self.root = os.path.realpath(os.path.join(storage.path, os.path.pardir))
else:
self.root = None
self.load_plugins()
def iter_existing_plugin_names(self):
try:
import plugins
except ImportError:
return
for path in plugins.__path__:
regexp = re.compile('([\w\d_]+).py$')
for f in os.listdir(path):
m = regexp.match(f)
if m and m.group(1) != '__init__':
yield m.group(1)
def load_plugins(self):
self.plugins = {}
for existing_plugin_name in self.iter_existing_plugin_names():
self.load_plugin(existing_plugin_name)
def load_plugin(self, plugin_name):
package_name = 'ass2m.plugins.%s' % plugin_name
package = __import__(package_name, fromlist=[str(package_name)])
klass = None
for attrname in dir(package):
attr = getattr(package, attrname)
if isinstance(attr, type) and issubclass(attr, Plugin) and attr != Plugin:
klass = attr
break
if not klass:
return
plugin = klass(self)
self.plugins[plugin_name] = plugin
@classmethod
def create(cls, path):
return cls(Storage.init(os.path.join(path, cls.DIRNAME)))
def create(self, path):
self.storage = Storage.init(os.path.join(path, self.DIRNAME))
self.root = os.path.realpath(os.path.join(self.storage.path, os.path.pardir))
......@@ -15,155 +15,19 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import sys
import os
import getpass
import re
import argparse
from ass2m import Ass2m, NotWorkingDir
from users import User
from ass2m import Ass2m
class Ass2mCLI(object):
class CLI(object):
def __init__(self):
try:
self.ass2m = Ass2m(os.getcwd())
except NotWorkingDir:
self.ass2m = None
def process_command(self, argv):
if len(argv) < 2 or not hasattr(self, 'cmd_%s' % argv[1]):
return self.cmd_help([])
func = getattr(self, 'cmd_%s' % argv[1])
return func(argv[2:])
def workdir(func):
def inner(self, *args, **kwargs):
if not self.ass2m:
print >>sys.stderr, 'Error: Not a ass2m working directory.'
print >>sys.stderr, 'Please use "%s init"' % sys.argv[0]
return 1
return func(self, *args, **kwargs)
return inner
def cmd_help(self, args):
print 'Usage: %s cmd [args..]' % sys.argv[0]
print ''
print 'Commands:'
print ' init'
print ' tree'
print ' share PATH [CONTACTS..]'
print ' contacts <add|list|remove>'
@workdir
def cmd_share(self, args):
return
@workdir
def cmd_tree(self, args):
for root, directories, files in os.walk(os.getcwd()):
path = root[len(os.getcwd()):]
depth = path.count('/')
f = self.ass2m.storage.get_file(path)
parent = f.parent()
perms_s = ''
for key, perm in f.perms.iteritems():
if not parent or not key in parent.perms or parent.perms[key] != perm:
perms_s += '%s(%s) ' % (key, f.perm_str(perm))
if len(perms_s) == 0:
continue
print '%-40s %s' % (' ' * depth + path + '/', perms_s)
@workdir
def cmd_contacts(self, args):
if len(args) < 1:
print >>sys.stderr, 'Error: please give a command <list|add|remove>'
return 1
action = args[0]
if action == 'list':
for user in self.ass2m.storage.iter_users():
print '* %s: %s <%s>' % (user.name, user.realname, user.email)
elif len(args) < 2:
print >>sys.stderr, 'Error: please give a username'
return 1
elif action == 'add':
if self.ass2m.storage.user_exists(args[1]):
print >>sys.stderr, 'Error: user %s already exists.' % args[1]
return 1
user = User(self.ass2m.storage, args[1])
user.realname = self.ask('Enter the realname')
user.email = self.ask('Enter the email address')
user.save()
print 'User %s correctly added.' % user.name
elif action == 'remove':
if not self.ass2m.storage.user_exists(args[1]):
print >>sys.stderr, 'Error: user %s does not exist.' % args[1]
return 1
self.ass2m.storage.remove_user(args[1])
print 'User %s has been removed.' % args[1]
else:
print >>sys.stderr, 'Error: Unknown command "%s"' % action
return 1
return 0
def cmd_init(self, args):
if self.ass2m:
print >>sys.stderr, 'Error: %s is already a working directory' % os.getcwd()
return 1
self.ass2m = Ass2m.create(os.getcwd())
print 'Ass2m working directory created.'
def ask(self, question, default=None, masked=False, regexp=None, choices=None):
"""
Ask a question to user.
@param question text displayed (str)
@param default optional default value (str)
@param masked if True, do not show typed text (bool)
@param regexp text must match this regexp (str)
@return entered text by user (str)
"""
is_bool = False
if choices:
question = u'%s (%s)' % (question, '/'.join(
[s for s in (choices.iterkeys() if isinstance(choices, dict) else choices)]))
if default is not None:
if isinstance(default, bool):
question = u'%s (%s/%s)' % (question, 'Y' if default else 'y', 'n' if default else 'N')
choices = ('y', 'n', 'Y', 'N')
default = 'y' if default else 'n'
is_bool = True
else:
question = u'%s [%s]' % (question, default)
if masked:
question = u'%s (hidden input)' % question
question += ': '
correct = False
while not correct:
line = getpass.getpass(question) if masked else raw_input(question)
if not line and default is not None:
line = default
if isinstance(line, str):
line = line.decode('utf-8')
correct = (not regexp or re.match(unicode(regexp), unicode(line))) and \
(not choices or unicode(line) in
[unicode(s) for s in (choices.iterkeys() if isinstance(choices, dict) else choices)])
if is_bool:
return line.lower() == 'y'
else:
return line
self.parser = argparse.ArgumentParser(prog='ass2m')
self.ass2m = Ass2m(os.getcwd(), self.parser)
def main(self, argv):
# TODO use cmd.Cmd to have a REPL application when no command
# is supplied.
args = self.parser.parse_args(argv[1:])
cmd = args.cmd(self.ass2m)
cmd.cmd(args)
# -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon, Laurent Bachelier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import sys
import getpass
import re
class Command(object):
NAME = None
DESCRIPTION = None
ALIASES = ()
WORKDIR = True
def cmd(self, args):
raise NotImplementedError()
@staticmethod
def configure_parser(parser):
return
def __init__(self, ass2m):
self.ass2m = ass2m
def run(self, args):
if self.WORKDIR and not self.ass2m.storage:
print >>sys.stderr, 'Error: Not a ass2m working directory.'
print >>sys.stderr, 'Please use "%s init"' % sys.argv[0]
return 0
return self.cmd(args)
def ask(self, question, default=None, masked=False, regexp=None, choices=None):
"""
Ask a question to user.
@param question text displayed (str)
@param default optional default value (str)
@param masked if True, do not show typed text (bool)
@param regexp text must match this regexp (str)
@return entered text by user (str)
"""
is_bool = False
if choices:
question = u'%s (%s)' % (question, '/'.join(
[s for s in (choices.iterkeys() if isinstance(choices, dict) else choices)]))
if default is not None:
if isinstance(default, bool):
question = u'%s (%s/%s)' % (question, 'Y' if default else 'y', 'n' if default else 'N')
choices = ('y', 'n', 'Y', 'N')
default = 'y' if default else 'n'
is_bool = True
else:
question = u'%s [%s]' % (question, default)
if masked:
question = u'%s (hidden input)' % question
question += ': '
correct = False
while not correct:
line = getpass.getpass(question) if masked else raw_input(question)
if not line and default is not None:
line = default
if isinstance(line, str):
line = line.decode('utf-8')
correct = (not regexp or re.match(unicode(regexp), unicode(line))) and \
(not choices or unicode(line) in
[unicode(s) for s in (choices.iterkeys() if isinstance(choices, dict) else choices)])
if is_bool:
return line.lower() == 'y'
else:
return line
# -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon, Laurent Bachelier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from .cmd import Command
class Plugin(object):
def __init__(self, ass2m):
self.ass2m = ass2m
def register_cli_command(self, *args):
parser = self.ass2m.parser
if not parser:
# not a cli application, does not need to register command.
return
names = args[:-1]
cmd = args[-1]
if isinstance(cmd, type) and issubclass(cmd, Command):
description = cmd.DESCRIPTION
else:
description = cmd
cmd = None
for name in names:
if hasattr(parser, '_subparser_action'):
subparsers = parser._subparser_action
else:
subparsers = parser.add_subparsers()
parser._subparser_action = subparsers
if name in subparsers._name_parser_map:
parser = subparsers._name_parser_map[name]
else:
parser = subparsers.add_parser(name, help=description)
if cmd:
cmd.configure_parser(parser)
parser.set_defaults(cmd=cmd)
# -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon, Laurent Bachelier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import sys
from ass2m.plugin import Plugin
from ass2m.cmd import Command
from ass2m.users import User
class ContactsAddCmd(Command):
DESCRIPTION = 'Add a contact'
WORKDIR = True
@staticmethod
def configure_parser(parser):
parser.add_argument('username')
def cmd(self, args):
if self.ass2m.storage.user_exists(args.username):
print >>sys.stderr, 'Error: user %s already exists.' % args.username
return 1
user = User(self.ass2m.storage, args.username)
user.realname = self.ask('Enter the realname')
user.email = self.ask('Enter the email address')
user.save()
print 'User %s correctly added.' % user.name
class ContactsListCmd(Command):
DESCRIPTION = 'List contacts'
WORKDIR = True
def cmd(self, args):
for user in self.ass2m.storage.iter_users():
print '* %s: %s <%s>' % (user.name, user.realname, user.email)
class ContactsRemoveCmd(Command):
DESCRIPTION = 'Remove a contact'
WORKDIR = True
@staticmethod
def configure_parser(parser):
parser.add_argument('username')
def cmd(self, args):
if not self.ass2m.storage.user_exists(args.username):
print >>sys.stderr, 'Error: user %s does not exist.' % args.username
return 1
self.ass2m.storage.remove_user(args.username)
print 'User %s has been removed.' % args.username
class ContactsPlugin(Plugin):
def __init__(self, ass2m):
self.ass2m = ass2m
self.register_cli_command('contacts', 'Contacts management')
self.register_cli_command('contacts', 'add', ContactsAddCmd)
self.register_cli_command('contacts', 'list', ContactsListCmd)
self.register_cli_command('contacts', 'remove', ContactsRemoveCmd)
# -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon, Laurent Bachelier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import sys
import os
from ass2m import Ass2m
from ass2m.plugin import Plugin
from ass2m.cmd import Command
class TreeCmd(Command):
DESCRIPTION = 'Display the working tree'
WORKDIR = True
def cmd(self, args):
for root, directories, files in os.walk(os.getcwd()):
path = root[len(os.getcwd()):]
depth = path.count('/')
f = self.ass2m.storage.get_file(path)
parent = f.parent()
perms_s = ''
for key, perm in f.perms.iteritems():
if not parent or not key in parent.perms or parent.perms[key] != perm:
perms_s += '%s(%s) ' % (key, f.perm_str(perm))
if len(perms_s) == 0:
continue
print '%-40s %s' % (' ' * depth + path + '/', perms_s)
class InitCmd(Command):
DESCRIPTION = 'Initialize the current directory as working tree'
WORKDIR = False
def cmd(self, args):
if self.ass2m.storage:
print >>sys.stderr, 'Error: %s is already a working directory' % os.getcwd()
return 1
self.ass2m.create(os.getcwd())
print 'Ass2m working directory created.'
class CorePlugin(Plugin):
def __init__(self, ass2m):
self.ass2m = ass2m
self.register_cli_command('tree', TreeCmd)
self.register_cli_command('init', InitCmd)
......@@ -23,7 +23,7 @@ from paste import httpserver
from paste.fileapp import FileApp
from mako.lookup import TemplateLookup
from ass2m import Ass2m, NotWorkingDir
from ass2m import Ass2m
from users import Anonymous
class Ass2mFileApp(FileApp):
......@@ -37,10 +37,7 @@ class Ass2mFileApp(FileApp):
class Actions(object):
def __init__(self, environ, start_response):
try:
self.ass2m = Ass2m(environ.get("ASS2M_ROOT", None))
except NotWorkingDir:
self.ass2m = None
self.ass2m = Ass2m(environ.get("ASS2M_ROOT", None))
self.environ = environ
self.req = Request(environ)
self.start_response = start_response
......@@ -65,7 +62,7 @@ class Actions(object):
def answer(self):
if not self.ass2m:
if not self.ass2m.storage:
return self.error_notworkingdir()
relpath = self.req.path_info
......
......@@ -19,8 +19,8 @@
import sys
from ass2m.cli import Ass2mCLI
from ass2m.cli import CLI
if __name__ == '__main__':
app = Ass2mCLI()
sys.exit(app.process_command(sys.argv))
app = CLI()
sys.exit(app.main(sys.argv))
#!/bin/bash
if [ `basename $PWD` == 'tools' ]; then
cd ..
fi
# grep will return 0 only if it founds something, but our script
# wants to return 0 when it founds nothing!
pyflakes ass2m | grep -v redefinition && exit 1 || exit 0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment