Package common :: Package zeroconf :: Module client_zeroconf
[hide private]
[frames | no frames]

Source Code for Module common.zeroconf.client_zeroconf

  1  ##      common/zeroconf/client_zeroconf.py 
  2  ## 
  3  ## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de> 
  4  ##                              2006 Dimitur Kirov <dkirov@gmail.com> 
  5  ## 
  6  ## This file is part of Gajim. 
  7  ## 
  8  ## Gajim is free software; you can redistribute it and/or modify 
  9  ## it under the terms of the GNU General Public License as published 
 10  ## by the Free Software Foundation; version 3 only. 
 11  ## 
 12  ## Gajim is distributed in the hope that it will be useful, 
 13  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ## GNU General Public License for more details. 
 16  ## 
 17  ## You should have received a copy of the GNU General Public License 
 18  ## along with Gajim.  If not, see <http://www.gnu.org/licenses/>. 
 19  ## 
 20  from common import gajim 
 21  import common.xmpp 
 22  from common.xmpp.idlequeue import IdleObject 
 23  from common.xmpp import dispatcher_nb, simplexml 
 24  from common.xmpp.plugin import * 
 25  from common.xmpp.simplexml import ustr 
 26  from common.xmpp.transports_nb import DATA_RECEIVED, DATA_SENT, DATA_ERROR 
 27  from common.zeroconf import zeroconf 
 28   
 29  from common.xmpp.protocol import * 
 30  import socket 
 31  import errno 
 32  import sys 
 33  import string 
 34  from random import Random 
 35   
 36  import logging 
 37  log = logging.getLogger('gajim.c.z.client_zeroconf') 
 38   
 39  from common.zeroconf import roster_zeroconf 
 40   
 41  MAX_BUFF_LEN = 65536 
 42  TYPE_SERVER, TYPE_CLIENT = range(2) 
 43   
 44  # wait XX sec to establish a connection 
 45  CONNECT_TIMEOUT_SECONDS = 10 
 46   
 47  # after XX sec with no activity, close the stream 
 48  ACTIVITY_TIMEOUT_SECONDS = 30 
 49   
class ZeroconfListener(IdleObject):
    """
    Listening socket for incoming link-local connections

    Every accepted connection is wrapped in a P2PClient that shares this
    listener's connection holder.
    """

    def __init__(self, port, conn_holder):
        """
        Handle all incoming connections on ('0.0.0.0', port)

        port -- TCP port that bind() will listen on
        conn_holder -- object providing caller, getRoster() and
        kill_all_connections(); it is passed on to every P2PClient
        """
        self.port = port
        self.queue_idx = -1
        #~ self.queue = None
        self.started = False
        self._sock = None
        # listening socket; stays None until bind() has created it
        self._serv = None
        self.fd = -1
        self.caller = conn_holder.caller
        self.conn_holder = conn_holder

    def bind(self):
        """
        Create the listening socket, bind it to self.port and plug it into
        the idle queue

        On success self.started becomes True. If the port cannot be bound the
        socket is closed again, None is returned and self.started stays
        False.
        """
        flags = socket.AI_PASSIVE
        if hasattr(socket, 'AI_ADDRCONFIG'):
            flags |= socket.AI_ADDRCONFIG
        ai = socket.getaddrinfo(None, self.port, socket.AF_UNSPEC,
            socket.SOCK_STREAM, 0, flags)[0]
        self._serv = socket.socket(ai[0], ai[1])
        self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # will fail when port is busy, or we don't have rights to bind
        try:
            self._serv.bind((ai[4][0], self.port))
        except Exception:
            # unable to bind: release the descriptor, otherwise every failed
            # attempt on another port leaves one open socket behind
            self._serv.close()
            self._serv = None
            return None
        self._serv.listen(socket.SOMAXCONN)
        self._serv.setblocking(False)
        self.fd = self._serv.fileno()
        gajim.idlequeue.plug_idle(self, False, True)
        self.started = True

    def pollend(self):
        """
        Called when we stop listening on (host, port)
        """
        self.disconnect()

    def pollin(self):
        """
        Accept a new incoming connection and notify queue

        The peer's IP address is looked up in the roster so that the new
        P2PClient knows which contact connected; for an unknown address the
        JID passed on is None.
        """
        sock = self.accept_conn()
        ipaddr = sock[1][0]
        # loop through roster to find who has connected to us
        from_jid = None
        roster = self.conn_holder.getRoster()
        for jid in roster.keys():
            entry = roster.getItem(jid)
            if entry['address'] == ipaddr:
                from_jid = jid
                break
        P2PClient(sock[0], ipaddr, sock[1][1], self.conn_holder, [], from_jid)

    def disconnect(self, message=''):
        """
        Free all resources, we are not listening anymore

        message -- text appended to the log entry
        """
        log.info('Disconnecting ZeroconfListener: %s' % message)
        gajim.idlequeue.remove_timeout(self.fd)
        gajim.idlequeue.unplug_idle(self.fd)
        self.fd = -1
        self.started = False
        # there is no socket when bind() was never called or has failed
        if self._serv is not None:
            try:
                self._serv.close()
            except socket.error:
                pass
        self.conn_holder.kill_all_connections()

    def accept_conn(self):
        """
        Accept a new incoming connection

        Returns the (socket, address) pair from accept(); the socket has
        been switched to non-blocking mode.
        """
        _sock = self._serv.accept()
        _sock[0].setblocking(False)
        return _sock
129
class P2PClient(IdleObject):
    """
    One XMPP stream to a single link-local peer

    Creates the P2PConnection transport, plugs a dispatcher onto it once the
    socket is usable and keeps stanzas queued until the stream is open.
    """

    def __init__(self, _sock, host, port, conn_holder, stanzaqueue=None,
            to=None, on_ok=None, on_not_ok=None):
        """
        _sock -- already accepted socket (we act as server) or None (a new
        outgoing connection to host:port is opened)
        host, port -- address of the peer
        conn_holder -- object keeping the connection tables and
        ids_of_awaiting_messages
        stanzaqueue -- list of (stanza, is_message) pairs to send as soon as
        the stream is open; a passed list is used as is, not copied
        to -- JID of the peer, if known
        on_ok -- called with the stanza id when the first message was sent
        on_not_ok -- called with an error text when the connection fails
        """
        self._owner = self
        self.Namespace = 'jabber:client'
        self.protocol_type = 'XMPP'
        self.defaultNamespace = self.Namespace
        self._component = 0
        self._registered_name = None
        self._caller = conn_holder.caller
        self.conn_holder = conn_holder
        # a fresh list per instance: a shared default list would be mutated
        # by add_stanza() and leak stanzas between connections
        if stanzaqueue is None:
            stanzaqueue = []
        self.stanzaqueue = stanzaqueue
        self.to = to
        self.Server = host
        self.on_ok = on_ok
        self.on_not_ok = on_not_ok
        self.Connection = None
        self.sock_hash = None
        if _sock:
            self.sock_type = TYPE_SERVER
        else:
            self.sock_type = TYPE_CLIENT
        self.fd = -1
        conn = P2PConnection('', _sock, host, port, self._caller,
            self.on_connect, self)
        if not self.conn_holder:
            # An error occurred, disconnect() has been called
            if on_not_ok:
                on_not_ok('Connection to host could not be established.')
            return
        self.sock_hash = conn._sock.__hash__
        self.fd = conn.fd
        self.conn_holder.add_connection(self, self.Server, port, self.to)
        # count messages in queue
        for stanza, is_message in self.stanzaqueue:
            if not is_message:
                continue
            if self.fd == -1:
                if on_not_ok:
                    on_not_ok('Connection to host could not be established.')
                return
            self._remember_awaiting_message(stanza)

        self.on_responses = {}

    def _remember_awaiting_message(self, stanza):
        """
        Note the (id, thread id) of a message stanza that still has to go out
        on this connection
        """
        thread_id = stanza.getThread()
        id_ = stanza.getID()
        if not id_:
            id_ = self.Dispatcher.getAnID()
        awaiting = self.conn_holder.ids_of_awaiting_messages
        awaiting.setdefault(self.fd, []).append((id_, thread_id))

    def _send_queued_stanzas(self):
        """
        Send every stanza that was queued while the stream was not open yet
        """
        while self.stanzaqueue:
            stanza, is_message = self.stanzaqueue.pop(0)
            self.send(stanza, is_message)

    def add_stanza(self, stanza, is_message=False):
        """
        Send stanza now if the stream is up, otherwise queue it

        Returns False when the connection is already closed (state -1), True
        otherwise.
        """
        if self.Connection:
            if self.Connection.state == -1:
                return False
            self.send(stanza, is_message)
        else:
            self.stanzaqueue.append((stanza, is_message))

        if is_message:
            self._remember_awaiting_message(stanza)

        return True

    def on_message_sent(self, connection_id):
        """
        Drop the oldest awaiting message of connection_id and report it
        through on_ok
        """
        id_, thread_id = \
            self.conn_holder.ids_of_awaiting_messages[connection_id].pop(0)
        if self.on_ok:
            self.on_ok(id_)
            # use on_ok only on first message. For others it's called in
            # ClientZeroconf
            self.on_ok = None

    def on_connect(self, conn):
        """
        Called by the transport once its socket is usable: plug transport and
        dispatcher into this client and register the stanza handlers
        """
        self.Connection = conn
        self.Connection.PlugIn(self)
        dispatcher_nb.Dispatcher().PlugIn(self)
        self._register_handlers()

    def StreamInit(self):
        """
        Send an initial stream header
        """
        self.Dispatcher.Stream = simplexml.NodeBuilder()
        self.Dispatcher.Stream._dispatch_depth = 2
        self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch
        self.Dispatcher.Stream.stream_header_received = \
            self._check_stream_start
        self.Dispatcher.Stream.features = None
        # as server we answer only after the peer's header has arrived
        if self.sock_type == TYPE_CLIENT:
            self.send_stream_header()

    def send_stream_header(self):
        """
        Build the stream:stream element and send it as an opening tag
        """
        self.Dispatcher._metastream = Node('stream:stream')
        self.Dispatcher._metastream.setNamespace(self.Namespace)
        self.Dispatcher._metastream.setAttr('version', '1.0')
        self.Dispatcher._metastream.setAttr('xmlns:stream', NS_STREAMS)
        self.Dispatcher._metastream.setAttr('from',
            self.conn_holder.zeroconf.name)
        if self.to:
            self.Dispatcher._metastream.setAttr('to', self.to)
        # cut the last two characters of the serialized node and close the
        # tag with a plain '>' so that the stream element stays open
        self.Dispatcher.send("<?xml version='1.0'?>%s>" % str(
            self.Dispatcher._metastream)[:-2])

    def _check_stream_start(self, ns, tag, attrs):
        """
        Validate the peer's stream header and flush the queued stanzas

        An unexpected root element closes the connection and is reported
        through on_not_ok.
        """
        if ns != NS_STREAMS or tag != 'stream':
            log.error('Incorrect stream start: (%s,%s).Terminating!', tag, ns)
            self.Connection.disconnect()
            if self.on_not_ok:
                self.on_not_ok('Connection to host could not be established: Incorrect answer from server.')
            return
        if self.sock_type == TYPE_SERVER:
            if 'from' in attrs:
                self.to = attrs['from']
            self.send_stream_header()
            if 'version' in attrs and attrs['version'] == '1.0':
                # other part supports stream features
                features = Node('stream:features')
                self.Dispatcher.send(features)
            self._send_queued_stanzas()
        elif self.sock_type == TYPE_CLIENT:
            self._send_queued_stanzas()

    def on_disconnect(self):
        """
        Remove this client from the holder's tables, plug out the plugins
        and drop the references to caller and holder
        """
        if self.conn_holder:
            if self.fd in self.conn_holder.ids_of_awaiting_messages:
                del self.conn_holder.ids_of_awaiting_messages[self.fd]
            self.conn_holder.remove_connection(self.sock_hash)
        if 'Dispatcher' in self.__dict__:
            self.Dispatcher.PlugOut()
        if 'P2PConnection' in self.__dict__:
            self.P2PConnection.PlugOut()
        self.Connection = None
        self._caller = None
        self.conn_holder = None

    def force_disconnect(self):
        """
        Close the transport if there is one, otherwise only clean up
        """
        if self.Connection:
            self.disconnect()
        else:
            self.on_disconnect()

    def _on_receive_document_attrs(self, data):
        """
        Receive handler used until the attributes of the peer's stream
        element have been parsed
        """
        if data:
            self.Dispatcher.ProcessNonBlocking(data)
        if not hasattr(self, 'Dispatcher') or \
        self.Dispatcher.Stream._document_attrs is None:
            return
        self.onreceive(None)
        if 'version' in self.Dispatcher.Stream._document_attrs and \
        self.Dispatcher.Stream._document_attrs['version'] == '1.0':
            #~ self.onreceive(self._on_receive_stream_features)
            #XXX continue with TLS
            return
        self.onreceive(None)
        return True

    def remove_timeout(self):
        """
        Intentionally does nothing
        """
        pass

    def _register_handlers(self):
        """
        Register the caller's callbacks for messages and for the supported
        iq namespaces
        """
        self._caller.peerhost = self.Connection._sock.getsockname()
        self.RegisterHandler('message',
            lambda conn, data: self._caller._messageCB(self.Server, conn,
            data))
        self.RegisterHandler('iq', self._caller._siSetCB, 'set',
            common.xmpp.NS_SI)
        self.RegisterHandler('iq', self._caller._siErrorCB, 'error',
            common.xmpp.NS_SI)
        self.RegisterHandler('iq', self._caller._siResultCB, 'result',
            common.xmpp.NS_SI)
        self.RegisterHandler('iq', self._caller._bytestreamSetCB, 'set',
            common.xmpp.NS_BYTESTREAM)
        self.RegisterHandler('iq', self._caller._bytestreamResultCB, 'result',
            common.xmpp.NS_BYTESTREAM)
        self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error',
            common.xmpp.NS_BYTESTREAM)
        self.RegisterHandler('iq', self._caller._DiscoverItemsGetCB, 'get',
            common.xmpp.NS_DISCO_ITEMS)
        self.RegisterHandler('iq', self._caller._JingleCB, 'result')
        self.RegisterHandler('iq', self._caller._JingleCB, 'error')
        self.RegisterHandler('iq', self._caller._JingleCB, 'set',
            common.xmpp.NS_JINGLE)
325
class P2PConnection(IdleObject, PlugIn):
    """
    Non-blocking socket transport for one P2PClient

    self.state is 0 while connecting, 1 when connected and -1 once the
    connection has been closed.
    """

    def __init__(self, sock_hash, _sock, host=None, port=None, caller=None,
            on_connect=None, client=None):
        """
        sock_hash, caller -- accepted but not used here
        _sock -- already connected socket, or None to connect to host:port
        host, port -- address to connect to when _sock is None
        on_connect -- called with this object as soon as the socket is
        connected
        client -- owning P2PClient
        """
        IdleObject.__init__(self)
        self._owner = client
        PlugIn.__init__(self)
        self.sendqueue = []
        self.sendbuff = None
        self.buff_is_message = False
        self._sock = _sock
        self.sock_hash = None
        self.host, self.port = host, port
        self.on_connect = on_connect
        self.client = client
        self.writable = False
        self.readable = False
        self._exported_methods = [self.send, self.disconnect, self.onreceive]
        self.on_receive = None
        if _sock:
            self._sock = _sock
            self.state = 1
            self._sock.setblocking(False)
            self.fd = self._sock.fileno()
            self.on_connect(self)
        else:
            self.state = 0
            try:
                self.ais = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                    socket.SOCK_STREAM)
            except socket.gaierror as e:
                log.info('Lookup failure for %s: %s[%s]', host, e.args[1],
                    repr(e.args[0]), exc_info=True)
            else:
                self.connect_to_next_ip()

    def connect_to_next_ip(self):
        """
        Start connecting to the next resolved address; disconnect when none
        is left
        """
        if len(self.ais) == 0:
            log.error('Connection failure to %s', self.host, exc_info=True)
            self.disconnect()
            return
        ai = self.ais.pop(0)
        log.info('Trying to connect to %s through %s:%s', self.host, ai[4][0],
            ai[4][1], exc_info=True)
        try:
            self._sock = socket.socket(*ai[:3])
            self._sock.setblocking(False)
            self._server = ai[4]
        except socket.error as e:
            if e.args[0] != errno.EINPROGRESS:
                # for all errors, we try other addresses
                self.connect_to_next_ip()
                return
        self.fd = self._sock.fileno()
        gajim.idlequeue.plug_idle(self, True, False)
        self.set_timeout(CONNECT_TIMEOUT_SECONDS)
        self.do_connect()

    def set_timeout(self, timeout):
        """
        Replace the read timeout of this socket; nothing is armed once the
        connection is closed
        """
        gajim.idlequeue.remove_timeout(self.fd)
        if self.state >= 0:
            gajim.idlequeue.set_read_timeout(self.fd, timeout)

    def plugin(self, owner):
        """
        Route incoming data to the owner's stream header handler and plug
        the socket into the idle queue
        """
        self.onreceive(owner._on_receive_document_attrs)
        self._plug_idle()
        return True

    def plugout(self):
        """
        Disconnect from the remote server and unregister self.disconnected
        method from the owner's dispatcher
        """
        self.disconnect()
        self._owner = None

    def onreceive(self, recv_handler):
        """
        Set the handler for incoming data

        A false recv_handler selects the owner's dispatcher (or nothing when
        there is no dispatcher). A real handler is first called with None
        and is installed only if that call returns a false value.
        """
        if not recv_handler:
            if hasattr(self._owner, 'Dispatcher'):
                self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
            else:
                self.on_receive = None
            return
        _tmp = self.on_receive
        # make sure this cb is not overridden by recursive calls
        if not recv_handler(None) and _tmp == self.on_receive:
            self.on_receive = recv_handler

    def send(self, packet, is_message=False, now=False):
        """
        Append stanza to the queue of messages to be send if now is False,
        else send it instantly

        If supplied data is unicode string, encode it to UTF-8.
        """
        if self.state <= 0:
            return

        r = packet

        if isinstance(r, unicode):
            r = r.encode('utf-8')
        elif not isinstance(r, str):
            r = ustr(r).encode('utf-8')

        if now:
            self.sendqueue.insert(0, (r, is_message))
            self._do_send()
        else:
            self.sendqueue.append((r, is_message))
        self._plug_idle()

    def read_timeout(self):
        """
        Called when the read timeout expired: report every message that is
        still waiting on this socket as failed and close the connection
        """
        ids = self.client.conn_holder.ids_of_awaiting_messages
        if self.fd in ids and len(ids[self.fd]) > 0:
            for (id_, thread_id) in ids[self.fd]:
                if hasattr(self._owner, 'Dispatcher'):
                    self._owner.Dispatcher.Event('', DATA_ERROR, (
                        self.client.to, thread_id))
                elif self._owner.on_not_ok:
                    # on_not_ok is optional, P2PClient defaults it to None
                    self._owner.on_not_ok('conenction timeout')
            ids[self.fd] = []
        self.pollend()

    def do_connect(self):
        """
        Try to finish the non-blocking connect

        Returns silently while the connect is still in progress, moves on to
        the next address on failure and calls on_connect on success.
        """
        errnum = 0
        try:
            self._sock.connect(self._server)
            self._sock.setblocking(False)
        except Exception as ee:
            # not every exception carries an (errno, text) pair; fall back to
            # a value that is handled as a plain failure below
            if ee.args and isinstance(ee.args[0], int):
                errnum = ee.args[0]
            else:
                errnum = -1
            if len(ee.args) > 1:
                errstr = ee.args[1]
            else:
                errstr = str(ee)
        if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
            return
        # win32 needs this
        elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0:
            log.error('Could not connect to %s: %s [%s]', self.host, errnum,
                errstr)
            self.connect_to_next_ip()
            return
        else: # socket is already connected
            self._sock.setblocking(False)
        self.state = 1 # connected
        # we are connected
        self.on_connect(self)

    def pollout(self):
        """
        Socket is writable: continue connecting or send pending data
        """
        if self.state == 0:
            self.do_connect()
            return
        gajim.idlequeue.remove_timeout(self.fd)
        self._do_send()

    def pollend(self):
        """
        Mark the connection as closed and release the socket
        """
        self.state = -1
        self.disconnect()

    def pollin(self):
        """
        Reads all pending incoming data. Call owner's disconnected() method
        if appropriate
        """
        received = ''
        errnum = 0
        try:
            # get as many bytes as possible, but not more than MAX_BUFF_LEN
            received = self._sock.recv(MAX_BUFF_LEN)
        except Exception as e:
            if len(e.args) > 0 and isinstance(e.args[0], int):
                errnum = e.args[0]
            # "received" will be empty anyhow
        if errnum == socket.SSL_ERROR_WANT_READ:
            pass
        elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
            self.pollend()
            # don't process result, because it will raise error
            return
        elif not received:
            if errnum != socket.SSL_ERROR_EOF:
                # 8 EOF occurred in violation of protocol
                self.pollend()
            if self.state >= 0:
                self.disconnect()
            return

        if self.state < 0:
            return
        if self.on_receive:
            if self._owner.sock_type == TYPE_CLIENT:
                self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
            if received.strip():
                log.debug('received: %s', received)
            if hasattr(self._owner, 'Dispatcher'):
                self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
            self.on_receive(received)
        else:
            # This should never happen, so we need the debug
            log.error('Unhandled data received: %s' % received)
            self.disconnect()
        return True

    def disconnect(self, message=''):
        """
        Close the socket

        message is accepted but not used.
        """
        gajim.idlequeue.remove_timeout(self.fd)
        gajim.idlequeue.unplug_idle(self.fd)
        # there is no socket if no address could be tried at all
        if self._sock is not None:
            try:
                self._sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # socket is not connected or already closed
                pass
            # close even when shutdown() failed, otherwise the descriptor
            # stays open
            try:
                self._sock.close()
            except socket.error:
                pass
        self.fd = -1
        self.state = -1
        if self._owner:
            self._owner.on_disconnect()

    def _do_send(self):
        """
        Write as much of the current buffer as the socket accepts

        Returns None when there is nothing to send or sending failed, True
        otherwise.
        """
        if not self.sendbuff:
            if not self.sendqueue:
                return None # nothing to send
            self.sendbuff, self.buff_is_message = self.sendqueue.pop(0)
            self.sent_data = self.sendbuff
        try:
            send_count = self._sock.send(self.sendbuff)
            if send_count:
                self.sendbuff = self.sendbuff[send_count:]
                if not self.sendbuff and not self.sendqueue:
                    if self.state < 0:
                        gajim.idlequeue.unplug_idle(self.fd)
                        self._on_send()
                        self.disconnect()
                        return
                    # we are not waiting for write
                    self._plug_idle()
                self._on_send()

        except socket.error as e:
            if e.args[0] == socket.SSL_ERROR_WANT_WRITE:
                return True
            if self.state < 0:
                self.disconnect()
                return
            self._on_send_failure()
            return
        if self._owner.sock_type == TYPE_CLIENT:
            self.set_timeout(ACTIVITY_TIMEOUT_SECONDS)
        return True

    def _plug_idle(self):
        """
        Register the socket with the idle queue for reading (unless still
        connecting) and for writing (when data is pending)
        """
        readable = self.state != 0
        if self.sendqueue or self.sendbuff:
            writable = True
        else:
            writable = False
        if self.writable != writable or self.readable != readable:
            gajim.idlequeue.plug_idle(self, writable, readable)

    def _on_send(self):
        """
        Report sent data to the dispatcher and, for messages, to the owner
        """
        if self.sent_data and self.sent_data.strip():
            log.debug('sent: %s' % self.sent_data)
            if hasattr(self._owner, 'Dispatcher'):
                self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
        self.sent_data = None
        if self.buff_is_message:
            self._owner.on_message_sent(self.fd)
            self.buff_is_message = False

    def _on_send_failure(self):
        """
        Log the failed send and let the owner clean up
        """
        log.error('Socket error while sending data')
        self._owner.on_disconnect()
        self.sent_data = None
598
599 -class ClientZeroconf:
600 - def __init__(self, caller):
601 self.caller = caller 602 self.zeroconf = None 603 self.roster = None 604 self.last_msg = '' 605 self.connections = {} 606 self.recipient_to_hash = {} 607 self.ip_to_hash = {} 608 self.hash_to_port = {} 609 self.listener = None 610 self.ids_of_awaiting_messages = {} 611 self.disconnect_handlers = [] 612 self.disconnecting = False
613
def connect(self, show, msg):
    """
    Start listening and announce our presence

    show, msg -- status and status message to announce

    Returns False when no listener could be started, None when the
    zeroconf service could not connect (everything is torn down again)
    and True on success.
    """
    listening_port = self.start_listener(self.caller.port)
    self.port = listening_port
    if not listening_port:
        return False
    self.zeroconf_init(show, msg)
    if self.zeroconf.connect():
        self.roster = roster_zeroconf.Roster(self.zeroconf)
        return True
    self.disconnect()
    return None
624
def remove_announce(self):
    """
    Withdraw our service announcement

    Returns the result of the zeroconf service, or None when there is no
    service.
    """
    if not self.zeroconf:
        return None
    return self.zeroconf.remove_announce()
628
def announce(self):
    """
    Publish our service announcement

    Returns the result of the zeroconf service, or None when there is no
    service.
    """
    if not self.zeroconf:
        return None
    return self.zeroconf.announce()
632
def set_show_msg(self, show, msg):
    """
    Change the announced status and status message

    Returns the result of update_txt(show), or None when there is no
    zeroconf service (nothing is changed in that case).
    """
    service = self.zeroconf
    if not service:
        return None
    service.txt['msg'] = msg
    self.last_msg = msg
    return service.update_txt(show)
638
def resolve_all(self):
    """
    Ask the zeroconf service, if there is one, to resolve all contacts

    Always returns None.
    """
    if not self.zeroconf:
        return
    self.zeroconf.resolve_all()
642
def reannounce(self, txt):
    """
    Withdraw the current announcement and publish a new one

    txt -- new TXT record of the service

    Returns the result of announce(), or None when there is no zeroconf
    service to re-announce.
    """
    # like the other announce helpers, do nothing without a service
    # instead of failing on an attribute of None
    if not self.zeroconf:
        return None
    self.remove_announce()
    self.zeroconf.txt = txt
    self.zeroconf.port = self.port
    self.zeroconf.username = self.caller.username
    return self.announce()
649
def zeroconf_init(self, show, msg):
    """
    Create the zeroconf service object and fill in what gets announced

    show, msg -- status and status message for the TXT record; the other
    TXT fields and the service identity are taken from self.caller
    """
    caller = self.caller
    self.zeroconf = zeroconf.Zeroconf(caller._on_new_service,
        caller._on_remove_service, caller._on_name_conflictCB,
        caller._on_disconnected, caller._on_error, caller.username,
        caller.host, self.port)
    service = self.zeroconf
    txt_fields = (
        ('msg', msg),
        ('status', show),
        ('1st', self.caller.first),
        ('last', self.caller.last),
        ('jid', self.caller.jabber_id),
        ('email', self.caller.email),
    )
    for key, value in txt_fields:
        service.txt[key] = value
    service.username = self.caller.username
    service.host = self.caller.host
    service.port = self.port
    self.last_msg = msg
665
def disconnect(self):
    """
    Stop listening, withdraw the service, drop the roster and call the
    registered disconnect handlers, most recently registered first

    Calls made while the handlers are running return immediately.
    """
    # to avoid recursive calls
    if self.disconnecting:
        return
    if self.listener:
        self.listener.disconnect()
        self.listener = None
    if self.zeroconf:
        self.zeroconf.disconnect()
        self.zeroconf = None
    if self.roster:
        self.roster.zeroconf = None
        self.roster._data = None
        self.roster = None
    self.disconnecting = True
    try:
        for handler in reversed(self.disconnect_handlers):
            log.debug('Calling disconnect handler %s' % handler)
            handler()
    finally:
        # reset the guard even when a handler raises, otherwise every
        # later disconnect() would silently do nothing
        self.disconnecting = False
685
def start_disconnect(self):
    """
    Alias of disconnect(); returns None
    """
    self.disconnect()
    return None
688
689 - def kill_all_connections(self):
692
def add_connection(self, connection, ip, port, recipient):
    """
    Enter a connection into the lookup tables

    connection -- object whose sock_hash identifies the connection
    ip, port -- address of the peer
    recipient -- JID of the peer; not recorded when false

    A connection already stored under the same hash is kept, the ip and
    port entries are always overwritten.
    """
    key = connection.sock_hash
    self.connections.setdefault(key, connection)
    self.ip_to_hash[ip] = key
    self.hash_to_port[key] = port
    if recipient:
        self.recipient_to_hash[recipient] = key
701
def remove_connection(self, sock_hash):
    """
    Forget the connection stored under sock_hash

    Removes the connection and its port, and the first recipient entry
    and the first ip entry that point to this hash. Unknown hashes are
    ignored.
    """
    self.connections.pop(sock_hash, None)
    for table in (self.recipient_to_hash, self.ip_to_hash):
        for key in table:
            if table[key] == sock_hash:
                # leave the loop right away, the table has just changed
                del table[key]
                break
    self.hash_to_port.pop(sock_hash, None)
715
def start_listener(self, port):
    """
    Start a ZeroconfListener on the first usable port

    Tries port and the four ports after it. Returns the port that could
    be bound; if none works self.listener is reset to None and False is
    returned.
    """
    for candidate in range(port, port + 5):
        listener = ZeroconfListener(candidate, self)
        self.listener = listener
        listener.bind()
        if listener.started:
            return candidate
    self.listener = None
    return False
724
def getRoster(self):
    """
    Return the roster's contact data, or an empty dict when there is no
    roster
    """
    if not self.roster:
        return {}
    return self.roster.getRoster()
729
def send(self, stanza, is_message=False, now=False, on_ok=None,
        on_not_ok=None):
    """
    Send stanza to its recipient, reusing an open connection if possible

    is_message -- whether stanza is a message whose delivery is tracked
    now -- accepted but not used here
    on_ok -- called with the stanza id once the stanza was handed to an
    existing connection; passed on to a newly created P2PClient otherwise
    on_not_ok -- passed on to a newly created P2PClient

    Returns -1 when the recipient is not in the roster, None otherwise.
    """
    stanza.setFrom(self.roster.zeroconf.name)
    to = gajim.get_jid_without_resource(unicode(stanza.getTo()))

    try:
        item = self.roster[to]
    except KeyError:
        # Contact offline
        return -1

    # look for hashed connections
    if to in self.recipient_to_hash:
        known = self.connections[self.recipient_to_hash[to]]
        stanza_id = stanza.getID() or ''
        if known.add_stanza(stanza, is_message):
            if on_ok:
                on_ok(stanza_id)
            return

    # next, a connection to the same address and port
    if item['address'] in self.ip_to_hash:
        hash_ = self.ip_to_hash[item['address']]
        if self.hash_to_port[hash_] == item['port']:
            known = self.connections[hash_]
            stanza_id = stanza.getID() or ''
            if known.add_stanza(stanza, is_message):
                if on_ok:
                    on_ok(stanza_id)
                return

    # otherwise open new connection
    if not stanza.getID():
        stanza.setID('zero')
    P2PClient(None, item['address'], item['port'], self,
        [(stanza, is_message)], to, on_ok=on_ok, on_not_ok=on_not_ok)
766
def getAnID(self):
    """
    Generate a random id

    The id consists of six distinct ASCII letters and digits.
    """
    # string.letters depends on the locale and may contain non-ASCII
    # bytes; ids must stay plain ASCII
    alphabet = string.ascii_letters + string.digits
    return ''.join(Random().sample(alphabet, 6))
772
def RegisterDisconnectHandler(self, handler):
    """
    Add handler to the callables that disconnect() invokes

    The same handler may be registered more than once.
    """
    handlers = self.disconnect_handlers
    handlers.append(handler)
778
def UnregisterDisconnectHandler(self, handler):
    """
    Remove the first registration of handler

    Raises ValueError when handler is not registered.
    """
    handlers = self.disconnect_handlers
    handlers.remove(handler)
784
def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
    """
    Send stanza and wait for recipient's response to it. Will call
    transports on_timeout callback if response is not retrieved in time

    Be aware: Only timeout of latest call of SendAndWait is active.

    timeout -- currently not used, the code that armed it is commented out
    func, args -- callback and its extra arguments, stored in the
    dispatcher of the connection for the id of the sent stanza
    """
#    if timeout is None:
#        timeout = DEFAULT_TIMEOUT_SECONDS
    def on_ok(_waitid):
#        if timeout:
#            self._owner.set_timeout(timeout)
        # normalise the recipient exactly as send() does, its tables are
        # keyed by the JID without resource
        to = gajim.get_jid_without_resource(unicode(stanza.getTo()))
        conn = None
        if to in self.recipient_to_hash:
            conn = self.connections[self.recipient_to_hash[to]]
        else:
            try:
                item = self.roster[to]
            except KeyError:
                # Contact went offline in the meantime
                item = None
            if item is not None and item['address'] in self.ip_to_hash:
                hash_ = self.ip_to_hash[item['address']]
                if self.hash_to_port[hash_] == item['port']:
                    conn = self.connections[hash_]
        if conn is None:
            log.error('No connection found to wait for the response to %s',
                _waitid)
            return
        if func:
            conn.Dispatcher.on_responses[_waitid] = (func, args)
        conn.onreceive(conn.Dispatcher._WaitForData)
        conn.Dispatcher._expected[_waitid] = None
    self.send(stanza, on_ok=on_ok)
810
def SendAndCallForResponse(self, stanza, func=None, args=None):
    """
    Put stanza on the wire and call back when recipient replies.
    Additional callback arguments can be specified in args.

    Delegates to SendAndWaitForResponse with a timeout of 0.
    """
    no_timeout = 0
    self.SendAndWaitForResponse(stanza, no_timeout, func, args)
817