1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Main xmpp decision making logic. Provides library with methods to assign
20 different handlers to different XMPP stanzas and namespaces
21 """
22
23 import simplexml, sys, locale
24 from xml.parsers.expat import ExpatError
25 from plugin import PlugIn
26 from protocol import (NS_STREAMS, NS_XMPP_STREAMS, NS_HTTP_BIND, Iq, Presence,
27 Message, Protocol, Node, Error, ERR_FEATURE_NOT_IMPLEMENTED, StreamError)
28 import logging
29 log = logging.getLogger('gajim.c.x.dispatcher_nb')
30
31
32 DEFAULT_TIMEOUT_SECONDS = 25
33 outgoingID = 0
34
35 XML_DECLARATION = '<?xml version=\'1.0\'?>'
39 """
40 Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
41 was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
42 is that reference used to access dispatcher instance is in Client attribute
43 named by __class__.__name__ of the dispatcher instance .. long story short:
44
45 I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp
46
47 If having two kinds of dispatcher will go well, I will rewrite the dispatcher
48 references in other scripts
49 """
50
def PlugIn(self, client_obj, after_SASL=False, old_features=None):
    """
    Plug the dispatcher matching the client's protocol type into the client

    :param client_obj: client instance; its "protocol_type" attribute selects
        the dispatcher ('XMPP' -> XMPPDispatcher, 'BOSH' -> BOSHDispatcher).
    :param after_SASL: forwarded to BOSHDispatcher.PlugIn only.
    :param old_features: forwarded to BOSHDispatcher.PlugIn only.
    :raise AssertionError: if the protocol type is neither 'XMPP' nor 'BOSH'.
    """
    protocol = client_obj.protocol_type
    if protocol == 'XMPP':
        XMPPDispatcher().PlugIn(client_obj)
    elif protocol == 'BOSH':
        BOSHDispatcher().PlugIn(client_obj, after_SASL, old_features)
    else:
        # Raised explicitly rather than via "assert False": assert statements
        # are removed when Python runs with -O, which would let an unknown
        # protocol type pass silently with no dispatcher plugged in.
        raise AssertionError('Unknown protocol type: %r' % (protocol,))
58
59 @classmethod
61 """
62 Factory Method for object creation
63
64 Use this instead of directly initializing the class in order to make
65 unit testing much easier.
66 """
67 return cls(*args, **kwargs)
68
71 """
72 Handles XMPP stream and is the first who takes control over a fresh stanza
73
74 Is plugged into NonBlockingClient but can be replugged to restart handled
75 stream headers (used by SASL f.e.).
76 """
77
79 PlugIn.__init__(self)
80 self.handlers = {}
81 self._expected = {}
82 self._defaultHandler = None
83 self._pendingExceptions = []
84 self._eventHandler = None
85 self._cycleHandlers = []
86 self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler,
87 self.RegisterEventHandler, self.UnregisterCycleHandler,
88 self.RegisterCycleHandler, self.RegisterHandlerOnce,
89 self.UnregisterHandler, self.RegisterProtocol,
90 self.SendAndWaitForResponse, self.SendAndCallForResponse,
91 self.getAnID, self.Event, self.send]
92
97
99 """
100 Return set of user-registered callbacks in its internal format. Used
101 within the library to carry user handlers set over Dispatcher replugins
102 """
103 return self.handlers
104
106 """
107 Restore user-registered callbacks structure from dump previously obtained
108 via dumpHandlers. Used within the library to carry user handlers set over
109 Dispatcher replugins.
110 """
111 self.handlers = handlers
112
128
130 """
131 Plug the Dispatcher instance into Client class instance and send initial
132 stream header. Used internally
133 """
134 self._init()
135 self._owner.lastErrNode = None
136 self._owner.lastErr = None
137 self._owner.lastErrCode = None
138 if hasattr(self._owner, 'StreamInit'):
139 self._owner.StreamInit()
140 else:
141 self.StreamInit()
142
144 """
145 Prepare instance to be destructed
146 """
147 self.Stream.dispatch = None
148 self.Stream.features = None
149 self.Stream.destroy()
150 self._owner = None
151 self.Stream = None
152
171
173 if ns != NS_STREAMS or tag!='stream':
174 raise ValueError('Incorrect stream start: (%s,%s). Terminating.'
175 % (tag, ns))
176
178 """
179 Check incoming stream for data waiting
180
181 :param data: data received from transports/IO sockets
182 :return:
183 1) length of processed data if some data were processed;
184 2) '0' string if no data were processed but link is alive;
185 3) 0 (zero) if underlying connection is closed.
186 """
187
188
189
190
191
192 for handler in self._cycleHandlers:
193 handler(self)
194 if len(self._pendingExceptions) > 0:
195 _pendingException = self._pendingExceptions.pop()
196 raise _pendingException[0], _pendingException[1], _pendingException[2]
197 try:
198 self.Stream.Parse(data)
199
200 if self.Stream and self.Stream.has_received_endtag():
201 self._owner.disconnect(self.Stream.streamError)
202 return 0
203 except ExpatError:
204 log.error('Invalid XML received from server. Forcing disconnect.')
205 self._owner.Connection.disconnect()
206 return 0
207 except ValueError, e:
208 log.debug('ValueError: %s' % str(e))
209 self._owner.Connection.pollend()
210 return 0
211 if len(self._pendingExceptions) > 0:
212 _pendingException = self._pendingExceptions.pop()
213 raise _pendingException[0], _pendingException[1], _pendingException[2]
214 if len(data) == 0:
215 return '0'
216 return len(data)
217
219 """
220 Create internal structures for newly registered namespace
221
222 You can register handlers for this namespace afterwards. By default
223 one namespace is already registered
224 (jabber:client or jabber:component:accept depending on context).
225 """
226 log.debug('Registering namespace "%s"' % xmlns)
227 self.handlers[xmlns] = {}
228 self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
229 self.RegisterProtocol('default', Protocol, xmlns=xmlns)
230
232 """
233 Used to declare some top-level stanza name to dispatcher
234
235 Needed to start registering handlers for such stanzas. Iq, message and
236 presence protocols are registered by default.
237 """
238 if not xmlns:
239 xmlns=self._owner.defaultNamespace
240 log.debug('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns))
241 self.handlers[xmlns][tag_name] = {type:Proto, 'default':[]}
242
245 """
246 Register handler for processing all stanzas for specified namespace
247 """
248 self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst,
249 system)
250
def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None,
        makefirst=False, system=False):
    """
    Register user callback as handler for stanzas of the declared kind

    The callback is invoked with the dispatcher (session) instance and the
    incoming stanza. It must raise xmpp.NodeProcessed just before returning
    if it wants to keep the remaining non-system callbacks and the default
    handler from being called with the same stanza.

    :param name: name of stanza. F.e. "iq".
    :param handler: user callback.
    :param typ: value of stanza's "type" attribute. If not specified any
        value will match
    :param ns: namespace of child that stanza must contain.
    :param xmlns: namespace the stanza belongs to. Falls back to the owner's
        default namespace when empty.
    :param makefirst: insert handler at the beginning of the handlers list
        instead of appending it to the end. Note that more common handlers,
        i.e. those registered without "typ" and "ns", will be called first
        nevertheless.
    :param system: call handler even if NodeProcessed exception was raised
        already.
    """
    if not xmlns:
        xmlns = self._owner.defaultNamespace
    log.debug('Registering handler %s for "%s" type->%s ns->%s(%s)' %
        (handler, name, typ, ns, xmlns))
    # Handlers given neither a type nor a child namespace go in the
    # catch-all 'default' slot.
    if not (typ or ns):
        typ = 'default'
    # Create the namespace / stanza tables on demand.
    if xmlns not in self.handlers:
        self.RegisterNamespace(xmlns, 'warn')
    if name not in self.handlers[xmlns]:
        self.RegisterProtocol(name, Protocol, xmlns, 'warn')
    slot = self.handlers[xmlns][name].setdefault(typ + ns, [])
    entry = {'func': handler, 'system': system}
    if makefirst:
        slot.insert(0, entry)
    else:
        slot.append(entry)
291
def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None,
        makefirst=0, system=0):
    """
    Register handler that should be unregistered after its first call

    The one-shot behaviour is not implemented yet: for now this only resolves
    the namespace and delegates to RegisterHandler with the same arguments.
    """
    namespace = xmlns or self._owner.defaultNamespace
    self.RegisterHandler(name, handler, typ, ns, namespace, makefirst, system)
301
303 """
304 Unregister handler. "typ" and "ns" must be specified exactly the same as
305 with registering.
306 """
307 if not xmlns:
308 xmlns = self._owner.defaultNamespace
309 if not typ and not ns:
310 typ='default'
311 if xmlns not in self.handlers:
312 return
313 if name not in self.handlers[xmlns]:
314 return
315 if typ+ns not in self.handlers[xmlns][name]:
316 return
317 for pack in self.handlers[xmlns][name][typ+ns]:
318 if pack['func'] == handler:
319 try:
320 self.handlers[xmlns][name][typ+ns].remove(pack)
321 except ValueError:
322 pass
323
325 """
326 Specify the handler that will be used if no NodeProcessed exception was
327 raised. This is returnStanzaHandler by default.
328 """
329 self._defaultHandler = handler
330
332 """
333 Register handler that will process events. F.e. "FILERECEIVED" event. See
334 common/connection: _event_dispatcher()
335 """
336 self._eventHandler = handler
337
345
347 """
348 Register handler that will be called on every Dispatcher.Process() call
349 """
350 if handler not in self._cycleHandlers:
351 self._cycleHandlers.append(handler)
352
354 """
355 Unregister handler that is called on every Dispatcher.Process() call
356 """
357 if handler in self._cycleHandlers:
358 self._cycleHandlers.remove(handler)
359
def Event(self, realm, event, data):
    """
    Raise some event

    Forwards the event to the registered event handler; when none is
    registered a warning is logged instead.

    :param realm: scope of event. Usually a namespace.
    :param event: the event itself. F.e. "SUCCESSFUL SEND".
    :param data: data that comes along with event. Depends on event.
    """
    event_handler = self._eventHandler
    if not event_handler:
        log.warning('Received unhandled event: %s' % event)
        return
    event_handler(realm, event, data)
372
def dispatch(self, stanza, session=None, direct=0):
    """
    Main procedure that performs XMPP stanza recognition and calls the
    appropriate handlers for it. Called by simplexml

    :param stanza: incoming top-level stanza node.
    :param session: object whose Stream and _expected table are used and
        which is passed to the callbacks. Defaults to this dispatcher.
    :param direct: not used by this implementation.
    """
    if not session:
        session = self
    session.Stream._mini_dom = None
    name = stanza.getName()

    if name == 'features':
        self._owner.got_features = True
        session.Stream.features = stanza

    xmlns = stanza.getNamespace()

    if xmlns not in self.handlers:
        # %-formatting instead of string concatenation: a non-string
        # namespace (e.g. None) must reach the 'unknown' fallback below
        # rather than raise TypeError while building the log message.
        log.warning("Unknown namespace: %s" % (xmlns,))
        xmlns = 'unknown'

    if name not in self.handlers[xmlns]:
        # 'features' has already been handled above, so do not warn about it.
        if name != 'features':
            log.warning("Unknown stanza: %s" % (name,))
        else:
            log.debug("Got %s/%s stanza" % (xmlns, name))
        name = 'unknown'
    else:
        log.debug("Got %s/%s stanza" % (xmlns, name))

    stanza_handlers = self.handlers[xmlns][name]

    if stanza.__class__.__name__ == 'Node':
        # Wrap the plain Node in the protocol class registered for this
        # stanza; RegisterProtocol stores that class under the builtin
        # `type` used as dictionary key.
        stanza = stanza_handlers[type](node=stanza)

    typ = stanza.getType()
    if not typ:
        typ = ''
    stanza.props = stanza.getProperties()
    ID = stanza.getID()

    # Collect handler slots from the most common ('default') to the most
    # particular (type + child namespace).
    keys = ['default']
    if typ in stanza_handlers:
        keys.append(typ)
    for prop in stanza.props:
        if prop in stanza_handlers:
            keys.append(prop)
        if typ and typ + prop in stanza_handlers:
            keys.append(typ + prop)

    # Namespace-wide default handlers come first. "chain + ..." always builds
    # a new list, so the registered handler lists are never modified here.
    chain = self.handlers[xmlns]['default']['default']
    for key in keys:
        if key:
            chain = chain + stanza_handlers[key]

    if ID in session._expected:
        # Somebody is waiting for this stanza: only system handlers run.
        user = 0
        if isinstance(session._expected[ID], tuple):
            cb, args = session._expected[ID]
            log.debug("Expected stanza arrived. Callback %s(%s) found!" %
                (cb, args))
            try:
                cb(session, stanza, **args)
            except Exception as exc:
                # NodeProcessed is matched by class name and swallowed;
                # anything else propagates to the caller.
                if exc.__class__.__name__ != 'NodeProcessed':
                    raise
        else:
            log.debug("Expected stanza arrived!")
            session._expected[ID] = stanza
    else:
        user = 1

    for handler in chain:
        if user or handler['system']:
            try:
                handler['func'](session, stanza)
            except Exception as exc:
                if exc.__class__.__name__ != 'NodeProcessed':
                    # Queue the exception info in _pendingExceptions instead
                    # of raising here, and stop dispatching this stanza.
                    self._pendingExceptions.insert(0, sys.exc_info())
                    return
                # NodeProcessed: only system handlers run from now on.
                user = 0
    if user and self._defaultHandler:
        self._defaultHandler(session, stanza)
460
462 """
463 Internal wrapper around ProcessNonBlocking. Will check for expected responses that have arrived and call their registered callbacks
464 """
465 if data is None:
466 return
467 res = self.ProcessNonBlocking(data)
468
469
470 if not res:
471 return
472 for (_id, _iq) in self._expected.items():
473 if _iq is None:
474
475
476 continue
477 if _id in self.on_responses:
478 if len(self._expected) == 1:
479 self._owner.onreceive(None)
480 resp, args = self.on_responses[_id]
481 del self.on_responses[_id]
482 if args is None:
483 resp(_iq)
484 else:
485 resp(self._owner, _iq, **args)
486 del self._expected[_id]
487
489 """
490 Send stanza and wait for recipient's response to it. Will call transports
491 on_timeout callback if response is not retrieved in time
492
493 Be aware: Only timeout of latest call of SendAndWait is active.
494 """
495 if timeout is None:
496 timeout = DEFAULT_TIMEOUT_SECONDS
497 _waitid = self.send(stanza)
498 if func:
499 self.on_responses[_waitid] = (func, args)
500 if timeout:
501 self._owner.set_timeout(timeout)
502 self._owner.onreceive(self._WaitForData)
503 self._expected[_waitid] = None
504 return _waitid
505
507 """
508 Put stanza on the wire and call back when recipient replies. Additional
509 callback arguments can be specified in args
510 """
511 self.SendAndWaitForResponse(stanza, 0, func, args)
512
def send(self, stanza, now=False):
    """
    Wrap transports send method when plugged into NonBlockingClient. Makes
    sure stanzas get ID and from tag.

    :param stanza: raw string, or a node object. Strings are passed to the
        connection untouched.
    :param now: forwarded unchanged to the connection's send method.
    :return: ID of the stanza if it is a Protocol instance, None otherwise.
    """
    stanza_id = None
    is_raw_text = type(stanza) in (type(''), type(u''))
    if not is_raw_text:
        if isinstance(stanza, Protocol):
            stanza_id = stanza.getID()
            if stanza_id is None:
                # No ID yet: assign a freshly generated one.
                stanza.setID(self.getAnID())
                stanza_id = stanza.getID()
        own_name = self._owner._registered_name
        if own_name and not stanza.getAttr('from'):
            stanza.setAttr('from', own_name)
    self._owner.Connection.send(stanza, now)
    return stanza_id
529
532
def PlugIn(self, owner, after_SASL=False, old_features=None):
    """
    Remember the BOSH-specific plug-in arguments on the instance, then plug
    into the owner via XMPPDispatcher.PlugIn
    """
    self.after_SASL = after_SASL
    self.old_features = old_features
    XMPPDispatcher.PlugIn(self, owner)
537
559
561 """
562 Send a stream terminator
563 """
564 self._owner.Connection.send_terminator()
565
574
575 - def dispatch(self, stanza, session=None, direct=0):
596