Package common :: Package xmpp :: Module idlequeue
[hide private]
[frames] | no frames]

Source Code for Module common.xmpp.idlequeue

  1  ##   idlequeue.py 
  2  ## 
  3  ##   Copyright (C) 2006 Dimitur Kirov <dkirov@gmail.com> 
  4  ## 
  5  ##   This program is free software; you can redistribute it and/or modify 
  6  ##   it under the terms of the GNU General Public License as published by 
  7  ##   the Free Software Foundation; either version 2, or (at your option) 
  8  ##   any later version. 
  9  ## 
 10  ##   This program is distributed in the hope that it will be useful, 
 11  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  ##   GNU General Public License for more details. 
 14   
 15   
 16  """ 
 17  Idlequeues are Gajim's network heartbeat. Transports can be plugged as idle 
 18  objects and be informed about possible IO 
 19  """ 
 20   
 21  import os 
 22  import select 
 23  import logging 
 24  log = logging.getLogger('gajim.c.x.idlequeue') 
 25   
 26  # needed for get_idleqeue 
 27  try: 
 28      import gobject 
 29      HAVE_GOBJECT = True 
 30  except ImportError: 
 31      HAVE_GOBJECT = False 
 32   
 33  # needed for idlecommand 
 34  if os.name == 'nt': 
 35      from subprocess import * # python24 only. we ask this for Windows 
 36  elif os.name == 'posix': 
 37      import fcntl 
 38   
 39  FLAG_WRITE                      = 20 # write only 
 40  FLAG_READ                       = 19 # read only 
 41  FLAG_READ_WRITE = 23 # read and write 
 42  FLAG_CLOSE                      = 16 # wait for close 
 43   
 44  PENDING_READ            = 3 # waiting read event 
 45  PENDING_WRITE           = 4 # waiting write event 
 46  IS_CLOSED                       = 16 # channel closed 
 47   
 48   
49 -def get_idlequeue():
50 """ 51 Get an appropriate idlequeue 52 """ 53 if os.name == 'nt': 54 # gobject.io_add_watch does not work on windows 55 return SelectIdleQueue() 56 else: 57 if HAVE_GOBJECT: 58 # Gajim's default Idlequeue 59 return GlibIdleQueue() 60 else: 61 # GUI less implementation 62 return SelectIdleQueue()
63 64
65 -class IdleObject:
66 """ 67 Idle listener interface. Listed methods are called by IdleQueue. 68 """ 69
70 - def __init__(self):
71 self.fd = -1 #: filedescriptor, must be unique for each IdleObject
72
73 - def pollend(self):
74 """ 75 Called on stream failure 76 """ 77 pass
78
79 - def pollin(self):
80 """ 81 Called on new read event 82 """ 83 pass
84
85 - def pollout(self):
86 """ 87 Called on new write event (connect in sockets is a pollout) 88 """ 89 pass
90
91 - def read_timeout(self):
92 """ 93 Called when timeout happened 94 """ 95 pass
96 97
98 -class IdleCommand(IdleObject):
99 """ 100 Can be subclassed to execute commands asynchronously by the idlequeue. 101 Result will be optained via file descriptor of created pipe 102 """ 103
104 - def __init__(self, on_result):
105 IdleObject.__init__(self) 106 # how long (sec.) to wait for result ( 0 - forever ) 107 # it is a class var, instead of a constant and we can override it. 108 self.commandtimeout = 0 109 # when we have some kind of result (valid, ot not) we call this handler 110 self.result_handler = on_result 111 # if it is True, we can safetely execute the command 112 self.canexecute = True 113 self.idlequeue = None 114 self.result =''
115
116 - def set_idlequeue(self, idlequeue):
117 self.idlequeue = idlequeue
118
119 - def _return_result(self):
120 if self.result_handler: 121 self.result_handler(self.result) 122 self.result_handler = None
123
124 - def _compose_command_args(self):
125 return ['echo', 'da']
126
127 - def _compose_command_line(self):
128 """ 129 Return one line representation of command and its arguments 130 """ 131 return reduce(lambda left, right: left + ' ' + right, 132 self._compose_command_args())
133
134 - def wait_child(self):
135 if self.pipe.poll() is None: 136 # result timeout 137 if self.endtime < self.idlequeue.current_time(): 138 self._return_result() 139 self.pipe.stdout.close() 140 self.pipe.stdin.close() 141 else: 142 # child is still active, continue to wait 143 self.idlequeue.set_alarm(self.wait_child, 0.1) 144 else: 145 # child has quit 146 self.result = self.pipe.stdout.read() 147 self._return_result() 148 self.pipe.stdout.close() 149 self.pipe.stdin.close()
150
151 - def start(self):
152 if not self.canexecute: 153 self.result = '' 154 self._return_result() 155 return 156 if os.name == 'nt': 157 self._start_nt() 158 elif os.name == 'posix': 159 self._start_posix()
160
161 - def _start_nt(self):
162 # if gajim is started from noninteraactive shells stdin is closed and 163 # cannot be forwarded, so we have to keep it open 164 self.pipe = Popen(self._compose_command_args(), stdout=PIPE, 165 bufsize = 1024, shell = True, stderr = STDOUT, stdin = PIPE) 166 if self.commandtimeout >= 0: 167 self.endtime = self.idlequeue.current_time() + self.commandtimeout 168 self.idlequeue.set_alarm(self.wait_child, 0.1)
169
170 - def _start_posix(self):
171 self.pipe = os.popen(self._compose_command_line()) 172 self.fd = self.pipe.fileno() 173 fcntl.fcntl(self.pipe, fcntl.F_SETFL, os.O_NONBLOCK) 174 self.idlequeue.plug_idle(self, False, True) 175 if self.commandtimeout >= 0: 176 self.idlequeue.set_read_timeout(self.fd, self.commandtimeout)
177
178 - def end(self):
179 self.idlequeue.unplug_idle(self.fd) 180 try: 181 self.pipe.close() 182 except: 183 pass
184
185 - def pollend(self):
186 self.idlequeue.remove_timeout(self.fd) 187 self.end() 188 self._return_result()
189
190 - def pollin(self):
191 try: 192 res = self.pipe.read() 193 except Exception, e: 194 res = '' 195 if res == '': 196 return self.pollend() 197 else: 198 self.result += res
199
200 - def read_timeout(self):
201 self.end() 202 self._return_result()
203 204
205 -class IdleQueue:
206 """ 207 IdleQueue provide three distinct time based features. Uses select.poll() 208 209 1. Alarm timeout: Execute a callback after foo seconds 210 2. Timeout event: Call read_timeout() of an plugged object if a timeout 211 has been set, but not removed in time. 212 3. Check file descriptor of plugged objects for read, write and error 213 events 214 """ 215 216 # (timeout, boolean) 217 # Boolean is True if timeout is specified in seconds, False means miliseconds 218 PROCESS_TIMEOUT = (100, False) 219
220 - def __init__(self):
221 self.queue = {} 222 223 # when there is a timeout it executes obj.read_timeout() 224 # timeout is not removed automatically! 225 # {fd1: {timeout1: func1, timeout2: func2}} 226 # timout are unique (timeout1 must be != timeout2) 227 # If func1 is None, read_time function is called 228 self.read_timeouts = {} 229 230 # cb, which are executed after XX sec., alarms are removed automatically 231 self.alarms = {} 232 self._init_idle()
233
234 - def _init_idle(self):
235 """ 236 Hook method for subclassed. Will be called by __init__ 237 """ 238 self.selector = select.poll()
239
240 - def set_alarm(self, alarm_cb, seconds):
241 """ 242 Set up a new alarm. alarm_cb will be called after specified seconds. 243 """ 244 alarm_time = self.current_time() + seconds 245 # almost impossible, but in case we have another alarm_cb at this time 246 if alarm_time in self.alarms: 247 self.alarms[alarm_time].append(alarm_cb) 248 else: 249 self.alarms[alarm_time] = [alarm_cb] 250 return alarm_time
251
252 - def remove_alarm(self, alarm_cb, alarm_time):
253 """ 254 Remove alarm callback alarm_cb scheduled on alarm_time. Returns True if 255 it was removed sucessfully, otherwise False 256 """ 257 if not alarm_time in self.alarms: 258 return False 259 i = -1 260 for i in range(len(self.alarms[alarm_time])): 261 # let's not modify the list inside the loop 262 if self.alarms[alarm_time][i] is alarm_cb: 263 break 264 if i != -1: 265 del self.alarms[alarm_time][i] 266 if self.alarms[alarm_time] == []: 267 del self.alarms[alarm_time] 268 return True 269 else: 270 return False
271
272 - def remove_timeout(self, fd, timeout=None):
273 """ 274 Remove the read timeout 275 """ 276 log.info('read timeout removed for fd %s' % fd) 277 if fd in self.read_timeouts: 278 if timeout: 279 if timeout in self.read_timeouts[fd]: 280 del(self.read_timeouts[fd][timeout]) 281 if len(self.read_timeouts[fd]) == 0: 282 del(self.read_timeouts[fd]) 283 else: 284 del(self.read_timeouts[fd])
285
286 - def set_read_timeout(self, fd, seconds, func=None):
287 """ 288 Seta a new timeout. If it is not removed after specified seconds, 289 func or obj.read_timeout() will be called 290 291 A filedescriptor fd can have several timeouts. 292 """ 293 log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds) 294 if func: 295 log_txt += ' with function ' + str(func) 296 log.info(log_txt) 297 timeout = self.current_time() + seconds 298 if fd in self.read_timeouts: 299 self.read_timeouts[fd][timeout] = func 300 else: 301 self.read_timeouts[fd] = {timeout: func}
302
303 - def _check_time_events(self):
304 """ 305 Execute and remove alarm callbacks and execute func() or read_timeout() 306 for plugged objects if specified time has ellapsed 307 """ 308 current_time = self.current_time() 309 310 for fd, timeouts in self.read_timeouts.items(): 311 if fd not in self.queue: 312 self.remove_timeout(fd) 313 continue 314 for timeout, func in timeouts.items(): 315 if timeout > current_time: 316 continue 317 if func: 318 log.debug('Calling %s for fd %s' % (func, fd)) 319 func() 320 else: 321 log.debug('Calling read_timeout for fd %s' % fd) 322 self.queue[fd].read_timeout() 323 self.remove_timeout(fd, timeout) 324 325 times = self.alarms.keys() 326 for alarm_time in times: 327 if alarm_time > current_time: 328 continue 329 if alarm_time in self.alarms: 330 for callback in self.alarms[alarm_time]: 331 callback() 332 if alarm_time in self.alarms: 333 del(self.alarms[alarm_time])
334
335 - def plug_idle(self, obj, writable=True, readable=True):
336 """ 337 Plug an IdleObject into idlequeue. Filedescriptor fd must be set 338 339 :param obj: the IdleObject 340 :param writable: True if obj has data to sent 341 :param readable: True if obj expects data to be reiceived 342 """ 343 if obj.fd == -1: 344 return 345 if obj.fd in self.queue: 346 self.unplug_idle(obj.fd) 347 self.queue[obj.fd] = obj 348 if writable: 349 if not readable: 350 flags = FLAG_WRITE 351 else: 352 flags = FLAG_READ_WRITE 353 else: 354 if readable: 355 flags = FLAG_READ 356 else: 357 # when we paused a FT, we expect only a close event 358 flags = FLAG_CLOSE 359 self._add_idle(obj.fd, flags)
360
361 - def _add_idle(self, fd, flags):
362 """ 363 Hook method for subclasses, called by plug_idle 364 """ 365 self.selector.register(fd, flags)
366
367 - def unplug_idle(self, fd):
368 """ 369 Remove plugged IdleObject, specified by filedescriptor fd 370 """ 371 if fd in self.queue: 372 del(self.queue[fd]) 373 self._remove_idle(fd)
374
375 - def current_time(self):
376 from time import time 377 return time()
378
379 - def _remove_idle(self, fd):
380 """ 381 Hook method for subclassed, called by unplug_idle 382 """ 383 self.selector.unregister(fd)
384
385 - def _process_events(self, fd, flags):
386 obj = self.queue.get(fd) 387 if obj is None: 388 self.unplug_idle(fd) 389 return False 390 391 if flags & PENDING_READ: 392 #print 'waiting read on %d, flags are %d' % (fd, flags) 393 obj.pollin() 394 return True 395 396 elif flags & PENDING_WRITE: 397 obj.pollout() 398 return True 399 400 elif flags & IS_CLOSED: 401 # io error, don't expect more events 402 self.remove_timeout(obj.fd) 403 self.unplug_idle(obj.fd) 404 obj.pollend() 405 return False
406
407 - def process(self):
408 """ 409 Process idlequeue. Check for any pending timeout or alarm events. Call 410 IdleObjects on possible and requested read, write and error events on 411 their file descriptors 412 413 Call this in regular intervals. 414 """ 415 if not self.queue: 416 # check for timeouts/alert also when there are no active fds 417 self._check_time_events() 418 return True 419 try: 420 waiting_descriptors = self.selector.poll(0) 421 except select.error, e: 422 waiting_descriptors = [] 423 if e[0] != 4: # interrupt 424 raise 425 for fd, flags in waiting_descriptors: 426 self._process_events(fd, flags) 427 self._check_time_events() 428 return True
429 430
431 -class SelectIdleQueue(IdleQueue):
432 """ 433 Extends IdleQueue to use select.select() for polling 434 435 This class exisists for the sake of gtk2.8 on windows, which doesn't seem to 436 support io_add_watch properly (yet) 437 """ 438
439 - def _init_idle(self):
440 """ 441 Create a dict, which maps file/pipe/sock descriptor to glib event id 442 """ 443 self.read_fds = {} 444 self.write_fds = {} 445 self.error_fds = {}
446
447 - def _add_idle(self, fd, flags):
448 """ 449 This method is called when we plug a new idle object. Remove descriptor 450 to read/write/error lists, according flags 451 """ 452 if flags & 3: 453 self.read_fds[fd] = fd 454 if flags & 4: 455 self.write_fds[fd] = fd 456 self.error_fds[fd] = fd
457
458 - def _remove_idle(self, fd):
459 """ 460 This method is called when we unplug a new idle object. Remove descriptor 461 from read/write/error lists 462 """ 463 if fd in self.read_fds: 464 del(self.read_fds[fd]) 465 if fd in self.write_fds: 466 del(self.write_fds[fd]) 467 if fd in self.error_fds: 468 del(self.error_fds[fd])
469
470 - def process(self):
471 if not self.write_fds and not self.read_fds: 472 self._check_time_events() 473 return True 474 try: 475 waiting_descriptors = select.select(self.read_fds.keys(), 476 self.write_fds.keys(), self.error_fds.keys(), 0) 477 except select.error, e: 478 waiting_descriptors = ((), (), ()) 479 if e[0] != 4: # interrupt 480 raise 481 for fd in waiting_descriptors[0]: 482 q = self.queue.get(fd) 483 if q: 484 q.pollin() 485 for fd in waiting_descriptors[1]: 486 q = self.queue.get(fd) 487 if q: 488 q.pollout() 489 for fd in waiting_descriptors[2]: 490 q = self.queue.get(fd) 491 if q: 492 q.pollend() 493 self._check_time_events() 494 return True
495 496
497 -class GlibIdleQueue(IdleQueue):
498 """ 499 Extends IdleQueue to use glib io_add_wath, instead of select/poll In another 500 'non gui' implementation of Gajim IdleQueue can be used safetly 501 """ 502 503 # (timeout, boolean) 504 # Boolean is True if timeout is specified in seconds, False means miliseconds 505 PROCESS_TIMEOUT = (2, True) 506
507 - def _init_idle(self):
508 """ 509 Creates a dict, which maps file/pipe/sock descriptor to glib event id 510 """ 511 self.events = {} 512 # time() is already called in glib, we just get the last value 513 # overrides IdleQueue.current_time() 514 self.current_time = gobject.get_current_time
515
516 - def _add_idle(self, fd, flags):
517 """ 518 This method is called when we plug a new idle object. Start listening for 519 events from fd 520 """ 521 res = gobject.io_add_watch(fd, flags, self._process_events, 522 priority=gobject.PRIORITY_LOW) 523 # store the id of the watch, so that we can remove it on unplug 524 self.events[fd] = res
525
526 - def _process_events(self, fd, flags):
527 try: 528 return IdleQueue._process_events(self, fd, flags) 529 except Exception: 530 self._remove_idle(fd) 531 self._add_idle(fd, flags) 532 raise
533
534 - def _remove_idle(self, fd):
535 """ 536 This method is called when we unplug a new idle object. Stop listening 537 for events from fd 538 """ 539 if not fd in self.events: 540 return 541 gobject.source_remove(self.events[fd]) 542 del(self.events[fd])
543
544 - def process(self):
545 self._check_time_events()
546