1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import locale, random
22 from hashlib import sha1
23 from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\
24 CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\
25 urisplit, DISCONNECT_TIMEOUT_SECONDS
26 from protocol import BOSHBody
27 from simplexml import Node
28
29 import logging
30 log = logging.getLogger('gajim.c.x.bosh')
31
32 KEY_COUNT = 10
33
34
35
36 FAKE_DESCRIPTOR = -1337
37
38
40 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
41 xmpp_server, domain, bosh_dict, proxy_creds):
42 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
43 estabilish_tls, certs)
44
45 self.bosh_sid = None
46 if locale.getdefaultlocale()[0]:
47 self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
48 else:
49 self.bosh_xml_lang = 'en'
50
51 self.http_version = 'HTTP/1.1'
52 self.http_persistent = True
53 self.http_pipelining = bosh_dict['bosh_http_pipelining']
54 self.bosh_to = domain
55
56 self.route_host, self.route_port = xmpp_server
57
58 self.bosh_wait = bosh_dict['bosh_wait']
59 if not self.http_pipelining:
60 self.bosh_hold = 1
61 else:
62 self.bosh_hold = bosh_dict['bosh_hold']
63 self.bosh_requests = self.bosh_hold
64 self.bosh_uri = bosh_dict['bosh_uri']
65 self.bosh_content = bosh_dict['bosh_content']
66 self.over_proxy = bosh_dict['bosh_useproxy']
67 if estabilish_tls:
68 self.bosh_secure = 'true'
69 else:
70 self.bosh_secure = 'false'
71 self.use_proxy_auth = bosh_dict['useauth']
72 self.proxy_creds = proxy_creds
73 self.wait_cb_time = None
74 self.http_socks = []
75 self.stanza_buffer = []
76 self.prio_bosh_stanzas = []
77 self.current_recv_handler = None
78 self.current_recv_socket = None
79 self.key_stack = None
80 self.ack_checker = None
81 self.after_init = False
82 self.proxy_dict = {}
83 if self.over_proxy and self.estabilish_tls:
84 self.proxy_dict['type'] = 'http'
85
86
87 host, port = urisplit(self.bosh_uri)[1:3]
88 self.proxy_dict['xmpp_server'] = (host, port)
89 self.proxy_dict['credentials'] = self.proxy_creds
90
91
92 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
113
118
119
120
126
128 """
129 Called when HTTP request it's possible to send a HTTP request. It can be when
130 socket is connected or when HTTP response arrived
131
132 There should be always one pending request to BOSH CM.
133 """
134 log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
135 if self.get_state()==DISCONNECTED: return
136
137
138 if self._owner.got_features:
139 if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')):
140 self.send_BOSH(None)
141 else:
142
143
144
145
146 return
147 else:
148 self.send_BOSH(None)
149
150
151
153 """
154 Get sockets in desired state
155 """
156 for s in self.http_socks:
157 if s.get_state()==state: return s
158 return None
159
160
162 """
163 Select and returns socket eligible for sending a data to
164 """
165 if self.http_pipelining:
166 return self.get_socket_in(CONNECTED)
167 else:
168 last_recv_time, tmpsock = 0, None
169 for s in self.http_socks:
170
171 if s.get_state()==CONNECTED and s.pending_requests==0:
172
173
174 if (last_recv_time==0) or (s.last_recv_time < last_recv_time):
175 last_recv_time = s.last_recv_time
176 tmpsock = s
177 if tmpsock:
178 return tmpsock
179 else:
180 return None
181
182
184 """
185 Tries to send a stanza in payload by appeding it to a buffer and plugging a
186 free socket for writing.
187 """
188 total_pending_reqs = sum([s.pending_requests for s in self.http_socks])
189
190
191
192
193 if payload is None and \
194 total_pending_reqs > 0 and \
195 self.stanza_buffer == [] and \
196 self.prio_bosh_stanzas == [] or \
197 self.get_state()==DISCONNECTED:
198 return
199
200
201 self.append_stanza(payload)
202
203
204
205
206 if total_pending_reqs >= self.bosh_requests and self.get_state()!=DISCONNECTING:
207 log.warn('attemp to make more requests than allowed by Connection Manager:\n%s' %
208 self.get_current_state())
209 return
210
211
212
213 if self.get_free_socket():
214 self.plug_socket()
215 return
216
217
218
219 if self.get_socket_in(CONNECTING): return
220
221
222
223 s = self.get_socket_in(DISCONNECTED)
224
225
226 if s:
227 self.connect_and_flush(s)
228 else:
229
230 ss = self.get_new_http_socket()
231 self.http_socks.append(ss)
232 self.connect_and_flush(ss)
233 return
234
236 stanza = None
237 s = self.get_free_socket()
238 if s:
239 s._plug_idle(writable=True, readable=True)
240 else:
241 log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
242
244 """
245 Build a BOSH body tag from data in buffers and adds key, rid and ack
246 attributes to it
247
248 This method is called from _do_send() of underlying transport. This is to
249 ensure rid and keys will be processed in correct order. If I generate
250 them before plugging a socket for write (and did it for two sockets/HTTP
251 connections) in parallel, they might be sent in wrong order, which
252 results in violating the BOSH session and server-side disconnect.
253 """
254 if self.prio_bosh_stanzas:
255 stanza, add_payload = self.prio_bosh_stanzas.pop(0)
256 if add_payload:
257 stanza.setPayload(self.stanza_buffer)
258 self.stanza_buffer = []
259 else:
260 stanza = self.boshify_stanzas(self.stanza_buffer)
261 self.stanza_buffer = []
262
263 stanza = self.ack_checker.backup_stanza(stanza, socket)
264
265 key, newkey = self.key_stack.get()
266 if key:
267 stanza.setAttr('key', key)
268 if newkey:
269 stanza.setAttr('newkey', newkey)
270
271
272 log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket)))
273 self.renew_bosh_wait_timeout(self.bosh_wait + 3)
274 return stanza
275
276
278 log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait)
279 self.disconnect()
280
281
287
292
294 """
295 Called from underlying transport when server closes TCP connection
296
297 :param socket: disconnected transport object
298 """
299 if socket.http_persistent:
300 log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
301 socket.http_persistent = False
302 self.http_persistent = False
303 self.http_pipelining = False
304 socket.disconnect(do_callback=False)
305 self.connect_and_flush(socket)
306 else:
307 socket.disconnect()
308
309
310
311 - def handle_body_attrs(self, stanza_attrs):
312 """
313 Called for each incoming body stanza from dispatcher. Checks body
314 attributes.
315 """
316 self.remove_bosh_wait_timeout()
317
318 if self.after_init:
319 if stanza_attrs.has_key('sid'):
320
321 self.bosh_sid = stanza_attrs['sid']
322
323 if stanza_attrs.has_key('requests'):
324 self.bosh_requests = int(stanza_attrs['requests'])
325
326 if stanza_attrs.has_key('wait'):
327 self.bosh_wait = int(stanza_attrs['wait'])
328 self.after_init = False
329
330 ack = None
331 if stanza_attrs.has_key('ack'):
332 ack = stanza_attrs['ack']
333 self.ack_checker.process_incoming_ack(ack=ack,
334 socket=self.current_recv_socket)
335
336 if stanza_attrs.has_key('type'):
337 if stanza_attrs['type'] in ['terminate', 'terminal']:
338 condition = 'n/a'
339 if stanza_attrs.has_key('condition'):
340 condition = stanza_attrs['condition']
341 if condition == 'n/a':
342 log.info('Received sesion-ending terminating stanza')
343 else:
344 log.error('Received terminating stanza: %s - %s' % (condition,
345 bosh_errors[condition]))
346 self.disconnect()
347 return
348
349 if stanza_attrs['type'] == 'error':
350
351 pass
352 return
353
354
356 """
357 Append stanza to a buffer to send
358 """
359 if stanza:
360 if isinstance(stanza, tuple):
361
362 self.prio_bosh_stanzas.append(stanza)
363 else:
364
365 self.stanza_buffer.append(stanza)
366
367
368 - def send(self, stanza, now=False):
370
371
372
374 t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n'
375 for s in self.http_socks:
376 t = '%s------ %s\t%s\t%s\n' % (t, id(s), s.get_state(), s.pending_requests)
377 t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \
378 % (t, self.prio_bosh_stanzas, self.stanza_buffer,
379 self.ack_checker.get_not_acked_rids())
380 return t
381
382
388
389
391 """
392 Wraps zero to many stanzas by body tag with xmlns and sid
393 """
394 log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas))
395 tag = BOSHBody(attrs={'sid': self.bosh_sid})
396 tag.setPayload(stanzas)
397 return tag
398
399
401 if after_SASL:
402 t = BOSHBody(
403 attrs={ 'to': self.bosh_to,
404 'sid': self.bosh_sid,
405 'xml:lang': self.bosh_xml_lang,
406 'xmpp:restart': 'true',
407 'secure': self.bosh_secure,
408 'xmlns:xmpp': 'urn:xmpp:xbosh'})
409 else:
410 t = BOSHBody(
411 attrs={ 'content': self.bosh_content,
412 'hold': str(self.bosh_hold),
413 'route': '%s:%s' % (self.route_host, self.route_port),
414 'to': self.bosh_to,
415 'wait': str(self.bosh_wait),
416 'xml:lang': self.bosh_xml_lang,
417 'xmpp:version': '1.0',
418 'ver': '1.6',
419 'xmlns:xmpp': 'urn:xmpp:xbosh'})
420 self.send_BOSH((t, True))
421
427
428
430 http_dict = {'http_uri': self.bosh_uri,
431 'http_version': self.http_version,
432 'http_persistent': self.http_persistent,
433 'add_proxy_headers': self.over_proxy and not self.estabilish_tls}
434 if self.use_proxy_auth:
435 http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds
436
437 s = NonBlockingHTTPBOSH(
438 raise_event=self.raise_event,
439 on_disconnect=self.disconnect,
440 idlequeue = self.idlequeue,
441 estabilish_tls = self.estabilish_tls,
442 certs = self.certs,
443 on_http_request_possible = self.on_http_request_possible,
444 http_dict = http_dict,
445 proxy_dict = self.proxy_dict,
446 on_persistent_fallback = self.on_persistent_fallback)
447
448 s.onreceive(self.on_received_http)
449 s.set_stanza_build_cb(self.build_stanza)
450 return s
451
452
457
458
460 self.current_recv_socket = socket
461 self.current_recv_handler(data)
462
463
471
472
474
475
476
477
478 r = random.Random()
479 r.seed()
480 return r.getrandbits(50)
481
482
483
485 """
486 Class for generating rids and generating and checking acknowledgements in
487 BOSH messages
488 """
490 self.rid = get_rand_number()
491 self.ack = 1
492 self.last_rids = {}
493 self.not_acked = []
494
495
497
499 socket.pending_requests += 1
500 rid = self.get_rid()
501 self.not_acked.append((rid, stanza))
502 stanza.setAttr('rid', str(rid))
503 self.last_rids[socket]=rid
504
505 if self.rid != self.ack + 1:
506 stanza.setAttr('ack', str(self.ack))
507 return stanza
508
510 socket.pending_requests -= 1
511 if ack:
512 ack = int(ack)
513 else:
514 ack = self.last_rids[socket]
515
516 i = len([rid for rid, st in self.not_acked if ack >= rid])
517 self.not_acked = self.not_acked[i:]
518
519 self.ack = ack
520
521
523 self.rid = self.rid + 1
524 return self.rid
525
526
527
528
529
531 """
532 Class implementing key sequences for BOSH messages
533 """
535 self.count = count
536 self.keys = []
537 self.reset()
538 self.first_call = True
539
541 seed = str(get_rand_number())
542 self.keys = [sha1(seed).hexdigest()]
543 for i in range(self.count-1):
544 curr_seed = self.keys[i]
545 self.keys.append(sha1(curr_seed).hexdigest())
546
548 if self.first_call:
549 self.first_call = False
550 return (None, self.keys.pop())
551
552 if len(self.keys)>1:
553 return (self.keys.pop(), None)
554 else:
555 last_key = self.keys.pop()
556 self.reset()
557 new_key = self.keys.pop()
558 return (last_key, new_key)
559
560
561 bosh_errors = {
562 'n/a': 'none or unknown condition in terminating body stanza',
563 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.',
564 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.',
565 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.',
566 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.',
567 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.',
568 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid',
569 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.',
570 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).',
571 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.',
572 'remote-stream-error': 'Encapsulates an error in the protocol being transported.',
573 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.',
574 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.',
575 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.'
576 }
577