1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Transports are objects responsible for connecting to XMPP server and putting
20 data to wrapped sockets in the desired form (SSL, TLS, TCP, for HTTP proxy,
21 for SOCKS5 proxy...)
22
23 Transports are not aware of XMPP stanzas and only responsible for low-level
24 connection handling.
25 """
26
27 from simplexml import ustr
28 from plugin import PlugIn
29 from idlequeue import IdleObject
30 import proxy_connectors
31 import tls_nb
32
33 import socket
34 import errno
35 import time
36 import traceback
37 import base64
38 import urlparse
39
40 import logging
41 log = logging.getLogger('gajim.c.x.transports_nb')
42
def urisplit(uri):
    """
    Break a URI string into the tuple (protocol, host, port, path).

    e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns
    ('http', 'httpcm.jabber.org', 123, '/webclient').

    When the URI carries no usable port, 443 is returned for the 'https'
    protocol and 80 for every other protocol.
    """
    parts = urlparse.urlsplit(uri)
    proto = parts.scheme
    host = parts.hostname
    path = parts.path
    try:
        port = parts.port
    except ValueError:
        # Reading .port raised ValueError; warn and fall through to the
        # protocol default chosen below.
        log.warn(
            'port cannot be extracted from BOSH URL %s, using default port'
            % uri)
        port = ''
    if port:
        return proto, host, port, path
    # No explicit port in the URI: pick the default for the protocol.
    if proto == 'https':
        return proto, host, 443, path
    return proto, host, 80, path
64
66 tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
67 proxy_type = proxy['type']
68 if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
69
70 proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri'])
71 else:
72
73
74 tcp_host, tcp_port = proxy['host'], proxy['port']
75 if proxy.get('useauth', False):
76 proxy_user, proxy_pass = proxy['user'], proxy['pass']
77 return tcp_host, tcp_port, proxy_user, proxy_pass
78
79
80 CONNECT_TIMEOUT_SECONDS = 30
81
82
83 DISCONNECT_TIMEOUT_SECONDS = 5
84
85
86
87 RECV_BUFSIZE = 32768
88
89
90
91
92
93 DATA_RECEIVED = 'DATA RECEIVED'
94 DATA_SENT = 'DATA SENT'
95 DATA_ERROR = 'DATA ERROR'
96
97 DISCONNECTED = 'DISCONNECTED'
98 DISCONNECTING = 'DISCONNECTING'
99 CONNECTING = 'CONNECTING'
100 PROXY_CONNECTING = 'PROXY_CONNECTING'
101 CONNECTED = 'CONNECTED'
102 STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)
103
105 """
106 Abstract class representing a transport
107
108 Subclasses CAN have different constructor signature but connect method SHOULD
109 be the same.
110 """
111
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
        certs):
    """
    Initialise the state every transport carries.

    Each transport class can have a different constructor, but it has to
    accept at least all the arguments of this one.

    :param raise_event: callback for monitoring of sent and received data
    :param on_disconnect: callback called on disconnection during runtime
    :param idlequeue: processing idlequeue
    :param estabilish_tls: boolean whether to establish a TLS connection
        after the TCP connection is done
    :param certs: tuple of (cacerts, mycerts), see the constructor of
        tls_nb.NonBlockingTLS for more details
    """
    PlugIn.__init__(self)

    # Collaborators handed in by the caller.
    self.raise_event = raise_event
    self.on_disconnect = on_disconnect

    # Connect callbacks; connect() stores the real ones.
    self.on_connect = None
    self.on_connect_failure = None

    self.idlequeue = idlequeue

    # Receive callback; onreceive() installs it.
    self.on_receive = None

    # Target of the connection; connect() fills these from its 5-tuple.
    self.server = None
    self.port = None
    self.conn_5tuple = None

    self.set_state(DISCONNECTED)

    self.estabilish_tls = estabilish_tls
    self.certs = certs

    # None until a TLS library is in use; _do_receive() compares it against
    # tls_nb.PYSTDLIB and tls_nb.PYOPENSSL.
    self.ssl_lib = None

    # NOTE(review): presumably the methods PlugIn exposes on the owner
    # object - confirm against PlugIn.
    self._exported_methods = [
        self.onreceive,
        self.set_send_timeout,
        self.set_send_timeout2,
        self.set_timeout,
        self.remove_timeout,
        self.start_disconnect,
    ]

    # Send timeout value; 0 until one is configured.
    self.sendtimeout = 0

    # Optional timeout callbacks; none installed yet.
    self.on_timeout = None
    self.on_timeout2 = None
151
154
159
def connect(self, conn_5tuple, on_connect, on_connect_failure):
    """
    Record the connection target and the callbacks for a connect attempt.

    conn_5tuple should be one item of the list returned by getaddrinfo.
    The server and port are taken from the first two items of its fifth
    element (the socket address).

    :param conn_5tuple: 5-tuple returned from getaddrinfo
    :param on_connect: callback called on successful connect to the server
    :param on_connect_failure: callback called on failure when connecting
    """
    self.on_connect, self.on_connect_failure = on_connect, on_connect_failure
    # Only host and port matter here; longer socket addresses (extra
    # trailing items) are cut down to the first two.
    host, port = conn_5tuple[4][:2]
    self.server = host
    self.port = port
    self.conn_5tuple = conn_5tuple
173
175 assert(newstate in STATES)
176 self.state = newstate
177
180
189
191 """
192 Precedes the call of the on_connect_failure callback
193 """
194
195
196
197 self.disconnect(do_callback=False)
198 self.on_connect_failure(err_message=err_message)
199
200 - def send(self, raw_data, now=False):
204
210
212 """
213 Set the on_receive callback.
214
215 onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
216 the default one that will decide what to do with received stanza based on
217 its tag name and namespace.
218
219 Do not confuse it with on_receive() method, which is the callback
220 itself.
221 """
222 if not recv_handler:
223 if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'):
224 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
225 else:
226 log.warn('No Dispatcher plugged. Received data will not be processed')
227 self.on_receive = None
228 return
229 self.on_receive = recv_handler
230
233
241
243 """
244 called when there's no response from server in defined timeout
245 """
246 if self.on_timeout2:
247 self.on_timeout2()
248 self.renew_send_timeout2()
249
253
255 if self.on_timeout2 and self.sendtimeout2 > 0:
256 self.set_timeout2(self.sendtimeout2)
257
260
263
266
269
276
278 self.sendtimeout2 = timeout2
279 if self.sendtimeout2 > 0:
280 self.on_timeout2 = on_timeout2
281 else:
282 self.on_timeout2 = None
283
284
287
288
290 """
291 Non-blocking TCP socket wrapper
292
293 It is used for simple XMPP connection. Can be connected via proxy and can
294 establish a TLS connection.
295 """
296 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
297 certs, proxy_dict=None):
313
314
319
320 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
321 NonBlockingTransport.connect(self, conn_5tuple, on_connect,
322 on_connect_failure)
323 log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
324 (self.server, self.port))
325
326 try:
327 self._sock = socket.socket(*conn_5tuple[:3])
328 except socket.error, (errnum, errstr):
329 self._on_connect_failure('NonBlockingTCP Connect: Error while creating\
330 socket: %s %s' % (errnum, errstr))
331 return
332
333 self._send = self._sock.send
334 self._recv = self._sock.recv
335 self.fd = self._sock.fileno()
336
337
338
339 self._plug_idle(writable=True, readable=False)
340 self.peerhost = None
341
342
343
344 errnum = 0
345 errstr = str()
346
347
348
349 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
350
351 try:
352 self._sock.setblocking(False)
353 self._sock.connect((self.server, self.port))
354 except Exception, exc:
355 errnum, errstr = exc.args
356
357 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
358
359 log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
360 (id(self), errstr))
361 self._tcp_connecting_started()
362 return
363
364
365
366 self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
367 (self.server, self.port, errnum, errstr))
368
383
396
398 """
399 Establishes TLS/SSL using this TCP connection by plugging a
400 NonBlockingTLS module
401 """
402 cacerts, mycerts = self.certs
403 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self)
404 if result:
405 on_succ()
406 else:
407 on_fail()
408
410 """
411 Called by idlequeue when receive on plugged socket is possible
412 """
413 log.info('pollin called, state == %s' % self.get_state())
414 self._do_receive()
415
433
445
460
470
477
483
def send(self, raw_data, now=False):
    """
    Queue raw_data for sending.

    The data goes through encode_stanza() first, which encodes unicode
    strings to utf-8.

    :param raw_data: data to put on the wire
    :param now: when true, the data is placed at the front of the queue
        and a send is attempted straight away; otherwise it is appended
        to the end of the queue
    """
    NonBlockingTransport.send(self, raw_data, now)

    encoded = self.encode_stanza(raw_data)

    if not now:
        self.sendqueue.append(encoded)
    else:
        self.sendqueue.insert(0, encoded)
        self._do_send()

    # Watch the socket for both directions in either case.
    self._plug_idle(writable=True, readable=True)
500
502 """
503 Encode str or unicode to utf-8
504 """
505 if isinstance(stanza, unicode):
506 stanza = stanza.encode('utf-8')
507 elif not isinstance(stanza, str):
508 stanza = ustr(stanza).encode('utf-8')
509 return stanza
510
512 """
513 Plug file descriptor of socket to Idlequeue
514
515 Plugged socket will be watched for "send possible" or/and "recv possible"
516 events. pollin() callback is invoked on "recv possible", pollout() on
517 "send_possible".
518
519 Plugged socket will always be watched for "error" event - in that case,
520 pollend() is called.
521 """
522 log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
523 self.idlequeue.plug_idle(self, writable, readable)
524
526 """
527 Called when send() to connected socket is possible. First message from
528 sendqueue will be sent
529 """
530 if not self.sendbuff:
531 if not self.sendqueue:
532 log.warn('calling send on empty buffer and queue')
533 self._plug_idle(writable=False, readable=True)
534 return None
535 self.sendbuff = self.sendqueue.pop(0)
536 try:
537 send_count = self._send(self.sendbuff)
538 if send_count:
539 sent_data = self.sendbuff[:send_count]
540 self.sendbuff = self.sendbuff[send_count:]
541 self._plug_idle(
542 writable=((self.sendqueue!=[]) or (self.sendbuff!='')),
543 readable=True)
544 self.raise_event(DATA_SENT, sent_data)
545
546 except Exception:
547 log.error('_do_send:', exc_info=True)
548 traceback.print_exc()
549 self.disconnect()
550
552 """
553 Reads all pending incoming data. Will call owner's disconnected() method
554 if appropriate
555 """
556 received = None
557 errnum = 0
558 errstr = 'No Error Set'
559
560 try:
561
562 received = self._recv(RECV_BUFSIZE)
563 except socket.error, (errnum, errstr):
564 log.info("_do_receive: got %s:" % received, exc_info=True)
565 except tls_nb.SSLWrapper.Error, e:
566 log.info("_do_receive, caught SSL error, got %s:" % received,
567 exc_info=True)
568 errnum, errstr = e.errno, e.strerror
569
570 if received == '':
571 errstr = 'zero bytes on recv'
572
573 if (self.ssl_lib is None and received == '') or \
574 (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \
575 (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ):
576
577
578 log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
579 self.on_remote_disconnect()
580 return
581
582 if errnum:
583 log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
584 errnum, errstr), exc_info=True)
585 self.disconnect()
586 return
587
588
589
590 if received is None:
591 return
592
593
594 self.remove_timeout()
595 self.renew_send_timeout()
596 self.renew_send_timeout2()
597
598 if self.on_receive:
599 self.raise_event(DATA_RECEIVED, received)
600 self._on_receive(received)
601 else:
602
603
604
605 log.error('SOCKET %s Unhandled data received: %s' % (id(self),
606 received))
607 self.disconnect()
608
610 """
611 Precedes on_receive callback. It peels off and checks HTTP headers in
612 HTTP classes, in here it just calls the callback
613 """
614 self.on_receive(data)
615
616
618 """
619 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP
620 headers from incoming messages
621 """
622
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
        certs, on_http_request_possible, on_persistent_fallback, http_dict,
        proxy_dict=None):
    """
    Set up the HTTP layer on top of the TCP transport.

    The first five arguments and proxy_dict are passed on unchanged to
    NonBlockingTCP.__init__.

    :param on_http_request_possible: method to call when HTTP request to
        socket owned by transport is possible.
    :param on_persistent_fallback: callback called when server ends TCP
        connection. It doesn't have to be fatal for HTTP session.
    :param http_dict: dictionary with data for HTTP request and headers.
        The keys 'http_uri', 'http_version', 'http_persistent' and
        'add_proxy_headers' are required; 'proxy_user' and 'proxy_pass'
        are only used when both are present.
    """
    NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
        estabilish_tls, certs, proxy_dict)

    proto, host, port, path = urisplit(http_dict['http_uri'])
    # Fall back to plain http and the root path when the URI lacks them.
    self.http_protocol = proto or 'http'
    self.http_host = host
    self.http_port = port
    self.http_path = path or '/'

    self.http_version = http_dict['http_version']
    self.http_persistent = http_dict['http_persistent']
    self.add_proxy_headers = http_dict['add_proxy_headers']

    # Proxy credentials count only when user and password are both given.
    self.proxy_user = self.proxy_pass = None
    if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
        self.proxy_user = http_dict['proxy_user']
        self.proxy_pass = http_dict['proxy_pass']

    # Received data accumulates here until a whole response is parsed.
    self.recvbuff = ''
    self.expected_length = 0
    self.pending_requests = 0
    self.on_http_request_possible = on_http_request_possible
    self.last_recv_time = 0
    self.close_current_connection = False
    # A remote disconnect hands this transport to the fallback callback.
    self.on_remote_disconnect = lambda: on_persistent_fallback(self)
658
661
663 """
664 Precedes passing received data to owner class. Gets rid of HTTP headers
665 and checks them.
666 """
667 if self.get_state() == PROXY_CONNECTING:
668 NonBlockingTCP._on_receive(self, data)
669 return
670
671
672 self.recvbuff = '%s%s' % (self.recvbuff or '', data)
673 statusline, headers, httpbody, buffer_rest = self.parse_http_message(
674 self.recvbuff)
675
676 if not (statusline and headers and httpbody):
677 log.debug('Received incomplete HTTP response')
678 return
679
680 if statusline[1] != '200':
681 log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
682 self.disconnect()
683 return
684 self.expected_length = int(headers['Content-Length'])
685 if 'Connection' in headers and headers['Connection'].strip()=='close':
686 self.close_current_connection = True
687
688 if self.expected_length > len(httpbody):
689
690
691 log.info('not enough bytes in HTTP response - %d expected, got %d' %
692 (self.expected_length, len(httpbody)))
693 else:
694
695
696 self.recvbuff = buffer_rest
697
698
699 self.expected_length = 0
700
701 if not self.http_persistent or self.close_current_connection:
702
703 self.disconnect(do_callback=False)
704 self.close_current_connection = False
705 self.last_recv_time = time.time()
706 self.on_receive(data=httpbody, socket=self)
707 self.on_http_request_possible()
708
710 """
711 Builds http message with given body. Values for headers and status line
712 fields are taken from class variables
713 """
714 absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host,
715 self.http_port, self.http_path)
716 headers = ['%s %s %s' % (method, absolute_uri, self.http_version),
717 'Host: %s:%s' % (self.http_host, self.http_port),
718 'User-Agent: Gajim',
719 'Content-Type: text/xml; charset=utf-8',
720 'Content-Length: %s' % len(str(httpbody))]
721 if self.add_proxy_headers:
722 headers.append('Proxy-Connection: keep-alive')
723 headers.append('Pragma: no-cache')
724 if self.proxy_user and self.proxy_pass:
725 credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
726 credentials = base64.encodestring(credentials).strip()
727 headers.append('Proxy-Authorization: Basic %s' % credentials)
728 else:
729 headers.append('Connection: Keep-Alive')
730 headers.append('\r\n')
731 headers = '\r\n'.join(headers)
732 return('%s%s' % (headers, httpbody))
733
def parse_http_message(self, message):
    """
    Split a raw HTTP response into its parts.

    Carriage returns are dropped and leading newlines skipped, then the
    message is cut at blank lines. The first piece is the header section,
    the following pieces are joined back into the body until it reaches
    the length announced by Content-Length.

    :param message: string with the data received so far
    :return: tuple (statusline, headers, httpbody, buffer_rest) where
        statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
        headers - dictionary of headers e.g. {'Content-Length': '604',
            'Content-Type': 'text/xml; charset=utf-8'},
        httpbody - string with http body,
        buffer_rest - what is left in the message after a full HTTP
            header + body.
        While no blank line has been received yet, ('', '', '', message)
        is returned, with message being the cleaned-up input.
    :raise KeyError: if the response carries no Content-Length header
    :raise ValueError: if the Content-Length value is not a number
    """
    message = message.replace('\r', '').lstrip('\n')
    chunks = message.split('\n\n')
    if len(chunks) < 2:
        # No complete header section yet; the caller keeps buffering.
        return ('', '', '', message)

    header_lines = chunks[0].split('\n')
    httpbody = chunks[1]
    statusline = header_lines[0].split(' ', 2)

    headers = {}
    for line in header_lines[1:]:
        # Split at the first colon so "Name:value" (no space) and values
        # containing colons are both handled; lines without a colon are
        # not header fields and are skipped.
        name, colon, value = line.partition(':')
        if not colon:
            continue
        headers[name] = value.strip()

    # The header value is a string; it must be converted before it is
    # compared with a length, otherwise the loop below never stops early
    # and swallows whatever follows the body.
    body_size = int(headers['Content-Length'])

    rest = chunks[2:]
    while len(httpbody) < body_size and rest:
        # A blank line inside the body split it; glue the pieces back until
        # the announced size is reached.
        httpbody = '\n\n'.join([httpbody, rest.pop(0)])
    # NOTE(review): lengths are measured after '\r' removal, so a body that
    # itself contains carriage returns looks shorter than Content-Length.
    return (statusline, headers, httpbody, '\n\n'.join(rest))
766
767
769 """
770 Class for BOSH HTTP connections. Slightly redefines HTTP transport by
771 calling bosh bodytag generating callback before putting data on wire
772 """
773
775 self.build_cb = build_cb
776
787