Package common :: Package xmpp :: Module dispatcher_nb
[hide private]
[frames] | no frames]

Source Code for Module common.xmpp.dispatcher_nb

  1  ##   dispatcher_nb.py 
  2  ##       based on dispatcher.py 
  3  ## 
  4  ##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov 
  5  ##       modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ## 
  7  ##   This program is free software; you can redistribute it and/or modify 
  8  ##   it under the terms of the GNU General Public License as published by 
  9  ##   the Free Software Foundation; either version 2, or (at your option) 
 10  ##   any later version. 
 11  ## 
 12  ##   This program is distributed in the hope that it will be useful, 
 13  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ##   GNU General Public License for more details. 
 16   
 17   
 18  """ 
 19  Main xmpp decision making logic. Provides library with methods to assign 
 20  different handlers to different XMPP stanzas and namespaces 
 21  """ 
 22   
 23  import simplexml, sys, locale 
 24  from xml.parsers.expat import ExpatError 
 25  from plugin import PlugIn 
 26  from protocol import (NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence, 
 27          Message, Protocol, Node, Error, ERR_FEATURE_NOT_IMPLEMENTED, StreamError) 
 28  import logging 
 29  log = logging.getLogger('gajim.c.x.dispatcher_nb') 
 30   
 31  #: default timeout to wait for response for our id 
 32  DEFAULT_TIMEOUT_SECONDS = 25 
 33  outgoingID = 0 
 34   
 35  XML_DECLARATION = '<?xml version=\'1.0\'?>' 
36 37 # FIXME: ugly 38 -class Dispatcher():
39 """ 40 Why is this here - I needed to redefine Dispatcher for BOSH and easiest way 41 was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble 42 is that reference used to access dispatcher instance is in Client attribute 43 named by __class__.__name__ of the dispatcher instance .. long story short: 44 45 I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp 46 47 If having two kinds of dispatcher will go well, I will rewrite the dispatcher 48 references in other scripts 49 """ 50
51 - def PlugIn(self, client_obj, after_SASL=False, old_features=None):
52 if client_obj.protocol_type == 'XMPP': 53 XMPPDispatcher().PlugIn(client_obj) 54 elif client_obj.protocol_type == 'BOSH': 55 BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features) 56 else: 57 assert False # should never be reached
58 59 @classmethod
60 - def get_instance(cls, *args, **kwargs):
61 """ 62 Factory Method for object creation 63 64 Use this instead of directly initializing the class in order to make 65 unit testing much easier. 66 """ 67 return cls(*args, **kwargs)
68
69 70 -class XMPPDispatcher(PlugIn):
71 """ 72 Handles XMPP stream and is the first who takes control over a fresh stanza 73 74 Is plugged into NonBlockingClient but can be replugged to restart handled 75 stream headers (used by SASL f.e.). 76 """ 77
78 - def __init__(self):
79 PlugIn.__init__(self) 80 self.handlers = {} 81 self._expected = {} 82 self._defaultHandler = None 83 self._pendingExceptions = [] 84 self._eventHandler = None 85 self._cycleHandlers = [] 86 self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, 87 self.RegisterEventHandler, self.UnregisterCycleHandler, 88 self.RegisterCycleHandler, self.RegisterHandlerOnce, 89 self.UnregisterHandler, self.RegisterProtocol, 90 self.SendAndWaitForResponse, self.SendAndCallForResponse, 91 self.getAnID, self.Event, self.send]
92
93 - def getAnID(self):
94 global outgoingID 95 outgoingID += 1 96 return repr(outgoingID)
97
98 - def dumpHandlers(self):
99 """ 100 Return set of user-registered callbacks in it's internal format. Used 101 within the library to carry user handlers set over Dispatcher replugins 102 """ 103 return self.handlers
104
105 - def restoreHandlers(self, handlers):
106 """ 107 Restore user-registered callbacks structure from dump previously obtained 108 via dumpHandlers. Used within the library to carry user handlers set over 109 Dispatcher replugins. 110 """ 111 self.handlers = handlers
112
113 - def _init(self):
114 """ 115 Register default namespaces/protocols/handlers. Used internally 116 """ 117 # FIXME: inject dependencies, do not rely that they are defined by our 118 # owner 119 self.RegisterNamespace('unknown') 120 self.RegisterNamespace(NS_STREAMS) 121 self.RegisterNamespace(self._owner.defaultNamespace) 122 self.RegisterProtocol('iq', Iq) 123 self.RegisterProtocol('presence', Presence) 124 self.RegisterProtocol('message', Message) 125 self.RegisterDefaultHandler(self.returnStanzaHandler) 126 self.RegisterEventHandler(self._owner._caller._event_dispatcher) 127 self.on_responses = {}
128
129 - def plugin(self, owner):
130 """ 131 Plug the Dispatcher instance into Client class instance and send initial 132 stream header. Used internally 133 """ 134 self._init() 135 self._owner.lastErrNode = None 136 self._owner.lastErr = None 137 self._owner.lastErrCode = None 138 if hasattr(self._owner, 'StreamInit'): 139 self._owner.StreamInit() 140 else: 141 self.StreamInit()
142
143 - def plugout(self):
144 """ 145 Prepare instance to be destructed 146 """ 147 self.Stream.dispatch = None 148 self.Stream.features = None 149 self.Stream.destroy() 150 self._owner = None 151 self.Stream = None
152
153 - def StreamInit(self):
154 """ 155 Send an initial stream header 156 """ 157 self.Stream = simplexml.NodeBuilder() 158 self.Stream.dispatch = self.dispatch 159 self.Stream._dispatch_depth = 2 160 self.Stream.stream_header_received = self._check_stream_start 161 self.Stream.features = None 162 self._metastream = Node('stream:stream') 163 self._metastream.setNamespace(self._owner.Namespace) 164 self._metastream.setAttr('version', '1.0') 165 self._metastream.setAttr('xmlns:stream', NS_STREAMS) 166 self._metastream.setAttr('to', self._owner.Server) 167 if locale.getdefaultlocale()[0]: 168 self._metastream.setAttr('xml:lang', 169 locale.getdefaultlocale()[0].split('_')[0]) 170 self._owner.send("%s%s>" % (XML_DECLARATION, str(self._metastream)[:-2]))
171
172 - def _check_stream_start(self, ns, tag, attrs):
173 if ns != NS_STREAMS or tag!='stream': 174 raise ValueError('Incorrect stream start: (%s,%s). Terminating.' 175 % (tag, ns))
176
177 - def ProcessNonBlocking(self, data):
178 """ 179 Check incoming stream for data waiting 180 181 :param data: data received from transports/IO sockets 182 :return: 183 1) length of processed data if some data were processed; 184 2) '0' string if no data were processed but link is alive; 185 3) 0 (zero) if underlying connection is closed. 186 """ 187 # FIXME: 188 # When an error occurs we disconnect the transport directly. Client's 189 # disconnect method will never be called. 190 # Is this intended? 191 # also look at transports start_disconnect() 192 for handler in self._cycleHandlers: 193 handler(self) 194 if len(self._pendingExceptions) > 0: 195 _pendingException = self._pendingExceptions.pop() 196 raise _pendingException[0], _pendingException[1], _pendingException[2] 197 try: 198 self.Stream.Parse(data) 199 # end stream:stream tag received 200 if self.Stream and self.Stream.has_received_endtag(): 201 self._owner.disconnect(self.Stream.streamError) 202 return 0 203 except ExpatError: 204 log.error('Invalid XML received from server. Forcing disconnect.') 205 self._owner.Connection.disconnect() 206 return 0 207 except ValueError, e: 208 log.debug('ValueError: %s' % str(e)) 209 self._owner.Connection.pollend() 210 return 0 211 if len(self._pendingExceptions) > 0: 212 _pendingException = self._pendingExceptions.pop() 213 raise _pendingException[0], _pendingException[1], _pendingException[2] 214 if len(data) == 0: 215 return '0' 216 return len(data)
217
218 - def RegisterNamespace(self, xmlns, order='info'):
219 """ 220 Create internal structures for newly registered namespace 221 222 You can register handlers for this namespace afterwards. By default 223 one namespace is already registered 224 (jabber:client or jabber:component:accept depending on context. 225 """ 226 log.debug('Registering namespace "%s"' % xmlns) 227 self.handlers[xmlns] = {} 228 self.RegisterProtocol('unknown', Protocol, xmlns=xmlns) 229 self.RegisterProtocol('default', Protocol, xmlns=xmlns)
230
231 - def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'):
232 """ 233 Used to declare some top-level stanza name to dispatcher 234 235 Needed to start registering handlers for such stanzas. Iq, message and 236 presence protocols are registered by default. 237 """ 238 if not xmlns: 239 xmlns=self._owner.defaultNamespace 240 log.debug('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns)) 241 self.handlers[xmlns][tag_name] = {type:Proto, 'default':[]}
242
243 - def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', 244 makefirst=0, system=0):
245 """ 246 Register handler for processing all stanzas for specified namespace 247 """ 248 self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, 249 system)
250
251 - def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, 252 makefirst=False, system=False):
253 """ 254 Register user callback as stanzas handler of declared type 255 256 Callback arguments: 257 dispatcher instance (for replying), incoming return of previous handlers. 258 The callback must raise xmpp.NodeProcessed just before return if it wants 259 to prevent other callbacks to be called with the same stanza as argument 260 _and_, more importantly library from returning stanza to sender with error set. 261 262 :param name: name of stanza. F.e. "iq". 263 :param handler: user callback. 264 :param typ: value of stanza's "type" attribute. If not specified any 265 value will match 266 :param ns: namespace of child that stanza must contain. 267 :param makefirst: insert handler in the beginning of handlers list instead 268 of adding it to the end. Note that more common handlers i.e. w/o "typ" 269 and " will be called first nevertheless. 270 :param system: call handler even if NodeProcessed Exception were raised 271 already. 272 """ 273 if not xmlns: 274 xmlns=self._owner.defaultNamespace 275 log.debug('Registering handler %s for "%s" type->%s ns->%s(%s)' % 276 (handler, name, typ, ns, xmlns)) 277 if not typ and not ns: 278 typ='default' 279 if xmlns not in self.handlers: 280 self.RegisterNamespace(xmlns, 'warn') 281 if name not in self.handlers[xmlns]: 282 self.RegisterProtocol(name, Protocol, xmlns, 'warn') 283 if typ+ns not in self.handlers[xmlns][name]: 284 self.handlers[xmlns][name][typ+ns]=[] 285 if makefirst: 286 self.handlers[xmlns][name][typ+ns].insert(0, {'func':handler, 287 'system':system}) 288 else: 289 self.handlers[xmlns][name][typ+ns].append({'func':handler, 290 'system':system})
291
292 - def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, 293 makefirst=0, system=0):
294 """ 295 Unregister handler after first call (not implemented yet) 296 """ 297 # FIXME Drop or implement 298 if not xmlns: 299 xmlns = self._owner.defaultNamespace 300 self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
301
302 - def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
303 """ 304 Unregister handler. "typ" and "ns" must be specified exactly the same as 305 with registering. 306 """ 307 if not xmlns: 308 xmlns = self._owner.defaultNamespace 309 if not typ and not ns: 310 typ='default' 311 if xmlns not in self.handlers: 312 return 313 if name not in self.handlers[xmlns]: 314 return 315 if typ+ns not in self.handlers[xmlns][name]: 316 return 317 for pack in self.handlers[xmlns][name][typ+ns]: 318 if pack['func'] == handler: 319 try: 320 self.handlers[xmlns][name][typ+ns].remove(pack) 321 except ValueError: 322 pass
323
324 - def RegisterDefaultHandler(self, handler):
325 """ 326 Specify the handler that will be used if no NodeProcessed exception were 327 raised. This is returnStanzaHandler by default. 328 """ 329 self._defaultHandler = handler
330
331 - def RegisterEventHandler(self, handler):
332 """ 333 Register handler that will process events. F.e. "FILERECEIVED" event. See 334 common/connection: _event_dispatcher() 335 """ 336 self._eventHandler = handler
337
338 - def returnStanzaHandler(self, conn, stanza):
339 """ 340 Return stanza back to the sender with <feature-not-implemented/> error 341 set 342 """ 343 if stanza.getType() in ('get', 'set'): 344 conn._owner.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))
345
346 - def RegisterCycleHandler(self, handler):
347 """ 348 Register handler that will be called on every Dispatcher.Process() call 349 """ 350 if handler not in self._cycleHandlers: 351 self._cycleHandlers.append(handler)
352
353 - def UnregisterCycleHandler(self, handler):
354 """ 355 Unregister handler that will is called on every Dispatcher.Process() call 356 """ 357 if handler in self._cycleHandlers: 358 self._cycleHandlers.remove(handler)
359
360 - def Event(self, realm, event, data):
361 """ 362 Raise some event 363 364 :param realm: scope of event. Usually a namespace. 365 :param event: the event itself. F.e. "SUCCESSFUL SEND". 366 :param data: data that comes along with event. Depends on event. 367 """ 368 if self._eventHandler: 369 self._eventHandler(realm, event, data) 370 else: 371 log.warning('Received unhandled event: %s' % event)
372
373 - def dispatch(self, stanza, session=None, direct=0):
374 """ 375 Main procedure that performs XMPP stanza recognition and calling 376 apppropriate handlers for it. Called by simplexml 377 """ 378 # FIXME: Where do we set session and direct. Why? What are those intended 379 # to do? 380 381 #log.info('dispatch called: stanza = %s, session = %s, direct= %s' 382 # % (stanza, session, direct)) 383 if not session: 384 session = self 385 session.Stream._mini_dom = None 386 name = stanza.getName() 387 388 if name == 'features': 389 self._owner.got_features = True 390 session.Stream.features = stanza 391 392 xmlns = stanza.getNamespace() 393 394 # log.info('in dispatch, getting ns for %s, and the ns is %s' 395 # % (stanza, xmlns)) 396 if xmlns not in self.handlers: 397 log.warn("Unknown namespace: " + xmlns) 398 xmlns = 'unknown' 399 # features stanza has been handled before 400 if name not in self.handlers[xmlns]: 401 if name != 'features': 402 log.warn("Unknown stanza: " + name) 403 else: 404 log.debug("Got %s/%s stanza" % (xmlns, name)) 405 name='unknown' 406 else: 407 log.debug("Got %s/%s stanza" % (xmlns, name)) 408 409 if stanza.__class__.__name__ == 'Node': 410 # FIXME: this cannot work 411 stanza=self.handlers[xmlns][name][type](node=stanza) 412 413 typ = stanza.getType() 414 if not typ: 415 typ = '' 416 stanza.props = stanza.getProperties() 417 ID = stanza.getID() 418 419 list_ = ['default'] # we will use all handlers: 420 if typ in self.handlers[xmlns][name]: 421 list_.append(typ) # from very common... 422 for prop in stanza.props: 423 if prop in self.handlers[xmlns][name]: 424 list_.append(prop) 425 if typ and typ+prop in self.handlers[xmlns][name]: 426 list_.append(typ+prop) # ...to very particular 427 428 chain = self.handlers[xmlns]['default']['default'] 429 for key in list_: 430 if key: 431 chain = chain + self.handlers[xmlns][name][key] 432 433 if ID in session._expected: 434 user = 0 435 if isinstance(session._expected[ID], tuple): 436 cb, args = session._expected[ID] 437 log.debug("Expected stanza arrived. Callback %s(%s) found!" % 438 (cb, args)) 439 try: 440 cb(session,stanza,**args) 441 except Exception, typ: 442 if typ.__class__.__name__ != 'NodeProcessed': 443 raise 444 else: 445 log.debug("Expected stanza arrived!") 446 session._expected[ID] = stanza 447 else: 448 user = 1 449 for handler in chain: 450 if user or handler['system']: 451 try: 452 handler['func'](session, stanza) 453 except Exception, typ: 454 if typ.__class__.__name__ != 'NodeProcessed': 455 self._pendingExceptions.insert(0, sys.exc_info()) 456 return 457 user=0 458 if user and self._defaultHandler: 459 self._defaultHandler(session, stanza)
460
461 - def _WaitForData(self, data):
462 """ 463 Internal wrapper around ProcessNonBlocking. Will check for 464 """ 465 if data is None: 466 return 467 res = self.ProcessNonBlocking(data) 468 # 0 result indicates that we have closed the connection, e.g. 469 # we have released dispatcher, so self._owner has no methods 470 if not res: 471 return 472 for (_id, _iq) in self._expected.items(): 473 if _iq is None: 474 # If the expected Stanza would have arrived, ProcessNonBlocking 475 # would have placed the reply stanza in there 476 continue 477 if _id in self.on_responses: 478 if len(self._expected) == 1: 479 self._owner.onreceive(None) 480 resp, args = self.on_responses[_id] 481 del self.on_responses[_id] 482 if args is None: 483 resp(_iq) 484 else: 485 resp(self._owner, _iq, **args) 486 del self._expected[_id]
487
488 - def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
489 """ 490 Send stanza and wait for recipient's response to it. Will call transports 491 on_timeout callback if response is not retrieved in time 492 493 Be aware: Only timeout of latest call of SendAndWait is active. 494 """ 495 if timeout is None: 496 timeout = DEFAULT_TIMEOUT_SECONDS 497 _waitid = self.send(stanza) 498 if func: 499 self.on_responses[_waitid] = (func, args) 500 if timeout: 501 self._owner.set_timeout(timeout) 502 self._owner.onreceive(self._WaitForData) 503 self._expected[_waitid] = None 504 return _waitid
505
506 - def SendAndCallForResponse(self, stanza, func=None, args=None):
507 """ 508 Put stanza on the wire and call back when recipient replies. Additional 509 callback arguments can be specified in args 510 """ 511 self.SendAndWaitForResponse(stanza, 0, func, args)
512
513 - def send(self, stanza, now=False):
514 """ 515 Wrap transports send method when plugged into NonBlockingClient. Makes 516 sure stanzas get ID and from tag. 517 """ 518 ID = None 519 if type(stanza) not in [type(''), type(u'')]: 520 if isinstance(stanza, Protocol): 521 ID = stanza.getID() 522 if ID is None: 523 stanza.setID(self.getAnID()) 524 ID = stanza.getID() 525 if self._owner._registered_name and not stanza.getAttr('from'): 526 stanza.setAttr('from', self._owner._registered_name) 527 self._owner.Connection.send(stanza, now) 528 return ID
529
530 531 -class BOSHDispatcher(XMPPDispatcher):
532
533 - def PlugIn(self, owner, after_SASL=False, old_features=None):
534 self.old_features = old_features 535 self.after_SASL = after_SASL 536 XMPPDispatcher.PlugIn(self, owner)
537
538 - def StreamInit(self):
539 """ 540 Send an initial stream header 541 """ 542 self.Stream = simplexml.NodeBuilder() 543 self.Stream.dispatch = self.dispatch 544 self.Stream._dispatch_depth = 2 545 self.Stream.stream_header_received = self._check_stream_start 546 self.Stream.features = self.old_features 547 548 self._metastream = Node('stream:stream') 549 self._metastream.setNamespace(self._owner.Namespace) 550 self._metastream.setAttr('version', '1.0') 551 self._metastream.setAttr('xmlns:stream', NS_STREAMS) 552 self._metastream.setAttr('to', self._owner.Server) 553 if locale.getdefaultlocale()[0]: 554 self._metastream.setAttr('xml:lang', 555 locale.getdefaultlocale()[0].split('_')[0]) 556 557 self.restart = True 558 self._owner.Connection.send_init(after_SASL=self.after_SASL)
559
560 - def StreamTerminate(self):
561 """ 562 Send a stream terminator 563 """ 564 self._owner.Connection.send_terminator()
565
566 - def ProcessNonBlocking(self, data=None):
567 if self.restart: 568 fromstream = self._metastream 569 fromstream.setAttr('from', fromstream.getAttr('to')) 570 fromstream.delAttr('to') 571 data = '%s%s>%s' % (XML_DECLARATION, str(fromstream)[:-2], data) 572 self.restart = False 573 return XMPPDispatcher.ProcessNonBlocking(self, data)
574
575 - def dispatch(self, stanza, session=None, direct=0):
576 if stanza.getName() == 'body' and stanza.getNamespace() == NS_HTTP_BIND: 577 578 stanza_attrs = stanza.getAttrs() 579 if 'authid' in stanza_attrs: 580 # should be only in init response 581 # auth module expects id of stream in document attributes 582 self.Stream._document_attrs['id'] = stanza_attrs['authid'] 583 self._owner.Connection.handle_body_attrs(stanza_attrs) 584 585 children = stanza.getChildren() 586 if children: 587 for child in children: 588 # if child doesn't have any ns specified, simplexml (or expat) 589 # thinks it's of parent's (BOSH body) namespace, so we have to 590 # rewrite it to jabber:client 591 if child.getNamespace() == NS_HTTP_BIND: 592 child.setNamespace(self._owner.defaultNamespace) 593 XMPPDispatcher.dispatch(self, child, session, direct) 594 else: 595 XMPPDispatcher.dispatch(self, stanza, session, direct)
596