CS 110L

Posted on:2022.05.20

Info

https://www.bilibili.com/video/BV1Ra411A7kN

https://reberhardt.com/cs110l/spring-2020/

https://zhuanlan.zhihu.com/p/356912397

why rust

Rust

lang

https://www.rust-lang.org/zh-CN/

IDE

Setting up a Rust development environment on Arch Linux

IDEA - rust - without debug

CLion - rust - with debug

cli

$ cargo new demo
$ cd demo
$ cargo run

#02: Memory Safety

Ownership and mutability

fn om_nom_nom(param: String) {
println!("{}", param);
}
fn main() {
let s = String::from("hello");
om_nom_nom(s);
om_nom_nom(s);
}

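The parameter type is not a reference, and String does not implement the Copy trait.

So the first call to om_nom_nom moves s, and the second call fails to compile.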
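One possible fix is to borrow instead of move. The following is a minimal sketch, not the course's solution; om_nom_nom_borrowed is a hypothetical name introduced here for illustration.

```rust
// Hypothetical variant of om_nom_nom that borrows its argument
// instead of taking ownership of it.
fn om_nom_nom_borrowed(param: &str) -> usize {
    println!("{}", param);
    param.len()
}

fn main() {
    let s = String::from("hello");
    // &s only borrows the String, so s stays valid after each call.
    om_nom_nom_borrowed(&s);
    om_nom_nom_borrowed(&s);
    // Cloning is the other option when the callee really needs ownership.
    let owned: String = s.clone();
    println!("{} {}", s, owned);
}
```

Borrowing avoids the extra allocation that clone would need.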

fn om_nom_nom(param: u32) {
println!("{}", param);
}
fn main() {
let x = 1;
om_nom_nom(x);
om_nom_nom(x);
}

u32 implements the Copy trait, so x is copied on each call and this compiles.

References

fn main() {
let mut s = String::from("hello");
let s1 = &mut s;
let s2 = &s;
println!("{} {} {}", s, s1, s2);
}

While a mutable reference is still in use, no other references to the same value are allowed.

fn main() {
let mut s = String::from("hello");
let s1 = &mut s;
println!("{} {}", s, s1);
}

s1, a mutable reference to s, is still in use when s is read.

fn main() {
let mut s = String::from("hello");
let s1 = &mut s;
println!("{}", s1);
println!("{}", s)
}

Correct: s1 is no longer used by the time s is read.

Week 1 Exercises: Hello world

basic

string

&str - immutable

String - mutable

for iter

iter

iter_mut

loop

https://stackoverflow.com/q/28892351

fn

fn fib(n: i32) -> i32 {
if n <= 1 { n } else { fib(n - 1) + fib(n - 2) }
}

part-3-hangman

I had just written hangman in Ruby.

Handling strings here requires learning more Rust syntax.

Vec<char> is used as the main data structure:

let secret_word_chars: Vec<char> = secret_word.chars().collect();

Printing a Vec<char> without separators:

secret_word_chars.iter().fold(String::new(), |acc, &char| acc + &*char.to_string())
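The fold above works, but collecting the characters directly is shorter, because String implements FromIterator for both char and &char. A small sketch, where join_chars is a hypothetical helper name:

```rust
// Hypothetical helper: join characters into a String with no separator.
fn join_chars(chars: &[char]) -> String {
    // String can be built from an iterator over &char.
    chars.iter().collect()
}

fn main() {
    let secret_word_chars: Vec<char> = "rust".chars().collect();
    println!("{}", join_chars(&secret_word_chars));
}
```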

Basic I/O:

io::stdout()
.flush()
.expect("Error flushing stdout.");
let mut guess = String::new();
io::stdin()
.read_line(&mut guess)
.expect("Error reading line.");

A for loop can also be written like this:

for i in 0..secret_word_chars.len() {
...
}

Revisiting References

#03: Error Handling

Handling nulls

Option

Handling errors

Result

code

extern crate rand;
use rand::Rng;
fn get_random_num() -> u32 {
rand::thread_rng().gen_range(0, 42)
}
fn feeling_lucky() -> Option<String> {
if get_random_num() > 10 {
Some(String::from("I'm feeling lucky!"))
} else {
None
}
}
fn poke_toddler() -> Result<&'static str, &'static str> {
if get_random_num() > 10 {
Ok("Hahahaha!")
} else {
Err("Waaaaahhh!")
}
}
fn main() {
if feeling_lucky().is_none() {
println!("Not feeling lucky :(")
}
println!(
"{}",
feeling_lucky().unwrap_or(String::from("Not lucky :("))
);
match feeling_lucky() {
Some(message) => {
println!("Got message: {}", message)
}
None => {
println!("No message returned :-/")
}
}
match poke_toddler() {
Ok(message) => println!("Toddler said: {}", message),
Err(cry) => println!("Toddler cried: {}", cry),
}
// Panic if the baby cries:
println!("{}", poke_toddler().unwrap());
// Same thing, but print a more descriptive panic message:
println!("{}", poke_toddler().expect("Toddler cried :("));
panic!("Sad times!");
}

This requires adding the following to Cargo.toml:

[dependencies]
rand = "0.6.0"

#04: Object Oriented Rust

Implement a LinkedList

Last in, first out (LIFO)

code

extern crate core;
use core::fmt;
struct Node {
value: u32,
next: Option<Box<Node>>, // no null pointers in Rust!
}
pub struct LinkedList {
head: Option<Box<Node>>,
size: usize,
}
impl Node {
fn new(value: u32, next: Option<Box<Node>>) -> Node {
Node {value: value, next: next}
}
}
impl LinkedList {
pub fn new() -> LinkedList {
LinkedList {head: None, size: 0}
}
pub fn get_size(&self) -> usize {
self.size
}
pub fn is_empty(&self) -> bool {
self.size == 0
}
pub fn push(&mut self, value: u32) {
let new_node = Box::new(Node::new(value, self.head.take()));
self.head = Some(new_node);
self.size += 1;
}
pub fn pop(&mut self) -> Option<u32> {
let node = self.head.take()?;
self.head = node.next;
self.size -= 1;
Some(node.value)
}
pub fn display(&self) {
let mut current: &Option<Box<Node>> = &self.head;
let mut result = String::from("list ->");
loop {
match current {
Some(node) => {
result = format!("{} {}", result, node.value);
current = &node.next;
},
None => break,
}
}
println!("{}", result);
}
}
impl fmt::Display for LinkedList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut current: &Option<Box<Node>> = &self.head;
let mut result = String::from("list ->");
loop {
match current {
Some(node) => {
result = format!("{} {}", result, node.value);
current = &node.next;
},
None => break,
}
}
write!(f, "{}", result)
}
}
impl Drop for LinkedList {
fn drop(&mut self) {
let mut current = self.head.take();
while let Some(mut node) = current {
current = node.next.take();
}
}
}

Some details

Week 2 Exercises: Ownership and structs

Part 1: Ownership short-answer exercises

fn main() {
let mut s = String::from("hello");
let ref1 = &s;
let ref2 = &ref1;
let ref3 = &ref2;
s = String::from("goodbye");
println!("{}", ref3.to_uppercase());
}

The immutable reference is still in use after s is reassigned.

fn drip_drop() -> &String {
let s = String::from("hello world!");
return &s;
}

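The local s is dropped when the function returns, so the returned reference would outlive it.

This can be changed to: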

static S: &'static str = "hello world!";
fn drip_drop() -> &'static str {
return &S;
}

Change the type to &str

and add the 'static lifetime annotation.

fn main() {
let s1 = String::from("hello");
let mut v = Vec::new();
v.push(s1);
let s2: String = v[0];
println!("{}", s2);
}

String does not implement the Copy trait,

so an element cannot be moved out of a Vec by indexing.
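There are several ways around this, depending on whether the Vec should keep the element. The sketch below shows three of them; first_element_demo is a hypothetical name used only for this illustration.

```rust
// Three ways to get at v[0] without moving out of the Vec by indexing.
fn first_element_demo() -> (usize, String, String, usize) {
    let mut v = vec![String::from("hello"), String::from("world")];

    // 1. Borrow the element: the Vec keeps ownership.
    let borrowed_len = {
        let r: &String = &v[0];
        r.len()
    };

    // 2. Clone the element: the Vec keeps its copy, we get our own.
    let cloned: String = v[0].clone();

    // 3. Remove the element: ownership moves out and the Vec shrinks.
    let removed: String = v.remove(0);

    (borrowed_len, cloned, removed, v.len())
}

fn main() {
    println!("{:?}", first_element_demo());
}
```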

Part 2: rdiff

For the algorithm, see:

https://en.wikipedia.org/wiki/Longest_common_subsequence_problem#Print_the_diff

The main point to discuss is error handling:

let file = File::open(filename)?;

is equivalent to (apart from an implicit From conversion of the error):

let file = match File::open(filename) {
Ok(file) => file,
Err(err) => return Err(err),
};

With ?, an Err is propagated to the caller as soon as it appears.

With unwrap or expect, an Err causes an immediate panic instead.
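The difference between propagating and panicking can be seen without any file I/O. In this sketch, parse_sum is a hypothetical function that is not part of rdiff.

```rust
use std::num::ParseIntError;

// `?` returns early with the Err, leaving the decision to the caller.
fn parse_sum(a: &str, b: &str) -> Result<i32, ParseIntError> {
    let x = a.trim().parse::<i32>()?;
    let y = b.trim().parse::<i32>()?;
    Ok(x + y)
}

fn main() {
    match parse_sum("1", "2") {
        Ok(n) => println!("sum = {}", n),
        Err(e) => println!("error: {}", e),
    }
    // The caller sees the propagated error instead of a crash.
    match parse_sum("1", "two") {
        Ok(n) => println!("sum = {}", n),
        Err(e) => println!("error: {}", e),
    }
    // parse_sum("1", "two").unwrap() would panic at this point instead.
}
```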

Optional: rwc

Omitted…

Optional challenge: Conway’s Game of Life

thread 'main' panicked at 'could not build default app window: NoAvailableAdapter'

This appears to be related to the graphics driver.

Giving up for now…

#05: Traits and Generics

trait ≈ interface + default method

Syntax

impl xxx for xxx

Deriving Traits

#[derive(Debug, PartialEq, Clone, Copy)]
struct Point {
x: f64,
y: f64
}

The following is a list of derivable traits:

Clone and Copy go together (Copy requires Clone)

Eq adds reflexivity on top of PartialEq

PartialEq for f64: NaN != NaN
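This is why a struct with f64 fields can derive PartialEq but not Eq. A short sketch, where is_reflexive is a hypothetical helper:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
struct Point {
    x: f64,
    y: f64,
}

// Reflexivity means a value always equals itself.
fn is_reflexive(p: Point) -> bool {
    p == p
}

fn main() {
    let p = Point { x: 1.0, y: 2.0 };
    let q = Point { x: f64::NAN, y: 2.0 };
    println!("{:?} equals itself: {}", p, is_reflexive(p));
    // NaN != NaN, so a Point containing NaN is not equal to itself.
    println!("{:?} equals itself: {}", q, is_reflexive(q));
}
```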

Defining Your Own Traits

pub trait ComputeNorm {
fn compute_norm(&self) -> f64 {
0.0 // default implementations
}
}
impl ComputeNorm for Point {
fn compute_norm(&self) -> f64 {
(self.x * self.x + self.y * self.y).sqrt()
}
}

Associated Type

impl Add for Point {
type Output = Self; // associated type
fn add(self, other: Self) -> Self {
Point::new(self.x + other.x, self.y + other.y)
}
}

This is a little puzzling.

Compare it with the generic version:

impl<T: Add<Output = T>> Add for Point<T> {
type Output = Self;
fn add(self, other: Self) -> Self::Output {
Self {
x: self.x + other.x,
y: self.y + other.y,
}
}
}

https://doc.rust-lang.org/rust-by-example/generics/assoc_items/types.html
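One way to read the associated type: each impl of Add picks exactly one Output for its pair of operand types. The sketch below is self-contained; the second impl (Point + f64) is an extra illustration that does not appear in the lecture code.

```rust
use std::ops::Add;

#[derive(Debug, PartialEq, Clone, Copy)]
struct Point {
    x: f64,
    y: f64,
}

impl Point {
    fn new(x: f64, y: f64) -> Self {
        Point { x, y }
    }
}

// Point + Point: the result type is fixed by this impl as Point.
impl Add for Point {
    type Output = Self;
    fn add(self, other: Self) -> Self::Output {
        Point::new(self.x + other.x, self.y + other.y)
    }
}

// Point + f64: a different right-hand side gets its own impl and Output.
impl Add<f64> for Point {
    type Output = Point;
    fn add(self, offset: f64) -> Point {
        Point::new(self.x + offset, self.y + offset)
    }
}

fn main() {
    let p = Point::new(1.0, 2.0) + Point::new(3.0, 4.0);
    let q = p + 1.0;
    println!("{:?} {:?}", p, q);
}
```

Because Output is an associated type rather than a type parameter, the compiler can infer the result of `a + b` from the operand types alone.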

Generics

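Instantiating with a specific type

T is the type parameter; note that bounds can be written in two ways (inline as <T: Trait>, or in a where clause)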

use std::fmt;
pub struct MatchingPair<T> {
first: T,
second: T
}
impl<T> MatchingPair<T> {
pub fn new(first: T, second: T) -> Self {
MatchingPair {first: first, second: second}
}
}
impl<T> fmt::Display for MatchingPair<T> where T: fmt::Display {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "({}, {})", self.first, self.second)
}
}
impl <T: Clone> Clone for MatchingPair<T> {
fn clone(&self) -> Self {
Self {
first: self.first.clone(),
second: self.second.clone()
}
}
}

Trait Syntax in Functions and Composing Traits

Combining generic functions with traits

fn identity_fn<T>(x: T) -> T {x}
// An example of trait composition -- T must impl Display and PartialOrd
fn print_min<T: fmt::Display + PartialOrd>(x: T, y: T) {
if x < y {
println!("The minimum is {}", x);
} else {
println!("The minimum is {}", y)
}
}

#06: Smart Pointers

Box<T>

std::unique_ptr

Rc<T>

multiple immutable references

Implementing a persistent version of LinkedList

pub fn push_front(&self, value: u32) -> LinkedList {
let new_node: Rc<Node> = Rc::new(Node::new(value, self.head.clone()));
LinkedList {head: Some(new_node), size: self.size + 1}
}
// A tuple in Rust is like a struct -- you can access the zeroth element of
// a tuple name tup by writing "tup.0", you can access the element at index
// 2 by writing "tup.2", etc.
pub fn pop_front(&self) -> (Option<LinkedList>, Option<u32>) {
match &self.head {
Some(node) => {
let val = node.value;
let new_head: Option<Rc<Node>> = node.next.clone();
let new_list = LinkedList {head: new_head, size: self.size - 1};
(Some(new_list), Some(val))
},
None => (None, None)
}
}

A few changes

RefCell<T>

RefCell allows us to wrap a mutable piece of data behind an immutable reference. We can call borrow and borrow_mut on the RefCell to acquire references to the internal data – the borrowing rules are now enforced at runtime. Therefore, RefCell is safe, but it’s slightly more inefficient because it shifts what would have been a compile-time check to runtime. You commonly see Rc<RefCell<T>>, which lets us have a flexible handle to heap allocated data – just like Rc except now we can mutate things!
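A minimal sketch of the Rc<RefCell<T>> pattern, including the runtime borrow check. The function name shared_counter_demo is hypothetical.

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn shared_counter_demo() -> (i32, usize, bool) {
    let counter = Rc::new(RefCell::new(0));
    // Each clone is another handle to the same heap allocation.
    let handle_a = Rc::clone(&counter);
    let handle_b = Rc::clone(&counter);

    // Mutation goes through borrow_mut even though the handles are immutable.
    *handle_a.borrow_mut() += 1;
    *handle_b.borrow_mut() += 2;

    // While one mutable borrow is alive, a second one is refused at runtime.
    let first = counter.borrow_mut();
    let second_refused = counter.try_borrow_mut().is_err();
    drop(first);

    let value = *counter.borrow();
    (value, Rc::strong_count(&counter), second_refused)
}

fn main() {
    println!("{:?}", shared_counter_demo());
}
```

Calling borrow_mut a second time instead of try_borrow_mut would panic, which is the runtime counterpart of the compile-time borrow error.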

Week 3 Exercises: Error handling, I/O, and traits

Part 1: Inspecting File Descriptors

Inspect the file descriptors opened by a process. The flow is as follows:

pub struct Process {
pub pid: usize,
pub ppid: usize,
pub command: String,
}

Done in list_fds

pub struct OpenFile {
pub name: String,
pub cursor: usize,
pub access_mode: AccessMode,
}

read_link - /proc/{pid}/fd/{fdnum}

read_to_string - /proc/{pid}/fdinfo/{fdnum}

ANSI escape codes are used here.

note

Note that the test_openfile_from_fd test fails when run from the IDE

but passes when run in a terminal.

$ ./multi_pipe_test & cargo run multi_pipe_test

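The target process has to stay alive for a while,

so debugging short-lived programs this way is close to impossible.

Another implementation detail is the choice between Option and Result.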

Part 2: A Generic LinkedList

Making it generic is simple: just add <T> in the right places.

The key is implementing some traits.

They must be implemented for both Node<T> and LinkedList<T>.

impl<T> Clone for Node<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Node::new(self.value.clone(), self.next.clone()) // recursion
}
}
impl<T> Clone for LinkedList<T>
where
T: Clone,
{
fn clone(&self) -> Self {
let mut res: LinkedList<T> = LinkedList::new();
res.head = self.head.clone();
res.size = self.size;
return res;
}
}

Note the recursive call to clone here,

and how the Clone property propagates:

T -> Node<T> -> Box<Node<T>> -> Option<Box<Node<T>>>

Quite tricky.

According to http://xion.io/post/code/rust-for-loop.html,

for x in v {
// body
}

is equivalent to

let mut iter = IntoIterator::into_iter(v);
loop {
match iter.next() {
Some(x) => {
// body
},
None => break,
}
}

Note that v is an iterable, while iter is an iterator.

For LinkedList<T>:

an iterator that takes ownership of the list it is iterating over

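it is enough to implement the Iterator trait and call pop_front inside next.

Why does this work without implementing IntoIterator? The standard library has a blanket impl<I: Iterator> IntoIterator for I, so every Iterator is already an IntoIterator.

For &LinkedList<T>: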
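A small sketch of an Iterator being used directly in a for loop. Countdown and collect_with_for are hypothetical names, unrelated to the exercise code.

```rust
// A type that only implements Iterator, not IntoIterator.
struct Countdown {
    remaining: u32,
}

impl Iterator for Countdown {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(self.remaining + 1)
        }
    }
}

fn collect_with_for(start: u32) -> Vec<u32> {
    let countdown = Countdown { remaining: start };
    let mut seen = Vec::new();
    // The for loop calls IntoIterator::into_iter(countdown); for a type
    // that is already an Iterator this simply returns the value itself.
    for n in countdown {
        seen.push(n);
    }
    seen
}

fn main() {
    println!("{:?}", collect_with_for(3));
}
```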

an iterator that only references elements in the list

this is more involved.

pub struct LinkedListIter<'a, T>
where
T: Clone,
{
current: &'a Option<Box<Node<T>>>,
}
impl<'a, T> IntoIterator for &'a LinkedList<T>
where
T: Clone,
{
type Item = T;
type IntoIter = LinkedListIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
LinkedListIter {
current: &self.head,
}
}
}
impl<T> Iterator for LinkedListIter<'_, T>
where
T: Clone,
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self.current {
Some(node) => {
self.current = &node.next;
Some(node.value.clone())
}
None => None,
}
}
}

A separate data structure, LinkedListIter, has to be defined.

It uses lifetime syntax:

This syntax essentially says that the struct lives as long as the reference it contains, so that we don’t have issues with dangling pointers.

next clones the value so that the list is only borrowed immutably.

For &mut LinkedList<T>:

it gets even more complicated…
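One common way to write the mutable iterator is to keep an Option<&mut Node<T>> and take() it on every step, so that only one mutable reference to a node exists at a time. This is a sketch on a simplified list, not the exercise's reference solution; IterMut, iter_mut and double_all are names chosen for illustration.

```rust
struct Node<T> {
    value: T,
    next: Option<Box<Node<T>>>,
}

struct LinkedList<T> {
    head: Option<Box<Node<T>>>,
}

struct IterMut<'a, T> {
    current: Option<&'a mut Node<T>>,
}

impl<T> LinkedList<T> {
    fn new() -> Self {
        LinkedList { head: None }
    }

    fn push_front(&mut self, value: T) {
        let next = self.head.take();
        self.head = Some(Box::new(Node { value, next }));
    }

    fn iter_mut(&mut self) -> IterMut<'_, T> {
        // as_deref_mut turns Option<Box<Node<T>>> into Option<&mut Node<T>>.
        IterMut { current: self.head.as_deref_mut() }
    }
}

impl<'a, T> Iterator for IterMut<'a, T> {
    type Item = &'a mut T;
    fn next(&mut self) -> Option<Self::Item> {
        // take() moves the &mut out of self, so it is never duplicated.
        self.current.take().map(|node| {
            self.current = node.next.as_deref_mut();
            &mut node.value
        })
    }
}

fn double_all() -> Vec<i32> {
    let mut list = LinkedList::new();
    list.push_front(3);
    list.push_front(2);
    list.push_front(1);
    for value in list.iter_mut() {
        *value *= 2;
    }
    list.iter_mut().map(|v| *v).collect()
}

fn main() {
    println!("{:?}", double_all());
}
```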

#07: Pitfalls in Multiprocessing

fork

fork + exec

use a higher-level abstraction

https://doc.rust-lang.org/std/process/struct.Command.html

pipe

https://stackoverflow.com/questions/49218599/write-to-child-process-stdin-in-rust/

https://docs.rs/os_pipe/latest/os_pipe/

signal

A signal handler has to be careful with shared variables

and may only call async-signal-safe functions:

https://man7.org/linux/man-pages/man7/signal-safety.7.html

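Neither printf nor malloc is async-signal-safe.

Also, a signal handler should not block, because it monopolizes the processor.

Use the self-pipe trick to avoid doing work inside the signal handler: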

  1. Create a pipe
  2. When you’re awaiting a signal, read from the pipe (this will block until something is written to it)
  3. In the signal handler, write a single byte to the pipe

Officially supported via https://man7.org/linux/man-pages/man2/signalfd.2.html

However, this reduces the program's concurrency:

either doing work, or reading to wait for a signal

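There are two approaches to this problem.

Note that threads are scheduled rather than exclusive.

Concurrency problems remain, but Rust can keep them under control.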

https://github.com/Detegr/rust-ctrlc

  1. Creates a self-pipe
  2. Installs a signal handler that writes to the pipe when SIGINT is received
  3. Spawns a thread: loop { read from pipe; call handler function; }

Functions that are not async-signal-safe can be used inside that thread.

TBD

Project 1: The DEET Debugger

info

Real-time collaborative development: https://visualstudio.microsoft.com/zh-hans/services/live-share/

note

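A brief analysis of the program structure

The program entry point takes the path of an executable built with debug information as its argument

SIGINT handling is suppressed in the main process

SIGINT is still delivered to the child process

A Debugger object is constructed and its run method is called

The run method is an infinite loop

get_next_command reads the input command and parses it into a DebuggerCommand object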

pub enum DebuggerCommand {
Quit,
Run(Vec<String>),
Continue,
Backtrace,
Break(String),
}

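The third-party crate rustyline is used here; it provides command history

The Inferior object, i.e. the child process, is driven by matching on DebuggerCommand

Some private helper functions operate on the child process, mainly:

  1. create_new_inferior - constructs an Inferior object and calls cont_inferior
  2. cont_inferior - wraps the cont method of Inferior and uses handle_status to process the child's status
  3. kill_inferior - wraps the kill method of Inferior and uses handle_status to process the child's status
  4. handle_status - processes the child's status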
pub enum Status {
/// Indicates inferior stopped. Contains the signal that stopped the process, as well as the
/// current instruction pointer that it is stopped at.
Stopped(signal::Signal, usize),
/// Indicates inferior exited normally. Contains the exit status code.
Exited(i32),
/// Indicates the inferior exited due to a signal. Contains the signal that killed the
/// process.
Signaled(signal::Signal),
}

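In addition, the Debugger object holds the debug information and breakpoint information, which are passed to the Inferior object as arguments

A lot of low-level programming is involved; each piece is analyzed below

First, note that the debugger obtains the status of the child process by calling wait

Every method that changes the control flow must end by calling wait and returning the status to the caller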

pub fn wait(&self, options: Option<WaitPidFlag>) -> Result<Status, nix::Error> {
Ok(match waitpid(self.pid(), options)? {
WaitStatus::Exited(_pid, exit_code) => Status::Exited(exit_code),
WaitStatus::Signaled(_pid, signal, _core_dumped) => Status::Signaled(signal),
WaitStatus::Stopped(_pid, signal) => {
let regs = ptrace::getregs(self.pid())?;
Status::Stopped(signal, regs.rip as usize)
}
other => panic!("waitpid returned unexpected status: {:?}", other),
})
}

new

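Create the child process with Command,

and use the ptrace system call in pre_exec to trace it

Note that once the child has been loaded this way, it receives a SIGTRAP signal

Breakpoints are installed after that: the byte at each breakpoint address is replaced with 0xcc, which triggers SIGTRAP when executed

More implementation details on breakpoints follow below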

cont

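Update the breakpoints, then check whether the current rip points to the byte right after a breakpoint address

Recall the cycle: fetch, decode, execute, update rip

If so, we are continuing after a breakpoint was hit, and the original instruction at the breakpoint address has to be re-executed

So restore the original byte and single-step with ptrace::step

If the program has not exited, write 0xcc back at the breakpoint address so the breakpoint stays in effect

Finally, call ptrace::cont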
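The bookkeeping above can be sketched without ptrace by treating a byte vector as the child's memory. Everything in this block is a simulation for illustration: FakeInferior and its methods are hypothetical, and a real implementation would read and write the child through ptrace. Rewinding rip to the breakpoint address is an assumption here; the notes above do not spell that step out.

```rust
use std::collections::HashMap;

const INT3: u8 = 0xcc;

// Simulated child process: `memory` stands in for its address space.
struct FakeInferior {
    memory: Vec<u8>,
    rip: usize,
    // breakpoint address -> original byte that 0xcc replaced
    breakpoints: HashMap<usize, u8>,
}

impl FakeInferior {
    fn install_breakpoint(&mut self, addr: usize) {
        let original = self.memory[addr];
        self.memory[addr] = INT3;
        self.breakpoints.insert(addr, original);
    }

    // Some(addr) if rip sits one byte past a breakpoint, i.e. the 0xcc
    // at rip - 1 has just been executed.
    fn stopped_at_breakpoint(&self) -> Option<usize> {
        let addr = self.rip.checked_sub(1)?;
        if self.breakpoints.contains_key(&addr) { Some(addr) } else { None }
    }

    // Restore the original byte and rewind rip (assumption, see lead-in)
    // so the real instruction can be single-stepped.
    fn prepare_step_over(&mut self) -> Option<usize> {
        let addr = self.stopped_at_breakpoint()?;
        self.memory[addr] = self.breakpoints[&addr];
        self.rip = addr;
        Some(addr)
    }

    // After the single step, put 0xcc back so the breakpoint stays armed.
    fn rearm(&mut self, addr: usize) {
        self.memory[addr] = INT3;
    }
}

fn breakpoint_demo() -> (u8, Option<usize>, u8, usize, u8) {
    let mut inferior = FakeInferior {
        memory: vec![0x55, 0x48, 0x89, 0xe5],
        rip: 0,
        breakpoints: HashMap::new(),
    };
    inferior.install_breakpoint(1);
    let patched = inferior.memory[1];
    inferior.rip = 2; // pretend the CPU just executed the 0xcc at address 1
    let hit = inferior.prepare_step_over();
    let restored = inferior.memory[1];
    let rip_after = inferior.rip;
    inferior.rearm(1);
    (patched, hit, restored, rip_after, inferior.memory[1])
}

fn main() {
    println!("{:?}", breakpoint_demo());
}
```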

kill

Just call ptrace::kill

Do not forget to call wait afterwards, otherwise a zombie process (<defunct>) is left behind

print_backtrace

Backtracing relies on stack frame information,

so the following compiler options are needed:

-O0 -g -no-pie -fno-omit-frame-pointer

How to walk the frames is easy to see from the figure:

[Figure: 1a06591eb5fc435d84ffbe3d9fb84692.png]
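As a rough sketch of the frame walk, assume the usual x86-64 layout when frame pointers are kept: the saved rbp of the caller is stored at [rbp] and the return address at [rbp + 8]. The HashMap below stands in for reading the child's memory through ptrace, and stopping at a saved rbp of 0 is a simplification of this sketch (the function names and addresses are made up).

```rust
use std::collections::HashMap;

// `memory` maps an address to the 8-byte word stored there.
fn backtrace(memory: &HashMap<usize, usize>, rip: usize, rbp: usize) -> Vec<usize> {
    let mut frames = vec![rip];
    let mut base_ptr = rbp;
    // Simplification for this sketch: the outermost frame has saved rbp == 0.
    while base_ptr != 0 {
        // The return address sits just above the saved frame pointer.
        let return_addr = match memory.get(&(base_ptr + 8)) {
            Some(addr) => *addr,
            None => break,
        };
        frames.push(return_addr);
        // Follow the chain to the caller's frame.
        base_ptr = match memory.get(&base_ptr) {
            Some(saved) => *saved,
            None => break,
        };
    }
    frames
}

fn sample_backtrace() -> Vec<usize> {
    let mut memory = HashMap::new();
    // Innermost frame at 0x1000, called from a frame at 0x2000.
    memory.insert(0x1000, 0x2000); // saved rbp of the caller
    memory.insert(0x1008, 0x4011b0); // return address into the caller
    memory.insert(0x2000, 0); // outermost frame
    memory.insert(0x2008, 0x7f00); // return address into startup code
    backtrace(&memory, 0x401162, 0x1000)
}

fn main() {
    for addr in sample_backtrace() {
        println!("{:#x}", addr);
    }
}
```

A debugger would map each address to a function name and line through the debug information, and could stop once it reaches main.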

Note that debug information is usually stored in the DWARF format

dwarf_data.rs uses gimli_wrapper.rs to extract the following debug information:

  1. The mapping between line numbers and addresses
  2. The mapping between function names and addresses
  3. Variable information

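For some reason, testing showed that the mapping between line numbers and addresses was missing,

and library functions such as printf could not be resolved

The third-party crate https://github.com/gimli-rs/gimli is used

In this project, the debug information serves two purposes:

  1. When the child is in Status::Stopped, or during a backtrace, resolve rip to a line number and function name
  2. When setting a breakpoint, resolve a line number or function name to the breakpoint address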

todo

$ gdb samples/segfault
Reading symbols from samples/segfault...
(gdb) r
Starting program: /home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-1/deet/samples/segfault
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Calling func2
About to segfault... a=2
Program received signal SIGSEGV, Segmentation fault.
0x0000000000401162 in func2 (a=2) at samples/segfault.c:5
5 *(int*)0 = a;
(gdb) c
Continuing.
Program terminated with signal SIGSEGV, Segmentation fault.
The program no longer exists.
(gdb)

#09: Intro to Multithreading

Use the move keyword to copy the index value into the closure:

use std::thread;
const NAMES: [&str; 7] = ["Frank", "Jon", "Lauren", "Marco", "Julie", "Patty",
"Tagalong Introvert Jerry"];
fn main() {
let mut threads = Vec::new();
for i in 0..6 {
threads.push(thread::spawn(move || {
println!("Hello from printer {}!", NAMES[i]);
}));
}
// wait for all the threads to finish
for handle in threads {
handle.join().expect("Panic occurred in thread!");
}
}

Otherwise the compiler reports:

closure may outlive the current function, but it borrows `i`, which is owned by the current function

That is, the closure may outlive the index variable.
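The same idea in a form that returns values from the threads. This is a small sketch; squares_in_threads is a hypothetical name.

```rust
use std::thread;

fn squares_in_threads(n: usize) -> Vec<usize> {
    let mut handles = Vec::new();
    for i in 0..n {
        // `move` copies i into the closure, so the closure does not borrow
        // a loop variable that may be gone by the time the thread runs.
        handles.push(thread::spawn(move || i * i));
    }
    // join() hands back each closure's return value, in spawn order.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("Panic occurred in thread!"))
        .collect()
}

fn main() {
    println!("{:?}", squares_in_threads(4));
}
```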

#10: Shared Memory

Consider a shared variable accessed through a mutable reference

Attempt 1

Use the move keyword directly

extern crate rand;
use std::{thread, time};
use rand::Rng;
fn handleCall() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));
}
fn takeBreak() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));
}
fn shouldTakeBreak() -> bool {
rand::random()
}
fn ticketAgent(id: usize, remainingTickets: &mut usize) {
while *remainingTickets > 0 {
handleCall();
*remainingTickets -= 1;
println!("Agent #{} sold a ticket! ({} more to be sold)",
id, remainingTickets);
if shouldTakeBreak() {
takeBreak();
}
}
println!("Agent #{} notices all tickets are sold, and goes home!", id);
}
fn main() {
let mut remainingTickets = 250;
let mut threads = Vec::new();
for i in 0..10 {
threads.push(thread::spawn(move || {
ticketAgent(i, &mut remainingTickets)
}));
}
// wait for all the threads to finish
for handle in threads {
handle.join().expect("Panic occurred in thread!");
}
println!("End of business day!");
}

This actually makes one copy per thread. It compiles, but the result is wrong.

Attempt 2

Use the Rc<RefCell<T>> structure

extern crate rand;
use std::cell::RefCell;
use std::rc::Rc;
use std::{thread, time};
use rand::Rng;
fn handleCall() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));
}
fn takeBreak() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 1000)));
}
fn shouldTakeBreak() -> bool {
rand::random()
}
fn ticketAgent(id: usize, remainingTickets: &mut usize) {
while *remainingTickets > 0 {
handleCall();
*remainingTickets -= 1;
println!("Agent #{} sold a ticket! ({} more to be sold)",
id, remainingTickets);
if shouldTakeBreak() {
takeBreak();
}
}
println!("Agent #{} notices all tickets are sold, and goes home!", id);
}
fn main() {
let remainingTickets: Rc<RefCell<usize>> = Rc::new(RefCell::new(250));
let mut threads = Vec::new();
for i in 0..10 {
let remainingTicketsRef = remainingTickets.clone();
threads.push(thread::spawn(|| {
ticketAgent(i, &mut remainingTicketsRef.borrow_mut())
}));
}
// wait for all the threads to finish
for handle in threads {
handle.join().expect("Panic occurred in thread!");
}
println!("End of business day!");
}

clone on the Rc increments the reference count

borrow_mut on the RefCell obtains a mutable reference

This fails to compile:

error[E0277]: `Rc<RefCell<usize>>` cannot be shared between threads safely
--> src/main.rs:41:22
|
41 | threads.push(thread::spawn(|| {
| ^^^^^^^^^^^^^ `Rc<RefCell<usize>>` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `Rc<RefCell<usize>>`
= note: required because of the requirements on the impl of `Send` for `&Rc<RefCell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:41:36: 43:10]`

Attempt 3

Use the Arc<Mutex<T>> structure

extern crate rand;
use std::sync::{Arc, Mutex};
use std::{thread, time};
use rand::Rng;
fn handleCall() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 10)));
}
fn takeBreak() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 10)));
}
fn shouldTakeBreak() -> bool {
rand::random()
}
fn ticketAgent(id: usize, remainingTickets: Arc<Mutex<usize>>) {
loop {
let mut remainingTicketsRef = remainingTickets.lock().unwrap();
if (*remainingTicketsRef == 0) {
break;
}
handleCall();
*remainingTicketsRef -= 1;
println!("Agent #{} sold a ticket! ({} more to be sold)",
id, *remainingTicketsRef);
if shouldTakeBreak() {
takeBreak();
}
}
println!("Agent #{} notices all tickets are sold, and goes home!", id);
}
fn main() {
let remainingTickets: Arc<Mutex<usize>> = Arc::new(Mutex::new(250));
let mut threads = Vec::new();
for i in 0..10 {
let remainingTicketsRef = remainingTickets.clone();
threads.push(thread::spawn(move || {
ticketAgent(i, remainingTicketsRef);
}));
}
// wait for all the threads to finish
for handle in threads {
handle.join().expect("Panic occurred in thread!");
}
println!("End of business day!");
}

Arc can be seen as the thread-safe version of Rc, and Mutex as the thread-safe version of RefCell
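A std-only variation on the ticket example, with no random sleeps, that holds the lock only for the check-and-decrement. It is a sketch under those simplifications, and sell_tickets is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Returns (tickets sold across all agents, tickets left at the end).
fn sell_tickets(total: usize, agents: usize) -> (usize, usize) {
    let remaining = Arc::new(Mutex::new(total));
    let mut handles = Vec::new();
    for _ in 0..agents {
        // Arc::clone only bumps the reference count; the counter is shared.
        let remaining = Arc::clone(&remaining);
        handles.push(thread::spawn(move || {
            let mut sold: usize = 0;
            loop {
                {
                    // The guard is dropped at the end of this block, so the
                    // lock is held only while checking and decrementing.
                    let mut tickets = remaining.lock().unwrap();
                    if *tickets == 0 {
                        break;
                    }
                    *tickets -= 1;
                }
                sold += 1;
            }
            sold
        }));
    }
    let sold_total: usize = handles
        .into_iter()
        .map(|handle| handle.join().expect("Panic occurred in thread!"))
        .sum();
    let left = *remaining.lock().unwrap();
    (sold_total, left)
}

fn main() {
    println!("{:?}", sell_tickets(250, 10));
}
```

Keeping the critical section short lets the other agents make progress while one of them is busy outside the lock.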

Marker traits

you don’t implement functions for them, they serve a symbolic purpose

for more

#11: Synchronization

Condition variables are introduced

They are paired with a mutex: Arc<(Mutex<T>, Condvar)>

extern crate rand;
use std::sync::{Arc, Mutex, Condvar};
use std::{thread, time};
use std::collections::VecDeque;
use rand::Rng;
fn rand_sleep() {
let mut rng = rand::thread_rng();
thread::sleep(time::Duration::from_millis(rng.gen_range(0, 30)));
}
#[derive(Clone)]
pub struct SemaPlusPlus<T> {
queue_and_cv: Arc<(Mutex<VecDeque<T>>, Condvar)>,
}
impl<T> SemaPlusPlus<T> {
pub fn new() -> Self {
SemaPlusPlus {queue_and_cv: Arc::new((Mutex::new(VecDeque::new()),
Condvar::new()))}
}
// Enqueues -- Like semaphore.signal()
pub fn send(&self, message: T) {
let (queue_lock, cv) = &*self.queue_and_cv;
let mut queue = queue_lock.lock().unwrap();
let queue_was_empty = queue.is_empty();
queue.push_back(message);
// Receivers can only be waiting if the queue was empty, so that is when to wake them
if queue_was_empty {
cv.notify_all();
}
}
// Dequeues -- Like semaphore.wait()
pub fn recv(&self) -> T {
let (queue_lock, cv) = &*self.queue_and_cv;
// Wait until there is something to dequeue
let mut queue = cv.wait_while(queue_lock.lock().unwrap(), |queue| {
queue.is_empty()
}).unwrap();
// Should return Some(...) because we waited
queue.pop_front().unwrap()
}
}
const NUM_THREADS: usize = 12;
fn main() {
// Inspired by this example https://doc.rust-lang.org/stable/rust-by-example/std_misc/channels.html
let sem: SemaPlusPlus<String> = SemaPlusPlus::new();
let mut handles = Vec::new();
for i in 0..NUM_THREADS {
let sem_clone = sem.clone();
let handle = thread::spawn(move || {
rand_sleep();
sem_clone.send(format!("thread {} just finished!", i))
});
handles.push(handle);
}
for _ in 0..NUM_THREADS {
println!("{}", sem.recv())
}
for handle in handles {
handle.join().unwrap();
}
}

Week 5 Exercises: Farm meets multithreading

I was led along by the compiler the whole way…

fn pop_number(queue: &Mutex<VecDeque<u32>>) -> Option<u32> {
let mut queue = queue.lock().unwrap();
queue.pop_front()
}
fn main() {
let num_threads = num_cpus::get();
println!("Farm starting on {} CPUs", num_threads);
let start = Instant::now();
// TODO: call get_input_numbers() and store a queue of numbers to factor
let queue = Arc::new(Mutex::new(get_input_numbers()));
let mut threads = Vec::new();
// TODO: spawn `num_threads` threads, each of which pops numbers off the queue and calls
// factor_number() until the queue is empty
for _ in 0..num_threads {
let queue = queue.clone();
threads.push(thread::spawn(move || loop {
let num = pop_number(queue.borrow());
if num.is_some() {
factor_number(num.unwrap());
} else {
break;
}
}))
}
// TODO: join all the threads you created
for handle in threads {
handle.join().expect("Panic occurred in thread!");
}
println!("Total execution time: {:?}", start.elapsed());
}

Approach

Testing

$ cargo run 12345678 12346789 34567890 45678902 123456 234567 345678 456789 678912 20125001

Note that the numbers are passed on the command line, which is much simpler than reading them from standard input

#12: Channels

How to avoid data races without using shared memory

Effective Go:

Do not communicate by sharing memory; instead, share memory by communicating.

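Without shared memory, passing data between threads means paying for copies

A compromise is to copy only the pointer, i.e. a shallow copy; smart pointers keep this safe

The key question is how to implement channels efficiently

Use crossbeam to implement the Farm from the Week 5 exercises,

extended to read from standard input: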

fn main() {
let (sender, receiver) = crossbeam::channel::unbounded();
let mut threads = Vec::new();
for _ in 0..num_cpus::get() {
let receiver = receiver.clone();
threads.push(thread::spawn(move || {
while let Ok(next_num) = receiver.recv() {
factor_number(next_num);
}
}));
}
let stdin = std::io::stdin();
for line in stdin.lock().lines() {
let num = line.unwrap().parse::<u32>().unwrap();
sender
.send(num)
.expect("Tried writing to channel, but there are no receivers!");
}
drop(sender);
for thread in threads {
thread.join().expect("Panic occurred in thread");
}
}

Note that drop(sender) closes one end of the channel, which makes receiver.recv() return Err,

much like reading from a pipe returns EOF
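The same closing behaviour can be seen with the standard library's channel. This sketch uses std::sync::mpsc instead of crossbeam (so there is a single receiver), and sum_over_channel is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

fn sum_over_channel(numbers: Vec<u32>) -> u32 {
    let (sender, receiver) = mpsc::channel();
    let worker = thread::spawn(move || {
        let mut total = 0;
        // recv() returns Err once every Sender has been dropped and the
        // channel is empty, which ends the loop.
        while let Ok(n) = receiver.recv() {
            total += n;
        }
        total
    });
    for n in numbers {
        sender.send(n).expect("Tried writing to channel, but there are no receivers!");
    }
    // Without this drop, the worker would block in recv() forever.
    drop(sender);
    worker.join().expect("Panic occurred in thread")
}

fn main() {
    println!("{}", sum_over_channel(vec![1, 2, 3]));
}
```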

Week 6 Exercises: Sharing Data by Communicating

The reference implementation first:

fn parallel_map<T, U, F>(input_vec: Vec<T>, num_threads: usize, f: F) -> Vec<U>
where
F: FnOnce(T) -> U + Send + Copy + 'static,
T: Send + 'static,
U: Send + 'static + Default,
{
let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len());
output_vec.resize_with(input_vec.len(), Default::default);
// TODO: implement parallel map!
let (sender, receiver) = crossbeam::channel::unbounded();
let (sender_re, receiver_re) = crossbeam::channel::unbounded();
let mut threads = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let receiver = receiver.clone();
let sender_re = sender_re.clone();
threads.push(thread::spawn(move || {
while let Ok(next_elem) = receiver.recv() {
let (index, elem) = next_elem;
sender_re
.send((index, f(elem)))
.expect("Tried writing to channel, but there are no receivers!");
}
}));
}
let mut index = 0;
for elem in input_vec {
sender
.send((index, elem))
.expect("Tried writing to channel, but there are no receivers!");
index = index + 1;
}
drop(sender);
for thread in threads {
thread.join().expect("Panic occurred in thread");
}
drop(sender_re);
while let Ok(next_elem) = receiver_re.recv() {
let (index, elem) = next_elem;
output_vec[index] = elem;
}
output_vec
}

This pile of trait bounds is a headache

Approach

flowchart LR
A["sender"]
D["receiver_re"]
A --> B1["receiver"] --> C1["sender_re"] --> D
A --> B2["receiver"] --> C2["sender_re"] --> D
A --> B3["receiver"] --> C3["sender_re"] --> D
A --> B4["receiver"] --> C4["sender_re"] --> D

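sender and receiver_re live in the main thread

Also, because thread scheduling is nondeterministic, results may come back in any order

Since Vec supports efficient random access, each message carries a tuple of the element and its index

The initialization of the Vec needs care: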

let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len());
output_vec.resize_with(input_vec.len(), Default::default);

Vec::with_capacity only guarantees the capacity; len is still 0

Hence the resize with default values,

which is where the Default trait comes in
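The difference between capacity and length is easy to check directly. A small sketch, where lengths is a hypothetical helper:

```rust
// Returns (len right after with_capacity, capacity >= n, len after resize_with).
fn lengths(n: usize) -> (usize, bool, usize) {
    let mut v: Vec<u32> = Vec::with_capacity(n);
    let len_before = v.len();
    let capacity_ok = v.capacity() >= n;
    // Fill the Vec with default values so every index below n is valid.
    v.resize_with(n, Default::default);
    (len_before, capacity_ok, v.len())
}

fn main() {
    println!("{:?}", lengths(4));

    let mut out: Vec<u32> = Vec::with_capacity(4);
    // Writing out[2] here would panic: the length is still 0.
    out.resize_with(4, Default::default);
    out[2] = 7; // in bounds now
    println!("{:?}", out);
}
```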

Check the maximum number of threads and processes the system allows:

$ cat /proc/sys/kernel/threads-max
$ cat /proc/sys/kernel/pid_max
$ man 5 proc

TODO

#13: Scalability and Availability

Scalability and Availability

[Figure: 6fa4049b10cd4b88b184c459d12dacd6.jpg]

Load balancers

[Figure: d6529cf4af8345da8116d6d1404b0aef.jpg]

Persistent storage is delegated to a distributed database

A load balancer dispatches client requests to different servers

Load balance your load balancers

Multiple load balancers

$ dig +noall +answer bilibili.com
bilibili.com. 1 IN A 119.3.238.64
bilibili.com. 1 IN A 8.134.50.24
bilibili.com. 1 IN A 39.105.216.211
bilibili.com. 1 IN A 119.3.70.188
bilibili.com. 1 IN A 139.159.241.37
bilibili.com. 1 IN A 47.103.24.173
bilibili.com. 1 IN A 120.92.78.97

https://www.whatsmydns.net/

#15: Futures I

Non-blocking I/O

Epoll Syscall

Future

Executors

Combining futures

Async/Await

#16: Futures II

Project 2: Balancebeam

A load-balancing proxy server

Updated usage of the third-party crate clap:

https://github.com/clap-rs/clap/blob/v3.1.18/examples/derive_ref/README.md

Testing

$ cargo run -- --upstream 171.67.215.200:80
Compiling balancebeam v0.1.0 (/home/vgalaxy/Desktop/cs110l-spr-2020-starter-code/proj-2/balancebeam)
Finished dev [unoptimized + debuginfo] target(s) in 1.85s
Running `target/debug/balancebeam --upstream '171.67.215.200:80'`
INFO balancebeam > Listening for requests on 0.0.0.0:1100
INFO balancebeam > Connection received from 127.0.0.1
INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /class/cs110l/ HTTP/1.1
DEBUG balancebeam > Forwarded request to server
INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK
DEBUG balancebeam > Forwarded response to client
INFO balancebeam > 127.0.0.1 -> 127.0.0.1: GET /favicon.ico HTTP/1.1
DEBUG balancebeam > Forwarded request to server
INFO balancebeam > 127.0.0.1 <- HTTP/1.1 200 OK
DEBUG balancebeam > Forwarded response to client
DEBUG balancebeam > Client finished sending requests. Shutting down connection

http://localhost:1100/class/cs110l/

Milestone 0: Read the starter code

RTFSC

The lab handout explains this in great detail

Some third-party crates

Local testing

$ cargo test --test 01_single_upstream_tests
$ cargo test --test 02_multiple_upstream_tests test_load_distribution

Milestone 1: Add multithreading

Use the third-party crate threadpool directly

ProxyState needs to derive Clone

let pool = ThreadPool::new(8);
for stream in listener.incoming() {
if let Ok(stream) = stream {
// Handle the connection!
let state_cloned = state.clone();
pool.execute(move || {
handle_connection(stream, &state_cloned);
});
}
}

Milestone 2: Use asynchronous I/O

To keep the test code from crashing, an older version of tokio is used

The lab handout gives a detailed migration plan

while let Some(stream) = listener.incoming().next().await {
match stream {
Ok(stream) => {
let state_cloned = state.clone();
task::spawn(async move {
handle_connection(stream, &state_cloned).await;
});
}
Err(err) => {
log::error!("connection failed: {}", err);
}
}
}

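Consider whether .await is needed after task::spawn. Awaiting the returned JoinHandle would wait for that connection to finish before the next one is accepted, so it is not needed here

The manual (TFM) does not do this

Compare with Async Link Explorer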

Milestone 3: Failover with passive health checks

At this point, your load balancer should be pretty scalable.

Let’s work on adding some more availability to our infrastructure.

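Note that in the implementation of connect_to_upstream,

if connecting to the randomly chosen upstream fails, the function returns immediately

To improve reliability, try the other upstreams when a connection fails

To do so, add a field to ProxyState: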

upstream_states: Vec<bool>

and modify connect_to_upstream:

async fn connect_to_upstream(state: Arc<Mutex<ProxyState>>) -> Result<TcpStream, std::io::Error> {
// TODO: implement failover (milestone 3)
let mut state_ref = state.lock().await;
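let mut rng = rand::rngs::StdRng::from_entropy();
loop {
// Re-check on every iteration: a failed attempt below may have just marked
// the last live upstream as dead, and the loop would otherwise spin forever
if !state_ref.upstream_states.contains(&true) {
log::error!("Failed to connect: all upstreams are dead");
return Err(std::io::Error::new(ErrorKind::Other, "oh no!"));
}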
let upstream_idx = rng.gen_range(0, state_ref.upstream_addresses.len());
if !state_ref.upstream_states[upstream_idx] {
continue;
}
let upstream_ip = &state_ref.upstream_addresses[upstream_idx];
match TcpStream::connect(upstream_ip).await {
Ok(stream) => {
return Ok(stream);
}
Err(err) => {
log::warn!("Failed to connect to upstream {}: {}", upstream_ip, err);
state_ref.upstream_states[upstream_idx] = false;
continue;
}
}
}
}

The parameter type is changed to Arc<Mutex<ProxyState>>

tokio::sync::Mutex provides exclusive access

tokio::sync::RwLock would also work, but is more cumbersome to write…

Milestone 4: Failover with active health checks

Passive health checks have some shortcomings,

so consider implementing an async function active_health_check

and spawning it as a separate async task in tokio::main

active_health_check is implemented as follows:

async fn active_health_check(state: Arc<Mutex<ProxyState>>) {
let interval;
let path;
let upstream_addresses;
{
let state_ref = state.lock().await;
interval = state_ref.active_health_check_interval;
path = state_ref.active_health_check_path.clone();
upstream_addresses = state_ref.upstream_addresses.clone();
} // release lock
loop {
delay_for(Duration::from_secs(interval as u64)).await;
for upstream_idx in 0..upstream_addresses.len() {
let mut upstream_state = false;
let upstream_ip = &upstream_addresses[upstream_idx];
match TcpStream::connect(upstream_ip).await {
Ok(mut stream) => {
let request = http::Request::builder()
.method(http::Method::GET)
.uri(path.clone())
.header("Host", upstream_ip)
.body(Vec::new())
.unwrap();
if let Err(error) = request::write_to_stream(&request, &mut stream).await {
log::error!(
"Failed to send request (active health check) to upstream {}: {:?}",
upstream_ip,
error
);
// do nothing
}
match response::read_from_stream(&mut stream, request.method()).await {
Ok(response) => {
upstream_state = response.status() == StatusCode::OK;
}
Err(error) => {
log::error!(
"Error reading response (active health check) from server: {:?}",
error
);
// do nothing
}
}
}
Err(error) => {
log::error!(
"Error establishing connection with server {}: {:?}",
upstream_ip,
error
);
// do nothing
}
}
log::info!(
"Active health check result: upstream {} -> {}",
upstream_ip,
upstream_state
);
{
let mut state_ref = state.lock().await;
state_ref.upstream_states[upstream_idx] = upstream_state;
}
}
}
}

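Iterate over the upstream servers; for each one, try to connect, send a request, and check whether the response is OK

Take care to release the lock promptly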

Milestone 5: Rate limiting

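Consider that the following situations may occur

The number of requests a client can send needs to be limited

Consider using the fixed window algorithm

Note that the same client may call connect_to_upstream many times,

so the counter and timer should not be internal state of connect_to_upstream

but part of ProxyState

To do so, add a field to ProxyState: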

client_requests_map: HashMap<String, (Instant, usize)>

Then add the relevant code to connect_to_upstream:

// Rate limiting
{
let state_cloned = state.clone();
let mut state_ref = state_cloned.lock().await;
let limits = state_ref.max_requests_per_minute;
let map_cloned = state_ref.client_requests_map.clone();
if limits != 0 {
match map_cloned.get(&client_ip) {
None => {
state_ref.client_requests_map.insert(client_ip.clone(), (Instant::now(), 1));
}
Some((when, counter)) => {
let mut window_start = *when;
let mut count = *counter;
if window_start.elapsed().as_secs() >= 60 {
// The window has expired: start a new one. Keeping the stale timestamp
// would reset the count on every later request and disable the limit
window_start = Instant::now();
count = 0;
}
if count >= limits {
let response = response::make_http_error(http::StatusCode::TOO_MANY_REQUESTS);
send_response(&mut client_conn, &response).await;
log::debug!("Forwarded response `TOO_MANY_REQUESTS` to client");
continue;
} else {
count += 1;
}
state_ref.client_requests_map.insert(client_ip.clone(), (window_start, count));
}
}
}
}

Note that tokio::time::Instant::elapsed returns the time elapsed since the instant was created, which gives us the timer

Rust's memory model and the various reference types around HashMap still need care
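The fixed window idea can also be isolated from the proxy code. The sketch below uses std::time::Instant instead of tokio's and takes the current time as a parameter so it can be exercised without sleeping; FixedWindowLimiter, allow and limiter_demo are hypothetical names, not part of balancebeam.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct FixedWindowLimiter {
    limit: usize,
    window: Duration,
    // client IP -> (start of the current window, requests seen in it)
    clients: HashMap<String, (Instant, usize)>,
}

impl FixedWindowLimiter {
    fn new(limit: usize, window: Duration) -> Self {
        FixedWindowLimiter { limit, window, clients: HashMap::new() }
    }

    // Returns true if the request is allowed. `now` is passed in rather
    // than read from the clock so the behaviour is deterministic.
    fn allow(&mut self, client_ip: &str, now: Instant) -> bool {
        let entry = self.clients.entry(client_ip.to_string()).or_insert((now, 0));
        // Start a fresh window once the old one has expired.
        if now.duration_since(entry.0) >= self.window {
            *entry = (now, 0);
        }
        if entry.1 >= self.limit {
            false
        } else {
            entry.1 += 1;
            true
        }
    }
}

fn limiter_demo() -> Vec<bool> {
    let mut limiter = FixedWindowLimiter::new(2, Duration::from_secs(60));
    let t0 = Instant::now();
    vec![
        limiter.allow("10.0.0.1", t0),
        limiter.allow("10.0.0.1", t0 + Duration::from_secs(1)),
        limiter.allow("10.0.0.1", t0 + Duration::from_secs(2)), // over the limit
        limiter.allow("10.0.0.2", t0 + Duration::from_secs(2)), // another client
        limiter.allow("10.0.0.1", t0 + Duration::from_secs(61)), // new window
    ]
}

fn main() {
    println!("{:?}", limiter_demo());
}
```

Using the entry API avoids cloning the whole map just to look up one client.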

Extensions

The proxy server maintains its connections to the upstream servers

https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm

The current algorithm is random selection

https://www.haproxy.com/blog/power-of-two-load-balancing/