文章标题:
Rust高并发线程:原理剖析与实践应用
文章内容:
🌟🌟 欢迎来到景天科技苑🌟🌟
🎉🎉 养成好习惯,先赞后再看哦~🎉🎉
🏆 作者介绍:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。🏆《博客》:涵盖Rust开发、Python全栈、Golang开发、云原生开发、PyQt5和Tkinter桌面开发、小程序开发、人工智能、js逆向、App逆向、网络系统安全、数据分析、Django、fastapi、flask等框架、云原生K8S、linux、shell脚本等实操经验、网站搭建、数据库等分享。
所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑
文章目录
- 线程
-
- 1、创建线程
- 2、Thread Builder
- 3、当前的线程
- 4、并发数和当前线程数
- 5、affinity线程亲和性
- 6、num_cpus
- 8、sleep 和 park
- 9、scoped thread
- 10、ThreadLocal
- 11、Move
- 12、设置线程优先级
- 13、Panic
- 14、Rayon scoped thread
- 15、Go 风格的启动线程
线程
线程(英文:thread)是操作系统中能够进行运算和调度的最小单元。通常情况下,线程包含在进程之中,是进程内实际运作的部分,所以程序实际运行时是以线程为单位进行的。一个进程里可以同时运行多个线程,每条线程并行执行不同的任务。线程是独立调度和分派的基本单位,且同一进程内的多条线程会共享该进程的全部系统资源,比如虚拟地址空间、文件描述符和信号处理等。但同一进程中的多个线程拥有各自的调用栈(call stack)、寄存器上下文(register context)以及线程本地存储(thread-local storage)。一个进程能够依靠多个线程来处理任务,每条线程并行执行不同的工作。若进程需要处理大量任务,就需要众多线程,同时也需调用多个核心,在多核、多CPU或支持Hyper-threading的CPU上使用多线程编程能提升程序的执行效率。在单CPU单核的计算机上,运用多线程技术,可将进程中负责I/O处理、人机交互且常被阻塞的部分与密集计算的部分分开执行,从而提高CPU的利用率。
线程与传统多任务操作系统进程在以下几方面存在差异:
• 进程通常是独立的,而线程是进程的组成部分;
• 进程所携带的状态信息比线程多得多,而进程内的多个线程共享进程状态以及内存等资源;
• 进程拥有独立的地址空间,而线程共享其地址空间;
• 进程仅通过系统提供的进程间通信机制进行交互;
• 同一进程中线程之间的上下文切换通常比进程之间的上下文切换更快。线程与进程的优缺点如下:
1、创建线程
Rust标准库中的std::thread crate提供了与线程相关的函数。如前所述,一个Rust程序运行时会启动一个进程,该进程包含一个或多个线程,Rust中的线程是纯粹的操作系统线程,拥有自身的栈和状态。线程之间的通信可通过channel,类似于Go语言中的channel,也可通过一些同步原语。
pub fn start_one_thread() {
let handle = thread::spawn(|| {
println!("Hello from a thread!");
});
handle.join().unwrap();
}
这段代码通过thread.spawn在当前线程中启动一个新线程,新线程简单输出“Hello from a thread”文本。若在main函数中调用此start_one_thread函数,控制台会正常输出该文本,但若注释掉handle.join.unwrap();那一行,期望的文本可能无法输出,原因是主程序退出时,即便新开的线程也会强制终止,所以有时需通过join等待线程完成。若忽略thread::spawn返回的JoinHandle值,新建的线程称为detached,通过调用JoinHandle的join方法,调用者需等待线程完成。
此代码直接使用handle.join().unwrap(),实际上join()返回Result类型,若线程panic,会返回Err,否则返回Ok(_),调用者甚至可获取线程的最终返回值:
pub fn start_one_thread_result() {
let handle = thread::spawn(|| {
println!("Hello from a thread!");
200
});
match handle.join() {
Ok(v) => println!("thread result: {}", v),
Err(e) => println!("error: {:?}", e),
}
}
以下代码启动多个线程:
pub fn start_two_threads() {
let handle1 = thread::spawn(|| {
println!("Hello from a thread1!");
});
let handle2 = thread::spawn(|| {
println!("Hello from a thread2!");
});
handle1.join().unwrap();
handle2.join().unwrap();
}
若启动N个线程,可使用Vector保存线程的handle:
pub fn start_n_threads() {
const N: isize = 10;
let handles: Vec<_> = (0..N)
.map(|i| {
thread::spawn(move || {
println!("Hello from a thread{}!", i);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
2、Thread Builder
通过Builder可对线程的初始状态进行更多控制,比如设置线程名称、栈大小等。
use std::thread;
//通过Builder创建线程
fn start_one_thread_by_builder() {
// 创建线程构建器
let builder = thread::Builder
::new()
.name("foo".into()) // 设置线程名称
.stack_size(32 * 1024); // 设置栈大小
// 通过构建器创建线程
let handler = builder
.spawn(|| {
println!("Hello from a thread!");
})
.unwrap();
// 等待线程执行完毕
handler.join().unwrap();
}
fn main() {
start_one_thread_by_builder();
}
它提供spawn开启线程,还提供spawn_scoped开启scoped thread(后续会讲),一个实验性方法spawn_unchecked,提供更宽松的生命周期绑定,调用者需确保引用的对象在丢弃前线程的join已被调用,或使用‘static生命周期,因是实验性方法,不做过多介绍,简单示例如下:
use std::thread::Builder;
fn main() {
let builder = Builder::new();
let x = 1;
let thread_x = &x;
//可在unsafe中通过builder.spawn_unchecked来创建线程
let handler = unsafe {
builder
.spawn_unchecked(move || {
println!("x = {}", *thread_x);
})
.unwrap()
};
// caller has to ensure ‘join()‘ is called, otherwise
// it is possible to access freed memory if ‘x‘ gets
// dropped before the thread closure is executed!
handler.join().unwrap();
}
3、当前的线程
由于线程是操作系统最小的调度和运算单元,一段代码的执行隶属于某个线程。如何获取当前线程?通过thread::current()可获取,返回Thread对象,可通过它获取线程ID和名称:
use std::thread;
//获取当前线程
pub fn current_thread() {
//获取当前线程的id和name
let current_thread = thread::current();
println!("current thread: {:?},{:?}", current_thread.id(), current_thread.name());
let builder = thread::Builder
::new()
.name("foo".into()) // 设置线程名称
.stack_size(32 * 1024); // 设置栈大小
let handler = builder
.spawn(|| {
let current_thread = thread::current();
//获取当前子线程的id和name
println!("child thread: {:?},{:?}", current_thread.id(), current_thread.name());
})
.unwrap();
handler.join().unwrap();
}
fn main() {
current_thread();
}
甚至,还可通过其unpark方法唤醒被阻塞(parked)的线程:
use std::thread;
use std::time::Duration;
fn main() {
//创建一个packed的线程
let parked_thread = thread::Builder
::new()
.spawn(|| {
println!("Parking thread");
//通过thread::park()来阻塞线程
thread::park();
println!("Thread unparked");
})
.unwrap();
//等待线程阻塞一段时间
println!("Sleeping in the main thread for 2 seconds");
thread::sleep(Duration::from_secs(2));
println!("Unpark the thread");
//通过thread::unpark()来唤醒线程
parked_thread.thread().unpark();
parked_thread.join().unwrap();
}
park和unpark是用于阻塞和唤醒线程的方法,利用它们可有效利用CPU,让暂时不满足条件的线程暂时不可执行。
4、并发数和当前线程数
并发能力是一种资源,一台机器能提供的并发能力数值通常等同于计算机拥有的CPU数(逻辑核数),但在虚拟机和容器环境下,程序可使用的CPU核数可能受限。可通过
available_parallelism 获取当前的并发数:
use std::{ io, thread };
fn main() -> io::Result<()> {
let count = thread::available_parallelism()?.get();
println!("Number of logical cores available: {count}");
Ok(())
}
可看到当前逻辑处理器数量是12
5、affinity线程亲和性
Rust的affinity第三方库用于设置和获取线程的CPU亲和性(CPU affinity),即限制线程运行在哪些CPU上。可用于性能调优、NUMA优化或特定核心负载控制等场景。
一、安装方法
在Cargo.toml中添加依赖:
[dependencies]
affinity = "0.1.2"
二、基本概念
CPU亲和性指将线程或进程绑定到特定CPU核心上运行。affinity库封装了这部分OS提供的接口,支持:
设置当前线程只能在哪些核心上运行。
获取当前线程的CPU核心绑定状态。
查询系统中可用的CPU核心数量。
三、常用API介绍
use affinity::{get_core_num, set_thread_affinity, get_thread_affinity};
- 获取系统中可用的CPU核心列表
fn main() {
//获取系统中可用核心
let cores = affinity::get_core_num();
println!("系统可用核心数:{cores}");
}
- 设置和获取当前线程的CPU亲和性
fn main() {
use std::thread;
//获取系统中可用核心
let cores = affinity::get_core_num();
// 设置为第 3 个 CPU
let core_id = 3;
if core_id >= cores {
panic!("核心编号超出范围");
}
let handle = thread::spawn(move || {
//绑定到core_id
affinity::set_thread_affinity(vec![core_id]).expect("绑定失败");
//获取当前线程的亲和性
let current = affinity::get_thread_affinity().unwrap();
println!("当前线程绑定到核心: {:?}", current);
});
handle.join().unwrap();
}
四、完整示例
use affinity::{ self, set_thread_affinity, get_thread_affinity, get_core_num };
use std::thread;
use std::time::Duration;
fn bind_even_cores() {
let cores = affinity::get_core_num();
println!("系统可用核心数:{cores}");
//绑定到偶数核心
let core_id: Vec<usize> = (0..get_core_num()).step_by(2).collect();
//设置亲和性和获取亲和性
// 启动线程并绑定
let handle = thread::spawn(move || {
//绑定到core_id
//这里set_thread_affinity()获取了core_id的所有权,后面core_id就不能再使用了,所以需要clone一下
set_thread_affinity(core_id.clone()).expect("绑定失败");
//获取当前线程的亲和性
let current = get_thread_affinity().unwrap();
println!("当前线程绑定到核心: {:?}", current);
//运行在绑定的核心上
//注意,绑定的核心不能大于等于系统核心数,否则会报错
for i in 0..core_id.len() {
println!("线程在核心 {} 上运行: 第 {} 次", core_id[i], i);
thread::sleep(Duration::from_secs(1));
}
});
handle.join().unwrap();
}
fn main() {
bind_even_cores();
}
五、实用场景
六、调试技巧
使用htop查看线程运行在哪些核心上。
在Linux下用taskset -c启动程序强制绑定进程(可配合库对比)。
numactl查看/控制NUMA分布情况。
七、注意事项
设置线程亲和性不保证线程只运行在指定核心上,操作系统仍有一定调度权(尤其是负载高时)。
与Tokio这类async runtime混用时需特别小心,可能一个线程运行多个任务时不适合绑定特定CPU。
在多线程场景中,避免多个线程绑定到相同核心,否则反而降低性能。
6、num_cpus
num_cpus是Rust中一个轻量级、跨平台的第三方库,用于获取系统的CPU核心数,常用于多线程编程中,帮助合理配置线程池大小或并发任务数量。
以下是对num_cpus库的详细用法介绍,包括安装方法、核心函数、典型应用场景及注意事项。
📦 一、安装
在Cargo.toml中添加:
[dependencies]
num_cpus = "1.17.0"
🧠 二、核心API说明
num_cpus提供两个核心函数:
fn get() -> usize
fn get_physical() -> usize
- num_cpus::get()
返回逻辑CPU数量(即包含超线程技术的线程总数)。
通常用于线程池大小设置。
let logical_cpus = num_cpus::get();
println!("逻辑 CPU 核心数: {}", logical_cpus);
![在这里插入图片描述
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13183.html