Python多线程编程深度研习全攻略

文章标题:

Python多线程编程全方位研习全方案

文章内容

文章目录

  • Python多线程编程全方位学习指引
    • 一、多线程基础认知
    • 1.1 线程与进程的差别
    • 1.2 全局解释器锁的情况
    • 二、线程的创建与治理
    • 2.1 两种线程创建途径
      • 方式1:函数式创建
      • 方式2:类继承式创建
    • 2.2 线程的经常使用办法与属性
    • 三、线程的同步机制
    • 3.1 锁(Lock)
    • 3.2 可重入锁(RLock)
    • 3.3 信号量(Semaphore)
    • 3.4 事件(Event)
    • 3.5 条件变量(Condition)
    • 3.6 屏障(Barrier)
    • 四、线程间的通信
    • 4.1 应用队列(Queue)
    • 4.2 线程局部数据
    • 五、线程池与高等用法
    • 5.1 应用ThreadPoolExecutor
    • 5.2 定时器线程
    • 5.3 线程优先级队列
    • 六、多线程编程最好理论
    • 6.1 躲避罕见陷阱
    • 6.2 机能优化技能
    • 6.3 调试与监控
    • 七、多线程运用场景
    • 7.1 合适多线程的场景
    • 7.2 不合适多线程的场景
    • 八、进修资本保举

Python多线程编程全方位学习指引

一、多线程基础认知

1.1 线程与进程的差别

特征 进程 线程
资本分派 具有独立内存空间 同享进程内存
创建开销 较大 较小
通信方法 经过管道、套接字等 经过同享变量通信
上下文切换 开销大 开销小
平安性 较高(互相隔离) 较低(需同步机制)
Python中的限制 无GIL限制 受GIL限制

1.2 全局解释器锁的情况

  • Python解释器的设想特征
  • 统一时候仅允许一个线程履行Python字节码
  • 对I/O密集型义务影响小,对CPU密集型义务影响大
  • 处理计划:应用多进程或C扩大绕过GIL

二、线程的创建与治理

2.1 线程创建的两种途径

方式1:函数式创建
import threading
import time

def print_numbers_func():
    for i in range(5):
        time.sleep(0.5)
        print(f"数字: {i}")

def print_letters_func():
    for char in 'ABCDE':
        time.sleep(0.7)
        print(f"字母: {char}")

# 创建线程
thread_one = threading.Thread(target=print_numbers_func)
thread_two = threading.Thread(target=print_letters_func)

# 启动线程
thread_one.start()
thread_two.start()

# 期待线程停止
thread_one.join()
thread_two.join()

print("一切线程履行终了!")
方式2:类继承式创建
class MyCustomThread(threading.Thread):
    def __init__(self, thread_name, delay_time):
        super().__init__()
        self.thread_name = thread_name
        self.delay_time = delay_time

    def run(self):
        print(f"线程 {self.thread_name} 起头履行")
        for i in range(5):
            time.sleep(self.delay_time)
            print(f"{self.thread_name}: {i}")
        print(f"线程 {self.thread_name} 履行终了")

# 创建并启动线程
thread_instances_list = [
    MyCustomThread("Alpha", 0.3),
    MyCustomThread("Beta", 0.4),
    MyCustomThread("Gamma", 0.5)
]

for t in thread_instances_list:
    t.start()

for t in thread_instances_list:
    t.join()

print("一切自定义线程履行终了")

2.2 线程经常使用办法与属性

办法/属性 描写 示例
start() 启动线程 t.start()
run() 线程履行的主体办法(可重写) 自定义线程类时笼盖
join(timeout) 期待线程停止 t.join()
is_alive() 检查线程是不是处于运转状况 if t.is_alive(): ...
name 获取/设置线程称号 t.name = "Worker-1"
ident 线程的标识符(整数情势) print(t.ident)
daemon 守护线程的标记(主线程停止时主动停止) t.daemon = True
isDaemon() 检查是不是为守护线程 t.isDaemon()
setDaemon(bool) 设置守护线程状况 t.setDaemon(True)
native_id 内核级线程ID(Python 3.8+) print(t.native_id)

三、线程的同步机制

3.1 锁(Lock)

import threading

counter_variable = 0
lock_object = threading.Lock()

def increase_counter_func():
    global counter_variable
    for _ in range(100000):
        with lock_object:  # 主动获取和释放锁
            counter_variable += 1

thread_list = []
for i in range(5):
    t = threading.Thread(target=increase_counter_func)
    thread_list.append(t)
    t.start()

for t in thread_list:
    t.join()

print(f"终究的计数器值: {counter_variable} (预期: 500000)")

3.2 可重入锁(RLock)

reentrant_lock_obj = threading.RLock()

def recursive_function_call(n):
    with reentrant_lock_obj:
        if n > 0:
            print(f"层级 {n}")
            recursive_function_call(n-1)

t_one = threading.Thread(target=recursive_function_call, args=(3,))
t_two = threading.Thread(target=recursive_function_call, args=(3,))

t_one.start()
t_two.start()

t_one.join()
t_two.join()

3.3 信号量(Semaphore)

# 限制同时拜访资本的线程数
semaphore_obj = threading.Semaphore(3)  # 最多3个线程同时拜访

def access_resource_func(thread_id):
    with semaphore_obj:
        print(f"线程 {thread_id} 正在拜访资本")
        time.sleep(2)
        print(f"线程 {thread_id} 释放资本")

thread_instances_list = []
for i in range(10):
    t = threading.Thread(target=access_resource_func, args=(i,))
    thread_instances_list.append(t)
    t.start()

for t in thread_instances_list:
    t.join()

3.4 事件(Event)

# 线程间通信机制
event_obj = threading.Event()

def wait_for_event_func():
    print("期待者: 正在期待事件...")
    event_obj.wait()  # 停止直到事件被设置
    print("期待者: 收到事件!")

def set_event_func():
    time.sleep(2)
    print("设置者: 设置事件")
    event_obj.set()  # 叫醒一切期待的线程

t_one = threading.Thread(target=wait_for_event_func)
t_two = threading.Thread(target=set_event_func)

t_one.start()
t_two.start()

t_one.join()
t_two.join()

3.5 条件变量(Condition)

# 生产者-消费者形式
condition_obj = threading.Condition()
buffer_data_list = []
BUFFER_CAPACITY = 5

def data_producer_func():
    global buffer_data_list
    for i in range(10):
        with condition_obj:
            # 检查缓冲区能否已满
            while len(buffer_data_list) >= BUFFER_CAPACITY:
                print("缓冲区已满,生产者期待")
                condition_obj.wait()

            item = f"Item-{i}"
            buffer_data_list.append(item)
            print(f"生产了: {item}")

            # 告诉消费者
            condition_obj.notify_all()
        time.sleep(0.1)

def data_consumer_func():
    global buffer_data_list
    for _ in range(10):
        with condition_obj:
            # 检查缓冲区能否为空
            while len(buffer_data_list) == 0:
                print("缓冲区为空,消费者期待")
                condition_obj.wait()

            item = buffer_data_list.pop(0)
            print(f"消耗了: {item}")

            # 告诉生产者
            condition_obj.notify_all()
        time.sleep(0.2)

producer_threads_list = [threading.Thread(target=data_producer_func) for _ in range(2)]
consumer_threads_list = [threading.Thread(target=data_consumer_func) for _ in range(3)]

for t in producer_threads_list + consumer_threads_list:
    t.start()

for t in producer_threads_list + consumer_threads_list:
    t.join()

3.6 屏障(Barrier)

# 同步多个线程的履行点
barrier_obj = threading.Barrier(3)

def work_process_func(name):
    print(f"{name} 第一阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 到达屏障")
    barrier_obj.wait()  # 期待一切线程到达

    print(f"{name} 第二阶段工作")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} 履行终了")

thread_instances_list = [
    threading.Thread(target=work_process_func, args=("Alice",)),
    threading.Thread(target=work_process_func, args=("Bob",)),
    threading.Thread(target=work_process_func, args=("Charlie",))
]

for t in thread_instances_list:
    t.start()

for t in thread_instances_list:
    t.join()

四、线程间通信

4.1 应用队列

from queue import Queue
import random

# 线程平安的队列
task_queue_obj = Queue()
result_queue_obj = Queue()

def create_task_func():
    for i in range(10):
        task = f"Task-{i}"
        task_queue_obj.put(task)
        print(f"创建了: {task}")
        time.sleep(random.uniform(0.1, 0.3))
    task_queue_obj.put(None)  # 发送停止信号

def handle_task_func():
    while True:
        task = task_queue_obj.get()
        if task is None:  # 收到停止信号
            task_queue_obj.put(None)  # 通报给下一个消费者
            break

        # 处理任务
        time.sleep(random.uniform(0.2, 0.5))
        result = f"义务 {task} 的成果"
        result_queue_obj.put(result)
        print(f"处理了: {task} -> {result}")
        task_queue_obj.task_done()  # 标记任务完成

# 创建生产者线程
producer_thread = threading.Thread(target=create_task_func)

# 创建消费者线程
consumer_threads = [threading.Thread(target=handle_task_func) for _ in range(3)]

# 启动一切线程
producer_thread.start()
for t in consumer_threads:
    t.start()

# 期待生产者完成
producer_thread.join()

# 期待一切任务完成
task_queue_obj.join()

# 处理成果
print("\n成果:")
while not result_queue_obj.empty():
    print(result_queue_obj.get())

4.2 线程局部数据

# 每个线程有独立的数据副本
thread_local_data_obj = threading.local()

def show_local_data_func():
    try:
        value = thread_local_data_obj.value
    except AttributeError:
        print("此线程未设置值")
    else:
        print(f"线程的值: {value}")

def set_local_data_func(value):
    thread_local_data_obj.value = value
    show_local_data_func()

# 创建线程
thread_instances_list = []
for i in range(3):
    t = threading.Thread(target=set_local_data_func, args=(i,))
    thread_instances_list.append(t)
    t.start()

for t in thread_instances_list:
    t.join()

五、线程池与高等用法

5.1 应用ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def get_url_content_func(url):
    print(f"正在下载 {url}")
    response = requests.get(url, timeout=5)
    return {
        'url': url,
        'status': response.status_code,
        'length': len(response.text),
        'content': response.text[:100]  # 取前100个字符
    }

urls_list = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org',
    'https://www.stackoverflow.com'
]

# 应用线程池停止治理
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交义务
    future_to_url = {executor.submit(get_url_content_func, url): url for url in urls_list}

    # 处理成果
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 下载完成: 状况={data['status']}, 长度={data['length']}")
            # print(f"预览: {data['content']}")
        except Exception as e:
            print(f"{url} 发生了异常: {e}")

5.2 定时器线程

def delayed_operation_func(message):
    print(f"延迟消息: {message}")

# 5秒后履行
timer = threading.Timer(5.0, delayed_operation_func, args=("5秒后你好!",))
timer.start()

print("定时器已启动,期待中...")

5.3 线程优先级队列

import queue

# 创建优先级队列
priority_queue_obj = queue.PriorityQueue()

def work_thread_func():
    while True:
        priority, task_item = priority_queue_obj.get()
        if task_item is None:
            break
        print(f"处理义务: {task_item} (优先级: {priority})")
        time.sleep(0.5)
        priority_queue_obj.task_done()

# 启动工作线程
worker_thread_instance = threading.Thread(target=work_thread_func)
worker_thread_instance.start()

# 添加义务(优先级,义务)
priority_queue_obj.put((3, "低优先级义务"))
priority_queue_obj.put((1, "高优先级义务"))
priority_queue_obj.put((2, "中等优先级义务"))
priority_queue_obj.put((1, "另一个高优先级义务"))

# 期待队列处理完成
priority_queue_obj.join()

# 发送停止信号
priority_queue_obj.put((0, None))
worker_thread_instance.join()

六、多线程编程最好理论

6.1 躲避罕见陷阱

  1. 竞争条件 :一直应用同步机制庇护同享资本
  2. 死锁
    • 躲避嵌套锁
    • 按牢固次序获取锁
    • 应用带超时的锁
  3. 线程饥饿 :公道设置线程优先级
  4. 资本泄漏 :确保释放一切资本(文件、收集毗连等)

6.2 机能优化技能

  1. 线程池 :重用线程以削减创建开销
  2. 批量处理 :削减锁的获取/释放次数
  3. 无锁数据布局 :如应用queue.Queue
  4. 局部存储 :削减同享状况
  5. 异步I/O :连系asyncio进步I/O密集型机能

6.3 调试与监控

```python
import threading
import time

def worker_thread_func():
print(f"{threading.current_thread().name} 起头履行")
time.sleep(2)
print(f"{threading.current_thread().name} 竣事履行")

列出一切活动线程

def check_threads_func():
while True:
print("\n=== 活动线程 ===")
for thread in threading.enumerate():
print(f"{thread.name} (ID: {thread.ident}, 存活: {thread.is_alive()}")
time.sleep(1)

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12741.html

(0)
LomuLomu
上一篇 2025 年 7 月 4 日
下一篇 2025 年 7 月 4 日

相关推荐

  • 2025年最新DataGrip激活码与永久破解教程(支持2099年)

    JetBrains全家桶破解方法详解(含DataGrip/PyCharm/IDEA) 今天为大家带来一个超级实用的DataGrip破解方案,保证可以永久激活到2099年!先看看成功效果图: 此方法不仅适用于DataGrip最新版本,还兼容JetBrains全系列开发工具,包括PyCharm、IDEA等。无论您使用的是Windows、MacOS还是Linux系…

    DataGrip激活码 4天前
    3600
  • 架构设计对比:MVC、MVP、MVVM与DDD在多语言源码中的展现

    架构设计对比:MVC、MVP、MVVM与DDD在多语言代码里的体现 MVC分层架构设计概览 模型-视图-控制器(Model-View-Controller,简称MVC)是一种经典的软件架构设计,通过分层来实现各个部分的解耦,让系统结构清晰且易于维护,具有不错的可扩展性。MVC适用于需要明确区分用户界面、业务逻辑和数据管理的应用场景。随着MVC的发展,衍生出了…

    2025 年 6 月 18 日
    15300
  • 2025年最新IDEA激活码及永久破解教程(支持JetBrains全家桶)

    前言 本教程适用于IntelliJ IDEA、PyCharm、DataGrip、GoLand等JetBrains全系列开发工具,提供最完整的破解方案。先展示最新IDEA版本破解成功的效果图,可以看到已成功激活至2099年! 下面将详细介绍如何永久激活IDEA,该方法同样适用于旧版本,且兼容所有操作系统。 准备工作 下载IDEA安装包 如未安装,请先访问官网下…

    2025 年 5 月 8 日
    1.5K00
  • 2024 GoLand最新激活码,GoLand永久免费激活码2025-02-14 更新

    GoLand 2024最新激活码 以下是最新的GoLand激活码,更新时间:2025-02-14 🔑 激活码使用说明 1️⃣ 复制下方激活码 2️⃣ 打开 GoLand 软件 3️⃣ 在菜单栏中选择 Help -> Register 4️⃣ 选择 Activation Code 5️⃣ 粘贴激活码,点击 Activate ⚠️ 必看!必看! 🔥 获取最新激活…

    2025 年 2 月 14 日
    69600
  • DataGrip注册码分享 – 2025年最新破解方法(永久激活至2099年)

    DataGrip介绍 DataGrip是JetBrains公司出品的一款跨平台数据库工具,专为数据库开发人员和管理员设计,支持多种主流数据库系统,如MySQL、PostgreSQL、MongoDB、Oracle和SQL Server等。它拥有智能的SQL编辑器、高效的导航工具和强大的数据库对象管理功能,能够显著提高数据库操作效率。本文将详细介绍如何获取Dat…

    DataGrip破解教程 2025 年 4 月 27 日
    27900

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信