一、基础time模块实现
1.1 time.sleep()基础定时器
import time
def simple_timer(seconds):
"""简单阻塞式定时器"""
start_time = time.time()
time.sleep(seconds)
elapsed = time.time() - start_time
print(f"定时器触发,设定时间:{seconds}秒,实际耗时:{elapsed:.2f}秒")
# 使用示例
simple_timer(5) # 5秒后执行
![图片[1]_Python实现定时器的五种常见方法详解_知途无界](https://zhituwujie.com/wp-content/uploads/2025/08/d2b5ca33bd20250828110456.png)
1.2 非阻塞式循环定时器
def non_blocking_timer(interval, callback, max_times=10):
"""非阻塞循环定时器"""
start_time = time.time()
count = 0
while count < max_times:
current_time = time.time()
if current_time - start_time >= interval:
callback(count)
start_time = current_time
count += 1
time.sleep(0.1) # 降低CPU占用
# 使用示例
def my_callback(tick):
print(f"定时回调第{tick+1}次")
non_blocking_timer(1, my_callback, 5) # 每秒触发一次,共5次
二、threading.Timer实现
2.1 单次定时任务
from threading import Timer
def one_shot_timer(delay, function, args=None, kwargs=None):
"""单次定时任务"""
if args is None:
args = []
if kwargs is None:
kwargs = {}
timer = Timer(delay, function, args, kwargs)
timer.start()
return timer
# 使用示例
def greet(name):
print(f"Hello, {name}!")
one_shot_timer(3, greet, ["Alice"]) # 3秒后执行
2.2 循环定时任务
class RepeatingTimer:
"""可重复执行的定时器"""
def __init__(self, interval, function, args=None, kwargs=None):
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.timer = None
self.is_running = False
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.timer = Timer(self.interval, self._run)
self.timer.start()
self.is_running = True
def stop(self):
if self.timer:
self.timer.cancel()
self.is_running = False
# 使用示例
def tick():
print("Tick-tock")
timer = RepeatingTimer(1, tick)
timer.start() # 每秒打印一次
time.sleep(5)
timer.stop() # 5秒后停止
三、sched模块实现
3.1 高级调度器
import sched
def scheduler_timer():
"""使用sched模块实现定时器"""
scheduler = sched.scheduler(time.time, time.sleep)
def print_time(msg):
print(f"调度时间:{msg} {time.ctime()}")
print(f"开始时间:{time.ctime()}")
scheduler.enter(2, 1, print_time, ("2秒后",))
scheduler.enter(5, 1, print_time, ("5秒后",))
scheduler.run()
# 使用示例
scheduler_timer()
3.2 带优先级的定时任务
def priority_scheduler():
"""带优先级的定时任务调度"""
s = sched.scheduler(time.time, time.sleep)
def print_msg(priority, msg):
print(f"[优先级{priority}] {msg}")
# 相同时间,优先级高的先执行
s.enter(3, 2, print_msg, (2, "优先级2的任务"))
s.enter(3, 1, print_msg, (1, "优先级1的任务"))
s.enterabs(time.time()+5, 3, print_msg, (3, "绝对时间5秒后"))
s.run()
# 使用示例
priority_scheduler()
四、asyncio异步定时器
4.1 协程定时任务
import asyncio
async def async_timer():
"""异步定时器"""
print(f"开始时间:{time.strftime('%X')}")
await asyncio.sleep(3)
print(f"3秒后:{time.strftime('%X')}")
# 使用示例
asyncio.run(async_timer())
4.2 周期性异步定时器
async def periodic_timer(interval, times):
"""周期性异步定时器"""
for i in range(times):
print(f"第{i+1}次触发 - {time.strftime('%X')}")
await asyncio.sleep(interval)
# 使用示例
asyncio.run(periodic_timer(1, 5)) # 每秒触发,共5次
4.3 多任务并行定时器
async def multi_task_timer():
"""多任务并行定时器"""
async def task1():
while True:
print("任务1执行")
await asyncio.sleep(1)
async def task2():
while True:
print(" 任务2执行")
await asyncio.sleep(2)
await asyncio.gather(task1(), task2())
# 使用示例(运行5秒后停止)
asyncio.run(asyncio.wait_for(multi_task_timer(), 5))
五、第三方库实现
5.1 APScheduler定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
def apscheduler_demo():
"""APScheduler定时任务"""
scheduler = BlockingScheduler()
def job1():
print(f"定时任务1执行 {time.strftime('%X')}")
def job2():
print(f" 定时任务2执行 {time.strftime('%X')}")
# 添加任务
scheduler.add_job(job1, 'interval', seconds=2)
scheduler.add_job(job2, 'interval', seconds=5)
print('按 Ctrl+C 退出')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
# 使用示例
apscheduler_demo()
5.2 schedule库实现
import schedule
def schedule_demo():
"""使用schedule库实现定时任务"""
def job():
print("定时任务执行")
# 设置定时规则
schedule.every(10).seconds.do(job)
schedule.every().minute.at(":30").do(job)
while True:
schedule.run_pending()
time.sleep(1)
# 使用示例(需手动停止)
# schedule_demo()
六、性能对比与选择建议
6.1 各方法对比表
| 方法 | 精度 | 线程安全 | 适用场景 | 复杂度 |
|---|---|---|---|---|
| time.sleep | 低 | 否 | 简单阻塞任务 | ★☆☆☆☆ |
| threading.Timer | 中 | 是 | 后台定时任务 | ★★☆☆☆ |
| sched | 中 | 否 | 复杂调度任务 | ★★★☆☆ |
| asyncio | 高 | 是 | 异步IO应用 | ★★★★☆ |
| APScheduler | 高 | 是 | 企业级调度 | ★★★★★ |
6.2 选择建议
graph TD
A[定时任务需求] --> B{是否需要高精度?}
B -->|是| C{是否异步环境?}
B -->|否| D{是否需要后台运行?}
C -->|是| E[asyncio]
C -->|否| F[APScheduler]
D -->|是| G[threading.Timer]
D -->|否| H[time.sleep/sched]
style E fill:#6f9,stroke:#333
style F fill:#6f9,stroke:#333
6.3 最佳实践示例
def optimal_timer_solution():
"""综合最佳实践方案"""
from concurrent.futures import ThreadPoolExecutor
import datetime
class PrecisionTimer:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=3)
self.tasks = []
def add_task(self, callback, interval, repeat=False):
def task_wrapper():
next_time = time.time() + interval
while True:
now = time.time()
if now >= next_time:
try:
callback()
except Exception as e:
print(f"定时任务出错: {e}")
if not repeat:
break
next_time += interval
time.sleep(max(0, next_time - now - 0.1)) # 降低CPU占用
future = self.executor.submit(task_wrapper)
self.tasks.append(future)
return future
def stop_all(self):
for task in self.tasks:
task.cancel()
self.executor.shutdown(wait=False)
# 使用示例
timer = PrecisionTimer()
def job1():
print(f"任务1 - {datetime.datetime.now()}")
def job2():
print(f" 任务2 - {datetime.datetime.now()}")
timer.add_task(job1, 1, True) # 每秒执行
timer.add_task(job2, 3, True) # 每3秒执行
time.sleep(10)
timer.stop_all()
# 运行示例
optimal_timer_solution()
七、常见问题解决方案
7.1 定时器不准确问题
def high_precision_timer(interval, callback):
"""高精度定时器实现"""
next_time = time.time() + interval
while True:
current_time = time.time()
if current_time >= next_time:
callback()
next_time += interval
# 动态调整sleep时间提高精度
sleep_time = max(0, next_time - current_time - 0.001)
time.sleep(sleep_time)
# 使用示例(需手动停止)
# high_precision_timer(1, lambda: print(time.time()))
7.2 多线程定时器冲突
from threading import Lock
class ThreadSafeTimer:
"""线程安全定时器"""
def __init__(self):
self.lock = Lock()
self.timers = []
def add_timer(self, interval, callback):
with self.lock:
timer = Timer(interval, self._wrapper, [callback])
self.timers.append(timer)
timer.start()
def _wrapper(self, callback):
with self.lock:
try:
callback()
except Exception as e:
print(f"定时任务出错: {e}")
def stop_all(self):
with self.lock:
for timer in self.timers:
timer.cancel()
self.timers.clear()
# 使用示例
safe_timer = ThreadSafeTimer()
safe_timer.add_timer(1, lambda: print("安全定时器"))
time.sleep(5)
safe_timer.stop_all()
7.3 定时任务持久化方案
import pickle
import os
class PersistentScheduler:
"""可持久化的定时任务调度器"""
def __init__(self, storage_file="scheduler_state.pkl"):
self.storage_file = storage_file
self.scheduler = BlockingScheduler()
self._load_state()
def _load_state(self):
if os.path.exists(self.storage_file):
with open(self.storage_file, 'rb') as f:
jobs = pickle.load(f)
for job in jobs:
self.scheduler.add_job(**job)
def add_job(self, func, trigger, **kwargs):
self.scheduler.add_job(func, trigger, **kwargs)
self._save_state()
def _save_state(self):
jobs = []
for job in self.scheduler.get_jobs():
jobs.append({
'func': job.func,
'trigger': job.trigger,
'kwargs': job.kwargs
})
with open(self.storage_file, 'wb') as f:
pickle.dump(jobs, f)
def start(self):
try:
self.scheduler.start()
except (KeyboardInterrupt, SystemExit):
self._save_state()
self.scheduler.shutdown()
# 使用示例
def persistent_job():
print("持久化定时任务执行")
scheduler = PersistentScheduler()
scheduler.add_job(persistent_job, 'interval', seconds=3)
scheduler.start() # 按Ctrl+C停止后会保存状态
核心建议:
- 简单场景优先使用
threading.Timer - 异步环境选择
asyncio定时器 - 企业级应用推荐
APScheduler - 需要高精度时采用动态sleep调整策略
- 多线程环境务必考虑线程安全问题
注意事项:
- 长时间运行的定时任务需要处理异常
- 注意定时器可能存在的内存泄漏问题
- 分布式环境需使用专门的调度系统
- 生产环境建议添加日志记录和监控
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容