wakeable.rs 3.93 KB
Newer Older
1
use std::{mem, thread};
Ralf's avatar
Ralf committed
2
use std::time::Duration;
Ralf's avatar
Ralf committed
3
use std::sync::{Arc, Mutex, Condvar};
Ralf's avatar
Ralf committed
4

Ralf's avatar
Ralf committed
5 6 7 8 9 10
#[derive(PartialEq,Eq,Debug,Copy,Clone)]
pub enum ShouldThreadRun {
    Yes,
    No,
}

11 12 13 14 15 16 17 18 19 20 21 22 23
struct ThreadState {
    mutex: Mutex<ShouldThreadRun>,
    cond: Condvar,
}

enum WakeableState<T> {
    Lazy(T),
    Running(thread::JoinHandle<T>, Arc<ThreadState>),
    Empty,
}

pub struct WakeableHandle<T, F> {
    w_state: WakeableState<T>, // invariant: This is never empty when control is to the user
Ralf's avatar
Ralf committed
24
    fun: Arc<F>,
25 26
}

27
pub struct WakeableThread {
Ralf's avatar
Ralf committed
28 29
    state: Arc<ThreadState>,
}
30

Ralf's avatar
Ralf committed
31 32 33 34
pub fn prepare_wakeable<D, F, T>(t: T, f: F) -> WakeableHandle<T, F>
    where D: Send + 'static,
          T: Send + 'static,
          F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static
35
{
Ralf's avatar
Ralf committed
36 37 38 39
    WakeableHandle {
        w_state: WakeableState::Lazy(t),
        fun: Arc::new(f),
    }
Ralf's avatar
Ralf committed
40 41
}

Ralf's avatar
Ralf committed
42 43 44 45
pub fn spawn_wakeable<D, F, T>(t: T, d: D, f: F) -> WakeableHandle<T, F>
    where D: Send + 'static,
          T: Send + 'static,
          F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static
46 47 48 49
{
    let mut handle = prepare_wakeable(t, f);
    handle.spawn(d);
    handle
50 51
}

Ralf's avatar
Ralf committed
52 53 54 55 56
impl<T, F> WakeableHandle<T, F> {
    pub fn spawn<D>(&mut self, d: D)
        where D: Send + 'static,
              T: Send + 'static,
              F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static
Ralf's avatar
Ralf committed
57
    {
58 59 60
        // get ahand of the t, wherever it is
        let t = self.get_t();
        // prepare the state mgmt for this thread
Ralf's avatar
Ralf committed
61 62 63 64
        let state = Arc::new(ThreadState {
            mutex: Mutex::new(ShouldThreadRun::Yes),
            cond: Condvar::new(),
        });
65
        let thread = WakeableThread { state: state.clone() };
66 67 68 69 70 71
        // spawn the next thread with this
        let fun = self.fun.clone();
        let handle = thread::spawn(move || fun(thread, t, d));
        // store everything back into our state
        self.w_state = WakeableState::Running(handle, state);
    }
Ralf's avatar
Ralf committed
72

73 74 75 76 77 78 79 80 81
    /// This is "unsafe" because it leaves the handle in the empty state, which
    /// can cause panics.
    fn get_t(&mut self) -> T {
        let w_state = mem::replace(&mut self.w_state, WakeableState::Empty);
        match w_state {
            WakeableState::Lazy(t) => t,
            WakeableState::Running(handle, state) => {
                state.wakeup();
                handle.join().unwrap() // panics only if the thread paniced
Ralf's avatar
Ralf committed
82
            }
83 84 85
            WakeableState::Empty => panic!("The WakeableState can not be empty"),
        }
    }
Ralf's avatar
Ralf committed
86

87 88 89 90
    pub fn terminate(mut self) -> T {
        self.get_t() // This is fine to call, because the handle will be destroyed
    }
}
Ralf's avatar
Ralf committed
91

92 93 94
impl<T, F> Drop for WakeableHandle<F, T> {
    fn drop(&mut self) {
        match self.w_state {
Ralf's avatar
Ralf committed
95 96
            WakeableState::Empty => {} // all right, the user called "terminate"
            WakeableState::Lazy(_) => {} // we were never used :( TODO: emit a warning?
97 98 99
            WakeableState::Running(_, ref state) => {
                // the thread could still be running. emit a warning? Also, speed up the shutdown
                state.wakeup();
Ralf's avatar
Ralf committed
100
            }
101
        };
Ralf's avatar
Ralf committed
102 103 104
    }
}

105 106 107 108 109 110 111
impl ThreadState {
    fn wakeup(&self) {
        // We panic if the lock is poisened, which only happens if someone paniced
        let mut should_run = self.mutex.lock().unwrap();
        *should_run = ShouldThreadRun::No;
        self.cond.notify_one();
    }
Ralf's avatar
Ralf committed
112

Ralf's avatar
Ralf committed
113 114
    pub fn sleep(&self, d: Duration) -> ShouldThreadRun {
        // We panic if the lock is poisened, which only happens if someone paniced
115
        let should_run = self.mutex.lock().unwrap();
Ralf's avatar
Ralf committed
116 117 118 119 120
        // check if we have to terminate early
        if *should_run == ShouldThreadRun::No {
            return ShouldThreadRun::No;
        }
        // We panic if the lock is poisened, which only happens if someone paniced
121
        let (should_run, _) = self.cond.wait_timeout(should_run, d).unwrap();
Ralf's avatar
Ralf committed
122
        return *should_run;
Ralf's avatar
Ralf committed
123 124
    }
}
125 126 127 128 129 130

impl WakeableThread {
    pub fn sleep(&self, d: Duration) -> ShouldThreadRun {
        self.state.sleep(d)
    }
}