1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from common import gajim
21 import common.xmpp
22 from common.xmpp.idlequeue import IdleObject
23 from common.xmpp import dispatcher_nb, simplexml
24 from common.xmpp.plugin import *
25 from common.xmpp.simplexml import ustr
26 from common.xmpp.transports_nb import DATA_RECEIVED, DATA_SENT, DATA_ERROR
27 from common.zeroconf import zeroconf
28
29 from common.xmpp.protocol import *
30 import socket
31 import errno
32 import sys
33 import string
34 from random import Random
35
36 import logging
37 log = logging.getLogger('gajim.c.z.client_zeroconf')
38
39 from common.zeroconf import roster_zeroconf
40
# Maximum number of bytes requested from a socket in a single recv() call.
MAX_BUFF_LEN = 65536

# Values stored in a client's ``sock_type``: TYPE_SERVER when an existing
# socket is handed to the client, TYPE_CLIENT when no socket is supplied and
# the connection has to be opened by us.
TYPE_SERVER = 0
TYPE_CLIENT = 1

# Timeout (in seconds) armed when an outgoing connection attempt is started.
CONNECT_TIMEOUT_SECONDS = 10

# Timeout (in seconds) re-armed after reading from or writing to a socket
# whose owner has sock_type == TYPE_CLIENT.
ACTIVITY_TIMEOUT_SECONDS = 30
49
52 """
53 Handle all incomming connections on ('0.0.0.0', port)
54 """
55 self.port = port
56 self.queue_idx = -1
57
58 self.started = False
59 self._sock = None
60 self.fd = -1
61 self.caller = conn_holder.caller
62 self.conn_holder = conn_holder
63
65 flags = socket.AI_PASSIVE
66 if hasattr(socket, 'AI_ADDRCONFIG'):
67 flags |= socket.AI_ADDRCONFIG
68 ai = socket.getaddrinfo(None, self.port, socket.AF_UNSPEC,
69 socket.SOCK_STREAM, 0, flags)[0]
70 self._serv = socket.socket(ai[0], ai[1])
71 self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
72 self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
73 self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
74
75 try:
76 self._serv.bind((ai[4][0], self.port))
77 except Exception:
78
79 return None
80 self._serv.listen(socket.SOMAXCONN)
81 self._serv.setblocking(False)
82 self.fd = self._serv.fileno()
83 gajim.idlequeue.plug_idle(self, False, True)
84 self.started = True
85
87 """
88 Called when we stop listening on (host, port)
89 """
90 self.disconnect()
91
93 """
94 Accept a new incomming connection and notify queue
95 """
96 sock = self.accept_conn()
97
98 from_jid = None
99 ipaddr = sock[1][0]
100 for jid in self.conn_holder.getRoster().keys():
101 entry = self.conn_holder.getRoster().getItem(jid)
102 if (entry['address'] == ipaddr):
103 from_jid = jid
104 break
105 P2PClient(sock[0], ipaddr, sock[1][1], self.conn_holder, [], from_jid)
106
121
123 """
124 Accept a new incoming connection
125 """
126 _sock = self._serv.accept()
127 _sock[0].setblocking(False)
128 return _sock
129
def __init__(self, _sock, host, port, conn_holder, stanzaqueue=None, to=None,
        on_ok=None, on_not_ok=None):
    """
    Set up a peer-to-peer client and start its underlying P2PConnection

    :param _sock: an existing socket to wrap (the client is then marked
        TYPE_SERVER), or a false value to open a new connection to
        (host, port) (the client is then marked TYPE_CLIENT)
    :param host: remote host; stored as ``self.Server``
    :param port: remote port; passed to the connection and to
        ``conn_holder.add_connection``
    :param conn_holder: object providing ``caller``, ``add_connection`` and
        the ``ids_of_awaiting_messages`` dict
    :param stanzaqueue: list of ``(stanza, is_message)`` tuples waiting to be
        sent; a new empty list is created when omitted
    :param to: recipient identifier, stored as ``self.to`` and registered
        with the connection holder
    :param on_ok: callback stored on the instance for later use
    :param on_not_ok: callback invoked with an error string when the
        connection could not be established
    """
    self._owner = self
    self.Namespace = 'jabber:client'
    self.protocol_type = 'XMPP'
    self.defaultNamespace = self.Namespace
    self._component = 0
    self._registered_name = None
    self._caller = conn_holder.caller
    self.conn_holder = conn_holder
    # The queue is mutated later (stanzas are appended and popped), so a
    # default list shared between instances would mix up their stanzas.
    # Create a fresh one per instance unless the caller supplied a queue.
    if stanzaqueue is None:
        stanzaqueue = []
    self.stanzaqueue = stanzaqueue
    self.to = to
    self.Server = host
    self.on_ok = on_ok
    self.on_not_ok = on_not_ok
    self.Connection = None
    self.sock_hash = None
    if _sock:
        self.sock_type = TYPE_SERVER
    else:
        self.sock_type = TYPE_CLIENT
    self.fd = -1
    conn = P2PConnection('', _sock, host, port, self._caller, self.on_connect,
        self)
    if not self.conn_holder:
        # conn_holder was reset while the connection was being created,
        # which this class does when it tears itself down: report failure.
        if on_not_ok:
            on_not_ok('Connection to host could not be established.')
        return
    # NOTE(review): this stores the bound __hash__ method itself, not its
    # result; kept as is because the holder is keyed by this same value.
    self.sock_hash = conn._sock.__hash__
    self.fd = conn.fd
    self.conn_holder.add_connection(self, self.Server, port, self.to)

    # Remember (id, thread id) of every queued message so that it can be
    # matched up once it has been sent or has failed.
    for stanza, is_message in self.stanzaqueue:
        if not is_message:
            continue
        if self.fd == -1:
            if on_not_ok:
                on_not_ok('Connection to host could not be established.')
            return
        thread_id = stanza.getThread()
        id_ = stanza.getID()
        if not id_:
            id_ = self.Dispatcher.getAnID()
        self.conn_holder.ids_of_awaiting_messages.setdefault(
            self.fd, []).append((id_, thread_id))

    self.on_responses = {}
183
185 if self.Connection:
186 if self.Connection.state == -1:
187 return False
188 self.send(stanza, is_message)
189 else:
190 self.stanzaqueue.append((stanza, is_message))
191
192 if is_message:
193 thread_id = stanza.getThread()
194 id_ = stanza.getID()
195 if not id_:
196 id_ = self.Dispatcher.getAnID()
197 if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
198 self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_,
199 thread_id))
200 else:
201 self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_,
202 thread_id)]
203
204 return True
205
207 id_, thread_id = \
208 self.conn_holder.ids_of_awaiting_messages[connection_id].pop(0)
209 if self.on_ok:
210 self.on_ok(id_)
211
212
213 self.on_ok = None
214
220
232
243
245 if ns != NS_STREAMS or tag != 'stream':
246 log.error('Incorrect stream start: (%s,%s).Terminating!' % (tag, ns), 'error')
247 self.Connection.disconnect()
248 if self.on_not_ok:
249 self.on_not_ok('Connection to host could not be established: Incorrect answer from server.')
250 return
251 if self.sock_type == TYPE_SERVER:
252 if attrs.has_key('from'):
253 self.to = attrs['from']
254 self.send_stream_header()
255 if attrs.has_key('version') and attrs['version'] == '1.0':
256
257 features = Node('stream:features')
258 self.Dispatcher.send(features)
259 while self.stanzaqueue:
260 stanza, is_message = self.stanzaqueue.pop(0)
261 self.send(stanza, is_message)
262 elif self.sock_type == TYPE_CLIENT:
263 while self.stanzaqueue:
264 stanza, is_message = self.stanzaqueue.pop(0)
265 self.send(stanza, is_message)
266
268 if self.conn_holder:
269 if self.conn_holder.ids_of_awaiting_messages.has_key(self.fd):
270 del self.conn_holder.ids_of_awaiting_messages[self.fd]
271 self.conn_holder.remove_connection(self.sock_hash)
272 if self.__dict__.has_key('Dispatcher'):
273 self.Dispatcher.PlugOut()
274 if self.__dict__.has_key('P2PConnection'):
275 self.P2PConnection.PlugOut()
276 self.Connection = None
277 self._caller = None
278 self.conn_holder = None
279
285
300
303
305 self._caller.peerhost = self.Connection._sock.getsockname()
306 self.RegisterHandler('message', lambda conn, data:self._caller._messageCB(
307 self.Server, conn, data))
308 self.RegisterHandler('iq', self._caller._siSetCB, 'set',
309 common.xmpp.NS_SI)
310 self.RegisterHandler('iq', self._caller._siErrorCB, 'error',
311 common.xmpp.NS_SI)
312 self.RegisterHandler('iq', self._caller._siResultCB, 'result',
313 common.xmpp.NS_SI)
314 self.RegisterHandler('iq', self._caller._bytestreamSetCB, 'set',
315 common.xmpp.NS_BYTESTREAM)
316 self.RegisterHandler('iq', self._caller._bytestreamResultCB, 'result',
317 common.xmpp.NS_BYTESTREAM)
318 self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error',
319 common.xmpp.NS_BYTESTREAM)
320 self.RegisterHandler('iq', self._caller._DiscoverItemsGetCB, 'get',
321 common.xmpp.NS_DISCO_ITEMS)
322 self.RegisterHandler('iq', self._caller._JingleCB, 'result')
323 self.RegisterHandler('iq', self._caller._JingleCB, 'error')
324 self.RegisterHandler('iq', self._caller._JingleCB, 'set', common.xmpp.NS_JINGLE)
325
327 - def __init__(self, sock_hash, _sock, host=None, port=None, caller=None,
328 on_connect=None, client=None):
329 IdleObject.__init__(self)
330 self._owner = client
331 PlugIn.__init__(self)
332 self.sendqueue = []
333 self.sendbuff = None
334 self.buff_is_message = False
335 self._sock = _sock
336 self.sock_hash = None
337 self.host, self.port = host, port
338 self.on_connect = on_connect
339 self.client = client
340 self.writable = False
341 self.readable = False
342 self._exported_methods = [self.send, self.disconnect, self.onreceive]
343 self.on_receive = None
344 if _sock:
345 self._sock = _sock
346 self.state = 1
347 self._sock.setblocking(False)
348 self.fd = self._sock.fileno()
349 self.on_connect(self)
350 else:
351 self.state = 0
352 try:
353 self.ais = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
354 socket.SOCK_STREAM)
355 except socket.gaierror, e:
356 log.info('Lookup failure for %s: %s[%s]', host, e[1], repr(e[0]),
357 exc_info=True)
358 else:
359 self.connect_to_next_ip()
360
362 if len(self.ais) == 0:
363 log.error('Connection failure to %s', self.host, exc_info=True)
364 self.disconnect()
365 return
366 ai = self.ais.pop(0)
367 log.info('Trying to connect to %s through %s:%s', self.host, ai[4][0],
368 ai[4][1], exc_info=True)
369 try:
370 self._sock = socket.socket(*ai[:3])
371 self._sock.setblocking(False)
372 self._server = ai[4]
373 except socket.error:
374 if sys.exc_value[0] != errno.EINPROGRESS:
375
376 self.connect_to_next_ip()
377 return
378 self.fd = self._sock.fileno()
379 gajim.idlequeue.plug_idle(self, True, False)
380 self.set_timeout(CONNECT_TIMEOUT_SECONDS)
381 self.do_connect()
382
387
392
394 """
395 Disconnect from the remote server and unregister self.disconnected method
396 from the owner's dispatcher
397 """
398 self.disconnect()
399 self._owner = None
400
402 if not recv_handler:
403 if hasattr(self._owner, 'Dispatcher'):
404 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
405 else:
406 self.on_receive = None
407 return
408 _tmp = self.on_receive
409
410 if not recv_handler(None) and _tmp == self.on_receive:
411 self.on_receive = recv_handler
412
def send(self, packet, is_message=False, now=False):
    """
    Queue a stanza for sending, or push it out immediately

    Nothing happens while the connection state is 0 or negative. Unicode
    input is encoded to UTF-8, byte strings are used unchanged, and any
    other object is converted with ustr() and then encoded to UTF-8.

    :param packet: the data to send
    :param is_message: flag queued together with the data
    :param now: if True, put the data at the front of the queue and try to
        send right away; otherwise append it to the end of the queue
    """
    if self.state <= 0:
        return

    if isinstance(packet, unicode):
        payload = packet.encode('utf-8')
    elif isinstance(packet, str):
        payload = packet
    else:
        payload = ustr(packet).encode('utf-8')

    entry = (payload, is_message)
    if now:
        self.sendqueue.insert(0, entry)
        self._do_send()
    else:
        self.sendqueue.append(entry)
    self._plug_idle()
436
438 ids = self.client.conn_holder.ids_of_awaiting_messages
439 if self.fd in ids and len(ids[self.fd]) > 0:
440 for (id_, thread_id) in ids[self.fd]:
441 if hasattr(self._owner, 'Dispatcher'):
442 self._owner.Dispatcher.Event('', DATA_ERROR, (self.client.to,
443 thread_id))
444 else:
445 self._owner.on_not_ok('conenction timeout')
446 ids[self.fd] = []
447 self.pollend()
448
450 errnum = 0
451 try:
452 self._sock.connect(self._server)
453 self._sock.setblocking(False)
454 except Exception, ee:
455 (errnum, errstr) = ee
456 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
457 return
458
459 elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0:
460 log.error('Could not connect to %s: %s [%s]', self.host, errnum,
461 errstr)
462 self.connect_to_next_ip()
463 return
464 else:
465 self._sock.setblocking(False)
466 self.state = 1
467
468 self.on_connect(self)
469
476
480
482 """
483 Reads all pending incoming data. Call owner's disconnected() method if
484 appropriate
485 """
486 received = ''
487 errnum = 0
488 try:
489
490 received = self._sock.recv(MAX_BUFF_LEN)
491 except Exception, e:
492 if len(e.args) > 0 and isinstance(e.args[0], int):
493 errnum = e[0]
494
495 if errnum == socket.SSL_ERROR_WANT_READ:
496 pass
497 elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
498 self.pollend()
499
500 return
501 elif not received :
502 if errnum != socket.SSL_ERROR_EOF:
503
504 self.pollend()
505 if self.state >= 0:
506 self.disconnect()
507 return
508
509 if self.state < 0:
510 return
511 if self.on_receive:
512 if self._owner.sock_type == TYPE_CLIENT:
513 self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
514 if received.strip():
515 log.debug('received: %s', received)
516 if hasattr(self._owner, 'Dispatcher'):
517 self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
518 self.on_receive(received)
519 else:
520
521 log.error('Unhandled data received: %s' % received)
522 self.disconnect()
523 return True
524
541
543 if not self.sendbuff:
544 if not self.sendqueue:
545 return None
546 self.sendbuff, self.buff_is_message = self.sendqueue.pop(0)
547 self.sent_data = self.sendbuff
548 try:
549 send_count = self._sock.send(self.sendbuff)
550 if send_count:
551 self.sendbuff = self.sendbuff[send_count:]
552 if not self.sendbuff and not self.sendqueue:
553 if self.state < 0:
554 gajim.idlequeue.unplug_idle(self.fd)
555 self._on_send()
556 self.disconnect()
557 return
558
559 self._plug_idle()
560 self._on_send()
561
562 except socket.error, e:
563 if e[0] == socket.SSL_ERROR_WANT_WRITE:
564 return True
565 if self.state < 0:
566 self.disconnect()
567 return
568 self._on_send_failure()
569 return
570 if self._owner.sock_type == TYPE_CLIENT:
571 self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
572 return True
573
575 readable = self.state != 0
576 if self.sendqueue or self.sendbuff:
577 writable = True
578 else:
579 writable = False
580 if self.writable != writable or self.readable != readable:
581 gajim.idlequeue.plug_idle(self, writable, readable)
582
583
585 if self.sent_data and self.sent_data.strip():
586 log.debug('sent: %s' % self.sent_data)
587 if hasattr(self._owner, 'Dispatcher'):
588 self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
589 self.sent_data = None
590 if self.buff_is_message:
591 self._owner.on_message_sent(self.fd)
592 self.buff_is_message = False
593
595 log.error('Socket error while sending data')
596 self._owner.on_disconnect()
597 self.sent_data = None
598
601 self.caller = caller
602 self.zeroconf = None
603 self.roster = None
604 self.last_msg = ''
605 self.connections = {}
606 self.recipient_to_hash = {}
607 self.ip_to_hash = {}
608 self.hash_to_port = {}
609 self.listener = None
610 self.ids_of_awaiting_messages = {}
611 self.disconnect_handlers = []
612 self.disconnecting = False
613
624
628
632
638
642
649
665
667
668 if self.disconnecting:
669 return
670 if self.listener:
671 self.listener.disconnect()
672 self.listener = None
673 if self.zeroconf:
674 self.zeroconf.disconnect()
675 self.zeroconf = None
676 if self.roster:
677 self.roster.zeroconf = None
678 self.roster._data = None
679 self.roster = None
680 self.disconnecting = True
681 for i in reversed(self.disconnect_handlers):
682 log.debug('Calling disconnect handler %s' % i)
683 i()
684 self.disconnecting = False
685
688
692
694 sock_hash=connection.sock_hash
695 if sock_hash not in self.connections:
696 self.connections[sock_hash] = connection
697 self.ip_to_hash[ip] = sock_hash
698 self.hash_to_port[sock_hash] = port
699 if recipient:
700 self.recipient_to_hash[recipient] = sock_hash
701
703 if sock_hash in self.connections:
704 del self.connections[sock_hash]
705 for i in self.recipient_to_hash:
706 if self.recipient_to_hash[i] == sock_hash:
707 del self.recipient_to_hash[i]
708 break
709 for i in self.ip_to_hash:
710 if self.ip_to_hash[i] == sock_hash:
711 del self.ip_to_hash[i]
712 break
713 if self.hash_to_port.has_key(sock_hash):
714 del self.hash_to_port[sock_hash]
715
717 for p in range(port, port + 5):
718 self.listener = ZeroconfListener(p, self)
719 self.listener.bind()
720 if self.listener.started:
721 return p
722 self.listener = None
723 return False
724
726 if self.roster:
727 return self.roster.getRoster()
728 return {}
729
def send(self, stanza, is_message=False, now=False, on_ok=None,
        on_not_ok=None):
    """
    Send a stanza to its recipient, reusing an open connection if possible

    The stanza's sender is set from the roster's zeroconf name. A connection
    already registered for the recipient is tried first, then one registered
    for the recipient's address and port. If neither accepts the stanza, a
    new P2PClient is created with the stanza queued on it.

    :param stanza: the stanza to send
    :param is_message: passed through to the connection with the stanza
    :param now: accepted but not used by this method
    :param on_ok: called with the stanza id ('' if it has none) when an
        existing connection accepts the stanza; otherwise handed to the new
        P2PClient
    :param on_not_ok: handed to the new P2PClient
    :return: -1 if the recipient is not in the roster, otherwise None
    """
    def _hand_over(conn):
        # The id is read before the stanza is queued, as on_ok expects it.
        msg_id = stanza.getID() or ''
        if not conn.add_stanza(stanza, is_message):
            return False
        if on_ok:
            on_ok(msg_id)
        return True

    stanza.setFrom(self.roster.zeroconf.name)
    to = gajim.get_jid_without_resource(unicode(stanza.getTo()))

    try:
        item = self.roster[to]
    except KeyError:
        # Recipient is not present in the roster.
        return -1

    # First choice: a connection already associated with this recipient.
    if to in self.recipient_to_hash:
        if _hand_over(self.connections[self.recipient_to_hash[to]]):
            return

    # Second choice: a connection to the same address and port.
    address = item['address']
    if address in self.ip_to_hash:
        hash_ = self.ip_to_hash[address]
        if self.hash_to_port[hash_] == item['port']:
            if _hand_over(self.connections[hash_]):
                return

    # Nothing reusable: open a new connection with the stanza queued on it.
    if not stanza.getID():
        stanza.setID('zero')
    P2PClient(None, item['address'], item['port'], self,
        [(stanza, is_message)], to, on_ok=on_ok, on_not_ok=on_not_ok)
766
768 """
769 Generate a random id
770 """
771 return ''.join(Random().sample(string.letters + string.digits, 6))
772
774 """
775 Register handler that will be called on disconnect
776 """
777 self.disconnect_handlers.append(handler)
778
780 """
781 Unregister handler that is called on disconnect
782 """
783 self.disconnect_handlers.remove(handler)
784
786 """
787 Send stanza and wait for recipient's response to it. Will call transports
788 on_timeout callback if response is not retrieved in time
789
790 Be aware: Only timeout of latest call of SendAndWait is active.
791 """
792
793
794 def on_ok(_waitid):
795
796
797 to = stanza.getTo()
798 conn = None
799 if to in self.recipient_to_hash:
800 conn = self.connections[self.recipient_to_hash[to]]
801 elif item['address'] in self.ip_to_hash:
802 hash_ = self.ip_to_hash[item['address']]
803 if self.hash_to_port[hash_] == item['port']:
804 conn = self.connections[hash_]
805 if func:
806 conn.Dispatcher.on_responses[_waitid] = (func, args)
807 conn.onreceive(conn.Dispatcher._WaitForData)
808 conn.Dispatcher._expected[_waitid] = None
809 self.send(stanza, on_ok=on_ok)
810
812 """
813 Put stanza on the wire and call back when recipient replies. Additional
814 callback arguments can be specified in args.
815 """
816 self.SendAndWaitForResponse(stanza, 0, func, args)
817