Commit c5972091 authored by Ralf's avatar Ralf
Browse files

use scoped threads

parent 1a20b50c
...@@ -6,6 +6,7 @@ mod environment; ...@@ -6,6 +6,7 @@ mod environment;
mod event_handler; mod event_handler;
use std::time::Duration; use std::time::Duration;
use crossbeam::scope;
use self::environment::*; use self::environment::*;
use self::event_handler::*; use self::event_handler::*;
...@@ -26,7 +27,7 @@ struct Actors { ...@@ -26,7 +27,7 @@ struct Actors {
} }
trait State { trait State {
fn run(&mut self, Actors, &mut Environment) -> (Actors, Option<Box<State>>); fn run(&mut self, &mut Actors, &mut Environment) -> Option<Box<State>>;
} }
fn user_command_reject_handler(_: Event) {} fn user_command_reject_handler(_: Event) {}
...@@ -37,40 +38,45 @@ pub struct UnLockingState { ...@@ -37,40 +38,45 @@ pub struct UnLockingState {
} }
impl State for UnLockingState { impl State for UnLockingState {
fn run(&mut self, mut actors: Actors, env: &mut Environment) -> (Actors, Option<Box<State>>) { fn run(&mut self, actors: &mut Actors, env: &mut Environment) -> Option<Box<State>> {
// TODO: abort if already locked // TODO: abort if already locked
let mut lock_actor = actors.lock_actor; // move out
let locking = self.locking; let locking = self.locking;
for _ in 0..3 { for _ in 0..3 {
let locker = Wakeable::new(move |t| { let lock_actor = &mut actors.lock_actor; // move out
lock_actor.act(if locking { let cancel = scope(|scope| {
LockAction::PressLock // it is important that we keep this thread alife, so remember the return value
} else { // TODO: Change the API around so that this pitfall no longer exists
LockAction::PressUnlock let _ = Wakeable::new(scope, move |t| {
lock_actor.act(if locking {
LockAction::PressLock
} else {
LockAction::PressUnlock
});
t.sleep(Duration::from_millis(100));
lock_actor.act(LockAction::Release);
lock_actor
}); });
t.sleep(Duration::from_millis(100)); let r = handle_events_timeout!(env,
lock_actor.act(LockAction::Release); Duration::from_secs(1),
lock_actor EventHandler::new_safe(user_command_reject_handler),
EventHandler::new(|ev| {
match ev {
Event::DoorLocked(b) if b == locking => {
HandlerResult::QuitLoop
}
_ => HandlerResult::Continue,
}
}));
if r != EventsResult::Timeout {
assert!(r == EventsResult::ShuttingDown || env.door_locked() == locking);
return true;
}
return false;
}); });
let r = handle_events_timeout!(env, if cancel {
Duration::from_secs(1),
EventHandler::new_safe(user_command_reject_handler),
EventHandler::new(|ev| {
match ev {
Event::DoorLocked(b) if b == locking => {
HandlerResult::QuitLoop
}
_ => HandlerResult::Continue,
}
}));
// take back lock_actor to feel complete again
lock_actor = locker.terminate();
if r != EventsResult::Timeout {
assert!(r == EventsResult::ShuttingDown || env.door_locked() == locking);
break; break;
} }
} }
actors.lock_actor = lock_actor; // move back None
(actors, None)
} }
} }
use std::time::Duration; use std::time::Duration;
use std::sync::{Arc, Mutex, Condvar}; use std::sync::{Arc, Mutex, Condvar};
use std::thread; use crossbeam::{Scope, ScopedJoinHandle};
#[derive(PartialEq,Eq,Debug,Copy,Clone)] #[derive(PartialEq,Eq,Debug,Copy,Clone)]
pub enum ShouldThreadRun { pub enum ShouldThreadRun {
...@@ -8,8 +8,8 @@ pub enum ShouldThreadRun { ...@@ -8,8 +8,8 @@ pub enum ShouldThreadRun {
No, No,
} }
pub struct Wakeable<T> { pub struct Wakeable<T: Send> {
join: thread::JoinHandle<T>, join: Option<ScopedJoinHandle<T>>, // invariant: This will always be a Some outside of the destructor
state: Arc<ThreadState>, state: Arc<ThreadState>,
} }
pub struct Thread { pub struct Thread {
...@@ -20,9 +20,9 @@ struct ThreadState { ...@@ -20,9 +20,9 @@ struct ThreadState {
cond: Condvar, cond: Condvar,
} }
impl<T: Send + 'static> Wakeable<T> { impl<T: Send> Wakeable<T> {
pub fn new<F>(f: F) -> Self pub fn new<'a, F>(scope: &Scope<'a>, f: F) -> Self
where F: FnOnce(Thread) -> T + Send + 'static where F: FnOnce(Thread) -> T + Send + 'a, T: 'a
{ {
let state = Arc::new(ThreadState { let state = Arc::new(ThreadState {
mutex: Mutex::new(ShouldThreadRun::Yes), mutex: Mutex::new(ShouldThreadRun::Yes),
...@@ -30,19 +30,27 @@ impl<T: Send + 'static> Wakeable<T> { ...@@ -30,19 +30,27 @@ impl<T: Send + 'static> Wakeable<T> {
}); });
let thread = Thread { state: state.clone() }; let thread = Thread { state: state.clone() };
Wakeable { Wakeable {
join: thread::spawn(move || f(thread)), join: Some(scope.spawn(move || f(thread))),
state: state, state: state,
} }
} }
pub fn terminate(self) -> T { fn make_terminate(&self) {
{ // We panic if the lock is poisened, which only happens if someone paniced
// We panic if the lock is poisened, which only happens if someone paniced let mut should_run = self.state.mutex.lock().unwrap();
let mut should_run = self.state.mutex.lock().unwrap(); *should_run = ShouldThreadRun::No;
*should_run = ShouldThreadRun::No; self.state.cond.notify_one();
self.state.cond.notify_one(); }
}
self.join.join().unwrap() // If the child thread paniced, so do we pub fn terminate(mut self) -> T {
self.make_terminate();
self.join.take().unwrap().join() // this asserts that there was a "Some" in the join handle, which will always be the case
}
}
impl<T: Send> Drop for Wakeable<T> {
fn drop(&mut self) {
self.make_terminate();
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment