1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import socket
27 import struct
28 import hashlib
29 import os
30
31 from errno import EWOULDBLOCK
32 from errno import ENOBUFS
33 from errno import EINTR
34 from errno import EISCONN
35 from errno import EINPROGRESS
36 from errno import EAFNOSUPPORT
37 from xmpp.idlequeue import IdleObject
38 MAX_BUFF_LEN = 65536
39
40
41 STALLED_TIMEOUT = 10
42
43
44
45 CONNECT_TIMEOUT = 30
46
47
48
49 READ_TIMEOUT = 180
50
51
52
53 SEND_TIMEOUT = 180
54
56 """
57 Queue for all file requests objects
58 """
59
60 - def __init__(self, idlequeue, complete_transfer_cb=None,
61 progress_transfer_cb=None, error_cb=None):
62 self.connected = 0
63 self.readers = {}
64 self.files_props = {}
65 self.senders = {}
66 self.idx = 1
67 self.listener = None
68 self.sha_handlers = {}
69
70
71 self.idlequeue = idlequeue
72 self.complete_transfer_cb = complete_transfer_cb
73 self.progress_transfer_cb = progress_transfer_cb
74 self.error_cb = error_cb
75 self.on_success = None
76 self.on_failure = None
77
79 """
80 Start waiting for incomming connections on (host, port) and do a socks5
81 authentication using sid for generated SHA
82 """
83 self.sha_handlers[sha_str] = (sha_handler, sid)
84 if self.listener is None:
85 self.listener = Socks5Listener(self.idlequeue, port)
86 self.listener.queue = self
87 self.listener.bind()
88 if self.listener.started is False:
89 self.listener = None
90
91 self.error_cb(_('Unable to bind to port %s.') % port,
92 _('Maybe you have another running instance of Gajim. File '
93 'Transfer will be cancelled.'))
94 return None
95 self.connected += 1
96 return self.listener
97
99 if 'streamhost-used' in file_props and \
100 file_props['streamhost-used'] is True:
101 if 'proxyhosts' in file_props:
102 for proxy in file_props['proxyhosts']:
103 if proxy == streamhost:
104 self.on_success(streamhost)
105 return 2
106 return 0
107 if 'streamhosts' in file_props:
108 for host in file_props['streamhosts']:
109 if streamhost['state'] == 1:
110 return 0
111 streamhost['state'] = 1
112 self.on_success(streamhost)
113 return 1
114 return 0
115
127
129 """
130 Called when there is a host connected to one of the senders's
131 streamhosts. Stop othere attempts for connections
132 """
133 for host in file_props['streamhosts']:
134 if host != streamhost and 'idx' in host:
135 if host['state'] == 1:
136
137 self.remove_receiver(streamhost['idx'])
138 return
139
140
141 if host['state'] >= 0:
142 self.remove_receiver(host['idx'])
143 host['idx'] = -1
144 host['state'] = -2
145
147 """
148 Check the state of all streamhosts and if all has failed, then emit
149 connection failure cb. If there are some which are still not connected
150 try to establish connection to one of them
151 """
152 self.idlequeue.remove_timeout(receiver.fd)
153 self.idlequeue.unplug_idle(receiver.fd)
154 file_props = receiver.file_props
155 streamhost['state'] = -1
156
157 unused_hosts = False
158 for host in file_props['streamhosts']:
159 if 'idx' in host:
160 if host['state'] >= 0:
161 return
162 elif host['state'] == -2:
163 unused_hosts = True
164 if unused_hosts:
165 for host in file_props['streamhosts']:
166 if host['state'] == -2:
167 host['state'] = 0
168 receiver = Socks5Receiver(self.idlequeue, host, host['sid'],
169 file_props)
170 self.add_receiver(receiver.account, receiver)
171 host['idx'] = receiver.queue_idx
172
173 return
174 if 'received-len' not in file_props or file_props['received-len'] == 0:
175
176 self._connection_refused(streamhost, file_props, receiver.queue_idx)
177 else:
178
179 receiver.disconnect()
180 file_props['error'] = -1
181 self.process_result(-1, receiver)
182
184 """
185 Called when we loose connection during transfer
186 """
187 if file_props is None:
188 return
189 streamhost['state'] = -1
190 self.remove_receiver(idx, False)
191 if 'streamhosts' in file_props:
192 for host in file_props['streamhosts']:
193 if host['state'] != -1:
194 return
195
196 if 'failure_cb' in file_props and file_props['failure_cb']:
197 file_props['failure_cb'](streamhost['initiator'], streamhost['id'],
198 file_props['sid'], code = 404)
199 del(file_props['failure_cb'])
200
202 """
203 Add new file request
204 """
205 self.readers[self.idx] = sock5_receiver
206 sock5_receiver.queue_idx = self.idx
207 sock5_receiver.queue = self
208 sock5_receiver.account = account
209 self.idx += 1
210 result = sock5_receiver.connect()
211 self.connected += 1
212 if result is not None:
213 result = sock5_receiver.main()
214 self.process_result(result, sock5_receiver)
215 return 1
216 return None
217
219 if file_props is None:
220 return
221 if 'hash' in file_props and file_props['hash'] in self.senders:
222 sender = self.senders[file_props['hash']]
223 sender.account = account
224 result = self.get_file_contents(0)
225 self.process_result(result, sender)
226
228 if sha_str in self.sha_handlers:
229 props = self.sha_handlers[sha_str]
230 props[0](props[1], idx)
231
233 if idx not in self.readers:
234 return
235 reader = self.readers[idx]
236 if reader.file_props['type'] != 's':
237 return
238 if reader.state != 5:
239 return
240 reader.state = 6
241 if reader.connected:
242 reader.file_props['error'] = 0
243 reader.file_props['disconnect_cb'] = reader.disconnect
244 reader.file_props['started'] = True
245 reader.file_props['completed'] = False
246 reader.file_props['paused'] = False
247 reader.file_props['stalled'] = False
248 reader.file_props['elapsed-time'] = 0
249 reader.file_props['last-time'] = self.idlequeue.current_time()
250 reader.file_props['received-len'] = 0
251 reader.pauses = 0
252
253 self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
254 self.idlequeue.plug_idle(reader, True, False)
255 result = reader.write_next()
256 self.process_result(result, reader)
257
259 if 'hash' in file_props and file_props['hash'] in self.senders:
260 sender = self.senders[file_props['hash']]
261 file_props['streamhost-used'] = True
262 sender.account = account
263 if file_props['type'] == 's':
264 sender.file_props = file_props
265 result = sender.send_file()
266 self.process_result(result, sender)
267 else:
268 file_props['elapsed-time'] = 0
269 file_props['last-time'] = self.idlequeue.current_time()
270 file_props['received-len'] = 0
271 sender.file_props = file_props
272
274 """
275 File_prop to the dict of current file_props. It is identified by account
276 name and sid
277 """
278 if file_props is None or ('sid' in file_props) is False:
279 return
280 _id = file_props['sid']
281 if account not in self.files_props:
282 self.files_props[account] = {}
283 self.files_props[account][_id] = file_props
284
286 if account in self.files_props:
287 fl_props = self.files_props[account]
288 if sid in fl_props:
289 del(fl_props[sid])
290
291 if len(self.files_props) == 0:
292 self.connected = 0
293
295 """
296 Get fil_prop by account name and session id
297 """
298 if account in self.files_props:
299 fl_props = self.files_props[account]
300 if sid in fl_props:
301 return fl_props[sid]
302 return None
303
305 sock_hash = sock.__hash__()
306 if sock_hash not in self.senders:
307 self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self,
308 sock[0], sock[1][0], sock[1][1])
309 self.connected += 1
310
312 """
313 Take appropriate actions upon the result:
314 [ 0, - 1 ] complete/end transfer
315 [ > 0 ] send progress message
316 [ None ] do nothing
317 """
318 if result is None:
319 return
320 if result in (0, -1) and self.complete_transfer_cb is not None:
321 account = actor.account
322 if account is None and 'tt_account' in actor.file_props:
323 account = actor.file_props['tt_account']
324 self.complete_transfer_cb(account, actor.file_props)
325 elif self.progress_transfer_cb is not None:
326 self.progress_transfer_cb(actor.account, actor.file_props)
327
329 """
330 Remove reciver from the list and decrease the number of active
331 connections with 1
332 """
333 if idx != -1:
334 if idx in self.readers:
335 reader = self.readers[idx]
336 self.idlequeue.unplug_idle(reader.fd)
337 self.idlequeue.remove_timeout(reader.fd)
338 if do_disconnect:
339 reader.disconnect()
340 else:
341 if reader.streamhost is not None:
342 reader.streamhost['state'] = -1
343 del(self.readers[idx])
344
346 """
347 Remove sender from the list of senders and decrease the number of active
348 connections with 1
349 """
350 if idx != -1:
351 if idx in self.senders:
352 if do_disconnect:
353 self.senders[idx].disconnect()
354 return
355 else:
356 del(self.senders[idx])
357 if self.connected > 0:
358 self.connected -= 1
359 if len(self.senders) == 0 and self.listener is not None:
360 self.listener.disconnect()
361 self.listener = None
362 self.connected -= 1
363
365 - def __init__(self, idlequeue, host, port, initiator, target, sid):
366 if host is not None:
367 try:
368 self.host = host
369 self.ais = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
370 socket.SOCK_STREAM)
371 except socket.gaierror:
372 self.ais = None
373 self.idlequeue = idlequeue
374 self.fd = -1
375 self.port = port
376 self.initiator = initiator
377 self.target = target
378 self.sid = sid
379 self._sock = None
380 self.account = None
381 self.state = 0
382 self.pauses = 0
383 self.size = 0
384 self.remaining_buff = ''
385 self.file = None
386
388 if self.file is None:
389 try:
390 self.file = open(self.file_props['file-name'], 'rb')
391 if 'offset' in self.file_props and self.file_props['offset']:
392 self.size = self.file_props['offset']
393 self.file.seek(self.size)
394 self.file_props['received-len'] = self.size
395 except IOError, e:
396 self.close_file()
397 raise IOError, e
398
400 if self.file:
401 if not self.file.closed:
402 try:
403 self.file.close()
404 except Exception:
405 pass
406 self.file = None
407
409 """
410 Test if file is already open and return its fd, or just open the file and
411 return the fd
412 """
413 if 'fd' in self.file_props:
414 fd = self.file_props['fd']
415 else:
416 offset = 0
417 opt = 'wb'
418 if 'offset' in self.file_props and self.file_props['offset']:
419 offset = self.file_props['offset']
420 opt = 'ab'
421 fd = open(self.file_props['file-name'], opt)
422 self.file_props['fd'] = fd
423 self.file_props['elapsed-time'] = 0
424 self.file_props['last-time'] = self.idlequeue.current_time()
425 self.file_props['received-len'] = offset
426 return fd
427
429 if 'fd' in self.file_props:
430 del(self.file_props['fd'])
431 try:
432 fd.close()
433 except Exception:
434 pass
435
437 """
438 Read small chunks of data. Call owner's disconnected() method if
439 appropriate
440 """
441 received = ''
442 try:
443 add = self._recv(64)
444 except Exception:
445 add = ''
446 received += add
447 if len(add) == 0:
448 self.disconnect()
449 return add
450
452 """
453 Write raw outgoing data
454 """
455 try:
456 self._send(raw_data)
457 except Exception:
458 self.disconnect()
459 return len(raw_data)
460
462 if self.remaining_buff != '':
463 buff = self.remaining_buff
464 self.remaining_buff = ''
465 else:
466 try:
467 self.open_file_for_reading()
468 except IOError, e:
469 self.state = 8
470 self.disconnect()
471 self.file_props['error'] = -7
472 return -1
473 buff = self.file.read(MAX_BUFF_LEN)
474 if len(buff) > 0:
475 lenn = 0
476 try:
477 lenn = self._send(buff)
478 except Exception, e:
479 if e.args[0] not in (EINTR, ENOBUFS, EWOULDBLOCK):
480
481 self.state = 8
482 self.disconnect()
483 self.file_props['error'] = -1
484 return -1
485 self.size += lenn
486 current_time = self.idlequeue.current_time()
487 self.file_props['elapsed-time'] += current_time - \
488 self.file_props['last-time']
489 self.file_props['last-time'] = current_time
490 self.file_props['received-len'] = self.size
491 if self.size >= int(self.file_props['size']):
492 self.state = 8
493 self.file_props['error'] = 0
494 self.disconnect()
495 return -1
496 if lenn != len(buff):
497 self.remaining_buff = buff[lenn:]
498 else:
499 self.remaining_buff = ''
500 self.state = 7
501 if lenn == 0:
502 return None
503 self.file_props['stalled'] = False
504 return lenn
505 else:
506 self.state = 8
507 self.disconnect()
508 return -1
509
510 - def get_file_contents(self, timeout):
511 """
512 Read file contents from socket and write them to file
513 """
514 if self.file_props is None or ('file-name' in self.file_props) is False:
515 self.file_props['error'] = -2
516 return None
517 fd = None
518 if self.remaining_buff != '':
519 try:
520 fd = self.get_fd()
521 except IOError, e:
522 self.disconnect(False)
523 self.file_props['error'] = -6
524 return 0
525 fd.write(self.remaining_buff)
526 lenn = len(self.remaining_buff)
527 current_time = self.idlequeue.current_time()
528 self.file_props['elapsed-time'] += current_time - \
529 self.file_props['last-time']
530 self.file_props['last-time'] = current_time
531 self.file_props['received-len'] += lenn
532 self.remaining_buff = ''
533 if self.file_props['received-len'] == int(self.file_props['size']):
534 self.rem_fd(fd)
535 self.disconnect()
536 self.file_props['error'] = 0
537 self.file_props['completed'] = True
538 return 0
539 else:
540 try:
541 fd = self.get_fd()
542 except IOError, e:
543 self.disconnect(False)
544 self.file_props['error'] = -6
545 return 0
546 try:
547 buff = self._recv(MAX_BUFF_LEN)
548 except Exception:
549 buff = ''
550 current_time = self.idlequeue.current_time()
551 self.file_props['elapsed-time'] += current_time - \
552 self.file_props['last-time']
553 self.file_props['last-time'] = current_time
554 self.file_props['received-len'] += len(buff)
555 if len(buff) == 0:
556
557
558 self.rem_fd(fd)
559 self.disconnect(False)
560 self.file_props['error'] = -1
561 return 0
562 try:
563 fd.write(buff)
564 except IOError, e:
565 self.rem_fd(fd)
566 self.disconnect(False)
567 self.file_props['error'] = -6
568 return 0
569 if self.file_props['received-len'] >= int(self.file_props['size']):
570
571 self.rem_fd(fd)
572 self.disconnect()
573 self.file_props['error'] = 0
574 self.file_props['completed'] = True
575 return 0
576
577 if fd is not None:
578 self.file_props['stalled'] = False
579 if fd is None and self.file_props['stalled'] is False:
580 return None
581 if 'received-len' in self.file_props:
582 if self.file_props['received-len'] != 0:
583 return self.file_props['received-len']
584 return None
585
587 """
588 Close open descriptors and remover socket descr. from idleque
589 """
590
591 self.close_file()
592 self.idlequeue.remove_timeout(self.fd)
593 self.idlequeue.unplug_idle(self.fd)
594 try:
595 self._sock.shutdown(socket.SHUT_RDWR)
596 self._sock.close()
597 except Exception:
598
599 pass
600 self.connected = False
601 self.fd = -1
602 self.state = -1
603
605 """
606 Message, that we support 1 one auth mechanism: the 'no auth' mechanism
607 """
608 return struct.pack('!BBB', 0x05, 0x01, 0x00)
609
611 """
612 Parse the initial message and create a list of auth mechanisms
613 """
614 auth_mechanisms = []
615 try:
616 num_auth = struct.unpack('!xB', buff[:2])[0]
617 for i in xrange(num_auth):
618 mechanism, = struct.unpack('!B', buff[1 + i])
619 auth_mechanisms.append(mechanism)
620 except Exception:
621 return None
622 return auth_mechanisms
623
625 """
626 Socks version(5), number of extra auth methods (we send 0x00 - no auth)
627 """
628 return struct.pack('!BB', 0x05, 0x00)
629
631 ''' Connect request by domain name '''
632 buff = struct.pack('!BBBBB%dsBB' % len(self.host),
633 0x05, 0x01, 0x00, 0x03, len(self.host), self.host,
634 self.port >> 8, self.port & 0xff)
635 return buff
636
638 """
639 Connect request by domain name, sid sha, instead of domain name (jep
640 0096)
641 """
642 buff = struct.pack('!BBBBB%dsBB' % len(msg),
643 0x05, command, 0x00, 0x03, len(msg), msg, 0, 0)
644 return buff
645
647 try:
648 req_type, host_type, = struct.unpack('!xBxB', buff[:4])
649 if host_type == 0x01:
650 host_arr = struct.unpack('!iiii', buff[4:8])
651 host, = '.'.join(str(s) for s in host_arr)
652 host_len = len(host)
653 elif host_type == 0x03:
654 host_len, = struct.unpack('!B', buff[4])
655 host, = struct.unpack('!%ds' % host_len, buff[5:5 + host_len])
656 portlen = len(buff[host_len + 5:])
657 if portlen == 1:
658 port, = struct.unpack('!B', buff[host_len + 5])
659 elif portlen == 2:
660 port, = struct.unpack('!H', buff[host_len + 5:])
661
662 else:
663 port, = struct.unpack('!H', buff[host_len + 5: host_len + 7])
664 self.remaining_buff = buff[host_len + 7:]
665 except Exception:
666 return (None, None, None)
667 return (req_type, host, port)
668
670 """
671 Connect response: version, auth method
672 """
673 buff = self._recv()
674 try:
675 version, method = struct.unpack('!BB', buff)
676 except Exception:
677 version, method = None, None
678 if version != 0x05 or method == 0xff:
679 self.disconnect()
680
688
690 """
691 Get sha of sid + Initiator jid + Target jid
692 """
693 if 'is_a_proxy' in self.file_props:
694 del(self.file_props['is_a_proxy'])
695 return hashlib.sha1('%s%s%s' % (self.sid,
696 self.file_props['proxy_sender'],
697 self.file_props['proxy_receiver'])).hexdigest()
698 return hashlib.sha1('%s%s%s' % (self.sid, self.initiator, self.target)).\
699 hexdigest()
700
702 """
703 Class for sending file to socket over socks5
704 """
705
706 - def __init__(self, idlequeue, sock_hash, parent, _sock, host=None,
707 port=None):
708 self.queue_idx = sock_hash
709 self.queue = parent
710 Socks5.__init__(self, idlequeue, host, port, None, None, None)
711 self._sock = _sock
712 self._sock.setblocking(False)
713 self.fd = _sock.fileno()
714 self._recv = _sock.recv
715 self._send = _sock.send
716 self.connected = True
717 self.state = 1
718 self.file_props = None
719
720 self.idlequeue.plug_idle(self, False, True)
721
734
764
770
772 if self.connected:
773 if self.state < 5:
774 result = self.main()
775 if self.state == 4:
776 self.queue.result_sha(self.sha_msg, self.queue_idx)
777 if result == -1:
778 self.disconnect()
779
780 elif self.state == 5:
781 if self.file_props is not None and self.file_props['type'] == 'r':
782 result = self.get_file_contents(0)
783 self.queue.process_result(result, self)
784 else:
785 self.disconnect()
786
788 """
789 Start sending the file over verified connection
790 """
791 if self.file_props['started']:
792 return
793 self.file_props['error'] = 0
794 self.file_props['disconnect_cb'] = self.disconnect
795 self.file_props['started'] = True
796 self.file_props['completed'] = False
797 self.file_props['paused'] = False
798 self.file_props['continue_cb'] = self.continue_paused_transfer
799 self.file_props['stalled'] = False
800 self.file_props['connected'] = True
801 self.file_props['elapsed-time'] = 0
802 self.file_props['last-time'] = self.idlequeue.current_time()
803 self.file_props['received-len'] = 0
804 self.pauses = 0
805 self.state = 7
806
807 self.idlequeue.plug_idle(self, True, False)
808 return self.write_next()
809
811 """
812 Initial requests for verifying the connection
813 """
814 if self.state == 1:
815 buff = self.receive()
816 if not self.connected:
817 return -1
818 mechs = self._parse_auth_buff(buff)
819 if mechs is None:
820 return -1
821 elif self.state == 3:
822 buff = self.receive()
823 req_type, self.sha_msg = self._parse_request_buff(buff)[:2]
824 if req_type != 0x01:
825 return -1
826 self.state += 1
827
828 self.idlequeue.plug_idle(self, True, False)
829 return None
830
832 """
833 Close the socket
834 """
835
836 Socks5.disconnect(self)
837 if self.file_props is not None:
838 self.file_props['connected'] = False
839 self.file_props['disconnect_cb'] = None
840 if self.queue is not None:
841 self.queue.remove_sender(self.queue_idx, False)
842
845 """
846 Handle all incomming connections on (0.0.0.0, port)
847
848 This class implements IdleObject, but we will expect
849 only pollin events though
850 """
851 self.port = port
852 self.ais = socket.getaddrinfo(None, port, socket.AF_UNSPEC,
853 socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_PASSIVE)
854 self.ais.sort(reverse=True)
855 self.queue_idx = -1
856 self.idlequeue = idlequeue
857 self.queue = None
858 self.started = False
859 self._sock = None
860 self.fd = -1
861
863 for ai in self.ais:
864
865 try:
866 self._serv = socket.socket(*ai[:3])
867 except socket.error, e:
868 if e.args[0] == EAFNOSUPPORT:
869 self.ai = None
870 continue
871 raise
872 self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
873 self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
874 self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
875
876
877 if os.name == 'nt':
878 ver = os.sys.getwindowsversion()
879 if (ver[3], ver[0], ver[1]) == (2, 6, 0):
880
881 self._serv.setsockopt(socket.IPPROTO_IPV6, 27, 1)
882
883 try:
884 self._serv.bind(ai[4])
885 self.ai = ai
886 break
887 except Exception:
888 self.ai = None
889 continue
890 if not self.ai:
891
892 return None
893 self._serv.listen(socket.SOMAXCONN)
894 self._serv.setblocking(False)
895 self.fd = self._serv.fileno()
896 self.idlequeue.plug_idle(self, False, True)
897 self.started = True
898
900 """
901 Called when we stop listening on (host, port)
902 """
903 self.disconnect()
904
911
913 """
914 Free all resources, we are not listening anymore
915 """
916 self.idlequeue.remove_timeout(self.fd)
917 self.idlequeue.unplug_idle(self.fd)
918 self.fd = -1
919 self.state = -1
920 self.started = False
921 try:
922 self._serv.close()
923 except Exception:
924 pass
925
927 """
928 Accept a new incomming connection
929 """
930 _sock = self._serv.accept()
931 _sock[0].setblocking(False)
932 return _sock
933
935 - def __init__(self, idlequeue, streamhost, sid, file_props = None):
936 self.queue_idx = -1
937 self.streamhost = streamhost
938 self.queue = None
939 self.file_props = file_props
940 self.connect_timeout = 0
941 self.connected = False
942 self.pauses = 0
943 if not self.file_props:
944 self.file_props = {}
945 self.file_props['disconnect_cb'] = self.disconnect
946 self.file_props['error'] = 0
947 self.file_props['started'] = True
948 self.file_props['completed'] = False
949 self.file_props['paused'] = False
950 self.file_props['continue_cb'] = self.continue_paused_transfer
951 self.file_props['stalled'] = False
952 Socks5.__init__(self, idlequeue, streamhost['host'],
953 int(streamhost['port']), streamhost['initiator'], streamhost['target'],
954 sid)
955
972
974 """
975 Create the socket and plug it to the idlequeue
976 """
977 if self.ais is None:
978 return None
979
980 for ai in self.ais:
981 try:
982 self._sock = socket.socket(*ai[:3])
983
984 self._sock.setblocking(False)
985 self._server = ai[4]
986 break
987 except socket.error, e:
988 if not isinstance(e, basestring) and e[0] == EINPROGRESS:
989 break
990
991 continue
992 self.fd = self._sock.fileno()
993 self.state = 0
994 self.idlequeue.plug_idle(self, True, False)
995 self.do_connect()
996 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
997 return None
998
1000 if self.state < 5:
1001 return False
1002 return True
1003
1024
1033
1052
1054 try:
1055 self._sock.connect(self._server)
1056 self._sock.setblocking(False)
1057 self._send=self._sock.send
1058 self._recv=self._sock.recv
1059 except Exception, ee:
1060 errnum = ee[0]
1061 self.connect_timeout += 1
1062 if errnum == 111 or self.connect_timeout > 1000:
1063 self.queue._connection_refused(self.streamhost,
1064 self.file_props, self.queue_idx)
1065 return None
1066
1067 elif errnum not in (10056, EISCONN) or self.state != 0:
1068 return None
1069 else:
1070 self._sock.setblocking(False)
1071 self._send=self._sock.send
1072 self._recv=self._sock.recv
1073 self.buff = ''
1074 self.connected = True
1075 self.file_props['connected'] = True
1076 self.file_props['disconnect_cb'] = self.disconnect
1077 self.state = 1
1078
1079
1080 self.queue._socket_connected(self.streamhost, self.file_props)
1081 self.idlequeue.plug_idle(self, True, False)
1082 return 1
1083
1084 - def main(self, timeout=0):
1085 """
1086 Begin negotiation. on success 'address' != 0
1087 """
1088 result = 1
1089 buff = self.receive()
1090 if buff == '':
1091
1092 self.pollend()
1093 return
1094
1095 if self.state == 2:
1096 if buff is None or len(buff) != 2:
1097 return None
1098 version, method = struct.unpack('!BB', buff[:2])
1099 if version != 0x05 or method == 0xff:
1100 self.disconnect()
1101 elif self.state == 4:
1102 if buff is None:
1103 return None
1104 sub_buff = buff[:4]
1105 if len(sub_buff) < 4:
1106 return None
1107 version, address_type = struct.unpack('!BxxB', buff[:4])
1108 addrlen = 0
1109 if address_type == 0x03:
1110 addrlen = ord(buff[4])
1111 address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5])
1112 portlen = len(buff[addrlen + 5:])
1113 if portlen == 1:
1114 port, = struct.unpack('!B', buff[addrlen + 5])
1115 elif portlen == 2:
1116 port, = struct.unpack('!H', buff[addrlen + 5:])
1117 else:
1118 port, = struct.unpack('!H', buff[addrlen + 5:addrlen + 7])
1119 self.remaining_buff = buff[addrlen + 7:]
1120 self.state = 5
1121 if self.queue.on_success:
1122 result = self.queue.send_success_reply(self.file_props,
1123 self.streamhost)
1124 if result == 0:
1125 self.state = 8
1126 self.disconnect()
1127
1128
1129 if result == 1 and self.state == 5:
1130 if self.file_props['type'] == 's':
1131 self.file_props['error'] = 0
1132 self.file_props['disconnect_cb'] = self.disconnect
1133 self.file_props['started'] = True
1134 self.file_props['completed'] = False
1135 self.file_props['paused'] = False
1136 self.file_props['stalled'] = False
1137 self.file_props['elapsed-time'] = 0
1138 self.file_props['last-time'] = self.idlequeue.current_time()
1139 self.file_props['received-len'] = 0
1140 self.pauses = 0
1141
1142 self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
1143 self.idlequeue.plug_idle(self, True, False)
1144 else:
1145
1146 self.idlequeue.plug_idle(self, False, True)
1147 self.file_props['continue_cb'] = self.continue_paused_transfer
1148
1149 self.state = 6
1150 if self.state < 5:
1151 self.idlequeue.plug_idle(self, True, False)
1152 self.state += 1
1153 return None
1154
1156 """
1157 Close the socket. Remove self from queue if cb is True
1158 """
1159
1160 Socks5.disconnect(self)
1161 if cb is True:
1162 self.file_props['disconnect_cb'] = None
1163 if self.queue is not None:
1164 self.queue.remove_receiver(self.queue_idx, False)
1165