文章标题:
Python多线程编程全方位学习指南
文章内容
文章目录
- Python多线程编程全方位学习指南
- 一、多线程基础认知
- 1.1 线程与进程的区别
- 1.2 全局解释器锁(GIL)
- 二、线程的创建与管理
- 2.1 线程创建的两种方式
- 方式1:函数式创建
- 方式2:类继承式创建
- 2.2 线程常用方法与属性
- 三、线程的同步机制
- 3.1 锁(Lock)
- 3.2 可重入锁(RLock)
- 3.3 信号量(Semaphore)
- 3.4 事件(Event)
- 3.5 条件变量(Condition)
- 3.6 屏障(Barrier)
- 四、线程间通信
- 4.1 使用队列(Queue)
- 4.2 线程局部数据
- 五、线程池与高级用法
- 5.1 使用ThreadPoolExecutor
- 5.2 定时器线程
- 5.3 线程优先级队列
- 六、多线程编程最佳实践
- 6.1 避免常见陷阱
- 6.2 性能优化技巧
- 6.3 调试与监控
- 七、多线程应用场景
- 7.1 适合多线程的场景
- 7.2 不适合多线程的场景
- 八、学习资源推荐
Python多线程编程全方位学习指南
一、多线程基础认知
1.1 线程与进程的区别
| 特性 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 拥有独立内存空间 | 共享进程内存 |
| 创建开销 | 较大 | 较小 |
| 通信方式 | 通过管道、套接字等 | 通过共享变量通信 |
| 上下文切换 | 开销大 | 开销小 |
| 安全性 | 较高(相互隔离) | 较低(需同步机制) |
| Python中的限制 | 不受GIL限制(每个进程有独立的GIL) | 受GIL限制 |
1.2 全局解释器锁(GIL)
- GIL是CPython解释器的设计特性
- 同一时刻仅允许一个线程执行Python字节码
- 对I/O密集型任务影响小,对CPU密集型任务影响大
- 解决方案:使用多进程或C扩展绕过GIL
二、线程的创建与管理
2.1 线程创建的两种方式
方式1:函数式创建
import threading
import time
def print_numbers_func():
    """Print the integers 0 through 4, sleeping 0.5 seconds before each one."""
    current = 0
    while current < 5:
        time.sleep(0.5)
        print(f"数字: {current}")
        current += 1
def print_letters_func():
    """Print the letters A through E, sleeping 0.7 seconds before each one."""
    for letter in ("A", "B", "C", "D", "E"):
        time.sleep(0.7)
        print(f"字母: {letter}")
# Create one thread per demo function.
thread_one = threading.Thread(target=print_numbers_func)
thread_two = threading.Thread(target=print_letters_func)

# Start both threads so their output interleaves.
for started in (thread_one, thread_two):
    started.start()

# Wait for both threads to finish before reporting completion.
for finished in (thread_one, thread_two):
    finished.join()
print("一切线程履行终了!")
方式2:类继承式创建
class MyCustomThread(threading.Thread):
    """Thread subclass that prints five numbered lines with a fixed delay.

    The constructor stores its two arguments unchanged: ``thread_name`` is the
    label printed on every line and ``delay_time`` is the number of seconds
    slept before each numbered line.
    """

    def __init__(self, thread_name, delay_time):
        super().__init__()
        self.delay_time = delay_time
        self.thread_name = thread_name

    def run(self):
        """Print a start line, five delayed numbered lines, then an end line."""
        print(f"线程 {self.thread_name} 起头履行")
        step = 0
        while step < 5:
            time.sleep(self.delay_time)
            print(f"{self.thread_name}: {step}")
            step += 1
        print(f"线程 {self.thread_name} 履行终了")
# Create the three custom threads from (label, delay) pairs, then start them.
thread_instances_list = [
    MyCustomThread(label, pause)
    for label, pause in (("Alpha", 0.3), ("Beta", 0.4), ("Gamma", 0.5))
]
for t in thread_instances_list:
    t.start()

# Wait for every custom thread before reporting completion.
for t in thread_instances_list:
    t.join()
print("一切自定义线程履行终了")
2.2 线程常用方法与属性
| 方法/属性 | 描述 | 示例 |
|---|---|---|
| start() | 启动线程 | t.start() |
| run() | 线程执行的主体方法(可重写) | 自定义线程类时覆盖 |
| join(timeout) | 等待线程结束(可指定超时时间) | t.join() |
| is_alive() | 检查线程是否处于运行状态 | if t.is_alive(): ... |
| name | 获取/设置线程名称 | t.name = "Worker-1" |
| ident | 线程的标识符(整数形式) | print(t.ident) |
| daemon | 守护线程标志(主线程结束时守护线程会自动终止) | t.daemon = True |
| isDaemon() | 检查是否为守护线程(Python 3.10 起已弃用,推荐读取 daemon 属性) | t.isDaemon() |
| setDaemon(bool) | 设置守护线程状态(Python 3.10 起已弃用,推荐设置 daemon 属性) | t.setDaemon(True) |
| native_id | 内核级线程ID(Python 3.8+) | print(t.native_id) |
三、线程的同步机制
3.1 锁(Lock)
import threading
# Shared counter and the lock that guards every update to it.
counter_variable = 0
lock_object = threading.Lock()


def increase_counter_func():
    """Add 1 to the global counter 100000 times, holding the lock for each update."""
    global counter_variable
    remaining = 100000
    while remaining:
        lock_object.acquire()
        try:
            counter_variable += 1
        finally:
            # Always release, mirroring what a ``with`` block would do.
            lock_object.release()
        remaining -= 1
# Build five threads that all run the counter function, then start them.
thread_list = [threading.Thread(target=increase_counter_func) for _ in range(5)]
for t in thread_list:
    t.start()

# Wait for every thread, then report the total next to the expected value.
for t in thread_list:
    t.join()
print(f"终究的计数器值: {counter_variable} (预期: 500000)")
3.2 可重入锁(RLock)
# Reentrant lock: the thread that holds it may acquire it again.
reentrant_lock_obj = threading.RLock()


def recursive_function_call(n):
    """Print one line per level from ``n`` down to 1, re-acquiring the lock at each level."""
    with reentrant_lock_obj:
        if not n > 0:
            return
        print(f"层级 {n}")
        # The recursive call acquires the same lock again while it is still held.
        recursive_function_call(n - 1)
# Run the recursive function from two threads, each starting at level 3.
t_one = threading.Thread(target=recursive_function_call, args=(3,))
t_two = threading.Thread(target=recursive_function_call, args=(3,))
for started in (t_one, t_two):
    started.start()
for finished in (t_one, t_two):
    finished.join()
3.3 信号量(Semaphore)
# Limit how many threads may use the resource at the same time.
semaphore_obj = threading.Semaphore(3)  # at most 3 threads at once


def access_resource_func(thread_id):
    """Hold one semaphore slot for two seconds, printing on entry and before release."""
    semaphore_obj.acquire()
    try:
        print(f"线程 {thread_id} 正在拜访资本")
        time.sleep(2)
        print(f"线程 {thread_id} 释放资本")
    finally:
        semaphore_obj.release()
# Ten threads compete for the semaphore; each receives its index as an id.
thread_instances_list = [
    threading.Thread(target=access_resource_func, args=(thread_no,))
    for thread_no in range(10)
]
for t in thread_instances_list:
    t.start()
for t in thread_instances_list:
    t.join()
3.4 事件(Event)
# Event used to signal between threads.
event_obj = threading.Event()


def wait_for_event_func():
    """Print a waiting line, block until the event is set, then print a received line."""
    print("期待者: 正在期待事件...")
    # Blocks here until another thread calls event_obj.set().
    event_obj.wait()
    print("期待者: 收到事件!")
def set_event_func():
    """Sleep two seconds, print a line, then set the event to wake all waiters."""
    delay_seconds = 2
    time.sleep(delay_seconds)
    print("设置者: 设置事件")
    event_obj.set()
# One thread waits on the event while the other sets it.
t_one = threading.Thread(target=wait_for_event_func)
t_two = threading.Thread(target=set_event_func)
for started in (t_one, t_two):
    started.start()
for finished in (t_one, t_two):
    finished.join()
3.5 条件变量(Condition)
# Producer-consumer pattern: shared buffer guarded by a condition variable.
condition_obj = threading.Condition()
buffer_data_list = []
BUFFER_CAPACITY = 5


def data_producer_func():
    """Append ten items (Item-0..Item-9) to the shared buffer.

    Before each append the producer waits on the condition while the buffer
    holds ``BUFFER_CAPACITY`` items or more. After each append it notifies
    all waiters, releases the condition, and sleeps 0.1 seconds.
    """
    for sequence_no in range(10):
        with condition_obj:
            # Re-check after every wake-up: another producer may have refilled the buffer.
            while not len(buffer_data_list) < BUFFER_CAPACITY:
                print("缓冲区已满,生产者期待")
                condition_obj.wait()
            item = f"Item-{sequence_no}"
            buffer_data_list.append(item)
            print(f"生产了: {item}")
            # Wake the waiting threads so consumers can take the new item.
            condition_obj.notify_all()
        time.sleep(0.1)
def data_consumer_func():
    """Remove ten items from the front of the shared buffer.

    Before each removal the consumer waits on the condition while the buffer
    is empty. After each removal it notifies all waiters, releases the
    condition, and sleeps 0.2 seconds.
    """
    taken = 0
    while taken < 10:
        with condition_obj:
            # Re-check after every wake-up: another consumer may have emptied the buffer.
            while not buffer_data_list:
                print("缓冲区为空,消费者期待")
                condition_obj.wait()
            item = buffer_data_list.pop(0)
            print(f"消耗了: {item}")
            # Wake the waiting threads so producers can refill the buffer.
            condition_obj.notify_all()
        time.sleep(0.2)
        taken += 1
# Each producer adds exactly 10 items and each consumer removes exactly 10,
# so the number of producers must equal the number of consumers. With more
# consumers than producers the extra consumers wait on an empty buffer
# forever and the joins below never return.
PRODUCER_CONSUMER_PAIRS = 2
producer_threads_list = [
    threading.Thread(target=data_producer_func)
    for _ in range(PRODUCER_CONSUMER_PAIRS)
]
consumer_threads_list = [
    threading.Thread(target=data_consumer_func)
    for _ in range(PRODUCER_CONSUMER_PAIRS)
]

# Start every producer and consumer, then wait for all of them to finish.
for t in producer_threads_list + consumer_threads_list:
    t.start()
for t in producer_threads_list + consumer_threads_list:
    t.join()
3.6 屏障(Barrier)
# This snippet calls random.uniform, so it imports random itself instead of
# relying on an import that only appears in a later snippet.
import random

# Synchronize several threads at a common point; the barrier expects 3 parties.
barrier_obj = threading.Barrier(3)


def work_process_func(name):
    """Run two timed work phases separated by a wait on the shared barrier.

    Args:
        name: Label printed at the start of every output line.

    Each phase sleeps for a random 0.5 to 1.5 seconds. The second phase
    starts only after ``barrier_obj.wait()`` returns.
    """
    print(f"{name} 第一阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 到达屏障")
    # Blocks until all parties of the barrier have arrived.
    barrier_obj.wait()
    print(f"{name} 第二阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 履行终了")
# One thread per participant; three threads match the barrier's party count.
thread_instances_list = [
    threading.Thread(target=work_process_func, args=(participant,))
    for participant in ("Alice", "Bob", "Charlie")
]
for t in thread_instances_list:
    t.start()
for t in thread_instances_list:
    t.join()
四、线程间通信
4.1 使用队列(Queue)
from queue import Queue
import random
# Thread-safe queues: one for pending tasks, one for finished results.
task_queue_obj = Queue()
result_queue_obj = Queue()


def create_task_func():
    """Enqueue ten tasks (Task-0..Task-9) with random pauses, then a None stop signal."""
    for task in (f"Task-{number}" for number in range(10)):
        task_queue_obj.put(task)
        print(f"创建了: {task}")
        time.sleep(random.uniform(0.1, 0.3))
    # None tells the consumers that no more tasks will arrive.
    task_queue_obj.put(None)
def handle_task_func():
    """Consume tasks from the task queue until the None stop signal arrives.

    Each task is processed after a random 0.2 to 0.5 second pause, its result
    string is put on the result queue, and the task is marked done. On the
    stop signal the function puts None back for the next consumer and returns.
    """
    task = task_queue_obj.get()
    while task is not None:
        # Simulated processing time.
        time.sleep(random.uniform(0.2, 0.5))
        result = f"义务 {task} 的成果"
        result_queue_obj.put(result)
        print(f"处理了: {task} -> {result}")
        task_queue_obj.task_done()
        task = task_queue_obj.get()
    # Pass the stop signal on so the next consumer also stops.
    task_queue_obj.put(None)
# Create the producer thread and three consumer threads.
producer_thread = threading.Thread(target=create_task_func)
consumer_threads = [threading.Thread(target=handle_task_func) for _ in range(3)]

# Start every thread.
producer_thread.start()
for t in consumer_threads:
    t.start()

# Wait for the producer to finish queuing its tasks and the stop signal.
producer_thread.join()

# Wait for the consumer threads instead of calling task_queue_obj.join().
# The None stop signal is put on the queue (and put back by each consumer)
# without a matching task_done(), so the queue's unfinished-task count never
# reaches zero and Queue.join() would block forever. A consumer returns only
# after it has finished its current task and seen the stop signal, so joining
# all consumers guarantees every task has been processed.
for t in consumer_threads:
    t.join()

# Print the collected results.
print("\n成果:")
while not result_queue_obj.empty():
    print(result_queue_obj.get())
4.2 线程局部数据
# Thread-local storage: each thread sees its own attributes on this object.
thread_local_data_obj = threading.local()


def show_local_data_func():
    """Print the calling thread's ``value`` attribute, or a notice when it is unset."""
    unset = object()
    value = getattr(thread_local_data_obj, "value", unset)
    if value is unset:
        print("此线程未设置值")
        return
    print(f"线程的值: {value}")
def set_local_data_func(value):
    """Store ``value`` in the calling thread's local data, then print it."""
    setattr(thread_local_data_obj, "value", value)
    show_local_data_func()
# Three threads each store their own index in thread-local data.
thread_instances_list = [
    threading.Thread(target=set_local_data_func, args=(thread_value,))
    for thread_value in range(3)
]
for t in thread_instances_list:
    t.start()
for t in thread_instances_list:
    t.join()
五、线程池与高级用法
5.1 使用ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def get_url_content_func(url):
    """Download ``url`` with a 5-second timeout and summarize the response.

    Returns a dict with the keys ``url``, ``status`` (the response status
    code), ``length`` (length of the response text) and ``content`` (the
    first 100 characters of the response text).
    """
    print(f"正在下载 {url}")
    response = requests.get(url, timeout=5)
    body_text = response.text
    summary = {}
    summary['url'] = url
    summary['status'] = response.status_code
    summary['length'] = len(body_text)
    summary['content'] = body_text[:100]  # first 100 characters only
    return summary
urls_list = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org',
    'https://www.stackoverflow.com',
]

# Manage the downloads with a pool of three worker threads.
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit one download per URL and remember which future belongs to which URL.
    future_to_url = {}
    for target_url in urls_list:
        future_to_url[executor.submit(get_url_content_func, target_url)] = target_url

    # Handle results in completion order, not submission order.
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 下载完成: 状况={data['status']}, 长度={data['length']}")
            # Optional content preview, left disabled:
            # print(f"预览: {data['content']}")
        except Exception as e:
            # future.result() re-raises whatever the download raised.
            print(f"{url} 发生了异常: {e}")
5.2 定时器线程
def delayed_operation_func(message):
    """Print ``message`` prefixed with the delayed-message label."""
    print("延迟消息: {}".format(message))
# Schedule the callback to run once, 5 seconds after start() is called.
timer = threading.Timer(
    interval=5.0,
    function=delayed_operation_func,
    args=("5秒后你好!",),
)
timer.start()
print("定时器已启动,期待中...")
5.3 线程优先级队列
import queue
# Priority queue shared by the main thread and the worker thread.
priority_queue_obj = queue.PriorityQueue()


def work_thread_func():
    """Process ``(priority, task)`` entries until a stop entry arrives.

    Entries are taken from ``priority_queue_obj``. An entry whose task is
    ``None`` is the stop signal and ends the loop. Every other entry is
    printed and followed by a 0.5 second pause.

    Every ``get()`` is matched by ``task_done()``, including the stop signal
    and entries whose processing raises. This keeps the queue's
    unfinished-task count balanced for any later ``join()``.
    """
    while True:
        priority, task_item = priority_queue_obj.get()
        try:
            if task_item is None:
                break
            print(f"处理义务: {task_item} (优先级: {priority})")
            time.sleep(0.5)
        finally:
            # Runs on normal completion, on break, and on an exception.
            priority_queue_obj.task_done()
# Queue every task before starting the worker. If the worker were already
# running it would take the first entry put (the low-priority one) straight
# away, and the output would not show the priority ordering.
# Entries are (priority, task); a smaller number is served first.
priority_queue_obj.put((3, "低优先级义务"))
priority_queue_obj.put((1, "高优先级义务"))
priority_queue_obj.put((2, "中等优先级义务"))
priority_queue_obj.put((1, "另一个高优先级义务"))

# Start the worker thread now that the queue is populated.
worker_thread_instance = threading.Thread(target=work_thread_func)
worker_thread_instance.start()

# Wait until every queued task has been processed.
priority_queue_obj.join()

# Send the stop signal and wait for the worker to exit.
priority_queue_obj.put((0, None))
worker_thread_instance.join()
六、多线程编程最佳实践
6.1 避免常见陷阱
- 竞争条件:始终使用同步机制保护共享资源
- 死锁:
  - 避免嵌套锁
  - 按固定顺序获取锁
  - 使用带超时的锁
- 线程饥饿:避免长时间持有锁,保证每个线程都有机会获得资源(Python的threading模块不提供线程优先级设置)
- 资源泄漏:确保释放所有资源(文件、网络连接等)
6.2 性能优化技巧
- 线程池:重用线程以减少创建开销
- 批量处理:减少锁的获取/释放次数
- 线程安全的数据结构:如使用 queue.Queue(其内部已完成加锁,无需自行同步)
- 局部存储:减少共享状态
- 异步I/O:结合asyncio提高I/O密集型任务的性能
6.3 调试与监控
```python
import threading
import time
def worker_thread_func():
    """Print a start line, sleep two seconds, then print an end line.

    Both lines begin with the name of the thread that runs the function.
    """
    print("{} 起头履行".format(threading.current_thread().name))
    time.sleep(2)
    print("{} 竣事履行".format(threading.current_thread().name))
# List all active threads.
def check_threads_func(interval=1, iterations=None):
    """Repeatedly print the name, ident and alive state of every active thread.

    Args:
        interval: Seconds to sleep after each listing. Defaults to 1.
        iterations: Number of listings to print before returning. ``None``
            (the default) repeats forever, as the function did before this
            parameter existed.
    """
    completed = 0
    while iterations is None or completed < iterations:
        print("\n=== 活动线程 ===")
        for thread in threading.enumerate():
            # The closing parenthesis was missing from the printed line.
            print(f"{thread.name} (ID: {thread.ident}, 存活: {thread.is_alive()})")
        time.sleep(interval)
        completed += 1
```
文章整理自互联网,仅供测试使用。发布者:Lomu,转载请注明出处:https://www.it1024doc.com/12741.html