Commit eff620d0 authored by Romain Bignon's avatar Romain Bignon

remove the Ass2m class and introduce the Butt one

parent c5a2899a
from .ass2m import Ass2m
__all__ = ['Ass2m']
......@@ -19,42 +19,16 @@
import os
import re
from .storage import Storage
from .plugin import Plugin
__all__ = ['Ass2m']
__all__ = ['Butt']
class Ass2m(object):
DIRNAME = '.ass2m'
VERSION = '0.1-dev'
COPYRIGHT = 'Copyright(C) 2011 Romain Bignon, Laurent Bachelier'
def __init__(self, path, parser=None, router=None):
if isinstance(path, Storage):
storage = path
elif path:
try:
while not os.path.isdir(os.path.join(path, self.DIRNAME)) and path != os.path.dirname(path):
path = os.path.dirname(path)
except OSError:
path = None
if path and path != os.path.dirname(path):
storage = Storage(os.path.join(path, self.DIRNAME))
else:
storage = None
else:
storage = None
class Butt(object):
def __init__(self, parser=None, router=None):
self.parser = parser
self.router = router
self.storage = storage
if self.storage:
self.root = os.path.realpath(os.path.join(storage.path, os.path.pardir))
else:
self.root = None
self.load_plugins()
def iter_existing_plugin_names(self):
......@@ -91,7 +65,3 @@ class Ass2m(object):
plugin = klass(self)
plugin.init()
self.plugins[plugin_name] = plugin
def create(self, path):
self.storage = Storage.init(os.path.join(path, self.DIRNAME))
self.root = os.path.realpath(os.path.join(self.storage.path, os.path.pardir))
......@@ -17,7 +17,9 @@
import argparse
from ass2m import Ass2m
from .storage import Storage
from .butt import Butt
from .version import VERSION, COPYRIGHT
__all__ = ['CLI']
......@@ -27,16 +29,17 @@ class CLI(object):
def __init__(self, working_dir):
self.working_dir = working_dir
self.parser = argparse.ArgumentParser(prog='ass2m')
self.ass2m = Ass2m(self.working_dir, parser=self.parser)
self.butt = Butt(parser=self.parser)
self.parser.add_argument('-V', '--version', action='version',
version='%(prog)s ' + self.ass2m.VERSION +
' ' + self.ass2m.COPYRIGHT)
version='%(prog)s ' + VERSION + ' ' + COPYRIGHT)
def main(self, argv):
# TODO use cmd.Cmd to have a REPL application when no command
# is supplied.
storage = Storage.lookup(self.working_dir)
args = self.parser.parse_args(argv[1:])
cmd = args.cmd(self.ass2m, self.working_dir)
cmd = args.cmd(storage, self.working_dir)
try:
return cmd.run(args)
except KeyboardInterrupt:
......
......@@ -98,12 +98,12 @@ class Command(ConsolePart):
def configure_parser(parser):
return
def __init__(self, ass2m, working_dir):
self.ass2m = ass2m
def __init__(self, storage, working_dir):
self.storage = storage
self.working_dir = working_dir
def run(self, args):
if self.WORKDIR and not self.ass2m.storage:
if self.WORKDIR and not self.storage:
print >>sys.stderr, 'Error: Not a ass2m working directory.'
print >>sys.stderr, 'Please use "%s init"' % sys.argv[0]
return 1
......
......@@ -23,8 +23,8 @@ __all__ = ['Plugin']
class Plugin(object):
def __init__(self, ass2m):
self.ass2m = ass2m
def __init__(self, butt):
self.butt = butt
def init(self):
pass
......@@ -33,7 +33,7 @@ class Plugin(object):
pass
def register_cli_command(self, *args):
parser = self.ass2m.parser
parser = self.butt.parser
if not parser:
# not a cli application, does not need to register a command.
return
......@@ -67,7 +67,7 @@ class Plugin(object):
parser.set_defaults(cmd=cmd)
def register_web_action(self, route, action):
router = self.ass2m.router
router = self.butt.router
if not router:
# not a web application, does not need to register an action.
return
......
......@@ -18,9 +18,9 @@
import sys
from copy import copy
from ass2m import Ass2m
from ass2m.plugin import Plugin
from ass2m.cmd import Command, ConsolePart
from ass2m.storage import Storage
from ass2m.users import User
from ass2m.routes import Route
......@@ -32,19 +32,19 @@ __all__ = ['ContactsManagement', 'ContactsSelection', 'ContactsPlugin']
class ContactsManagement(ConsolePart):
def __init__(self, ass2m):
self.ass2m = ass2m
self.users = list(self.ass2m.storage.iter_users())
def __init__(self, storage):
self.storage = storage
self.users = list(self.storage.iter_users())
def add_contact(self, username=None):
try:
if username is None:
username = self.ask('Enter the username', regexp='^\w+$')
if self.ass2m.storage.user_exists(username):
if self.storage.user_exists(username):
print >>sys.stderr, 'Error: user %s already exists.' % username
return None
user = User(self.ass2m.storage, username)
user = User(self.storage, username)
self.edit_contact(user)
self.users.append(user)
print 'User %s correctly added.' % user.name
......@@ -54,10 +54,10 @@ class ContactsManagement(ConsolePart):
def edit_contact_password(self, username):
try:
if not self.ass2m.storage.user_exists(username):
if not self.storage.user_exists(username):
print >>sys.stderr, 'Error: user %s does not exists.' % username
return None
user = self.ass2m.storage.get_user(username)
user = self.storage.get_user(username)
password1 = self.ask('Enter the new password', masked=True)
password2 = self.ask('Confirm the new password', masked=True)
if len(password1) == 0:
......@@ -118,8 +118,8 @@ class ContactsManagement(ConsolePart):
user.realname, user.email)
class ContactsSelection(ContactsManagement):
def __init__(self, ass2m, sel_users=[], sel_groups=[]):
ContactsManagement.__init__(self, ass2m)
def __init__(self, storage, sel_users=[], sel_groups=[]):
ContactsManagement.__init__(self, storage)
self.sel_users = copy(sel_users)
self.sel_groups = copy(sel_groups)
......@@ -147,7 +147,7 @@ class ContactsAddCmd(Command):
parser.add_argument('username')
def cmd(self, args):
cm = ContactsManagement(self.ass2m)
cm = ContactsManagement(self.storage)
if not cm.add_contact(args.username):
return 1
......@@ -159,7 +159,7 @@ class ContactsPasswordCmd(Command):
parser.add_argument('username')
def cmd(self, args):
cm = ContactsManagement(self.ass2m)
cm = ContactsManagement(self.storage)
if not cm.edit_contact_password(args.username):
return 1
......@@ -167,7 +167,7 @@ class ContactsMenuCmd(Command):
DESCRIPTION = 'Display the contacts menu'
def cmd(self, args):
cm = ContactsManagement(self.ass2m)
cm = ContactsManagement(self.storage)
cm.main()
class ContactsMergeCmd(Command):
......@@ -178,13 +178,13 @@ class ContactsMergeCmd(Command):
parser.add_argument('workdir')
def cmd(self, args):
ass2m = Ass2m(args.workdir)
if not ass2m.storage:
storage = Storage.lookup(args.workdir)
if not storage:
print >>sys.stderr, 'Error: Path "%s" is not a working directory' % args.workdir
return 1
for user in ass2m.storage.iter_users():
user.storage = self.ass2m.storage
for user in storage.iter_users():
user.storage = self.storage
user.save()
print 'Imported %s (%s <%s>)' % (user.name, user.realname, user.email)
......@@ -193,7 +193,7 @@ class ContactsListCmd(Command):
WORKDIR = True
def cmd(self, args):
cm = ContactsManagement(self.ass2m)
cm = ContactsManagement(self.storage)
cm.print_users()
class ContactsRemoveCmd(Command):
......@@ -205,11 +205,11 @@ class ContactsRemoveCmd(Command):
parser.add_argument('username')
def cmd(self, args):
if not self.ass2m.storage.user_exists(args.username):
if not self.storage.user_exists(args.username):
print >>sys.stderr, 'Error: user %s does not exist.' % args.username
return 1
self.ass2m.storage.get_user(args.username).remove()
self.storage.get_user(args.username).remove()
print 'User %s has been removed.' % args.username
......@@ -218,7 +218,7 @@ class LoginAction(Action):
form_username = self.ctx.req.str_POST.get('username')
form_password = self.ctx.req.str_POST.get('password')
if form_username and form_password:
user = self.ctx.ass2m.storage.get_user(form_username)
user = self.ctx.storage.get_user(form_username)
if user and user.is_valid_password(form_password):
self.ctx.res = HTTPFound(location=self.ctx.url.href)
# set cookie
......
......@@ -21,6 +21,7 @@ import re
from ass2m.plugin import Plugin
from ass2m.cmd import Command
from ass2m.storage import Storage
from ass2m.routes import Route
from ass2m.server import Action
......@@ -36,11 +37,11 @@ class InitCmd(Command):
WORKDIR = False
def cmd(self, args):
if self.ass2m.storage:
if self.storage:
print >>sys.stderr, 'Error: %s is already a working directory' % self.working_dir
return 1
self.ass2m.create(self.working_dir)
self.storage = Storage.create(self.working_dir)
print 'Ass2m working directory created.'
class TreeCmd(Command):
......@@ -48,7 +49,7 @@ class TreeCmd(Command):
WORKDIR = True
def print_perms(self, path, depth):
f = self.ass2m.storage.get_file(path)
f = self.storage.get_file(path)
parent = f.parent()
perms_s = ''
......@@ -63,7 +64,7 @@ class TreeCmd(Command):
def cmd(self, args):
for root, directories, files in os.walk(self.working_dir):
path = root[len(self.ass2m.root):]
path = root[len(self.storage.root):]
depth = path.count('/')
self.print_perms(path + '/', depth)
......@@ -85,7 +86,7 @@ class PermsCmd(Command):
print >>sys.stderr, 'Error: Path "%s" does not exist.' % args.path
return 1
f = self.ass2m.storage.get_file_from_realpath(os.path.realpath(args.path))
f = self.storage.get_file_from_realpath(os.path.realpath(args.path))
if not f:
print >>sys.stderr, 'Error: Path "%s" is not in working directory.' % args.path
return 1
......@@ -93,7 +94,7 @@ class PermsCmd(Command):
if '.' in args.who:
t, who = args.who.split('.', 1)
if t == 'u':
if not self.ass2m.storage.user_exists(who):
if not self.storage.user_exists(who):
print >>sys.stderr, 'Error: User "%s" does not exist.' % who
return 1
elif t == 'g':
......
......@@ -144,6 +144,12 @@ class Event(object):
for username in sorted(self.users.keys(), key=unicode.lower):
yield self.f.storage.get_user(username)
def remove_user(self, username):
self.users.pop(username)
def add_user(self, username):
self.users[username] = self.USER_WAITING
class EventCmd(Command):
DESCRIPTION = 'Create or edit an event'
WORKDIR = True
......@@ -153,7 +159,7 @@ class EventCmd(Command):
parser.add_argument('filename')
def cmd(self, args):
f = self.ass2m.storage.get_file_from_realpath(args.filename)
f = self.storage.get_file_from_realpath(args.filename)
if not f:
print >>sys.stderr, 'Error: file "%s" is not in the working tree.' % args.filename
return 1
......@@ -231,12 +237,12 @@ class EventCmd(Command):
event.place = self.ask('Enter a place', default=event.place)
def edit_attendees(self, event):
cs = ContactsSelection(self.ass2m, event.users.keys())
cs = ContactsSelection(self.storage, event.users.keys())
cs.main()
for deleted in (set(event.users.keys()) - set(cs.sel_users)):
event.users.pop(deleted)
event.remove_user(deleted)
for added in set(cs.sel_users) - set(event.users.keys()):
event.users[added] = event.USER_WAITING
event.add_user(added)
class EventAction(Action):
def answer(self):
......
......@@ -28,7 +28,9 @@ from webob.exc import HTTPFound, HTTPNotFound, HTTPForbidden
from paste.url import URL
import urlparse
from ass2m import Ass2m
from .butt import Butt
from .storage import Storage
from .version import VERSION
from .users import Anonymous
from .routes import Router
......@@ -38,10 +40,9 @@ class Context(object):
'/usr/share/ass2m',
'/usr/local/share/ass2m']
def __init__(self, environ, start_response):
self._init_routing()
self.ass2m = Ass2m(environ.get("ASS2M_ROOT"), router=self.router)
def __init__(self, router, environ, start_response):
self.router = router
self.storage = Storage.lookup(environ.get("ASS2M_ROOT"))
self._environ = environ
self._start_response = start_response
self.req = Request(environ)
......@@ -62,8 +63,8 @@ class Context(object):
if path in ('.', '/'):
path = ''
if self.ass2m.root:
f = self.ass2m.storage.get_file(path)
if self.storage:
f = self.storage.get_file(path)
else:
f = None
......@@ -86,19 +87,11 @@ class Context(object):
# Root application URL (for links to special actions)
self.root_url = URL(urlparse.urlparse(self.req.application_url).path)
def _init_routing(self):
router = Router()
router.set_default_view(None, "html")
router.set_default_view("download", "raw")
router.set_default_action("file", "download")
router.set_default_action("directory", "list")
self.router = router
def _init_cookie_secret(self):
if not self.ass2m.storage:
if not self.storage:
return
config = self.ass2m.storage.get_config()
config = self.storage.get_config()
self.cookie_secret = config.data["web"].get("cookie_secret")
if self.cookie_secret is None:
self.cookie_secret = hexlify(new_secret())
......@@ -118,7 +111,7 @@ class Context(object):
def _init_template_vars(self):
self.template_vars = {
'ass2m_version': Ass2m.VERSION,
'ass2m_version': VERSION,
'path': self.path or "/",
'url': self.url,
'root_url': self.root_url,
......@@ -175,13 +168,13 @@ class Dispatcher(Action):
cookie = self.ctx.req.str_cookies.get('ass2m_auth')
user = cookie and signer.auth(cookie)
if user:
self.ctx.user = self.ctx.ass2m.storage.get_user(user)
self.ctx.user = self.ctx.storage.get_user(user)
def answer(self):
ctx = self.ctx
router = ctx.router
if not ctx.ass2m.storage:
if not ctx.storage:
return self.error_notworkingdir()
self._authenticate()
......@@ -246,6 +239,15 @@ class Server(object):
will be used.
"""
self.root = root
self.butt = Butt(router=self._create_router())
def _create_router(self):
router = Router()
router.set_default_view(None, "html")
router.set_default_view("download", "raw")
router.set_default_action("file", "download")
router.set_default_action("directory", "list")
return router
def bind(self, hostname, port):
httpserver.serve(self.process, host=hostname, port=str(port))
......@@ -256,8 +258,7 @@ class Server(object):
"""
if self.root:
environ.setdefault("ASS2M_ROOT", self.root)
ctx = Context(environ, start_response)
ctx = Context(self.butt.router, environ, start_response)
dispatcher = Dispatcher(ctx)
dispatcher.answer()
return ctx.respond()
......@@ -34,12 +34,34 @@ class GlobalConfig(IObject):
class Storage(object):
DIRNAME = '.ass2m'
def __init__(self, path):
self.path = path
@property
def root(self):
return os.path.realpath(os.path.join(self.path, os.path.pardir))
@classmethod
def lookup(cls, path):
if not path:
return None
try:
while not os.path.isdir(os.path.join(path, cls.DIRNAME)) and path != os.path.dirname(path):
path = os.path.dirname(path)
except OSError:
path = None
if path and path != os.path.dirname(path):
return cls(os.path.join(path, cls.DIRNAME))
else:
return None
@classmethod
def init(cls, path):
storage = cls(path)
def create(cls, path):
storage = cls(os.path.join(path, cls.DIRNAME))
# Default perms on /.ass2m
f = File(storage, '/.ass2m')
......
# -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon, Laurent Bachelier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
__all__ = ['VERSION', 'COPYRIGHT']
VERSION = '0.1-dev'
COPYRIGHT = 'Copyright(C) 2011 Romain Bignon, Laurent Bachelier'
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from unittest import TestCase
......@@ -15,8 +15,7 @@ from datetime import datetime
class ApiTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
ass2m = Ass2m(self.root)
ass2m.create(self.root)
Storage.create(self.root)
server = Server(self.root)
self.app = TestApp(server.process)
......
from ass2m.ass2m import Ass2m
from unittest import TestCase
from tempfile import mkdtemp
import os
import shutil
class Ass2mTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
def tearDown(self):
if self.root:
shutil.rmtree(self.root)
def test_createAndLoad(self):
# test creation
ass2m = Ass2m(self.root)
assert ass2m.storage is None
assert ass2m.root is None
ass2m.create(self.root)
assert ass2m.storage is not None
assert ass2m.root is not None
assert os.path.isdir(ass2m.root)
assert os.path.samefile(ass2m.root, self.root)
assert isinstance(ass2m.storage.get_config().data, dict)
# test loading of an existing dir
ass2m = Ass2m(self.root)
assert ass2m.storage is not None
assert ass2m.root is not None
assert os.path.samefile(ass2m.root, self.root)
assert isinstance(ass2m.storage.get_config().data, dict)
def test_globalConfig(self):
# create and set settings
ass2m = Ass2m(self.root)
ass2m.create(self.root)
cfg = ass2m.storage.get_config()
cfg.data["penguins"]["cute"] = "yes"
cfg.save()
# were they saved properly?
ass2m = Ass2m(self.root)
assert ass2m.storage.get_config().data["penguins"].get("cute") == "yes"
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server, Context
from unittest import TestCase
......@@ -13,8 +13,7 @@ import shutil
class AssetsTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
ass2m = Ass2m(self.root)
ass2m.create(self.root)
storage = Storage.create(self.root)
server = Server(self.root)
self.app = TestApp(server.process)
datapath = os.path.join(self.root, 'test_data')
......
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from unittest import TestCase
......@@ -13,8 +13,7 @@ import shutil
class BaseWebTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
ass2m = Ass2m(self.root)
ass2m.create(self.root)
Storage.create(self.root)
server = Server(self.root)
self.app = TestApp(server.process)
......
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from ass2m.users import User
......@@ -14,12 +14,11 @@ import shutil
class EventTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
ass2m = Ass2m(self.root)
ass2m.create(self.root)
storage = Storage.create(self.root)
server = Server(self.root)
self.app = TestApp(server.process)
user = User(ass2m.storage, 'penguin')
user = User(storage, 'penguin')
user.realname = 'Penguin'
user.save()
......@@ -42,7 +41,7 @@ Attendees:
with open(os.path.join(self.root, 'event1.txt'), 'w') as f:
f.write(event_text)
f = ass2m.storage.get_file('/event1.txt')
f = storage.get_file('/event1.txt')
f.view = 'event'
f.save()
......
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from unittest import TestCase
......@@ -15,8 +15,7 @@ import shutil
class GalleryTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
self.ass2m = Ass2m(self.root)
self.ass2m.create(self.root)
self.storage = Storage.create(self.root)
server = Server(self.root)
self.app = TestApp(server.process)
os.mkdir(os.path.join(self.root, 'images'))
......@@ -47,7 +46,7 @@ class GalleryTest(TestCase):
assert 'nothing' in res.body
res1 = self.app.get('/images/?view=gallery')
f = self.ass2m.storage.get_file('/images')
f = self.storage.get_file('/images')
f.view = 'gallery'
f.save()
......
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from ass2m.users import User
......@@ -13,9 +13,8 @@ import shutil
class LoginTest(TestCase):
def setUp(self):
self.root = mkdtemp(prefix='ass2m_test_root')
ass2m = Ass2m(self.root)
ass2m.create(self.root)
user = User(ass2m.storage, 'penguin')
storage = Storage.create(self.root)
user = User(storage, 'penguin')
user.realname = 'Penguin'
user.password = 'monkey1'
user.save()
......
from __future__ import with_statement
from ass2m.ass2m import Ass2m
from ass2m.storage import Storage
from ass2m.server import Server
from unittest import TestCase
......@@ -19,8 +19,7 @@ class RootDirTest(TestCase):
self.not_a_wd = os.path.join(self.root, 'not_a_wd')
os.mkdir(self.a_wd)
os.mkdir(self.not_a_wd)
ass2m = Ass2m(self.a_wd)