Kolab:3.4 / bonnie
Changes of Revision 19
bonnie-0.1.tar.gz/INSTALL
Changed
@@ -61,3 +61,4 @@
 #. ./worker.py &
+#. ./collector.py &
bonnie-0.1.tar.gz/bonnie/__init__.py
Changed
@@ -23,6 +23,8 @@
 from bonnie.logger import Logger
 logging.setLoggerClass(Logger)
 
+API_VERSION = 1
+
 def getLogger(name):
     """
         Return the correct logger class.
@@ -43,5 +45,3 @@
 
     return conf
-
-
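The new module-level constant becomes the schema marker for records emitted further down the pipeline; the elasticsearch output below stamps it into every document as '@version'. A minimal sketch of how a consumer would pick it up (assuming the bonnie package is on the Python path; the record dict is purely illustrative):

# Minimal sketch, not part of the changeset: any bonnie module can read the
# new constant to version the records it produces.
import bonnie

record = {'@version': bonnie.API_VERSION}  # API_VERSION == 1 in this revision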
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -31,6 +31,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('broker.ZMQBroker')
 
 from job import Job
 from collector import Collector
@@ -47,7 +48,7 @@
         callback({ '_all': self.run })
 
     def collector_add(self, _collector_id, _state):
-        print "adding collector", _collector_id
+        log.debug("Adding collector %s" % (_collector_id), level=5)
         collector = Collector(_collector_id, _state)
         self.collectors[_collector_id] = collector
 
@@ -89,7 +90,7 @@
         if not job.uuid in [x.uuid for x in self.worker_jobs]:
             self.worker_jobs.append(job)
-            print "new job: %s" % (job.uuid)
+            log.debug("New worker job: %s; client=%s, collector=%s" % (job.uuid, client_id, collector_id), level=8)
 
     def worker_job_allocate(self, _worker_id):
         """
@@ -113,7 +114,7 @@
     def worker_job_done(self, _job_uuid):
         for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]:
             del self.worker_jobs[self.worker_jobs.index(job)]
-            print "done job", _job_uuid
+            log.debug("Worker job done: %s;" % (_job_uuid), level=8)
 
     def worker_job_free(self, _job_uuid):
         for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]:
@@ -125,7 +126,7 @@
         for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]:
             self.worker_router.send_multipart([_worker_id, b"JOB", _job_uuid, job.notification])
-            print "sent job", _job_uuid, "to worker", _worker_id
+            log.debug("Sent job %s to worker %s;" % (_job_uuid, _worker_id), level=8)
 
     def worker_jobs_with_status(self, _state):
         return [x for x in self.worker_jobs if x.state == _state]
@@ -134,7 +135,7 @@
         return [x for x in self.worker_jobs if x.worker == self.workers[_worker_id]]
 
     def worker_add(self, _worker_id, _state):
-        print "adding worker", _worker_id
+        log.debug("Adding worker %s (%s)" % (_worker_id, _state), level=5)
         worker = Worker(_worker_id, _state)
         self.workers[_worker_id] = worker
@@ -163,22 +164,24 @@
             self.worker_job_free(job)
 
         for worker in delete_workers:
-            print "deleting worker", worker
+            log.debug("Deleting worker %s" % (worker), level=5)
             del self.workers[worker]
 
     def workers_with_status(self, _state):
         return [worker_id for worker_id, worker in self.workers.iteritems() if worker.state == _state]
 
     def run(self):
+        log.info("Starting")
+
         context = zmq.Context()
 
-        client_router_bind_address = conf.get('broker', 'zmq_client_router_bind_address')
+        dealer_router_bind_address = conf.get('broker', 'zmq_dealer_router_bind_address')
 
-        if client_router_bind_address == None:
-            client_router_bind_address = "tcp://*:5570"
+        if dealer_router_bind_address == None:
+            dealer_router_bind_address = "tcp://*:5570"
 
-        client_router = context.socket(zmq.ROUTER)
-        client_router.bind(client_router_bind_address)
+        dealer_router = context.socket(zmq.ROUTER)
+        dealer_router.bind(dealer_router_bind_address)
 
         collector_router_bind_address = conf.get('broker', 'zmq_collector_router_bind_address')
@@ -205,7 +208,7 @@
         self.worker_router.bind(worker_router_bind_address)
 
         poller = zmq.Poller()
-        poller.register(client_router, zmq.POLLIN)
+        poller.register(dealer_router, zmq.POLLIN)
         poller.register(self.collector_router, zmq.POLLIN)
         poller.register(self.worker_router, zmq.POLLIN)
         poller.register(self.controller, zmq.POLLIN)
@@ -217,35 +220,36 @@
             if self.controller in sockets:
                 if sockets[self.controller] == zmq.POLLIN:
                     _message = self.controller.recv_multipart()
-                    print _message
-                    _worker_id = _message[0]
+                    log.debug("Controller message: %r" % (_message), level=9)
 
                     if _message[1] == b"STATE":
+                        _worker_id = _message[0]
                         _state = _message[2]
                         self.worker_set_status(_worker_id, _state)
 
                     if _message[1] == b"DONE":
                         self.worker_job_done(_message[2])
 
-                    if _message[1] == b"RETRIEVE":
+                    if _message[1] in [b"FETCH", b"HEADER", b"GETMETADATA", b"GETACL"]:
                         _job_uuid = _message[2]
-                        self.transit_job_collect(_job_uuid)
+                        self.transit_job_collect(_job_uuid, _message[1])
+
+            if dealer_router in sockets:
+                if sockets[dealer_router] == zmq.POLLIN:
+                    _message = dealer_router.recv_multipart()
+                    log.debug("Dealer message: %r" % (_message), level=9)
 
-            if client_router in sockets:
-                if sockets[client_router] == zmq.POLLIN:
-                    _message = client_router.recv_multipart()
-                    print _message
                     _client_id = _message[0]
                     _notification = _message[1]
                     _collector_id = _client_id.replace('Dealer', 'Collector')
 
                     self.worker_job_add(_notification, client_id=_client_id, collector_id=_collector_id)
 
-                    client_router.send_multipart([_message[0], b"ACK"])
+                    dealer_router.send_multipart([_message[0], b"ACK"])
 
             if self.collector_router in sockets:
                 if sockets[self.collector_router] == zmq.POLLIN:
                     _message = self.collector_router.recv_multipart()
-                    print _message
+                    log.debug("Collector message: %r" % (_message), level=9)
 
                     if _message[1] == b"STATE":
                         _collector_id = _message[0]
@@ -261,7 +265,8 @@
             if self.worker_router in sockets:
                 if sockets[self.worker_router] == zmq.POLLIN:
                     _message = self.worker_router.recv_multipart()
-                    print _message
+                    log.debug("Worker message: %r" % (_message), level=9)
+
                     _worker_id = _message[0]
                     _command = _message[1]
                     _job_uuid = _message[2]
@@ -283,18 +288,19 @@
                 pending_jobs = self.collect_jobs_with_status(b"PENDING", collector_id=collector)
 
                 if len(pending_jobs) > 0:
                     job = self.collect_job_allocate(collector)
-                    self.collector_router.send_multipart([job.collector_id, b"TAKE", job.uuid, job.notification])
+                    self.collector_router.send_multipart([job.collector_id, job.command, job.uuid, job.notification])
 
-        client_router.close()
+        dealer_router.close()
         self.controller.close()
         self.collector_router.close()
         self.worker_router.close()
         context.term()
 
-    def transit_job_collect(self, _job_uuid):
+    def transit_job_collect(self, _job_uuid, _command):
        for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]:
            job.set_state(b"PENDING")
+           job.set_command(_command)
            self.collect_jobs.append(job)
            del self.worker_jobs[self.worker_jobs.index(job)]
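Two things change in concert here: the client-facing ROUTER socket is renamed from client_router to dealer_router (with its config key now zmq_dealer_router_bind_address), and collection jobs carry a concrete command instead of the former generic b"TAKE". A hedged sketch of the forwarding step under those assumptions; socket and helper names below are illustrative, only the frame layout and the default bind address come from the diff:

# Hedged sketch (assumes a pyzmq ROUTER socket bound as in the diff and a job
# object whose .command was recorded via transit_job_collect()).
import zmq

context = zmq.Context()
dealer_router = context.socket(zmq.ROUTER)
dealer_router.bind("tcp://*:5570")  # default from the diff

def forward_collect_job(collector_router, job):
    # The command frame (b"FETCH", b"HEADER", b"GETMETADATA" or b"GETACL")
    # replaces the old generic b"TAKE", so the collector knows what to gather.
    collector_router.send_multipart([job.collector_id, job.command, job.uuid, job.notification])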
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -38,6 +38,7 @@
         self.client_id = client_id
         self.collector_id = collector_id
         self.timestamp = time.time()
+        self.command = None
 
         if self.client_id == None:
             if self.collector_id == None:
@@ -52,3 +53,6 @@
 
     def set_worker(self, worker):
         self.worker = worker
+
+    def set_command(self, cmd):
+        self.command = cmd
bonnie-0.1.tar.gz/bonnie/collector/__init__.py
Changed
@@ -32,6 +32,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('collector')
 
 class BonnieCollector(object):
     input_interests = {}
@@ -57,6 +58,7 @@
         messageContents = {}
 
         notification = json.loads(notification)
+        log.debug("FETCH for %r" % (notification), level=9)
 
         split_uri = urlparse.urlsplit(notification['uri'])
 
@@ -77,19 +79,18 @@
         # Second, .path == 'Calendar/Personal%20Calendar;UIDVALIDITY=$x[/;UID=$y]
         # Take everything before the first ';' (but only actually take everything
         # before the first ';' in the next step, here we still have use for it).
+        # TODO: parse the path/query parameters into a dict
         path_part = path_part.split(';')
 
-        # Use or abuse the length of path_parts at this moment to see if we have
-        # a message UID.
-        if len(path_part) == 3:
+        if notification.has_key('uidset'):
+            message_uids = expand_uidset(notification['uidset'])
+        elif notification.has_key('vnd.cmu.oldUidset'):
+            message_uids = expand_uidset(notification['vnd.cmu.oldUidset'])
+        elif len(path_part) == 3:
+            # Use or abuse the length of path_parts at this moment to extract the message UID
             message_uids = [ path_part[2].split('=')[1] ]
-        else:
-            if notification.has_key('uidset'):
-                message_uids = expand_uidset(notification['uidset'])
-                print message_uids
-            if notification.has_key('vnd.cmu.oldUidset'):
-                message_uids = expand_uidset(notification['vnd.cmu.oldUidset'])
-                print message_uids
+
+        notification['uidset'] = message_uids
 
         # Third, .path == 'Calendar/Personal%20Calendar
         # Decode the url encoding
@@ -123,7 +124,7 @@
                     ["/usr/lib/cyrus-imapd/mbpath", folder_path]
                 ).strip()
 
-            print "using mailbox path: %r" % (mailbox_path)
+            log.debug("Using mailbox path: %r" % (mailbox_path), level=8)
 
         else:
             # Do it the old-fashioned way
@@ -136,7 +137,7 @@
             (stdout, stderr) = p1.communicate()
 
             mailbox_path = stdout.strip()
-            print "using mailbox path: %r" % (mailbox_path)
+            log.debug("Using mailbox path: %r" % (mailbox_path), level=8)
 
         # TODO: Assumption #4 is we use altnamespace
         if not folder_name == "INBOX":
@@ -147,30 +148,49 @@
         for message_uid in message_uids:
             message_file_path = "%s/%s." % (mailbox_path, message_uid)
 
-            print message_file_path
+            log.debug("Open message file: %r" % (message_file_path), level=8)
 
             if os.access(message_file_path, os.R_OK):
                 fp = open(message_file_path, 'r')
                 data = fp.read()
                 fp.close()
 
-                print data
-
+                # TODO: parse mime message and return a dict
                 messageContents[message_uid] = data
 
         notification['messageContent'] = messageContents
 
-        notification = json.dumps(notification)
+        return json.dumps(notification)
 
-        return notification
+    def retrieve_headers_for_messages(self, notification):
+        messageHeaders = {}
+
+        notification = json.loads(notification)
+        log.debug("HEADERS for %r" % (notification), level=9)
+
+        # TODO: resovle message file system path
+        # TODO: read file line by line until we reach an empty line
+        # TODO: decode quoted-pritable or base64 encoded headers
+
+        notification['messageHeaders'] = messageHeaders
 
-    def event_notification(self, notification):
+        return json.dumps(notification)
+
+    def execute(self, command, notification):
         """
             Our goal is to collect whatever message contents
            for the messages referenced in the notification.
         """
-        print "going to run with", notification
-        notification = self.retrieve_contents_for_messages(notification)
+        log.debug("Executing collection command %s" % (command), level=8)
+
+        if command == "FETCH":
+            notification = self.retrieve_contents_for_messages(notification)
+        elif command == "HEADER":
+            notification = self.retrieve_headers_for_messages(notification)
+        elif command == "GETMETADATA":
+            pass
+        elif command == "GETACL":
+            pass
+
+        return notification
 
         #self.output(notification)
@@ -181,5 +201,5 @@
         input_modules = conf.get('collector', 'input_modules')
         for _input in self.input_modules.keys():
             if _input.name() == input_modules:
-                _input.run(callback=self.event_notification)
+                _input.run(callback=self.execute)
bonnie-0.1.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -34,6 +34,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('collector.ZMQInput')
 
 class ZMQInput(object):
     state = b"READY"
@@ -61,12 +62,12 @@
         pass
 
     def report_state(self):
-        print "reporting state", self.state
+        log.debug("[%s] Reporting state %s" % (self.identity, self.state), level=9)
         self.collector.send_multipart([b"STATE", self.state])
         self.report_timestamp = time.time()
 
     def run(self, callback=None):
-        print "%s starting" % (self.identity)
+        log.info("[%s] starting", self.identity)
 
         self.report_state()
 
@@ -83,7 +84,7 @@
                     if _message[0] == b"STATE":
                         self.report_state()
 
-                    if _message[0] == b"TAKE":
+                    else:
                        if not self.state == b"READY":
                            self.report_state()
 
@@ -92,7 +93,7 @@
                        _notification = _message[2]
 
                        if not callback == None:
-                            result = callback(_notification)
+                            result = callback(_message[0], _notification)
 
                        self.collector.send_multipart([b"DONE", _job_uuid, result])
bonnie-0.1.tar.gz/bonnie/dealer/outputs/zmq_output.py
Changed
@@ -26,6 +26,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('dealer.ZMQOutput')
 
 class ZMQOutput(object):
     def __init__(self, *args, **kw):
@@ -50,6 +51,7 @@
         return [ 'MailboxCreate' ]
 
     def run(self, notification):
+        log.debug("[%s] Notification received: %r" % (self.dealer.identity, notification), level=9)
         self.dealer.send(notification)
 
         received_reply = False
@@ -58,6 +60,7 @@
             if self.dealer in sockets:
                 if sockets[self.dealer] == zmq.POLLIN:
                     _reply = self.dealer.recv_multipart()
+                    log.debug("[%s] Reply: %r" % (self.dealer.identity, _reply), level=9)
 
                     if _reply[0] == b"ACK":
                         received_reply = True
bonnie-0.1.tar.gz/bonnie/worker/__init__.py
Changed
@@ -98,7 +98,6 @@
         if self.handler_interests.has_key(event):
             for interest in self.handler_interests[event]:
-#                print self.handler_interests[event]
                 (notification, _jobs) = interest['callback'](notification=notification)
                 jobs.extend(_jobs)
 
@@ -111,8 +110,6 @@
                 (notification, _jobs) = interest['callback'](notification=notification)
                 jobs.extend(_jobs)
 
-        print jobs
-
         return notification, jobs
 
     def register_handler(self, interests={}):
bonnie-0.1.tar.gz/bonnie/worker/handlers/messageappend.py
Changed
@@ -43,5 +43,5 @@
             return (notification, [])
 
         print "adding a job for messageappend"
-        jobs = [ b"RETRIEVE" ]
+        jobs = [ b"FETCH" ]
         return (notification, jobs)
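Worker handlers signal follow-up collection work by returning command names next to the (possibly modified) notification; with b"RETRIEVE" gone, a handler that wants full message bodies now asks for b"FETCH". A minimal sketch of another handler in the same style; the class name and the registered event are assumptions for illustration, only the return and registration shapes follow the diffs above:

# Hedged sketch of a worker handler following the messageappend.py pattern.
class MessageMoveHandler(object):
    def register(self, callback):
        # interest format as used elsewhere in this changeset: {event: {'callback': fn}}
        callback({'MessageMove': {'callback': self.run}})

    def run(self, notification=None):
        # Ask the broker to schedule a FETCH collection job for this event.
        return (notification, [b"FETCH"])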
bonnie-0.1.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -33,6 +33,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('worker.ZMQInput')
 
 class ZMQInput(object):
     def __init__(self, *args, **kw):
@@ -78,7 +79,7 @@
         self.report_timestamp = time.time()
 
     def run(self, callback=None):
-        print "%s starting" % (self.identity)
+        log.info("[%s] starting", self.identity)
 
         self.report_state()
 
@@ -91,7 +92,7 @@
             if self.controller in sockets:
                 if sockets[self.controller] == zmq.POLLIN:
                     _message = self.controller.recv_multipart()
-                    print _message
+                    log.debug("[%s] Controller message: %r" % (self.identity, _message), level=9)
 
                     if _message[0] == b"STATE":
                         self.report_state()
@@ -107,7 +108,7 @@
             if self.worker in sockets:
                 if sockets[self.worker] == zmq.POLLIN:
                     _message = self.worker.recv_multipart()
-                    print _message
+                    log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9)
 
                     if _message[0] == "JOB":
                         # TODO: Sanity checking
@@ -118,7 +119,7 @@
                         if len(jobs) == 0:
                             self.controller.send_multipart([b"DONE", _message[1]])
                         else:
-                            print "I have jobs:", jobs
+                            log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8)
                             for job in jobs:
                                 self.controller.send_multipart([job, _message[1]])
 
@@ -128,18 +129,18 @@
         self.worker.close()
 
     def set_state_busy(self):
-        #print "set state to busy"
+        log.debug("[%s] Set state to BUSY" % (self.identity), level=9)
         self.controller.send_multipart([b"STATE", b"BUSY", b"%s" % (self.job_id)])
         self.state = b"BUSY"
 
     def set_state_ready(self):
-        #print "set state to ready"
+        log.debug("[%s] Set state to READY" % (self.identity), level=9)
         self.controller.send_multipart([b"STATE", b"READY"])
         self.state = b"READY"
         self.job_id = None
 
     def take_job(self, _job_id):
-        print "taking job", _job_id
+        log.debug("[%s] Accept job %s" % (self.identity, _job_id), level=9)
         self.set_state_busy()
         self.worker.send_multipart([b"GET", _job_id])
         self.job_id = _job_id
bonnie-0.1.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -26,15 +26,52 @@
     def register(self, callback):
         callback({'_all': { 'callback': self.run }})
 
+    def notification2log(self, notification):
+        """
+            Convert the given event notification record into a valid log entry
+        """
+        keymap = {
+            'timestamp': None,
+            'clientIP': 'client_ip',
+            'clientPort': None,
+            'serverPort': None,
+            'serverDomain': 'domain',
+            'aclRights': 'acl_rights',
+            'aclSubject': 'acl_subject',
+            'mailboxID': 'mailbox_id',
+            'messageSize': 'message_size',
+            'messageHeaders': 'message_headers',
+            'messageContent': 'message',
+            'flagNames': 'flag_names',
+            'diskUsed': 'disk_used',
+            'vnd.cmu.oldUidset': 'uidset',
+        }
+        log = { '@version': bonnie.API_VERSION }
+        for key,val in notification.iteritems():
+            newkey = keymap[key] if keymap.has_key(key) else key
+            if newkey is not None:
+                # convert NIL values into None which is more appropriate
+                if isinstance(val, list):
+                    val = [x for x in val if not x == "NIL"]
+                elif val == "NIL":
+                    val = None
+
+                log[newkey] = val
+
+        return log
+
     def run(self, notification):
-        #print "es output for:", notification
         # The output should have UTC timestamps, but gets "2014-05-16T12:55:53.870+02:00"
-        timestamp = notification['timestamp']
-        notification['@timestamp'] = datetime.datetime.strftime(parse(timestamp).astimezone(tzutc()), "%Y-%m-%dT%H:%M:%S.%fZ")
-        #print "es output for:", notification
+        try:
+            timestamp = parse(notification['timestamp']).astimezone(tzutc())
+        except:
+            timestamp = datetime.datetime.now(tzutc())
+
+        notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
+
         self.es.create(
-            index='raw-logstash',
+            index='logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y-%m-%d")),
             doc_type='logs',
-            body=notification
+            body=self.notification2log(notification)
         )
 
         return (notification, [])
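notification2log() does three things at once: it renames camelCase notification fields to snake_case, drops transport noise such as clientPort, and filters IMAP NIL values. Combined with the switch to a per-day index name, a stored document would look roughly like the sketch below; all field values are invented for illustration, only the key mapping follows the keymap in the diff:

# Hedged before/after illustration of notification2log().
notification = {
    'event': 'MessageAppend',
    'clientIP': '::1',
    'clientPort': 45735,              # mapped to None, i.e. dropped
    'messageSize': 642,
    'flagNames': ['\\Seen', 'NIL'],   # NIL entries are filtered out
}

# ... becomes approximately:
log_document = {
    '@version': 1,                    # bonnie.API_VERSION
    'event': 'MessageAppend',
    'client_ip': '::1',
    'message_size': 642,
    'flag_names': ['\\Seen'],
}
# and is indexed into e.g. 'logstash-2014-07-01' instead of 'raw-logstash'.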
bonnie-0.1.tar.gz/dealer.py
Changed
@@ -6,7 +6,7 @@
 from bonnie.dealer import BonnieDealer
 
 if __name__ == "__main__":
-    notification = sys.stdin.read()
+    notification = sys.stdin.read().strip()
 
     dealer = BonnieDealer()
     dealer.run(notification)
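dealer.py is presumably driven by piping an event notification into its stdin; without the strip(), the trailing newline added by the pipe ends up inside the notification payload. A hedged usage sketch, feeding a made-up event from another Python process; it assumes the current directory is the bonnie source tree and that the broker from this changeset is running and reachable:

# Hedged usage sketch: feed one made-up JSON event into dealer.py via stdin.
import subprocess

event = '{"event": "MessageAppend", "uri": "imap://john.doe@example.org@imap.example.org/INBOX"}\n'

proc = subprocess.Popen(['python', 'dealer.py'], stdin=subprocess.PIPE)
proc.communicate(event.encode('utf-8'))  # the trailing newline is now stripped by dealer.py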