1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Idlequeues are Gajim's network heartbeat. Transports can be plugged as idle
18 objects and be informed about possible IO
19 """
20
21 import os
22 import select
23 import logging
24 log = logging.getLogger('gajim.c.x.idlequeue')
25
26
27 try:
28 import gobject
29 HAVE_GOBJECT = True
30 except ImportError:
31 HAVE_GOBJECT = False
32
33
34 if os.name == 'nt':
35 from subprocess import *
36 elif os.name == 'posix':
37 import fcntl
38
39 FLAG_WRITE = 20
40 FLAG_READ = 19
41 FLAG_READ_WRITE = 23
42 FLAG_CLOSE = 16
43
44 PENDING_READ = 3
45 PENDING_WRITE = 4
46 IS_CLOSED = 16
47
48
63
64
66 """
67 Idle listener interface. Listed methods are called by IdleQueue.
68 """
69
72
74 """
75 Called on stream failure
76 """
77 pass
78
80 """
81 Called on new read event
82 """
83 pass
84
86 """
87 Called on new write event (connect in sockets is a pollout)
88 """
89 pass
90
92 """
93 Called when timeout happened
94 """
95 pass
96
97
99 """
100 Can be subclassed to execute commands asynchronously by the idlequeue.
101 Result will be optained via file descriptor of created pipe
102 """
103
105 IdleObject.__init__(self)
106
107
108 self.commandtimeout = 0
109
110 self.result_handler = on_result
111
112 self.canexecute = True
113 self.idlequeue = None
114 self.result =''
115
118
120 if self.result_handler:
121 self.result_handler(self.result)
122 self.result_handler = None
123
125 return ['echo', 'da']
126
128 """
129 Return one line representation of command and its arguments
130 """
131 return reduce(lambda left, right: left + ' ' + right,
132 self._compose_command_args())
133
150
160
169
177
184
189
191 try:
192 res = self.pipe.read()
193 except Exception, e:
194 res = ''
195 if res == '':
196 return self.pollend()
197 else:
198 self.result += res
199
203
204
206 """
207 IdleQueue provide three distinct time based features. Uses select.poll()
208
209 1. Alarm timeout: Execute a callback after foo seconds
210 2. Timeout event: Call read_timeout() of an plugged object if a timeout
211 has been set, but not removed in time.
212 3. Check file descriptor of plugged objects for read, write and error
213 events
214 """
215
216
217
218 PROCESS_TIMEOUT = (100, False)
219
221 self.queue = {}
222
223
224
225
226
227
228 self.read_timeouts = {}
229
230
231 self.alarms = {}
232 self._init_idle()
233
235 """
236 Hook method for subclassed. Will be called by __init__
237 """
238 self.selector = select.poll()
239
241 """
242 Set up a new alarm. alarm_cb will be called after specified seconds.
243 """
244 alarm_time = self.current_time() + seconds
245
246 if alarm_time in self.alarms:
247 self.alarms[alarm_time].append(alarm_cb)
248 else:
249 self.alarms[alarm_time] = [alarm_cb]
250 return alarm_time
251
253 """
254 Remove alarm callback alarm_cb scheduled on alarm_time. Returns True if
255 it was removed sucessfully, otherwise False
256 """
257 if not alarm_time in self.alarms:
258 return False
259 i = -1
260 for i in range(len(self.alarms[alarm_time])):
261
262 if self.alarms[alarm_time][i] is alarm_cb:
263 break
264 if i != -1:
265 del self.alarms[alarm_time][i]
266 if self.alarms[alarm_time] == []:
267 del self.alarms[alarm_time]
268 return True
269 else:
270 return False
271
273 """
274 Remove the read timeout
275 """
276 log.info('read timeout removed for fd %s' % fd)
277 if fd in self.read_timeouts:
278 if timeout:
279 if timeout in self.read_timeouts[fd]:
280 del(self.read_timeouts[fd][timeout])
281 if len(self.read_timeouts[fd]) == 0:
282 del(self.read_timeouts[fd])
283 else:
284 del(self.read_timeouts[fd])
285
287 """
288 Seta a new timeout. If it is not removed after specified seconds,
289 func or obj.read_timeout() will be called
290
291 A filedescriptor fd can have several timeouts.
292 """
293 log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
294 if func:
295 log_txt += ' with function ' + str(func)
296 log.info(log_txt)
297 timeout = self.current_time() + seconds
298 if fd in self.read_timeouts:
299 self.read_timeouts[fd][timeout] = func
300 else:
301 self.read_timeouts[fd] = {timeout: func}
302
304 """
305 Execute and remove alarm callbacks and execute func() or read_timeout()
306 for plugged objects if specified time has ellapsed
307 """
308 current_time = self.current_time()
309
310 for fd, timeouts in self.read_timeouts.items():
311 if fd not in self.queue:
312 self.remove_timeout(fd)
313 continue
314 for timeout, func in timeouts.items():
315 if timeout > current_time:
316 continue
317 if func:
318 log.debug('Calling %s for fd %s' % (func, fd))
319 func()
320 else:
321 log.debug('Calling read_timeout for fd %s' % fd)
322 self.queue[fd].read_timeout()
323 self.remove_timeout(fd, timeout)
324
325 times = self.alarms.keys()
326 for alarm_time in times:
327 if alarm_time > current_time:
328 continue
329 if alarm_time in self.alarms:
330 for callback in self.alarms[alarm_time]:
331 callback()
332 if alarm_time in self.alarms:
333 del(self.alarms[alarm_time])
334
335 - def plug_idle(self, obj, writable=True, readable=True):
336 """
337 Plug an IdleObject into idlequeue. Filedescriptor fd must be set
338
339 :param obj: the IdleObject
340 :param writable: True if obj has data to sent
341 :param readable: True if obj expects data to be reiceived
342 """
343 if obj.fd == -1:
344 return
345 if obj.fd in self.queue:
346 self.unplug_idle(obj.fd)
347 self.queue[obj.fd] = obj
348 if writable:
349 if not readable:
350 flags = FLAG_WRITE
351 else:
352 flags = FLAG_READ_WRITE
353 else:
354 if readable:
355 flags = FLAG_READ
356 else:
357
358 flags = FLAG_CLOSE
359 self._add_idle(obj.fd, flags)
360
362 """
363 Hook method for subclasses, called by plug_idle
364 """
365 self.selector.register(fd, flags)
366
368 """
369 Remove plugged IdleObject, specified by filedescriptor fd
370 """
371 if fd in self.queue:
372 del(self.queue[fd])
373 self._remove_idle(fd)
374
376 from time import time
377 return time()
378
380 """
381 Hook method for subclassed, called by unplug_idle
382 """
383 self.selector.unregister(fd)
384
406
408 """
409 Process idlequeue. Check for any pending timeout or alarm events. Call
410 IdleObjects on possible and requested read, write and error events on
411 their file descriptors
412
413 Call this in regular intervals.
414 """
415 if not self.queue:
416
417 self._check_time_events()
418 return True
419 try:
420 waiting_descriptors = self.selector.poll(0)
421 except select.error, e:
422 waiting_descriptors = []
423 if e[0] != 4:
424 raise
425 for fd, flags in waiting_descriptors:
426 self._process_events(fd, flags)
427 self._check_time_events()
428 return True
429
430
432 """
433 Extends IdleQueue to use select.select() for polling
434
435 This class exisists for the sake of gtk2.8 on windows, which doesn't seem to
436 support io_add_watch properly (yet)
437 """
438
440 """
441 Create a dict, which maps file/pipe/sock descriptor to glib event id
442 """
443 self.read_fds = {}
444 self.write_fds = {}
445 self.error_fds = {}
446
448 """
449 This method is called when we plug a new idle object. Remove descriptor
450 to read/write/error lists, according flags
451 """
452 if flags & 3:
453 self.read_fds[fd] = fd
454 if flags & 4:
455 self.write_fds[fd] = fd
456 self.error_fds[fd] = fd
457
459 """
460 This method is called when we unplug a new idle object. Remove descriptor
461 from read/write/error lists
462 """
463 if fd in self.read_fds:
464 del(self.read_fds[fd])
465 if fd in self.write_fds:
466 del(self.write_fds[fd])
467 if fd in self.error_fds:
468 del(self.error_fds[fd])
469
471 if not self.write_fds and not self.read_fds:
472 self._check_time_events()
473 return True
474 try:
475 waiting_descriptors = select.select(self.read_fds.keys(),
476 self.write_fds.keys(), self.error_fds.keys(), 0)
477 except select.error, e:
478 waiting_descriptors = ((), (), ())
479 if e[0] != 4:
480 raise
481 for fd in waiting_descriptors[0]:
482 q = self.queue.get(fd)
483 if q:
484 q.pollin()
485 for fd in waiting_descriptors[1]:
486 q = self.queue.get(fd)
487 if q:
488 q.pollout()
489 for fd in waiting_descriptors[2]:
490 q = self.queue.get(fd)
491 if q:
492 q.pollend()
493 self._check_time_events()
494 return True
495
496
498 """
499 Extends IdleQueue to use glib io_add_wath, instead of select/poll In another
500 'non gui' implementation of Gajim IdleQueue can be used safetly
501 """
502
503
504
505 PROCESS_TIMEOUT = (2, True)
506
508 """
509 Creates a dict, which maps file/pipe/sock descriptor to glib event id
510 """
511 self.events = {}
512
513
514 self.current_time = gobject.get_current_time
515
517 """
518 This method is called when we plug a new idle object. Start listening for
519 events from fd
520 """
521 res = gobject.io_add_watch(fd, flags, self._process_events,
522 priority=gobject.PRIORITY_LOW)
523
524 self.events[fd] = res
525
533
535 """
536 This method is called when we unplug a new idle object. Stop listening
537 for events from fd
538 """
539 if not fd in self.events:
540 return
541 gobject.source_remove(self.events[fd])
542 del(self.events[fd])
543
546