root/branches/1.0/turbogears/scheduler.py

Revision 3366, 17.4 kB (checked in by carndt, 1 year ago)

Ran reindent.py on all .py files to remove extra line-ending space and add NL at eof

Line 
1 """Module that provides a cron-like task scheduler.
2
3 This task scheduler is designed to be used from inside your own program.
4 You can schedule Python functions to be called at specific intervals or
5 days. It uses the standard 'sched' module for the actual task scheduling,
6 but provides much more:
7     - repeated tasks (at intervals, or on specific days)
8     - error handling (exceptions in tasks don't kill the scheduler)
9     - optional to run scheduler in its own thread or separate process
10     - optional to run a task in its own thread or separate process
11
12 If the threading module is available, you can use the various Threaded
13 variants of the scheduler and associated tasks. If threading is not
14 available, you could still use the forked variants. If fork is also
15 not available, all processing is done in a single process, sequentially.
16
17 There are three Scheduler classes:
18     Scheduler    ThreadedScheduler    ForkedScheduler
19
20 You usually add new tasks to a scheduler using the add_interval_task or
21 add_daytime_task methods, with the appropriate processmethod argument
22 to select sequential, threaded or forked processing. NOTE: it is impossible
23 to add new tasks to a ForkedScheduler, after the scheduler has been started!
24 For more control you could use one of the following Task classes
25 and use schedule_task or schedule_task_abs:
26     IntervalTask    ThreadedIntervalTask    ForkedIntervalTask
27     WeekdayTask     ThreadedWeekdayTask     ForkedWeekdayTask
28     MonthdayTask    ThreadedMonthdayTask    ForkedMonthdayTask
29
30 Kronos is the Greek God of Time.
31
32 This module is based on Kronos by Irmen de Jong, but has been modified
33 to better fit within TurboGears. Additionally, the original Kronos module
34 appeared to no longer be supported or under development.
35 """
36 #
37 #   $Id: kronos.py,v 1.5 2004/10/06 22:43:49 irmen Exp $
38 #
39 #   (c) Irmen de Jong.
40 #   This is open-source software, released under the MIT Software License:
41 #   http://www.opensource.org/licenses/mit-license.php
42 #
43
44
45 import os, sys
46 import sched, time
47 import traceback
48 import weakref
49
50 from turbogears.util import Enum
51
# Values accepted as the ``processmethod`` argument of the add_*_task
# methods and functions below; they select whether a task's action runs in
# the scheduler's own flow ("sequential"), in a forked child process, or in
# a separate thread.
52 method = Enum("sequential", "forked", "threaded")
53
class Scheduler:
    """The Scheduler itself.

    Wraps a ``sched.scheduler`` driven by ``time.time``. Tasks are callables
    that are invoked with a weak reference to this scheduler and are expected
    to reschedule themselves. This base class runs everything in the flow of
    control that calls start().
    """

    def __init__(self):
        self.running = True
        self.sched = sched.scheduler(time.time, self.__delayfunc)

    def _next_event_time(self):
        """Return the due time of the first queued event, or None if the
        queue is empty."""
        try:
            return self.sched.queue[0][0]
        except IndexError:
            return None

    def __delayfunc(self, delay):
        # This delay function is basically a time.sleep() that is divided up,
        # so that we can check the self.running flag while delaying. It also
        # stops waiting as soon as the first item of the queue changes (a new
        # earlier task was added, or the awaited one was cancelled).
        if delay < 10:
            time.sleep(delay)
            return
        toptime = self._next_event_time()
        if toptime is None:
            # Nothing left to wait for (the queue was emptied meanwhile).
            return
        endtime = time.time() + delay
        period = 5
        stoptime = endtime - period
        while (self.running and stoptime > time.time()
               and self._next_event_time() == toptime):
            time.sleep(period)
        if not self.running or self._next_event_time() != toptime:
            return
        # Sleep away whatever is left of the last, partial period.
        now = time.time()
        if endtime > now:
            time.sleep(endtime - now)

    # No locking is needed in the sequential scheduler; subclasses that are
    # accessed from several threads override these two hooks.
    def _acquire_lock(self):
        pass

    def _release_lock(self):
        pass

    def add_interval_task(self, action, taskname, initialdelay, interval,
                          processmethod, args, kw):
        """Add a new Interval Task to the schedule and return it.

        A very short initialdelay or one of zero cannot be honored, you will
        see a slight delay before the task is first executed. This is because
        the scheduler needs to pick it up in its loop.

        Raises ValueError if initialdelay is negative, interval is below 1,
        or processmethod is not one of the known methods.
        """
        if initialdelay < 0 or interval < 1:
            raise ValueError("delay or interval must be >0")
        # Select the correct IntervalTask class. Not all types may be
        # available, so the names are only looked up in the branch taken.
        if processmethod == method.sequential:
            TaskClass = IntervalTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedIntervalTask
        elif processmethod == method.forked:
            TaskClass = ForkedIntervalTask
        else:
            raise ValueError("invalid processmethod")
        if not args:
            args = []
        if not kw:
            kw = {}
        task = TaskClass(taskname, interval, action, args, kw)
        self.schedule_task(task, initialdelay)
        return task

    def add_daytime_task(self, action, taskname, weekdays, monthdays,
                         timeonday, processmethod, args, kw):
        """Add a new Day Task (Weekday or Monthday) to the schedule and
        return it.

        Exactly one of weekdays and monthdays must be given; timeonday is an
        (hour, minute) pair. Raises ValueError if both or neither of the day
        selections are given, or if processmethod is not a known method.
        """
        if weekdays and monthdays:
            raise ValueError("you can only specify weekdays or monthdays, not both")
        if not weekdays and not monthdays:
            # Without this check no task would be created below and the
            # method would fail on an unbound local variable instead.
            raise ValueError("you must specify either weekdays or monthdays")
        if not args:
            args = []
        if not kw:
            kw = {}
        if weekdays:
            # Select the correct WeekdayTask class. Not all types may be available!
            if processmethod == method.sequential:
                TaskClass = WeekdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedWeekdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedWeekdayTask
            else:
                raise ValueError("invalid processmethod")
            task = TaskClass(taskname, weekdays, timeonday, action, args, kw)
        else:
            # Select the correct MonthdayTask class. Not all types may be available!
            if processmethod == method.sequential:
                TaskClass = MonthdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedMonthdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedMonthdayTask
            else:
                raise ValueError("invalid processmethod")
            task = TaskClass(taskname, monthdays, timeonday, action, args, kw)
        firsttime = task.get_schedule_time(True)
        self.schedule_task_abs(task, firsttime)
        return task

    def schedule_task(self, task, delay):
        """Low-level method to add a new task to the scheduler with the given
        delay (seconds). The created event is stored as task.event."""
        if self.running:
            self._acquire_lock()   # lock the sched queue, if needed
            try:
                task.event = self.sched.enter(delay, 0, task,
                                              (weakref.ref(self),))
            finally:
                self._release_lock()
        else:
            task.event = self.sched.enter(delay, 0, task,
                                          (weakref.ref(self),))

    def schedule_task_abs(self, task, abstime):
        """Low-level method to add a new task to the scheduler for the given
        absolute time value. The created event is stored as task.event."""
        if self.running:
            self._acquire_lock()     # lock the sched queue, if needed
            try:
                task.event = self.sched.enterabs(abstime, 0, task,
                                                 (weakref.ref(self),))
            finally:
                self._release_lock()
        else:
            task.event = self.sched.enterabs(abstime, 0, task,
                                             (weakref.ref(self),))

    def start(self):
        """Start the scheduler."""
        self._run()

    def stop(self):
        """Remove all pending tasks and stop the Scheduler."""
        self.running = False
        # Cancel the events one by one: 'queue' may be a read-only snapshot
        # of the real queue, in which case assigning to it clears nothing.
        for event in list(self.sched.queue):
            try:
                self.sched.cancel(event)
            except ValueError:
                # The event ran or was cancelled in the meantime.
                pass

    def cancel(self, task):
        """Remove the pending event of the given task from the queue."""
        self.sched.cancel(task.event)

    def _run(self):
        # Low-level run method to do the actual scheduling loop.
        while self.running:
            try:
                self.sched.run()
            except Exception:
                # sys.exc_info() keeps this valid on old and new interpreters.
                exc = sys.exc_info()[1]
                sys.stderr.write("ERROR DURING SCHEDULER EXECUTION %s\n" % (exc,))
                sys.stderr.write(
                    "".join(traceback.format_exception(*sys.exc_info())) + "\n")
                sys.stderr.write("-" * 20 + "\n")
            # queue is empty; sleep a short while before checking again
            if self.running:
                time.sleep(5)
193
194
class Task:
    """Abstract base class of all scheduler tasks.

    A task wraps a callable ``action`` together with the positional (args)
    and keyword (kw) arguments it is to be called with. Concrete subclasses
    supply reschedule().
    """

    def __init__(self, name, action, args, kw):
        """This is an abstract class!"""
        self.name = name
        self.action = action
        # The concrete task classes default args/kw to None and pass them
        # straight through; normalise them so that execute() can always
        # unpack them.
        if args is None:
            args = []
        if kw is None:
            kw = {}
        self.args = args
        self.kw = kw

    def __call__(self, schedulerref):
        """Execute the task action in the scheduler's thread.

        schedulerref is a callable (a weak reference) returning the scheduler
        on which the task reschedules itself afterwards, whether or not the
        action raised.
        """
        try:
            self.execute()
        except Exception:
            self.handle_exception(sys.exc_info()[1])
        self.reschedule(schedulerref())

    def reschedule(self, scheduler):
        """This is an abstract class, this method is defined in one of the sub classes!"""
        raise NotImplementedError("you're using the abstract base class 'Task', use a concrete class instead")

    def execute(self):
        """Execute the actual task."""
        self.action(*self.args, **self.kw)

    def handle_exception(self, exc):
        """Handle any exception that occurred during task execution.

        Writes the exception and the traceback of the exception currently
        being handled to stderr; must be called from an except block.
        """
        sys.stderr.write("ERROR DURING TASK EXECUTION %s\n" % (exc,))
        sys.stderr.write(
            "".join(traceback.format_exception(*sys.exc_info())) + "\n")
        sys.stderr.write("-" * 20 + "\n")
225
226
class IntervalTask(Task):
    """Task that is run again and again, a fixed number of seconds apart."""

    def __init__(self, name, interval, action, args=None, kw=None):
        self.interval = interval
        Task.__init__(self, name, action, args, kw)

    def reschedule(self, scheduler):
        # Queue the next run 'interval' seconds from now on the scheduler.
        delay = self.interval
        scheduler.schedule_task(self, delay)
236
237
238
class DayTaskRescheduler:
    """A mixin class that contains the reschedule logic for the DayTasks."""

    def __init__(self, timeonday):
        # (hour, minute) pair at which the task is to run.
        self.timeonday = timeonday

    def get_schedule_time(self, today):
        """Calculate the time value at which this task is to be scheduled.

        If today is true, the result is today's occurrence of the configured
        time of day, unless that moment has been reached already; otherwise
        (and whenever today is false) it is tomorrow's occurrence. Returns
        seconds since the epoch, as produced by time.mktime().
        """
        hour, minute = self.timeonday
        now = list(time.localtime())
        # Compare like with like: timeonday may be a list, and a tuple never
        # compares meaningfully against a list.
        if not today or (now[3], now[4]) >= (hour, minute):
            now[2] += 1     # tomorrow; mktime() normalises day overflow
        now[3], now[4] = hour, minute   # set new time on day (hour,minute)
        now[5] = 0                      # seconds
        # Let mktime() work out whether DST applies on the target day instead
        # of reusing today's flag.
        now[8] = -1
        # mktime() requires a tuple (or struct_time) on current interpreters.
        return time.mktime(tuple(now))

    def reschedule(self, scheduler):
        # Reschedule this task according to the daytime for the task.
        # The task is scheduled for tomorrow, for the given daytime.
        # (The execute method in the concrete Task classes will check
        # if the current day is a day on which the task must run).
        abstime = self.get_schedule_time(False)
        scheduler.schedule_task_abs(self, abstime)
264
265
class WeekdayTask(DayTaskRescheduler, Task):
    """A task that is called at specific days in a week (1-7), at a fixed time on the day."""

    def __init__(self, name, weekdays, timeonday, action, args=None, kw=None):
        """Create the task.

        weekdays is a list or tuple of weekday numbers 1-7 (1 is Monday) and
        timeonday an (hour, minute) list or tuple. Raises TypeError when
        either has the wrong type, or timeonday does not have two items.
        """
        # isinstance() also admits subclasses of list and tuple.
        if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
            raise TypeError("timeonday must be a 2-tuple (hour,minute)")
        if not isinstance(weekdays, (list, tuple)):
            raise TypeError("weekdays must be a sequence of weekday numbers 1-7 (1 is Monday)")
        DayTaskRescheduler.__init__(self, timeonday)
        Task.__init__(self, name, action, args, kw)
        self.days = weekdays

    def execute(self):
        # This is called every day, at the correct time. We only need to
        # check if we should run this task today (this day of the week).
        # tm_wday counts from 0 for Monday, the task's day numbers from 1.
        weekday = time.localtime().tm_wday + 1
        if weekday in self.days:
            self.action(*self.args, **self.kw)
283
class MonthdayTask(DayTaskRescheduler, Task):
    """A task that is called at specific days in a month (1-31), at a fixed time on the day."""

    def __init__(self, name, monthdays, timeonday, action, args=None, kw=None):
        """Create the task.

        monthdays is a list or tuple of day-of-month numbers 1-31 and
        timeonday an (hour, minute) list or tuple. Raises TypeError when
        either has the wrong type, or timeonday does not have two items.
        """
        # isinstance() also admits subclasses of list and tuple.
        if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
            raise TypeError("timeonday must be a 2-tuple (hour,minute)")
        if not isinstance(monthdays, (list, tuple)):
            raise TypeError("monthdays must be a sequence of monthdays numbers 1-31")
        DayTaskRescheduler.__init__(self, timeonday)
        Task.__init__(self, name, action, args, kw)
        self.days = monthdays

    def execute(self):
        # This is called every day, at the correct time. We only need to
        # check if we should run this task today (this day of the month).
        if time.localtime().tm_mday in self.days:
            self.action(*self.args, **self.kw)
300
301
try:
    import threading

    class ThreadedScheduler(Scheduler):
        """A Scheduler that runs in its own thread."""

        def __init__(self):
            Scheduler.__init__(self)
            self._lock = threading.Lock()   # we require a lock around the task queue

        def start(self):
            # Start method that spins off a daemon thread in which the
            # scheduler will run.
            self.thread = threading.Thread(target=self._run)
            self.thread.setDaemon(True)
            self.thread.start()

        def stop(self):
            # Stop method that stops the scheduler and waits for the thread
            # to finish.
            Scheduler.stop(self)
            thread = getattr(self, "thread", None)
            if thread is None:
                return      # the scheduler was never started
            if thread is threading.currentThread():
                # stop() was called by a task running in the scheduler
                # thread itself; a thread cannot join itself.
                return
            thread.join()

        def _acquire_lock(self):
            self._lock.acquire()    # lock the thread's task queue

        def _release_lock(self):
            self._lock.release()    # release the thread's task queue

    class ThreadedTaskMixin:
        """A mixin class to make a Task execute in a separate thread."""

        def __call__(self, schedulerref):
            # Execute the task action in its own thread, and reschedule
            # right away without waiting for that thread to finish.
            threading.Thread(target=self.threadedcall).start()
            self.reschedule(schedulerref())

        def threadedcall(self):
            # This method is run within its own thread, so we have to
            # do the execute() call and exception handling here.
            try:
                self.execute()
            except Exception:
                self.handle_exception(sys.exc_info()[1])

    class ThreadedIntervalTask(ThreadedTaskMixin, IntervalTask):
        """Interval Task that executes in its own thread."""
        pass

    class ThreadedWeekdayTask(ThreadedTaskMixin, WeekdayTask):
        """Weekday Task that executes in its own thread."""
        pass

    class ThreadedMonthdayTask(ThreadedTaskMixin, MonthdayTask):
        """Monthday Task that executes in its own thread."""
        pass

except ImportError:
    # threading is not available
    pass
354
355
if hasattr(os, "fork"):
    import signal

    class ForkedScheduler(Scheduler):
        """A Scheduler that runs in its own forked process."""

        def __del__(self):
            # Make sure a still-running scheduler process does not outlive
            # this object. childpid is only set while the child is ours.
            childpid = getattr(self, "childpid", None)
            if childpid is not None:
                try:
                    os.kill(childpid, signal.SIGKILL)
                except OSError:
                    pass    # the process is gone already

        def start(self):
            # Start method that forks off a new process in which the
            # scheduler will run.
            pid = os.fork()
            if pid == 0:
                # we are the child
                try:
                    signal.signal(signal.SIGUSR1, self.signalhandler)
                    self._run()
                finally:
                    # Whatever happens, never fall back into the code of
                    # the parent program.
                    os._exit(0)
            else:
                # we are the parent
                self.childpid = pid
                del self.sched      # can no longer insert in the scheduler queue

        def stop(self):
            # Stop method that stops the scheduler and waits for the process
            # to finish.
            childpid = getattr(self, "childpid", None)
            if childpid is None:
                return      # never started, or stopped already
            os.kill(childpid, signal.SIGUSR1)
            os.waitpid(childpid, 0)
            # The pid has been reaped and may be reused by an unrelated
            # process, so forget it: __del__ must not signal it any more.
            del self.childpid

        def signalhandler(self, sig, stack):
            # Runs in the child when the parent's stop() sends SIGUSR1.
            Scheduler.stop(self)

    class ForkedTaskMixin:
        """A mixin class to make a Task execute in a separate process."""

        def __call__(self, schedulerref):
            # execute the task action in its own process.
            self._reap_children()
            pid = os.fork()
            if pid == 0:
                # we are the child
                try:
                    try:
                        self.execute()
                    except Exception:
                        self.handle_exception(sys.exc_info()[1])
                finally:
                    # Exit even if something other than an Exception
                    # escaped, so the child never continues as a second
                    # copy of the scheduler loop.
                    os._exit(0)
            else:
                # we are the parent
                self._child_pids.append(pid)
                self.reschedule(schedulerref())

        def _reap_children(self):
            # Collect the exit status of earlier child processes of this
            # task that have finished, so they do not linger as zombies.
            # Children that are still running are kept for the next round.
            still_running = []
            for pid in getattr(self, "_child_pids", []):
                try:
                    reaped, _status = os.waitpid(pid, os.WNOHANG)
                except OSError:
                    continue    # not our child any more (reaped elsewhere)
                if reaped == 0:
                    still_running.append(pid)
            self._child_pids = still_running

    class ForkedIntervalTask(ForkedTaskMixin, IntervalTask):
        """Interval Task that executes in its own process."""
        pass

    class ForkedWeekdayTask(ForkedTaskMixin, WeekdayTask):
        """Weekday Task that executes in its own process."""
        pass

    class ForkedMonthdayTask(ForkedTaskMixin, MonthdayTask):
        """Monthday Task that executes in its own process."""
        pass
408
# The scheduler shared by the module-level convenience functions; it is
# created on first use.
_scheduler_instance = None


def _get_scheduler():
    """Return the shared scheduler, creating a ThreadedScheduler if there is
    none yet."""
    global _scheduler_instance
    if not _scheduler_instance:
        _scheduler_instance = ThreadedScheduler()
    return _scheduler_instance
418
def _start_scheduler():
    """Start the shared scheduler (it is created first if necessary)."""
    _get_scheduler().start()
422
def _stop_scheduler():
    """Stop the shared scheduler, if one has been created.

    The stopped instance is discarded afterwards: a scheduler cannot be
    started again once stopped, so the next use of the module-level
    functions has to get a fresh one.
    """
    global _scheduler_instance
    si = _scheduler_instance
    if not si:
        return
    si.stop()
    _scheduler_instance = None
428
def add_interval_task(action, interval, args=None, kw=None,
        initialdelay=0, processmethod=method.threaded, taskname=None):
    """Schedule action on the shared scheduler to run every interval
    seconds, starting after initialdelay seconds, and return the task."""
    return _get_scheduler().add_interval_task(
        action=action,
        taskname=taskname,
        initialdelay=initialdelay,
        interval=interval,
        processmethod=processmethod,
        args=args,
        kw=kw,
    )
435
def add_weekday_task(action, weekdays, timeonday, args=None, kw=None,
        processmethod=method.threaded, taskname=None):
    """Schedule action on the shared scheduler for the given weekdays at the
    (hour, minute) given by timeonday, and return the task."""
    return _get_scheduler().add_daytime_task(
        action=action,
        taskname=taskname,
        weekdays=weekdays,
        monthdays=None,
        timeonday=timeonday,
        processmethod=processmethod,
        args=args,
        kw=kw,
    )
442
443
def add_monthday_task(action, monthdays, timeonday,
        args=None, kw=None,
        processmethod=method.threaded, taskname=None):
    """Schedule action on the shared scheduler for the given days of the
    month at the (hour, minute) given by timeonday, and return the task."""
    return _get_scheduler().add_daytime_task(
        action=action,
        taskname=taskname,
        weekdays=None,
        monthdays=monthdays,
        timeonday=timeonday,
        processmethod=processmethod,
        args=args,
        kw=kw,
    )
451
def cancel(task):
    """Remove the pending run of task from the shared scheduler."""
    _get_scheduler().cancel(task)
Note: See TracBrowser for help on using the browser.