Python多线程编程深度研习全攻略

文章标题:

Python多线程编程全方位研习全方案

文章内容

文章目录

  • Python多线程编程全方位学习指引
    • 一、多线程基础认知
    • 1.1 线程与进程的差别
    • 1.2 全局解释器锁的情况
    • 二、线程的创建与治理
    • 2.1 两种线程创建途径
      • 方式1:函数式创建
      • 方式2:类继承式创建
    • 2.2 线程的经常使用办法与属性
    • 三、线程的同步机制
    • 3.1 锁(Lock)
    • 3.2 可重入锁(RLock)
    • 3.3 信号量(Semaphore)
    • 3.4 事件(Event)
    • 3.5 条件变量(Condition)
    • 3.6 屏障(Barrier)
    • 四、线程间的通信
    • 4.1 应用队列(Queue)
    • 4.2 线程局部数据
    • 五、线程池与高等用法
    • 5.1 应用ThreadPoolExecutor
    • 5.2 定时器线程
    • 5.3 线程优先级队列
    • 六、多线程编程最好理论
    • 6.1 躲避罕见陷阱
    • 6.2 机能优化技能
    • 6.3 调试与监控
    • 七、多线程运用场景
    • 7.1 合适多线程的场景
    • 7.2 不合适多线程的场景
    • 八、进修资本保举

Python多线程编程全方位学习指引

一、多线程基础认知

1.1 线程与进程的差别

特征 进程 线程
资本分派 具有独立内存空间 同享进程内存
创建开销 较大 较小
通信方法 经过管道、套接字等 经过同享变量通信
上下文切换 开销大 开销小
平安性 较高(互相隔离) 较低(需同步机制)
Python中的限制 无GIL限制 受GIL限制

1.2 全局解释器锁的情况

  • Python解释器的设想特征
  • 统一时候仅允许一个线程履行Python字节码
  • 对I/O密集型义务影响小,对CPU密集型义务影响大
  • 处理计划:应用多进程或C扩大绕过GIL

二、线程的创建与治理

2.1 线程创建的两种途径

方式1:函数式创建
import threading
import time

def print_numbers_func():
    for i in range(5):
        time.sleep(0.5)
        print(f"数字: {i}")

def print_letters_func():
    for char in 'ABCDE':
        time.sleep(0.7)
        print(f"字母: {char}")

# 创建线程
thread_one = threading.Thread(target=print_numbers_func)
thread_two = threading.Thread(target=print_letters_func)

# 启动线程
thread_one.start()
thread_two.start()

# 期待线程停止
thread_one.join()
thread_two.join()

print("一切线程履行终了!")
方式2:类继承式创建
class MyCustomThread(threading.Thread):
    def __init__(self, thread_name, delay_time):
        super().__init__()
        self.thread_name = thread_name
        self.delay_time = delay_time

    def run(self):
        print(f"线程 {self.thread_name} 起头履行")
        for i in range(5):
            time.sleep(self.delay_time)
            print(f"{self.thread_name}: {i}")
        print(f"线程 {self.thread_name} 履行终了")

# 创建并启动线程
thread_instances_list = [
    MyCustomThread("Alpha", 0.3),
    MyCustomThread("Beta", 0.4),
    MyCustomThread("Gamma", 0.5)
]

for t in thread_instances_list:
    t.start()

for t in thread_instances_list:
    t.join()

print("一切自定义线程履行终了")

2.2 线程经常使用办法与属性

办法/属性 描写 示例
start() 启动线程 t.start()
run() 线程履行的主体办法(可重写) 自定义线程类时笼盖
join(timeout) 期待线程停止 t.join()
is_alive() 检查线程是不是处于运转状况 if t.is_alive(): ...
name 获取/设置线程称号 t.name = "Worker-1"
ident 线程的标识符(整数情势) print(t.ident)
daemon 守护线程的标记(主线程停止时主动停止) t.daemon = True
isDaemon() 检查是不是为守护线程 t.isDaemon()
setDaemon(bool) 设置守护线程状况 t.setDaemon(True)
native_id 内核级线程ID(Python 3.8+) print(t.native_id)

三、线程的同步机制

3.1 锁(Lock)

import threading

counter_variable = 0
lock_object = threading.Lock()

def increase_counter_func():
    global counter_variable
    for _ in range(100000):
        with lock_object:  # 主动获取和释放锁
            counter_variable += 1

thread_list = []
for i in range(5):
    t = threading.Thread(target=increase_counter_func)
    thread_list.append(t)
    t.start()

for t in thread_list:
    t.join()

print(f"终究的计数器值: {counter_variable} (预期: 500000)")

3.2 可重入锁(RLock)

reentrant_lock_obj = threading.RLock()

def recursive_function_call(n):
    with reentrant_lock_obj:
        if n > 0:
            print(f"层级 {n}")
            recursive_function_call(n-1)

t_one = threading.Thread(target=recursive_function_call, args=(3,))
t_two = threading.Thread(target=recursive_function_call, args=(3,))

t_one.start()
t_two.start()

t_one.join()
t_two.join()

3.3 信号量(Semaphore)

# 限制同时拜访资本的线程数
semaphore_obj = threading.Semaphore(3)  # 最多3个线程同时拜访

def access_resource_func(thread_id):
    with semaphore_obj:
        print(f"线程 {thread_id} 正在拜访资本")
        time.sleep(2)
        print(f"线程 {thread_id} 释放资本")

thread_instances_list = []
for i in range(10):
    t = threading.Thread(target=access_resource_func, args=(i,))
    thread_instances_list.append(t)
    t.start()

for t in thread_instances_list:
    t.join()

3.4 事件(Event)

# 线程间通信机制
event_obj = threading.Event()

def wait_for_event_func():
    print("期待者: 正在期待事件...")
    event_obj.wait()  # 停止直到事件被设置
    print("期待者: 收到事件!")

def set_event_func():
    time.sleep(2)
    print("设置者: 设置事件")
    event_obj.set()  # 叫醒一切期待的线程

t_one = threading.Thread(target=wait_for_event_func)
t_two = threading.Thread(target=set_event_func)

t_one.start()
t_two.start()

t_one.join()
t_two.join()

3.5 条件变量(Condition)

# 生产者-消费者形式
condition_obj = threading.Condition()
buffer_data_list = []
BUFFER_CAPACITY = 5

def data_producer_func():
    global buffer_data_list
    for i in range(10):
        with condition_obj:
            # 检查缓冲区能否已满
            while len(buffer_data_list) >= BUFFER_CAPACITY:
                print("缓冲区已满,生产者期待")
                condition_obj.wait()

            item = f"Item-{i}"
            buffer_data_list.append(item)
            print(f"生产了: {item}")

            # 告诉消费者
            condition_obj.notify_all()
        time.sleep(0.1)

def data_consumer_func():
    global buffer_data_list
    for _ in range(10):
        with condition_obj:
            # 检查缓冲区能否为空
            while len(buffer_data_list) == 0:
                print("缓冲区为空,消费者期待")
                condition_obj.wait()

            item = buffer_data_list.pop(0)
            print(f"消耗了: {item}")

            # 告诉生产者
            condition_obj.notify_all()
        time.sleep(0.2)

producer_threads_list = [threading.Thread(target=data_producer_func) for _ in range(2)]
consumer_threads_list = [threading.Thread(target=data_consumer_func) for _ in range(3)]

for t in producer_threads_list + consumer_threads_list:
    t.start()

for t in producer_threads_list + consumer_threads_list:
    t.join()

3.6 屏障(Barrier)

# 同步多个线程的履行点
barrier_obj = threading.Barrier(3)

def work_process_func(name):
    print(f"{name} 第一阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 到达屏障")
    barrier_obj.wait()  # 期待一切线程到达

    print(f"{name} 第二阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 履行终了")

thread_instances_list = [
    threading.Thread(target=work_process_func, args=("Alice",)),
    threading.Thread(target=work_process_func, args=("Bob",)),
    threading.Thread(target=work_process_func, args=("Charlie",))
]

for t in thread_instances_list:
    t.start()

for t in thread_instances_list:
    t.join()

四、线程间通信

4.1 应用队列

from queue import Queue
import random

# 线程平安的队列
task_queue_obj = Queue()
result_queue_obj = Queue()

def create_task_func():
    for i in range(10):
        task = f"Task-{i}"
        task_queue_obj.put(task)
        print(f"创建了: {task}")
        time.sleep(random.uniform(0.1, 0.3))
    task_queue_obj.put(None)  # 发送停止信号

def handle_task_func():
    while True:
        task = task_queue_obj.get()
        if task is None:  # 收到停止信号
            task_queue_obj.put(None)  # 通报给下一个消费者
            break

        # 处理任务
        time.sleep(random.uniform(0.2, 0.5))
        result = f"义务 {task} 的成果"
        result_queue_obj.put(result)
        print(f"处理了: {task} -> {result}")
        task_queue_obj.task_done()  # 标记任务完成

# 创建生产者线程
producer_thread = threading.Thread(target=create_task_func)

# 创建消费者线程
consumer_threads = [threading.Thread(target=handle_task_func) for _ in range(3)]

# 启动一切线程
producer_thread.start()
for t in consumer_threads:
    t.start()

# 期待生产者完成
producer_thread.join()

# 期待一切任务完成
task_queue_obj.join()

# 处理成果
print("\n成果:")
while not result_queue_obj.empty():
    print(result_queue_obj.get())

4.2 线程局部数据

# 每个线程有独立的数据副本
thread_local_data_obj = threading.local()

def show_local_data_func():
    try:
        value = thread_local_data_obj.value
    except AttributeError:
        print("此线程未设置值")
    else:
        print(f"线程的值: {value}")

def set_local_data_func(value):
    thread_local_data_obj.value = value
    show_local_data_func()

# 创建线程
thread_instances_list = []
for i in range(3):
    t = threading.Thread(target=set_local_data_func, args=(i,))
    thread_instances_list.append(t)
    t.start()

for t in thread_instances_list:
    t.join()

五、线程池与高等用法

5.1 应用ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def get_url_content_func(url):
    print(f"正在下载 {url}")
    response = requests.get(url, timeout=5)
    return {
        'url': url,
        'status': response.status_code,
        'length': len(response.text),
        'content': response.text[:100]  # 取前100个字符
    }

urls_list = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org',
    'https://www.stackoverflow.com'
]

# 应用线程池停止治理
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交义务
    future_to_url = {executor.submit(get_url_content_func, url): url for url in urls_list}

    # 处理成果
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 下载完成: 状况={data['status']}, 长度={data['length']}")
            # print(f"预览: {data['content']}")
        except Exception as e:
            print(f"{url} 发生了异常: {e}")

5.2 定时器线程

def delayed_operation_func(message):
    print(f"延迟消息: {message}")

# 5秒后履行
timer = threading.Timer(5.0, delayed_operation_func, args=("5秒后你好!",))
timer.start()

print("定时器已启动,期待中...")

5.3 线程优先级队列

import queue

# 创建优先级队列
priority_queue_obj = queue.PriorityQueue()

def work_thread_func():
    while True:
        priority, task_item = priority_queue_obj.get()
        if task_item is None:
            break
        print(f"处理义务: {task_item} (优先级: {priority})")
        time.sleep(0.5)
        priority_queue_obj.task_done()

# 启动工作线程
worker_thread_instance = threading.Thread(target=work_thread_func)
worker_thread_instance.start()

# 添加义务(优先级,义务)
priority_queue_obj.put((3, "低优先级义务"))
priority_queue_obj.put((1, "高优先级义务"))
priority_queue_obj.put((2, "中等优先级义务"))
priority_queue_obj.put((1, "另一个高优先级义务"))

# 期待队列处理完成
priority_queue_obj.join()

# 发送停止信号
priority_queue_obj.put((0, None))
worker_thread_instance.join()

六、多线程编程最好理论

6.1 躲避罕见陷阱

  1. 竞争条件 :一直应用同步机制庇护同享资本
  2. 死锁
    • 躲避嵌套锁
    • 按牢固次序获取锁
    • 应用带超时的锁
  3. 线程饥饿 :公道设置线程优先级
  4. 资本泄漏 :确保释放一切资本(文件、收集毗连等)

6.2 机能优化技能

  1. 线程池 :重用线程以削减创建开销
  2. 批量处理 :削减锁的获取/释放次数
  3. 无锁数据布局 :如应用queue.Queue
  4. 局部存储 :削减同享状况
  5. 异步I/O :连系asyncio进步I/O密集型机能

6.3 调试与监控

```python
import threading
import time

def worker_thread_func():
print(f"{threading.current_thread().name} 起头履行")
time.sleep(2)
print(f"{threading.current_thread().name} 竣事履行")

列出一切活动线程

def check_threads_func():
while True:
print("\n=== 活动线程 ===")
for thread in threading.enumerate():
print(f"{thread.name} (ID: {thread.ident}, 存活: {thread.is_alive()}")
time.sleep(1)

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12741.html

(0)
LomuLomu
上一篇 8小时前
下一篇 6小时前

相关推荐

  • 比想象中更复杂一点的MySQL Slow Query Log

    1. 问题概述 在分析 Slow Query Log 时,记录下的SQL语句,明明会对一张表执行全表扫描,可为什么慢日志中的 Rows_sent 、Rows_examined 和表的真实记录数也是不一样,甚至相差N多倍。还有一个细节就是上述的SQL语句,执行多次,在慢日志中记录下多条记录,记录之间Rows_sent 、Rows_examined也差别明显。 …

    未分类 2025 年 1 月 14 日
    42800
  • 🚀 2025年最新IDEA激活码 & 永久破解教程(支持JetBrains全家桶)

    💻 教程适用性 本教程适用于JetBrains全家桶,包括但不限于:- IntelliJ IDEA- PyCharm- DataGrip- GoLand- 其他JetBrains产品 先来看看最新IDEA版本破解成功的截图,已经成功激活到2099年!🎉 📥 下载IDEA安装包 如果已经安装可跳过此步骤! 访问官网下载:https://www.jetbrain…

    IDEA破解教程 2025 年 6 月 17 日
    12000
  • Spring Boot与WebSocket融合全攻略:从入门到高阶应用

    一、WebSocket基础概念与核心原理 1.1 WebSocket协议的本质内涵 WebSocket是一种在单一TCP连接上开展全双工通信的协议,它攻克了HTTP协议在实时通信方面的局限。不同于HTTP那种请求 – 响应的模式,WebSocket允许服务器主动向客户端推送数据,实现了真正意义上的双向交互。 传统HTTP通信的弊病所在: 每一次请求都得重新搭…

    未分类 2025 年 6 月 18 日
    17800
  • 2024 GoLand最新激活码,GoLand永久免费激活码2025-02-09 更新

    GoLand 2024最新激活码 以下是最新的GoLand激活码,更新时间:2025-02-09 🔑 激活码使用说明 1️⃣ 复制下方激活码 2️⃣ 打开 GoLand 软件 3️⃣ 在菜单栏中选择 Help -> Register 4️⃣ 选择 Activation Code 5️⃣ 粘贴激活码,点击 Activate ⚠️ 必看!必看! 🔥 获取最新激活…

    2025 年 2 月 9 日
    50500
  • 3dm 格式详解,javascript加载导出3dm文件示例

    3DM 格式详解 3DM 文件格式是由 Rhinoceros 3D(简称 Rhino)软件使用的原生文件格式。这种格式主要用于存储三维模型,支持多种几何类型和丰富的属性信息。以下是 3DM 文件格式的一些关键特性和结构: 文件结构 文件头 : 文件标识符 (File Signature):用于识别文件是否为 3DM 文件。 文件版本号 (File Versi…

    2025 年 1 月 10 日
    45600

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信