Python 多线程通信的常用方法汇总

在 Python 中,多线程编程常用于 I/O 密集型任务(如网络请求、文件读写),但由于 ​GIL(全局解释器锁)​​ 的存在,多线程并不适合 CPU 密集型任务(此时更适合多进程)。多线程通信的核心是线程间安全传递数据或协调执行顺序,以下是常用的通信方法及适用场景:

图片[1]_Python 多线程通信的常用方法汇总_知途无界

一、共享变量(需配合线程同步机制)​

多个线程共享同一进程的内存空间,因此可直接访问全局变量或对象属性。但非原子操作​(如 i += 1)会导致竞态条件(Race Condition),需通过同步机制保证线程安全。

1. 基础共享变量 + 锁(threading.Lock

  • 原理​:通过互斥锁(Lock)确保同一时间只有一个线程修改共享变量。
  • 示例​: import threading count = 0 lock = threading.Lock() # 创建锁 def increment(): global count for _ in range(100000): with lock: # 自动获取和释放锁(推荐) count += 1 threads = [threading.Thread(target=increment) for _ in range(2)] for t in threads: t.start() for t in threads: t.join() print(f"Final count: {count}") # 输出 200000(无锁则可能小于此值)

2. 信号量(threading.Semaphore

  • 原理​:控制同时访问某一资源的线程数量(类似“许可证”)。
  • 适用场景​:限制并发连接数(如数据库连接池)。
  • 示例​: semaphore = threading.Semaphore(2) # 最多2个线程同时执行 def task(): with semaphore: print(f"{threading.current_thread().name} is running") # 模拟耗时操作 import time time.sleep(1) threads = [threading.Thread(target=task) for _ in range(4)] for t in threads: t.start()

3. 事件(threading.Event

  • 原理​:通过一个布尔标志(is_set())控制线程阻塞/唤醒,用于线程间“通知”。
  • 适用场景​:主线程通知子线程启动/停止(如“等待某个条件触发”)。
  • 示例​: event = threading.Event() def waiter(): print("Waiter is waiting...") event.wait() # 阻塞直到 event.set() 被调用 print("Waiter is notified!") def setter(): import time time.sleep(2) event.set() # 触发事件,唤醒所有等待线程 t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=setter) t1.start() t2.start()

4. 条件变量(threading.Condition

  • 原理​:结合锁和条件判断,允许线程在某个条件不满足时阻塞,条件满足时被唤醒(类似“生产者-消费者”模型)。
  • 适用场景​:需要线程间复杂协作(如生产者生产数据后通知消费者)。
  • 示例​: condition = threading.Condition() data = [] def producer(): with condition: data.append("new_data") print("Producer produced data") condition.notify() # 唤醒一个等待的消费者 def consumer(): with condition: while not data: # 必须用循环检查条件(避免虚假唤醒) condition.wait() # 释放锁并阻塞 print(f"Consumer consumed: {data.pop(0)}") t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t2.start() # 先启动消费者(会阻塞等待数据) t1.start()

二、队列(queue.Queue,推荐)​

queue.Queue 是线程安全的队列,内置锁机制,无需手动加锁,是多线程通信的首选方案​(尤其适用于生产者-消费者模型)。

核心特性:

  • 线程安全​:内部已实现锁,无需额外同步。
  • 阻塞控制​:支持设置超时(put(timeout)/get(timeout))、队列满/空时的阻塞行为。
  • 类型​:Queue(无限长度)、LifoQueue(栈)、PriorityQueue(优先级队列)。

示例(生产者-消费者模型):

import threading
import queue
import time

q = queue.Queue(maxsize=5)  # 最大容量5

def producer():
    for i in range(10):
        q.put(i)  # 队列满时阻塞(默认)
        print(f"Produced {i}")
        time.sleep(0.1)

def consumer():
    while True:
        item = q.get()  # 队列空时阻塞(默认)
        if item is None:  # 终止信号
            break
        print(f"Consumed {item}")
        q.task_done()  # 标记任务完成(配合 join() 使用)

# 启动消费者
t_consumer = threading.Thread(target=consumer, daemon=True)
t_consumer.start()

# 启动生产者
t_producer = threading.Thread(target=producer)
t_producer.start()
t_producer.join()

# 发送终止信号(放入与消费者数量相同的 None)
q.put(None)
t_consumer.join()

三、管道(multiprocessing.Pipe,跨进程也可用)​

Pipe 创建一个双向管道,返回两个连接对象(conn1, conn2),线程可通过 send()recv() 通信。​注意​:管道本身非线程安全,需配合锁使用。

示例:

import threading
from multiprocessing import Pipe

conn1, conn2 = Pipe()  # 创建管道
lock = threading.Lock()

def sender():
    for i in range(5):
        with lock:
            conn1.send(f"Message {i}")
        print(f"Sent: Message {i}")

def receiver():
    while True:
        with lock:
            if conn2.poll():  # 检查是否有数据
                msg = conn2.recv()
                print(f"Received: {msg}")
                if msg == "Message 4":  # 终止条件
                    break

t_send = threading.Thread(target=sender)
t_recv = threading.Thread(target=receiver)
t_send.start()
t_recv.start()
t_send.join()
t_recv.join()

四、线程本地存储(threading.local)​

threading.local 为每个线程创建独立的变量副本,避免共享变量的竞态条件。​适用场景​:每个线程需要独立的状态(如用户会话)。

示例:

import threading

local_data = threading.local()  # 线程本地存储

def show_data():
    if hasattr(local_data, "value"):
        print(f"{threading.current_thread().name}: {local_data.value}")
    else:
        print(f"{threading.current_thread().name}: No data")

def set_data(value):
    local_data.value = value  # 每个线程有自己的 value 副本

t1 = threading.Thread(target=lambda: (set_data("Thread 1"), show_data()), name="T1")
t2 = threading.Thread(target=lambda: (set_data("Thread 2"), show_data()), name="T2")
t1.start()
t2.start()

五、总结与选择建议

方法适用场景优点缺点
共享变量 + 锁简单数据共享灵活,直接操作内存需手动处理锁,易出错
queue.Queue生产者-消费者模型、任务分发线程安全,内置阻塞控制,推荐使用不适合高频小数据(队列开销)
threading.Event线程间简单通知(如启动/停止)轻量,适合“一次性”触发仅传递布尔状态,无法传数据
threading.Condition复杂协作(如条件依赖的生产消费)支持条件判断,灵活性高逻辑复杂,易死锁
threading.local线程独立状态(如会话)避免共享变量冲突无法在线程间传递数据

最佳实践​:优先使用 queue.Queue(线程安全、易维护);简单通知用 Event;复杂协作用 Condition;避免直接使用共享变量(除非必要且严格加锁)。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞52 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容