Package common :: Package xmpp :: Module transports_nb
[hide private]
[frames] | no frames]

Source Code for Module common.xmpp.transports_nb

  1  ##   transports_nb.py 
  2  ##       based on transports.py 
  3  ## 
  4  ##   Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov 
  5  ##       modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ##       modified by Tomas Karasek <tom.to.the.k@gmail.com> 
  7  ## 
  8  ##   This program is free software; you can redistribute it and/or modify 
  9  ##   it under the terms of the GNU General Public License as published by 
 10  ##   the Free Software Foundation; either version 2, or (at your option) 
 11  ##   any later version. 
 12  ## 
 13  ##   This program is distributed in the hope that it will be useful, 
 14  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  ##   GNU General Public License for more details. 
 17   
 18  """ 
 19  Transports are objects responsible for connecting to XMPP server and putting 
 20  data to wrapped sockets in desired form (SSL, TLS, TCP, for HTTP proxy, 
 21  for SOCKS5 proxy...) 
 22   
 23  Transports are not aware of XMPP stanzas and only responsible for low-level 
 24  connection handling. 
 25  """ 
 26   
 27  from simplexml import ustr 
 28  from plugin import PlugIn 
 29  from idlequeue import IdleObject 
 30  import proxy_connectors 
 31  import tls_nb 
 32   
 33  import socket 
 34  import errno 
 35  import time 
 36  import traceback 
 37  import base64 
 38  import urlparse 
 39   
 40  import logging 
 41  log = logging.getLogger('gajim.c.x.transports_nb') 
 42   
def urisplit(uri):
    """
    Break a URI string into the tuple (protocol, host, port, path).

    For example urisplit('http://httpcm.jabber.org:123/webclient') gives
    ('http', 'httpcm.jabber.org', 123, '/webclient'). When no usable port can
    be read from the URI, 443 is used for the 'https' protocol and 80 for
    everything else.
    """
    parts = urlparse.urlsplit(uri)
    try:
        port = parts.port
    except ValueError:
        log.warn('port cannot be extracted from BOSH URL %s, using default port' \
            % uri)
        port = ''
    if not port:
        # nothing usable in the URI - fall back to the protocol's default port
        port = 443 if parts.scheme == 'https' else 80
    return parts.scheme, parts.hostname, port, parts.path
64
def get_proxy_data_from_dict(proxy):
    """
    Pick the TCP endpoint and the optional credentials out of a proxy dict.

    :param proxy: dictionary with proxy data; 'type' is always read, which
        other keys are read depends on the branch taken below
    :return: tuple (tcp_host, tcp_port, proxy_user, proxy_pass); user and
        password stay None unless proxy['useauth'] is true
    """
    direct_bosh = proxy['type'] == 'bosh' and not proxy['bosh_useproxy']
    if direct_bosh:
        # BOSH without a proxy: host and port come from the BOSH URI itself
        endpoint = tuple(urisplit(proxy['bosh_uri'])[1:3])
    else:
        # proxy != bosh, or BOSH over an HTTP proxy: we connect to the proxy
        # machine
        endpoint = (proxy['host'], proxy['port'])

    credentials = (None, None)
    if proxy.get('useauth', False):
        credentials = (proxy['user'], proxy['pass'])
    return endpoint + credentials

#: timeout to connect to the server socket, it doesn't include auth
CONNECT_TIMEOUT_SECONDS = 30

#: how long to wait for a disconnect to complete
DISCONNECT_TIMEOUT_SECONDS = 5

#: size of the buffer which reads data from server
# if lower, more stanzas will be fragmented and processed twice
RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty
# FIXME: (#2634) gajim breaks with #RECV_BUFSIZE = 16
# it's inefficient but should work. Problem is that connect machine makes wrong
# assumptions and that we only check for pending data in sockets but not in SSL
# buffer...

# Event names passed as first argument to the raise_event callback
# (DATA_SENT from _do_send, DATA_RECEIVED from _do_receive; DATA_ERROR is not
# raised by the code visible in this module).
DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
DATA_ERROR = 'DATA ERROR'

# Transport states; set_state() only accepts members of STATES.
DISCONNECTED = 'DISCONNECTED'
DISCONNECTING = 'DISCONNECTING'
CONNECTING = 'CONNECTING'
PROXY_CONNECTING = 'PROXY_CONNECTING'
CONNECTED = 'CONNECTED'
STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)

104 -class NonBlockingTransport(PlugIn):
105 """ 106 Abstract class representing a transport 107 108 Subclasses CAN have different constructor signature but connect method SHOULD 109 be the same. 110 """ 111
112 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 113 certs):
114 """ 115 Each trasport class can have different constructor but it has to have at 116 least all the arguments of NonBlockingTransport constructor 117 118 :param raise_event: callback for monitoring of sent and received data 119 :param on_disconnect: callback called on disconnection during runtime 120 :param idlequeue: processing idlequeue 121 :param estabilish_tls: boolean whether to estabilish TLS connection after 122 TCP connection is done 123 :param certs: tuple of (cacerts, mycerts) see constructor of 124 tls_nb.NonBlockingTLS for more details 125 """ 126 PlugIn.__init__(self) 127 self.raise_event = raise_event 128 self.on_disconnect = on_disconnect 129 self.on_connect = None 130 self.on_connect_failure = None 131 self.idlequeue = idlequeue 132 self.on_receive = None 133 self.server = None 134 self.port = None 135 self.conn_5tuple = None 136 self.set_state(DISCONNECTED) 137 self.estabilish_tls = estabilish_tls 138 self.certs = certs 139 # type of used ssl lib (if any) will be assigned to this member var 140 self.ssl_lib = None 141 self._exported_methods=[self.onreceive, self.set_send_timeout, 142 self.set_send_timeout2, self.set_timeout, self.remove_timeout, 143 self.start_disconnect] 144 145 # time to wait for SOME stanza to come and then send keepalive 146 self.sendtimeout = 0 147 148 # in case we want to something different than sending keepalives 149 self.on_timeout = None 150 self.on_timeout2 = None
151
152 - def plugin(self, owner):
153 owner.Connection = self
154
155 - def plugout(self):
156 self._owner.Connection = None 157 self._owner = None 158 self.disconnect(do_callback=False)
159
160 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
161 """ 162 Creates and connects transport to server and port defined in conn_5tuple 163 which should be item from list returned from getaddrinfo 164 165 :param conn_5tuple: 5-tuple returned from getaddrinfo 166 :param on_connect: callback called on successful connect to the server 167 :param on_connect_failure: callback called on failure when connecting 168 """ 169 self.on_connect = on_connect 170 self.on_connect_failure = on_connect_failure 171 self.server, self.port = conn_5tuple[4][:2] 172 self.conn_5tuple = conn_5tuple
173
174 - def set_state(self, newstate):
175 assert(newstate in STATES) 176 self.state = newstate
177
178 - def get_state(self):
179 return self.state
180
181 - def _on_connect(self):
182 """ 183 Preceeds call of on_connect callback 184 """ 185 # data is reference to socket wrapper instance. We don't need it in client 186 # because 187 self.set_state(CONNECTED) 188 self.on_connect()
189
190 - def _on_connect_failure(self, err_message):
191 """ 192 Preceeds call of on_connect_failure callback 193 """ 194 # In case of error while connecting we need to disconnect transport 195 # but we don't want to call DisconnectHandlers from client, 196 # thus the do_callback=False 197 self.disconnect(do_callback=False) 198 self.on_connect_failure(err_message=err_message)
199
200 - def send(self, raw_data, now=False):
201 if self.get_state() == DISCONNECTED: 202 log.error('Unable to send %s \n because state is %s.' % 203 (raw_data, self.get_state()))
204
205 - def disconnect(self, do_callback=True):
206 self.set_state(DISCONNECTED) 207 if do_callback: 208 # invoke callback given in __init__ 209 self.on_disconnect()
210
211 - def onreceive(self, recv_handler):
212 """ 213 Set the on_receive callback. 214 215 onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is 216 the default one that will decide what to do with received stanza based on 217 its tag name and namespace. 218 219 Do not confuse it with on_receive() method, which is the callback 220 itself. 221 """ 222 if not recv_handler: 223 if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'): 224 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking 225 else: 226 log.warn('No Dispatcher plugged. Received data will not be processed') 227 self.on_receive = None 228 return 229 self.on_receive = recv_handler
230
231 - def _tcp_connecting_started(self):
233
234 - def read_timeout(self):
235 """ 236 Called when there's no response from server in defined timeout 237 """ 238 if self.on_timeout: 239 self.on_timeout() 240 self.renew_send_timeout()
241
242 - def read_timeout2(self):
243 """ 244 called when there's no response from server in defined timeout 245 """ 246 if self.on_timeout2: 247 self.on_timeout2() 248 self.renew_send_timeout2()
249
250 - def renew_send_timeout(self):
251 if self.on_timeout and self.sendtimeout > 0: 252 self.set_timeout(self.sendtimeout)
253
254 - def renew_send_timeout2(self):
255 if self.on_timeout2 and self.sendtimeout2 > 0: 256 self.set_timeout2(self.sendtimeout2)
257
258 - def set_timeout(self, timeout):
259 self.idlequeue.set_read_timeout(self.fd, timeout)
260
261 - def set_timeout2(self, timeout2):
262 self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
263
264 - def get_fd(self):
265 pass
266
267 - def remove_timeout(self):
268 self.idlequeue.remove_timeout(self.fd)
269
270 - def set_send_timeout(self, timeout, on_timeout):
271 self.sendtimeout = timeout 272 if self.sendtimeout > 0: 273 self.on_timeout = on_timeout 274 else: 275 self.on_timeout = None
276
277 - def set_send_timeout2(self, timeout2, on_timeout2):
278 self.sendtimeout2 = timeout2 279 if self.sendtimeout2 > 0: 280 self.on_timeout2 = on_timeout2 281 else: 282 self.on_timeout2 = None
283 284 # FIXME: where and why does this need to be called
285 - def start_disconnect(self):
287 288
289 -class NonBlockingTCP(NonBlockingTransport, IdleObject):
290 """ 291 Non-blocking TCP socket wrapper 292 293 It is used for simple XMPP connection. Can be connected via proxy and can 294 estabilish TLS connection. 295 """
296 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 297 certs, proxy_dict=None):
298 """ 299 :param proxy_dict: dictionary with proxy data as loaded from config file 300 """ 301 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, 302 estabilish_tls, certs) 303 IdleObject.__init__(self) 304 305 # queue with messages to be send 306 self.sendqueue = [] 307 308 # bytes remained from the last send message 309 self.sendbuff = '' 310 311 self.proxy_dict = proxy_dict 312 self.on_remote_disconnect = self.disconnect
313 314 # FIXME: transport should not be aware xmpp
315 - def start_disconnect(self):
316 NonBlockingTransport.start_disconnect(self) 317 self.send('</stream:stream>', now=True) 318 self.disconnect()
319
320 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
321 NonBlockingTransport.connect(self, conn_5tuple, on_connect, 322 on_connect_failure) 323 log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % 324 (self.server, self.port)) 325 326 try: 327 self._sock = socket.socket(*conn_5tuple[:3]) 328 except socket.error, (errnum, errstr): 329 self._on_connect_failure('NonBlockingTCP Connect: Error while creating\ 330 socket: %s %s' % (errnum, errstr)) 331 return 332 333 self._send = self._sock.send 334 self._recv = self._sock.recv 335 self.fd = self._sock.fileno() 336 337 # we want to be notified when send is possible to connected socket because 338 # it means the TCP connection is estabilished 339 self._plug_idle(writable=True, readable=False) 340 self.peerhost = None 341 342 # variable for errno symbol that will be found from exception raised 343 # from connect() 344 errnum = 0 345 errstr = str() 346 347 # set timeout for TCP connecting - if nonblocking connect() fails, pollend 348 # is called. If if succeeds pollout is called. 349 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS) 350 351 try: 352 self._sock.setblocking(False) 353 self._sock.connect((self.server, self.port)) 354 except Exception, exc: 355 errnum, errstr = exc.args 356 357 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 358 # connecting in progress 359 log.info('After NB connect() of %s. "%s" raised => CONNECTING' % 360 (id(self), errstr)) 361 self._tcp_connecting_started() 362 return 363 364 # if there was some other exception, call failure callback and unplug 365 # transport which will also remove read_timeouts for descriptor 366 self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % 367 (self.server, self.port, errnum, errstr))
368
369 - def _connect_to_proxy(self):
370 self.set_state(PROXY_CONNECTING) 371 if self.proxy_dict['type'] == 'socks5': 372 proxyclass = proxy_connectors.SOCKS5Connector 373 elif self.proxy_dict['type'] == 'http' : 374 proxyclass = proxy_connectors.HTTPCONNECTConnector 375 proxyclass.get_instance( 376 send_method=self.send, 377 onreceive=self.onreceive, 378 old_on_receive=self.on_receive, 379 on_success=self._on_connect, 380 on_failure=self._on_connect_failure, 381 xmpp_server=self.proxy_dict['xmpp_server'], 382 proxy_creds=self.proxy_dict['credentials'])
383
384 - def _on_connect(self):
385 """ 386 Preceed invoking of on_connect callback. TCP connection is already 387 estabilished by this time 388 """ 389 if self.estabilish_tls: 390 self.tls_init( 391 on_succ = lambda: NonBlockingTransport._on_connect(self), 392 on_fail = lambda: self._on_connect_failure( 393 'error while estabilishing TLS')) 394 else: 395 NonBlockingTransport._on_connect(self)
396
397 - def tls_init(self, on_succ, on_fail):
398 """ 399 Estabilishes TLS/SSL using this TCP connection by plugging a 400 NonBlockingTLS module 401 """ 402 cacerts, mycerts = self.certs 403 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self) 404 if result: 405 on_succ() 406 else: 407 on_fail()
408
409 - def pollin(self):
410 """ 411 Called by idlequeu when receive on plugged socket is possible 412 """ 413 log.info('pollin called, state == %s' % self.get_state()) 414 self._do_receive()
415
416 - def pollout(self):
417 """ 418 Called by idlequeu when send to plugged socket is possible 419 """ 420 log.info('pollout called, state == %s' % self.get_state()) 421 422 if self.get_state() == CONNECTING: 423 log.info('%s socket wrapper connected' % id(self)) 424 self.idlequeue.remove_timeout(self.fd) 425 self._plug_idle(writable=False, readable=False) 426 self.peerhost = self._sock.getsockname() 427 if self.proxy_dict: 428 self._connect_to_proxy() 429 else: 430 self._on_connect() 431 else: 432 self._do_send()
433
434 - def pollend(self):
435 """ 436 Called by idlequeue on TCP connection errors 437 """ 438 log.info('pollend called, state == %s' % self.get_state()) 439 440 if self.get_state() == CONNECTING: 441 self._on_connect_failure('Error during connect to %s:%s' % 442 (self.server, self.port)) 443 else: 444 self.disconnect()
445
446 - def disconnect(self, do_callback=True):
447 if self.get_state() == DISCONNECTED: 448 return 449 self.set_state(DISCONNECTED) 450 self.idlequeue.unplug_idle(self.fd) 451 if 'NonBlockingTLS' in self.__dict__: 452 self.NonBlockingTLS.PlugOut() 453 try: 454 self._sock.shutdown(socket.SHUT_RDWR) 455 self._sock.close() 456 except socket.error, (errnum, errstr): 457 log.info('Error while disconnecting socket: %s' % errstr) 458 self.fd = -1 459 NonBlockingTransport.disconnect(self, do_callback)
460
461 - def read_timeout(self):
462 log.info('read_timeout called, state == %s' % self.get_state()) 463 if self.get_state() == CONNECTING: 464 # if read_timeout is called during connecting, connect() didn't end yet 465 # thus we have to call the tcp failure callback 466 self._on_connect_failure('Error during connect to %s:%s' % 467 (self.server, self.port)) 468 else: 469 NonBlockingTransport.read_timeout(self)
470
471 - def set_timeout(self, timeout):
472 if self.get_state() != DISCONNECTED and self.fd != -1: 473 NonBlockingTransport.set_timeout(self, timeout) 474 else: 475 log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % 476 (self.get_state(), self.fd))
477
478 - def remove_timeout(self):
479 if self.fd: 480 NonBlockingTransport.remove_timeout(self) 481 else: 482 log.warn('remove_timeout: no self.fd state is %s' % self.get_state())
483
484 - def send(self, raw_data, now=False):
485 """ 486 Append raw_data to the queue of messages to be send. If supplied data is 487 unicode string, encode it to utf-8. 488 """ 489 NonBlockingTransport.send(self, raw_data, now) 490 491 r = self.encode_stanza(raw_data) 492 493 if now: 494 self.sendqueue.insert(0, r) 495 self._do_send() 496 else: 497 self.sendqueue.append(r) 498 499 self._plug_idle(writable=True, readable=True)
500
501 - def encode_stanza(self, stanza):
502 """ 503 Encode str or unicode to utf-8 504 """ 505 if isinstance(stanza, unicode): 506 stanza = stanza.encode('utf-8') 507 elif not isinstance(stanza, str): 508 stanza = ustr(stanza).encode('utf-8') 509 return stanza
510
511 - def _plug_idle(self, writable, readable):
512 """ 513 Plug file descriptor of socket to Idlequeue 514 515 Plugged socket will be watched for "send possible" or/and "recv possible" 516 events. pollin() callback is invoked on "recv possible", pollout() on 517 "send_possible". 518 519 Plugged socket will always be watched for "error" event - in that case, 520 pollend() is called. 521 """ 522 log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable)) 523 self.idlequeue.plug_idle(self, writable, readable)
524
525 - def _do_send(self):
526 """ 527 Called when send() to connected socket is possible. First message from 528 sendqueue will be sent 529 """ 530 if not self.sendbuff: 531 if not self.sendqueue: 532 log.warn('calling send on empty buffer and queue') 533 self._plug_idle(writable=False, readable=True) 534 return None 535 self.sendbuff = self.sendqueue.pop(0) 536 try: 537 send_count = self._send(self.sendbuff) 538 if send_count: 539 sent_data = self.sendbuff[:send_count] 540 self.sendbuff = self.sendbuff[send_count:] 541 self._plug_idle( 542 writable=((self.sendqueue!=[]) or (self.sendbuff!='')), 543 readable=True) 544 self.raise_event(DATA_SENT, sent_data) 545 546 except Exception: 547 log.error('_do_send:', exc_info=True) 548 traceback.print_exc() 549 self.disconnect()
550
    def _do_receive(self):
        """
        Perform one recv() of at most RECV_BUFSIZE bytes and dispatch the
        result.

        Depending on the outcome this calls on_remote_disconnect (remote side
        closed the connection), disconnect (other errors, or data arrived
        while no on_receive handler is set) or passes the data to raise_event
        and _on_receive.
        """
        received = None
        errnum = 0
        errstr = 'No Error Set'

        try:
            # get as many bites, as possible, but not more than RECV_BUFSIZE
            received = self._recv(RECV_BUFSIZE)
        except socket.error, (errnum, errstr):
            # errnum/errstr are filled in by the tuple unpacking above
            log.info("_do_receive: got %s:" % received, exc_info=True)
        except tls_nb.SSLWrapper.Error, e:
            log.info("_do_receive, caught SSL error, got %s:" % received,
                exc_info=True)
            errnum, errstr = e.errno, e.strerror

        if received == '':
            errstr = 'zero bytes on recv'

        # the three ways a remote close shows up, depending on the ssl library
        if (self.ssl_lib is None and received == '') or \
        (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \
        (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ):
            # 8 in stdlib: errstr == EOF occured in violation of protocol
            # -1 in pyopenssl: errstr == Unexpected EOF
            log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
            self.on_remote_disconnect()
            return

        if errnum:
            # NOTE(review): exc_info=True is used outside of an except block
            # here, so there may be no current exception to log - confirm.
            log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
                errnum, errstr), exc_info=True)
            self.disconnect()
            return

        # this branch is for case of non-fatal SSL errors - None is returned from
        # recv() but no errnum is set
        if received is None:
            return

        # we have received some bytes, stop the timeout!
        self.remove_timeout()
        self.renew_send_timeout()
        self.renew_send_timeout2()
        # pass received data to owner
        if self.on_receive:
            self.raise_event(DATA_RECEIVED, received)
            self._on_receive(received)
        else:
            # This should never happen, so we need the debug.
            # (If there is no handler on receive specified, data is passed to
            # Dispatcher.ProcessNonBlocking)
            log.error('SOCKET %s Unhandled data received: %s' % (id(self),
                received))
            self.disconnect()
608
609 - def _on_receive(self, data):
610 """ 611 Preceeds on_receive callback. It peels off and checks HTTP headers in 612 HTTP classes, in here it just calls the callback 613 """ 614 self.on_receive(data)
615 616
617 -class NonBlockingHTTP(NonBlockingTCP):
618 """ 619 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP 620 headers from incoming messages 621 """ 622
623 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 624 certs, on_http_request_possible, on_persistent_fallback, http_dict, 625 proxy_dict=None):
626 """ 627 :param on_http_request_possible: method to call when HTTP request to 628 socket owned by transport is possible. 629 :param on_persistent_fallback: callback called when server ends TCP 630 connection. It doesn't have to be fatal for HTTP session. 631 :param http_dict: dictionary with data for HTTP request and headers 632 """ 633 NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue, 634 estabilish_tls, certs, proxy_dict) 635 636 self.http_protocol, self.http_host, self.http_port, self.http_path = \ 637 urisplit(http_dict['http_uri']) 638 self.http_protocol = self.http_protocol or 'http' 639 self.http_path = self.http_path or '/' 640 self.http_version = http_dict['http_version'] 641 self.http_persistent = http_dict['http_persistent'] 642 self.add_proxy_headers = http_dict['add_proxy_headers'] 643 644 if 'proxy_user' in http_dict and 'proxy_pass' in http_dict: 645 self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[ 646 'proxy_pass'] 647 else: 648 self.proxy_user, self.proxy_pass = None, None 649 650 # buffer for partial responses 651 self.recvbuff = '' 652 self.expected_length = 0 653 self.pending_requests = 0 654 self.on_http_request_possible = on_http_request_possible 655 self.last_recv_time = 0 656 self.close_current_connection = False 657 self.on_remote_disconnect = lambda: on_persistent_fallback(self)
658
659 - def http_send(self, raw_data, now=False):
660 self.send(self.build_http_message(raw_data), now)
661
    def _on_receive(self, data):
        """
        Precedes passing received data to owner class. Gets rid of HTTP headers
        and checks them.

        Data is accumulated in recvbuff until parse_http_message yields a
        status line, headers and a body; a non-200 status disconnects the
        transport. Once the body is at least as long as Content-Length it is
        passed to on_receive(data=..., socket=self) and
        on_http_request_possible() is called.
        """
        if self.get_state() == PROXY_CONNECTING:
            # while talking to the proxy the data is not HTTP-wrapped XMPP,
            # hand it over untouched
            NonBlockingTCP._on_receive(self, data)
            return

        # append currently received data to HTTP msg in buffer
        self.recvbuff = '%s%s' % (self.recvbuff or '', data)
        statusline, headers, httpbody, buffer_rest = self.parse_http_message(
            self.recvbuff)

        # NOTE(review): an empty body is treated the same as an incomplete
        # response here - confirm that responses always carry a body.
        if not (statusline and headers and httpbody):
            log.debug('Received incomplete HTTP response')
            return

        if statusline[1] != '200':
            log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
            self.disconnect()
            return
        # raises KeyError/ValueError when the header is missing or malformed
        self.expected_length = int(headers['Content-Length'])
        if 'Connection' in headers and headers['Connection'].strip()=='close':
            self.close_current_connection = True

        if self.expected_length > len(httpbody):
            # If we haven't received the whole HTTP mess yet, let's end the thread.
            # It will be finnished from one of following recvs on plugged socket.
            log.info('not enough bytes in HTTP response - %d expected, got %d' %
                (self.expected_length, len(httpbody)))
        else:
            # First part of buffer has been extraced and is going to be handled,
            # remove it from buffer
            self.recvbuff = buffer_rest

            # everything was received
            self.expected_length = 0

            if not self.http_persistent or self.close_current_connection:
                # not-persistent connections disconnect after response
                self.disconnect(do_callback=False)
            self.close_current_connection = False
            self.last_recv_time = time.time()
            self.on_receive(data=httpbody, socket=self)
            self.on_http_request_possible()
708
709 - def build_http_message(self, httpbody, method='POST'):
710 """ 711 Builds http message with given body. Values for headers and status line 712 fields are taken from class variables 713 """ 714 absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host, 715 self.http_port, self.http_path) 716 headers = ['%s %s %s' % (method, absolute_uri, self.http_version), 717 'Host: %s:%s' % (self.http_host, self.http_port), 718 'User-Agent: Gajim', 719 'Content-Type: text/xml; charset=utf-8', 720 'Content-Length: %s' % len(str(httpbody))] 721 if self.add_proxy_headers: 722 headers.append('Proxy-Connection: keep-alive') 723 headers.append('Pragma: no-cache') 724 if self.proxy_user and self.proxy_pass: 725 credentials = '%s:%s' % (self.proxy_user, self.proxy_pass) 726 credentials = base64.encodestring(credentials).strip() 727 headers.append('Proxy-Authorization: Basic %s' % credentials) 728 else: 729 headers.append('Connection: Keep-Alive') 730 headers.append('\r\n') 731 headers = '\r\n'.join(headers) 732 return('%s%s' % (headers, httpbody))
733
734 - def parse_http_message(self, message):
735 """ 736 Split http message into a tuple: 737 (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], 738 headers - dictionary of headers e.g. {'Content-Length': '604', 739 'Content-Type': 'text/xml; charset=utf-8'}, 740 httpbody - string with http body) 741 http_rest - what is left in the message after a full HTTP header + body 742 """ 743 message = message.replace('\r', '') 744 message = message.lstrip('\n') 745 splitted = message.split('\n\n') 746 if len(splitted) < 2: 747 # no complete http message. Keep filling the buffer until we find one 748 buffer_rest = message 749 return ('', '', '', buffer_rest) 750 else: 751 (header, httpbody) = splitted[:2] 752 header = header.split('\n') 753 statusline = header[0].split(' ', 2) 754 header = header[1:] 755 headers = {} 756 for dummy in header: 757 row = dummy.split(' ', 1) 758 headers[row[0][:-1]] = row[1] 759 body_size = headers['Content-Length'] 760 rest_splitted = splitted[2:] 761 while (len(httpbody) < body_size) and rest_splitted: 762 # Complete httpbody until it has the announced size 763 httpbody = '\n\n'.join([httpbody, rest_splitted.pop(0)]) 764 buffer_rest = "\n\n".join(rest_splitted) 765 return (statusline, headers, httpbody, buffer_rest)
766 767
768 -class NonBlockingHTTPBOSH(NonBlockingHTTP):
769 """ 770 Class for BOSH HTTP connections. Slightly redefines HTTP transport by 771 calling bosh bodytag generating callback before putting data on wire 772 """ 773
774 - def set_stanza_build_cb(self, build_cb):
775 self.build_cb = build_cb
776
777 - def _do_send(self):
778 if self.state == PROXY_CONNECTING: 779 NonBlockingTCP._do_send(self) 780 return 781 if not self.sendbuff: 782 stanza = self.build_cb(socket=self) 783 stanza = self.encode_stanza(stanza) 784 stanza = self.build_http_message(httpbody=stanza) 785 self.sendbuff = stanza 786 NonBlockingTCP._do_send(self)
787