问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Rust多线程编程中的消息传递机制详解

创作时间:
作者:
@小白创作中心

Rust多线程编程中的消息传递机制详解

引用
1
来源
1.
https://geekdaxue.co/read/hbj9527@multithread/iogsl28zbetqapcc

Rust语言的多线程编程以其独特的所有权和生命周期机制著称,其中消息传递机制是实现线程间通信的重要方式。本文将详细介绍Rust中基于消息传递的多线程编程,包括单发送者/单接收者管道、多发送者/单接收者管道以及如何通过Mutex实现多消费者管道等核心知识点。

基本认知

直接先上Go语言中的经典名言来感受一下为什么用消息传递机制来进行通信,而不是共享内存

Do not communicate by sharing memory
instead, share memory by communicating

多线程间有多种方式可以共享传递数据,最常用的方式就是通过消息传递或者将锁和Arc联合使用,而对于消息传递,在编程界还有一个大名鼎鼎的Actor线程模型为其背书,典型的有Erlang语言,还有上述Go的名言。

单发送者/单接收者管道

基本使用

标准库提供了通道std::sync::mpsc,其中mpsc是multiple producer, single consumer的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。当然,支持多个发送者也意味着支持单个发送者,我们先来看看单发送者、单接收者的简单例子:

fn main() {
    // 创建一个消息通道, 返回一个元组:(发送者,接收者)
    let (tx, rx) = mpsc::channel();

    // 创建线程,并发送消息
    thread::spawn(move || {
        // 发送数字1, send方法返回Result<T,E> , 通过unwrap进行快速错误处理
        tx.send(1).unwrap();

        // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
        // tx.send(Some(1)).unwrap()
    });

    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}

以上代码并不复杂,但需要注意以下几点:

  • tx,rx对应发送者和接收者,它们的类型编译器自动推导。tx.send(1)发送了整数,一旦类型被推导确定,该通道就只能传递对应类型的值,例如上述例子被注释的代码报错
  • 接收消息的操作rx.recv()会阻塞当前线程,直到取到值或者通道被关闭
  • 需要使用move将tx的所有权转移到子线程的闭包中
  • send方法返回Result,因为如果接收者被drop导致发送的值不被任何人接收,此时毫无意义,因此返回一个错误最为合适。但是在实际项目中别偷懒用unwrap()处理
  • 同样的,对于recv方法来说,当发送者关闭时,它也会接收一个错误,用于说明不会再有任何值发送过来了

不阻塞的try_recv方法

除了上述recv方法,还可以使用try_recv尝试接收一次消息,该方法并不会阻塞线程,当通道中没有消息时,它会立刻返回一个错误

由于子线程的创建需要时间,因此println!和try_recv方法会先执行,而此时子线程的消息还未被发出。try_recv会尝试立即读取一次消息,因为消息没有发出,此次读取最终会报错,且主线程运行结束(可悲的是,相对于主线程中的代码,子线程的创建速度实在是过慢,直到主线程结束,都无法完成子线程的初始化)

如上,try_recv返回了一个错误,错误内容是Empty,代表通道并没有消息。如果你尝试把println!复制一些行,就会发现一个有趣的输出

如上,当子线程创建成功且发送消息后,主线程会接收到Ok(1)的消息内容,紧接着子线程结束,发送者也随着被drop,此时接收者又会报错,但是这次错误原因有所不同,Disconnected代表发送者已经被关闭

传输具有所有权的数据

使用通道来传输数据,一样要遵循Rust的所有权规则:

  • 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
  • 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错

来看看第二种情况:

以上代码中,String底层的字符串是存储在堆上,并没有实现Copy特征,当它被发送后,会将所有权从发送端的s转移给接收端的received,之后s将无法被使用

各种细节不禁令人感叹:Rust还是安全!假如没有所有权的保护,String字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患

使用for进行循环接收

下面来看看如何连续接收通道中的值

在上面代码中,主线程和子线程是并发运行的,子线程在不停的发送消息->休眠1秒,与此同时,主线程使用for循环阻塞从rx迭代器中接收消息,当子线程运行完成时,发送者tx会随之被drop,此时for循环将被终止,最终main线程成功结束

多发送者/单接收者管道

这个模式下,由于子线程会拿走发送者的所有权,因此我们必须对发送者进行克隆,然后让每个线程拿走它的一份拷贝

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send(String::from("hi from raw tx")).unwrap();
    });

    thread::spawn(move || {
        tx1.send(String::from("hi from cloned tx")).unwrap();
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

代码并无太大区别,就多了一个对发送者的克隆lettx1=tx.clone();,然后一个子线程拿走tx的所有权,另一个子线程拿走tx1的所有权。但是有几点需要注意:

  • 需要所有的发送者都被drop掉后,接收者rx才会收到错误,进而跳出for循环,最终结束主线程
  • 这里虽然用了clone但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
  • 由于两个子线程谁先创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定
  • 特别注意:上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的消息是无序的,对于通道而言,消息的发送顺序和接收顺序是一致的,满足FIFO原则(先进先出)

多发送者/互斥量的多消费者管道

前面提到过,Rust通道是多生产者、单消费者的。或者更具体地说,一个通道只有一个Receiver。任何线程池都不能有多个线程使用一个mpsc通道共享工作成果。不过,有个非常简单的方式可以绕过这个限制,只需要使用标准库。可以为Receiver添加一个Mutex,然后再共享。下面是一个实现它的模块

pub mod shared_channel {
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::{channel, Sender, Receiver};

    /// 对Receiver的线程安全的封装
    pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);

    impl<T> Iterator for SharedReceiver<T> {
        /// 从封装的接收者获取下一项
        fn next(&mut self) -> Option<T> {
            let guard = self.0.lock().unwrap();
        }
    }

    /// 创建一个新通道,其接收者可以跨线程共享。这会返回一个发送者和一个接收者
    /// 与stdlib的channel()类似,有时候可以直接代替它使用
    pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
        let (sender, receiver) = channel();
        (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
    }
}

这里使用了一个Arc>>,其中泛型被嵌套了很多层。这种情况在Rust中比在C++中更常见。只要依次读出它们的名字就可以理解其含义,如下图所示

更好的性能之mpmc(多发多收)

如果你需要mpmc(多发送者,多接收者)或者需要更高的性能,可以考虑第三方库:

同步和异步管道

Rust标准库的mpsc管道其实分为两种类型:同步和异步

异步管道

之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会被阻塞

运行后输出如下

主线程因为睡眠阻塞了3秒,因此并没有进行消息接收,而子线程却在此期间轻松完成了消息的发送。等主线程睡眠结束后,才姗姗来迟的从通道中接收了子线程老早之前发送的消息。从输出可以看出,发送之前和发送之后是连续输出的,没有受到接收端主线程的任何影响,因此通过mpsc::channel创建的通道是异步通道

同步管道

与异步通道相反,同步通道发送消息是阻塞的,只有在消息被接收后才解除阻塞,例如

运行后输出如下

可以看出,主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:发送之后的输出是在receive1之后,说明只有接收消息彻底成功后,发送消息才算完成

消息缓存(管道缓存)

细心的读者可能已经发现在创建同步通道时,我们传递了一个参数0mpsc::sync_channel(0);这是什么意思呢?先将0改成1,然后再运行试试

竟然得到了和异步通道一样的效果:根本没有等待主线程的接收开始,消息发送就立即完成了!难道同步通道变成了异步通道?别急,将子线程中的代码修改下试试

在子线程中,我们又多发了一条消息,此时输出如下

  • 更奇怪的事出现了,第一条消息瞬间发送完成,没有阻塞,而发送第二条消息时却符合同步通道的特点,阻塞了,直到主线程接收后,才发送完成
  • 其实一切关键就在于1上,该值可以用来指定同步通道的消息缓存条数,当你设定为N时,发送者就可以无阻塞的往通道中发送N条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第N-1条消息就将触发发送阻塞)
  • 问题又来了,异步通道创建时完全没有这个缓冲值参数mpsc::channel(),它的缓冲值怎么设置呢?都异步了,都可以无限发送了,事实上异步通道的缓冲上限取决于你的内存大小,不要撑爆就行
  • 因此使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险

关闭管道

之前我们数次提到了通道关闭,并且提到了当通道关闭后,发送消息或接收消息将会报错。那么如何关闭通道呢?很简单,所有发送者被drop或所有接收者被drop后通道自动关闭。Rust的Drop trait没有性能损耗,原因如下:

  • 零开销抽象:Rust语言的设计原则之一是尽可能减少运行时的开销。Drop trait的实现方式是允许程序员在对象被销毁时执行一些代码,但这些代码的执行并不会对程序的运行时性能产生影响
  • 优化编译器:Rust的编译器对Drop trait进行了优化。当一个对象被销毁时,编译器会生成相应的代码来调用该对象的Drop trait中的代码。这个过程是编译时确定的,因此不会在运行时产生额外的开销
  • 避免内存泄漏:Rust的内存管理机制可以自动管理内存的分配和释放。使用Drop trait可以确保在对象被销毁时释放相关的资源,从而避免内存泄漏

传输多种类型的数据

之前提到过,一个消息通道只能传输一种类型的数据,如果你想要传输多种类型的数据

  • 可以为每个类型创建一个通道
  • 你也可以使用枚举类型来实现

如上所示,枚举类型还能让我们带上想要传输的数据,但是有一点需要注意,Rust会按照枚举中占用内存最大的哪个成员进行内存对齐,这意味着就算你传输的是枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同,因此会造成内存上的浪费

新手容易遇到的坑:一直阻塞

mpsc虽然相当简洁明了,但是在使用起来还是可能存在坑

fn main() {
    let (send, recv) = mpsc::channel();

    for i in 0..num_threads {
        let thread_send = send.clone();
        thread::spawn(move || {
            thread_send.send(i).unwrap();
            println!("thread {:?} finished", i);
        });
    }

    // 在这里 drop send ...

    for x in recv {
        println!("Got: {}", x);
    }

    println!("finished iterating");
}
  • 以上代码看起来非常正常,但是运行后主线程会一直阻塞,最后一行打印输出也不会被执行,原因在于,子线程拿走的是复制后的send的所有权,这些拷贝会在子线程结束后被drop,因此无需担心,但是send本身却直到main函数的结束才会被drop
  • 之前提到,通道关闭的两个条件:发送者者接收者全部被drop,要结束for循环显然是要求发送者全部drop,但是由于send自身没有被drop,会导致该循环永远无法结束,最终主线程会一直被阻塞
  • 解决办法很简单,drop掉send即可,在代码中的注释下面添加一行drop(send)
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号