Python实现定时器的五种常见方法详解

一、基础time模块实现

1.1 time.sleep()基础定时器

import time

def simple_timer(seconds):
    """简单阻塞式定时器"""
    start_time = time.time()
    time.sleep(seconds)
    elapsed = time.time() - start_time
    print(f"定时器触发,设定时间:{seconds}秒,实际耗时:{elapsed:.2f}秒")

# 使用示例
simple_timer(5)  # 5秒后执行
图片[1]_Python实现定时器的五种常见方法详解_知途无界

1.2 非阻塞式循环定时器

def non_blocking_timer(interval, callback, max_times=10):
    """非阻塞循环定时器"""
    start_time = time.time()
    count = 0
    
    while count < max_times:
        current_time = time.time()
        if current_time - start_time >= interval:
            callback(count)
            start_time = current_time
            count += 1
        time.sleep(0.1)  # 降低CPU占用

# 使用示例
def my_callback(tick):
    print(f"定时回调第{tick+1}次")

non_blocking_timer(1, my_callback, 5)  # 每秒触发一次,共5次

二、threading.Timer实现

2.1 单次定时任务

from threading import Timer

def one_shot_timer(delay, function, args=None, kwargs=None):
    """单次定时任务"""
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    timer = Timer(delay, function, args, kwargs)
    timer.start()
    return timer

# 使用示例
def greet(name):
    print(f"Hello, {name}!")

one_shot_timer(3, greet, ["Alice"])  # 3秒后执行

2.2 循环定时任务

class RepeatingTimer:
    """可重复执行的定时器"""
    
    def __init__(self, interval, function, args=None, kwargs=None):
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.timer = None
        self.is_running = False
        
    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)
        
    def start(self):
        if not self.is_running:
            self.timer = Timer(self.interval, self._run)
            self.timer.start()
            self.is_running = True
            
    def stop(self):
        if self.timer:
            self.timer.cancel()
        self.is_running = False

# 使用示例
def tick():
    print("Tick-tock")

timer = RepeatingTimer(1, tick)
timer.start()  # 每秒打印一次
time.sleep(5)
timer.stop()  # 5秒后停止

三、sched模块实现

3.1 高级调度器

import sched

def scheduler_timer():
    """使用sched模块实现定时器"""
    scheduler = sched.scheduler(time.time, time.sleep)
    
    def print_time(msg):
        print(f"调度时间:{msg} {time.ctime()}")
    
    print(f"开始时间:{time.ctime()}")
    scheduler.enter(2, 1, print_time, ("2秒后",))
    scheduler.enter(5, 1, print_time, ("5秒后",))
    scheduler.run()

# 使用示例
scheduler_timer()

3.2 带优先级的定时任务

def priority_scheduler():
    """带优先级的定时任务调度"""
    s = sched.scheduler(time.time, time.sleep)
    
    def print_msg(priority, msg):
        print(f"[优先级{priority}] {msg}")
    
    # 相同时间,优先级高的先执行
    s.enter(3, 2, print_msg, (2, "优先级2的任务"))
    s.enter(3, 1, print_msg, (1, "优先级1的任务"))
    s.enterabs(time.time()+5, 3, print_msg, (3, "绝对时间5秒后"))
    s.run()

# 使用示例
priority_scheduler()

四、asyncio异步定时器

4.1 协程定时任务

import asyncio

async def async_timer():
    """异步定时器"""
    print(f"开始时间:{time.strftime('%X')}")
    await asyncio.sleep(3)
    print(f"3秒后:{time.strftime('%X')}")

# 使用示例
asyncio.run(async_timer())

4.2 周期性异步定时器

async def periodic_timer(interval, times):
    """周期性异步定时器"""
    for i in range(times):
        print(f"第{i+1}次触发 - {time.strftime('%X')}")
        await asyncio.sleep(interval)

# 使用示例
asyncio.run(periodic_timer(1, 5))  # 每秒触发,共5次

4.3 多任务并行定时器

async def multi_task_timer():
    """多任务并行定时器"""
    async def task1():
        while True:
            print("任务1执行")
            await asyncio.sleep(1)
            
    async def task2():
        while True:
            print("  任务2执行")
            await asyncio.sleep(2)
    
    await asyncio.gather(task1(), task2())

# 使用示例(运行5秒后停止)
asyncio.run(asyncio.wait_for(multi_task_timer(), 5))

五、第三方库实现

5.1 APScheduler定时任务

from apscheduler.schedulers.blocking import BlockingScheduler

def apscheduler_demo():
    """APScheduler定时任务"""
    scheduler = BlockingScheduler()
    
    def job1():
        print(f"定时任务1执行 {time.strftime('%X')}")
    
    def job2():
        print(f"  定时任务2执行 {time.strftime('%X')}")
    
    # 添加任务
    scheduler.add_job(job1, 'interval', seconds=2)
    scheduler.add_job(job2, 'interval', seconds=5)
    
    print('按 Ctrl+C 退出')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

# 使用示例
apscheduler_demo()

5.2 schedule库实现

import schedule

def schedule_demo():
    """使用schedule库实现定时任务"""
    def job():
        print("定时任务执行")
    
    # 设置定时规则
    schedule.every(10).seconds.do(job)
    schedule.every().minute.at(":30").do(job)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

# 使用示例(需手动停止)
# schedule_demo()

六、性能对比与选择建议

6.1 各方法对比表

方法精度线程安全适用场景复杂度
time.sleep简单阻塞任务★☆☆☆☆
threading.Timer后台定时任务★★☆☆☆
sched复杂调度任务★★★☆☆
asyncio异步IO应用★★★★☆
APScheduler企业级调度★★★★★

6.2 选择建议

graph TD
    A[定时任务需求] --> B{是否需要高精度?}
    B -->|是| C{是否异步环境?}
    B -->|否| D{是否需要后台运行?}
    C -->|是| E[asyncio]
    C -->|否| F[APScheduler]
    D -->|是| G[threading.Timer]
    D -->|否| H[time.sleep/sched]
    
    style E fill:#6f9,stroke:#333
    style F fill:#6f9,stroke:#333

6.3 最佳实践示例

def optimal_timer_solution():
    """综合最佳实践方案"""
    from concurrent.futures import ThreadPoolExecutor
    import datetime
    
    class PrecisionTimer:
        def __init__(self):
            self.executor = ThreadPoolExecutor(max_workers=3)
            self.tasks = []
            
        def add_task(self, callback, interval, repeat=False):
            def task_wrapper():
                next_time = time.time() + interval
                while True:
                    now = time.time()
                    if now >= next_time:
                        try:
                            callback()
                        except Exception as e:
                            print(f"定时任务出错: {e}")
                        
                        if not repeat:
                            break
                        next_time += interval
                    time.sleep(max(0, next_time - now - 0.1))  # 降低CPU占用
            
            future = self.executor.submit(task_wrapper)
            self.tasks.append(future)
            return future
        
        def stop_all(self):
            for task in self.tasks:
                task.cancel()
            self.executor.shutdown(wait=False)
    
    # 使用示例
    timer = PrecisionTimer()
    
    def job1():
        print(f"任务1 - {datetime.datetime.now()}")
    
    def job2():
        print(f"  任务2 - {datetime.datetime.now()}")
    
    timer.add_task(job1, 1, True)  # 每秒执行
    timer.add_task(job2, 3, True)  # 每3秒执行
    
    time.sleep(10)
    timer.stop_all()

# 运行示例
optimal_timer_solution()

七、常见问题解决方案

7.1 定时器不准确问题

def high_precision_timer(interval, callback):
    """高精度定时器实现"""
    next_time = time.time() + interval
    while True:
        current_time = time.time()
        if current_time >= next_time:
            callback()
            next_time += interval
        # 动态调整sleep时间提高精度
        sleep_time = max(0, next_time - current_time - 0.001)
        time.sleep(sleep_time)

# 使用示例(需手动停止)
# high_precision_timer(1, lambda: print(time.time()))

7.2 多线程定时器冲突

from threading import Lock

class ThreadSafeTimer:
    """线程安全定时器"""
    
    def __init__(self):
        self.lock = Lock()
        self.timers = []
        
    def add_timer(self, interval, callback):
        with self.lock:
            timer = Timer(interval, self._wrapper, [callback])
            self.timers.append(timer)
            timer.start()
            
    def _wrapper(self, callback):
        with self.lock:
            try:
                callback()
            except Exception as e:
                print(f"定时任务出错: {e}")
                
    def stop_all(self):
        with self.lock:
            for timer in self.timers:
                timer.cancel()
            self.timers.clear()

# 使用示例
safe_timer = ThreadSafeTimer()
safe_timer.add_timer(1, lambda: print("安全定时器"))
time.sleep(5)
safe_timer.stop_all()

7.3 定时任务持久化方案

import pickle
import os

class PersistentScheduler:
    """可持久化的定时任务调度器"""
    
    def __init__(self, storage_file="scheduler_state.pkl"):
        self.storage_file = storage_file
        self.scheduler = BlockingScheduler()
        self._load_state()
        
    def _load_state(self):
        if os.path.exists(self.storage_file):
            with open(self.storage_file, 'rb') as f:
                jobs = pickle.load(f)
                for job in jobs:
                    self.scheduler.add_job(**job)
    
    def add_job(self, func, trigger, **kwargs):
        self.scheduler.add_job(func, trigger, **kwargs)
        self._save_state()
        
    def _save_state(self):
        jobs = []
        for job in self.scheduler.get_jobs():
            jobs.append({
                'func': job.func,
                'trigger': job.trigger,
                'kwargs': job.kwargs
            })
        with open(self.storage_file, 'wb') as f:
            pickle.dump(jobs, f)
            
    def start(self):
        try:
            self.scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self._save_state()
            self.scheduler.shutdown()

# 使用示例
def persistent_job():
    print("持久化定时任务执行")

scheduler = PersistentScheduler()
scheduler.add_job(persistent_job, 'interval', seconds=3)
scheduler.start()  # 按Ctrl+C停止后会保存状态

核心建议​:

  1. 简单场景优先使用threading.Timer
  2. 异步环境选择asyncio定时器
  3. 企业级应用推荐APScheduler
  4. 需要高精度时采用动态sleep调整策略
  5. 多线程环境务必考虑线程安全问题

注意事项​:

  1. 长时间运行的定时任务需要处理异常
  2. 注意定时器可能存在的内存泄漏问题
  3. 分布式环境需使用专门的调度系统
  4. 生产环境建议添加日志记录和监控
© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞68 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容