Package common :: Package xmpp :: Module bosh
[hide private]
[frames] | no frames]

Source Code for Module common.xmpp.bosh

  1  ## bosh.py 
  2  ## 
  3  ## 
  4  ## Copyright (C) 2008 Tomas Karasek <tom.to.the.k@gmail.com> 
  5  ## 
  6  ## This file is part of Gajim. 
  7  ## 
  8  ## Gajim is free software; you can redistribute it and/or modify 
  9  ## it under the terms of the GNU General Public License as published 
 10  ## by the Free Software Foundation; version 3 only. 
 11  ## 
 12  ## Gajim is distributed in the hope that it will be useful, 
 13  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ## GNU General Public License for more details. 
 16  ## 
 17  ## You should have received a copy of the GNU General Public License 
 18  ## along with Gajim.  If not, see <http://www.gnu.org/licenses/>. 
 19   
 20   
 21  import locale, random 
 22  from hashlib import sha1 
 23  from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\ 
 24          CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\ 
 25          urisplit, DISCONNECT_TIMEOUT_SECONDS 
 26  from protocol import BOSHBody 
 27  from simplexml import Node 
 28   
 29  import logging 
 30  log = logging.getLogger('gajim.c.x.bosh') 
 31   
 32  KEY_COUNT = 10 
 33   
 34  # Fake file descriptor - it's used for setting read_timeout in idlequeue for 
 35  # BOSH Transport. In TCP-derived transports this is file descriptor of socket. 
 36  FAKE_DESCRIPTOR = -1337 
 37   
 38   
39 -class NonBlockingBOSH(NonBlockingTransport):
40 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, 41 xmpp_server, domain, bosh_dict, proxy_creds):
42 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, 43 estabilish_tls, certs) 44 45 self.bosh_sid = None 46 if locale.getdefaultlocale()[0]: 47 self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0] 48 else: 49 self.bosh_xml_lang = 'en' 50 51 self.http_version = 'HTTP/1.1' 52 self.http_persistent = True 53 self.http_pipelining = bosh_dict['bosh_http_pipelining'] 54 self.bosh_to = domain 55 56 self.route_host, self.route_port = xmpp_server 57 58 self.bosh_wait = bosh_dict['bosh_wait'] 59 if not self.http_pipelining: 60 self.bosh_hold = 1 61 else: 62 self.bosh_hold = bosh_dict['bosh_hold'] 63 self.bosh_requests = self.bosh_hold 64 self.bosh_uri = bosh_dict['bosh_uri'] 65 self.bosh_content = bosh_dict['bosh_content'] 66 self.over_proxy = bosh_dict['bosh_useproxy'] 67 if estabilish_tls: 68 self.bosh_secure = 'true' 69 else: 70 self.bosh_secure = 'false' 71 self.use_proxy_auth = bosh_dict['useauth'] 72 self.proxy_creds = proxy_creds 73 self.wait_cb_time = None 74 self.http_socks = [] 75 self.stanza_buffer = [] 76 self.prio_bosh_stanzas = [] 77 self.current_recv_handler = None 78 self.current_recv_socket = None 79 self.key_stack = None 80 self.ack_checker = None 81 self.after_init = False 82 self.proxy_dict = {} 83 if self.over_proxy and self.estabilish_tls: 84 self.proxy_dict['type'] = 'http' 85 # with SSL over proxy, we do HTTP CONNECT to proxy to open a channel to 86 # BOSH Connection Manager 87 host, port = urisplit(self.bosh_uri)[1:3] 88 self.proxy_dict['xmpp_server'] = (host, port) 89 self.proxy_dict['credentials'] = self.proxy_creds
90 91
92 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
93 NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) 94 95 global FAKE_DESCRIPTOR 96 FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1 97 self.fd = FAKE_DESCRIPTOR 98 99 self.stanza_buffer = [] 100 self.prio_bosh_stanzas = [] 101 102 self.key_stack = KeyStack(KEY_COUNT) 103 self.ack_checker = AckChecker() 104 self.after_init = True 105 106 self.http_socks.append(self.get_new_http_socket()) 107 self._tcp_connecting_started() 108 109 self.http_socks[0].connect( 110 conn_5tuple = conn_5tuple, 111 on_connect = self._on_connect, 112 on_connect_failure = self._on_connect_failure)
113
114 - def _on_connect(self):
115 self.peerhost = self.http_socks[0].peerhost 116 self.ssl_lib = self.http_socks[0].ssl_lib 117 NonBlockingTransport._on_connect(self)
118 119 120
121 - def set_timeout(self, timeout):
122 if self.get_state() != DISCONNECTED and self.fd != -1: 123 NonBlockingTransport.set_timeout(self, timeout) 124 else: 125 log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.get_state(), self.fd))
126
127 - def on_http_request_possible(self):
128 """ 129 Called when HTTP request it's possible to send a HTTP request. It can be when 130 socket is connected or when HTTP response arrived 131 132 There should be always one pending request to BOSH CM. 133 """ 134 log.debug('on_http_req possible, state:\n%s' % self.get_current_state()) 135 if self.get_state()==DISCONNECTED: return 136 137 #Hack for making the non-secure warning dialog work 138 if self._owner.got_features: 139 if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')): 140 self.send_BOSH(None) 141 else: 142 # If we already got features and no auth module was plugged yet, we are 143 # probably waiting for confirmation of the "not-secure-connection" dialog. 144 # We don't send HTTP request in that case. 145 # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html 146 return 147 else: 148 self.send_BOSH(None)
149 150 151
152 - def get_socket_in(self, state):
153 """ 154 Get sockets in desired state 155 """ 156 for s in self.http_socks: 157 if s.get_state()==state: return s 158 return None
159 160
161 - def get_free_socket(self):
162 """ 163 Select and returns socket eligible for sending a data to 164 """ 165 if self.http_pipelining: 166 return self.get_socket_in(CONNECTED) 167 else: 168 last_recv_time, tmpsock = 0, None 169 for s in self.http_socks: 170 # we're interested only in CONNECTED socket with no requests pending 171 if s.get_state()==CONNECTED and s.pending_requests==0: 172 # if there's more of them, we want the one with the least recent data receive 173 # (lowest last_recv_time) 174 if (last_recv_time==0) or (s.last_recv_time < last_recv_time): 175 last_recv_time = s.last_recv_time 176 tmpsock = s 177 if tmpsock: 178 return tmpsock 179 else: 180 return None
181 182
183 - def send_BOSH(self, payload):
184 """ 185 Tries to send a stanza in payload by appeding it to a buffer and plugging a 186 free socket for writing. 187 """ 188 total_pending_reqs = sum([s.pending_requests for s in self.http_socks]) 189 190 # when called after HTTP response (Payload=None) and when there are already 191 # some pending requests and no data to send, or when the socket is 192 # disconnected, we do nothing 193 if payload is None and \ 194 total_pending_reqs > 0 and \ 195 self.stanza_buffer == [] and \ 196 self.prio_bosh_stanzas == [] or \ 197 self.get_state()==DISCONNECTED: 198 return 199 200 # now the payload is put to buffer and will be sent at some point 201 self.append_stanza(payload) 202 203 # if we're about to make more requests than allowed, we don't send - stanzas will be 204 # sent after HTTP response from CM, exception is when we're disconnecting - then we 205 # send anyway 206 if total_pending_reqs >= self.bosh_requests and self.get_state()!=DISCONNECTING: 207 log.warn('attemp to make more requests than allowed by Connection Manager:\n%s' % 208 self.get_current_state()) 209 return 210 211 # when there's free CONNECTED socket, we plug it for write and the data will 212 # be sent when write is possible 213 if self.get_free_socket(): 214 self.plug_socket() 215 return 216 217 # if there is a connecting socket, we just wait for when it connects, 218 # payload will be sent in a sec when the socket connects 219 if self.get_socket_in(CONNECTING): return 220 221 # being here means there are either DISCONNECTED sockets or all sockets are 222 # CONNECTED with too many pending requests 223 s = self.get_socket_in(DISCONNECTED) 224 225 # if we have DISCONNECTED socket, lets connect it and plug for send 226 if s: 227 self.connect_and_flush(s) 228 else: 229 # otherwise create and connect a new one 230 ss = self.get_new_http_socket() 231 self.http_socks.append(ss) 232 self.connect_and_flush(ss) 233 return
234
235 - def plug_socket(self):
236 stanza = None 237 s = self.get_free_socket() 238 if s: 239 s._plug_idle(writable=True, readable=True) 240 else: 241 log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
242
243 - def build_stanza(self, socket):
244 """ 245 Build a BOSH body tag from data in buffers and adds key, rid and ack 246 attributes to it 247 248 This method is called from _do_send() of underlying transport. This is to 249 ensure rid and keys will be processed in correct order. If I generate 250 them before plugging a socket for write (and did it for two sockets/HTTP 251 connections) in parallel, they might be sent in wrong order, which 252 results in violating the BOSH session and server-side disconnect. 253 """ 254 if self.prio_bosh_stanzas: 255 stanza, add_payload = self.prio_bosh_stanzas.pop(0) 256 if add_payload: 257 stanza.setPayload(self.stanza_buffer) 258 self.stanza_buffer = [] 259 else: 260 stanza = self.boshify_stanzas(self.stanza_buffer) 261 self.stanza_buffer = [] 262 263 stanza = self.ack_checker.backup_stanza(stanza, socket) 264 265 key, newkey = self.key_stack.get() 266 if key: 267 stanza.setAttr('key', key) 268 if newkey: 269 stanza.setAttr('newkey', newkey) 270 271 272 log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket))) 273 self.renew_bosh_wait_timeout(self.bosh_wait + 3) 274 return stanza
275 276
277 - def on_bosh_wait_timeout(self):
278 log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait) 279 self.disconnect()
280 281
282 - def renew_bosh_wait_timeout(self, timeout):
283 if self.wait_cb_time is not None: 284 self.remove_bosh_wait_timeout() 285 sched_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout, timeout) 286 self.wait_cb_time = sched_time
287
288 - def remove_bosh_wait_timeout(self):
289 self.idlequeue.remove_alarm( 290 self.on_bosh_wait_timeout, 291 self.wait_cb_time)
292
293 - def on_persistent_fallback(self, socket):
294 """ 295 Called from underlying transport when server closes TCP connection 296 297 :param socket: disconnected transport object 298 """ 299 if socket.http_persistent: 300 log.warn('Fallback to nonpersistent HTTP (no pipelining as well)') 301 socket.http_persistent = False 302 self.http_persistent = False 303 self.http_pipelining = False 304 socket.disconnect(do_callback=False) 305 self.connect_and_flush(socket) 306 else: 307 socket.disconnect()
308 309 310
311 - def handle_body_attrs(self, stanza_attrs):
312 """ 313 Called for each incoming body stanza from dispatcher. Checks body 314 attributes. 315 """ 316 self.remove_bosh_wait_timeout() 317 318 if self.after_init: 319 if stanza_attrs.has_key('sid'): 320 # session ID should be only in init response 321 self.bosh_sid = stanza_attrs['sid'] 322 323 if stanza_attrs.has_key('requests'): 324 self.bosh_requests = int(stanza_attrs['requests']) 325 326 if stanza_attrs.has_key('wait'): 327 self.bosh_wait = int(stanza_attrs['wait']) 328 self.after_init = False 329 330 ack = None 331 if stanza_attrs.has_key('ack'): 332 ack = stanza_attrs['ack'] 333 self.ack_checker.process_incoming_ack(ack=ack, 334 socket=self.current_recv_socket) 335 336 if stanza_attrs.has_key('type'): 337 if stanza_attrs['type'] in ['terminate', 'terminal']: 338 condition = 'n/a' 339 if stanza_attrs.has_key('condition'): 340 condition = stanza_attrs['condition'] 341 if condition == 'n/a': 342 log.info('Received sesion-ending terminating stanza') 343 else: 344 log.error('Received terminating stanza: %s - %s' % (condition, 345 bosh_errors[condition])) 346 self.disconnect() 347 return 348 349 if stanza_attrs['type'] == 'error': 350 # recoverable error 351 pass 352 return
353 354
355 - def append_stanza(self, stanza):
356 """ 357 Append stanza to a buffer to send 358 """ 359 if stanza: 360 if isinstance(stanza, tuple): 361 # stanza is tuple of BOSH stanza and bool value for whether to add payload 362 self.prio_bosh_stanzas.append(stanza) 363 else: 364 # stanza is XMPP stanza. Will be boshified before send. 365 self.stanza_buffer.append(stanza)
366 367
368 - def send(self, stanza, now=False):
369 self.send_BOSH(stanza)
370 371 372
373 - def get_current_state(self):
374 t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n' 375 for s in self.http_socks: 376 t = '%s------ %s\t%s\t%s\n' % (t, id(s), s.get_state(), s.pending_requests) 377 t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \ 378 % (t, self.prio_bosh_stanzas, self.stanza_buffer, 379 self.ack_checker.get_not_acked_rids()) 380 return t
381 382
383 - def connect_and_flush(self, socket):
384 socket.connect( 385 conn_5tuple = self.conn_5tuple, 386 on_connect = self.on_http_request_possible, 387 on_connect_failure = self.disconnect)
388 389
390 - def boshify_stanzas(self, stanzas=[], body_attrs=None):
391 """ 392 Wraps zero to many stanzas by body tag with xmlns and sid 393 """ 394 log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas)) 395 tag = BOSHBody(attrs={'sid': self.bosh_sid}) 396 tag.setPayload(stanzas) 397 return tag
398 399
400 - def send_init(self, after_SASL=False):
401 if after_SASL: 402 t = BOSHBody( 403 attrs={ 'to': self.bosh_to, 404 'sid': self.bosh_sid, 405 'xml:lang': self.bosh_xml_lang, 406 'xmpp:restart': 'true', 407 'secure': self.bosh_secure, 408 'xmlns:xmpp': 'urn:xmpp:xbosh'}) 409 else: 410 t = BOSHBody( 411 attrs={ 'content': self.bosh_content, 412 'hold': str(self.bosh_hold), 413 'route': '%s:%s' % (self.route_host, self.route_port), 414 'to': self.bosh_to, 415 'wait': str(self.bosh_wait), 416 'xml:lang': self.bosh_xml_lang, 417 'xmpp:version': '1.0', 418 'ver': '1.6', 419 'xmlns:xmpp': 'urn:xmpp:xbosh'}) 420 self.send_BOSH((t, True))
421
422 - def start_disconnect(self):
423 NonBlockingTransport.start_disconnect(self) 424 self.renew_bosh_wait_timeout(DISCONNECT_TIMEOUT_SECONDS) 425 self.send_BOSH( 426 (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}), True))
427 428
429 - def get_new_http_socket(self):
430 http_dict = {'http_uri': self.bosh_uri, 431 'http_version': self.http_version, 432 'http_persistent': self.http_persistent, 433 'add_proxy_headers': self.over_proxy and not self.estabilish_tls} 434 if self.use_proxy_auth: 435 http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds 436 437 s = NonBlockingHTTPBOSH( 438 raise_event=self.raise_event, 439 on_disconnect=self.disconnect, 440 idlequeue = self.idlequeue, 441 estabilish_tls = self.estabilish_tls, 442 certs = self.certs, 443 on_http_request_possible = self.on_http_request_possible, 444 http_dict = http_dict, 445 proxy_dict = self.proxy_dict, 446 on_persistent_fallback = self.on_persistent_fallback) 447 448 s.onreceive(self.on_received_http) 449 s.set_stanza_build_cb(self.build_stanza) 450 return s
451 452
453 - def onreceive(self, recv_handler):
454 if recv_handler is None: 455 recv_handler = self._owner.Dispatcher.ProcessNonBlocking 456 self.current_recv_handler = recv_handler
457 458
459 - def on_received_http(self, data, socket):
460 self.current_recv_socket = socket 461 self.current_recv_handler(data)
462 463
464 - def disconnect(self, do_callback=True):
465 self.remove_bosh_wait_timeout() 466 if self.get_state() == DISCONNECTED: return 467 self.fd = -1 468 for s in self.http_socks: 469 s.disconnect(do_callback=False) 470 NonBlockingTransport.disconnect(self, do_callback)
471 472
473 -def get_rand_number():
474 # with 50-bit random initial rid, session would have to go up 475 # to 7881299347898368 messages to raise rid over 2**53 476 # (see http://www.xmpp.org/extensions/xep-0124.html#rids) 477 # it's also used for sequence key initialization 478 r = random.Random() 479 r.seed() 480 return r.getrandbits(50)
481 482 483
484 -class AckChecker():
485 """ 486 Class for generating rids and generating and checking acknowledgements in 487 BOSH messages 488 """
489 - def __init__(self):
490 self.rid = get_rand_number() 491 self.ack = 1 492 self.last_rids = {} 493 self.not_acked = []
494 495
496 - def get_not_acked_rids(self): return [rid for rid, st in self.not_acked]
497
498 - def backup_stanza(self, stanza, socket):
499 socket.pending_requests += 1 500 rid = self.get_rid() 501 self.not_acked.append((rid, stanza)) 502 stanza.setAttr('rid', str(rid)) 503 self.last_rids[socket]=rid 504 505 if self.rid != self.ack + 1: 506 stanza.setAttr('ack', str(self.ack)) 507 return stanza
508
509 - def process_incoming_ack(self, socket, ack=None):
510 socket.pending_requests -= 1 511 if ack: 512 ack = int(ack) 513 else: 514 ack = self.last_rids[socket] 515 516 i = len([rid for rid, st in self.not_acked if ack >= rid]) 517 self.not_acked = self.not_acked[i:] 518 519 self.ack = ack
520 521
522 - def get_rid(self):
523 self.rid = self.rid + 1 524 return self.rid
525 526 527 528 529
530 -class KeyStack():
531 """ 532 Class implementing key sequences for BOSH messages 533 """
534 - def __init__(self, count):
535 self.count = count 536 self.keys = [] 537 self.reset() 538 self.first_call = True
539
540 - def reset(self):
541 seed = str(get_rand_number()) 542 self.keys = [sha1(seed).hexdigest()] 543 for i in range(self.count-1): 544 curr_seed = self.keys[i] 545 self.keys.append(sha1(curr_seed).hexdigest())
546
547 - def get(self):
548 if self.first_call: 549 self.first_call = False 550 return (None, self.keys.pop()) 551 552 if len(self.keys)>1: 553 return (self.keys.pop(), None) 554 else: 555 last_key = self.keys.pop() 556 self.reset() 557 new_key = self.keys.pop() 558 return (last_key, new_key)
559 560 # http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal 561 bosh_errors = { 562 'n/a': 'none or unknown condition in terminating body stanza', 563 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.', 564 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.', 565 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.', 566 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.', 567 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.', 568 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid', 569 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.', 570 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).', 571 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.', 572 'remote-stream-error': 'Encapsulates an error in the protocol being transported.', 573 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.', 574 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.', 575 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.' 576 } 577