TOC
- Info
- Rust
- #02: Memory Safety
- Week 1 Exercises: Hello world
- #03: Error Handling
- #04: Object Oriented Rust
- Week 2 Exercises: Ownership and structs
- #05: Traits and Generics
- #06: Smart Pointers
- Week 3 Exercises: Error handling, I/O, and traits
- #07: Pitfalls in Multiprocessing
- Project 1: The DEET Debugger
- #09: Intro to Multithreading
- #10: Shared Memory
- #11: Synchronization
- Week 5 Exercises: Farm meets multithreading
- #12: Channels
- Week 6 Exercises: Sharing Data by Communicating
- #13: Scalability and Availability
- #15: Futures I
- #16: Futures II
- Project 2: Balancebeam
Info
https://www.bilibili.com/video/BV1Ra411A7kN
https://reberhardt.com/cs110l/spring-2020/
https://zhuanlan.zhihu.com/p/356912397
why Rust
- avoids the memory-safety pitfalls of C and C++
- no garbage collector
Rust
lang
https://www.rust-lang.org/zh-CN/
IDE
IDEA + Rust plugin - without debugging
CLion + Rust plugin - with debugging
cli
$ cargo new demo
$ cd demo
$ cargo run
#02: Memory Safety
Ownership and mutability
fn om_nom_nom(param: String) {
    println!("{}", param);
}

fn main() {
    let s = String::from("hello");
    om_nom_nom(s);
    om_nom_nom(s);
}
The parameter is not declared as a reference, and String does not implement the Copy trait, so the first call to om_nom_nom moves s; the second call fails to compile.
fn om_nom_nom(param: u32) {
    println!("{}", param);
}

fn main() {
    let x = 1;
    om_nom_nom(x);
    om_nom_nom(x);
}
This version compiles because u32 implements the Copy trait, so x is copied into each call.
References
fn main() {
    let mut s = String::from("hello");
    let s1 = &mut s;
    let s2 = &s;
    println!("{} {} {}", s, s1, s2);
}
While a mutable reference is still live, no other references to the same value are allowed, so this does not compile.
fn main() {
    let mut s = String::from("hello");
    let s1 = &mut s;
    println!("{} {}", s, s1);
}
This also fails: s1's mutable borrow of s is still live when s is read.
fn main() {
    let mut s = String::from("hello");
    let s1 = &mut s;
    println!("{}", s1);
    println!("{}", s)
}
Correct: the mutable borrow through s1 ends before s is used.
Week 1 Exercises: Hello world
basic
string
- &str - immutable
- String - mutable
for iteration (example below)
- iter
- iter_mut
loop
- https://stackoverflow.com/q/28892351
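A minimal sketch of iter vs iter_mut in a for loop (an assumed example, not from the exercises):
fn main() {
    let mut v = vec![1, 2, 3];
    for x in v.iter() {
        // x: &i32 -- read-only borrow of each element
        println!("{}", x);
    }
    for x in v.iter_mut() {
        // x: &mut i32 -- mutable borrow, so elements can be changed in place
        *x += 1;
    }
    println!("{:?}", v); // [2, 3, 4]
}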
fn
fn fib(n: i32) -> i32 {
    if n <= 1 {
        n
    } else {
        fib(n - 1) + fib(n - 2)
    }
}
- in Rust, everything is an expression
- the last expression in a block takes no trailing semicolon and becomes the block's value, so no explicit return is needed
- expressions are separated by semicolons
- functions that return a value must declare the return type
part-3-hangman
I had just written hangman in Ruby.
Handling strings here requires knowing more Rust syntax, so I used Vec<char> as the main data structure:
let secret_word_chars: Vec<char> = secret_word.chars().collect();
Printing a Vec<char> with no separators:
secret_word_chars.iter().fold(String::new(), |acc, &char| acc + &*char.to_string())
Basic I/O:
io::stdout()
    .flush()
    .expect("Error flushing stdout.");
let mut guess = String::new();
io::stdin()
    .read_line(&mut guess)
    .expect("Error reading line.");
A for loop can also be written as:
for i in 0..secret_word_chars.len() {
    ...
}
Understanding references again (see the snippet below):
contains
- borrows - takes a &
push
- moves - takes no &
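A small illustration of that difference, assuming a Vec<char> of guessed letters (hypothetical snippet, not from my solution):
fn main() {
    let mut guessed: Vec<char> = Vec::new();
    let c = 'a';
    // contains takes a reference, so c is only borrowed here
    if !guessed.contains(&c) {
        // push takes its argument by value (char is Copy, so this is a copy rather than a move)
        guessed.push(c);
    }
    println!("{:?}", guessed);
}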
#03: Error Handling
Handling nulls
Option
- Some
- None
Handling errors
Result
- Ok
- Err
code
extern crate rand;
use rand::Rng;

fn get_random_num() -> u32 {
    rand::thread_rng().gen_range(0, 42)
}

fn feeling_lucky() -> Option<String> {
    if get_random_num() > 10 {
        Some(String::from("I'm feeling lucky!"))
    } else {
        None
    }
}

fn poke_toddler() -> Result<&'static str, &'static str> {
    if get_random_num() > 10 {
        Ok("Hahahaha!")
    } else {
        Err("Waaaaahhh!")
    }
}

fn main() {
    if feeling_lucky().is_none() {
        println!("Not feeling lucky :(")
    }

    println!(
        "{}",
        feeling_lucky().unwrap_or(String::from("Not lucky :("))
    );

    match feeling_lucky() {
        Some(message) => {
            println!("Got message: {}", message)
        }
        None => {
            println!("No message returned :-/")
        }
    }

    match poke_toddler() {
        Ok(message) => println!("Toddler said: {}", message),
        Err(cry) => println!("Toddler cried: {}", cry),
    }

    // Panic if the baby cries:
    println!("{}", poke_toddler().unwrap());
    // Same thing, but print a more descriptive panic message:
    println!("{}", poke_toddler().expect("Toddler cried :("));

    panic!("Sad times!");
}
Add to Cargo.toml:
[dependencies]
rand = "0.6.0"
#04: Object Oriented Rust
Implement a LinkedList (last in, first out).
code
extern crate core;use core::fmt;
struct Node {
    value: u32,
    next: Option<Box<Node>>, // no null pointers in Rust!
}

pub struct LinkedList {
    head: Option<Box<Node>>,
    size: usize,
}
impl Node { fn new(value: u32, next: Option<Box<Node>>) -> Node { Node {value: value, next: next} }}
impl LinkedList { pub fn new() -> LinkedList { LinkedList {head: None, size: 0} }
pub fn get_size(&self) -> usize { self.size }
pub fn is_empty(&self) -> bool { self.size == 0 }
pub fn push(&mut self, value: u32) { let new_node = Box::new(Node::new(value, self.head.take())); self.head = Some(new_node); self.size += 1; }
pub fn pop(&mut self) -> Option<u32> { let node = self.head.take()?; self.head = node.next; self.size -= 1; Some(node.value) }
pub fn display(&self) { let mut current: &Option<Box<Node>> = &self.head; let mut result = String::from("list ->"); loop { match current { Some(node) => { result = format!("{} {}", result, node.value); current = &node.next; }, None => break, } } println!("{}", result); }}
impl fmt::Display for LinkedList { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut current: &Option<Box<Node>> = &self.head; let mut result = String::from("list ->"); loop { match current { Some(node) => { result = format!("{} {}", result, node.value); current = &node.next; }, None => break, } } write!(f, "{}", result) }}
impl Drop for LinkedList { fn drop(&mut self) { let mut current = self.head.take(); while let Some(mut node) = current { current = node.next.take(); } }}
Some details
- Representing pointers: Option<Box<Node>>
  - Box is similar to std::unique_ptr
  - Option handles the null case
- Like Go, struct members behind a pointer/reference are accessed with . directly
- The push method uses self.head.take() rather than self.head
  - since self is a mutable reference, its members cannot be moved out directly
  - take moves out the Some part and leaves None in its place (tiny example below)
- The pop method appends ? after take to unwrap
  - if the result is None it returns None immediately - propagating errors to the caller
- The Display trait lets us write println!("{}", list); directly
- The Drop trait implementation uses pattern matching (while let) in its loop
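A tiny sketch of what Option::take does, independent of the list code (assumed example):
fn main() {
    let mut head: Option<Box<u32>> = Some(Box::new(1));
    // take() replaces head with None and hands back the old value,
    // letting us move out of a field we only have &mut access to
    let old = head.take();
    assert_eq!(old, Some(Box::new(1)));
    assert_eq!(head, None);
}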
Week 2 Exercises: Ownership and structs
Part 1: Ownership short-answer exercises
fn main() {
    let mut s = String::from("hello");
    let ref1 = &s;
    let ref2 = &ref1;
    let ref3 = &ref2;
    s = String::from("goodbye");
    println!("{}", ref3.to_uppercase());
}
The immutable references are still in use after s is reassigned, so this does not compile.
fn drip_drop() -> &String {
    let s = String::from("hello world!");
    return &s;
}
s is dropped when the function returns, so the returned reference would dangle.
One fix:
static S: &'static str = "hello world!";

fn drip_drop() -> &'static str {
    return &S;
}
i.e. change the return type to &str and give it the 'static lifetime.
fn main() {
    let s1 = String::from("hello");
    let mut v = Vec::new();
    v.push(s1);
    let s2: String = v[0];
    println!("{}", s2);
}
String does not implement the Copy trait, and indexing cannot move an element out of a Vec, so this does not compile.
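Two possible fixes, assuming we just want to use the first element (illustrative, not the official solution):
fn main() {
    let s1 = String::from("hello");
    let mut v = Vec::new();
    v.push(s1);
    // Fix 1: borrow instead of moving
    let s2: &String = &v[0];
    println!("{}", s2);
    // Fix 2: clone the element (or use v.remove(0) to actually move it out)
    let s3: String = v[0].clone();
    println!("{}", s3);
}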
Part 2: rdiff
For the algorithm, see
https://en.wikipedia.org/wiki/Longest_common_subsequence_problem#Print_the_diff
Here I mainly want to discuss error handling.
?
operator - recoverable error
let file = File::open(filename)?;
is equivalent to
let file = match File::open(filename) {
    Ok(file) => file,
    Err(err) => return Err(err),
};
As soon as an Err appears, it is propagated to the caller.
unwrap and expect - unrecoverable errors
As soon as an Err appears, the program panics immediately.
Optional: rwc
Skipped...
Optional challenge: Conway’s Game of Life
thread 'main' panicked at 'could not build default app window: NoAvailableAdapter'
This seems related to the graphics driver; giving up for now...
#05: Traits and Generics
trait ≈ interface + default method
Syntax
impl xxx for xxx
Deriving Traits
#[derive(Debug, PartialEq, Clone, Copy)]
struct Point {
    x: f64,
    y: f64
}
The following is a list of derivable traits:
- Comparison traits: Eq, PartialEq, Ord, PartialOrd
- Clone, to create T from &T via a copy
- Copy, to give a type 'copy semantics' instead of 'move semantics'
- Hash, to compute a hash from &T
- Default, to create an empty instance of a data type
- Debug, to format a value using the {:?} formatter
Clone and Copy go together (Copy requires Clone).
Eq adds reflexivity on top of PartialEq.
PartialEq for f64: NaN != NaN, so f64 is PartialEq but not Eq (see the snippet below).
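A quick illustration of why f64 only gets PartialEq (assumed example):
fn main() {
    let nan = f64::NAN;
    // equality is not reflexive for NaN, so f64 cannot implement Eq
    assert!(nan != nan);
    // consequently Eq cannot be derived for Point { x: f64, y: f64 } either
}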
Defining Your Own Traits
pub trait ComputeNorm {
    fn compute_norm(&self) -> f64 {
        0.0 // default implementation
    }
}

impl ComputeNorm for Point {
    fn compute_norm(&self) -> f64 {
        (self.x * self.x + self.y * self.y).sqrt()
    }
}
Associated Type
impl Add for Point {
    type Output = Self; // associated type
    fn add(self, other: Self) -> Self {
        Point::new(self.x + other.x, self.y + other.y)
    }
}
A bit puzzling at first; compare it with the generic version:
impl<T: Add<Output = T>> Add for Point<T> {
    type Output = Self;

    fn add(self, other: Self) -> Self::Output {
        Self {
            x: self.x + other.x,
            y: self.y + other.y,
        }
    }
}
https://doc.rust-lang.org/rust-by-example/generics/assoc_items/types.html
Generics
impl xxx for xxx<u32>
implements the trait for one specific, concrete type
impl<T> xxx for xxx<T>
T is a type parameter; note that trait bounds can be written in two ways (inline T: Trait or a where clause)
use std::fmt;pub struct MatchingPair<T> { first: T, second: T}
impl<T> MatchingPair<T> { pub fn new(first: T, second: T) -> Self { MatchingPair {first: first, second: second} }}
impl<T> fmt::Display for MatchingPair<T> where T: fmt::Display { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "({}, {})", self.first, self.second) }}
impl <T: Clone> Clone for MatchingPair<T> { fn clone(&self) -> Self { Self { first: self.first.clone(), second: self.second.clone() } }}
Trait Syntax in Functions and Composing Traits
Generic functions combined with trait bounds:
fn identity_fn<T>(x: T) -> T {x}

// An example of trait composition -- T must impl Display and PartialOrd
fn print_min<T: fmt::Display + PartialOrd>(x: T, y: T) {
    if x < y {
        println!("The minimum is {}", x);
    } else {
        println!("The minimum is {}", y)
    }
}
#06: Smart Pointers
Box<T>
std::unique_ptr
Rc<T>
multiple immutable references
Implement a persistent version of the LinkedList:
pub fn push_front(&self, value: u32) -> LinkedList {
    let new_node: Rc<Node> = Rc::new(Node::new(value, self.head.clone()));
    LinkedList {head: Some(new_node), size: self.size + 1}
}

// A tuple in Rust is like a struct -- you can access the zeroth element of
// a tuple named tup by writing "tup.0", you can access the element at index
// 2 by writing "tup.2", etc.
pub fn pop_front(&self) -> (Option<LinkedList>, Option<u32>) {
    match &self.head {
        Some(node) => {
            let val = node.value;
            let new_head: Option<Rc<Node>> = node.next.clone();
            let new_list = LinkedList {head: new_head, size: self.size - 1};
            (Some(new_list), Some(val))
        },
        None => (None, None)
    }
}
A few changes compared with the mutable version:
- no mut
- no take; use clone instead (cloning an Rc just bumps the reference count)
RefCell<T>
RefCell allows us to wrap a mutable piece of data behind an immutable reference. We can call borrow and borrow_mut on the RefCell to acquire references to the internal data – the borrowing rules are now enforced at runtime. Therefore, RefCell is safe, but it's slightly less efficient because it shifts what would have been a compile-time check to runtime. You commonly see Rc<RefCell<T>>, which lets us have a flexible handle to heap-allocated data – just like Rc except now we can mutate things!
Week 3 Exercises: Error handling, I/O, and traits
Part 1: Inspecting File Descriptors
Inspect the file descriptors a process has open. The workflow:
- Get process information via pgrep or ps
pub struct Process {
    pub pid: usize,
    pub ppid: usize,
    pub command: String,
}
- Enumerate /proc/{pid}/fd to get the list of file descriptors (done in list_fds)
- From the pid and each file descriptor, build the OpenFile information
pub struct OpenFile {
    pub name: String,
    pub cursor: usize,
    pub access_mode: AccessMode,
}
read_link - /proc/{pid}/fd/{fdnum}
read_to_string - /proc/{pid}/fdinfo/{fdnum}
- Print the results
ANSI escape codes are used for the output.
note
The test_openfile_from_fd test fails when run from the IDE
but passes when run from a terminal:
$ ./multi_pipe_test & cargo run multi_pipe_test
The target process has to stay alive for a while,
so inspecting short-lived programs is nearly hopeless.
Another implementation detail is the choice between Option and Result:
- Option - uniform error handling
- Result - different handling for different errors
- A Result can be converted to an Option via https://doc.rust-lang.org/core/result/enum.Result.html#method.ok (small example below)
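A small sketch of that conversion (assumed values):
fn main() {
    let good: Result<u32, String> = Ok(3);
    let bad: Result<u32, String> = Err(String::from("boom"));
    // Result::ok() discards the error and yields an Option
    assert_eq!(good.ok(), Some(3));
    assert_eq!(bad.ok(), None);
}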
Part 2: A Generic LinkedList
Making it generic is straightforward - just add some <T>.
The key part is implementing a few traits:
- Clone and PartialEq need to be implemented for both Node<T> and LinkedList<T>
impl<T> Clone for Node<T>
where
    T: Clone,
{
    fn clone(&self) -> Self {
        Node::new(self.value.clone(), self.next.clone()) // recursion
    }
}

impl<T> Clone for LinkedList<T>
where
    T: Clone,
{
    fn clone(&self) -> Self {
        let mut res: LinkedList<T> = LinkedList::new();
        res.head = self.head.clone();
        res.size = self.size;
        return res;
    }
}
Note the recursive call to clone here, and how the Clone property propagates:
T -> Node<T> -> Box<Node<T>> -> Option<Box<Node<T>>>
IntoIterator and Iterator are quite tricky.
From http://xion.io/post/code/rust-for-loop.html we know that
for x in v {
    // body
}
is equivalent to
let mut iter = IntoIterator::into_iter(v);
loop {
    match iter.next() {
        Some(x) => {
            // body
        },
        None => break,
    }
}
Note that both v and iter are iterables (they implement IntoIterator), while iter is an Iterator.
For LinkedList<T> itself:
an iterator that takes ownership of the list it is iterating over
It suffices to implement the Iterator trait and call pop_front in next.
Why does this work without implementing the IntoIterator trait? Because every Iterator gets a blanket IntoIterator implementation.
For &LinkedList<T>:
an iterator that only references elements in the list
This is more involved.
pub struct LinkedListIter<'a, T>
where
    T: Clone,
{
    current: &'a Option<Box<Node<T>>>,
}

impl<'a, T> IntoIterator for &'a LinkedList<T>
where
    T: Clone,
{
    type Item = T;
    type IntoIter = LinkedListIter<'a, T>;
    fn into_iter(self) -> Self::IntoIter {
        LinkedListIter {
            current: &self.head,
        }
    }
}

impl<T> Iterator for LinkedListIter<'_, T>
where
    T: Clone,
{
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        match self.current {
            Some(node) => {
                self.current = &node.next;
                Some(node.value.clone())
            }
            None => None,
        }
    }
}
A LinkedListIter struct is needed, and it uses lifetime syntax:
This syntax essentially says that the struct lives as long as the reference it contains, so that we don't have issues with dangling pointers.
next uses clone so that only an immutable reference into the list is needed.
For &mut LinkedList<T>,
it gets even more complicated...
#07: Pitfalls in Multiprocessing
fork
fork + exec
use a higher-level abstraction
https://doc.rust-lang.org/std/process/struct.Command.html
pipe
https://stackoverflow.com/questions/49218599/write-to-child-process-stdin-in-rust/
https://docs.rs/os_pipe/latest/os_pipe/
signal
Inside a signal handler you must be careful with shared variables
and only use async-signal-safe functions:
https://man7.org/linux/man-pages/man7/signal-safety.7.html
Neither printf nor malloc is async-signal-safe.
Also, a signal handler should not block, because it would monopolize the processor.
The self-pipe trick avoids doing real work in a signal handler:
- Create a pipe
- When you’re awaiting a signal, read from the pipe (this will block until something is written to it)
- In the signal handler, write a single byte to the pipe
There is also kernel support for this: https://man7.org/linux/man-pages/man2/signalfd.2.html
However, this reduces the program's concurrency:
it is either doing work, or blocked reading the pipe to wait for a signal.
There are two ways to solve this:
- Use threads
Threads are scheduled rather than exclusive;
concurrency issues still exist, but Rust can keep them under control.
https://github.com/Detegr/rust-ctrlc
- Creates a self-pipe
- Installs a signal handler that writes to the pipe when SIGINT is received
- Spawns a thread:
loop { read from pipe; call handler function; }
Non-async-signal-safe functions can be used in that thread.
- Use non-blocking I/O
TBD
Project 1: The DEET Debugger
info
Real-time collaborative development: https://visualstudio.microsoft.com/zh-hans/services/live-share/
note
A brief analysis of the program structure.
main.rs
The program entry point; its argument is the path of an executable built with debug information.
It suppresses SIGINT handling in the main process,
so SIGINT is delivered to the child process instead.
It constructs a Debugger object and calls its run method.
debugger.rs and debugger_command.rs
The run method is an infinite loop.
It reads input via get_next_command and parses it into a DebuggerCommand:
pub enum DebuggerCommand {
    Quit,
    Run(Vec<String>),
    Continue,
    Backtrace,
    Break(String),
}
The third-party crate rustyline is used here, providing command history.
Matching on the DebuggerCommand drives the Inferior object, i.e. the child process.
A few private helper functions operate on the child process, mainly:
- create_new_inferior - constructs an Inferior and calls cont_inferior
- cont_inferior - wraps the Inferior's cont method and uses handle_status to handle the child's status
- kill_inferior - wraps the Inferior's kill method and uses handle_status to handle the child's status
- handle_status - handles the child's status
pub enum Status {
    /// Indicates inferior stopped. Contains the signal that stopped the process, as well as the
    /// current instruction pointer that it is stopped at.
    Stopped(signal::Signal, usize),

    /// Indicates inferior exited normally. Contains the exit status code.
    Exited(i32),

    /// Indicates the inferior exited due to a signal. Contains the signal that killed the
    /// process.
    Signaled(signal::Signal),
}
In addition, the Debugger holds debug information and breakpoint information, which are passed as arguments to the Inferior.
inferior.rs
This involves a lot of low-level programming; let's go through it piece by piece.
First, note that the child's status is obtained by calling wait;
every method that changes the flow of execution must end by calling wait and returning the status to the caller.
pub fn wait(&self, options: Option<WaitPidFlag>) -> Result<Status, nix::Error> {
    Ok(match waitpid(self.pid(), options)? {
        WaitStatus::Exited(_pid, exit_code) => Status::Exited(exit_code),
        WaitStatus::Signaled(_pid, signal, _core_dumped) => Status::Signaled(signal),
        WaitStatus::Stopped(_pid, signal) => {
            let regs = ptrace::getregs(self.pid())?;
            Status::Stopped(signal, regs.rip as usize)
        }
        other => panic!("waitpid returned unexpected status: {:?}", other),
    })
}
new
Uses the Command module to create the child process,
and calls the ptrace system call in a pre_exec hook so the child is traced.
Note that after the child is loaded this way, it receives a SIGTRAP signal.
After that, breakpoints are installed by replacing the byte at each breakpoint address with 0xcc,
which triggers SIGTRAP when executed.
More implementation details on breakpoints below.
cont
Updates the breakpoint bookkeeping and checks whether the current rip points at the byte right after a breakpoint address.
Recall the cycle:
fetch - decode - execute - update rip
If it does, we are continuing right after a breakpoint was hit, so the instruction at the breakpoint address must be re-executed:
write the original byte back and single-step it with ptrace::step.
If the program has not finished, write 0xcc back at the breakpoint address so the breakpoint stays armed.
Finally call ptrace::cont.
kill
Just call ptrace::kill.
Don't forget to call wait afterwards, or the child becomes a zombie (<defunct>).
print_backtrace
Backtracing walks the stack frame information,
which requires compiling the target with:
-O0 -g -no-pie -fno-omit-frame-pointer
How to walk the frames is easy to see from a diagram: follow the saved frame-pointer (%rbp) chain.
dwarf_data.rs and gimli_wrapper.rs
Note that debug information is usually stored in the DWARF format.
dwarf_data.rs uses gimli_wrapper.rs to extract the following debug information:
- the mapping between line numbers and addresses
- the mapping between function names and addresses
- variable information
For some reason, in my tests there was no line-number-to-address mapping,
and library functions such as printf could not be resolved.
The third-party crate https://github.com/gimli-rs/gimli is used.
In this project the debug information serves two purposes:
- when the child is in Status::Stopped, or when backtracing, resolve rip to a line number and function name
- when setting a breakpoint, resolve a line number or function name to a breakpoint address
todo
- After a SIGSEGV, repeatedly issuing cont keeps hitting SIGSEGV again, which does not match gdb's behavior:
$ gdb samples/segfault
Reading symbols from samples/segfault...
(gdb) r
Starting program: /home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-1/deet/samples/segfault
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Calling func2
About to segfault... a=2

Program received signal SIGSEGV, Segmentation fault.
0x0000000000401162 in func2 (a=2) at samples/segfault.c:5
5   *(int*)0 = a;
(gdb) c
Continuing.

Program terminated with signal SIGSEGV, Segmentation fault.
The program no longer exists.
(gdb)
- Update the use of the gimli crate to extract more debug information
- Optional extensions
#09: Intro to Multithreading
Use the move keyword to copy the index value into each closure:
use std::thread;
const NAMES: [&str; 7] = ["Frank", "Jon", "Lauren", "Marco", "Julie", "Patty", "Tagalong Introvert Jerry"];
fn main() {
    let mut threads = Vec::new();
    for i in 0..6 {
        threads.push(thread::spawn(move || {
            println!("Hello from printer {}!", NAMES[i]);
        }));
    }
    // wait for all the threads to finish
    for handle in threads {
        handle.join().expect("Panic occurred in thread!");
    }
}
Otherwise the compiler reports:
closure may outlive the current function, but it borrows `i`, which is owned by the current function
i.e. the closure might live longer than the index variable it borrows.
#10: Shared Memory
Consider a shared variable that every thread needs to mutate.
Attempt 1
Just use the move keyword:
extern crate rand;
use std::{thread, time};use rand::Rng;
fn handleCall() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));}
fn takeBreak() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));}
fn shouldTakeBreak() -> bool { rand::random()}
fn ticketAgent(id: usize, remainingTickets: &mut usize) { while *remainingTickets > 0 { handleCall(); *remainingTickets -= 1; println!("Agent #{} sold a ticket! ({} more to be sold)", id, remainingTickets); if shouldTakeBreak() { takeBreak(); } } println!("Agent #{} notices all tickets are sold, and goes home!", id);}
fn main() { let mut remainingTickets = 250;
let mut threads = Vec::new(); for i in 0..10 { threads.push(thread::spawn(move || { ticketAgent(i, &mut remainingTickets) })); } // wait for all the threads to finish for handle in threads { handle.join().expect("Panic occurred in thread!"); } println!("End of business day!");}
move actually copies remainingTickets into each thread, so this compiles, but the result is wrong: each thread decrements its own copy.
Attempt 2
Use the Rc<RefCell<T>> structure:
extern crate rand;use std::cell::RefCell;use std::rc::Rc;
use std::{thread, time};use rand::Rng;
fn handleCall() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));}
fn takeBreak() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));}
fn shouldTakeBreak() -> bool { rand::random()}
fn ticketAgent(id: usize, remainingTickets: &mut usize) { while *remainingTickets > 0 { handleCall(); *remainingTickets -= 1; println!("Agent #{} sold a ticket! ({} more to be sold)", id, remainingTickets); if shouldTakeBreak() { takeBreak(); } } println!("Agent #{} notices all tickets are sold, and goes home!", id);}
fn main() { let remainingTickets: Rc<RefCell<usize>> = Rc::new(RefCell::new(250));
let mut threads = Vec::new(); for i in 0..10 { let remainingTicketsRef = remainingTickets.clone(); threads.push(thread::spawn(|| { ticketAgent(i, &mut remainingTicketsRef.borrow_mut()) })); } // wait for all the threads to finish for handle in threads { handle.join().expect("Panic occurred in thread!"); } println!("End of business day!");}
clone on the Rc increments the reference count;
borrow_mut on the RefCell yields a mutable reference.
This fails to compile:
error[E0277]: `Rc<RefCell<usize>>` cannot be shared between threads safely
  --> src/main.rs:41:22
   |
41 |         threads.push(thread::spawn(|| {
   |                      ^^^^^^^^^^^^^ `Rc<RefCell<usize>>` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `Rc<RefCell<usize>>`
   = note: required because of the requirements on the impl of `Send` for `&Rc<RefCell<usize>>`
   = note: required because it appears within the type `[closure@src/main.rs:41:36: 43:10]`
Attempt 3
Use the Arc<Mutex<T>> structure:
extern crate rand;use std::sync::{Arc, Mutex};
use std::{thread, time};use rand::Rng;
fn handleCall() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 10)));}
fn takeBreak() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 10)));}
fn shouldTakeBreak() -> bool { rand::random()}
fn ticketAgent(id: usize, remainingTickets: Arc<Mutex<usize>>) { loop { let mut remainingTicketsRef = remainingTickets.lock().unwrap(); if (*remainingTicketsRef == 0) { break; } handleCall(); *remainingTicketsRef -= 1; println!("Agent #{} sold a ticket! ({} more to be sold)", id, *remainingTicketsRef); if shouldTakeBreak() { takeBreak(); } } println!("Agent #{} notices all tickets are sold, and goes home!", id);}
fn main() { let remainingTickets: Arc<Mutex<usize>> = Arc::new(Mutex::new(250));
let mut threads = Vec::new(); for i in 0..10 { let remainingTicketsRef = remainingTickets.clone(); threads.push(thread::spawn(move || { ticketAgent(i, remainingTicketsRef); })); } // wait for all the threads to finish for handle in threads { handle.join().expect("Panic occurred in thread!"); } println!("End of business day!");}
Arc can be thought of as the thread-safe version of Rc, and Mutex as the thread-safe version of RefCell.
- The former is thread-safe so that the reference count is updated atomically, e.g. with atomic instructions.
- The latter is thread-safe so that the data is modified under mutual exclusion: calling lock grants exclusive access, and the lock is released automatically when the guard goes out of scope (see the sketch after this list).
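A minimal sketch of that automatic unlock, using a plain counter (assumed example, not from the lecture code):
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0u32));
    let mut handles = Vec::new();
    for _ in 0..4 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let mut guard = counter.lock().unwrap(); // exclusive access
            *guard += 1;
            // guard is dropped at the end of this scope -> mutex unlocked automatically
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("{}", *counter.lock().unwrap()); // 4
}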
Marker traits
you don’t implement functions for them, they serve a symbolic purpose
- Send: Transfer ownership (move) between threads
- Sync: Allow this thing to be referenced from multiple threads
for more
#11: Synchronization
Introduces condition variables,
bound together with a mutex: Arc<(Mutex<T>, Condvar)>
extern crate rand;use std::sync::{Arc, Mutex, Condvar};
use std::{thread, time};use std::collections::VecDeque;use rand::Rng;
fn rand_sleep() { let mut rng = rand::thread_rng(); thread::sleep(time::Duration::from_millis(rng.gen_range(0, 30)));}
#[derive(Clone)]pub struct SemaPlusPlus<T> { queue_and_cv: Arc<(Mutex<VecDeque<T>>, Condvar)>,}
impl<T> SemaPlusPlus<T> { pub fn new() -> Self { SemaPlusPlus {queue_and_cv: Arc::new((Mutex::new(VecDeque::new()), Condvar::new()))} }
// Enqueues -- Like semaphore.signal() pub fn send(&self, message: T) { let (queue_lock, cv) = &*self.queue_and_cv; let mut queue = queue_lock.lock().unwrap(); let queue_was_empty = queue.is_empty(); queue.push_back(message); if !queue_was_empty { cv.notify_all(); } }
// Dequeues -- Like semaphore.wait() pub fn recv(&self) -> T { let (queue_lock, cv) = &*self.queue_and_cv; // Wait until there is something to dequeue let mut queue = cv.wait_while(queue_lock.lock().unwrap(), |queue| { queue.is_empty() }).unwrap(); // Should return Some(...) because we waited queue.pop_front().unwrap() }}
const NUM_THREADS: usize = 12;fn main() { // Inspired by this example https://doc.rust-lang.org/stable/rust-by-example/std_misc/channels.html let sem: SemaPlusPlus<String> = SemaPlusPlus::new(); let mut handles = Vec::new(); for i in 0..NUM_THREADS { let sem_clone = sem.clone(); let handle = thread::spawn(move || { rand_sleep(); sem_clone.send(format!("thread {} just finished!", i)) }); handles.push(handle); } for _ in 0..NUM_THREADS { println!("{}", sem.recv()) }
for handle in handles { handle.join().unwrap(); }}
Week 5 Exercises: Farm meets multithreading
I was completely led along by the compiler...
fn pop_number(queue: &Mutex<VecDeque<u32>>) -> Option<u32> {
    let mut queue = queue.lock().unwrap();
    queue.pop_front()
}
fn main() { let num_threads = num_cpus::get(); println!("Farm starting on {} CPUs", num_threads); let start = Instant::now();
// TODO: call get_input_numbers() and store a queue of numbers to factor let queue = Arc::new(Mutex::new(get_input_numbers())); let mut threads = Vec::new();
// TODO: spawn `num_threads` threads, each of which pops numbers off the queue and calls // factor_number() until the queue is empty for _ in 0..num_threads { let queue = queue.clone(); threads.push(thread::spawn(move || loop { let num = pop_number(queue.borrow()); if num.is_some() { factor_number(num.unwrap()); } else { break; } })) }
// TODO: join all the threads you created for handle in threads { handle.join().expect("Panic occurred in thread!"); }
println!("Total execution time: {:?}", start.elapsed());}
Approach
- First, queue has to be Arc<Mutex<VecDeque<u32>>>.
- Then clone it inside the loop.
  - What clone produces is not a reference, and Arc<Mutex<VecDeque<u32>>> does not implement the Copy trait.
  - pop_number is called inside spawn, so if pop_number's parameter type contained Arc, the Arc would be moved into the call.
    - Note that the move keyword is unavoidable here.
  - So pop_number's parameter type should not contain Arc.
- Next, call borrow on the cloned queue to get a &Mutex<VecDeque<u32>>.
- Finally, lock inside pop_number to get at the VecDeque<u32>.
  - Once a u32 is obtained, hand it to factor_number for factoring.
Testing:
$ cargo run 12345678 12346789 34567890 45678902 123456 234567 345678 456789 678912 20125001
Note that the numbers are passed on the command line, which is much simpler than reading them from stdin.
#12: Channels
How to avoid data races without using shared memory?
Effective Go:
Do not communicate by sharing memory; instead, share memory by communicating.
Without shared memory, passing data between threads raises the question of copying costs;
the compromise is to copy only a pointer (a shallow copy), and smart pointers keep this safe.
The key question is how to implement channels efficiently:
- a semaphore-like implementation
  - see SemaPlusPlus above
- golang uses futexes
- consider lock-free programming techniques
  - Rust's third-party crate crossbeam
Reimplement the Farm from the Week 5 exercises with crossbeam,
extended to read the numbers from standard input:
fn main() { let (sender, receiver) = crossbeam::channel::unbounded();
let mut threads = Vec::new(); for _ in 0..num_cpus::get() { let receiver = receiver.clone(); threads.push(thread::spawn(move || { while let Ok(next_num) = receiver.recv() { factor_number(next_num); } })); }
let stdin = std::io::stdin();
for line in stdin.lock().lines() { let num = line.unwrap().parse::<u32>().unwrap(); sender .send(num) .expect("Tried writing to channel, but there are no receivers!"); }
drop(sender);
for thread in threads { thread.join().expect("Panic occurred in thread"); }}
Note that drop(sender) closes the sending side of the channel,
so that receiver.recv() returns Err once the queue is drained -
much like reading from a pipe returns EOF.
Week 6 Exercises: Sharing Data by Communicating
The reference implementation first:
fn parallel_map<T, U, F>(input_vec: Vec<T>, num_threads: usize, f: F) -> Vec<U>where F: FnOnce(T) -> U + Send + Copy + 'static, T: Send + 'static, U: Send + 'static + Default,{ let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len()); output_vec.resize_with(input_vec.len(), Default::default);
// TODO: implement parallel map! let (sender, receiver) = crossbeam::channel::unbounded(); let (sender_re, receiver_re) = crossbeam::channel::unbounded();
let mut threads = Vec::with_capacity(num_threads); for _ in 0..num_threads { let receiver = receiver.clone(); let sender_re = sender_re.clone(); threads.push(thread::spawn(move || { while let Ok(next_elem) = receiver.recv() { let (index, elem) = next_elem; sender_re .send((index, f(elem))) .expect("Tried writing to channel, but there are no receivers!"); } })); }
let mut index = 0; for elem in input_vec { sender .send((index, elem)) .expect("Tried writing to channel, but there are no receivers!"); index = index + 1; }
drop(sender);
for thread in threads { thread.join().expect("Panic occurred in thread"); }
drop(sender_re);
while let Ok(next_elem) = receiver_re.recv() { let (index, elem) = next_elem; output_vec[index] = elem; }
output_vec}
This pile of trait bounds is headache-inducing:
- FnOnce - can only be called once
- Default - the type has a default value
- 'static - see https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html
Approach
- We only use channels, and the worker threads cannot touch output_vec
  - because of the move keyword, and because Vec<U> does not implement the Copy trait
  - so output_vec can only be manipulated in the main thread.
- Therefore use two channels: sender/receiver carry the inputs and sender_re/receiver_re carry the results,
  with sender and receiver_re kept in the main thread.
- Also, since thread scheduling is nondeterministic,
  and random access into a Vec is cheap, each message carries a tuple of the element and its index.
- The initialization of the Vec needs care:
let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len());
output_vec.resize_with(input_vec.len(), Default::default);
Vec::with_capacity only guarantees the capacity; len is still 0.
So the vector has to be resized with default values,
and this is where the Default trait comes into play.
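A tiny illustration of capacity vs. len (assumed example):
fn main() {
    let mut v: Vec<u32> = Vec::with_capacity(4);
    assert_eq!(v.len(), 0);      // no elements yet, indexing would panic
    assert!(v.capacity() >= 4);  // but space is reserved
    v.resize_with(4, Default::default);
    assert_eq!(v.len(), 4);      // now v[0..4] can be written to
    println!("{:?}", v);         // [0, 0, 0, 0]
}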
To check the system's maximum numbers of threads and processes:
$ cat /proc/sys/kernel/threads-max
$ cat /proc/sys/kernel/pid_max
$ man 5 proc
TODO
- Optional Extensions
  - I didn't quite understand what is being asked.
#13: Scalability and Availability
Scalability and Availability
Load balancers
The persistent-storage part is delegated to a distributed database.
A load balancer dispatches client requests to different servers.
Load balance your load balancers
Use multiple load balancers:
- Round-robin DNS
$ dig +noall +answer bilibili.com
bilibili.com. 1 IN A 119.3.238.64
bilibili.com. 1 IN A 8.134.50.24
bilibili.com. 1 IN A 39.105.216.211
bilibili.com. 1 IN A 119.3.70.188
bilibili.com. 1 IN A 139.159.241.37
bilibili.com. 1 IN A 47.103.24.173
bilibili.com. 1 IN A 120.92.78.97
- Geographic routing with DNS
#15: Futures I
Non-blocking I/O
Epoll Syscall
Future
- The result of a computation that may or may not have completed.
- In Rust, futures are structs that implement the Future trait.
Executors
Combining futures
- map
- join
Async/Await
- syntactic sugar
- transform your chain of async computation (i.e. futures) into an efficient state machine (see the sketch below)
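A minimal sketch of the sugar, using the futures crate's block_on executor (illustrative, not from the lecture):
use futures::executor::block_on;

async fn fetch_number() -> u32 {
    42
}

// an `async fn` returns a Future; `.await` suspends until it completes
async fn double_number() -> u32 {
    let n = fetch_number().await;
    n * 2
}

fn main() {
    // an executor drives the state machine the compiler generates from the async fns
    let result = block_on(double_number());
    println!("{}", result); // 84
}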
#16: Futures II
- ThreadPool Link Explorer
- Async Link Explorer
- the tokio version used by the starter code is quite old, so I made some changes
- it actually seems slower than the ThreadPool version
Project 2: Balancebeam
A load-balancing proxy server.
Updated the usage of the clap crate:
https://github.com/clap-rs/clap/blob/v3.1.18/examples/derive_ref/README.md
Testing:
$ cargo run -- --upstream 171.67.215.200:80 Compiling balancebeam v0.1.0 (/home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-2/balancebeam) Finished dev [unoptimized + debuginfo] target(s) in 1.85s Running `target/debug/balancebeam --upstream '171.67.215.200:80'` INFO balancebeam > Listening for requests on 0.0.0.0:1100 INFO balancebeam > Connection received from 127.0.0.1 INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /class/cs110l/ HTTP/1.1 DEBUG balancebeam > Forwarded request to server INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK DEBUG balancebeam > Forwarded response to client INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /favicon.ico HTTP/1.1 DEBUG balancebeam > Forwarded request to server INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK DEBUG balancebeam > Forwarded response to client DEBUG balancebeam > Client finished sending requests. Shutting down connection
http://localhost:1100/class/cs110l/
Milestone 0: Read the starter code
RTFSC
The assignment handout explains the starter code in great detail.
main.rs
- listening
- handle_connection (from the client), connect_to_upstream (to the upstream server)
- loop
- read a request from the client
- relay the request to the upstream server
- read the response from the upstream server
- forward the response to the client
request.rs and response.rs
read_from_stream
read_headers
read_body
write_to_stream
Some third-party crates:
- clap parses command-line arguments
- log provides logging
Local testing:
$ cargo test --test 01_single_upstream_tests
$ cargo test --test 02_multiple_upstream_tests test_load_distribution
Milestone 1: Add multithreading
Use the third-party threadpool crate directly.
ProxyState needs to derive Clone.
let pool = ThreadPool::new(8);

for stream in listener.incoming() {
    if let Ok(stream) = stream {
        // Handle the connection!
        let state_cloned = state.clone();
        pool.execute(move || {
            handle_connection(stream, &state_cloned);
        });
    }
}
Milestone 2: Use asynchronous I/O
To keep the test code from crashing, the old version of tokio is used.
The handout gives a detailed migration plan:
- std::net::{TcpListener, TcpStream} to tokio::net::{TcpListener, TcpStream}
- std::io::{Read, Write} to tokio::io::{AsyncReadExt, AsyncWriteExt}
- The API names stay the same; they just become asynchronous versions.
- Add .await after each async function call.
  - The call can be thought of as returning a Result, so the ? syntax still works - see "how Rust's async/await syntax works".
- Any function that uses await must itself be marked async.
  - Callers of that function then also need .await - a cascading effect.
- Instead of a ThreadPool, use task::spawn:
while let Some(stream) = listener.incoming().next().await {
    match stream {
        Ok(stream) => {
            let state_cloned = state.clone();
            task::spawn(async move {
                handle_connection(stream, &state_cloned).await;
            });
        }
        Err(err) => {
            log::error!("connection failed: {}", err);
        }
    }
}
Is it necessary to add .await after task::spawn?
The manual does not do so: spawn runs the task in the background, and awaiting the JoinHandle would wait for it to finish.
Compare with the Async Link Explorer.
Milestone 3: Failover with passive health checks
At this point, your load balancer should be pretty scalable.
Let’s work on adding some more availability to our infrastructure.
Note that in the starter implementation of connect_to_upstream,
if connecting to the randomly chosen upstream fails, the error is returned immediately.
To improve availability, try other upstreams when a connection fails.
To this end, add a field to ProxyState:
upstream_states: Vec<bool>
and modify connect_to_upstream:
async fn connect_to_upstream(state: Arc<Mutex<ProxyState>>) -> Result<TcpStream, std::io::Error> { // TODO: implement failover (milestone 3) let mut state_ref = state.lock().await; if !state_ref.upstream_states.contains(&true) { log::error!("Failed to connect: all upstreams are dead"); return Err(std::io::Error::new(ErrorKind::Other, "oh no!")); }
let mut rng = rand::rngs::StdRng::from_entropy(); loop { let upstream_idx = rng.gen_range(0, state_ref.upstream_addresses.len()); if !state_ref.upstream_states[upstream_idx] { continue; } let upstream_ip = &state_ref.upstream_addresses[upstream_idx];
match TcpStream::connect(upstream_ip).await { Ok(stream) => { return Ok(stream); } Err(err) => { log::warn!("Failed to connect to upstream {}: {}", upstream_ip, err); state_ref.upstream_states[upstream_idx] = false; continue; } } }}
The parameter type is changed to Arc<Mutex<ProxyState>>,
using tokio::sync::Mutex to keep access exclusive.
tokio::sync::RwLock would also work,
but it is more awkward to write... (rough sketch below)
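A rough sketch of what the RwLock variant could look like (hypothetical, assuming a trimmed-down ProxyState with the same upstream_states field, and tokio with the macros feature):
use std::sync::Arc;
use tokio::sync::RwLock;

// hypothetical state type, trimmed down for illustration
struct ProxyState {
    upstream_states: Vec<bool>,
}

async fn any_upstream_alive(state: &Arc<RwLock<ProxyState>>) -> bool {
    // many readers may hold the lock concurrently
    state.read().await.upstream_states.contains(&true)
}

async fn mark_dead(state: &Arc<RwLock<ProxyState>>, idx: usize) {
    // a writer gets exclusive access
    state.write().await.upstream_states[idx] = false;
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RwLock::new(ProxyState { upstream_states: vec![true, true] }));
    mark_dead(&state, 0).await;
    println!("{}", any_upstream_alive(&state).await); // true
}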
Milestone 4: Failover with active health checks
Passive health checks have some shortcomings:
- An upstream server may still accept connections yet be unable to serve requests;
  passive health checks cannot detect this.
- An upstream server may go down temporarily and come back a little later;
  passive health checks never mark the recovered upstream as healthy again.
So implement an async function active_health_check
and spawn it as a separate async task in tokio::main.
active_health_check is implemented as follows:
async fn active_health_check(state: Arc<Mutex<ProxyState>>) { let interval; let path; let upstream_addresses;
{ let state_ref = state.lock().await; interval = state_ref.active_health_check_interval; path = state_ref.active_health_check_path.clone(); upstream_addresses = state_ref.upstream_addresses.clone(); } // release lock
loop { delay_for(Duration::from_secs(interval as u64)).await;
for upstream_idx in 0..upstream_addresses.len() { let mut upstream_state = false; let upstream_ip = &upstream_addresses[upstream_idx];
match TcpStream::connect(upstream_ip).await { Ok(mut stream) => { let request = http::Request::builder() .method(http::Method::GET) .uri(path.clone()) .header("Host", upstream_ip) .body(Vec::new()) .unwrap(); if let Err(error) = request::write_to_stream(&request, &mut stream).await { log::error!( "Failed to send request (active health check) to upstream {}: {:?}", upstream_ip, error ); // do nothing } match response::read_from_stream(&mut stream, request.method()).await { Ok(response) => { upstream_state = response.status() == StatusCode::OK; } Err(error) => { log::error!( "Error reading response (active health check) from server: {:?}", error ); // do nothing } } } Err(error) => { log::error!( "Error establishing connection with server {}: {:?}", upstream_ip, error ); // do nothing } }
log::info!( "Active health check result: upstream {} -> {}", upstream_ip, upstream_state );
{ let mut state_ref = state.lock().await; state_ref.upstream_states[upstream_idx] = upstream_state; } } }}
Iterate over the upstream servers; for each one, try to establish a connection, send a request, and check whether the response is OK.
Be careful to release the lock promptly.
Milestone 5: Rate limiting
Consider what may happen:
- distributed denial-of-service attacks
- credential-stuffing attacks
We need to limit how many requests a client may send.
Use the fixed window algorithm.
Note that the same client may trigger connect_to_upstream multiple times,
so the counter and timer should not be internal state of connect_to_upstream
but part of ProxyState.
To this end, add a field to ProxyState:
client_requests_map: HashMap<String, (Instant, usize)>
and then add the related code in connect_to_upstream:
// Rate limiting{ let state_cloned = state.clone(); let mut state_ref = state_cloned.lock().await; let limits = state_ref.max_requests_per_minute; let map_cloned = state_ref.client_requests_map.clone(); if limits != 0 { match map_cloned.get(&client_ip) { None => { state_ref.client_requests_map.insert(client_ip.clone(), (Instant::now(), 1)); } Some((when, counter)) => { let mut count = counter.clone(); if when.elapsed().as_secs() >= 60 { count = 0; }
if count >= limits { let response = response::make_http_error(http::StatusCode::TOO_MANY_REQUESTS); send_response(&mut client_conn, &response).await; log::debug!("Forwarded response `TOO_MANY_REQUESTS` to client"); continue; } else { count += 1; }
state_ref.client_requests_map.insert(client_ip.clone(), (*when, count)); } } }}
Note that the tokio::time::Instant::elapsed
method returns the time since the instant was created, which is exactly the timer we need.
You still have to be careful with Rust's memory model and the various reference types that HashMap hands out.
Extensions
- Connection pooling
The proxy server maintains connections to the upstream servers.
- Better rate limiting
https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm
- Other load balancing algorithms
The current algorithm picks an upstream at random.
https://www.haproxy.com/blog/power-of-two-load-balancing/
- Caching
- Web Application Firewall