Rust高并发线程:原理剖析与应用实践

文章标题:

Rust高并发线程:原理剖析与实践应用

文章内容:

在这里插入图片描述

🌟🌟 欢迎来到景天科技苑🌟🌟

🎉🎉 养成好习惯,先赞后再看哦~🎉🎉

🏆 作者介绍:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。

🏆《博客》:涵盖Rust开发、Python全栈、Golang开发、云原生开发、PyQt5和Tkinter桌面开发、小程序开发、人工智能、js逆向、App逆向、网络系统安全、数据分析、Django、fastapi、flask等框架、云原生K8S、linux、shell脚本等实操经验、网站搭建、数据库等分享。

所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑

在这里插入图片描述

文章目录

  • 线程
    • 1、创建线程
    • 2、Thread Builder
    • 3、当前的线程
    • 4、并发数和当前线程数
    • 5、affinity线程亲和性
    • 6、num_cpus
    • 8、sleep 和 park
    • 9、scoped thread
    • 10、ThreadLocal
    • 11、Move
    • 12、设置线程优先级
    • 13、Panic
    • 14、Rayon scoped thread
    • 15、Go 风格的启动线程

线程

线程(英文:thread)是操作系统中能够进行运算和调度的最小单元。通常情况下,线程包含在进程之中,是进程内实际运作的部分,所以程序实际运行时是以线程为单位进行的。一个进程里可以同时运行多个线程,每条线程并行执行不同的任务。线程是独立调度和分派的基本单位,且同一进程内的多条线程会共享该进程的全部系统资源,比如虚拟地址空间、文件描述符和信号处理等。但同一进程中的多个线程拥有各自的调用栈(call stack)、寄存器上下文(register context)以及线程本地存储(thread-local storage)。一个进程能够依靠多个线程来处理任务,每条线程并行执行不同的工作。若进程需要处理大量任务,就需要众多线程,同时也需调用多个核心,在多核、多CPU或支持Hyper-threading的CPU上使用多线程编程能提升程序的执行效率。在单CPU单核的计算机上,运用多线程技术,可将进程中负责I/O处理、人机交互且常被阻塞的部分与密集计算的部分分开执行,从而提高CPU的利用率。

线程与传统多任务操作系统进程在以下几方面存在差异:
• 进程通常是独立的,而线程是进程的组成部分;
• 进程所携带的状态信息比线程多得多,而进程内的多个线程共享进程状态以及内存等资源;
• 进程拥有独立的地址空间,而线程共享其地址空间;
• 进程仅通过系统提供的进程间通信机制进行交互;
• 同一进程中线程之间的上下文切换通常比进程之间的上下文切换更快。线程与进程的优缺点如下:

1、创建线程

Rust标准库中的std::thread crate提供了与线程相关的函数。如前所述,一个Rust程序运行时会启动一个进程,该进程包含一个或多个线程,Rust中的线程是纯粹的操作系统线程,拥有自身的栈和状态。线程之间的通信可通过channel,类似于Go语言中的channel,也可通过一些同步原语。

pub fn start_one_thread() {
    let handle = thread::spawn(|| {
        println!("Hello from a thread!");
     });
    handle.join().unwrap();
 }

这段代码通过thread.spawn在当前线程中启动一个新线程,新线程简单输出“Hello from a thread”文本。若在main函数中调用此start_one_thread函数,控制台会正常输出该文本,但若注释掉handle.join.unwrap();那一行,期望的文本可能无法输出,原因是主程序退出时,即便新开的线程也会强制终止,所以有时需通过join等待线程完成。若忽略thread::spawn返回的JoinHandle值,新建的线程称为detached,通过调用JoinHandle的join方法,调用者需等待线程完成。

此代码直接使用handle.join().unwrap(),实际上join()返回Result类型,若线程panic,会返回Err,否则返回Ok(_),调用者甚至可获取线程的最终返回值:

pub fn start_one_thread_result() {
    let handle = thread::spawn(|| {
        println!("Hello from a thread!");
        200
    });

    match handle.join() {
      Ok(v) => println!("thread result: {}", v),
      Err(e) => println!("error: {:?}", e),
    }
}

以下代码启动多个线程:

pub fn start_two_threads() {
    let handle1 = thread::spawn(|| {
        println!("Hello from a thread1!");
    });

    let handle2 = thread::spawn(|| {
        println!("Hello from a thread2!");
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

若启动N个线程,可使用Vector保存线程的handle:

pub fn start_n_threads() {
    const N: isize = 10;
    let handles: Vec<_> = (0..N)
        .map(|i| {
            thread::spawn(move || {
                println!("Hello from a thread{}!", i);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

2、Thread Builder

通过Builder可对线程的初始状态进行更多控制,比如设置线程名称、栈大小等。

use std::thread;

//通过Builder创建线程
fn start_one_thread_by_builder() {
    // 创建线程构建器
    let builder = thread::Builder
        ::new()
        .name("foo".into()) // 设置线程名称
        .stack_size(32 * 1024); // 设置栈大小

    // 通过构建器创建线程
    let handler = builder
        .spawn(|| {
            println!("Hello from a thread!");
        })
        .unwrap();

    // 等待线程执行完毕
    handler.join().unwrap();
}
fn main() {
    start_one_thread_by_builder();
}

在这里插入图片描述

它提供spawn开启线程,还提供spawn_scoped开启scoped thread(后续会讲),一个实验性方法spawn_unchecked,提供更宽松的生命周期绑定,调用者需确保引用的对象在丢弃前线程的join已被调用,或使用‘static生命周期,因是实验性方法,不做过多介绍,简单示例如下:

use std::thread::Builder;

fn main() {
    let builder = Builder::new();
    let x = 1;
    let thread_x = &x;
    //可在unsafe中通过builder.spawn_unchecked来创建线程
    let handler = unsafe {
        builder
            .spawn_unchecked(move || {
                println!("x = {}", *thread_x);
            })
            .unwrap()
    };

    // caller has to ensure ‘join()‘ is called, otherwise
    // it is possible to access freed memory if ‘x‘ gets
    // dropped before the thread closure is executed!
    handler.join().unwrap();
}

在这里插入图片描述

3、当前的线程

由于线程是操作系统最小的调度和运算单元,一段代码的执行隶属于某个线程。如何获取当前线程?通过thread::current()可获取,返回Thread对象,可通过它获取线程ID和名称:

use std::thread;

//获取当前线程
pub fn current_thread() {
    //获取当前线程的id和name
    let current_thread = thread::current();
    println!("current thread: {:?},{:?}", current_thread.id(), current_thread.name());

    let builder = thread::Builder
        ::new()
        .name("foo".into()) // 设置线程名称
        .stack_size(32 * 1024); // 设置栈大小

    let handler = builder
        .spawn(|| {
            let current_thread = thread::current();
            //获取当前子线程的id和name
            println!("child thread: {:?},{:?}", current_thread.id(), current_thread.name());
        })
        .unwrap();

    handler.join().unwrap();
}

fn main() {
    current_thread();
}

在这里插入图片描述

甚至,还可通过其unpark方法唤醒被阻塞(parked)的线程:

use std::thread;
use std::time::Duration;

fn main() {
    //创建一个packed的线程
    let parked_thread = thread::Builder
        ::new()
        .spawn(|| {
            println!("Parking thread");
            //通过thread::park()来阻塞线程
            thread::park();
            println!("Thread unparked");
        })
        .unwrap();

    //等待线程阻塞一段时间
    println!("Sleeping in the main thread for 2 seconds");
    thread::sleep(Duration::from_secs(2));

    println!("Unpark the thread");
    //通过thread::unpark()来唤醒线程
    parked_thread.thread().unpark();

    parked_thread.join().unwrap();
}

在这里插入图片描述

park和unpark是用于阻塞和唤醒线程的方法,利用它们可有效利用CPU,让暂时不满足条件的线程暂时不可执行。

4、并发数和当前线程数

并发能力是一种资源,一台机器能提供的并发能力数值通常等同于计算机拥有的CPU数(逻辑核数),但在虚拟机和容器环境下,程序可使用的CPU核数可能受限。可通过

available_parallelism 获取当前的并发数:
use std::{ io, thread };

fn main() -> io::Result<()> {
    let count = thread::available_parallelism()?.get();
    println!("Number of logical cores available: {count}");
    Ok(())
}

可看到当前逻辑处理器数量是12
在这里插入图片描述

5、affinity线程亲和性

Rust的affinity第三方库用于设置和获取线程的CPU亲和性(CPU affinity),即限制线程运行在哪些CPU上。可用于性能调优、NUMA优化或特定核心负载控制等场景。

一、安装方法
在Cargo.toml中添加依赖:

[dependencies]
affinity = "0.1.2"

二、基本概念
CPU亲和性指将线程或进程绑定到特定CPU核心上运行。affinity库封装了这部分OS提供的接口,支持:
设置当前线程只能在哪些核心上运行。
获取当前线程的CPU核心绑定状态。
查询系统中可用的CPU核心数量。

三、常用API介绍

use affinity::{get_core_num, set_thread_affinity, get_thread_affinity};
  1. 获取系统中可用的CPU核心列表
fn main() {
    //获取系统中可用核心
    let cores = affinity::get_core_num();
    println!("系统可用核心数:{cores}");
}

在这里插入图片描述

  1. 设置和获取当前线程的CPU亲和性
fn main() {
    use std::thread;

    //获取系统中可用核心
    let cores = affinity::get_core_num();

    // 设置为第 3 个 CPU
    let core_id = 3;
    if core_id >= cores {
        panic!("核心编号超出范围");
    }
    let handle = thread::spawn(move || {
        //绑定到core_id
        affinity::set_thread_affinity(vec![core_id]).expect("绑定失败");

        //获取当前线程的亲和性
        let current = affinity::get_thread_affinity().unwrap();
        println!("当前线程绑定到核心: {:?}", current);
    });
    handle.join().unwrap();
}

在这里插入图片描述

四、完整示例

use affinity::{ self, set_thread_affinity, get_thread_affinity, get_core_num };
use std::thread;
use std::time::Duration;

fn bind_even_cores() {
    let cores = affinity::get_core_num();
    println!("系统可用核心数:{cores}");
    //绑定到偶数核心
    let core_id: Vec<usize> = (0..get_core_num()).step_by(2).collect();
    //设置亲和性和获取亲和性
    // 启动线程并绑定
    let handle = thread::spawn(move || {
        //绑定到core_id
        //这里set_thread_affinity()获取了core_id的所有权,后面core_id就不能再使用了,所以需要clone一下
        set_thread_affinity(core_id.clone()).expect("绑定失败");

        //获取当前线程的亲和性
        let current = get_thread_affinity().unwrap();
        println!("当前线程绑定到核心: {:?}", current);

        //运行在绑定的核心上
        //注意,绑定的核心不能大于等于系统核心数,否则会报错
        for i in 0..core_id.len() {
            println!("线程在核心 {} 上运行: 第 {} 次", core_id[i], i);
            thread::sleep(Duration::from_secs(1));
        }
    });

    handle.join().unwrap();
}

fn main() {
    bind_even_cores();
}

在这里插入图片描述

五、实用场景
在这里插入图片描述

六、调试技巧
使用htop查看线程运行在哪些核心上。
在Linux下用taskset -c启动程序强制绑定进程(可配合库对比)。
numactl查看/控制NUMA分布情况。

七、注意事项
设置线程亲和性不保证线程只运行在指定核心上,操作系统仍有一定调度权(尤其是负载高时)。

与Tokio这类async runtime混用时需特别小心,可能一个线程运行多个任务时不适合绑定特定CPU。

在多线程场景中,避免多个线程绑定到相同核心,否则反而降低性能。

6、num_cpus

num_cpus是Rust中一个轻量级、跨平台的第三方库,用于获取系统的CPU核心数,常用于多线程编程中,帮助合理配置线程池大小或并发任务数量。
以下是对num_cpus库的详细用法介绍,包括安装方法、核心函数、典型应用场景及注意事项。

📦 一、安装
在Cargo.toml中添加:

[dependencies]
num_cpus = "1.17.0"

🧠 二、核心API说明
num_cpus提供两个核心函数:
fn get() -> usize
fn get_physical() -> usize

  1. num_cpus::get()
    返回逻辑CPU数量(即包含超线程技术的线程总数)。
    通常用于线程池大小设置。
let logical_cpus = num_cpus::get();
println!("逻辑 CPU 核心数: {}", logical_cpus);

![在这里插入图片描述

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13183.html

(0)
LomuLomu
上一篇 10小时前
下一篇 8小时前

相关推荐

  • 2025年最新IDEA激活码分享:永久破解IDEA至2099年详细教程

    你是否正在寻找最新IDEA激活码或破解方法?本文将为你提供完整的JetBrains全家桶(包括IDEA、PyCharm、DataGrip、Goland等)永久破解教程,有效期直达2099年! 先来看看成功破解后的效果截图,可以看到IDEA已成功激活至2099年: 下面将详细介绍如何一步步完成IDEA的永久激活,这个方法同样适用于旧版本,无论你使用什么操作系统…

    2025 年 5 月 9 日
    19000
  • 2024 GoLand最新激活码,GoLand永久免费激活码2025-01-11 更新

    GoLand 2024最新激活码 以下是最新的GoLand激活码,更新时间:2025-01-11 🔑 激活码使用说明 1️⃣ 复制下方激活码 2️⃣ 打开 GoLand 软件 3️⃣ 在菜单栏中选择 Help -> Register 4️⃣ 选择 Activation Code 5️⃣ 粘贴激活码,点击 Activate ⚠️ 必看!必看! 🔥 获取最新激活…

    2025 年 1 月 11 日
    45500
  • 实战指南:理解 ThreadLocal 原理并用于Java 多线程上下文管理

    目录 一、ThreadLocal基本知识回顾分析 (一)ThreadLocal原理 (二)既然ThreadLocalMap的key是弱引用,GC之后key是否为null? (三)ThreadLocal中的内存泄漏问题及JDK处理方法 (四)部分核心源码回顾 ThreadLocal.set()方法源码详解 ThreadLocalMap.get()方法详解 Th…

    2025 年 1 月 22 日
    55500
  • 2024 PyCharm最新激活码,PyCharm永久免费激活码2025-02-05 更新

    PyCharm 2024最新激活码 以下是最新的PyCharm激活码,更新时间:2025-02-05 🔑 激活码使用说明 1️⃣ 复制下方激活码 2️⃣ 打开 PyCharm 软件 3️⃣ 在菜单栏中选择 Help -> Register 4️⃣ 选择 Activation Code 5️⃣ 粘贴激活码,点击 Activate ⚠️ 必看!必看! 🔥 获取最…

    2025 年 2 月 5 日
    66600
  • 如何做好软件架构师

    本文以个人视野聊下软件架构师的工作以及软件架构设计知识。做开发工作接近10年了,期间主要做Windows应用开发。在成熟的“华南区最大WPF团队”希沃白板呆了较长一段时间、后面从0到1构建Windows技术栈以及会议屏软件集,在软件设计这块自己成长了很多。之前整理过如何做好技术经理 – 唐宋元明清2188 – 博客园,这里梳理下自己的设计思维,算是自己阶段性…

    未分类 2025 年 1 月 15 日
    27200

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信