traceback.print_exc
View license
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
| def run(root_coro): """Schedules a coroutine, running it to completion. This encapsulates the Bluelet scheduler, which the root coroutine can add to by spawning new coroutines. """ # The "threads" dictionary keeps track of all the currently- # executing and suspended coroutines. It maps coroutines to their # currently "blocking" event. The event value may be SUSPENDED if # the coroutine is waiting on some other condition: namely, a # delegated coroutine or a joined coroutine. In this case, the # coroutine should *also* appear as a value in one of the below # dictionaries `delegators` or `joiners`. threads = {root_coro: ValueEvent( None )} # Maps child coroutines to delegating parents. delegators = {} # Maps child coroutines to joining (exit-waiting) parents. joiners = collections.defaultdict( list ) def complete_thread(coro, return_value): """Remove a coroutine from the scheduling pool, awaking delegators and joiners as necessary and returning the specified value to any delegating parent. """ del threads[coro] # Resume delegator. if coro in delegators: threads[delegators[coro]] = ValueEvent(return_value) del delegators[coro] # Resume joiners. if coro in joiners: for parent in joiners[coro]: threads[parent] = ValueEvent( None ) del joiners[coro] def advance_thread(coro, value, is_exc = False ): """After an event is fired, run a given coroutine associated with it in the threads dict until it yields again. If the coroutine exits, then the thread is removed from the pool. If the coroutine raises an exception, it is reraised in a ThreadException. If is_exc is True, then the value must be an exc_info tuple and the exception is thrown into the coroutine. """ try : if is_exc: next_event = coro.throw( * value) else : next_event = coro.send(value) except StopIteration: # Thread is done. complete_thread(coro, None ) except : # Thread raised some other exception. del threads[coro] raise ThreadException(coro, sys.exc_info()) else : if isinstance (next_event, types.GeneratorType): # Automatically invoke sub-coroutines. (Shorthand for # explicit bluelet.call().) next_event = DelegationEvent(next_event) threads[coro] = next_event def kill_thread(coro): """Unschedule this thread and its (recursive) delegates. """ # Collect all coroutines in the delegation stack. coros = [coro] while isinstance (threads[coro], Delegated): coro = threads[coro].child coros.append(coro) # Complete each coroutine from the top to the bottom of the # stack. for coro in reversed (coros): complete_thread(coro, None ) # Continue advancing threads until root thread exits. exit_te = None while threads: try : # Look for events that can be run immediately. Continue # running immediate events until nothing is ready. while True : have_ready = False for coro, event in list (threads.items()): if isinstance (event, SpawnEvent): threads[event.spawned] = ValueEvent( None ) # Spawn. advance_thread(coro, None ) have_ready = True elif isinstance (event, ValueEvent): advance_thread(coro, event.value) have_ready = True elif isinstance (event, ExceptionEvent): advance_thread(coro, event.exc_info, True ) have_ready = True elif isinstance (event, DelegationEvent): threads[coro] = Delegated(event.spawned) # Suspend. threads[event.spawned] = ValueEvent( None ) # Spawn. delegators[event.spawned] = coro have_ready = True elif isinstance (event, ReturnEvent): # Thread is done. complete_thread(coro, event.value) have_ready = True elif isinstance (event, JoinEvent): threads[coro] = SUSPENDED # Suspend. joiners[event.child].append(coro) have_ready = True elif isinstance (event, KillEvent): threads[coro] = ValueEvent( None ) kill_thread(event.child) have_ready = True # Only start the select when nothing else is ready. if not have_ready: break # Wait and fire. event2coro = dict ((v, k) for k, v in threads.items()) for event in _event_select(threads.values()): # Run the IO operation, but catch socket errors. try : value = event.fire() except socket.error as exc: if isinstance (exc.args, tuple ) and \ exc.args[ 0 ] = = errno.EPIPE: # Broken pipe. Remote host disconnected. pass else : traceback.print_exc() # Abort the coroutine. threads[event2coro[event]] = ReturnEvent( None ) else : advance_thread(event2coro[event], value) except ThreadException as te: # Exception raised from inside a thread. event = ExceptionEvent(te.exc_info) if te.coro in delegators: # The thread is a delegate. Raise exception in its # delegator. threads[delegators[te.coro]] = event del delegators[te.coro] else : # The thread is root-level. Raise in client code. exit_te = te break except : # For instance, KeyboardInterrupt during select(). Raise # into root thread and terminate others. threads = {root_coro: ExceptionEvent(sys.exc_info())} # If any threads still remain, kill them. for coro in threads: coro.close() # If we're exiting with an exception, raise it in the client. if exit_te: exit_te.reraise()
|