TOC
Open TOC
Info
https://www.bilibili.com/video/BV1Ra411A7kN
https://reberhardt.com/cs110l/spring-2020/
https://zhuanlan.zhihu.com/p/356912397
why rust
Rust
lang
https://www.rust-lang.org/zh-CN/
IDE
在 Arch Linux 下配置 Rust 开发环境
IDEA - rust - without debug
CLion - rust - with debug
cli
$ cargo new demo
$ cd demo
$ cargo run
#02: Memory Safety
Ownership and mutability
// Takes ownership of `param` (it is passed by value, not by reference) and prints it.
fn om_nom_nom ( param : String ) {
println! ( "{}" , param );
}
// Intentionally does NOT compile: demonstrates a use-after-move error.
fn main () {
let s = String :: from ( "hello" );
om_nom_nom ( s ); // `s` is moved into the function here
om_nom_nom ( s ); // error[E0382]: use of moved value: `s`
}
参数类型未声明为引用,且 String
未实现 Copy
trait
故第一次调用 om_nom_nom
时,s
已被移动
// Same shape as the `String` version, but the parameter is a `u32`.
fn om_nom_nom ( param : u32 ) {
println! ( "{}" , param );
}
// Compiles: `u32` is `Copy`, so each call receives its own copy of `x`.
fn main () {
let x = 1 ;
om_nom_nom ( x ); // `x` is copied, not moved
om_nom_nom ( x ); // still usable here
}
u32
实现了 Copy
trait
References
// Intentionally does NOT compile: a shared borrow is taken while a mutable borrow is still live.
fn main () {
let mut s = String :: from ( "hello" );
let s1 = & mut s ; // mutable borrow starts here
let s2 = & s ; // error[E0502]: cannot borrow `s` as immutable because it is also borrowed as mutable
println! ( "{} {} {}" , s , s1 , s2 ); // `s1` is used here, so the mutable borrow is still live above
}
当 mutable reference 没有返回时,不允许 multiple references
// Intentionally does NOT compile: `s` is read while the mutable borrow `s1` is still in use.
fn main () {
let mut s = String :: from ( "hello" );
let s1 = & mut s ; // mutable borrow starts here
println! ( "{} {}" , s , s1 ); // error[E0502]: `s` is borrowed immutably while `s1` is still live
}
s1
对 s
的 mutable reference 还没有返回
// Compiles: the mutable borrow held by `s1` ends after its last use, before `s` is read again.
fn main () {
let mut s = String :: from ( "hello" );
let s1 = & mut s ;
println! ( "{}" , s1 ); // last use of `s1`
println! ( "{}" , s ) // `s` is no longer mutably borrowed at this point
}
正确
Week 1 Exercises: Hello world
basic
string
&str
- immutable
String
- mutable
for iter
iter
iter_mut
loop
https://stackoverflow.com/q/28892351
fn
/// Returns the `n`-th Fibonacci number using naive double recursion.
///
/// Any `n` that is 1 or smaller (including negative values) is returned unchanged.
fn fib(n: i32) -> i32 {
    // The whole body is a single `match` expression, so no `return` is needed.
    match n {
        base if base <= 1 => base,
        _ => fib(n - 1) + fib(n - 2),
    }
}
in Rust, everything is an expression
no semicolon
no need to return, evaluating to the value of the last expression
expressions separated by semicolons
specify the return type for functions that return values
part-3-hangman
刚用 ruby 写过 hangman
由于对 string
的处理需要了解更多 rust 的语法
使用 Vec<char>
作为主力数据结构
let secret_word_chars : Vec < char > = secret_word . chars (). collect ();
无间隔打印 Vec<char>
// Join the characters into one `String` with no separator between them.
// `String` implements `FromIterator<&char>`, so no per-character allocation is needed.
secret_word_chars.iter().collect::<String>()
基本的 io
io :: stdout ()
. flush ()
. expect ( "Error flushing stdout." );
let mut guess = String :: new ();
io :: stdin ()
. read_line (& mut guess )
. expect ( "Error reading line." );
for loop 还可以这样写
for i in 0 .. secret_word_chars . len () {
...
}
再次理解 References
contains
- borrow - has &
push
- move - no &
#03: Error Handling
Handling nulls
Option
Handling errors
Result
code
extern crate rand;
use rand :: Rng ;
// Draws a random `u32` from the thread-local generator of the `rand` crate.
// NOTE(review): with the two-argument `gen_range(low, high)` of rand 0.6 the upper
// bound is presumably exclusive, i.e. the result is in 0..42 — confirm against the rand docs.
fn get_random_num () -> u32 {
rand :: thread_rng (). gen_range ( 0 , 42 )
}
/// Returns `Some` message when the random draw is greater than 10, otherwise `None`.
fn feeling_lucky() -> Option<String> {
    let draw = get_random_num();
    if draw <= 10 {
        return None;
    }
    Some(String::from("I'm feeling lucky!"))
}
fn poke_toddler () -> Result <&' static str , &' static str > {
if get_random_num () > 10 {
Ok ( "Hahahaha!" )
} else {
Err ( "Waaaaahhh!" )
}
}
// Walks through the common ways of consuming `Option` and `Result` values.
// Every call below draws a fresh random number, so each step may take a different branch.
fn main () {
// 1. Test for absence explicitly.
if feeling_lucky (). is_none () {
println! ( "Not feeling lucky :(" )
}
// 2. Substitute a default value when the Option is `None`.
println! (
"{}" ,
feeling_lucky (). unwrap_or ( String :: from ( "Not lucky :(" ))
);
// 3. Pattern-match on both Option variants.
match feeling_lucky () {
Some ( message ) => {
println! ( "Got message: {}" , message )
}
None => {
println! ( "No message returned :-/" )
}
}
// 4. Pattern-match on both Result variants.
match poke_toddler () {
Ok ( message ) => println! ( "Toddler said: {}" , message ),
Err ( cry ) => println! ( "Toddler cried: {}" , cry ),
}
// Panic if the baby cries:
println! ( "{}" , poke_toddler (). unwrap ());
// Same thing, but print a more descriptive panic message:
println! ( "{}" , poke_toddler (). expect ( "Toddler cried :(" ));
// Unconditional panic: the program always ends abnormally if it gets this far.
panic! ( "Sad times!" );
}
需要在 Cargo.toml
中添加
[dependencies]
rand = "0.6.0"
#04: Object Oriented Rust
实现一个 LinkedList
后进先出
code
extern crate core;
use core ::fmt;
/// A single heap-allocated element of the list.
struct Node {
    value: u32,
    next: Option<Box<Node>>, // no null pointers in Rust!
}

/// A singly linked list of `u32` values used as a stack (last in, first out).
pub struct LinkedList {
    head: Option<Box<Node>>,
    size: usize,
}

impl Node {
    /// Builds a node holding `value` that points at `next`.
    fn new(value: u32, next: Option<Box<Node>>) -> Node {
        Node { value, next }
    }
}

impl LinkedList {
    /// Creates an empty list.
    pub fn new() -> LinkedList {
        LinkedList { head: None, size: 0 }
    }

    /// Returns the number of stored elements.
    pub fn get_size(&self) -> usize {
        self.size
    }

    /// Returns `true` when the list holds no elements.
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }

    /// Pushes `value` onto the front of the list.
    pub fn push(&mut self, value: u32) {
        // `take()` moves the old head out of `&mut self` and leaves `None` behind,
        // which is what allows the old head to become the new node's `next`.
        let new_node = Box::new(Node::new(value, self.head.take()));
        self.head = Some(new_node);
        self.size += 1;
    }

    /// Removes and returns the front element, or `None` when the list is empty.
    pub fn pop(&mut self) -> Option<u32> {
        // `?` returns `None` early when there is no head node.
        let node = self.head.take()?;
        self.head = node.next;
        self.size -= 1;
        Some(node.value)
    }

    /// Prints the list to stdout in the same format as the `Display` implementation.
    pub fn display(&self) {
        // Delegate to `Display` so the formatting logic lives in one place only.
        println!("{}", self);
    }
}

impl fmt::Display for LinkedList {
    /// Writes `list ->` followed by every value from front to back, space separated.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "list ->")?;
        // Stream each value straight into the formatter instead of rebuilding
        // an intermediate `String` for every node.
        let mut current = self.head.as_ref();
        while let Some(node) = current {
            write!(f, " {}", node.value)?;
            current = node.next.as_ref();
        }
        Ok(())
    }
}

impl Drop for LinkedList {
    /// Unlinks the nodes one at a time in a loop.
    fn drop(&mut self) {
        let mut current = self.head.take();
        // Each node is detached from its successor before it is dropped at the end
        // of the iteration, so no node is ever dropped while still owning a chain.
        while let Some(mut node) = current {
            current = node.next.take();
        }
    }
}
一些细节
对指针的表示 Option<Box<Node>>
Box
类似于 std::unique_ptr
使用 Option
来处理 null
类似 go-lang,访问 pointer/reference 的结构成员直接使用 .
即可
push
成员函数中使用了 self.head.take()
而非 self.head
由于是 mutable reference,不能直接 move 其成员
使用 take
试图 move 其 Some 的部分,留下 None
的部分
pop 成员函数在 take
后添加了 ?
进行 unwrap
若为 None
则返回 None
propagate errors
Display
trait 使其支持直接 println!("{}", list);
Drop
trait 在 loop 中使用了模式匹配
Week 2 Exercises: Ownership and structs
Part 1: Ownership short-answer exercises
// Intentionally does NOT compile: `s` is reassigned while it is still borrowed.
fn main () {
let mut s = String :: from ( "hello" );
let ref1 = & s ; // shared borrow of `s` starts here
let ref2 = & ref1 ;
let ref3 = & ref2 ;
s = String :: from ( "goodbye" ); // error[E0506]: cannot assign to `s` because it is borrowed
println! ( "{}" , ref3 . to_uppercase ()); // the borrow chain is still used here
}
在赋值后 immutable reference 仍在使用
// Intentionally does NOT compile: tries to return a reference to a local value.
// error[E0106]: missing lifetime specifier — there is no input to borrow the result from.
fn drip_drop () -> & String {
let s = String :: from ( "hello world!" );
return & s ; // `s` is dropped when the function returns, so this reference would dangle
}
函数返回,生命期结束
可以改为
/// String data stored in the program itself, so it is valid for the whole run.
static S: &'static str = "hello world!";

/// Returns the statically allocated greeting.
fn drip_drop() -> &'static str {
    // `S` already is a `&'static str`; it is `Copy`, so it can be handed out directly.
    S
}
改变其类型为 &str
同时添加 'static
修饰
// Intentionally does NOT compile: indexing cannot move a non-`Copy` element out of a `Vec`.
fn main () {
let s1 = String :: from ( "hello" );
let mut v = Vec :: new ();
v . push ( s1 ); // `s1` is moved into the vector
let s2 : String = v [ 0 ]; // error[E0507]: cannot move out of index of `Vec<String>`
println! ( "{}" , s2 );
}
String
未实现 Copy
trait
不能从 Vec 中 move 其元素
Part 2: rdiff
算法部分,参考
https://en.wikipedia.org/wiki/Longest_common_subsequence_problem#Print_the_diff
主要谈一谈错误处理
?
operator - recoverable error
let file = File :: open ( filename )?;
等价于
let file = match File :: open ( filename ) {
Ok ( file ) => file ,
Err ( err ) => return Err ( err ),
};
一旦出现 Err
,则将 Err
传播给调用者
unwrap
and expect
- unrecoverable error
一旦出现 Err
,则直接 panic
Optional: rwc
略……
Optional challenge: Conway’s Game of Life
thread 'main' panicked at 'could not build default app window: NoAvailableAdapter'
似乎和显卡驱动相关
暂时放弃……
#05: Traits and Generics
trait ≈ interface + default method
Syntax
impl xxx for xxx
Deriving Traits
// A point with two `f64` coordinates.
// The derives give it `{:?}` formatting, `==` comparison and copy semantics.
#[derive( Debug , PartialEq , Clone , Copy )]
struct Point {
x : f64 ,
y : f64
}
The following is a list of derivable traits:
Comparison traits: Eq
, PartialEq
, Ord
, PartialOrd
.
Clone
, to create T
from &T
via a copy.
Copy
, to give a type ‘copy semantics’ instead of ‘move semantics’.
Hash
, to compute a hash from &T
.
Default
, to create an empty instance of a data type.
Debug
, to format a value using the {:?}
formatter.
Clone
和 Copy
配套使用
Eq
相较于 PartialEq
多了自反性
PartialEq
for f64
: NaN != NaN
Defining Your Own Traits
/// Types that can report a norm as an `f64`.
pub trait ComputeNorm {
    /// Default implementation: implementors that do not override this report `0.0`.
    fn compute_norm(&self) -> f64 {
        0.0
    }
}

impl ComputeNorm for Point {
    /// Overrides the default with the square root of `x * x + y * y`.
    fn compute_norm(&self) -> f64 {
        let squared_sum = self.x * self.x + self.y * self.y;
        squared_sum.sqrt()
    }
}
Associated Type
// Implements the `+` operator for `Point` as component-wise addition.
// NOTE(review): relies on `Add` (presumably `std::ops::Add`) being imported and on a
// `Point::new(x, y)` constructor; neither is shown in this snippet — confirm.
impl Add for Point {
type Output = Self ; // associated type
fn add ( self , other : Self ) -> Self {
Point :: new ( self .x + other .x, self .y + other .y)
}
}
有点费解
联系泛型的版本
/// Component-wise `+` for a generic `Point<T>`.
///
/// The bound requires that adding two `T`s yields a `T` again, which is what lets the
/// sums be stored back into a `Point<T>`.
impl<T> Add for Point<T>
where
    T: Add<Output = T>,
{
    type Output = Self;

    fn add(self, other: Self) -> Self::Output {
        let x = self.x + other.x;
        let y = self.y + other.y;
        Self { x, y }
    }
}
https://doc.rust-lang.org/rust-by-example/generics/assoc_items/types.html
Generics
实例化特定类型
T
为类型参数,注意约束有两种写法
use std ::fmt;
/// A pair of two values of the same type.
pub struct MatchingPair<T> {
    first: T,
    second: T,
}

impl<T> MatchingPair<T> {
    /// Creates a pair from its two components.
    pub fn new(first: T, second: T) -> Self {
        Self { first, second }
    }
}

// Bound written inline on the type parameter.
impl<T: fmt::Display> fmt::Display for MatchingPair<T> {
    /// Formats the pair as `(first, second)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MatchingPair { first, second } = self;
        write!(f, "({}, {})", first, second)
    }
}

// Same kind of bound, written as a `where` clause instead.
impl<T> Clone for MatchingPair<T>
where
    T: Clone,
{
    /// Clones both components into a new pair.
    fn clone(&self) -> Self {
        let first = self.first.clone();
        let second = self.second.clone();
        MatchingPair { first, second }
    }
}
Trait Syntax in Functions and Composing Traits
泛型函数和 Traits 的组合
// Generic identity function: takes ownership of `x` and returns it unchanged, for any type `T`.
fn identity_fn < T >( x : T ) -> T { x }
/// Prints the smaller of `x` and `y`.
///
/// Shows trait composition: `T` must implement both `Display` (to be printed) and
/// `PartialOrd` (to be compared). `y` is printed whenever `x < y` does not hold.
fn print_min<T>(x: T, y: T)
where
    T: fmt::Display + PartialOrd,
{
    let smaller = if x < y { x } else { y };
    println!("The minimum is {}", smaller);
}
#06: Smart Pointers
Box<T>
std::unique_ptr
Rc<T>
multiple immutable references
实现可持久化版本的 LinkedList
/// Returns a new list with `value` in front; `self` is left untouched.
pub fn push_front(&self, value: u32) -> LinkedList {
    // Cloning the `Option<Rc<Node>>` only bumps the reference count, so the new
    // list shares all existing nodes with this one.
    let shared_tail = self.head.clone();
    let head = Some(Rc::new(Node::new(value, shared_tail)));
    LinkedList {
        head,
        size: self.size + 1,
    }
}
// A tuple in Rust is like a struct -- you can access the zeroth element of
// a tuple name tup by writing "tup.0", you can access the element at index
// 2 by writing "tup.2", etc.
pub fn pop_front (& self ) -> ( Option < LinkedList >, Option < u32 >) {
match & self .head {
Some ( node ) => {
let val = node .value;
let new_head : Option < Rc < Node >> = node .next. clone ();
let new_list = LinkedList { head : new_head , size : self .size - 1 };
( Some ( new_list ), Some ( val ))
},
None => ( None , None )
}
}
几处变化
no mut
no take
, use clone
RefCell<T>
RefCell
allows us to wrap a mutable piece of data behind an immutable reference. We can call borrow
and borrow_mut
on the RefCell
to acquire references to the internal data – the borrowing rules are now enforced at runtime . Therefore, RefCell
is safe, but it’s slightly more inefficient because it shifts what would have been a compile-time check to runtime. You commonly see Rc<RefCell<T>>
, which lets us have a flexible handle to heap allocated data – just like Rc
except now we can mutate things!
Week 3 Exercises: Error handling, I/O, and traits
Part 1: Inspecting File Descriptors
检查进程所打开的文件描述符,其流程如下
// Describes one process being inspected.
pub struct Process {
pub pid : usize , // process id; used to build the /proc/{pid}/... paths
pub ppid : usize , // presumably the parent process id — confirm against the code that fills it
pub command : String , // presumably the command the process was started with — confirm
}
通过遍历 /proc/{pid}/fd
得到文件描述符的列表
在 list_fds
中完成
根据进程号和文件描述符得到 OpenFile
信息
// Information about one open file descriptor of a process.
// NOTE(review): field meanings below are inferred from the names and the surrounding notes
// (read_link on /proc/{pid}/fd/{fdnum}, read_to_string on /proc/{pid}/fdinfo/{fdnum}) — confirm.
pub struct OpenFile {
pub name : String , // presumably the link target of the descriptor
pub cursor : usize , // presumably the current file offset
pub access_mode : AccessMode , // `AccessMode` is declared elsewhere
}
read_link - /proc/{pid}/fd/{fdnum}
read_to_string - /proc/{pid}/fdinfo/{fdnum}
其中使用了 ANSI escape codes
note
注意在 IDE 中运行 test_openfile_from_fd
测试无法通过
在终端中运行正常
$ ./multi_pipe_test & cargo run multi_pipe_test
需要让进程保持在一段时间内运行
于是调试短时间内运行的程序几乎无解
另一处实现上的细节是 Option
和 Result
的选择
Part 2: A Generic LinkedList
改成泛型非常简单,加一些 <T>
即可
关键在于实现一些 traits
需要同时为 Node<T>
和 LinkedList<T>
实现
/// Deep copy of a node and everything reachable from it.
impl<T: Clone> Clone for Node<T> {
    fn clone(&self) -> Self {
        let value = self.value.clone();
        // Cloning `next` clones the boxed successor, which in turn calls this
        // method again: the whole chain is copied recursively.
        let next = self.next.clone();
        Node::new(value, next)
    }
}
/// Deep copy of the list: every node is cloned, nothing is shared with the original.
impl<T: Clone> Clone for LinkedList<T> {
    fn clone(&self) -> Self {
        let mut copy: LinkedList<T> = LinkedList::new();
        copy.size = self.size;
        // Delegates to `Clone for Node<T>` through `Option<Box<Node<T>>>`.
        copy.head = self.head.clone();
        copy
    }
}
注意这里 clone
的递归调用
以及 Clone
性质的传播
T -> Node<T> -> Box<Node<T>> -> Option<Box<Node<T>>>
相当 tricky
参考 http://xion.io/post/code/rust-for-loop.html 可知
等价于
let mut iter = IntoIterator :: into_iter ( v );
loop {
match iter . next () {
Some ( x ) => {
// body
},
None => break ,
}
}
注意到 v
和 iter
是 iterables,iter
是 Iterators
对于 LinkedList<T>
而言
an iterator that takes ownership of the list it is iterating over
只需实现 Iterator
trait,其中调用 pop_front
即可
没有 IntoIterator
trait 为什么也可以……
对于 &LinkedList<T>
而言
an iterator that only references elements in the list
比较复杂
// Iterator state for walking a list by reference.
// `'a` ties the iterator to the borrowed list, so it cannot outlive the list it points into.
pub struct LinkedListIter <' a , T >
where
T : Clone ,
{
// Link to the node that will be yielded next (`None` once the end is reached).
current : &' a Option < Box < Node < T >>>,
}
impl <' a , T > IntoIterator for &' a LinkedList < T >
where
T : Clone ,
{
type Item = T ;
type IntoIter = LinkedListIter <' a , T >;
fn into_iter ( self ) -> Self :: IntoIter {
LinkedListIter {
current : & self .head,
}
}
}
impl < T > Iterator for LinkedListIter <' _ , T >
where
T : Clone ,
{
type Item = T ;
fn next (& mut self ) -> Option < Self :: Item > {
match self .current {
Some ( node ) => {
self .current = & node .next;
Some ( node .value. clone ())
}
None => None ,
}
}
}
需要定义数据结构 LinkedListIter
其中使用了生命期语法
This syntax essentially says that the struct lives as long as the reference it contains, so that we don’t have issues with dangling pointers.
在 next 中使用 clone
以确保 immutable reference
对于 &mut LinkedList<T>
而言
就更复杂了……
#07: Pitfalls in Multiprocessing
fork
fork + exec
use a higher-level abstraction
https://doc.rust-lang.org/std/process/struct.Command.html
pipe
https://stackoverflow.com/questions/49218599/write-to-child-process-stdin-in-rust/
https://docs.rs/os_pipe/latest/os_pipe/
signal
在信号处理程序中应考虑共享变量
以及使用 signal-safety 的函数
https://man7.org/linux/man-pages/man7/signal-safety.7.html
printf 和 malloc 都不是 signal-safety 的
另外,信号处理程序不应该阻塞,因为会独占处理器
使用 The self-pipe trick,避免使用信号处理程序
Create a pipe
When you’re awaiting a signal, read from the pipe (this will block until something is written to it)
In the signal handler, write a single byte to the pipe
官方支持 https://man7.org/linux/man-pages/man2/signalfd.2.html
然而这让程序减少了并发度
either doing work, or reading to wait for a signal
为了解决这个问题,有两种方案
注意到线程是可调度的,而非独占的
仍然存在并发问题,但是 rust 可以控制这些问题
https://github.com/Detegr/rust-ctrlc
Creates a self-pipe
Installs a signal handler that writes to the pipe when SIGINT is received
Spawns a thread: loop { read from pipe; call handler function; }
可以在线程中使用非 signal-safety 的函数
TBD
Project 1: The DEET Debugger
info
实时协作开发 https://visualstudio.microsoft.com/zh-hans/services/live-share/
note
对程序结构进行简单分析
程序入口,其参数为携带调试信息 的可执行程序 路径
抑制主进程对 SIGINT
信号的处理
SIGINT
信号会被传递到子进程
构造 Debugger
对象,并调用其 run
方法
debugger.rs
和 debugger_command.rs
其 run
方法为死循环
通过 get_next_command
得到输入的命令,并解析为 DebuggerCommand
对象
// A parsed command entered at the debugger prompt.
pub enum DebuggerCommand {
Quit ,
Run ( Vec < String >), // presumably the arguments passed to the inferior — confirm
Continue ,
Backtrace ,
Break ( String ), // presumably the breakpoint target as typed by the user — confirm
}
其中使用了第三方库 rustyline
,提供了历史命令功能
通过匹配 DebuggerCommand
操作 Inferior
对象,即子进程
使用一些私有辅助函数操作子进程,主要有
create_new_inferior
- 构造 Inferior
对象,并调用 cont_inferior
cont_inferior
- 包装了 Inferior
对象的 cont
方法,使用 handle_status
处理子进程的状态
kill_inferior
- 包装了 Inferior
对象的 kill
方法,使用 handle_status
处理子进程的状态
handle_status
- 处理子进程的状态
pub enum Status {
/// Indicates inferior stopped. Contains the signal that stopped the process, as well as the
/// current instruction pointer that it is stopped at.
Stopped ( signal :: Signal , usize ),
/// Indicates inferior exited normally. Contains the exit status code.
Exited ( i32 ),
/// Indicates the inferior exited due to a signal. Contains the signal that killed the
/// process.
Signaled ( signal :: Signal ),
}
另外,Debugger
对象中还包含调试信息和断点信息,会通过参数传递给 Inferior
对象使用
涉及到大量底层编程 ,下面逐一分析
首先需要明确,调试器(父进程)是通过调用 wait
得到子进程的状态信息的
所有改变执行流的方法,最后都需要调用 wait
将状态信息返回给调用者
pub fn wait (& self , options : Option < WaitPidFlag >) -> Result < Status , nix :: Error > {
Ok ( match waitpid ( self . pid (), options )? {
WaitStatus :: Exited ( _pid , exit_code ) => Status :: Exited ( exit_code ),
WaitStatus :: Signaled ( _pid , signal , _core_dumped ) => Status :: Signaled ( signal ),
WaitStatus :: Stopped ( _pid , signal ) => {
let regs = ptrace :: getregs ( self . pid ())?;
Status :: Stopped ( signal , regs .rip as usize )
}
other => panic! ( "waitpid returned unexpected status: {:?}" , other ),
})
}
new
使用 Command
模块创建子进程
并在 pre_exec
中使用 ptrace
系统调用追踪子进程
注意使用这种方法成功加载子进程后,子进程会收到 SIGTRAP
信号
在此之后安装断点信息,也就是将断点位置的字节改为 0xcc
,从而触发 SIGTRAP
关于断点的更多实现细节,见下文
cont
更新断点信息,并判断当前 rip
是否指向某个断点位置的下一个字节
回顾 取指 - 译码 - 执行 - 更新 rip
若是,相当于在触发断点之后 cont
,需要重新执行断点位置的指令
于是写回原本的字节,并 ptrace::step
执行单步指令
若程序未结束执行,则再次在断点位置的字节改为 0xcc
,以保证断点仍然有效
最后 ptrace::cont
即可
kill
ptrace::kill
即可
别忘了调用 wait
,否则会出现僵死进程 <defunct>
print_backtrace
需要通过栈帧信息进行回溯
于是需要如下的编译选项
-O0 -g -no-pie -fno-omit-frame-pointer
如何回溯,看图易知
dwarf_data.rs
和 gimli_wrapper.rs
注意到调试信息通常是以 DWARF
的格式存储的
dwarf_data.rs
使用 gimli_wrapper.rs
提取了如下调试信息
行号和地址之间的映射
函数名和地址之间的映射
变量信息
由于一些原因,实际测试发现没有行号和地址之间的映射
并且无法解析库函数,如 printf
使用了第三方库 https://github.com/gimli-rs/gimli
在本项目中,调试信息的实际作用有二
在子进程处于 Status::Stopped
时,或进行回溯时,通过 rip
解析出行号和函数名
在打断点时,通过行号或函数名解析出断点地址
todo
SIGSEGV
后连续 cont
会反复 SIGSEGV
,行为与 gdb 不一致
$ gdb samples/segfault
Reading symbols from samples/segfault...
(gdb) r
Starting program: /home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-1/deet/samples/segfault
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Calling func2
About to segfault... a=2
Program received signal SIGSEGV, Segmentation fault.
0x0000000000401162 in func2 (a=2) at samples/segfault.c:5
5 *(int*)0 = a;
(gdb) c
Continuing.
Program terminated with signal SIGSEGV, Segmentation fault.
The program no longer exists.
(gdb)
更新 gimli 库的使用,提取更多的调试信息
Optional extensions
#09: Intro to Multithreading
使用 move
关键字拷贝下标的值
use std ::thread;
const NAMES : [& str ; 7 ] = [ "Frank" , "Jon" , "Lauren" , "Marco" , "Julie" , "Patty" ,
"Tagalong Introvert Jerry" ];
/// Spawns six printer threads and waits for all of them.
fn main() {
    // `move` copies the index `i` into each closure, so no closure borrows a
    // variable owned by `main`.
    let threads: Vec<_> = (0..6usize)
        .map(|i| {
            thread::spawn(move || {
                println!("Hello from printer {}!", NAMES[i]);
            })
        })
        .collect();

    // All threads are already running at this point; now wait for each to finish.
    for handle in threads {
        handle.join().expect("Panic occurred in thread!");
    }
}
否则会报错
closure may outlive the current function, but it borrows `i`, which is owned by the current function
也就是闭包的生命期可能比下标的生命期长
#10: Shared Memory
考虑共享变量是一个可变引用
Attempt 1
直接使用 move
关键字
extern crate rand;
use std ::{thread, time};
use rand :: Rng ;
fn handleCall () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 1000 )));
}
fn takeBreak () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 1000 )));
}
fn shouldTakeBreak () -> bool {
rand :: random ()
}
fn ticketAgent ( id : usize , remainingTickets: & mut usize ) {
while *remainingTickets > 0 {
handleCall ();
*remainingTickets -= 1 ;
println! ( "Agent #{} sold a ticket! ({} more to be sold)" ,
id , remainingTickets);
if shouldTakeBreak () {
takeBreak ();
}
}
println! ( "Agent #{} notices all tickets are sold, and goes home!" , id );
}
// Attempt 1: compiles, but is wrong by design.
// `remainingTickets` is a plain integer, so `move` hands every closure its own copy;
// no thread ever sees the tickets sold by another thread.
fn main () {
let mut remainingTickets = 250 ;
let mut threads = Vec :: new ();
for i in 0 .. 10 {
threads . push ( thread :: spawn ( move || {
ticketAgent ( i , & mut remainingTickets) // mutates the closure's private copy
}));
}
// wait for all the threads to finish
for handle in threads {
handle . join (). expect ( "Panic occurred in thread!" );
}
println! ( "End of business day!" );
}
实际上拷贝了多份,可以通过编译,但是运行结果不正确
Attempt 2
使用 Rc<RefCell<T>>
结构
extern crate rand;
use std :: cell :: RefCell ;
use std :: rc :: Rc ;
use std ::{thread, time};
use rand :: Rng ;
fn handleCall () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 1000 )));
}
fn takeBreak () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 1000 )));
}
fn shouldTakeBreak () -> bool {
rand :: random ()
}
fn ticketAgent ( id : usize , remainingTickets: & mut usize ) {
while *remainingTickets > 0 {
handleCall ();
*remainingTickets -= 1 ;
println! ( "Agent #{} sold a ticket! ({} more to be sold)" ,
id , remainingTickets);
if shouldTakeBreak () {
takeBreak ();
}
}
println! ( "Agent #{} notices all tickets are sold, and goes home!" , id );
}
// Attempt 2: intentionally does NOT compile.
// error[E0277]: `Rc<RefCell<usize>>` cannot be shared between threads safely — the closure
// borrows `remainingTicketsRef`, and `Sync` is not implemented for `Rc<RefCell<usize>>`.
fn main () {
let remainingTickets: Rc < RefCell < usize >> = Rc :: new ( RefCell :: new ( 250 ));
let mut threads = Vec :: new ();
for i in 0 .. 10 {
let remainingTicketsRef = remainingTickets. clone (); // bumps the reference count
threads . push ( thread :: spawn ( || {
ticketAgent ( i , & mut remainingTicketsRef. borrow_mut ()) // runtime-checked mutable borrow
}));
}
// wait for all the threads to finish
for handle in threads {
handle . join (). expect ( "Panic occurred in thread!" );
}
println! ( "End of business day!" );
}
在 Rc 上 clone
增加引用计数
在 RefCell 上 borrow_mut
获得可变引用
会报错
error[E0277]: `Rc<RefCell<usize>>` cannot be shared between threads safely
--> src/main.rs:41:22
|
41 | threads.push(thread::spawn(|| {
| ^^^^^^^^^^^^^ `Rc<RefCell<usize>>` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `Rc<RefCell<usize>>`
= note: required because of the requirements on the impl of `Send` for `&Rc<RefCell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:41:36: 43:10]`
Attempt 3
使用 Arc<Mutex<T>>
结构
extern crate rand;
use std :: sync ::{ Arc , Mutex };
use std ::{thread, time};
use rand :: Rng ;
fn handleCall () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 10 )));
}
fn takeBreak () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 10 )));
}
fn shouldTakeBreak () -> bool {
rand :: random ()
}
fn ticketAgent ( id : usize , remainingTickets: Arc < Mutex < usize >>) {
loop {
let mut remainingTicketsRef = remainingTickets. lock (). unwrap ();
if (*remainingTicketsRef == 0 ) {
break ;
}
handleCall ();
*remainingTicketsRef -= 1 ;
println! ( "Agent #{} sold a ticket! ({} more to be sold)" ,
id , *remainingTicketsRef);
if shouldTakeBreak () {
takeBreak ();
}
}
println! ( "Agent #{} notices all tickets are sold, and goes home!" , id );
}
fn main () {
let remainingTickets: Arc < Mutex < usize >> = Arc :: new ( Mutex :: new ( 250 ));
let mut threads = Vec :: new ();
for i in 0 .. 10 {
let remainingTicketsRef = remainingTickets. clone ();
threads . push ( thread :: spawn ( move || {
ticketAgent ( i , remainingTicketsRef);
}));
}
// wait for all the threads to finish
for handle in threads {
handle . join (). expect ( "Panic occurred in thread!" );
}
println! ( "End of business day!" );
}
可以认为 Arc 为 Rc 的线程安全版本,Mutex 为 RefCell 的线程安全版本
前一个线程安全是为了原子的修改引用计数,例如使用原子指令
后一个线程安全是为了原子的修改数据,调用 lock
获得独占权,当离开作用域时则自动解锁
Marker traits
you don’t implement functions for them, they serve a symbolic purpose
Send: Transfer ownership (move) between threads
Sync: Allow this thing to be referenced from multiple threads
for more
#11: Synchronization
介绍了条件变量
与互斥量绑定 Arc<(Mutex<T>, Condvar)>
extern crate rand;
use std :: sync ::{ Arc , Mutex , Condvar };
use std ::{thread, time};
use std :: collections :: VecDeque ;
use rand :: Rng ;
fn rand_sleep () {
let mut rng = rand :: thread_rng ();
thread :: sleep ( time :: Duration :: from_millis ( rng . gen_range ( 0 , 30 )));
}
/// A semaphore that also carries data: a FIFO queue guarded by a mutex, paired with a
/// condition variable that receivers wait on. Clones share the same queue.
#[derive(Clone)]
pub struct SemaPlusPlus<T> {
    queue_and_cv: Arc<(Mutex<VecDeque<T>>, Condvar)>,
}

impl<T> SemaPlusPlus<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        SemaPlusPlus {
            queue_and_cv: Arc::new((Mutex::new(VecDeque::new()), Condvar::new())),
        }
    }

    /// Enqueues `message` -- like `semaphore.signal()`.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    pub fn send(&self, message: T) {
        let (queue_lock, cv) = &*self.queue_and_cv;
        let mut queue = queue_lock.lock().unwrap();
        let queue_was_empty = queue.is_empty();
        queue.push_back(message);
        // Receivers only block while the queue is empty, so they must be woken exactly
        // when it goes from empty to non-empty. (Notifying only on the non-empty case
        // would leave a receiver asleep after the first message arrives.)
        if queue_was_empty {
            cv.notify_all();
        }
    }

    /// Dequeues the oldest message -- like `semaphore.wait()`.
    ///
    /// Blocks until a message is available.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    pub fn recv(&self) -> T {
        let (queue_lock, cv) = &*self.queue_and_cv;
        // Wait until there is something to dequeue
        let mut queue = cv
            .wait_while(queue_lock.lock().unwrap(), |queue| queue.is_empty())
            .unwrap();
        // Should return Some(...) because we waited
        queue.pop_front().unwrap()
    }
}
const NUM_THREADS : usize = 12 ;
fn main () {
// Inspired by this example https://doc.rust-lang.org/stable/rust-by-example/std_misc/channels.html
let sem : SemaPlusPlus < String > = SemaPlusPlus :: new ();
let mut handles = Vec :: new ();
for i in 0 .. NUM_THREADS {
let sem_clone = sem . clone ();
let handle = thread :: spawn ( move || {
rand_sleep ();
sem_clone . send ( format! ( "thread {} just finished!" , i ))
});
handles . push ( handle );
}
for _ in 0 .. NUM_THREADS {
println! ( "{}" , sem . recv ())
}
for handle in handles {
handle . join (). unwrap ();
}
}
Week 5 Exercises: Farm meets multithreading
完全是被编译器牵着走……
/// Locks the queue and removes its first number, or returns `None` when it is empty.
///
/// # Panics
/// Panics if the mutex is poisoned.
fn pop_number(queue: &Mutex<VecDeque<u32>>) -> Option<u32> {
    queue.lock().unwrap().pop_front()
}
/// Factors all input numbers using one worker thread per CPU.
fn main() {
    let num_threads = num_cpus::get();
    println!("Farm starting on {} CPUs", num_threads);
    let start = Instant::now();

    // The work queue is shared by all workers: `Arc` for shared ownership,
    // `Mutex` for exclusive access while popping.
    let queue = Arc::new(Mutex::new(get_input_numbers()));

    // Each worker owns its own `Arc` handle and keeps popping until the queue is empty.
    let threads: Vec<_> = (0..num_threads)
        .map(|_| {
            let queue = queue.clone();
            thread::spawn(move || {
                while let Some(num) = pop_number(queue.borrow()) {
                    factor_number(num);
                }
            })
        })
        .collect();

    // Wait for every worker before reporting the elapsed time.
    for handle in threads {
        handle.join().expect("Panic occurred in thread!");
    }

    println!("Total execution time: {:?}", start.elapsed());
}
思路
首先 queue 必然是 Arc<Mutex<VecDeque<u32>>>
然后在循环中 clone
由于 clone
出的不是 reference
并且 Arc<Mutex<VecDeque<u32>>>
并未实现 Copy
trait
因为需要在 spawn
中调用 pop_number
若 pop_number
的参数类型包含 Arc
,就会发生 move
所以 pop_number
的参数类型不应该包含 Arc
接着在 clone
的 queue 上调用 borrow
得到 &Mutex<VecDeque<u32>>
最后在 pop_number
中上锁得到 VecDeque<u32>
得到 u32
后交由 factor_number
分解即可
测试
$ cargo run 12345678 12346789 34567890 45678902 123456 234567 345678 456789 678912 20125001
注意数字是通过命令行传入的,这比从标准输入读取简化了许多
#12: Channels
如何避免数据竞争,不使用共享内存
Effective Go:
Do not communicate by sharing memory; instead, share memory by communicating.
如果不使用共享内存,在线程之间传递数据就需要考虑拷贝的开销
折中的方案是只拷贝指针,也就是浅拷贝,利用智能指针可以保证安全性
关键问题在于如何高效的实现 Channels
类似信号量的实现
参考 SemaPlusPlus
golang 使用了 futex
考虑 lock-free programming techniques
利用 crossbeam 实现 Week 5 练习中的 Farm
加强为从标准输入读取
/// Reads numbers from stdin and distributes them to worker threads over a channel.
fn main() {
    let (sender, receiver) = crossbeam::channel::unbounded();

    // One worker per CPU; each keeps receiving until the channel is closed.
    let threads: Vec<_> = (0..num_cpus::get())
        .map(|_| {
            let receiver = receiver.clone();
            thread::spawn(move || {
                while let Ok(next_num) = receiver.recv() {
                    factor_number(next_num);
                }
            })
        })
        .collect();

    let stdin = std::io::stdin();
    for line in stdin.lock().lines() {
        let num: u32 = line.unwrap().parse().unwrap();
        sender
            .send(num)
            .expect("Tried writing to channel, but there are no receivers!");
    }

    // Closing the sending side makes `recv()` return `Err` in the workers, which ends their loops.
    drop(sender);

    for thread in threads {
        thread.join().expect("Panic occurred in thread");
    }
}
注意 drop(sender)
是为了关闭 Channels 的一端,从而使 receiver.recv()
返回 Err
类似于 read pipe 返回 EOF
Week 6 Exercises: Sharing Data by Communicating
先给出参考实现
fn parallel_map < T , U , F >( input_vec : Vec < T >, num_threads : usize , f : F ) -> Vec < U >
where
F : FnOnce ( T ) -> U + Send + Copy + ' static ,
T : Send + ' static ,
U : Send + ' static + Default ,
{
let mut output_vec : Vec < U > = Vec :: with_capacity ( input_vec . len ());
output_vec . resize_with ( input_vec . len (), Default :: default );
// TODO: implement parallel map!
let ( sender , receiver ) = crossbeam :: channel :: unbounded ();
let ( sender_re , receiver_re ) = crossbeam :: channel :: unbounded ();
let mut threads = Vec :: with_capacity ( num_threads );
for _ in 0 .. num_threads {
let receiver = receiver . clone ();
let sender_re = sender_re . clone ();
threads . push ( thread :: spawn ( move || {
while let Ok ( next_elem ) = receiver . recv () {
let ( index , elem ) = next_elem ;
sender_re
. send (( index , f ( elem )))
. expect ( "Tried writing to channel, but there are no receivers!" );
}
}));
}
let mut index = 0 ;
for elem in input_vec {
sender
. send (( index , elem ))
. expect ( "Tried writing to channel, but there are no receivers!" );
index = index + 1 ;
}
drop ( sender );
for thread in threads {
thread . join (). expect ( "Panic occurred in thread" );
}
drop ( sender_re );
while let Ok ( next_elem ) = receiver_re . recv () {
let ( index , elem ) = next_elem ;
output_vec [ index ] = elem ;
}
output_vec
}
这一堆 traits 约束令人头大
思路
由于只考虑使用 Channels,并且在线程中无法操作 output_vec
因为 move
关键字和 Vec<U>
并未实现 Copy
trait
所以只能在主线程中操作 output_vec
于是考虑使用两个 Channels,其结构如下
flowchart LR
A["sender"]
D["receiver_re"]
A --> B1["receiver"] --> C1["sender_re"] --> D
A --> B2["receiver"] --> C2["sender_re"] --> D
A --> B3["receiver"] --> C3["sender_re"] --> D
A --> B4["receiver"] --> C4["sender_re"] --> D
其中 sender 和 receiver_re 在主线程中
另外,由于线程的调度是不确定的
考虑到 Vec
随机读写的性能,在通信时传输元素和下标 的 tuple
需要小心 Vec
的初始化
let mut output_vec : Vec < U > = Vec :: with_capacity ( input_vec . len ());
output_vec . resize_with ( input_vec . len (), Default :: default );
Vec::with_capacity
只能保证 capacity,而 len 为 0
为此需要 resize
with default
此时 Default
trait 就发挥了作用
查看系统允许的最大线程数和进程数
$ cat /proc/sys/kernel/threads-max
$ cat /proc/sys/kernel/pid_max
$ man 5 proc
TODO
#13: Scalability and Availability
Scalability and Availability
Load balancers
持久存储的部分委托给分布式数据库处理
使用负载均衡器将 client 请求分派给不同的服务器
Load balance your load balancers
多个负载均衡器
$ dig +noall +answer bilibili.com
bilibili.com. 1 IN A 119.3.238.64
bilibili.com. 1 IN A 8.134.50.24
bilibili.com. 1 IN A 39.105.216.211
bilibili.com. 1 IN A 119.3.70.188
bilibili.com. 1 IN A 139.159.241.37
bilibili.com. 1 IN A 47.103.24.173
bilibili.com. 1 IN A 120.92.78.97
Geographic routing with DNS
https://www.whatsmydns.net/
#15: Futures I
Non-blocking I/O
Epoll Syscall
Future
The result of a computation that may or may not have completed.
In Rust, futures are structs that implement the Future trait.
Executors
Combining futures
Async/Await
syntactic sugar
transform your chain of async computation (i.e. futures) into an efficient state machine
#16: Futures II
ThreadPool Link Explorer
Async Link Explorer
tokio 版本太老了,于是做了一些修改
速度似乎还不如 ThreadPool
Project 2: Balancebeam
负载均衡的代理服务器
更新 clap 第三方库的使用
https://github.com/clap-rs/clap/blob/v3.1.18/examples/derive_ref/README.md
测试
$ cargo run -- --upstream 171.67.215.200:80
Compiling balancebeam v0.1.0 (/home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-2/balancebeam)
Finished dev [unoptimized + debuginfo] target(s) in 1.85s
Running `target/debug/balancebeam --upstream '171.67.215.200:80'`
INFO balancebeam > Listening for requests on 0.0.0.0:1100
INFO balancebeam > Connection received from 127.0.0.1
INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /class/cs110l/ HTTP/1.1
DEBUG balancebeam > Forwarded request to server
INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK
DEBUG balancebeam > Forwarded response to client
INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /favicon.ico HTTP/1.1
DEBUG balancebeam > Forwarded request to server
INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK
DEBUG balancebeam > Forwarded response to client
DEBUG balancebeam > Client finished sending requests. Shutting down connection
http://localhost:1100/class/cs110l/
Milestone 0: Read the starter code
RTFSC
实验指南中讲述的十分详细
main.rs
listening
handle_connection
(from client)
connect_to_upstream
loop
read a request from the client
relay the request to the upstream server
read the response from the upstream server
forward the response to the client
request.rs
and response.rs
read_from_stream
write_to_stream
一些第三方库
本地测试
$ cargo test --test 01_single_upstream_tests
$ cargo test --test 02_multiple_upstream_tests test_load_distribution
Milestone 1: Add multithreading
直接使用第三方库 threadpool
需要让 ProxyState
derive Clone
let pool = ThreadPool :: new ( 8 );
for stream in listener . incoming () {
if let Ok ( stream ) = stream {
// Handle the connection!
let state_cloned = state . clone ();
pool . execute ( move || {
handle_connection ( stream , & state_cloned );
});
}
}
Milestone 2: Use asynchronous I/O
为了不让测试代码 crash,使用老版本的 tokio
实验指南给出了详尽的修改方案
std::net::{TcpListener, TcpStream}
to tokio::net::{TcpListener, TcpStream}
std::io::{Read, Write}
to tokio::io::{AsyncReadExt, AsyncWriteExt}
在异步函数调用后添加 .await
使用 await
的函数需要标记为 async
不使用 ThreadPool,而是使用 task::spawn
while let Some ( stream ) = listener . incoming (). next (). await {
match stream {
Ok ( stream ) => {
let state_cloned = state . clone ();
task :: spawn ( async move {
handle_connection ( stream , & state_cloned ). await ;
});
}
Err ( err ) => {
log :: error! ( "connection failed: {}" , err );
}
}
}
考虑是否有必要在 task::spawn
后添加 .await
TFM 中并没有这样做
对比 Async Link Explorer
Milestone 3: Failover with passive health checks
At this point, your load balancer should be pretty scalable.
Let’s work on adding some more availability to our infrastructure.
注意到在 connect_to_upstream
的实现中
若随机选择的 upstream 连接失败则直接返回
为了增加可靠性,当连接失败时尝试连接其他的 upstream
为此在 ProxyState 中添加字段
upstream_states: Vec<bool>
并修改 connect_to_upstream
async fn connect_to_upstream ( state : Arc < Mutex < ProxyState >>) -> Result < TcpStream , std :: io :: Error > {
// TODO: implement failover (milestone 3)
let mut state_ref = state . lock (). await ;
if ! state_ref .upstream_states. contains (& true ) {
log :: error! ( "Failed to connect: all upstreams are dead" );
return Err ( std :: io :: Error :: new ( ErrorKind :: Other , "oh no!" ));
}
let mut rng = rand :: rngs :: StdRng :: from_entropy ();
loop {
let upstream_idx = rng . gen_range ( 0 , state_ref .upstream_addresses. len ());
if ! state_ref .upstream_states[ upstream_idx ] {
continue ;
}
let upstream_ip = & state_ref .upstream_addresses[ upstream_idx ];
match TcpStream :: connect ( upstream_ip ). await {
Ok ( stream ) => {
return Ok ( stream );
}
Err ( err ) => {
log :: warn! ( "Failed to connect to upstream {}: {}" , upstream_ip , err );
state_ref .upstream_states[ upstream_idx ] = false ;
continue ;
}
}
}
}
将参数类型修改为 Arc<Mutex<ProxyState>>
使用 tokio::sync::Mutex
保持独占性
也可以使用 tokio::sync::RwLock
,不过写起来比较麻烦……
Milestone 4: Failover with active health checks
passive health checks 存在一些缺陷
上游服务器仍然可以建立连接,但它们无法为请求提供服务
passive health checks 无法检测出这种情况
上游服务器因为某些原因暂时停机,过了一会儿后又恢复了
passive health checks 无法重新标记已恢复的上游服务器
于是考虑实现一个异步函数 active_health_check
然后在 tokio::main
中单独 spawn 一个异步任务即可
active_health_check
实现如下
async fn active_health_check ( state : Arc < Mutex < ProxyState >>) {
let interval ;
let path ;
let upstream_addresses ;
{
let state_ref = state . lock (). await ;
interval = state_ref .active_health_check_interval;
path = state_ref .active_health_check_path. clone ();
upstream_addresses = state_ref .upstream_addresses. clone ();
} // release lock
loop {
delay_for ( Duration :: from_secs ( interval as u64 )). await ;
for upstream_idx in 0 .. upstream_addresses . len () {
let mut upstream_state = false ;
let upstream_ip = & upstream_addresses [ upstream_idx ];
match TcpStream :: connect ( upstream_ip ). await {
Ok ( mut stream ) => {
let request = http :: Request :: builder ()
. method ( http :: Method :: GET )
. uri ( path . clone ())
. header ( "Host" , upstream_ip )
. body ( Vec :: new ())
. unwrap ();
if let Err ( error ) = request :: write_to_stream (& request , & mut stream ). await {
log :: error! (
"Failed to send request (active health check) to upstream {}: {:?}" ,
upstream_ip ,
error
);
// do nothing
}
match response :: read_from_stream (& mut stream , request . method ()). await {
Ok ( response ) => {
upstream_state = response . status () == StatusCode :: OK ;
}
Err ( error ) => {
log :: error! (
"Error reading response (active health check) from server: {:?}" ,
error
);
// do nothing
}
}
}
Err ( error ) => {
log :: error! (
"Error establishing connection with server {}: {:?}" ,
upstream_ip ,
error
);
// do nothing
}
}
log :: info! (
"Active health check result: upstream {} -> {}" ,
upstream_ip ,
upstream_state
);
{
let mut state_ref = state . lock (). await ;
state_ref .upstream_states[ upstream_idx ] = upstream_state ;
}
}
}
}
遍历上游服务器,对每个服务器尝试建立连接,并发送请求,观察响应是否正常
注意及时地释放锁
Milestone 5: Rate limiting
考虑到可能发生如下的情形
需要限制客户端发送请求的次数
考虑使用 fixed window algorithm
注意到同一个客户端可能多次 connect_to_upstream
所以计数器和计时器不应该是 connect_to_upstream
的内部状态
而应该是 ProxyState 的一部分
为此在 ProxyState 中添加字段
client_requests_map: HashMap<String, (Instant, usize)>
然后在 handle_connection
的请求处理循环中添加相关代码
// Rate limiting (fixed window): each client IP gets at most
// `max_requests_per_minute` requests per 60-second window; 0 disables the limit.
{
    // Decide under the lock, then release it before any network I/O so a slow
    // client cannot stall every other task that needs the shared state.
    let over_limit = {
        let mut state_ref = state.lock().await;
        let limits = state_ref.max_requests_per_minute;
        if limits == 0 {
            false
        } else {
            let now = Instant::now();
            // A client seen for the first time starts a fresh window with a zero count.
            let window = state_ref
                .client_requests_map
                .entry(client_ip.clone())
                .or_insert((now, 0));
            if window.0.elapsed().as_secs() >= 60 {
                // Window expired: restart BOTH the timer and the counter. Resetting
                // only the counter would leave the window permanently expired and
                // switch the limit off for this client.
                *window = (now, 0);
            }
            if window.1 >= limits {
                true
            } else {
                window.1 += 1;
                false
            }
        }
    };
    if over_limit {
        let response = response::make_http_error(http::StatusCode::TOO_MANY_REQUESTS);
        send_response(&mut client_conn, &response).await;
        log::debug!("Forwarded response `TOO_MANY_REQUESTS` to client");
        // Skip this request and go on to the next iteration of the enclosing loop.
        continue;
    }
}
注意 tokio::time::Instant::elapsed
方法返回自 instant 创建以来经过的时间,因而实现了计时器的功能
仍需小心 Rust 的内存模型,以及 HashMap 的各种 reference 类型
Extensions
代理服务器维护与上游服务器的连接
https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm
Other load balancing algorithms
目前的算法为随机选择
https://www.haproxy.com/blog/power-of-two-load-balancing/
Caching
Web Application Firewall