Projects
Kolab:16:TestingLinked
pykolab
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 10
View file
pykolab.spec
Changed
@@ -37,25 +37,33 @@ Source0: pykolab-0.8.1.tar.gz Source1: pykolab.logrotate +Patch0001: 0001-Use-the-correct-constants-import-and-__version__-val.patch +Patch0002: 0002-ID-directly-after-authentication-before-asking-for-a.patch + BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root BuildArch: noarch + %if 0%{?suse_version} BuildRequires: autoconf BuildRequires: automake BuildRequires: fdupes %endif + %if 0%{?fedora} > 21 BuildRequires: future %endif + BuildRequires: gcc BuildRequires: gettext BuildRequires: glib2-devel BuildRequires: intltool + %if 0%{?suse_version} BuildRequires: python-mysql %else BuildRequires: MySQL-python %endif + BuildRequires: python BuildRequires: python-augeas BuildRequires: python-gnupg @@ -64,20 +72,25 @@ BuildRequires: python-kolabformat BuildRequires: python-ldap BuildRequires: python-nose +BuildRequires: python-pep8 BuildRequires: python-pyasn1 BuildRequires: python-pyasn1-modules + %if 0%{?suse_version} BuildRequires: python-pytz %else BuildRequires: pytz %endif + BuildRequires: python-sievelib BuildRequires: python-sqlalchemy BuildRequires: python-twisted-core + %if 0%{?fedora} >= 21 # Fedora 21 has qca2 and qca, qca2 has been renamed to qca, required by kdelibs -BuildRequires: qca +BuildRequires: qca %endif + Requires: kolab-cli = %{version}-%{release} Requires: python-ldap >= 2.4 Requires: python-pyasn1 @@ -219,6 +232,9 @@ %prep %setup -q +%patch0001 -p1 +%patch0002 -p1 + %build autoreconf -v || automake --add-missing && autoreconf -v %configure
View file
0001-Use-the-correct-constants-import-and-__version__-val.patch
Added
@@ -0,0 +1,35 @@ +From f251c62827c1696dd2e0369800c82cc569c57011 Mon Sep 17 00:00:00 2001 +From: "Jeroen van Meeuwen (Kolab Systems)" <vanmeeuwen@kolabsys.com> +Date: Fri, 1 Jul 2016 16:31:50 +0200 +Subject: PATCH 1/2 Use the correct constants import and __version__ value + from it + +--- + pykolab/imap/cyrus.py | 4 ++-- + 1 file changed, 2 insertions(+), 2 deletions(-) + +diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py +index 63b0615..2057a38 100644 +--- a/pykolab/imap/cyrus.py ++++ b/pykolab/imap/cyrus.py +@@ -25,7 +25,7 @@ from urlparse import urlparse + + import pykolab + +-from pykolab.constants import * ++from pykolab import constants + from pykolab.imap import IMAP + from pykolab.translate import _ + +@@ -278,7 +278,7 @@ class Cyrus(cyruslib.CYRUS): + + def _id(self, identity=None): + if identity is None: +- identity = '("name" "Python/Kolab" "version" "%s")' % (__version__) ++ identity = '("name" "Python/Kolab" "version" "%s")' % (constants.__version__) + + typ, dat = self.m._simple_command('ID', identity) + res, dat = self.m._untagged_response(typ, dat, 'ID') +-- +2.5.5 +
View file
0002-ID-directly-after-authentication-before-asking-for-a.patch
Added
@@ -0,0 +1,80 @@ +From 132ea08148dfb903af3bffa1a2f74c7145a6e504 Mon Sep 17 00:00:00 2001 +From: "Jeroen van Meeuwen (Kolab Systems)" <vanmeeuwen@kolabsys.com> +Date: Mon, 4 Jul 2016 09:36:48 +0200 +Subject: PATCH 2/2 ID directly after authentication, before asking for admin + or hierarchy separators + +--- + cyruslib.py | 12 ++++++++---- + 1 file changed, 8 insertions(+), 4 deletions(-) + +diff --git a/cyruslib.py b/cyruslib.py +index 3802c00..9e42f39 100644 +--- a/cyruslib.py ++++ b/cyruslib.py +@@ -130,7 +130,7 @@ class IMAP4(imaplib.IMAP4): + + def id(self): + try: +- typ, dat = self._simple_command('ID', 'NIL') ++ typ, dat = self._simple_command('ID', '("name" "PyKolab/Kolab")') + res, dat = self._untagged_response(typ, dat, 'ID') + except: + return False, dat0 +@@ -211,7 +211,7 @@ class IMAP4_SSL(imaplib.IMAP4_SSL): + + def id(self): + try: +- typ, dat = self._simple_command('ID', 'NIL') ++ typ, dat = self._simple_command('ID', '("name" "PyKolab/Kolab")') + res, dat = self._untagged_response(typ, dat, 'ID') + except: + return False, dat0 +@@ -275,6 +275,7 @@ class IMAP4_SSL(imaplib.IMAP4_SSL): + encoded = b2a_base64("%s\0%s\0%s" % (admin, admin, password)).strip() + + res, data = self._simple_command('AUTHENTICATE', 'PLAIN', encoded) ++ self.AUTH = True + if ok(res): + self.state = 'AUTH' + return res, data +@@ -409,8 +410,11 @@ class CYRUS: + self.__doexception("LOGIN", self.ERROR.get("AUTH")1) + try: + res, msg = self.m.login(username, password) ++ self.AUTH = True ++ self.id() + admin = self.m.isadmin() + except Exception, info: ++ self.AUTH = False + error = str(info).split(':').pop().strip() + self.__doexception("LOGIN", error) + +@@ -418,7 +422,6 @@ class CYRUS: + self.ADMIN = username + + self.SEP = self.m.getsep() +- self.AUTH = True + self.__verbose( 'LOGIN %s %s: %s' % (username, res, msg0) ) + + def login_plain(self, username, password, asUser = None): +@@ -430,6 +433,8 @@ class CYRUS: + self.__verbose( 'AUTHENTICATE PLAIN %s %s: %s' % (username, res, msg0) ) + + if ok(res): ++ self.AUTH = True ++ self.id() + if asUser is None: + if self.m.isadmin(): + self.ADMIN = admin +@@ -437,7 +442,6 @@ class CYRUS: + self.ADMIN = asUser + self.AUSER = asUser + self.SEP = self.m.getsep() +- self.AUTH = True + + def logout(self): + try: +-- +2.5.5 +
View file
debian.series
Changed
@@ -1,1 +1,3 @@ cyrus-imapd.conf-cert-paths.patch -p1 +0001-Use-the-correct-constants-import-and-__version__-val.patch -p1 +0002-ID-directly-after-authentication-before-asking-for-a.patch -p1
View file
pykolab-0.8.1.tar.gz/.arclint
Changed
@@ -11,7 +11,18 @@ "include": "(\\.py$)", "severity": { "E121": "disabled", - "E126": "disabled" + "E126": "disabled", + "E201": "disabled", + "E202": "disabled", + "E231": "disabled", + "E251": "disabled", + "E302": "disabled", + "E303": "disabled", + "E402": "disabled", + "E501": "disabled", + "E711": "disabled", + "E712": "disabled", + "E713": "disabled" } }, "spelling" : {
View file
pykolab-0.8.1.tar.gz/.gitignore
Changed
@@ -26,4 +26,4 @@ pykolab/constants.py pykolab-0-9*.*/ src/ -test-* +./test-*
View file
pykolab-0.8.1.tar.gz/bin/kolab_smtp_access_policy.py
Changed
@@ -38,6 +38,7 @@ from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import Sequence +from sqlalchemy import PickleType from sqlalchemy import create_engine from sqlalchemy.orm import mapper @@ -88,11 +89,12 @@ Column('id', Integer, Sequence('seq_id_result'), primary_key=True), Column('key', String(16), nullable=False), Column('value', Boolean, nullable=False), - Column('sender', String(64), nullable=False), + Column('sender', String(64), nullable=True), Column('recipient', String(64), nullable=False), Column('sasl_username', String(64)), Column('sasl_sender', String(64)), Column('created', Integer, nullable=False), + Column('data', PickleType, nullable=True), ) Index( @@ -113,7 +115,8 @@ sender=None, recipient=None, sasl_username=None, - sasl_sender=None + sasl_sender=None, + data=None ): self.key = key @@ -123,6 +126,7 @@ self.sasl_sender = sasl_sender self.recipient = recipient self.created = (int)(time.time()) + self.data = data mapper(PolicyResult, policy_result_table) @@ -377,7 +381,7 @@ for rule in rules'allow': deny_override = False - if _object.endswith(rule): + if _object is not None and _object.endswith(rule): for deny_rule in rules'deny': if deny_rule.endswith(rule): deny_override = True @@ -389,7 +393,7 @@ for rule in rules'deny': allow_override = False - if _object.endswith(rule): + if _object is not None and _object.endswith(rule): if not allowed: denied = True continue @@ -1061,8 +1065,11 @@ if recipient == record.recipient: recipient_found = True - if recipient_found and not record.value: - reject(_("Sender %s is not allowed to send to recipient %s") % (self.sender,recipient)) + if recipient_found: + if not record.value: + reject(_("Sender %s is not allowed to send to recipient %s") % (self.sender,recipient)) + if record.data is not None: + self.__dict__.update(record.data) return True @@ -1152,13 +1159,20 @@ sender_verified = True if not cache == False: + data = { + 'sasl_user_uses_alias': self.sasl_user_uses_alias, + 'sasl_user_is_delegate': self.sasl_user_is_delegate, + 'sender_domain': self.sender_domain, + } + record_id = cache_update( function='verify_sender', sender=self.sender, recipients=self.recipients, result=(int)(sender_verified), sasl_username=self.sasl_username, - sasl_sender=self.sasl_sender + sasl_sender=self.sasl_sender, + data=data ) return sender_verified @@ -1251,7 +1265,8 @@ recipients=, result=None, sasl_username='', - sasl_sender='' + sasl_sender='', + data=None ): if not cache == True: @@ -1275,7 +1290,8 @@ sender=sender, recipient=recipient, sasl_username=sasl_username, - sasl_sender=sasl_sender + sasl_sender=sasl_sender, + data=data ) ) @@ -1288,7 +1304,8 @@ recipients=, result=None, sasl_username='', - sasl_sender='' + sasl_sender='', + data=None ): """ @@ -1328,7 +1345,8 @@ recipient=recipient, result=result, sasl_username=sasl_username, - sasl_sender=sasl_sender + sasl_sender=sasl_sender, + data=data ) def defer_if_permit(message, policy_request=None): @@ -1412,8 +1430,11 @@ # Note that using an encryption key voids the actual use of proper Sender # and X-Sender headers such as they could be interpreted by a client # application. - enc_key = conf.get(policy_request.sender_domain, 'sender_header_enc_key') - if enc_key == None: + enc_key = None + if policy_request.sender_domain is not None: + enc_key = conf.get(policy_request.sender_domain, 'sender_header_enc_key') + + if enc_key is None: enc_key = conf.get('kolab_smtp_access_policy', 'sender_header_enc_key') sender_header = None @@ -1623,12 +1644,6 @@ ) if not protocol_state == 'data': - log.debug( - _("Request instance %s is not yet in DATA state") % ( - instance - ) - ) - print "action=DUNNO\n\n" sys.stdout.flush() @@ -1636,8 +1651,6 @@ # set to a non-zero value and the protocol_state being set to 'data'. # Note that the input we're getting is a string, not an integer. else: - log.debug(_("Request instance %s reached DATA state") % (instance)) - sender_allowed = False recipient_allowed = False
View file
pykolab-0.8.1.tar.gz/conf.py
Changed
@@ -22,7 +22,6 @@ Kolab configuration utility. """ - import logging import os import sys @@ -44,4 +43,3 @@ pykolab = pykolab.Conf() pykolab.finalize_conf() pykolab.run() -
View file
pykolab-0.8.1.tar.gz/configure.ac
Changed
@@ -1,4 +1,4 @@ -AC_INIT(pykolab, 0.8.0) +AC_INIT(pykolab, 0.8.1) AC_SUBST(RELEASE, 1) AC_CONFIG_SRCDIR(pykolab/constants.py.in)
View file
pykolab-0.8.1.tar.gz/cyruslib.py
Changed
@@ -382,7 +382,7 @@ if ok(res): return res, msg except Exception, info: - error = info.args0.split(':').pop().strip() + error = str(info).split(':').pop().strip() if error.upper().startswith('BAD'): error = error.split('BAD', 1).pop().strip() error = unquote(error1:-1, '\'') @@ -411,7 +411,7 @@ res, msg = self.m.login(username, password) admin = self.m.isadmin() except Exception, info: - error = info.args0.split(':').pop().strip() + error = str(info).split(':').pop().strip() self.__doexception("LOGIN", error) if admin: @@ -443,7 +443,7 @@ try: res, msg = self.m.logout() except Exception, info: - error = info.args0.split(':').pop().strip() + error = str(info).split(':').pop().strip() self.__doexception("LOGOUT", error) self.AUTH = False self.ADMIN = None
View file
pykolab-0.8.1.tar.gz/kolab-cli.py
Changed
@@ -38,4 +38,3 @@ if __name__ == "__main__": kolab = Cli() kolab.run() -
View file
pykolab-0.8.1.tar.gz/kolabd.py
Changed
@@ -37,4 +37,3 @@ if __name__ == "__main__": kolabd = kolabd.KolabDaemon() kolabd.run() -
View file
pykolab-0.8.1.tar.gz/kolabd/__init__.py
Changed
@@ -1,4 +1,4 @@ -# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) +# Copyright 2010-2016 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> # @@ -40,6 +40,7 @@ log = pykolab.getLogger('pykolab.daemon') conf = pykolab.getConf() + class KolabDaemon(object): def __init__(self): """ @@ -156,7 +157,6 @@ sys.exit(1) - # Set real and effective user if not the same as current. if not user_uid == ruid: log.debug( @@ -217,8 +217,8 @@ except AttributeError, errmsg: exitcode = 1 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a " + \ - "bug at https://issues.kolab.org") + print >> sys.stderr, _("Traceback occurred, please report a " + + "bug at https://issues.kolab.org") except TypeError, errmsg: exitcode = 1 @@ -228,8 +228,8 @@ except: exitcode = 2 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a " + \ - "bug at https://issues.kolab.org") + print >> sys.stderr, _("Traceback occurred, please report a " + + "bug at https://issues.kolab.org") sys.exit(exitcode) @@ -276,8 +276,8 @@ domain_base_dn = primary_auth.domain_naming_context(primary_domain) log.debug(_("Domain Base DN for domain %r is %r") % (primary_domain, domain_base_dn), level=8) - if not domain_base_dn == None: - if not domain_base_dn in domain_base_dns: + if domain_base_dn is not None: + if domain_base_dn not in domain_base_dns: domain_base_dns.append(domain_base_dn) primary_domain = primary_auth.primary_domain_for_naming_context(domain_base_dn) primary_domains.append(primary_domain) @@ -366,6 +366,6 @@ def write_pid(self): pid = os.getpid() - fp = open(conf.pidfile,'w') + fp = open(conf.pidfile, 'w') fp.write("%d\n" % (pid)) fp.close()
View file
pykolab-0.8.1.tar.gz/kolabd/process.py
Changed
@@ -27,6 +27,7 @@ log = pykolab.getLogger('pykolab.daemon') conf = pykolab.getConf() + class KolabdProcess(multiprocessing.Process): def __init__(self, domain): self.domain = domain @@ -42,7 +43,7 @@ log.debug(_("Synchronizing for domain %s") % (domain), level=8) sync_interval = conf.get('kolab', 'sync_interval') - if sync_interval == None or sync_interval == 0: + if sync_interval is None or sync_interval == 0: sync_interval = 300 else: sync_interval = (int)(sync_interval)
View file
pykolab-0.8.1.tar.gz/pykolab/auth/__init__.py
Changed
@@ -179,6 +179,20 @@ del self._auth self._auth = None + def find_folder_resource(self, folder): + """ + Find one or more resources corresponding to the shared folder name. + """ + if not self._auth or self._auth == None: + self.connect() + + result = self._auth.find_folder_resource(folder) + + if isinstance(result, list) and len(result) == 1: + return result0 + else: + return result + def find_recipient(self, address, domain=None): """ Find one or more entries corresponding to the recipient address.
View file
pykolab-0.8.1.tar.gz/pykolab/auth/ldap/__init__.py
Changed
@@ -116,7 +116,9 @@ pykolab.base.Base.__init__(self, domain=domain) self.ldap = None - self.bind = False + self.ldap_priv = None + self.bind = None + if domain == None: self.domain = conf.get('kolab', 'primary_domain') else: @@ -225,13 +227,9 @@ (entry_dn, entry_attrs) = _result_data0 try: - log.debug(_("Binding with user_dn %s and password %s") - % (entry_dn, '*' * len(login1))) - # Needs to be synchronous or succeeds and continues setting retval # to True!! - self.ldap.simple_bind_s(entry_dn, login1) - retval = True + retval = self._bind(entry_dn, login1) try: auth_cache.set_entry(_filter, entry_dn) except Exception, errmsg: @@ -255,18 +253,14 @@ retval = False else: try: - log.debug(_("Binding with user_dn %s and password %s") - % (entry_dn, '*' * len(login1))) - # Needs to be synchronous or succeeds and continues setting retval # to True!! - self.ldap.simple_bind_s(entry_dn, login1) + retval = self._bind(entry_dn, login1) auth_cache.set_entry(_filter, entry_dn) - retval = True except ldap.NO_SUCH_OBJECT, errmsg: log.debug(_("Error occured, there is no such object: %r") % (errmsg), level=8) - self.bind = False + self.bind = None try: auth_cache.del_entry(_filter) except: @@ -289,11 +283,13 @@ return retval - def connect(self): + def connect(self, priv=None): """ Connect to the LDAP server through the uri configured. """ - if not self.ldap == None: + if priv is None and self.ldap is not None: + return + if priv is not None and self.ldap_priv is not None: return log.debug(_("Connecting to LDAP..."), level=8) @@ -307,15 +303,20 @@ if conf.debuglevel > 8: trace_level = 1 - self.ldap = ldap.ldapobject.ReconnectLDAPObject( + conn = ldap.ldapobject.ReconnectLDAPObject( uri, trace_level=trace_level, retry_max=200, retry_delay=3.0 ) - self.ldap.protocol_version = 3 - self.ldap.supported_controls = + conn.protocol_version = 3 + conn.supported_controls = + + if priv is None: + self.ldap = conn + else: + self.ldap_priv = conn def entry_dn(self, entry_id): """ @@ -468,6 +469,78 @@ return delegators + def find_folder_resource(self, folder="*", exclude_entry_id=None): + """ + Given a shared folder name or list of folder names, find one or more valid + resources. + + Specify an additional entry_id to exclude to exclude matches. + """ + + self._bind() + + if not exclude_entry_id == None: + __filter_prefix = "(&" + __filter_suffix = "(!(%s=%s)))" % ( + self.config_get('unique_attribute'), + exclude_entry_id + ) + else: + __filter_prefix = "" + __filter_suffix = "" + + resource_filter = self.config_get('resource_filter') + if not resource_filter == None: + __filter_prefix = "(&%s" % resource_filter + __filter_suffix = ")" + + recipient_address_attrs = self.config_get_list("mail_attributes") + + result_attributes = recipient_address_attrs + result_attributes.append(self.config_get('unique_attribute')) + result_attributes.append('kolabTargetFolder') + + _filter = "(|" + + if isinstance(folder, basestring): + _filter += "(kolabTargetFolder=%s)" % (folder) + else: + for _folder in folder: + _filter += "(kolabTargetFolder=%s)" % (_folder) + + _filter += ")" + + _filter = "%s%s%s" % (__filter_prefix,_filter,__filter_suffix) + + log.debug(_("Finding resource with filter %r") % (_filter), level=8) + + if len(_filter) <= 6: + return None + + config_base_dn = self.config_get('resource_base_dn') + ldap_base_dn = self._kolab_domain_root_dn(self.domain) + + if not ldap_base_dn == None and not ldap_base_dn == config_base_dn: + resource_base_dn = ldap_base_dn + else: + resource_base_dn = config_base_dn + + _results = self.ldap.search_s( + resource_base_dn, + scope=ldap.SCOPE_SUBTREE, + filterstr=_filter, + attrlist=result_attributes, + attrsonly=True + ) + + _entry_dns = + + for _result in _results: + (_entry_id, _entry_attrs) = _result + _entry_dns.append(_entry_id) + + return _entry_dns + def find_recipient(self, address="*", exclude_entry_id=None): """ Given an address string or list of addresses, find one or more valid @@ -981,8 +1054,11 @@ return entry_modifications def reconnect(self): + bind = self.bind self._disconnect() self.connect() + if bind is not None: + self._bind(bind'dn', bind'pw') def search_entry_by_attribute(self, attr, value, **kw): self._bind() @@ -1006,7 +1082,6 @@ override_search='_regular_search' ) - def set_entry_attribute(self, entry_id, attribute, value): log.debug(_("Setting entry attribute %r to %r for %r") % (attribute, value, entry_id), level=9) self.set_entry_attributes(entry_id, { attribute: value }) @@ -1040,9 +1115,9 @@ dn = entry_dn - if len(modlist) > 0: + if len(modlist) > 0 and self._bind_priv() is True: try: - self.ldap.modify_s(dn, modlist) + self.ldap_priv.modify_s(dn, modlist) except: log.error(_("Could not update dn %r:\n%r") % (dn, modlist)) @@ -1156,6 +1231,10 @@ except: current_ldap_quota = None + # If the new quota is zero, get out + if new_quota == 0: + return + if not current_ldap_quota == None: if not new_quota == (int)(current_ldap_quota): self.set_entry_attribute( @@ -1183,26 +1262,66 @@ ### API depth level increasing! ### - def _bind(self): - if self.ldap == None: + def _bind(self, bind_dn=None, bind_pw=None): + # If we have no LDAP, we have no previous state. + if self.ldap is None: + self.bind = None self.connect() - if not self.bind: - bind_dn = self.config_get('bind_dn') - bind_pw = self.config_get('bind_pw') + # If we are to bind as foo, we have no state. + if bind_dn is not None: + self.bind = None + + # Only if we have no state and no bind credentials specified in the + # function call. + if self.bind is None: + + if bind_dn is None: + bind_dn = self.config_get('service_bind_dn') + + if bind_pw is None: + bind_pw = self.config_get('service_bind_pw') + + if bind_dn is not None: + log.debug(_("Binding with bind_dn: %s and password: %s") + % (bind_dn, '*' * len(bind_pw))) # TODO: Binding errors control try: self.ldap.simple_bind_s(bind_dn, bind_pw) - self.bind = True + self.bind = {'dn': bind_dn, 'pw': bind_pw} + return True except ldap.SERVER_DOWN, errmsg: log.error(_("LDAP server unavailable: %r") % (errmsg)) log.error(_("%s") % (traceback.format_exc())) + return False except ldap.INVALID_CREDENTIALS: log.error(_("Invalid DN, username and/or password.")) + return False + else: + log.debug(_("bind() called but already bound"), level=8) + return True + + def _bind_priv(self): + if self.ldap_priv is None: + self.connect(True) + + bind_dn = self.config_get('bind_dn') + bind_pw = self.config_get('bind_pw') + try: + self.ldap_priv.simple_bind_s(bind_dn, bind_pw) + return True + except ldap.SERVER_DOWN, errmsg: + log.error(_("LDAP server unavailable: %r") % (errmsg)) + log.error(_("%s") % (traceback.format_exc())) + return False + except ldap.INVALID_CREDENTIALS: + log.error(_("Invalid DN, username and/or password.")) + return False else: - log.debug(_("_bind called, but already bound"), level=9) + log.debug(_("bind_priv() called but already bound"), level=8) + return True def _change_add_group(self, entry, change): """ @@ -1317,44 +1436,8 @@ folderacl_entry_attribute ) - if not entryfolderacl_entry_attribute == None: - # Parse it before assigning it - entry'kolabfolderaclentry' = - if not isinstance(entryfolderacl_entry_attribute, list): - entryfolderacl_entry_attribute = entryfolderacl_entry_attribute - - for acl_entry in entryfolderacl_entry_attribute: - acl_access = acl_entry.split()-1 - - if len(acl_entry.split(', ')) > 1: - aci_subject = ', '.join(acl_entry.split(', '):-1) - else: - aci_subject = acl_entry.split()0 - - log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8) - - access_lookup_dict = { - 'all': 'lrsedntxakcpiw', - 'append': 'wip', - 'full': 'lrswipkxtecdn', - 'read': 'lrs', - 'read-only': 'lrs', - 'read-write': 'lrswitedn', - 'post': 'p', - 'semi-full': 'lrswit', - 'write': 'lrswite', - } - - if access_lookup_dict.has_key(acl_access): - acl_access = access_lookup_dictacl_access - - log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8) - - entry'kolabfolderaclentry'.append("(%r, %r, %r)" % (folder_path, aci_subject, acl_access)) - if not self.imap.shared_folder_exists(folder_path): self.imap.shared_folder_create(folder_path, server) - self.imap.set_acl(folder_path, 'anyone', '') if entry.has_key('kolabfoldertype') and \ not entry'kolabfoldertype' == None: @@ -1364,12 +1447,11 @@ entry'kolabfoldertype' ) - if entry.has_key('kolabfolderaclentry') and \ - not entry'kolabfolderaclentry' == None: + entry'kolabfolderaclentry' = self._parse_acl(entryfolderacl_entry_attribute) - self.imap._set_kolab_mailfolder_acls( - entry'kolabfolderaclentry' - ) + self.imap._set_kolab_mailfolder_acls( + entry'kolabfolderaclentry', folder_path + ) if entry.has_key(delivery_address_attribute) and \ not entrydelivery_address_attribute == None: @@ -1714,44 +1796,8 @@ folderacl_entry_attribute ) - if not entryfolderacl_entry_attribute == None: - # Parse it before assigning it - entry'kolabfolderaclentry' = - if not isinstance(entryfolderacl_entry_attribute, list): - entryfolderacl_entry_attribute = entryfolderacl_entry_attribute - - for acl_entry in entryfolderacl_entry_attribute: - acl_access = acl_entry.split()-1 - - if len(acl_entry.split(', ')) > 1: - aci_subject = ', '.join(acl_entry.split(', '):-1) - else: - aci_subject = acl_entry.split()0 - - log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8) - - access_lookup_dict = { - 'all': 'lrsedntxakcpiw', - 'append': 'wip', - 'full': 'lrswipkxtecdn', - 'read': 'lrs', - 'read-only': 'lrs', - 'read-write': 'lrswitedn', - 'post': 'p', - 'semi-full': 'lrswit', - 'write': 'lrswite', - } - - if access_lookup_dict.has_key(acl_access): - acl_access = access_lookup_dictacl_access - - log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8) - - entry'kolabfolderaclentry'.append("(%r, %r, %r)" % (folder_path, aci_subject, acl_access)) - if not self.imap.shared_folder_exists(folder_path): self.imap.shared_folder_create(folder_path, server) - self.imap.set_acl(folder_path, 'anyone', '') if entry.has_key('kolabfoldertype') and \ not entry'kolabfoldertype' == None: @@ -1761,12 +1807,11 @@ entry'kolabfoldertype' ) - if entry.has_key('kolabfolderaclentry') and \ - not entry'kolabfolderaclentry' == None: + entry'kolabfolderaclentry' = self._parse_acl(entryfolderacl_entry_attribute) - self.imap._set_kolab_mailfolder_acls( - entry'kolabfolderaclentry' - ) + self.imap._set_kolab_mailfolder_acls( + entry'kolabfolderaclentry', folder_path, True + ) if entry.has_key(delivery_address_attribute) and \ not entrydelivery_address_attribute == None: @@ -1944,33 +1989,11 @@ entry'kolabfoldertype' ) - if entry.has_key('kolabfolderaclentry') and \ - not entry'kolabfolderaclentry' == None: - - if isinstance(entry'kolabfolderaclentry', basestring): - entry'kolabfolderaclentry' = entry'kolabfolderaclentry' - - import copy - _acls = copy.deepcopy(entry'kolabfolderaclentry') - entry'kolabfolderaclentry' = - - for _entry in _acls: - if _entry0 == "(": - entry'kolabfolderaclentry'.append(_entry) - continue - - s,r = x.strip() for x in _entry.split(',') + entry'kolabfolderaclentry' = self._parse_acl(entry'kolabfolderaclentry') - entry'kolabfolderaclentry'.append("('%s', '%s', '%s')" % (folder_path, s, r)) - - self.imap._set_kolab_mailfolder_acls( - entry'kolabfolderaclentry' - ) - - elif entry'kolabfolderaclentry' in None,: - for ace in self.imap.list_acls(folder_path): - aci_subject = ace.split()0 - self.imap.set_acl(folder_path, aci_subject, '') + self.imap._set_kolab_mailfolder_acls( + entry'kolabfolderaclentry', folder_path, True + ) delivery_address_attribute = self.config_get('sharedfolder_delivery_address_attribute') if entry.has_key(delivery_address_attribute) and \ @@ -2072,10 +2095,11 @@ ) def _disconnect(self): - self._unbind() del self.ldap + del self.ldap_priv self.ldap = None - self.bind = False + self.ldap_priv = None + self.bind = None def _domain_naming_context(self, domain): self._bind() @@ -2268,11 +2292,7 @@ log.debug(_("Finding domain root dn for domain %s") % (domain), level=8) - bind_dn = conf.get('ldap', 'bind_dn') - bind_pw = conf.get('ldap', 'bind_pw') - domain_base_dn = conf.get('ldap', 'domain_base_dn', quiet=True) - domain_filter = conf.get('ldap', 'domain_filter') if not domain_filter == None: @@ -2353,9 +2373,7 @@ log.debug(_("Listing domains..."), level=8) self.connect() - - bind_dn = conf.get('ldap', 'bind_dn') - bind_pw = conf.get('ldap', 'bind_pw') + self._bind() domain_base_dn = conf.get('ldap', 'domain_base_dn', quiet=True) @@ -2373,12 +2391,6 @@ if domain_base_dn == None or domain_filter == None: return - # TODO: this function should be wrapped for error handling - try: - self.ldap.simple_bind_s(bind_dn, bind_pw) - except ldap.SERVER_DOWN, e: - raise AuthBackendError, _("Authentication database DOWN") - dna = self.config_get('domain_name_attribute') if dna == None: dna = 'associateddomain' @@ -2415,13 +2427,6 @@ return domains - def _reconnect(self): - """ - Reconnect to LDAP - """ - self._disconnect() - self.connect() - def _synchronize_callback(self, *args, **kw): """ Determine the characteristics of the callback being placed, and @@ -2533,17 +2538,6 @@ # # server = self.imap.user_mailbox_server(folder) - def _unbind(self): - """ - Discard the current set of bind credentials. - - Virtually disconnects the LDAP connection, and should be followed by - a call to _bind() afterwards. - """ - - self.ldap.unbind() - self.bind = False - ### ### Backend search functions ### @@ -2934,3 +2928,32 @@ continue return _results + + def _parse_acl(self, acl): + """ + Parse LDAP ACL specification for use in IMAP + """ + + results = + + if acl is not None: + if not isinstance(acl, list): + acl = acl + + for acl_entry in acl: + # entry already converted to IMAP format? + if acl_entry0 == "(": + results.append(acl_entry) + continue + + acl_access = acl_entry.split()-1 + acl_subject = acl_entry.split(', ') + + if len(acl_subject) > 1: + acl_subject = ', '.join(acl_subject:-1) + else: + acl_subject = acl_entry.split()0 + + results.append("(%r, %r)" % (acl_subject, acl_access)) + + return results
View file
pykolab-0.8.1.tar.gz/pykolab/auth/ldap/auth_cache.py
Changed
@@ -139,6 +139,9 @@ db.commit() elif len(_entries) == 1: + if not isinstance(value, unicode): + value = unicode(value, 'utf-8') + if not _entries0.value == value: _entries0.value = value
View file
pykolab-0.8.1.tar.gz/pykolab/cli/cmd_list_deleted_mailboxes.py
Changed
@@ -70,10 +70,11 @@ print "Deleted folders:" for folder in folders: - mbox_parts = imap.parse_mailfolder(folder) + utf8_folder = imap_utf7.decode(folder).encode('utf-8') + mbox_parts = imap.parse_mailfolder(utf8_folder) + ts = datetime.datetime.fromtimestamp(int(mbox_parts'hex_timestamp', 16)) if not conf.raw: - print "%s (Deleted at %s)" % (imap_utf7.decode(folder).encode('utf-8'), datetime.datetime.fromtimestamp(int(mbox_parts'hex_timestamp', 16))) + print "%s (Deleted at %s)" % (utf8_folder, ts) else: - print "%s (Deleted at %s)" % (folder, datetime.datetime.fromtimestamp(int(mbox_parts'hex_timestamp', 16))) - + print "%s (Deleted at %s)" % (folder, ts)
View file
pykolab-0.8.1.tar.gz/pykolab/cli/cmd_list_domain_mailboxes.py
Changed
@@ -21,6 +21,7 @@ import pykolab +from pykolab import utils from pykolab import imap_utf7 from pykolab.auth import Auth from pykolab.imap import IMAP
View file
pykolab-0.8.1.tar.gz/pykolab/cli/cmd_rename_mailbox.py
Changed
@@ -70,5 +70,5 @@ print >> sys.stderr, _("Target folder %r already exists") % (target_folder) sys.exit(1) - imap.imap.rename(source_folder, target_folder, partition) + imap.imap.rename(imap.folder_utf7(source_folder), imap.folder_utf7(target_folder), partition)
View file
pykolab-0.8.1.tar.gz/pykolab/cli/cmd_sync_mailhost_attrs.py
Changed
@@ -95,14 +95,18 @@ folders.extend(imap.lm('user/%%@%s' % (domain))) for folder in folders: + r_folder = folder + if not folder.startswith('shared/'): + r_folder = '/'.join(folder.split('/')1:) + if conf.delete: if conf.dry_run: - if not folder.split('/')0 == 'shared': - log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % ('/'.join(folder.split('/')1:))) + if not folder.startswith('shared/'): + log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % (r_folder)) else: continue else: - if not '/'.join(folder.split('/')0) == 'shared': + if not folder.startswith('shared/'): log.info(_("Deleting mailbox '%s' because it has no recipients") % (folder)) try: imap.dm(folder) @@ -111,7 +115,7 @@ else: log.info(_("Not automatically deleting shared folder '%s'") % (folder)) else: - log.warning(_("No recipients for '%s' (use --delete to delete)!") % ('/'.join(folder.split('/')1:))) + log.warning(_("No recipients for '%s' (use --delete to delete)!") % (r_folder)) for primary in list(set(domains.values())): secondaries = x for x in domains.keys() if domainsx == primary @@ -130,20 +134,27 @@ for folder in folders: server = imap.user_mailbox_server(folder) - recipient = auth.find_recipient('/'.join(folder.split('/')1:)) + r_folder = folder + + if folder.startswith('shared/'): + recipient = auth.find_folder_resource(folder) + else: + r_folder = '/'.join(folder.split('/')1:) + recipient = auth.find_recipient(r_folder) + if (isinstance(recipient, list)): if len(recipient) > 1: - log.warning(_("Multiple recipients for '%s'!") % ('/'.join(folder.split('/')1:))) + log.warning(_("Multiple recipients for '%s'!") % (r_folder)) continue elif len(recipient) == 0: if conf.delete: if conf.dry_run: - if not folder.split('/')0 == 'shared': - log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % ('/'.join(folder.split('/')1:))) + if not folder.startswith('shared/'): + log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % (r_folder)) else: continue else: - if not '/'.join(folder.split('/')0) == 'shared': + if not folder.startswith('shared/'): log.info(_("Deleting mailbox '%s' because it has no recipients") % (folder)) try: imap.dm(folder) @@ -152,7 +163,7 @@ else: log.info(_("Not automatically deleting shared folder '%s'") % (folder)) else: - log.warning(_("No recipients for '%s' (use --delete to delete)!") % ('/'.join(folder.split('/')1:))) + log.warning(_("No recipients for '%s' (use --delete to delete)!") % (r_folder)) continue else: @@ -173,6 +184,10 @@ for folder in folders: server = imap.user_mailbox_server(folder) - recipient = auth.find_recipient('/'.join(folder.split('/')1:)) + + if folder.startswith('shared/'): + recipient = auth.find_folder_resource(folder) + else: + recipient = auth.find_recipient('/'.join(folder.split('/')1:)) print folder, server, recipient
View file
pykolab-0.8.1.tar.gz/pykolab/cli/sieve/cmd_list.py
Changed
@@ -23,6 +23,7 @@ import pykolab +from pykolab import utils from pykolab.auth import Auth from pykolab.cli import commands from pykolab.translate import _ @@ -31,7 +32,12 @@ conf = pykolab.getConf() def __init__(): - commands.register('list', execute, group='sieve', description=description()) + commands.register( + 'list', + execute, + group='sieve', + description=description() + ) def description(): return """List a user's sieve scripts.""" @@ -82,16 +88,27 @@ import sievelib.managesieve - sieveclient = sievelib.managesieve.Client(hostname, port, conf.debuglevel > 8) + sieveclient = sievelib.managesieve.Client( + hostname, + port, + conf.debuglevel > 8 + ) + sieveclient.connect(None, None, True) - result = sieveclient._plain_authentication(admin_login, admin_password, address) + + result = sieveclient._plain_authentication( + admin_login, + admin_password, + address + ) + if not result: print "LOGIN FAILED??" - + sieveclient.authenticated = True result = sieveclient.listscripts() - + if result == None: print "No scripts" sys.exit(0)
View file
pykolab-0.8.1.tar.gz/pykolab/imap/__init__.py
Changed
@@ -374,8 +374,8 @@ mode = 'subtract' continue if char == '+': - continue mode = 'add' + continue acl_mapmode += char @@ -520,7 +520,7 @@ _additional_folders = None if not hasattr(self, 'domain'): - self.domain == None + self.domain = None if self.domain == None and len(mailbox_base_name.split('@')) > 1: self.domain = mailbox_base_name.split('@')1 @@ -816,20 +816,43 @@ else: return False - def _set_kolab_mailfolder_acls(self, acls): + def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): + # special case, folder has no ACLs assigned and update was requested, + # remove all existing ACL entries + if update is True and isinstance(acls, list) and len(acls) == 0: + acls = self.list_acls(folder) + for subject in acls: + log.debug( + _("Removing ACL rights %s for subject %s on folder " + \ + "%s") % (aclssubject, subject, folder), level=8) + self.set_acl(folder, subject, '') + + return + if isinstance(acls, basestring): acls = acls + old_acls = None + for acl in acls: exec("acl = %s" % (acl)) - folder = acl0 - subject = acl1 - rights = acl2 - if len(acl) == 4: - epoch = acl3 + subject = acl0 + rights = acl1 + if len(acl) == 3: + epoch = acl2 else: epoch = (int)(time.time()) + 3600 + # update mode, check existing entries + if update is True: + if old_acls is None: + old_acls = self.list_acls(folder) + for old_subject in old_acls: + old_aclsold_subject = old_aclsold_subject + + if subject in old_acls: + old_aclssubject = None + if epoch > (int)(time.time()): log.debug( _("Setting ACL rights %s for subject %s on folder " + \ @@ -852,6 +875,15 @@ "" ) + # update mode, unset removed ACL entries + if old_acls is not None: + for subject in old_acls: + if old_aclssubject is not None: + log.debug( + _("Removing ACL rights %s for subject %s on folder " + \ + "%s") % (old_aclssubject, subject, folder), level=8) + self.set_acl(folder, subject, '') + pass """ Blah functions """
View file
pykolab-0.8.1.tar.gz/pykolab/imap/cyrus.py
Changed
@@ -336,7 +336,7 @@ self.m.rename( self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), - '"%s"' % (partition) + partition ) def _getannotation(self, *args, **kw): @@ -468,18 +468,22 @@ ) target_server = self.find_mailfolder_server(target_folder) + source_server = self.find_mailfolder_server(undelete_folder) if hasattr(conf, 'dry_run') and not conf.dry_run: - if target_server is not self.server: + undelete_folder = self.folder_utf7(undelete_folder) + target_folder = self.folder_utf7(target_folder) + + if not target_server == source_server: self.xfer(undelete_folder, target_server) self.rename(undelete_folder, target_folder) else: - if not target_server == self.server: + if not target_server == source_server: print >> sys.stdout, \ _("Would have transferred %s from %s to %s") % ( undelete_folder, - self.server, + source_server, target_server ) @@ -589,7 +593,7 @@ mbox'domain' ) - folders = self.lm(deleted_folder_search) + folders = self.lm(self.folder_utf7(deleted_folder_search)) # The folders we have found at this stage include virtdomain folders. # @@ -606,4 +610,4 @@ folders = _folders - return folders + return self.folder_utf8(x) for x in folders
View file
pykolab-0.8.1.tar.gz/pykolab/setup/setup_ldap.py
Changed
@@ -121,7 +121,7 @@ conf.command_set('ldap', 'auth_attributes', 'samaccountname') conf.command_set('ldap', 'modifytimestamp_format', '%%Y%%m%%d%%H%%M%%S.0Z') conf.command_set('ldap', 'unique_attribute', 'userprincipalname') - + # TODO: These attributes need to be checked conf.command_set('ldap', 'mail_attributes', 'mail') conf.command_set('ldap', 'mailserver_attributes', 'mailhost') @@ -496,7 +496,7 @@ auth = Auth(_input'domain') auth.connect() auth._auth.connect() - auth._auth._bind() + auth._auth._bind(bind_dn='cn=Directory Manager', bind_pw=_input'dirmgr_pass') dn = 'uid=%s,ou=Special Users,%s' % (conf.get('cyrus-imap', 'admin_login'), _input'rootdn') @@ -572,6 +572,7 @@ attrs = {} attrs'objectclass' = 'top','extensibleobject' attrs'cn' = "kolab" + attrs'aci' = '(targetattr = "*") (version 3.0;acl "Kolab Services";allow (read,compare,search)(userdn = "ldap:///uid=kolab-service,ou=Special Users,%s");)' % (_input'rootdn') # Convert our dict to nice syntax for the add-function using modlist-module ldif = ldap.modlist.addModlist(attrs) @@ -579,15 +580,6 @@ # Do the actual synchronous add-operation to the ldapserver auth._auth.ldap.add_s(dn, ldif) - auth._auth.set_entry_attribute( - dn, - 'aci', - '(targetattr = "*") (version 3.0;acl "Kolab Services";allow (read,compare,search)(userdn = "ldap:///%s");)' % ('uid=kolab-service,ou=Special Users,%s' % (_input'rootdn')) - ) - - # TODO: Add kolab-admin role - # TODO: Assign kolab-admin admin ACLs - log.info(_("Adding domain %s to list of domains for this deployment") % (_input'domain')) dn = "associateddomain=%s,cn=kolab,cn=config" % (_input'domain') attrs = {} @@ -650,7 +642,7 @@ modlist.append((ldap.MOD_ADD, "altstateattrname", "createTimestamp")) auth._auth.ldap.modify_s(dn, modlist) - # TODO: Add kolab-admin role + # Add kolab-admin role log.info(_("Adding the kolab-admin role")) dn = "cn=kolab-admin,%s" % (_input'rootdn') attrs = {} @@ -661,7 +653,7 @@ auth._auth.ldap.add_s(dn, ldif) - # TODO: User writeable attributes on root_dn + # User writeable attributes on root_dn log.info(_("Setting access control to %s") % (_input'rootdn')) dn = _input'rootdn' aci = @@ -671,11 +663,10 @@ else: aci.append('(targetattr = "carLicense || description || displayName || facsimileTelephoneNumber || homePhone || homePostalAddress || initials || jpegPhoto || l || labeledURI || mobile || o || pager || photo || postOfficeBox || postalAddress || postalCode || preferredDeliveryMethod || preferredLanguage || registeredAddress || roomNumber || secretary || seeAlso || st || street || telephoneNumber || telexNumber || title || userCertificate || userPassword || userSMIMECertificate || x500UniqueIdentifier || kolabDelegate || kolabInvitationPolicy || kolabAllowSMTPSender") (version 3.0; acl "Enable self write for common attributes"; allow (read,compare,search,write)(userdn = "ldap:///self");)') - aci.append('(targetattr = "*") (version 3.0;acl "Directory Administrators Group";allow (all)(groupdn = "ldap:///cn=Directory Administrators,%(rootdn)s" or roledn = "ldap:///cn=kolab-admin,%(rootdn)s");)' % (_input)) aci.append('(targetattr="*")(version 3.0; acl "Configuration Administrators Group"; allow (all) groupdn="ldap:///cn=Configuration Administrators,ou=Groups,ou=TopologyManagement,o=NetscapeRoot";)') aci.append('(targetattr="*")(version 3.0; acl "Configuration Administrator"; allow (all) userdn="ldap:///uid=admin,ou=Administrators,ou=TopologyManagement,o=NetscapeRoot";)') - aci.append('(targetattr = "*")(version 3.0; acl "SIE Group"; allow (all) groupdn = "ldap:///cn=slapd-%(hostname)s,cn=389 Directory Server,cn=Server Group,cn=%(fqdn)s,ou=%(domain)s,o=NetscapeRoot";)' %(_input)) + aci.append('(targetattr = "*")(version 3.0; acl "SIE Group"; allow (all) groupdn = "ldap:///cn=slapd-%(hostname)s,cn=389 Directory Server,cn=Server Group,cn=%(fqdn)s,ou=%(domain)s,o=NetscapeRoot";)' % (_input)) aci.append('(targetattr != "userPassword") (version 3.0;acl "Search Access";allow (read,compare,search)(userdn = "ldap:///all");)') modlist = modlist.append((ldap.MOD_REPLACE, "aci", aci)) @@ -693,4 +684,3 @@ else: log.error(_("Could not start and configure to start on boot, the " + \ "directory server admin service.")) -
View file
pykolab-0.8.1.tar.gz/pykolab/utils.py
Changed
@@ -353,22 +353,29 @@ return result elif type(_object) == dict: - for key in _object.keys(): + def _strip(value): + try: + return value.strip() + except: + return value + + for key in _object: if type(_objectkey) == list: - if _objectkey == None: + if _objectkey is None: continue - if len(_objectkey) == 1: - resultkey.lower() = ''.join(_objectkey) + val = map(_strip, _objectkey) + + if len(val) == 1: + resultkey.lower() = ''.join(val) else: - resultkey.lower() = _objectkey + resultkey.lower() = val else: - if _objectkey == None: + if _objectkey is None: continue - # What the heck? - resultkey.lower() = _objectkey + resultkey.lower() = _strip(_objectkey) if result.has_key('objectsid') and not result'objectsid'0 == "S": result'objectsid' = sid_to_string(result'objectsid') @@ -387,6 +394,17 @@ if not result.has_key('domain') and result.has_key('standard_domain'): result'domain' = result'standard_domain' + if 'objectclass' not in result: + result'objectclass' = + + if result'objectclass' is None: + result'objectclass' = + + if not isinstance(result'objectclass', list): + result'objectclass' = result'objectclass' + + result'objectclass' = x.lower() for x in result'objectclass' + return result def parse_input(_input, splitchars= ' ' ):
View file
pykolab-0.8.1.tar.gz/pykolab/xml/event.py
Changed
@@ -122,6 +122,7 @@ else: self.from_ical(from_ical, from_string) + self.set_created(self.get_created()) self.uid = self.get_uid() def _load_attendees(self): @@ -151,6 +152,23 @@ self._categories.append(ustr(category)) self.event.setCategories(self._categories) + def add_recurrence_date(self, _datetime): + valid_datetime = False + if isinstance(_datetime, datetime.date): + valid_datetime = True + + if isinstance(_datetime, datetime.datetime): + # If no timezone information is passed on, make it UTC + if _datetime.tzinfo is None: + _datetime = _datetime.replace(tzinfo=pytz.utc) + + valid_datetime = True + + if not valid_datetime: + raise InvalidEventDateError, _("Rdate needs datetime.date or datetime.datetime instance, got %r") % (type(_datetime)) + + self.event.addRecurrenceDate(xmlutils.to_cdatetime(_datetime, True)) + def add_exception_date(self, _datetime): valid_datetime = False if isinstance(_datetime, datetime.date): @@ -351,6 +369,8 @@ # NOTE: Make sure to list(set()) or duplicates may arise for attr in list(set(ical_event.singletons)): if ical_event.has_key(attr): + if isinstance(ical_eventattr, list): + ical_eventattr = ical_eventattr0; self.set_from_ical(attr.lower(), ical_eventattr) # NOTE: Make sure to list(set()) or duplicates may arise @@ -478,6 +498,9 @@ return "%s - %s" % (start.strftime(date_format + " " + time_format), end.strftime(end_format)) + def get_recurrence_dates(self): + return map(lambda _: xmlutils.from_cdatetime(_, True), self.event.recurrenceDates()) + def get_exception_dates(self): return map(lambda _: xmlutils.from_cdatetime(_, True), self.event.exceptionDates()) @@ -662,6 +685,13 @@ result.append(rrule.to_ical()) return result + def get_ical_rdate(self): + rdates = self.get_recurrence_dates() + for i in range(len(rdates)): + rdatesi = icalendar.prop.vDDDLists(rdatesi) + + return rdates + def get_location(self): return self.event.location() @@ -826,6 +856,10 @@ for _datetime in _datetimes: self.add_exception_date(_datetime) + def set_recurrence_dates(self, _datetimes): + for _datetime in _datetimes: + self.add_recurrence_date(_datetime) + def add_custom_property(self, name, value): if not name.upper().startswith('X-'): raise ValueError, _("Invalid custom property name %r") % (name) @@ -943,6 +977,23 @@ def set_ical_uid(self, uid): self.set_uid(str(uid)) + def set_ical_rdate(self, rdate): + rdates = + # rdate here can be vDDDLists or a list of vDDDLists, why?! + if isinstance(rdate, icalendar.prop.vDDDLists): + rdate = rdate + + for _rdate in rdate: + if isinstance(_rdate, icalendar.prop.vDDDLists): + tzid = None + if hasattr(_rdate, 'params') and 'TZID' in _rdate.params: + tzid = _rdate.params'TZID' + dts = icalendar.prop.vDDDLists.from_ical(_rdate.to_ical(), tzid) + for datetime in dts: + rdates.append(datetime) + + self.set_recurrence_dates(rdates) + def set_ical_recurrenceid(self, value, params): try: self.thisandfuture = params.get('RANGE', '') == 'THISANDFUTURE' @@ -1313,7 +1364,7 @@ return msg def is_recurring(self): - return self.event.recurrenceRule().isValid() + return self.event.recurrenceRule().isValid() or len(self.get_recurrence_dates()) > 0 def to_event_cal(self): from kolab.calendaring import EventCal
View file
pykolab-0.8.1.tar.gz/pykolab/xml/todo.py
Changed
@@ -61,6 +61,7 @@ else: self.from_ical(from_ical, from_string) + self.set_created(self.get_created()) self.uid = self.get_uid() def from_ical(self, ical, raw): @@ -95,6 +96,8 @@ for attr in list(set(ical_todo.singletons)): if ical_todo.has_key(attr): + if isinstance(ical_todoattr, list): + ical_todoattr = ical_todoattr0; self.set_from_ical(attr.lower(), ical_todoattr) for attr in list(set(ical_todo.multiple)):
View file
pykolab-0.8.1.tar.gz/saslauthd.py
Changed
@@ -39,4 +39,3 @@ if __name__ == "__main__": saslauthd = saslauthd.SASLAuthDaemon() saslauthd.run() -
View file
pykolab-0.8.1.tar.gz/saslauthd/__init__.py
Changed
@@ -1,4 +1,4 @@ -# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) +# Copyright 2010-2016 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> # @@ -46,6 +46,7 @@ log = pykolab.getLogger('saslauthd') conf = pykolab.getConf() + class SASLAuthDaemon(object): def __init__(self): daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) @@ -166,7 +167,8 @@ except AttributeError, e: exitcode = 1 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") + print >> sys.stderr, _("Traceback occurred, please report a " + + "bug at https://issues.kolab.org") except TypeError, e: exitcode = 1 traceback.print_exc() @@ -174,7 +176,8 @@ except: exitcode = 2 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org") + print >> sys.stderr, _("Traceback occurred, please report a " + + "bug at https://issues.kolab.org") sys.exit(exitcode) @@ -214,8 +217,8 @@ bound = True except Exception, errmsg: log.error( - _("kolab-saslauthd could not accept " + \ - "connections on socket: %r") % (errmsg) + _("kolab-saslauthd could not accept " + + "connections on socket: %r") % (errmsg) ) if cur_tries >= max_tries: @@ -288,7 +291,7 @@ def write_pid(self): pid = os.getpid() - fp = open(conf.pidfile,'w') + fp = open(conf.pidfile, 'w') fp.write("%d\n" % (pid)) fp.close() @@ -358,7 +361,6 @@ sys.exit(1) - # Set real and effective user if not the same as current. if not user_uid == ruid: log.debug(
View file
pykolab-0.8.1.tar.gz/share/templates/roundcubemail/config.inc.php.tpl
Changed
@@ -78,8 +78,6 @@ 'kolab_folders', 'kolab_notes', 'kolab_tags', - 'libkolab', - 'libcalendaring', 'managesieve', 'newmail_notifier', 'odfviewer', @@ -101,7 +99,7 @@ \$config'session_lifetime' = 180; \$config'password_charset' = 'UTF-8'; - \$config'useragent' = 'Kolab 3.4/Roundcube ' . RCUBE_VERSION; + \$config'useragent' = 'Kolab 16/Roundcube ' . RCUBE_VERSION; \$config'message_sort_col' = 'date'; @@ -197,7 +195,6 @@ 'surname' => 'sn', 'firstname' => 'givenName', 'middlename' => 'initials', - 'prefix' => 'title', 'email:primary' => 'mail', 'email:alias' => 'alias', 'email:personal' => 'mailalternateaddress',
View file
pykolab-0.8.1.tar.gz/test-wallace.py
Changed
@@ -31,6 +31,7 @@ from email.Utils import COMMASPACE, formatdate from email import Encoders + def send_mail(send_from, send_to, send_with=None): smtp = smtplib.SMTP("localhost", 10026) smtp.set_debuglevel(True) @@ -57,7 +58,7 @@ msg.attach( MIMEText(text) ) - #msg.attach( MIMEBase('application', open('/boot/initrd-plymouth.img').read()) ) + # msg.attach( MIMEBase('application', open('/boot/initrd-plymouth.img').read()) ) smtp.sendmail(send_from, send_to, msg.as_string())
View file
pykolab-0.8.1.tar.gz/tests/functional/purge_imap.py
Changed
@@ -7,6 +7,7 @@ conf = pykolab.getConf() + def purge_imap(): time.sleep(2)
View file
pykolab-0.8.1.tar.gz/tests/functional/purge_users.py
Changed
@@ -6,6 +6,7 @@ conf = pykolab.getConf() + def purge_users(): wap_client.authenticate(conf.get("ldap", "bind_dn"), conf.get("ldap", "bind_pw"))
View file
pykolab-0.8.1.tar.gz/tests/functional/resource_func.py
Changed
@@ -4,11 +4,12 @@ conf = pykolab.getConf() + def resource_add(type, cn, members=None, owner=None, **kw): - if type == None or type == '': + if type is None or type == '': raise Exception - if cn == None or cn == '': + if cn is None or cn == '': raise Exception resource_details = { @@ -21,7 +22,10 @@ resource_details.update(kw) - result = wap_client.authenticate(conf.get('ldap', 'bind_dn'), conf.get('ldap', 'bind_pw'), conf.get('kolab', 'primary_domain')) + bind_dn = conf.get('ldap', 'bind_dn') + bind_pw = conf.get('ldap', 'bind_pw') + domain = conf.get('kolab', 'primary_domain') + result = wap_client.authenticate(bind_dn, bind_pw, domain) type_id = 0 resource_types = wap_client.resource_types_list() @@ -41,7 +45,7 @@ attr_details = resource_type_info'form_fields'attribute if isinstance(attr_details, dict): - if not attr_details.has_key('optional') or attr_details'optional' == False or resource_details.has_key(attribute): + if 'optional' not in attr_details or attr_details'optional' is False or attribute in resource_details: paramsattribute = resource_detailsattribute elif isinstance(attr_details, list): paramsattribute = resource_detailsattribute @@ -49,7 +53,7 @@ fvg_params = params fvg_params'object_type' = 'resource' fvg_params'type_id' = type_id - fvg_params'attributes' = attr for attr in resource_type_info'auto_form_fields'.keys() if not attr in params.keys() + fvg_params'attributes' = attr for attr in resource_type_info'auto_form_fields'.keys() if attr not in params result = wap_client.resource_add(params) result'dn' = "cn=" + result'cn' + ",ou=Resources,dc=example,dc=org" @@ -57,13 +61,15 @@ def purge_resources(): - wap_client.authenticate(conf.get("ldap", "bind_dn"), conf.get("ldap", "bind_pw"), conf.get('kolab', 'primary_domain')) + bind_dn = conf.get('ldap', 'bind_dn') + bind_pw = conf.get('ldap', 'bind_pw') + domain = conf.get('kolab', 'primary_domain') + result = wap_client.authenticate(bind_dn, bind_pw, domain) resources = wap_client.resources_list() for resource in resources'list': wap_client.resource_delete({'id': resource}) - #from tests.functional.purge_imap import purge_imap - #purge_imap() - + # from tests.functional.purge_imap import purge_imap + # purge_imap()
View file
pykolab-0.8.1.tar.gz/tests/functional/synchronize.py
Changed
@@ -1,5 +1,6 @@ from pykolab.auth import Auth + def synchronize_once(): auth = Auth() auth.connect()
View file
pykolab-0.8.1.tar.gz/tests/functional/test_kolabd/__init__.py
Changed
@@ -1,5 +1,6 @@ import pykolab + def setup_package(): conf = pykolab.getConf() conf.finalize_conf(fatal=False)
View file
pykolab-0.8.1.tar.gz/tests/functional/test_kolabd/test_001_user_sync.py
Changed
@@ -8,6 +8,7 @@ conf = pykolab.getConf() + class TestKolabDaemon(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -61,7 +62,7 @@ result = wap_client.user_info(recipient) - if not result.has_key('mailhost'): + if 'mailhost' not in result: from tests.functional.synchronize import synchronize_once synchronize_once() @@ -106,14 +107,14 @@ print metadata folder_name = '/'.join(folder.split('/')2:).split('@')0 - if ac_folders.has_key(folder_name): - if ac_foldersfolder_name.has_key('annotations'): + if folder_name in ac_folders: + if 'annotations' in ac_foldersfolder_name: for _annotation in ac_foldersfolder_name'annotations'.keys(): if _annotation.startswith('/private'): continue _annotation_value = ac_foldersfolder_name'annotations'_annotation - self.assertTrue(metadatametadata.keys().pop().has_key(_annotation)) + self.assertTrue(_annotation in metadatametadata.keys().pop()) self.assertEqual(_annotation_value, metadatametadata.keys().pop()_annotation) def test_006_user_subscriptions(self): @@ -137,4 +138,3 @@ def test_013_resource_mailbox_annotation(self): pass -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_kolabd/test_002_user_rename.py
Changed
@@ -8,6 +8,7 @@ conf = pykolab.getConf() + class TestKolabDaemon(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -39,7 +40,7 @@ recipient = auth.find_recipient('john.doe@example.org') user_info = wap_client.user_info(recipient) - if not user_info.has_key('mailhost'): + if 'mailhost' not in user_info: from tests.functional.synchronize import synchronize_once synchronize_once() @@ -77,4 +78,3 @@ folders = imap.lm('user/joe.sixpack@example.org') self.assertEqual(len(folders), 1, "INBOX for joe.sixpack does not exist") -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_kolabd/test_003_two_johns.py
Changed
@@ -8,6 +8,7 @@ conf = pykolab.getConf() + class TestKolabDaemon(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -48,4 +49,3 @@ folders = imap.lm('user/john.doe2@example.org') self.assertEqual(len(folders), 1, "No INBOX found for second John") -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_postfix/__init__.py
Changed
@@ -1,5 +1,6 @@ import pykolab + def setup_package(): conf = pykolab.getConf() conf.finalize_conf(fatal=False)
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/__init__.py
Changed
@@ -1,5 +1,6 @@ import pykolab + def setup_package(): conf = pykolab.getConf() conf.finalize_conf(fatal=False)
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_001_user_add.py
Changed
@@ -1,4 +1,3 @@ - from email import message_from_string import time @@ -11,6 +10,7 @@ conf = pykolab.getConf() + class TestUserAdd(unittest.TestCase): @classmethod @@ -45,7 +45,7 @@ folders = imap.lm('user/%(local)s@%(domain)s' % (self.john)) self.assertEqual(len(folders), 1) - + folders = imap.lm('user/%(local)s@%(domain)s' % (self.jane)) self.assertEqual(len(folders), 1)
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_002_footer.py
Changed
@@ -17,6 +17,7 @@ conf = pykolab.getConf() + class TestWallaceFooter(unittest.TestCase): user = None @@ -115,10 +116,10 @@ def send_message(self, msg, _to=None, _from=None): smtp = smtplib.SMTP('localhost', 10026) - if _to == None: + if _to is None: _to = self.send_to - if _from == None: + if _from is None: _from = self.send_from smtp.sendmail(_from, _to, msg.as_string())
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_003_nonascii_subject.py
Changed
@@ -19,6 +19,7 @@ conf = pykolab.getConf() + class TestWallaceNonASCIISubject(unittest.TestCase): @classmethod @@ -83,10 +84,10 @@ def send_message(self, msg, _to=None, _from=None): smtp = smtplib.SMTP('localhost', 10026) - if _to == None: + if _to is None: _to = self.send_to - if _from == None: + if _from is None: _from = self.send_from smtp.sendmail(_from, _to, msg.as_string()) @@ -97,7 +98,7 @@ folders = imap.lm('user/%(local)s@%(domain)s' % (self.user)) self.assertEqual(len(folders), 1) - + def test_002_send_nonascii_subject(self): subject = Header(u"test_002_nonascii_subject chwała") body = "This is a test message" @@ -123,4 +124,3 @@ if not self.check_message_delivered(subject): raise Exception -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_004_nonascii_addresses.py
Changed
@@ -19,6 +19,7 @@ conf = pykolab.getConf() + class TestWallaceNonASCIIAddresses(unittest.TestCase): @classmethod @@ -83,10 +84,10 @@ def send_message(self, msg, _to=None, _from=None): smtp = smtplib.SMTP('localhost', 10026) - if _to == None: + if _to is None: _to = self.send_to - if _from == None: + if _from is None: _from = self.send_from smtp.sendmail(_from, _to, msg.as_string()) @@ -97,7 +98,7 @@ folders = imap.lm('user/%(local)s@%(domain)s' % (self.user)) self.assertEqual(len(folders), 1) - + def test_002_send_nonascii_addresses(self): subject = Header(u"test_002_nonascii_addresses") body = "This is a test message" @@ -123,4 +124,3 @@ if not self.check_message_delivered(subject): raise Exception -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_005_resource_add.py
Changed
@@ -11,12 +11,13 @@ conf = pykolab.getConf() + class TestResourceAdd(unittest.TestCase): @classmethod def setUp(self): from tests.functional.purge_users import purge_users - #purge_users() + # purge_users() self.john = { 'local': 'john.doe', @@ -24,13 +25,13 @@ } from tests.functional.user_add import user_add - #user_add("John", "Doe") + # user_add("John", "Doe") funcs.purge_resources() self.audi = funcs.resource_add("car", "Audi A4") self.passat = funcs.resource_add("car", "VW Passat") self.boxter = funcs.resource_add("car", "Porsche Boxter S", kolabinvitationpolicy='ACT_ACCEPT_AND_NOTIFY') - self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn' , kolabinvitationpolicy='ACT_ACCEPT') + self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn', kolabinvitationpolicy='ACT_ACCEPT') from tests.functional.synchronize import synchronize_once synchronize_once() @@ -38,8 +39,8 @@ @classmethod def tearDown(self): from tests.functional.purge_users import purge_users - #funcs.purge_resources() - #purge_users() + # funcs.purge_resources() + # purge_users() def test_001_resource_created(self): resource = module_resources.resource_record_from_email_address(self.audi'mail')
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_005_resource_invitation.py
Changed
@@ -131,7 +131,6 @@ END:VCALENDAR """ - itip_recurring = """ BEGIN:VCALENDAR VERSION:2.0 @@ -178,8 +177,8 @@ --=_c8894dbdb8baeedacae836230e3436fd-- """ -class TestResourceInvitation(unittest.TestCase): +class TestResourceInvitation(unittest.TestCase): john = None @classmethod @@ -192,7 +191,7 @@ @classmethod def setup_class(self, *args, **kw): # set language to default - pykolab.translate.setUserLanguage(conf.get('kolab','default_locale')) + pykolab.translate.setUserLanguage(conf.get('kolab', 'default_locale')) self.itip_reply_subject = _("Reservation Request for %(summary)s was %(status)s") @@ -223,14 +222,14 @@ self.audi = funcs.resource_add("car", "Audi A4") self.passat = funcs.resource_add("car", "VW Passat") self.boxter = funcs.resource_add("car", "Porsche Boxter S") - self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn' ) + self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn') self.room1 = funcs.resource_add("confroom", "Room 101", owner=self.jane'dn', kolabinvitationpolicy='ACT_ACCEPT_AND_NOTIFY') self.room2 = funcs.resource_add("confroom", "Conference Room B-222") - self.rooms = funcs.resource_add("collection", "Rooms", self.room1'dn', self.room2'dn' , self.jane'dn', kolabinvitationpolicy='ACT_ACCEPT_AND_NOTIFY') + self.rooms = funcs.resource_add("collection", "Rooms", self.room1'dn', self.room2'dn', self.jane'dn', kolabinvitationpolicy='ACT_ACCEPT_AND_NOTIFY') self.room3 = funcs.resource_add("confroom", "CEOs Office 303") - self.viprooms = funcs.resource_add("collection", "VIP Rooms", self.room3'dn' , self.jane'dn', kolabinvitationpolicy='ACT_MANUAL') + self.viprooms = funcs.resource_add("collection", "VIP Rooms", self.room3'dn', self.jane'dn', kolabinvitationpolicy='ACT_MANUAL') time.sleep(1) from tests.functional.synchronize import synchronize_once @@ -315,16 +314,16 @@ return uid - def send_owner_response(self, event, partstat, from_addr=None): if from_addr is None: from_addr = self.jane'mail' - itip_reply = event.to_message_itip(from_addr, - method="REPLY", - participant_status=partstat, - message_text="Request " + partstat, - subject="Booking has been %s" % (partstat) + itip_reply = event.to_message_itip( + from_addr, + method="REPLY", + participant_status=partstat, + message_text="Request " + partstat, + subject="Booking has been %s" % (partstat) ) smtp = smtplib.SMTP('localhost', 10026) @@ -405,7 +404,6 @@ imap.imap.m.expunge() imap.disconnect() - def find_resource_by_email(self, email): resource = None for r in self.audi, self.passat, self.boxter, self.room1, self.room2: @@ -414,7 +412,6 @@ break return resource - def test_001_resource_from_email_address(self): resource = module_resources.resource_record_from_email_address(self.audi'mail') self.assertEqual(len(resource), 1) @@ -424,35 +421,32 @@ self.assertEqual(len(collection), 1) self.assertEqual(collection0, self.cars'dn') - def test_002_invite_resource(self): - uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,7,13, 10,0,0)) + uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 7, 13, 10, 0, 0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.audi'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) event = self.check_resource_calendar_event(self.audi'kolabtargetfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) self.assertEqual(event.get_summary(), "test") - # @depends test_002_invite_resource def test_003_invite_resource_conflict(self): - uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,7,13, 12,0,0)) + uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 7, 13, 12, 0, 0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.audi'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) self.assertEqual(self.check_resource_calendar_event(self.audi'kolabtargetfolder', uid), None) - def test_004_invite_resource_collection(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.cars'mail', datetime.datetime(2014,7,13, 12,0,0)) + uid = self.send_itip_invitation(self.cars'mail', datetime.datetime(2014, 7, 13, 12, 0, 0)) # one of the collection members accepted the reservation - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) delegatee = self.find_resource_by_email(accept'from') @@ -462,23 +456,22 @@ self.assertIsInstance(self.check_resource_calendar_event(delegatee'kolabtargetfolder', uid), pykolab.xml.Event) # resource collection responds with a DELEGATED message - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DELEGATED') }, self.cars'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DELEGATED')}, self.cars'mail') self.assertIsInstance(response, email.message.Message) self.assertIn("ROLE=NON-PARTICIPANT;RSVP=FALSE", str(response)) - def test_005_rescheduling_reservation(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,4,1, 10,0,0)) + uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 4, 1, 10, 0, 0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.audi'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) self.purge_mailbox(self.john'mailbox') - self.send_itip_update(self.audi'mail', uid, datetime.datetime(2014,4,1, 12,0,0)) # conflict with myself + self.send_itip_update(self.audi'mail', uid, datetime.datetime(2014, 4, 1, 12, 0, 0)) # conflict with myself - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.audi'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) event = self.check_resource_calendar_event(self.audi'kolabtargetfolder', uid) @@ -486,28 +479,27 @@ self.assertEqual(event.get_start().hour, 12) self.assertEqual(event.get_sequence(), 2) - def test_005_rescheduling_collection(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.cars'mail', datetime.datetime(2014,4,24, 12,0,0)) + uid = self.send_itip_invitation(self.cars'mail', datetime.datetime(2014, 4, 24, 12, 0, 0)) # one of the collection members accepted the reservation - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) delegatee = self.find_resource_by_email(accept'from') # book that resource for the next day - self.send_itip_invitation(delegatee'mail', datetime.datetime(2014,4,25, 14,0,0)) - accept2 = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + self.send_itip_invitation(delegatee'mail', datetime.datetime(2014, 4, 25, 14, 0, 0)) + accept2 = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) # re-schedule first booking to a conflicting date self.purge_mailbox(self.john'mailbox') update_template = itip_delegated.replace("resource-car-audia4@example.org", delegatee'mail') - self.send_itip_update(delegatee'mail', uid, datetime.datetime(2014,4,25, 12,0,0), template=update_template) + self.send_itip_update(delegatee'mail', uid, datetime.datetime(2014, 4, 25, 12, 0, 0), template=update_template) # expect response from another member of the initially delegated collection - new_accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + new_accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(new_accept, email.message.Message) new_delegatee = self.find_resource_by_email(new_accept'from') @@ -518,17 +510,16 @@ self.assertIsInstance(event, pykolab.xml.Event) # old resource responds with a DELEGATED message - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DELEGATED') }, delegatee'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DELEGATED')}, delegatee'mail') self.assertIsInstance(response, email.message.Message) # old reservation was removed from old delegate's calendar self.assertEqual(self.check_resource_calendar_event(delegatee'kolabtargetfolder', uid), None) - def test_006_cancelling_revervation(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.boxter'mail', datetime.datetime(2014,5,1, 10,0,0)) + uid = self.send_itip_invitation(self.boxter'mail', datetime.datetime(2014, 5, 1, 10, 0, 0)) self.assertIsInstance(self.check_resource_calendar_event(self.boxter'kolabtargetfolder', uid), pykolab.xml.Event) self.send_itip_cancel(self.boxter'mail', uid) @@ -537,20 +528,19 @@ self.assertEqual(self.check_resource_calendar_event(self.boxter'kolabtargetfolder', uid), None) # make new reservation to the now free'd slot - self.send_itip_invitation(self.boxter'mail', datetime.datetime(2014,5,1, 9,0,0)) + self.send_itip_invitation(self.boxter'mail', datetime.datetime(2014, 5, 1, 9, 0, 0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.boxter'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.boxter'mail') self.assertIsInstance(response, email.message.Message) - def test_007_update_delegated(self): self.purge_mailbox(self.john'mailbox') - dt = datetime.datetime(2014,8,1, 12,0,0) + dt = datetime.datetime(2014, 8, 1, 12, 0, 0) uid = self.send_itip_invitation(self.cars'mail', dt) # wait for accept notification - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) delegatee = self.find_resource_by_email(accept'from') @@ -561,97 +551,91 @@ self.send_itip_update(delegatee'mail', uid, dt, template=update_template) # get response from delegatee - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) self.assertIn(delegatee'mail', accept'from') # no delegation response on updates - self.assertEqual(self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DELEGATED') }, self.cars'mail'), None) - + self.assertEqual(self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DELEGATED')}, self.cars'mail'), None) def test_008_allday_reservation(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,6,2), True) + uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 6, 2), True) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) event = self.check_resource_calendar_event(self.audi'kolabtargetfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) self.assertIsInstance(event.get_start(), datetime.date) - uid2 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,6,2, 16,0,0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.audi'mail') + uid2 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 6, 2, 16, 0, 0)) + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) - def test_009_recurring_events(self): self.purge_mailbox(self.john'mailbox') # register an infinitely recurring resource invitation - uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,2,20, 12,0,0), + uid = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 2, 20, 12, 0, 0), template=itip_recurring.replace(";COUNT=10", "")) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) # check non-recurring against recurring - uid2 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,3,13, 10,0,0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.audi'mail') + uid2 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 3, 13, 10, 0, 0)) + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.audi'mail') self.assertIsInstance(response, email.message.Message) self.purge_mailbox(self.john'mailbox') # check recurring against recurring - uid3 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,2,22, 8,0,0), template=itip_recurring) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + uid3 = self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 2, 22, 8, 0, 0), template=itip_recurring) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) self.assertIsInstance(accept, email.message.Message) - def test_010_invalid_bookings(self): self.purge_mailbox(self.john'mailbox') itip_other = itip_invitation.replace("mailto:%s", "mailto:some-other-resource@example.org\nCOMMENT: Sent to %s") - self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,3,22, 8,0,0), template=itip_other) + self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 3, 22, 8, 0, 0), template=itip_other) time.sleep(1) itip_invalid = itip_invitation.replace("DTSTART;", "X-DTSTART;") - self.send_itip_invitation(self.audi'mail', datetime.datetime(2014,3,24, 19,30,0), template=itip_invalid) - - self.assertEqual(self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.audi'mail'), None) + self.send_itip_invitation(self.audi'mail', datetime.datetime(2014, 3, 24, 19, 30, 0), template=itip_invalid) + self.assertEqual(self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.audi'mail'), None) def test_011_owner_info(self): self.purge_mailbox(self.john'mailbox') - self.send_itip_invitation(self.room1'mail', datetime.datetime(2014,6,19, 16,0,0)) + self.send_itip_invitation(self.room1'mail', datetime.datetime(2014, 6, 19, 16, 0, 0)) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.room1'mail') + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.room1'mail') self.assertIsInstance(accept, email.message.Message) respose_text = str(accept.get_payload(0)) self.assertIn(self.jane'mail', respose_text) self.assertIn(self.jane'displayname', respose_text) - def test_011_owner_info_from_collection(self): self.purge_mailbox(self.john'mailbox') - self.send_itip_invitation(self.room2'mail', datetime.datetime(2014,6,19, 16,0,0)) + self.send_itip_invitation(self.room2'mail', datetime.datetime(2014, 6, 19, 16, 0, 0)) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.room2'mail') + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.room2'mail') self.assertIsInstance(accept, email.message.Message) respose_text = str(accept.get_payload(0)) self.assertIn(self.jane'mail', respose_text) self.assertIn(self.jane'displayname', respose_text) - def test_012_owner_notification(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.jane'mailbox') - self.send_itip_invitation(self.room1'mail', datetime.datetime(2014,8,4, 13,0,0)) + self.send_itip_invitation(self.room1'mail', datetime.datetime(2014, 8, 4, 13, 0, 0)) # check notification message sent to resource owner (jane) notify = self.check_message_received(_('Booking for %s has been %s') % (self.room1'cn', participant_status_label('ACCEPTED')), self.room1'mail', self.jane'mailbox') @@ -664,25 +648,24 @@ self.purge_mailbox(self.john'mailbox') # check notification sent to collection owner (jane) - self.send_itip_invitation(self.rooms'mail', datetime.datetime(2014,8,4, 12,30,0)) + self.send_itip_invitation(self.rooms'mail', datetime.datetime(2014, 8, 4, 12, 30, 0)) # one of the collection members accepted the reservation - accepted = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accepted = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}) delegatee = self.find_resource_by_email(accepted'from') notify = self.check_message_received(_('Booking for %s has been %s') % (delegatee'cn', participant_status_label('ACCEPTED')), delegatee'mail', self.jane'mailbox') self.assertIsInstance(notify, email.message.Message) self.assertIn(self.john'mail', notification_text) - def test_013_owner_confirmation_accept(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.jane'mailbox') - uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014,9,12, 14,0,0)) + uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014, 9, 12, 14, 0, 0)) # requester (john) gets a TENTATIVE confirmation - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) event = self.check_resource_calendar_event(self.room3'kolabtargetfolder', uid) @@ -699,7 +682,7 @@ self.send_owner_response(itip_event'xml', 'ACCEPTED', from_addr=self.jane'mail') # requester (john) now gets the ACCEPTED response - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) event = self.check_resource_calendar_event(self.room3'kolabtargetfolder', uid) @@ -707,15 +690,14 @@ self.assertEqual(event.get_status(True), 'CONFIRMED') self.assertEqual(event.get_attendee_by_email(self.room3'mail').get_participant_status(True), 'ACCEPTED') - def test_014_owner_confirmation_decline(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.jane'mailbox') - uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014,9,14, 9,0,0)) + uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014, 9, 14, 9, 0, 0)) # requester (john) gets a TENTATIVE confirmation - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) # check confirmation message sent to resource owner (jane) @@ -725,11 +707,12 @@ itip_event = events_from_message(notify)0 # resource owner declines reservation request - itip_reply = itip_event'xml'.to_message_itip(self.jane'mail', - method="REPLY", - participant_status='DECLINED', - message_text="Request declined", - subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('DECLINED')) + itip_reply = itip_event'xml'.to_message_itip( + self.jane'mail', + method="REPLY", + participant_status='DECLINED', + message_text="Request declined", + subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('DECLINED')) ) smtp = smtplib.SMTP('localhost', 10026) @@ -737,23 +720,22 @@ smtp.quit() # requester (john) now gets the DECLINED response - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) # tentative reservation was set to cancelled event = self.check_resource_calendar_event(self.room3'kolabtargetfolder', uid) self.assertEqual(event, None) - #self.assertEqual(event.get_status(True), 'CANCELLED') - #self.assertEqual(event.get_attendee_by_email(self.room3'mail').get_participant_status(True), 'DECLINED') - + # self.assertEqual(event.get_status(True), 'CANCELLED') + # self.assertEqual(event.get_attendee_by_email(self.room3'mail').get_participant_status(True), 'DECLINED') def test_015_owner_confirmation_update(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014,8,19, 9,0,0), uid="http://a-totally.stupid/?uid") + uid = self.send_itip_invitation(self.room3'mail', datetime.datetime(2014, 8, 19, 9, 0, 0), uid="http://a-totally.stupid/?uid") # requester (john) gets a TENTATIVE confirmation - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) # check first confirmation message sent to resource owner (jane) @@ -767,7 +749,7 @@ self.purge_mailbox(self.john'mailbox') # send update with new date (and sequence) - self.send_itip_update(self.room3'mail', uid, datetime.datetime(2014,8,19, 16,0,0)) + self.send_itip_update(self.room3'mail', uid, datetime.datetime(2014, 8, 19, 16, 0, 0)) event = self.check_resource_calendar_event(self.room3'kolabtargetfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) @@ -781,11 +763,12 @@ self.assertEqual(itip_event2'start'.hour, 16) # resource owner declines the first reservation request - itip_reply = itip_event1'xml'.to_message_itip(self.jane'mail', - method="REPLY", - participant_status='DECLINED', - message_text="Request declined", - subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('DECLINED')) + itip_reply = itip_event1'xml'.to_message_itip( + self.jane'mail', + method="REPLY", + participant_status='DECLINED', + message_text="Request declined", + subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('DECLINED')) ) smtp = smtplib.SMTP('localhost', 10026) smtp.sendmail(self.jane'mail', str(itip_event1'organizer'), str(itip_reply)) @@ -794,36 +777,36 @@ time.sleep(5) # resource owner accpets the second reservation request - itip_reply = itip_event2'xml'.to_message_itip(self.jane'mail', - method="REPLY", - participant_status='ACCEPTED', - message_text="Request accepred", - subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('ACCEPTED')) + itip_reply = itip_event2'xml'.to_message_itip( + self.jane'mail', + method="REPLY", + participant_status='ACCEPTED', + message_text="Request accepred", + subject=_('Booking for %s has been %s') % (self.room3'cn', participant_status_label('ACCEPTED')) ) smtp = smtplib.SMTP('localhost', 10026) smtp.sendmail(self.jane'mail', str(itip_event2'organizer'), str(itip_reply)) smtp.quit() # requester (john) now gets the ACCEPTED response - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) event = self.check_resource_calendar_event(self.room3'kolabtargetfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) self.assertEqual(event.get_attendee_by_email(self.room3'mail').get_participant_status(True), 'ACCEPTED') - def test_016_collection_owner_confirmation(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.viprooms'mail', datetime.datetime(2014,8,15, 17,0,0)) + uid = self.send_itip_invitation(self.viprooms'mail', datetime.datetime(2014, 8, 15, 17, 0, 0)) # resource collection responds with a DELEGATED message - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DELEGATED') }, self.viprooms'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DELEGATED')}, self.viprooms'mail') self.assertIsInstance(response, email.message.Message) # the collection member tentatively accepted the reservation - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }) + accept = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}) self.assertIsInstance(accept, email.message.Message) self.assertIn(self.room3'mail', accept'from') @@ -831,15 +814,17 @@ notify = self.check_message_received(_('Booking request for %s requires confirmation') % (self.room3'cn'), mailbox=self.jane'mailbox') self.assertIsInstance(notify, email.message.Message) - def test_017_reschedule_single_occurrence(self): self.purge_mailbox(self.john'mailbox') + subject = {'summary': 'test', 'status': participant_status_label('ACCEPTED')} + subject = self.itip_reply_subject % (subject) + # register a recurring resource invitation - start = datetime.datetime(2015,2,10, 12,0,0) + start = datetime.datetime(2015, 2, 10, 12, 0, 0) uid = self.send_itip_invitation(self.audi'mail', start, template=itip_recurring) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.purge_mailbox(self.john'mailbox') @@ -849,7 +834,7 @@ exstart = exdate + datetime.timedelta(hours=5) self.send_itip_update(self.audi'mail', uid, exstart, instance=exdate) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + exdate.strftime('%Y%m%dT%H%M%S'), str(accept)) @@ -860,7 +845,7 @@ # send new invitation for now free slot uid = self.send_itip_invitation(self.audi'mail', exdate, template=itip_invitation.replace('SUMMARY:test', 'SUMMARY:new')) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'new', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) # send rescheduling request to that single instance again: now conflicting @@ -868,29 +853,31 @@ exstart = exdate + datetime.timedelta(hours=2) self.send_itip_update(self.audi'mail', uid, exstart, instance=exdate, sequence=3) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }) + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}) self.assertIsInstance(response, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:", str(response)) - def test_018_invite_single_occurrence(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.boxter'kolabtargetfolder') - start = datetime.datetime(2015,3,2, 18,30,0) + subject = {'summary': 'test', 'status': participant_status_label('ACCEPTED')} + subject = self.itip_reply_subject % (subject) + + start = datetime.datetime(2015, 3, 2, 18, 30, 0) uid = self.send_itip_invitation(self.boxter'mail', start, instance=start) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + start.strftime('%Y%m%dT%H%M%S'), str(accept)) self.purge_mailbox(self.john'mailbox') # send a second invitation for another instance with the same UID - nextstart = datetime.datetime(2015,3,9, 18,30,0) + nextstart = datetime.datetime(2015, 3, 9, 18, 30, 0) self.send_itip_invitation(self.boxter'mail', nextstart, uid=uid, instance=nextstart) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + nextstart.strftime('%Y%m%dT%H%M%S'), str(accept)) @@ -900,7 +887,7 @@ exstart = start + datetime.timedelta(hours=2) self.send_itip_update(self.boxter'mail', uid, exstart, instance=start) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + start.strftime('%Y%m%dT%H%M%S'), str(accept)) @@ -919,7 +906,7 @@ # send rescheduling request to the 2nd instance self.send_itip_update(self.boxter'mail', uid, nextstart + datetime.timedelta(hours=2), sequence=2, instance=nextstart) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + nextstart.strftime('%Y%m%dT%H%M%S'), str(accept)) @@ -932,15 +919,17 @@ self.assertEqual(two.get_sequence(), 2) self.assertEqual(two.get_start().hour, 20) - def test_019_cancel_single_occurrence(self): self.purge_mailbox(self.john'mailbox') + subject = {'summary': 'test', 'status': participant_status_label('ACCEPTED')} + subject = self.itip_reply_subject % (subject) + # register a recurring resource invitation - start = datetime.datetime(2015,2,12, 14,0,0) + start = datetime.datetime(2015, 2, 12, 14, 0, 0) uid = self.send_itip_invitation(self.passat'mail', start, template=itip_recurring) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) exdate = start + datetime.timedelta(days=7) @@ -958,10 +947,10 @@ self.purge_mailbox(self.john'mailbox') # store a single occurrence with recurrence-id - start = datetime.datetime(2015,3,2, 18,30,0) + start = datetime.datetime(2015, 3, 2, 18, 30, 0) uid = self.send_itip_invitation(self.passat'mail', start, instance=start) - accept = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }) + accept = self.check_message_received(subject) self.assertIsInstance(accept, email.message.Message) self.send_itip_cancel(self.passat'mail', uid, instance=start) @@ -969,12 +958,11 @@ time.sleep(5) # wait for IMAP to update self.assertEqual(self.check_resource_calendar_event(self.passat'kolabtargetfolder', uid), None) - def test_020_owner_confirmation_single_occurrence(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.jane'mailbox') - start = datetime.datetime(2015,4,18, 14,0,0) + start = datetime.datetime(2015, 4, 18, 14, 0, 0) uid = self.send_itip_invitation(self.room3'mail', start, template=itip_recurring) notify = self.check_message_received(_('Booking request for %s requires confirmation') % (self.room3'cn'), mailbox=self.jane'mailbox') @@ -1004,7 +992,7 @@ self.send_owner_response(itip_event'xml', 'DECLINED', from_addr=self.jane'mail') # requester (john) now gets the DECLINED response - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.room3'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.room3'mail') self.assertIsInstance(response, email.message.Message) self.assertIn("RECURRENCE-ID;TZID=Europe/London:" + exdate.strftime('%Y%m%dT%H%M%S'), str(response)) @@ -1014,4 +1002,3 @@ exception = event.get_instance(exdate) self.assertEqual(exception.get_attendee_by_email(self.room3'mail').get_participant_status(True), 'DECLINED') -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_006_resource_performance.py
Changed
@@ -35,7 +35,7 @@ funcs.purge_resources() self.room1 = funcs.resource_add("confroom", "Room 101") self.room2 = funcs.resource_add("confroom", "Conference Room B-222") - self.rooms = funcs.resource_add("collection", "Rooms", self.room1'dn', self.room2'dn' ) + self.rooms = funcs.resource_add("collection", "Rooms", self.room1'dn', self.room2'dn') time.sleep(1) synchronize_once() @@ -74,7 +74,6 @@ saved = module_resources.save_resource_event(dict(xml=event), resource) i += 1 - def test_001_save_resource_event(self): event = Event() event.set_summary("test") @@ -86,23 +85,22 @@ saved = module_resources.save_resource_event(dict(xml=event), self.room1) self.assertTrue(saved) - def test_002_read_resource_calendar(self): self.purge_mailbox(self.room1'kolabtargetfolder') event = Event() event.set_summary("test") - event.set_start(datetime.datetime(2014,4,1, 12,0,0, tzinfo=pytz.timezone("Europe/London"))) - event.set_end(datetime.datetime(2014,4,1, 14,0,0, tzinfo=pytz.timezone("Europe/London"))) + event.set_start(datetime.datetime(2014, 4, 1, 12, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + event.set_end(datetime.datetime(2014, 4, 1, 14, 0, 0, tzinfo=pytz.timezone("Europe/London"))) saved = module_resources.save_resource_event(dict(xml=event), self.room1) self.assertTrue(saved) uid = event.get_uid() itip = dict( - uid = str(uuid.uuid4()), - sequence = 0, - start = datetime.datetime(2014,4,1, 13,0,0, tzinfo=pytz.timezone("Europe/London")), - end = datetime.datetime(2014,4,1, 14,30,0, tzinfo=pytz.timezone("Europe/London")) + uid=str(uuid.uuid4()), + sequence=0, + start=datetime.datetime(2014, 4, 1, 13, 0, 0, tzinfo=pytz.timezone("Europe/London")), + end=datetime.datetime(2014, 4, 1, 14, 30, 0, tzinfo=pytz.timezone("Europe/London")) ) event.set_uid(itip'uid') @@ -110,12 +108,11 @@ event.set_end(itip'end') itip'xml' = event - res = module_resources.read_resource_calendar(self.room1, itip ) + res = module_resources.read_resource_calendar(self.room1, itip) self.assertEqual(res, 1) self.assertTrue(self.room1'conflict') self.assertIn(uid, self.room1'conflicting_events') - def test_003_read_time(self): self.purge_mailbox(self.room1'kolabtargetfolder') @@ -125,10 +122,10 @@ self.populate_calendar(self.room1, num, date) itip = dict( - uid = str(uuid.uuid4()), - sequence = 0, - start = date, - end = date + datetime.timedelta(minutes=90) + uid=str(uuid.uuid4()), + sequence=0, + start=date, + end=date + datetime.timedelta(minutes=90) ) event = Event() @@ -138,7 +135,7 @@ itip'xml' = event start = time.time() - res = module_resources.read_resource_calendar(self.room1, itip ) + res = module_resources.read_resource_calendar(self.room1, itip) self.assertEqual(res, num) print "\nREAD TIME:", time.time() - start
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wallace/test_007_invitationpolicy.py
Changed
@@ -221,6 +221,7 @@ --=_c8894dbdb8baeedacae836230e3436fd-- """ + class TestWallaceInvitationpolicy(unittest.TestCase): john = None @@ -236,7 +237,7 @@ @classmethod def setup_class(self, *args, **kw): # set language to default - pykolab.translate.setUserLanguage(conf.get('kolab','default_locale')) + pykolab.translate.setUserLanguage(conf.get('kolab', 'default_locale')) self.itip_reply_subject = _('"%(summary)s" has been %(status)s') @@ -251,7 +252,7 @@ 'mailbox': 'user/john.doe@example.org', 'kolabcalendarfolder': 'user/john.doe/Calendar@example.org', 'kolabtasksfolder': 'user/john.doe/Tasks@example.org', - 'kolabinvitationpolicy': 'ACT_UPDATE_AND_NOTIFY','ACT_MANUAL' + 'kolabinvitationpolicy': 'ACT_UPDATE_AND_NOTIFY', 'ACT_MANUAL' } self.jane = { @@ -263,7 +264,7 @@ 'kolabcalendarfolder': 'user/jane.manager/Calendar@example.org', 'kolabtasksfolder': 'user/jane.manager/Tasks@example.org', 'kolabconfidentialcalendar': 'user/jane.manager/Calendar/Confidential@example.org', - 'kolabinvitationpolicy': 'ACT_ACCEPT_IF_NO_CONFLICT','ACT_REJECT_IF_CONFLICT','TASK_ACCEPT','TASK_UPDATE_AND_NOTIFY','ACT_UPDATE' + 'kolabinvitationpolicy': 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_REJECT_IF_CONFLICT', 'TASK_ACCEPT', 'TASK_UPDATE_AND_NOTIFY', 'ACT_UPDATE' } self.jack = { @@ -274,7 +275,7 @@ 'mailbox': 'user/jack.tentative@example.org', 'kolabcalendarfolder': 'user/jack.tentative/Calendar@example.org', 'kolabtasksfolder': 'user/jack.tentative/Tasks@example.org', - 'kolabinvitationpolicy': 'ACT_TENTATIVE_IF_NO_CONFLICT','ALL_SAVE_TO_FOLDER','ACT_UPDATE' + 'kolabinvitationpolicy': 'ACT_TENTATIVE_IF_NO_CONFLICT', 'ALL_SAVE_TO_FOLDER', 'ACT_UPDATE' } self.mark = { @@ -285,7 +286,7 @@ 'mailbox': 'user/mark.german@example.org', 'kolabcalendarfolder': 'user/mark.german/Calendar@example.org', 'kolabtasksfolder': 'user/mark.german/Tasks@example.org', - 'kolabinvitationpolicy': 'ACT_ACCEPT','ACT_UPDATE_AND_NOTIFY' + 'kolabinvitationpolicy': 'ACT_ACCEPT', 'ACT_UPDATE_AND_NOTIFY' } self.lucy = { @@ -296,7 +297,7 @@ 'mailbox': 'user/lucy.meyer@example.org', 'kolabcalendarfolder': 'user/lucy.meyer/Calendar@example.org', 'kolabtasksfolder': 'user/lucy.meyer/Tasks@example.org', - 'kolabinvitationpolicy': 'ALL_SAVE_AND_FORWARD','ACT_CANCEL_DELETE_AND_NOTIFY','ACT_UPDATE_AND_NOTIFY' + 'kolabinvitationpolicy': 'ALL_SAVE_AND_FORWARD', 'ACT_CANCEL_DELETE_AND_NOTIFY', 'ACT_UPDATE_AND_NOTIFY' } self.bill = { @@ -307,7 +308,7 @@ 'mailbox': 'user/bill.mayor@example.org', 'kolabcalendarfolder': 'user/bill.mayor/Calendar@example.org', 'kolabtasksfolder': 'user/bill.mayor/Tasks@example.org', - 'kolabinvitationpolicy': 'ALL_SAVE_TO_FOLDER:lucy.meyer@example.org','ALL_REJECT' + 'kolabinvitationpolicy': 'ALL_SAVE_TO_FOLDER:lucy.meyer@example.org', 'ALL_REJECT' } self.external = { @@ -330,7 +331,7 @@ # create confidential calendar folder for jane imap = IMAP() - imap.connect(domain='example.org') # sets self.domain + imap.connect(domain='example.org') # sets self.domain imap.user_mailbox_create_additional_folders(self.jane'mail', { 'Calendar/Confidential': { 'annotations': { @@ -343,7 +344,6 @@ imap.set_acl(imap.folder_quote(self.jane'kolabcalendarfolder'), self.mark'mail', "lrswipkxtecda") imap.disconnect() - def send_message(self, itip_payload, to_addr, from_addr=None, method="REQUEST"): if from_addr is None: from_addr = self.john'mail' @@ -672,12 +672,11 @@ imap.imap.m.expunge() imap.disconnect() - def test_001_invite_accept_udate(self): - start = datetime.datetime(2014,8,13, 10,0,0) + start = datetime.datetime(2014, 8, 13, 10, 0, 0) uid = self.send_itip_invitation(self.jane'mail', start) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -693,38 +692,35 @@ self.assertEqual(event.get_summary(), "test updated") self.assertEqual(event.get_attendee(self.jane'mail').get_participant_status(), kolabformat.PartAccepted) - # @depends on test_001_invite_user def test_002_invite_conflict_reject(self): - uid = self.send_itip_invitation(self.jane'mail', datetime.datetime(2014,8,13, 11,0,0), summary="test2") + uid = self.send_itip_invitation(self.jane'mail', datetime.datetime(2014, 8, 13, 11, 0, 0), summary="test2") - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test2', 'status':participant_status_label('DECLINED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test2', 'status': participant_status_label('DECLINED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) self.assertEqual(event.get_summary(), "test2") - def test_003_invite_accept_tentative(self): self.purge_mailbox(self.john'mailbox') - uid = self.send_itip_invitation(self.jack'mail', datetime.datetime(2014,7,24, 8,0,0)) + uid = self.send_itip_invitation(self.jack'mail', datetime.datetime(2014, 7, 24, 8, 0, 0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.jack'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.jack'mail') self.assertIsInstance(response, email.message.Message) - def test_004_copy_to_calendar(self): self.purge_mailbox(self.john'mailbox') - self.send_itip_invitation(self.jack'mail', datetime.datetime(2014,7,29, 8,0,0)) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.jack'mail') + self.send_itip_invitation(self.jack'mail', datetime.datetime(2014, 7, 29, 8, 0, 0)) + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.jack'mail') self.assertIsInstance(response, email.message.Message) # send conflicting request to jack - uid = self.send_itip_invitation(self.jack'mail', datetime.datetime(2014,7,29, 10,0,0), summary="test2") - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test2', 'status':participant_status_label('DECLINED') }, self.jack'mail') + uid = self.send_itip_invitation(self.jack'mail', datetime.datetime(2014, 7, 29, 10, 0, 0), summary="test2") + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test2', 'status': participant_status_label('DECLINED')}, self.jack'mail') self.assertEqual(response, None, "No reply expected") event = self.check_user_calendar_event(self.jack'kolabcalendarfolder', uid) @@ -732,10 +728,9 @@ self.assertEqual(event.get_summary(), "test2") self.assertEqual(event.get_attendee(self.jack'mail').get_participant_status(), kolabformat.PartNeedsAction) - def test_004_copy_to_calendar_and_forward(self): - uid = self.send_itip_invitation(self.lucy'mail', datetime.datetime(2015,2,11, 14,0,0), summary="test forward") - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test forward', 'status':participant_status_label('ACCEPTED') }, self.lucy'mail', self.lucy'mailbox') + uid = self.send_itip_invitation(self.lucy'mail', datetime.datetime(2015, 2, 11, 14, 0, 0), summary="test forward") + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test forward', 'status': participant_status_label('ACCEPTED')}, self.lucy'mail', self.lucy'mailbox') self.assertEqual(response, None, "No reply expected") event = self.check_user_calendar_event(self.lucy'kolabcalendarfolder', uid) @@ -748,18 +743,17 @@ self.assertIsInstance(message, email.message.Message) itips = events_from_message(message) - self.assertEqual(len(itips), 1); - self.assertEqual(itips0'method', 'REQUEST'); - self.assertEqual(itips0'uid', uid); - + self.assertEqual(len(itips), 1) + self.assertEqual(itips0'method', 'REQUEST') + self.assertEqual(itips0'uid', uid) def test_005_invite_rescheduling_accept(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2014,8,14, 9,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 14, 9, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.send_itip_invitation(self.jane'mail', start) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -769,10 +763,10 @@ self.purge_mailbox(self.john'mailbox') # send update with new date and incremented sequence - new_start = pytz.timezone("Europe/Berlin").localize(datetime.datetime(2014,8,15, 15,0,0)) + new_start = pytz.timezone("Europe/Berlin").localize(datetime.datetime(2014, 8, 15, 15, 0, 0)) self.send_itip_update(self.jane'mail', uid, new_start, summary="test", sequence=1) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -780,23 +774,22 @@ self.assertEqual(event.get_start(), new_start) self.assertEqual(event.get_sequence(), 1) - def test_005_invite_rescheduling_reject(self): self.purge_mailbox(self.john'mailbox') self.purge_mailbox(self.jack'kolabcalendarfolder') - start = datetime.datetime(2014,8,9, 17,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 9, 17, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.send_itip_invitation(self.jack'mail', start) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('TENTATIVE') }, self.jack'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('TENTATIVE')}, self.jack'mail') self.assertIsInstance(response, email.message.Message) # send update with new but conflicting date and incremented sequence - self.create_calendar_event(datetime.datetime(2014,8,10, 10,30,0, tzinfo=pytz.timezone("Europe/Berlin")), user=self.jack) - new_start = pytz.timezone("Europe/Berlin").localize(datetime.datetime(2014,8,10, 9,30,0)) + self.create_calendar_event(datetime.datetime(2014, 8, 10, 10, 30, 0, tzinfo=pytz.timezone("Europe/Berlin")), user=self.jack) + new_start = pytz.timezone("Europe/Berlin").localize(datetime.datetime(2014, 8, 10, 9, 30, 0)) self.send_itip_update(self.jack'mail', uid, new_start, summary="test (updated)", sequence=1) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.jack'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.jack'mail') self.assertEqual(response, None) # verify re-scheduled copy in jack's calendar with NEEDS-ACTION @@ -809,11 +802,10 @@ self.assertTrue(attendee.get_rsvp()) self.assertEqual(attendee.get_participant_status(), kolabformat.PartNeedsAction) - def test_006_invitation_reply(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2014,8,18, 14,30,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 18, 14, 30, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.john) event = self.check_user_calendar_event(self.john'kolabcalendarfolder', uid) @@ -836,11 +828,10 @@ self.assertEqual(len(attachments), 1) self.assertEqual(event.get_attachment_data(0), 'This is a text attachment') - def test_006_invitation_reply_delegated(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2014,8,28, 14,30,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 28, 14, 30, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.john) event = self.check_user_calendar_event(self.john'kolabcalendarfolder', uid) @@ -866,13 +857,12 @@ self.assertEqual(len(delegatee.get_delegated_from()), 1) self.assertEqual(delegatee.get_delegated_from(True)0, self.jane'mail') - def test_007_invitation_cancel(self): self.purge_mailbox(self.john'mailbox') uid = self.send_itip_invitation(self.jane'mail', summary="cancelled") - response = self.check_message_received(self.itip_reply_subject % { 'summary':'cancelled', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'cancelled', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) self.send_itip_cancel(self.jane'mail', uid, summary="cancelled") @@ -884,7 +874,6 @@ self.assertEqual(event.get_status(True), 'CANCELLED') self.assertTrue(event.get_transparency()) - def test_007_invitation_cancel_and_delete(self): self.purge_mailbox(self.john'mailbox') @@ -900,11 +889,10 @@ # verify event was removed from the user's calendar self.assertEqual(self.check_user_calendar_event(self.lucy'kolabcalendarfolder', uid), None) - def test_008_inivtation_reply_notify(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2014,8,12, 16,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 12, 16, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.john, attendees=self.jane, self.mark, self.jack) # send a reply from jane to john @@ -921,7 +909,7 @@ notification = self.check_message_received(_('"%s" has been updated') % ('test'), self.john'mail') self.assertIsInstance(notification, email.message.Message) - notification_text = str(notification.get_payload()); + notification_text = str(notification.get_payload()) self.assertIn(self.jane'mail', notification_text) self.assertIn(_("PENDING"), notification_text) @@ -934,14 +922,13 @@ notification = self.check_message_received(_('"%s" has been updated') % ('test'), self.john'mail') self.assertIsInstance(notification, email.message.Message) - notification_text = str(notification.get_payload()); + notification_text = str(notification.get_payload()) self.assertNotIn(_("PENDING"), notification_text) - def test_008_notify_translated(self): self.purge_mailbox(self.mark'mailbox') - start = datetime.datetime(2014,8,12, 16,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 12, 16, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.mark, attendees=self.jane) # send a reply from jane to mark @@ -952,18 +939,17 @@ notification = self.check_message_received(_('"%s" has been updated') % ('test'), self.mark'mail', self.mark'mailbox') self.assertIsInstance(notification, email.message.Message) - notification_text = str(notification.get_payload()); + notification_text = str(notification.get_payload()) self.assertIn(self.jane'mail', notification_text) self.assertIn(participant_status_label("ACCEPTED")+":", notification_text) # reset localization - pykolab.translate.setUserLanguage(conf.get('kolab','default_locale')) - + pykolab.translate.setUserLanguage(conf.get('kolab', 'default_locale')) def test_009_outdated_reply(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2014,9,2, 11,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 9, 2, 11, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.john, sequence=2) # send a reply from jane to john @@ -976,11 +962,10 @@ self.assertEqual(event.get_sequence(), 2) self.assertEqual(event.get_attendee(self.jane'mail').get_participant_status(), kolabformat.PartNeedsAction) - def test_010_partstat_update_propagation(self): # ATTENTION: this test requires wallace.invitationpolicy_autoupdate_other_attendees_on_reply to be enabled in config - start = datetime.datetime(2014,8,21, 13,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 8, 21, 13, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.john, attendees=self.jane, self.jack, self.external) event = self.check_user_calendar_event(self.john'kolabcalendarfolder', uid) @@ -1011,7 +996,7 @@ self.assertEqual(jacks.get_attendee(self.jack'mail').get_participant_status(), kolabformat.PartTentative) # PART 2: create conflicting event in jack's calendar - new_start = datetime.datetime(2014,8,21, 6,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + new_start = datetime.datetime(2014, 8, 21, 6, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) self.create_calendar_event(new_start, user=self.jack, attendees=, summary="blocker") # re-schedule initial event to new date @@ -1033,12 +1018,11 @@ self.assertEqual(jacks.get_attendee(self.jane'mail').get_participant_status(), kolabformat.PartAccepted) self.assertEqual(jacks.get_attendee(self.jack'mail').get_participant_status(), kolabformat.PartNeedsAction) - def test_011_manual_schedule_auto_update(self): self.purge_mailbox(self.john'mailbox') # create an event in john's calendar as it was manually accepted - start = datetime.datetime(2014,9,2, 11,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2014, 9, 2, 11, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.jane, sequence=1, folder=self.john'kolabcalendarfolder') # send update with the same sequence: no re-scheduling @@ -1060,9 +1044,8 @@ notification = self.check_message_received(_('"%s" has been updated') % ('old test'), self.jane'mail', mailbox=self.john'mailbox') self.assertEqual(notification, None) - def test_012_confidential_invitation(self): - start = datetime.datetime(2014,9,21, 9,30,0) + start = datetime.datetime(2014, 9, 21, 9, 30, 0) uid = self.send_itip_invitation(self.jane'mail', start, summary='confidential', template=itip_invitation.replace("DESCRIPTION:test", "CLASS:CONFIDENTIAL")) # check event being stored in the folder annotared with event.confidential @@ -1070,10 +1053,9 @@ self.assertIsInstance(event, pykolab.xml.Event) self.assertEqual(event.get_summary(), "confidential") - def test_013_update_shared_folder(self): # create an event organized by Mark (a delegate of Jane) into Jane's calendar - start = datetime.datetime(2015,3,10, 9,30,0, tzinfo=pytz.timezone("Europe/Berlin")) + start = datetime.datetime(2015, 3, 10, 9, 30, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_calendar_event(start, user=self.mark, attendees=self.jane, self.john, folder=self.jane'kolabcalendarfolder') event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -1090,28 +1072,27 @@ def test_014_per_sender_policy(self): # send invitation from john => REJECT - start = datetime.datetime(2015,2,28, 14,0,0) + start = datetime.datetime(2015, 2, 28, 14, 0, 0) uid = self.send_itip_invitation(self.bill'mail', start) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('DECLINED') }, self.bill'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('DECLINED')}, self.bill'mail') self.assertIsInstance(response, email.message.Message) # send invitation from lucy => SAVE - start = datetime.datetime(2015,3,11, 10,0,0) + start = datetime.datetime(2015, 3, 11, 10, 0, 0) templ = itip_invitation.replace("Doe, John", self.lucy'displayname') uid = self.send_itip_invitation(self.bill'mail', start, template=templ, from_addr=self.lucy'mail') event = self.check_user_calendar_event(self.bill'kolabcalendarfolder', uid) self.assertIsInstance(event, pykolab.xml.Event) - def test_015_update_single_occurrence(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,4,2, 14,0,0) + start = datetime.datetime(2015, 4, 2, 14, 0, 0) uid = self.send_itip_invitation(self.jane'mail', start, template=itip_recurring) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -1131,14 +1112,13 @@ self.assertEqual(exception.get_summary(), "test exception") self.assertEqual(exception.get_attendee(self.jane'mail').get_participant_status(), kolabformat.PartAccepted) - def test_015_reschedule_single_occurrence(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,4,10, 9,0,0) + start = datetime.datetime(2015, 4, 10, 9, 0, 0) uid = self.send_itip_invitation(self.jane'mail', start, template=itip_recurring) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) # send update to a single instance with the same sequence: no re-scheduling @@ -1146,7 +1126,7 @@ exstart = exdate + datetime.timedelta(hours=5) self.send_itip_update(self.jane'mail', uid, exstart, summary="test resceduled", sequence=1, partstat='NEEDS-ACTION', instance=exdate) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test resceduled', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test resceduled', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) time.sleep(10) @@ -1158,7 +1138,7 @@ exstart = exdate + datetime.timedelta(hours=6) self.send_itip_update(self.jane'mail', uid, exstart, summary="test new", sequence=2, partstat='NEEDS-ACTION', instance=exdate) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'test new', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'test new', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) # check for updated excaption @@ -1171,11 +1151,10 @@ self.assertIsInstance(exception, pykolab.xml.Event) self.assertEqual(exception.get_start().strftime('%Y%m%dT%H%M%S'), exstart.strftime('%Y%m%dT%H%M%S')) - def test_016_reply_single_occurrence(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,3,7, 10,0,0, tzinfo=pytz.timezone("Europe/Zurich")) + start = datetime.datetime(2015, 3, 7, 10, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) uid = self.create_calendar_event(start, attendees=self.jane, self.mark, recurring=True) event = self.check_user_calendar_event(self.john'kolabcalendarfolder', uid) @@ -1220,10 +1199,10 @@ def test_017_cancel_single_occurrence(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,3,20, 19,0,0, tzinfo=pytz.timezone("Europe/Zurich")) + start = datetime.datetime(2015, 3, 20, 19, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) uid = self.send_itip_invitation(self.jane'mail', summary="recurring", start=start, template=itip_recurring) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'recurring', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'recurring', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) event = self.check_user_calendar_event(self.jane'kolabcalendarfolder', uid) @@ -1244,13 +1223,13 @@ # send a new invitation for the cancelled slot uid = self.send_itip_invitation(self.jane'mail', summary="new booking", start=exdate) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'new booking', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'new booking', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) def test_017_cancel_delete_single_occurrence(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,3,24, 13,0,0, tzinfo=pytz.timezone("Europe/Zurich")) + start = datetime.datetime(2015, 3, 24, 13, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) uid = self.send_itip_invitation(self.lucy'mail', summary="recurring cancel-delete", start=start, template=itip_recurring) event = self.check_user_calendar_event(self.lucy'kolabcalendarfolder', uid) @@ -1281,11 +1260,11 @@ def test_017_cancel_thisandfuture(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,5,4, 6,30,0) + start = datetime.datetime(2015, 5, 4, 6, 30, 0) uid = self.send_itip_invitation(self.mark'mail', summary="recurring", start=start, template=itip_recurring) pykolab.translate.setUserLanguage(self.mark'preferredlanguage') - response = self.check_message_received(_('"%(summary)s" has been %(status)s') % { 'summary':'recurring', 'status':participant_status_label('ACCEPTED') }, self.mark'mail') + response = self.check_message_received(_('"%(summary)s" has been %(status)s') % {'summary': 'recurring', 'status': participant_status_label('ACCEPTED')}, self.mark'mail') self.assertIsInstance(response, email.message.Message) pykolab.translate.setUserLanguage(conf.get('kolab','default_locale')) @@ -1304,14 +1283,13 @@ self.assertIsInstance(rrule'until', datetime.datetime) self.assertEqual(rrule'until'.strftime('%Y%m%d'), (exdate - datetime.timedelta(days=1)).strftime('%Y%m%d')) - def test_018_invite_individual_occurrences(self): self.purge_mailbox(self.john'mailbox') - start = datetime.datetime(2015,1,30, 17,0,0, tzinfo=pytz.timezone("Europe/Zurich")) + start = datetime.datetime(2015, 1, 30, 17, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) uid = self.send_itip_invitation(self.jane'mail', summary="single", start=start, instance=start) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'single', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'single', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) self.assertIn("RECURRENCE-ID", str(response)) @@ -1320,10 +1298,10 @@ self.assertIsInstance(event.get_recurrence_id(), datetime.datetime) # send another invitation with the same UID for different RECURRENCE-ID - newstart = datetime.datetime(2015,2,6, 17,0,0, tzinfo=pytz.timezone("Europe/Zurich")) + newstart = datetime.datetime(2015, 2, 6, 17, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) self.send_itip_invitation(self.jane'mail', summary="single #2", uid=uid, start=newstart, instance=newstart) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'single #2', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'single #2', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) self.assertIn("RECURRENCE-ID", str(response)) @@ -1350,12 +1328,11 @@ exception = event.get_instance(newstart) self.assertEqual(exception.get_summary(), "updated #2") - def test_020_task_assignment_accept(self): - start = datetime.datetime(2014,9,10, 19,0,0) + start = datetime.datetime(2014, 9, 10, 19, 0, 0) uid = self.send_itip_invitation(self.jane'mail', start, summary='work', template=itip_todo) - response = self.check_message_received(self.itip_reply_subject % { 'summary':'work', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'work', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertIsInstance(response, email.message.Message) todo = self.check_user_imap_object(self.jane'kolabtasksfolder', uid, 'task') @@ -1365,7 +1342,7 @@ # send update with the same sequence: no re-scheduling self.send_itip_update(self.jane'mail', uid, start, summary='work updated', template=itip_todo, sequence=0, partstat='ACCEPTED') - response = self.check_message_received(self.itip_reply_subject % { 'summary':'work updated', 'status':participant_status_label('ACCEPTED') }, self.jane'mail') + response = self.check_message_received(self.itip_reply_subject % {'summary': 'work updated', 'status': participant_status_label('ACCEPTED')}, self.jane'mail') self.assertEqual(response, None) time.sleep(10) @@ -1374,11 +1351,10 @@ self.assertEqual(todo.get_summary(), "work updated") self.assertEqual(todo.get_attendee(self.jane'mail').get_participant_status(), kolabformat.PartAccepted) - def test_021_task_assignment_reply(self): self.purge_mailbox(self.john'mailbox') - due = datetime.datetime(2014,9,12, 14,0,0, tzinfo=pytz.timezone("Europe/Berlin")) + due = datetime.datetime(2014, 9, 12, 14, 0, 0, tzinfo=pytz.timezone("Europe/Berlin")) uid = self.create_task_assignment(due, user=self.john) todo = self.check_user_imap_object(self.john'kolabtasksfolder', uid, 'task') @@ -1401,10 +1377,9 @@ notification = self.check_message_received(_('"%s" has been updated') % ('test'), self.john'mail') self.assertIsInstance(notification, email.message.Message) - notification_text = str(notification.get_payload()); + notification_text = str(notification.get_payload()) self.assertIn(participant_status_label(partstat), notification_text) - def test_022_task_cancellation(self): uid = self.send_itip_invitation(self.jane'mail', summary='more work', template=itip_todo) @@ -1420,4 +1395,3 @@ # this should trigger a notification message notification = self.check_message_received(_('"%s" has been cancelled') % ('more work'), self.john'mail', mailbox=self.jane'mailbox') self.assertIsInstance(notification, email.message.Message) -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/__init__.py
Changed
@@ -1,5 +1,6 @@ import pykolab + def setup_package(): conf = pykolab.getConf() conf.finalize_conf(fatal=False)
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_001_connect.py
Changed
@@ -3,6 +3,7 @@ import pykolab from pykolab import wap_client + class TestConnect(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -18,23 +19,23 @@ def test_002_response_ok(self): result = wap_client.request_raw('POST', 'domains.list') - self.assertTrue(result.has_key('status')) - self.assertTrue(result.has_key('result')) + self.assertTrue('status' in result) + self.assertTrue('result' in result) self.assertEqual(result'status', "OK") def test_003_response_fail(self): result = wap_client.request_raw('POST', 'service.method') - self.assertTrue(result.has_key('status')) - self.assertTrue(result.has_key('reason')) - self.assertTrue(result.has_key('code')) + self.assertTrue('status' in result) + self.assertTrue('reason' in result) + self.assertTrue('code' in result) self.assertEqual(result'status', "ERROR") self.assertEqual(result'reason', "Unknown service") self.assertEqual(result'code', 400) def test_004_domains_list(self): result = wap_client.domains_list() - self.assertTrue(result.has_key('count')) - self.assertTrue(result.has_key('list')) + self.assertTrue('count' in result) + self.assertTrue('list' in result) self.assertEqual(result'count', len(result'list')) def test_005_get_domain(self):
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_002_user_add.py
Changed
@@ -8,6 +8,7 @@ conf = pykolab.getConf() + class TestUserAdd(unittest.TestCase): @classmethod @@ -31,7 +32,6 @@ purge_users() def test_001_inbox_created(self): - time.sleep(2) imap = IMAP() imap.connect() @@ -67,14 +67,12 @@ metadata = imap.get_metadata(folder) folder_name = '/'.join(folder.split('/')2:).split('@')0 - if ac_folders.has_key(folder_name): - if ac_foldersfolder_name.has_key('annotations'): - for _annotation in ac_foldersfolder_name'annotations'.keys(): + if folder_name in ac_folders: + if 'annotations' in ac_foldersfolder_name: + for _annotation in ac_foldersfolder_name'annotations': if _annotation.startswith('/private/'): continue _annotation_value = ac_foldersfolder_name'annotations'_annotation - self.assertTrue(metadatametadata.keys().pop().has_key(_annotation)) + self.assertTrue(_annotation in metadatametadata.keys().pop()) self.assertEqual(_annotation_value, metadatametadata.keys().pop()_annotation) - -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_003_user_add_fr_FR.py
Changed
@@ -10,6 +10,7 @@ conf = pykolab.getConf() + class TestUserAddFrFR(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -32,7 +33,6 @@ purge_users() def test_001_inbox_created(self): - time.sleep(2) imap = IMAP() imap.connect() @@ -54,5 +54,3 @@ self.assertEqual(result'mail', 'etienne-nicolas.mehul@example.org') self.assertEqual(sorted(result'alias'), 'e.mehul@example.org', 'mehul@example.org') - -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_004_user_add_es_ES.py
Changed
@@ -10,6 +10,7 @@ conf = pykolab.getConf() + class TestUserAddEsES(unittest.TestCase): @classmethod @@ -33,7 +34,6 @@ purge_users() def test_001_inbox_created(self): - time.sleep(2) imap = IMAP() imap.connect() @@ -54,5 +54,3 @@ self.assertEqual(result'mail', 'alvaro.fuentes@example.org') self.assertEqual(sorted(result'alias'), 'a.fuentes@example.org', 'fuentes@example.org') - -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_005_user_add_de_CH.py
Changed
@@ -10,6 +10,7 @@ conf = pykolab.getConf() + class TestUserAddDeCH(unittest.TestCase): @classmethod def setup_class(self, *args, **kw): @@ -53,5 +54,3 @@ self.assertEqual(result'mail', 'thomas.bruederli@example.org') self.assertEqual(sorted(result'alias'), 'bruederli@example.org', 't.bruederli@example.org') - -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_006_form_value_select_options.py
Changed
@@ -6,6 +6,7 @@ conf = pykolab.getConf() + class TestFormValueListOptions(unittest.TestCase): def test_001_list_options_user_preferredlanguage(self): @@ -24,8 +25,7 @@ 'preferredlanguage' ) - self.assertTrue(attribute_values'preferredlanguage'.has_key('default')) - self.assertTrue(attribute_values'preferredlanguage'.has_key('list')) + self.assertTrue('default' in attribute_values'preferredlanguage') + self.assertTrue('list' in attribute_values'preferredlanguage') self.assertTrue(len(attribute_values'preferredlanguage''list') > 1) self.assertTrue(attribute_values'preferredlanguage''default' in attribute_values'preferredlanguage''list') -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_007_policy_uid.py
Changed
@@ -9,6 +9,7 @@ conf = pykolab.getConf() + class TestPolicyUid(unittest.TestCase): def remove_option(self, section, option): @@ -172,5 +173,3 @@ from tests.functional.purge_users import purge_users purge_users() - -
View file
pykolab-0.8.1.tar.gz/tests/functional/test_wap_client/test_008_resource_add.py
Changed
@@ -10,6 +10,7 @@ conf = pykolab.getConf() + class TestResourceAdd(unittest.TestCase): @classmethod @@ -29,7 +30,7 @@ self.audi = funcs.resource_add("car", "Audi A4") self.passat = funcs.resource_add("car", "VW Passat") self.boxter = funcs.resource_add("car", "Porsche Boxter S") - self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn' ) + self.cars = funcs.resource_add("collection", "Company Cars", self.audi'dn', self.passat'dn', self.boxter'dn') from tests.functional.synchronize import synchronize_once synchronize_once()
View file
pykolab-0.8.1.tar.gz/tests/functional/user_add.py
Changed
@@ -4,17 +4,12 @@ conf = pykolab.getConf() -def user_add(givenname, sn, preferredlanguage='en_US', **kw): - if givenname == None: - raise Exception - - if givenname == '': - raise Exception - if sn == None: +def user_add(givenname, sn, preferredlanguage='en_US', **kw): + if givenname is None or givenname == '': raise Exception - if sn == '': + if sn is None or sn == '': raise Exception user_details = { @@ -51,7 +46,7 @@ attr_details = user_type_info'form_fields'attribute if isinstance(attr_details, dict): - if not attr_details.has_key('optional') or attr_details'optional' == False or user_details.has_key(attribute): + if 'optional' not in attr_details or attr_details'optional' is False or attribute in user_details: paramsattribute = user_detailsattribute elif isinstance(attr_details, list): paramsattribute = user_detailsattribute @@ -59,7 +54,6 @@ fvg_params = params fvg_params'object_type' = 'user' fvg_params'type_id' = user_type_id - fvg_params'attributes' = attr for attr in user_type_info'auto_form_fields'.keys() if not attr in params.keys() + fvg_params'attributes' = attr for attr in user_type_info'auto_form_fields'.keys() if attr not in params result = wap_client.user_add(params) -
View file
pykolab-0.8.1.tar.gz/tests/unit/test-000-imports.py
Changed
@@ -1,5 +1,6 @@ import unittest + class TestImports(unittest.TestCase): def test_pykolab(self): import pykolab
View file
pykolab-0.8.1.tar.gz/tests/unit/test-001-contact_reference.py
Changed
@@ -3,6 +3,7 @@ from pykolab.xml import ContactReference + class TestEventXML(unittest.TestCase): contact_reference = ContactReference("jane@doe.org") @@ -13,7 +14,7 @@ if (type(_value)) == _type: return True else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_001_minimal(self): self.assertIsInstance(self.contact_reference.__str__(), str)
View file
pykolab-0.8.1.tar.gz/tests/unit/test-002-attendee.py
Changed
@@ -5,6 +5,7 @@ from pykolab.xml import Attendee from pykolab.xml import participant_status_label + class TestEventXML(unittest.TestCase): attendee = Attendee("jane@doe.org") @@ -15,7 +16,7 @@ if (type(_value)) == _type: return True else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_001_minimal(self): self.assertIsInstance(self.attendee.__str__(), str) @@ -46,13 +47,13 @@ def test_007_participant_status_map_reverse_lookup(self): # Reverse lookups - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 00, "NEEDS-ACTION") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 10, "ACCEPTED") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 20, "DECLINED") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 30, "TENTATIVE") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 40, "DELEGATED") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 50, "IN-PROCESS") - self.assertEqual(k for k,v in self.attendee.participant_status_map.iteritems() if v == 60, "COMPLETED") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 00, "NEEDS-ACTION") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 10, "ACCEPTED") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 20, "DECLINED") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 30, "TENTATIVE") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 40, "DELEGATED") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 50, "IN-PROCESS") + self.assertEqual(k for k, v in self.attendee.participant_status_map.iteritems() if v == 60, "COMPLETED") def test_008_default_rsvp(self): self.assertEqual(self.attendee.get_rsvp(), 0) @@ -69,12 +70,12 @@ self.assertEqual(self.attendee.rsvp_map"FALSE", 0) def test_012_rsvp_map_reverse_lookup_boolean(self): - self.assertEqual(k for k,v in self.attendee.rsvp_map.iteritems() if v == True0, "TRUE") - self.assertEqual(k for k,v in self.attendee.rsvp_map.iteritems() if v == False0, "FALSE") + self.assertEqual(k for k, v in self.attendee.rsvp_map.iteritems() if v is True0, "TRUE") + self.assertEqual(k for k, v in self.attendee.rsvp_map.iteritems() if v is False0, "FALSE") def test_013_rsvp_map_reverse_lookup_integer(self): - self.assertEqual(k for k,v in self.attendee.rsvp_map.iteritems() if v == 10, "TRUE") - self.assertEqual(k for k,v in self.attendee.rsvp_map.iteritems() if v == 00, "FALSE") + self.assertEqual(k for k, v in self.attendee.rsvp_map.iteritems() if v == 10, "TRUE") + self.assertEqual(k for k, v in self.attendee.rsvp_map.iteritems() if v == 00, "FALSE") def test_014_default_role(self): self.assertEqual(self.attendee.get_role(), 0) @@ -89,10 +90,10 @@ self.assertEqual(self.attendee.role_map"NON-PARTICIPANT", 3) def test_017_role_map_reverse_lookup(self): - self.assertEqual(k for k,v in self.attendee.role_map.iteritems() if v == 00, "REQ-PARTICIPANT") - self.assertEqual(k for k,v in self.attendee.role_map.iteritems() if v == 10, "CHAIR") - self.assertEqual(k for k,v in self.attendee.role_map.iteritems() if v == 20, "OPT-PARTICIPANT") - self.assertEqual(k for k,v in self.attendee.role_map.iteritems() if v == 30, "NON-PARTICIPANT") + self.assertEqual(k for k, v in self.attendee.role_map.iteritems() if v == 00, "REQ-PARTICIPANT") + self.assertEqual(k for k, v in self.attendee.role_map.iteritems() if v == 10, "CHAIR") + self.assertEqual(k for k, v in self.attendee.role_map.iteritems() if v == 20, "OPT-PARTICIPANT") + self.assertEqual(k for k, v in self.attendee.role_map.iteritems() if v == 30, "NON-PARTICIPANT") def test_015_cutype_map_length(self): self.assertEqual(len(self.attendee.cutype_map.keys()), 3) @@ -103,9 +104,9 @@ self.assertEqual(self.attendee.cutype_map"RESOURCE", 3) def test_017_cutype_map_reverse_lookup(self): - self.assertEqual(k for k,v in self.attendee.cutype_map.iteritems() if v == 10, "GROUP") - self.assertEqual(k for k,v in self.attendee.cutype_map.iteritems() if v == 20, "INDIVIDUAL") - self.assertEqual(k for k,v in self.attendee.cutype_map.iteritems() if v == 30, "RESOURCE") + self.assertEqual(k for k, v in self.attendee.cutype_map.iteritems() if v == 10, "GROUP") + self.assertEqual(k for k, v in self.attendee.cutype_map.iteritems() if v == 20, "INDIVIDUAL") + self.assertEqual(k for k, v in self.attendee.cutype_map.iteritems() if v == 30, "RESOURCE") def test_018_partstat_label(self): self.assertEqual(participant_status_label('NEEDS-ACTION'), "Needs Action")
View file
pykolab-0.8.1.tar.gz/tests/unit/test-003-event.py
Changed
@@ -89,6 +89,25 @@ END:VEVENT """ +ical_event_rdate = """ +BEGIN:VEVENT +UID:7a35527d-f783-4b58-b404-b1389bd2fc57 +DTSTAMP;VALUE=DATE-TIME:20140407T122311Z +CREATED;VALUE=DATE-TIME:20140407T122245Z +LAST-MODIFIED;VALUE=DATE-TIME:20140407T122311Z +DTSTART;TZID=Europe/Zurich;VALUE=DATE-TIME:20140523T110000 +DURATION:PT1H30M0S +RDATE;TZID=Europe/Zurich;VALUE=DATE-TIME:20140530T110000 +RDATE;TZID=Europe/Zurich;VALUE=DATE-TIME:20140620T110000 +SUMMARY:Summary +LOCATION:Location +DESCRIPTION:Description +SEQUENCE:2 +CLASS:PUBLIC +ORGANIZER;CN=Doe\, John:mailto:john.doe@example.org +END:VEVENT +""" + xml_event = """ <icalendar xmlns="urn:ietf:params:xml:ns:icalendar-2.0"> <vcalendar> @@ -332,6 +351,7 @@ </icalendar> """ + class TestEventXML(unittest.TestCase): event = Event() @@ -353,10 +373,10 @@ if (type(_value)) == _type: return True else: - if not _msg == None: - raise AssertionError, "%s != %s: %r" % (type(_value), _type, _msg) + if _msg is not None: + raise AssertionError("%s != %s: %r" % (type(_value), _type, _msg)) else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_000_no_start_date(self): self.assertRaises(EventIntegrityError, self.event.__str__) @@ -442,8 +462,8 @@ def test_016_start_with_timezone(self): _start = datetime.datetime(2012, 05, 23, 11, 58, 00, tzinfo=pytz.timezone("Europe/Zurich")) _start_utc = _start.astimezone(pytz.utc) - #self.assertEqual(_start.__str__(), "2012-05-23 11:58:00+01:00") - #self.assertEqual(_start_utc.__str__(), "2012-05-23 10:58:00+00:00") + # self.assertEqual(_start.__str__(), "2012-05-23 11:58:00+01:00") + # self.assertEqual(_start_utc.__str__(), "2012-05-23 10:58:00+00:00") self.event.set_start(_start) self.assertIsInstance(_start.tzinfo, datetime.tzinfo) self.assertEqual(_start.tzinfo, pytz.timezone("Europe/Zurich")) @@ -452,7 +472,7 @@ _start = datetime.date(2012, 05, 23) self.assertEqual(_start.__str__(), "2012-05-23") self.event.set_start(_start) - self.assertEqual(hasattr(_start,'tzinfo'), False) + self.assertEqual(hasattr(_start, 'tzinfo'), False) self.assertEqual(self.event.get_start().__str__(), "2012-05-23") def test_018_load_from_ical(self): @@ -501,7 +521,7 @@ self.assertEqual(message'X-Kolab-Type', 'application/x-vnd.kolab.event') parts = p for p in message.walk() - attachments = event.get_attachments(); + attachments = event.get_attachments() self.assertEqual(len(parts), 5) self.assertEqual(parts1.get_content_type(), 'text/plain') @@ -542,11 +562,11 @@ rrule = RecurrenceRule() rrule.set_frequency(kolabformat.RecurrenceRule.Weekly) - rrule.set_byday('2WE','-1SU') + rrule.set_byday('2WE', '-1SU') rrule.setBymonth(2) rrule.set_count(10) - rrule.set_until(datetime.datetime(2014,7,23, 11,0,0, tzinfo=pytz.timezone("Europe/London"))) - self.event.set_recurrence(rrule); + rrule.set_until(datetime.datetime(2014, 7, 23, 11, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + self.event.set_recurrence(rrule) ical = icalendar.Calendar.from_ical(self.event.as_string_itip()) event = ical.walk('VEVENT')0 @@ -560,12 +580,12 @@ self.assertIsInstance(event'recurrence-id'.dt, datetime.datetime) self.assertEqual(event'recurrence-id'.params.get('RANGE'), 'THISANDFUTURE') - self.assertTrue(event.has_key('rrule')) + self.assertTrue('rrule' in event) self.assertEqual(event'rrule''FREQ'0, 'WEEKLY') self.assertEqual(event'rrule''INTERVAL'0, 1) self.assertEqual(event'rrule''COUNT'0, 10) self.assertEqual(event'rrule''BYMONTH'0, 2) - self.assertEqual(event'rrule''BYDAY', '2WE','-1SU') + self.assertEqual(event'rrule''BYDAY', '2WE', '-1SU') self.assertIsInstance(event'rrule''UNTIL'0, datetime.datetime) self.assertEquals(event'rrule''UNTIL'0.tzinfo, pytz.utc) @@ -606,14 +626,13 @@ self.assertEqual(itip_event'attendee'0.params'delegated-to', 'jack@ripper.com') self.assertEqual(itip_event'attendee'1.params'delegated-from', 'jane@doe.org') - def test_020_calendaring_recurrence(self): rrule = kolabformat.RecurrenceRule() rrule.setFrequency(kolabformat.RecurrenceRule.Monthly) rrule.setCount(10) self.event = Event() - self.event.set_recurrence(rrule); + self.event.set_recurrence(rrule) _start = datetime.datetime(2014, 5, 1, 11, 30, 00, tzinfo=pytz.timezone("Europe/London")) self.event.set_start(_start) @@ -643,7 +662,7 @@ # check infinite recurrence rrule = kolabformat.RecurrenceRule() rrule.setFrequency(kolabformat.RecurrenceRule.Monthly) - self.event.set_recurrence(rrule); + self.event.set_recurrence(rrule) self.assertEqual(self.event.get_last_occurrence(), None) self.assertIsInstance(self.event.get_last_occurrence(force=True), datetime.datetime) @@ -659,7 +678,7 @@ # check get_next_occurence() with an infinitely recurring all-day event rrule = kolabformat.RecurrenceRule() rrule.setFrequency(kolabformat.RecurrenceRule.Yearly) - self.event.set_recurrence(rrule); + self.event.set_recurrence(rrule) self.event.set_start(datetime.date(2014, 5, 1)) self.event.set_end(datetime.date(2014, 5, 1)) @@ -701,13 +720,13 @@ self.event = Event() self.event.set_summary('alldays') - self.event.set_recurrence(rrule); + self.event.set_recurrence(rrule) - _start = datetime.date(2015,1,1) + _start = datetime.date(2015, 1, 1) self.event.set_start(_start) self.event.set_end(_start) - exdate = datetime.date(2015,1,5) + exdate = datetime.date(2015, 1, 5) xmlexception = Event(from_string=str(self.event)) xmlexception.set_start(exdate) xmlexception.set_end(exdate) @@ -715,8 +734,8 @@ xmlexception.set_status('CANCELLED') self.event.add_exception(xmlexception) - inst3 = self.event.get_instance(datetime.date(2015,1,3)) - self.assertEqual(inst3.get_start(), datetime.date(2015,1,3)) + inst3 = self.event.get_instance(datetime.date(2015, 1, 3)) + self.assertEqual(inst3.get_start(), datetime.date(2015, 1, 3)) inst5 = self.event.get_instance(exdate) self.assertEqual(inst5.get_status(True), 'CANCELLED') @@ -752,12 +771,12 @@ self.event = Event() self.event.set_summary('singles') - _start = datetime.datetime(2015,3,1, 14,0,0, tzinfo=pytz.timezone("Europe/London")) + _start = datetime.datetime(2015, 3, 1, 14, 0, 0, tzinfo=pytz.timezone("Europe/London")) self.event.set_start(_start) self.event.set_end(_start + datetime.timedelta(hours=1)) self.event.set_recurrence_id(_start) - _start2 = datetime.datetime(2015,3,5, 15,0,0, tzinfo=pytz.timezone("Europe/London")) + _start2 = datetime.datetime(2015, 3, 5, 15, 0, 0, tzinfo=pytz.timezone("Europe/London")) xmlexception = Event(from_string=str(self.event)) xmlexception.set_start(_start2) xmlexception.set_end(_start2 + datetime.timedelta(hours=1)) @@ -785,7 +804,6 @@ self.assertEqual(event.get_status(True), 'CANCELLED') self.assertEqual(event.get_summary(), "singles #1") - def test_022_load_from_xml(self): event = event_from_string(xml_event) self.assertEqual(event.uid, '75c740bb-b3c6-442c-8021-ecbaeb0a025e') @@ -820,10 +838,10 @@ self.assertEqual(str(occurrence.get_recurrence_id()), "2014-08-15 10:00:00+01:00") # set invalid date-only recurrence-id - exception.set_recurrence_id(datetime.date(2014,8,16)) + exception.set_recurrence_id(datetime.date(2014, 8, 16)) event.add_exception(exception) - inst = event.get_next_instance(_recurrence_id); + inst = event.get_next_instance(_recurrence_id) self.assertIsInstance(inst, Event) self.assertIsInstance(inst.get_recurrence_id(), datetime.datetime) @@ -836,7 +854,7 @@ # check attachment MIME parts are kept parts = p for p in message.walk() - attachments = event.get_attachments(); + attachments = event.get_attachments() self.assertEqual(len(parts), 5) self.assertEqual(parts3.get_content_type(), 'image/png') @@ -871,7 +889,7 @@ self.assertIsInstance(data, dict) self.assertIsInstance(data'start', datetime.datetime) - #self.assertIsInstance(data'end', datetime.datetime) + # self.assertIsInstance(data'end', datetime.datetime) self.assertIsInstance(data'created', datetime.datetime) self.assertIsInstance(data'lastmodified-date', datetime.datetime) self.assertEqual(data'uid', '75c740bb-b3c6-442c-8021-ecbaeb0a025e') @@ -929,7 +947,6 @@ self.assertEqual(pa'index', 0) self.assertEqual(pa'new', dict(partstat='DECLINED')) - def test_026_property_to_string(self): data = event_from_string(xml_event).to_dict() self.assertEqual(property_to_string('sequence', data'sequence'), "1") @@ -941,7 +958,6 @@ self.assertEqual(property_to_string('alarm', data'alarm'0), "Display message 2 hour(s) before") self.assertEqual(property_to_string('attach', data'attach'0), "noname.1395223627.5555") - def test_027_merge_attendee_data(self): event = event_from_string(xml_event) @@ -951,7 +967,7 @@ some = event.set_attendee_participant_status("somebody@else.com", 'ACCEPTED') # update jane + add jack - event.update_attendees(jane,jack) + event.update_attendees(jane, jack) self.assertEqual(len(event.get_attendees()), 3) self.assertEqual(event.get_attendee("jane@example.org").get_participant_status(), kolabformat.PartTentative) self.assertEqual(event.get_attendee("somebody@else.com").get_participant_status(), kolabformat.PartAccepted) @@ -964,6 +980,30 @@ self.assertEqual(event.get_attendee("jack@example.org").get_name(), "Jack") self.assertRaises(ValueError, exception.get_attendee, "somebody@else.com") # not addded to exception + def test_028_rdate(self): + event = event_from_ical(ical_event_rdate) + + self.assertTrue(event.is_recurring()) + self.assertEqual(len(event.get_recurrence_dates()), 2) + self.assertIsInstance(event.get_recurrence_dates()0, datetime.datetime) + + rdates = event.get_recurrence_dates() + self.assertEqual(str(rdates0), "2014-05-30 11:00:00+02:00") + self.assertEqual(str(rdates1), "2014-06-20 11:00:00+02:00") + + dt = datetime.datetime(2014, 8, 15, 10, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) + event.add_recurrence_date(dt) + rdates = event.get_recurrence_dates() + self.assertEqual(str(rdates2), "2014-08-15 10:00:00+02:00") + + itip = event.as_string_itip() + rdates = + for line in itip.split("\n"): + if re.match('^RDATE', line): + rdates.append(line.strip().split(':')1) + self.assertEqual("TZID=Europe/Zurich", line.split(':')0.split(';')1) + + self.assertEqual(rdates, "20140530T110000", "20140620T110000", "20140815T100000") def _find_prop_in_list(self, diff, name): for prop in diff:
View file
pykolab-0.8.1.tar.gz/tests/unit/test-004-icalendar.py
Changed
@@ -2,6 +2,7 @@ import icalendar import unittest + class TestICalendar(unittest.TestCase): def test_001_from_message_recurrence(self): @@ -89,16 +90,15 @@ self.assertTrue(message.is_multipart()) - itip_methods = "REQUEST" + itip_methods = "REQUEST" # Check each part for part in message.walk(): - # The iTip part MUST be Content-Type: text/calendar (RFC 6047, # section 2.4) if part.get_content_type() == "text/calendar": if not part.get_param('method') in itip_methods: - raise Exception, "method not interesting" + raise Exception("method not interesting") # Get the itip_payload itip_payload = part.get_payload(decode=True) @@ -196,16 +196,15 @@ self.assertTrue(message.is_multipart()) - itip_methods = "REQUEST" + itip_methods = "REQUEST" # Check each part for part in message.walk(): - # The iTip part MUST be Content-Type: text/calendar (RFC 6047, # section 2.4) if part.get_content_type() == "text/calendar": if not part.get_param('method') in itip_methods: - raise Exception, "method not interesting" + raise Exception("method not interesting") # Get the itip_payload itip_payload = part.get_payload(decode=True)
View file
pykolab-0.8.1.tar.gz/tests/unit/test-005-timezone.py
Changed
@@ -11,10 +11,11 @@ from pykolab.xml import InvalidEventDateError from pykolab.xml import event_from_ical + class TestTimezone(unittest.TestCase): def test_001_timezone_conflict(self): - #class datetime.timedelta(days, seconds, microseconds, milliseconds, minutes, hours, weeks) + # class datetime.timedelta(days, seconds, microseconds, milliseconds, minutes, hours, weeks) tdelta = datetime.timedelta(0, 0, 0, 0, 0, 1) event_start = datetime.datetime.now(pytz.timezone("UTC")) @@ -37,16 +38,16 @@ london_xml = london.__str__() zurich_xml = zurich.__str__() - #print london_xml - #print zurich_xml + # print london_xml + # print zurich_xml london_itip = london.as_string_itip() zurich_itip = zurich.as_string_itip() del london, zurich - #print london_itip - #print zurich_itip + # print london_itip + # print zurich_itip london_cal = icalendar.Calendar.from_ical(london_itip) london = event_from_ical(london_cal.walk('VEVENT')0.to_ical()) @@ -54,22 +55,21 @@ zurich_cal = icalendar.Calendar.from_ical(zurich_itip) zurich = event_from_ical(zurich_cal.walk('VEVENT')0.to_ical()) - #fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-london1')), 'w') - #fp.write(london_xml) - #fp.close() + # fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-london1')), 'w') + # fp.write(london_xml) + # fp.close() - #fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-london2')), 'w') - #fp.write(london.__str__()) - #fp.close() + # fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-london2')), 'w') + # fp.write(london.__str__()) + # fp.close() - #fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-zurich1')), 'w') - #fp.write(zurich_xml) - #fp.close() + # fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-zurich1')), 'w') + # fp.write(zurich_xml) + # fp.close() - #fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-zurich2')), 'w') - #fp.write(zurich.__str__()) - #fp.close() + # fp = open(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'event-zurich2')), 'w') + # fp.write(zurich.__str__()) + # fp.close() self.assertEqual(london_xml, london.__str__()) self.assertEqual(zurich_xml, zurich.__str__()) -
View file
pykolab-0.8.1.tar.gz/tests/unit/test-006-ldap_psearch.py
Changed
@@ -1,5 +1,6 @@ import unittest + class TestLDAPPsearch(unittest.TestCase): def test_001_import_psearch(self):
View file
pykolab-0.8.1.tar.gz/tests/unit/test-007-ldap_syncrepl.py
Changed
@@ -1,5 +1,6 @@ import unittest + class TestLDAPSyncrepl(unittest.TestCase): def test_001_import_syncrepl(self):
View file
pykolab-0.8.1.tar.gz/tests/unit/test-008-sievelib.py
Changed
@@ -38,6 +38,7 @@ + class TestSievelib(unittest.TestCase): def test_001_import_sievelib(self): @@ -53,4 +54,4 @@ result = sieve_parser.parse(sieve_str) if not result: print "Sieve line: %r" % (sieve_parser.lexer.text.split('\n')(sieve_parser.lexer.text:sieve_parser.lexer.pos.count('\n'))) - raise Exception, "Failed parsing Sieve script #%d: %s" % (i, sieve_parser.error) + raise Exception("Failed parsing Sieve script #%d: %s" % (i, sieve_parser.error))
View file
pykolab-0.8.1.tar.gz/tests/unit/test-009-parse_ldap_uri.py
Changed
@@ -2,6 +2,7 @@ from pykolab import utils + class TestParseLdapUri(unittest.TestCase): def test_001_ldap_uri(self): @@ -13,5 +14,3 @@ ldap_uri = "ldap://localhost:389" result = utils.parse_ldap_uri(ldap_uri) self.assertEqual(result, ("ldap", "localhost", "389", None, None, None, None)) - -
View file
pykolab-0.8.1.tar.gz/tests/unit/test-010-transliterate.py
Changed
@@ -2,6 +2,7 @@ import unittest + class TestTransliteration(unittest.TestCase): def test_001_raw_fr_FR(self): """
View file
pykolab-0.8.1.tar.gz/tests/unit/test-011-itip.py
Changed
@@ -99,7 +99,7 @@ Message-ID: <001a11c2ad84243e0604f3246bae@google.com> Date: Mon, 24 Feb 2014 10:27:28 +0000 Subject: =?ISO-8859-1?Q?Invitation=3A_iTip_from_Apple_=40_Mon_Feb_24=2C_2014_12pm_?= - =?ISO-8859-1?Q?=2D_1pm_=28Tom_=26_T=E4m=29?= + =?ISO-8859-1?Q?=2D_1pm_=28Tom_=26_T=E4m=29?= From: "john.doe" <john.doe@gmail.com> To: <john.sample@example.org> Content-Type: multipart/mixed; boundary=001a11c2ad84243df004f3246bad @@ -321,6 +321,7 @@ if not hasattr(conf, 'defaults'): conf.finalize_conf() + class TestITip(unittest.TestCase): def setUp(self): @@ -330,7 +331,7 @@ self.patch(smtplib.SMTP, "quit", self._mock_nop) self.patch(smtplib.SMTP, "sendmail", self._mock_smtp_sendmail) - self.smtplog = ; + self.smtplog = def _mock_nop(self, domain=None): pass @@ -341,7 +342,6 @@ def _mock_smtp_sendmail(self, from_addr, to_addr, message, mail_options=None, rcpt_options=None): self.smtplog.append((from_addr, to_addr, message)) - def test_001_itip_events_from_message(self): itips1 = itip.events_from_message(message_from_string(itip_multipart)) self.assertEqual(len(itips1), 1, "Multipart iTip message with text/calendar") @@ -375,42 +375,40 @@ self.assertEqual(xml.get_summary(), "Testing Ümlauts") self.assertEqual(xml.get_location(), "Rue the Genève") - def test_002_check_date_conflict(self): - astart = datetime.datetime(2014,7,13, 10,0,0) - aend = astart + datetime.timedelta(hours=2) + astart = datetime.datetime(2014, 7, 13, 10, 0, 0) + aend = astart + datetime.timedelta(hours=2) - bstart = datetime.datetime(2014,7,13, 10,0,0) - bend = astart + datetime.timedelta(hours=1) + bstart = datetime.datetime(2014, 7, 13, 10, 0, 0) + bend = astart + datetime.timedelta(hours=1) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) - bstart = datetime.datetime(2014,7,13, 11,0,0) - bend = astart + datetime.timedelta(minutes=30) + bstart = datetime.datetime(2014, 7, 13, 11, 0, 0) + bend = astart + datetime.timedelta(minutes=30) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) - bend = astart + datetime.timedelta(hours=2) + bend = astart + datetime.timedelta(hours=2) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) - bstart = datetime.datetime(2014,7,13, 12,0,0) - bend = astart + datetime.timedelta(hours=1) + bstart = datetime.datetime(2014, 7, 13, 12, 0, 0) + bend = astart + datetime.timedelta(hours=1) self.assertFalse(itip.check_date_conflict(astart, aend, bstart, bend)) self.assertFalse(itip.check_date_conflict(bstart, bend, astart, aend)) - bstart = datetime.datetime(2014,6,13, 10,0,0) - bend = datetime.datetime(2014,6,14, 12,0,0) + bstart = datetime.datetime(2014, 6, 13, 10, 0, 0) + bend = datetime.datetime(2014, 6, 14, 12, 0, 0) self.assertFalse(itip.check_date_conflict(astart, aend, bstart, bend)) - bstart = datetime.datetime(2014,7,10, 12,0,0) - bend = datetime.datetime(2014,7,14, 14,0,0) + bstart = datetime.datetime(2014, 7, 10, 12, 0, 0) + bend = datetime.datetime(2014, 7, 14, 14, 0, 0) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) - def test_002_check_event_conflict(self): itip_event = itip.events_from_message(message_from_string(itip_non_multipart))0 event = Event() - event.set_start(datetime.datetime(2012,7,13, 9,30,0, tzinfo=itip_event'start'.tzinfo)) - event.set_end(datetime.datetime(2012,7,13, 10,30,0, tzinfo=itip_event'start'.tzinfo)) + event.set_start(datetime.datetime(2012, 7, 13, 9, 30, 0, tzinfo=itip_event'start'.tzinfo)) + event.set_end(datetime.datetime(2012, 7, 13, 10, 30, 0, tzinfo=itip_event'start'.tzinfo)) self.assertTrue(itip.check_event_conflict(event, itip_event), "Conflicting dates") @@ -418,8 +416,8 @@ self.assertFalse(itip.check_event_conflict(event, itip_event), "No conflict for same UID") allday = Event() - allday.set_start(datetime.date(2012,7,13)) - allday.set_end(datetime.date(2012,7,13)) + allday.set_start(datetime.date(2012, 7, 13)) + allday.set_end(datetime.date(2012, 7, 13)) self.assertTrue(itip.check_event_conflict(allday, itip_event), "Conflicting allday event") @@ -427,8 +425,8 @@ self.assertFalse(itip.check_event_conflict(allday, itip_event), "No conflict if event is set to transparent") event2 = Event() - event2.set_start(datetime.datetime(2012,7,13, 10,0,0, tzinfo=pytz.timezone("US/Central"))) - event2.set_end(datetime.datetime(2012,7,13, 11,0,0, tzinfo=pytz.timezone("US/Central"))) + event2.set_start(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("US/Central"))) + event2.set_end(datetime.datetime(2012, 7, 13, 11, 0, 0, tzinfo=pytz.timezone("US/Central"))) self.assertFalse(itip.check_event_conflict(event, itip_event), "No conflict with timezone shift") @@ -437,9 +435,9 @@ rrule.setCount(10) event3 = Event() - event3.set_recurrence(rrule); - event3.set_start(datetime.datetime(2012,6,29, 9,30,0, tzinfo=pytz.utc)) - event3.set_end(datetime.datetime(2012,6,29, 10,30,0, tzinfo=pytz.utc)) + event3.set_recurrence(rrule) + event3.set_start(datetime.datetime(2012, 6, 29, 9, 30, 0, tzinfo=pytz.utc)) + event3.set_end(datetime.datetime(2012, 6, 29, 10, 30, 0, tzinfo=pytz.utc)) self.assertTrue(itip.check_event_conflict(event3, itip_event), "Conflict in (3rd) recurring event instance") @@ -447,9 +445,9 @@ self.assertTrue(itip.check_event_conflict(event3, itip_event), "Conflict in two recurring events") event4 = Event() - event4.set_recurrence(rrule); - event4.set_start(datetime.datetime(2012,7,1, 9,30,0, tzinfo=pytz.utc)) - event4.set_end(datetime.datetime(2012,7,1, 10,30,0, tzinfo=pytz.utc)) + event4.set_recurrence(rrule) + event4.set_start(datetime.datetime(2012, 7, 1, 9, 30, 0, tzinfo=pytz.utc)) + event4.set_end(datetime.datetime(2012, 7, 1, 10, 30, 0, tzinfo=pytz.utc)) self.assertFalse(itip.check_event_conflict(event4, itip_event), "No conflict in two recurring events") itip_event = itip.events_from_message(message_from_string(itip_non_multipart))0 @@ -458,23 +456,23 @@ rrule.setCount(10) event5 = Event() - event5.set_recurrence(rrule); - event5.set_start(datetime.datetime(2012,7,9, 10,0,0, tzinfo=pytz.timezone("Europe/London"))) - event5.set_end(datetime.datetime(2012,7,9, 11,0,0, tzinfo=pytz.timezone("Europe/London"))) + event5.set_recurrence(rrule) + event5.set_start(datetime.datetime(2012, 7, 9, 10, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + event5.set_end(datetime.datetime(2012, 7, 9, 11, 0, 0, tzinfo=pytz.timezone("Europe/London"))) event_xml = str(event5) exception = Event(from_string=event_xml) - exception.set_start(datetime.datetime(2012,7,13, 14,0,0, tzinfo=pytz.timezone("Europe/London"))) - exception.set_end(datetime.datetime(2012,7,13, 16,0,0, tzinfo=pytz.timezone("Europe/London"))) - exception.set_recurrence_id(datetime.datetime(2012,7,13, 10,0,0, tzinfo=pytz.timezone("Europe/London")), False) + exception.set_start(datetime.datetime(2012, 7, 13, 14, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + exception.set_end(datetime.datetime(2012, 7, 13, 16, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + exception.set_recurrence_id(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London")), False) event5.add_exception(exception) self.assertFalse(itip.check_event_conflict(event5, itip_event), "No conflict with exception date") exception = Event(from_string=event_xml) - exception.set_start(datetime.datetime(2012,7,13, 10,0,0, tzinfo=pytz.timezone("Europe/London"))) - exception.set_end(datetime.datetime(2012,7,13, 11,0,0, tzinfo=pytz.timezone("Europe/London"))) + exception.set_start(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London"))) + exception.set_end(datetime.datetime(2012, 7, 13, 11, 0, 0, tzinfo=pytz.timezone("Europe/London"))) exception.set_status('CANCELLED') - exception.set_recurrence_id(datetime.datetime(2012,7,13, 10,0,0, tzinfo=pytz.timezone("Europe/London")), False) + exception.set_recurrence_id(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London")), False) event5.add_exception(exception) self.assertFalse(itip.check_event_conflict(event5, itip_event), "No conflict with cancelled exception") @@ -482,11 +480,11 @@ itip_event = itip.events_from_message(message_from_string(itip_non_multipart))0 event = Event() - event.set_start(datetime.datetime(2012,7,10, 9,30,0, tzinfo=itip_event'start'.tzinfo)) - event.set_end(datetime.datetime(2012,7,10, 10,30,0, tzinfo=itip_event'start'.tzinfo)) + event.set_start(datetime.datetime(2012, 7, 10, 9, 30, 0, tzinfo=itip_event'start'.tzinfo)) + event.set_end(datetime.datetime(2012, 7, 10, 10, 30, 0, tzinfo=itip_event'start'.tzinfo)) event.set_recurrence_id(event.get_start()) - dtstart = datetime.datetime(2012,7,13, 9,30,0, tzinfo=itip_event'start'.tzinfo) + dtstart = datetime.datetime(2012, 7, 13, 9, 30, 0, tzinfo=itip_event'start'.tzinfo) second = Event(from_string=str(event)) second.set_start(dtstart) second.set_end(dtstart + datetime.timedelta(hours=1)) @@ -497,7 +495,7 @@ itip_event = itip.events_from_message(message_from_string(itip_non_multipart))0 - dtstart = datetime.datetime(2012,7,15, 10,0,0, tzinfo=itip_event'start'.tzinfo) + dtstart = datetime.datetime(2012, 7, 15, 10, 0, 0, tzinfo=itip_event'start'.tzinfo) second = Event(from_string=str(itip_event'xml')) second.set_start(dtstart + datetime.timedelta(hours=1)) second.set_end(dtstart + datetime.timedelta(hours=2)) @@ -507,18 +505,17 @@ self.assertEqual(len(itip_event'xml'.get_exceptions()), 1) event = Event() - event.set_start(datetime.datetime(2012,7,11, 9,30,0, tzinfo=itip_event'start'.tzinfo)) - event.set_end(datetime.datetime(2012,7,11, 10,30,0, tzinfo=itip_event'start'.tzinfo)) + event.set_start(datetime.datetime(2012, 7, 11, 9, 30, 0, tzinfo=itip_event'start'.tzinfo)) + event.set_end(datetime.datetime(2012, 7, 11, 10, 30, 0, tzinfo=itip_event'start'.tzinfo)) self.assertFalse(itip.check_event_conflict(event, itip_event), "Conflicting dates (no)") event = Event() - event.set_start(datetime.datetime(2012,7,15, 11,0,0, tzinfo=itip_event'start'.tzinfo)) - event.set_end(datetime.datetime(2012,7,15, 11,30,0, tzinfo=itip_event'start'.tzinfo)) + event.set_start(datetime.datetime(2012, 7, 15, 11, 0, 0, tzinfo=itip_event'start'.tzinfo)) + event.set_end(datetime.datetime(2012, 7, 15, 11, 30, 0, tzinfo=itip_event'start'.tzinfo)) self.assertFalse(itip.check_event_conflict(event, itip_event), "Conflicting dates (exception)") - def test_003_send_reply(self): itip_events = itip.events_from_message(message_from_string(itip_non_multipart)) itip.send_reply("resource-collection-car@example.org", itip_events, "SUMMARY=%(summary)s; STATUS=%(status)s; NAME=%(name)s;") @@ -529,9 +526,9 @@ _accepted = participant_status_label('ACCEPTED') message = message_from_string(self.smtplog02) - self.assertEqual(message.get('Subject'), _("Invitation for %(summary)s was %(status)s") % { 'summary':'test', 'status':_accepted }) + self.assertEqual(message.get('Subject'), _("Invitation for %(summary)s was %(status)s") % {'summary': 'test', 'status': _accepted}) - text = str(message.get_payload(0)); + text = str(message.get_payload(0)) self.assertIn('SUMMARY=3Dtest', text) self.assertIn('STATUS=3D' + _accepted, text)
View file
pykolab-0.8.1.tar.gz/tests/unit/test-011-wallace_resources.py
Changed
@@ -93,6 +93,7 @@ if not hasattr(conf, 'defaults'): conf.finalize_conf() + class TestWallaceResources(unittest.TestCase): def setUp(self): @@ -110,27 +111,27 @@ self.patch(smtplib.SMTP, "quit", self._mock_nop) self.patch(smtplib.SMTP, "sendmail", self._mock_smtp_sendmail) - self.smtplog = ; + self.smtplog = def _mock_nop(self, domain=None): pass def _mock_find_resource(self, address): - if not 'resource' in address: - return None + if 'resource' not in address: + return (prefix, domain) = address.split('@') entry_dn = "cn=" + prefix + ",ou=Resources,dc=" + ",dc=".join(domain.split('.')) - return entry_dn ; + return entry_dn def _mock_get_entry_attributes(self, domain, entry, attributes): (_, uid) = entry.split(',')0.split('=') - return { 'cn': uid, 'mail': uid + "@example.org", '_attrib': attributes } + return {'cn': uid, 'mail': uid + "@example.org", '_attrib': attributes} def _mock_search_entry_by_attribute(self, attr, value, **kw): results = if value == "cn=Room 101,ou=Resources,dc=example,dc=org": - results.append(('cn=Rooms,ou=Resources,dc=example,dc=org', { attr: value, 'owner': 'uid=doe,ou=People,dc=example,dc=org' })) + results.append(('cn=Rooms,ou=Resources,dc=example,dc=org', {attr: value, 'owner': 'uid=doe,ou=People,dc=example,dc=org'})) return results def _mock_smtp_init(self, host=None, port=None, local_hostname=None, timeout=0): @@ -159,45 +160,42 @@ return None - def test_002_resource_record_from_email_address(self): res = module_resources.resource_record_from_email_address("doe@example.org") - self.assertEqual(len(res), 0); + self.assertEqual(len(res), 0) def test_003_resource_records_from_itip_events(self): message = message_from_string(itip_multipart) itips = itip.events_from_message(message) res = module_resources.resource_records_from_itip_events(itips) - self.assertEqual(len(res), 2, "Return resources: %r" % (res)); + self.assertEqual(len(res), 2, "Return resources: %r" % (res)) res = module_resources.resource_records_from_itip_events(itips, message'To') - self.assertEqual(len(res), 1, "Return target resource: %r" % (res)); - self.assertEqual("cn=resource-collection-car,ou=Resources,dc=example,dc=org", res0); - + self.assertEqual(len(res), 1, "Return target resource: %r" % (res)) + self.assertEqual("cn=resource-collection-car,ou=Resources,dc=example,dc=org", res0) def test_004_get_resource_owner(self): - owner1 = module_resources.get_resource_owner({ 'owner': "uid=foo,ou=People,cd=example,dc=org" }) + owner1 = module_resources.get_resource_owner({'owner': "uid=foo,ou=People,cd=example,dc=org"}) self.assertIsInstance(owner1, dict) self.assertEqual("foo@example.org", owner1'mail') self.assertIn("telephoneNumber", owner1'_attrib') - owner2 = module_resources.get_resource_owner({ 'owner': "uid=john,ou=People,cd=example,dc=org", "uid=jane,ou=People,cd=example,dc=org" }) + owner2 = module_resources.get_resource_owner({'owner': "uid=john,ou=People,cd=example,dc=org", "uid=jane,ou=People,cd=example,dc=org"}) self.assertIsInstance(owner2, dict) self.assertEqual("john@example.org", owner2'mail') - owner3 = module_resources.get_resource_owner({ 'dn': "cn=cars,ou=Resources,cd=example,dc=org" }) + owner3 = module_resources.get_resource_owner({'dn': "cn=cars,ou=Resources,cd=example,dc=org"}) self.assertEqual(owner3, None) - owner4 = module_resources.get_resource_owner({ 'dn': "cn=Room 101,ou=Resources,dc=example,dc=org" }) + owner4 = module_resources.get_resource_owner({'dn': "cn=Room 101,ou=Resources,dc=example,dc=org"}) self.assertEqual("doe@example.org", owner4'mail') - def test_005_send_response_accept(self): itip_event = itip.events_from_message(message_from_string(itip_non_multipart)) module_resources.send_response("resource-collection-car@example.org", itip_event) - self.assertEqual(len(self.smtplog), 1); + self.assertEqual(len(self.smtplog), 1) self.assertEqual("resource-collection-car@example.org", self.smtplog00) self.assertEqual("doe@example.org", self.smtplog01) @@ -210,7 +208,6 @@ self.assertIsInstance(ics_part, message.Message) self.assertEqual(ics_part.get_param('method'), "REPLY") - def test_006_send_response_delegate(self): # delegate resource-collection-car@example.org => resource-car-audi-a4@example.org itip_event = itip.events_from_message(message_from_string(itip_non_multipart))0 @@ -219,7 +216,7 @@ module_resources.send_response("resource-collection-car@example.org", itip_event) - self.assertEqual(len(self.smtplog), 2); + self.assertEqual(len(self.smtplog), 2) self.assertEqual("resource-collection-car@example.org", self.smtplog00) self.assertEqual("resource-car-audi-a4@example.org", self.smtplog10) @@ -235,5 +232,3 @@ self.assertIn("ACCEPTED".lower(), response2'subject'.lower(), "Delegation message subject: %r" % (response2'subject')) self.assertEqual(ical2'attendee'.__str__(), "MAILTO:resource-car-audi-a4@example.org") self.assertEqual(ical2'attendee'.params'PARTSTAT', u"ACCEPTED") - -
View file
pykolab-0.8.1.tar.gz/tests/unit/test-012-wallace_invitationpolicy.py
Changed
@@ -69,6 +69,7 @@ if not hasattr(conf, 'defaults'): conf.finalize_conf() + class TestWallaceInvitationpolicy(unittest.TestCase): def setUp(self): @@ -86,7 +87,7 @@ self.patch(smtplib.SMTP, "quit", self._mock_nop) self.patch(smtplib.SMTP, "sendmail", self._mock_smtp_sendmail) - self.smtplog = ; + self.smtplog = def _mock_find_user_dn(self, value, kolabuser=False): (prefix, domain) = value.split('@') @@ -94,7 +95,7 @@ def _mock_get_entry_attributes(self, domain, entry, attributes): (_, uid) = entry.split(',')0.split('=') - return { 'cn': uid, 'mail': uid + "@example.org", '_attrib': attributes } + return {'cn': uid, 'mail': uid + "@example.org", '_attrib': attributes} def _mock_list_domains(self): return {'example.org': 'example.org'} @@ -121,19 +122,19 @@ self.assertEqual("uid=doe,ou=People,dc=example,dc=org", res) def test_003_get_matching_invitation_policy(self): - user = { 'kolabinvitationpolicy': + user = {'kolabinvitationpolicy': 'TASK_REJECT:*', 'EVENT_ACCEPT:example.org', 'EVENT_REJECT:gmail.com', 'ALL_UPDATE:outlook:com', 'ALL_MANUAL:*' - } + } self.assertEqual(MIP.get_matching_invitation_policies(user, 'a@fastmail.net', MIP.COND_TYPE_EVENT), MIP.ACT_MANUAL) self.assertEqual(MIP.get_matching_invitation_policies(user, 'b@example.org', MIP.COND_TYPE_EVENT), MIP.ACT_ACCEPT, MIP.ACT_MANUAL) self.assertEqual(MIP.get_matching_invitation_policies(user, 'c@gmail.com', MIP.COND_TYPE_EVENT), MIP.ACT_REJECT, MIP.ACT_MANUAL) self.assertEqual(MIP.get_matching_invitation_policies(user, 'd@somedomain.net', MIP.COND_TYPE_TASK), MIP.ACT_REJECT, MIP.ACT_MANUAL) - user = { 'kolabinvitationpolicy': 'ALL_SAVE_TO_FOLDER:maya.foo@example.org', 'ACT_REJECT:others' } + user = {'kolabinvitationpolicy': 'ALL_SAVE_TO_FOLDER:maya.foo@example.org', 'ACT_REJECT:others'} self.assertEqual(MIP.get_matching_invitation_policies(user, 'maya.foo@example.org', MIP.COND_TYPE_ALL), MIP.ACT_SAVE_TO_FOLDER) self.assertEqual(MIP.get_matching_invitation_policies(user, 'd@somedomain.net', MIP.COND_TYPE_ALL), MIP.ACT_MANUAL) @@ -152,27 +153,26 @@ # self.assertFalse(os.path.isfile(lock_file)) def test_005_is_auto_reply(self): - all_manual = 'ACT_MANUAL' - accept_none = 'ACT_REJECT' - accept_all = 'ACT_ACCEPT', 'ACT_UPDATE' - accept_cond = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_REJECT_IF_CONFLICT' - accept_some = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_SAVE_TO_CALENDAR:example.org', 'ACT_REJECT_IF_CONFLICT' - accept_avail = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_REJECT_IF_CONFLICT:example.org' - - self.assertFalse( MIP.is_auto_reply({ 'kolabinvitationpolicy':all_manual }, 'user@domain.org', 'event')) - self.assertTrue( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_none }, 'user@domain.org', 'event')) - self.assertTrue( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_all }, 'user@domain.com', 'event')) - self.assertTrue( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_cond }, 'user@domain.com', 'event')) - self.assertTrue( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_some }, 'user@domain.com', 'event')) - self.assertFalse( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_some }, 'sam@example.org', 'event')) - self.assertFalse( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_avail }, 'user@domain.com', 'event')) - self.assertTrue( MIP.is_auto_reply({ 'kolabinvitationpolicy':accept_avail }, 'john@example.org', 'event')) + all_manual = 'ACT_MANUAL' + accept_none = 'ACT_REJECT' + accept_all = 'ACT_ACCEPT', 'ACT_UPDATE' + accept_cond = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_REJECT_IF_CONFLICT' + accept_some = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_SAVE_TO_CALENDAR:example.org', 'ACT_REJECT_IF_CONFLICT' + accept_avail = 'ACT_ACCEPT_IF_NO_CONFLICT', 'ACT_REJECT_IF_CONFLICT:example.org' + + self.assertFalse( MIP.is_auto_reply({'kolabinvitationpolicy': all_manual}, 'user@domain.org', 'event')) + self.assertTrue( MIP.is_auto_reply({'kolabinvitationpolicy': accept_none}, 'user@domain.org', 'event')) + self.assertTrue( MIP.is_auto_reply({'kolabinvitationpolicy': accept_all}, 'user@domain.com', 'event')) + self.assertTrue( MIP.is_auto_reply({'kolabinvitationpolicy': accept_cond}, 'user@domain.com', 'event')) + self.assertTrue( MIP.is_auto_reply({'kolabinvitationpolicy': accept_some}, 'user@domain.com', 'event')) + self.assertFalse( MIP.is_auto_reply({'kolabinvitationpolicy': accept_some}, 'sam@example.org', 'event')) + self.assertFalse( MIP.is_auto_reply({'kolabinvitationpolicy': accept_avail}, 'user@domain.com', 'event')) + self.assertTrue( MIP.is_auto_reply({'kolabinvitationpolicy': accept_avail}, 'john@example.org', 'event')) def test_006_send_update_notification(self): itips = pykolab.itip.events_from_message(message_from_string(itip_multipart.replace('SUMMARY:test', 'SUMMARY:with äöü'))) - MIP.send_update_notification(itips0'xml', { 'mail': 'sender@example.org' }, old=None, reply=True) + MIP.send_update_notification(itips0'xml', {'mail': 'sender@example.org'}, old=None, reply=True) self.assertEqual(len(self.smtplog), 1) self.assertIn("Subject: =?utf-8?", self.smtplog02) self.assertIn("The event 'with =C3=A4=C3=B6=C3=BC' at", self.smtplog02) -
View file
pykolab-0.8.1.tar.gz/tests/unit/test-014-conf-and-raw.py
Changed
@@ -8,6 +8,7 @@ conf = pykolab.getConf() conf.finalize_conf(fatal=False) + class TestConfRaw(unittest.TestCase): config_file = None @@ -25,7 +26,7 @@ def test_001_set(self): password = '$%something' conf.command_set('kolab', 'test_password', password) - + def test_002_get(self): password = conf.get('kolab', 'test_password') self.assertEqual('$%something', password)
View file
pykolab-0.8.1.tar.gz/tests/unit/test-015-translate.py
Changed
@@ -4,6 +4,7 @@ import gettext from pykolab import translate + class TestTranslate(unittest.TestCase): def test_001_default_langs(self):
View file
pykolab-0.8.1.tar.gz/tests/unit/test-016-todo.py
Changed
@@ -188,6 +188,7 @@ </icalendar> """ + class TestTodoXML(unittest.TestCase): todo = Todo() @@ -198,7 +199,7 @@ if (type(_value)) == _type: return True else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_001_minimal(self): self.todo.set_summary("test") @@ -227,7 +228,6 @@ self.assertEqual(todo.get_status(True), "IN-PROCESS") self.assertEqual(todo.get_related_to(), "9F3E68BED4A94DA2A51EE589F7FDC6C8-A4BF5BBB9FEAA271") - def test_020_load_from_ical(self): ical_str = """BEGIN:VCALENDAR VERSION:2.0 @@ -239,12 +239,12 @@ ical = icalendar.Calendar.from_ical(ical_str) vtodo = ical.walk('VTODO')0 - #print vtodo + # print vtodo todo = todo_from_ical(ical.walk('VTODO')0.to_ical()) self.assertEqual(todo.get_summary(), "Sample Task assignment") self.assertIsInstance(todo.get_start(), datetime.datetime) self.assertEqual(todo.get_percentcomplete(), 20) - #print str(todo) + # print str(todo) data = todo.to_dict() self.assertIsInstance(data'rrule', dict) @@ -271,7 +271,6 @@ self.assertIsInstance(vtodo'due'.dt, datetime.datetime) self.assertIsInstance(vtodo'dtstamp'.dt, datetime.datetime) - def test_022_ical_with_attachment(self): todo = todo_from_ical(ical_todo_attachment) @@ -299,6 +298,5 @@ self.assertEqual(len(data'alarm'), 1) self.assertEqual(data'alarm'0'action', 'DISPLAY') - if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main()
View file
pykolab-0.8.1.tar.gz/tests/unit/test-017-diff.py
Changed
@@ -140,8 +140,9 @@ </icalendar> """ + class TestComputeDiff(unittest.TestCase): - + def test_000_order_proplists(self): one = { "uri": "cid:one", @@ -202,6 +203,5 @@ self.assertEqual(diff4'property', 'lastmodified-date') - if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main()
View file
pykolab-0.8.1.tar.gz/tests/unit/test-018-note.py
Changed
@@ -25,7 +25,9 @@ </note> """ + class TestNoteXML(unittest.TestCase): + def assertIsInstance(self, _value, _type): if hasattr(unittest.TestCase, 'assertIsInstance'): return unittest.TestCase.assertIsInstance(self, _value, _type) @@ -33,7 +35,7 @@ if (type(_value)) == _type: return True else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_001_minimal(self): note = Note() @@ -82,7 +84,7 @@ data = note_from_string(xml_note).to_dict() self.assertIsInstance(data, dict) - self.assertTrue(data.has_key('uid')) + self.assertTrue('uid' in data) self.assertIsInstance(data.get('created', None), datetime.datetime) self.assertIsInstance(data.get('lastmodified-date', None), datetime.datetime) self.assertEqual(data.get('summary', None), "Kolab Note") @@ -91,6 +93,5 @@ self.assertEqual(len(data.get('categories', None)), 2) self.assertTrue('<p>This is a HTML note</p>' in data.get('description', None)) - if __name__ == '__main__': unittest.main()
View file
pykolab-0.8.1.tar.gz/tests/unit/test-019-contact.py
Changed
@@ -280,6 +280,7 @@ --=_4ff5155d75dc1328b7f5fe10ddce8d24-- """ + class TestContactXML(unittest.TestCase): contact = Contact() @@ -290,7 +291,7 @@ if (type(_value)) == _type: return True else: - raise AssertionError, "%s != %s" % (type(_value), _type) + raise AssertionError("%s != %s" % (type(_value), _type)) def test_001_minimal(self): self.contact.set_name("test") @@ -346,6 +347,5 @@ self.assertIsInstance(data'photo', dict) self.assertEqual(data'photo''mimetype', 'image/gif') - if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main()
View file
pykolab-0.8.1.tar.gz/tests/unit/test-020-auth_cache.py
Changed
@@ -37,12 +37,9 @@ metadata = MetaData() -## -## Classes -## - DeclarativeBase = declarative_base() + class Entry(DeclarativeBase): __tablename__ = 'entries' @@ -67,6 +64,7 @@ auth_cache.db = db + class TestAuthCache(unittest.TestCase): def test_001_plain_insert(self): auth_cache.set_entry( @@ -113,3 +111,49 @@ result = auth_cache.get_entry('v' + 'e'*512 + 'rylongkey') self.assertEqual(result, 'v' + 'e'*512 + 'rylongvalue') + + def test_006_plain_update(self): + auth_cache.set_entry( + 'somekey', + 'ou=People,dc=example,dc=org2' + ) + + result = auth_cache.get_entry('somekey') + self.assertEqual(result, 'ou=People,dc=example,dc=org2') + + def test_007_plain_encoding_update(self): + auth_cache.set_entry( + 'somekey2', + 'ou=Geschäftsbereich,ou=People,dc=example,dc=org2' + ) + + result = auth_cache.get_entry('somekey2') + self.assertEqual(result, 'ou=Gesch\xc3\xa4ftsbereich,ou=People,dc=example,dc=org2') + + def test_008_unicode_update(self): + auth_cache.set_entry( + 'somekey3', + u'ou=Geschäftsbereich,ou=People,dc=example,dc=org2' + ) + + result = auth_cache.get_entry('somekey3') + self.assertEqual(result, 'ou=Gesch\xc3\xa4ftsbereich,ou=People,dc=example,dc=org2') + + @unittest.skip("Double encoding or decoding") + def test_009_unicode_escape_update(self): + auth_cache.set_entry( + 'somekey4', + u'ou=Gesch\xc3\xa4ftsbereich,ou=People,dc=example,dc=org2' + ) + + result = auth_cache.get_entry('somekey4') + self.assertEqual(result, u'ou=Gesch\xc3\xa4ftsbereich,ou=People,dc=example,dc=org2') + + def test_010_longkey_update(self): + auth_cache.set_entry( + 'v' + 'e'*512 + 'rylongkey', + 'v' + 'e'*512 + 'rylongvalue2' + ) + + result = auth_cache.get_entry('v' + 'e'*512 + 'rylongkey') + self.assertEqual(result, 'v' + 'e'*512 + 'rylongvalue2')
View file
pykolab-0.8.1.tar.gz/tests/unit/test-022-utils.py
Added
@@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +import unittest + +from pykolab import utils + + +class TestTranslate(unittest.TestCase): + + def test_001_normalize(self): + attr = {"test1": " trim ", "test2": " trim1 ", " trim2 "} + result = utils.normalize(attr) + + self.assertEqual(result'test1', "trim") + self.assertEqual(result'test2'0, "trim1") + self.assertEqual(result'test2'1, "trim2") + + +if __name__ == '__main__': + unittest.main()
View file
pykolab-0.8.1.tar.gz/tests/unit/test-030-recipientpolicy.py
Added
@@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +import unittest + +from pykolab.plugins.recipientpolicy import KolabRecipientpolicy + +policy = KolabRecipientpolicy() + + +class TestRecipientPolicy(unittest.TestCase): + def test_001_primary_mail(self): + """ + The spaces in attributes used for mail generation. + """ + + entry = { + 'surname': ' sn ', + 'givenname': ' gn ', + } + + mail = policy.set_primary_mail( + primary_mail='%(givenname)s.%(surname)s@%(domain)s', + primary_domain='example.org', + entry=entry + ) + + self.assertEqual('gn.sn@example.org', mail) + +if __name__ == '__main__': + unittest.main()
View file
pykolab-0.8.1.tar.gz/ucs/kolab_sieve.py
Changed
@@ -36,7 +36,7 @@ ), '..' ) - ) + sys.path + ) + sys.path sys.stderr = open('/dev/null', 'a') @@ -63,6 +63,7 @@ from pykolab.auth import Auth + def handler(*args, **kw): auth = Auth() auth.connect() @@ -84,7 +85,7 @@ result_attr = conf.get('cyrus-sasl', 'result_attribute') - if not new.has_key(result_attr): + if result_attr not in new: log.error( "Entry %r does not have attribute %r" % ( dn, @@ -97,13 +98,13 @@ # See if the mailserver_attribute exists mailserver_attribute = conf.get('ldap', 'mailserver_attribute').lower() - if mailserver_attribute == None: + if mailserver_attribute is None: log.error("Mail server attribute is not set") # TODO: Perhaps, query for IMAP servers. If there is only one, # we know what to do. return - if new.has_key(mailserver_attribute): + if mailserver_attribute in new: if not newmailserver_attribute == constants.fqdn: log.info( "The mail server for user %r is set, and it is not me (%r)" % ( @@ -119,9 +120,7 @@ conf.plugins.exec_hook( 'sieve_mgmt_refresh', - kw = { - 'user': newresult_attr - } + kw={'user': newresult_attr} ) else: @@ -136,24 +135,21 @@ mailserver_attribute = conf.get('ldap', 'mailserver_attribute').lower() result_attr = conf.get('cyrus-sasl', 'result_attribute').lower() - if mailserver_attribute == None: + if mailserver_attribute is None: log.error("Mail server attribute is not set") # TODO: Perhaps, query for IMAP servers. If there is only one, # we know what to do. return - if new.has_key(mailserver_attribute): + if mailserver_attribute in new: if not newmailserver_attribute == constants.fqdn: log.info("The mail server for user %r is set, and it is not me (%r)" % (dn, newmailserver_attribute)) return conf.plugins.exec_hook( 'sieve_mgmt_refresh', - kw = { - 'user': newresult_attr - } + kw={'user': newresult_attr} ) else: log.info("entry %r changed, but no new or old attributes" % (dn)) -
View file
pykolab-0.8.1.tar.gz/ucs/listener.py
Changed
@@ -36,7 +36,7 @@ ), '..' ) - ) + sys.path + ) + sys.path #sys.stderr = open('/dev/null', 'a') @@ -46,14 +46,14 @@ # The filter has to be composed to make sure only Kolab Groupware # related objects are passed along to this listener module. filter = '(|(objectClass=kolabInetOrgPerson)(objectClass=univentionMailSharedFolder))' -#attributes = '*' +# attributes = '*' import pykolab from pykolab import constants from pykolab import utils log = pykolab.getLogger('pykolab.listener') -#log.remove_stdout_handler() +# log.remove_stdout_handler() log.setLevel(logging.DEBUG) log.debuglevel = 9 @@ -63,6 +63,7 @@ from pykolab.auth import Auth + def handler(*args, **kw): log.info("kolab.handler(args(%d): %r, kw: %r)" % (len(args), args, kw)) @@ -93,11 +94,11 @@ mailserver_attribute = conf.get('ldap', 'mailserver_attribute').lower() - if mailserver_attribute == None: + if mailserver_attribute is None: log.error("Mail server attribute is not set") return - if old.has_key(mailserver_attribute): + if mailserver_attribute in old: log.info("Modified entry %r has mail server attribute %s: %r" % (dn, mailserver_attribute, newmailserver_attribute)) if not oldmailserver_attribute == constants.fqdn: @@ -109,7 +110,7 @@ else: # If old has no mailserver attribute, but new does, we need to create # the user locally. - if new.has_key(mailserver_attribute): + if mailserver_attribute in new: if not newmailserver_attribute == constants.fqdn: log.info("The mail server for user %r is set (in new, not old), but it is not me (%r)" % (dn, newmailserver_attribute)) return @@ -118,11 +119,11 @@ return auth._auth._synchronize_callback( - change_type = 'modify', - previous_dn = None, - change_number = None, - dn = dn, - entry = new + change_type='modify', + previous_dn=None, + change_number=None, + dn=dn, + entry=new ) else: @@ -131,13 +132,13 @@ # See if the mailserver_attribute exists mailserver_attribute = conf.get('ldap', 'mailserver_attribute').lower() - if mailserver_attribute == None: + if mailserver_attribute is None: log.error("Mail server attribute is not set") # TODO: Perhaps, query for IMAP servers. If there is only one, # we know what to do. return - if old.has_key(mailserver_attribute): + if mailserver_attribute in old: log.info("Deleted entry %r has mail server attribute %s: %r" % (dn, mailserver_attribute, oldmailserver_attribute)) if not oldmailserver_attribute == constants.fqdn: @@ -152,11 +153,11 @@ if cfg.is_true('mail/cyrus/mailbox/delete', True): auth._auth._synchronize_callback( - change_type = 'delete', - previous_dn = None, - change_number = None, - dn = dn, - entry = old + change_type='delete', + previous_dn=None, + change_number=None, + dn=dn, + entry=old ) elif isinstance(new, dict) and len(new.keys()) > 0: @@ -166,13 +167,13 @@ # See if the mailserver_attribute exists mailserver_attribute = conf.get('ldap', 'mailserver_attribute').lower() - if mailserver_attribute == None: + if mailserver_attribute is None: log.error("Mail server attribute is not set") # TODO: Perhaps, query for IMAP servers. If there is only one, # we know what to do. return - if new.has_key(mailserver_attribute): + if mailserver_attribute in new: log.info("Added entry %r has mail server attribute %s: %r" % (dn, mailserver_attribute, newmailserver_attribute)) if not newmailserver_attribute == constants.fqdn: @@ -184,15 +185,16 @@ return auth._auth._synchronize_callback( - change_type = 'add', - previous_dn = None, - change_number = None, - dn = dn, - entry = new + change_type='add', + previous_dn=None, + change_number=None, + dn=dn, + entry=new ) else: log.info("entry %r changed, but no new or old attributes" % (dn)) + def initialize(): log.info("kolab.initialize()")
View file
pykolab-0.8.1.tar.gz/wallace.py
Changed
@@ -37,4 +37,3 @@ if __name__ == "__main__": wallace = wallace.WallaceDaemon() wallace.run() -
View file
pykolab-0.8.1.tar.gz/wallace/__init__.py
Changed
@@ -103,6 +103,7 @@ def __init__(self): self.current_connections = 0 self.max_connections = 24 + self.pool = None daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) @@ -136,7 +137,7 @@ "--threads", dest = "max_threads", action = "store", - default = 24, + default = 4, type = int, help = _("Number of threads to use.") ) @@ -335,8 +336,9 @@ if os.access(conf.pidfile, os.R_OK): os.remove(conf.pidfile) - self.pool.close() - self.pool.join() + if self.pool is not None: + self.pool.close() + self.pool.join() raise SystemExit
View file
pykolab-0.8.1.tar.gz/wallace/module_footer.py
Changed
@@ -22,9 +22,9 @@ import time from email import message_from_file +from email.encoders import encode_quopri import modules - import pykolab from pykolab.translate import _ @@ -40,18 +40,26 @@ def description(): return """Append a footer to messages.""" +def set_part_content(part, content): + # Reset old encoding and use quoted-printable (#5414) + del part'Content-Transfer-Encoding' + part.set_payload(content) + encode_quopri(part) + + return True + def execute(*args, **kw): if not os.path.isdir(mybasepath): os.makedirs(mybasepath) - for stage in 'incoming', 'ACCEPT' : + for stage in 'incoming', 'ACCEPT': if not os.path.isdir(os.path.join(mybasepath, stage)): os.makedirs(os.path.join(mybasepath, stage)) # TODO: Test for correct call. filepath = args0 - if kw.has_key('stage'): + if 'stage' in kw: log.debug(_("Issuing callback after processing to stage %s") % (kw'stage'), level=8) log.debug(_("Testing cb_action_%s()") % (kw'stage'), level=8) if hasattr(modules, 'cb_action_%s' % (kw'stage')): @@ -80,7 +88,7 @@ log.warning(_("No contents configured for footer module")) exec('modules.cb_action_%s(%r, %r)' % ('ACCEPT','footer', filepath)) return - + if os.path.isfile(footer_text_file): footer'plain' = open(footer_text_file, 'r').read() @@ -94,7 +102,7 @@ if footer'plain' == "" and footer'html' == "<p></p>": exec('modules.cb_action_%s(%r, %r)' % ('ACCEPT','footer', filepath)) return - + footer_added = False try: @@ -121,26 +129,22 @@ log.debug("Walking message part: %s; disposition = %r" % (content_type, disposition), level=8) - if not disposition == None: + if disposition is not None: continue if content_type == "text/plain": - content = part.get_payload() + content = part.get_payload(decode=True) content += "\n\n-- \n%s" % (footer'plain') - part.set_payload(content) - footer_added = True - log.debug("Text footer attached.", level=6) + footer_added = set_part_content(part, content) elif content_type == "text/html": - content = part.get_payload() + content = part.get_payload(decode=True) append = "\n<!-- footer appended by Wallace -->\n" + footer'html' if "</body>" in content: - part.set_payload(content.replace("</body>", append + "</body>")) + content = content.replace("</body>", append + "</body>") else: - part.set_payload("<html><body>" + content + append + "</body></html>") - - footer_added = True - log.debug("HTML footer attached.", level=6) + content = "<html><body>" + content + append + "</body></html>" + footer_added = set_part_content(part, content) if footer_added: log.debug("Footer attached.")
View file
pykolab-0.8.1.tar.gz/wallace/module_invitationpolicy.py
Changed
@@ -461,7 +461,10 @@ itip_event'xml'.set_percentcomplete(existing.get_percentcomplete()) if policy & COND_NOTIFY: - send_update_notification(itip_event'xml', receiving_user, existing, False) + sender = itip_event'xml'.get_organizer() + comment = itip_event'xml'.get_comment() + send_update_notification(itip_event'xml', receiving_user, existing, False, + sender, comment) # if RSVP, send an iTip REPLY if rsvp or scheduling_required: @@ -584,7 +587,8 @@ # update the organizer's copy of the object if update_object(existing, receiving_user, master): if policy & COND_NOTIFY: - send_update_notification(existing, receiving_user, existing, True) + send_update_notification(existing, receiving_user, existing, True, + sender_attendee, itip_event'xml'.get_comment()) # update all other attendee's copies if conf.get('wallace','invitationpolicy_autoupdate_other_attendees_on_reply'): @@ -648,7 +652,9 @@ if success: # send cancellation notification if policy & COND_NOTIFY: - send_cancel_notification(existing, receiving_user, remove_object) + sender = itip_event'xml'.get_organizer() + comment = itip_event'xml'.get_comment() + send_cancel_notification(existing, receiving_user, remove_object, sender, comment) return MESSAGE_PROCESSED @@ -675,7 +681,7 @@ local_domains = auth.list_domains() - if not local_domains == None: + if local_domains is not None: local_domains = list(set(local_domains.keys())) if not email_address.split('@')1 in local_domains: @@ -739,7 +745,7 @@ global imap mail_attribute = conf.get('cyrus-sasl', 'result_attribute') - if mail_attribute == None: + if mail_attribute is None: mail_attribute = 'mail' mail_attribute = mail_attribute.lower() @@ -772,7 +778,7 @@ # return cached list if user_rec.has_key('_imap_folders'): - return user_rec'_imap_folders'; + return user_rec'_imap_folders' result = @@ -786,13 +792,13 @@ for folder in folders: # exclude shared and other user's namespace - if not ns_other is None and folder.startswith(ns_other) and user_rec.has_key('_delegated_mailboxes'): + if ns_other is not None and folder.startswith(ns_other) and '_delegated_mailboxes' in user_rec: # allow shared folders from delegators if len(_mailbox for _mailbox in user_rec'_delegated_mailboxes' if folder.startswith(ns_other + _mailbox + '/')) == 0: - continue; + continue # TODO: list shared folders the user has write privileges ? - if not ns_shared is None and len(_ns for _ns in ns_shared if folder.startswith(_ns)) > 0: - continue; + if ns_shared is not None and len(_ns for _ns in ns_shared if folder.startswith(_ns)) > 0: + continue metadata = imap.get_metadata(folder) log.debug(_("IMAP metadata for %r: %r") % (folder, metadata), level=9) @@ -1012,25 +1018,25 @@ """ Append the given object to the user's default calendar/tasklist """ - - # find default calendar folder to save object to if no target folder - # has already been specified. + + # find calendar folder to save object to if not specified if targetfolder is None: targetfolders = list_user_folders(user_rec, object.type) + oc = object.get_classification() - if not targetfolders == None and len(targetfolders) > 0: - targetfolder = targetfolders0 - - if targetfolder is None: - if user_rec.has_key('_default_folder'): - targetfolder = user_rec'_default_folder' - # use *.confidential folder for invitations classified as confidential - if object.get_classification() == kolabformat.ClassConfidential and user_rec.has_key('_confidential_folder'): + # use *.confidential/private folder for confidential/private invitations + if oc == kolabformat.ClassConfidential and user_rec.has_key('_confidential_folder'): targetfolder = user_rec'_confidential_folder' - elif object.get_classification() == kolabformat.ClassPrivate and user_rec.has_key('_private_folder'): + elif oc == kolabformat.ClassPrivate and user_rec.has_key('_private_folder'): targetfolder = user_rec'_private_folder' + # use *.default folder if exists + elif user_rec.has_key('_default_folder'): + targetfolder = user_rec'_default_folder' + # fallback to any existing folder of specified type + elif targetfolders is not None and len(targetfolders) > 0: + targetfolder = targetfolders0 - if targetfolder == None: + if targetfolder is None: log.error(_("Failed to save %s: no target folder found for user %r") % (object.type, user_rec'mail')) return False @@ -1100,7 +1106,7 @@ return False -def send_update_notification(object, receiving_user, old=None, reply=True): +def send_update_notification(object, receiving_user, old=None, reply=True, sender=None, comment=None): """ Send a (consolidated) notification about the current participant status to organizer """ @@ -1109,15 +1115,20 @@ import smtplib from email.MIMEText import MIMEText from email.Utils import formatdate + from email.header import Header + from email import charset # encode unicode strings with quoted-printable - from email import charset charset.add_charset('utf-8', charset.SHORTEST, charset.QP) organizer = object.get_organizer() orgemail = organizer.email() orgname = organizer.name() + itip_comment = None + if sender is not None and not comment == '': + itip_comment = _("%s commented: %s") % (_attendee_name(sender), comment) + if reply: log.debug(_("Compose participation status summary for %s %r to user %r") % ( object.type, object.uid, receiving_user'mail' @@ -1125,7 +1136,9 @@ auto_replies_expected = 0 auto_replies_received = 0 - partstats = { 'ACCEPTED':, 'TENTATIVE':, 'DECLINED':, 'DELEGATED':, 'IN-PROCESS':, 'COMPLETED':, 'PENDING': } + is_manual_reply = True + partstats = {'ACCEPTED': , 'TENTATIVE': , 'DECLINED': , 'DELEGATED': , 'IN-PROCESS': , 'COMPLETED': , 'PENDING': } + for attendee in object.get_attendees(): parstat = attendee.get_participant_status(True) if partstats.has_key(parstat): @@ -1150,19 +1163,33 @@ if not parstat == 'NEEDS-ACTION': auto_replies_received += 1 + if sender is not None and sender.get_email() == attendee.get_email(): + is_manual_reply = False + # skip notification until we got replies from all automatically responding attendees - if auto_replies_received < auto_replies_expected: + if not is_manual_reply and auto_replies_received < auto_replies_expected: log.debug(_("Waiting for more automated replies (got %d of %d); skipping notification") % ( auto_replies_received, auto_replies_expected ), level=8) return + # build notification message body roundup = '' + + if itip_comment is not None: + roundup += "\n" + itip_comment + for status,attendees in partstats.iteritems(): if len(attendees) > 0: - roundup += "\n" + participant_status_label(status) + ":\n" + "\n".join(attendees) + "\n" + roundup += "\n" + participant_status_label(status) + ":\n\t" + "\n\t".join(attendees) + "\n" else: - roundup = "\n" + _("Changes submitted by %s have been automatically applied.") % (orgname if orgname else orgemail) + # build notification message body + roundup = '' + + if itip_comment is not None: + roundup += "\n" + itip_comment + + roundup += "\n" + _("Changes submitted by %s have been automatically applied.") % (orgname if orgname else orgemail) # list properties changed from previous version if old: @@ -1205,7 +1232,8 @@ msg'To' = receiving_user'mail' msg'Date' = formatdate(localtime=True) msg'Subject' = utils.str2unicode(_('"%s" has been updated') % (object.get_summary())) - msg'From' = utils.str2unicode('"%s" <%s>' % (orgname, orgemail) if orgname else orgemail) + msg'From' = Header(utils.str2unicode('%s' % orgname) if orgname else '') + msg'From'.append("<%s>" % orgemail) smtp = smtplib.SMTP("localhost", 10027) @@ -1230,16 +1258,17 @@ return success -def send_cancel_notification(object, receiving_user, deleted=False): +def send_cancel_notification(object, receiving_user, deleted=False, sender=None, comment=None): """ Send a notification about event/task cancellation """ import smtplib from email.MIMEText import MIMEText from email.Utils import formatdate + from email.header import Header + from email import charset # encode unicode strings with quoted-printable - from email import charset charset.add_charset('utf-8', charset.SHORTEST, charset.QP) log.debug(_("Send cancellation notification for %s %r to user %r") % ( @@ -1257,9 +1286,9 @@ 'organizer': orgname if orgname else orgemail } if deleted: - message_text += " " + _("The copy in your tasklist as been removed accordingly.") + message_text += " " + _("The copy in your tasklist has been removed accordingly.") else: - message_text += " " + _("The copy in your tasklist as been marked as cancelled accordingly.") + message_text += " " + _("The copy in your tasklist has been marked as cancelled accordingly.") else: message_text = _("The event '%(summary)s' at %(start)s has been cancelled by %(organizer)s.") % { 'summary': object.get_summary(), @@ -1267,9 +1296,15 @@ 'organizer': orgname if orgname else orgemail } if deleted: - message_text += " " + _("The copy in your calendar as been removed accordingly.") + message_text += " " + _("The copy in your calendar has been removed accordingly.") else: - message_text += " " + _("The copy in your calendar as been marked as cancelled accordingly.") + message_text += " " + _("The copy in your calendar has been marked as cancelled accordingly.") + + if sender is not None and not comment == '': + message_text += "\n" + _("%s commented: %s") % (_attendee_name(sender), comment) + + if object.get_recurrence_id(): + message_text += "\n" + _("NOTE: This cancellation only refers to this single occurrence!") message_text += "\n\n" + _("*** This is an automated message. Please do not reply. ***") @@ -1279,7 +1314,8 @@ msg'To' = receiving_user'mail' msg'Date' = formatdate(localtime=True) msg'Subject' = utils.str2unicode(_('"%s" has been cancelled') % (object.get_summary())) - msg'From' = utils.str2unicode('"%s" <%s>' % (orgname, orgemail) if orgname else orgemail) + msg'From' = Header(utils.str2unicode('%s' % orgname) if orgname else '') + msg'From'.append("<%s>" % orgemail) smtp = smtplib.SMTP("localhost", 10027) @@ -1347,7 +1383,7 @@ break # copy all attendees from master object (covers additions and removals) - new_attendees = ; + new_attendees = for a in object.get_attendees(): # keep my own entry intact if attendee_entry is not None and attendee_entry.get_email() == a.get_email(): @@ -1378,3 +1414,19 @@ return _("%(name)s has %(status)s your assignment for %(summary)s.") + footer else: return _("%(name)s has %(status)s your invitation for %(summary)s.") + footer + + +def _attendee_name(attendee): + # attendee here can be Attendee or ContactReference + try: + name = attendee.get_name() + except Exception: + name = attendee.name() + + if name == '': + try: + name = attendee.get_email() + except Exception: + name = attendee.email() + + return name
View file
pykolab-0.8.1.tar.gz/wallace/modules.py
Changed
@@ -20,6 +20,7 @@ import os import sys import time +import traceback from email import message_from_string from email.message import Message @@ -113,7 +114,11 @@ log.error(_("No such module %r in modules %r (2).") %(name, modules)) sys.exit(1) - return modulesname'function'(*args, **kw) + try: + return modulesname'function'(*args, **kw) + except Exception, errmsg: + log.error(_("Unknown error occurred; %r") % (errmsg)) + log.error("%r" % (traceback.format_exc())) def heartbeat(name, *args, **kw): if not modules.has_key(name):
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.