Commit 7a20cf34 authored by Ralf's avatar Ralf
Browse files

go back to ownership threading. make it work for buzzing.

parent 9ec0d0a3
...@@ -3,7 +3,6 @@ name = "rinx" ...@@ -3,7 +3,6 @@ name = "rinx"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chan 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "chan 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
...@@ -15,11 +14,6 @@ dependencies = [ ...@@ -15,11 +14,6 @@ dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.7" version = "0.2.7"
......
...@@ -6,4 +6,3 @@ authors = ["Constantin Berhard <git.mail.enormator@xoxy.net>"] ...@@ -6,4 +6,3 @@ authors = ["Constantin Berhard <git.mail.enormator@xoxy.net>"]
[dependencies] [dependencies]
log = "0.3.5" log = "0.3.5"
chan = "0.1.14" chan = "0.1.14"
crossbeam = "0.2.8"
...@@ -13,5 +13,5 @@ pub struct Actors { ...@@ -13,5 +13,5 @@ pub struct Actors {
} }
pub trait State { pub trait State {
fn run(&mut self, &mut Actors, &mut environment::Environment) -> Option<Box<State>>; fn run(&mut self, Actors, &mut environment::Environment) -> Option<Box<State>>;
} }
use std::time::Duration; use std::time::Duration;
use crossbeam::scope;
use actors::{Actor, LockAction}; use actors::{Actor, LockAction};
use input::Event; use input::Event;
use util::wakeable::SpawnWakeable; use util::wakeable::{spawn_wakeable, prepare_wakeable};
use brain::environment::*; use brain::environment::*;
use brain::handlers; use brain::handlers;
use brain::{Actors, State}; use brain::{Actors, State};
...@@ -20,42 +19,35 @@ impl UnLockingState { ...@@ -20,42 +19,35 @@ impl UnLockingState {
} }
impl State for UnLockingState { impl State for UnLockingState {
fn run(&mut self, actors: &mut Actors, env: &mut Environment) -> Option<Box<State>> { fn run(&mut self, mut actors: Actors, env: &mut Environment) -> Option<Box<State>> {
// TODO: abort if already locked // TODO: abort if already locked
let locking = self.locking; let locking = self.locking;
for _ in 0..3 { for _ in 0..3 {
let lock_actor = &mut actors.lock; // move out let locker = spawn_wakeable(actors.lock, (), move |t, mut lock_actor, _| {
let cancel = scope(|scope| { lock_actor.act(if locking {
// it is important that we keep this thread alife, so remember the return value LockAction::PressLock
// TODO: Change the API around so that this pitfall no longer exists } else {
scope.spawn_wakeable(move |t| { LockAction::PressUnlock
lock_actor.act(if locking {
LockAction::PressLock
} else {
LockAction::PressUnlock
});
t.sleep(Duration::from_millis(100));
lock_actor.act(LockAction::Release);
}); });
let r = handle_events_timeout!(env, t.sleep(Duration::from_millis(100));
Duration::from_secs(1), lock_actor.act(LockAction::Release);
EventHandler::new_safe(handlers::user_command_reject_handler), lock_actor
EventHandler::new(|ev| {
match ev {
Event::DoorLocked(b) if b == locking => {
HandlerResult::QuitLoop
}
_ => HandlerResult::Continue,
}
}));
if r != EventsResult::Timeout {
assert!(r == EventsResult::ShuttingDown || env.door_locked() == locking);
return true;
}
return false;
}); });
if cancel { let r = handle_events_timeout!(env,
break; Duration::from_secs(1),
EventHandler::new_safe(handlers::user_command_reject_handler),
EventHandler::new(|ev| {
match ev {
Event::DoorLocked(b) if b == locking => {
HandlerResult::QuitLoop
}
_ => HandlerResult::Continue,
}
}));
actors.lock = locker.terminate();
if r != EventsResult::Timeout {
assert!(r == EventsResult::ShuttingDown || env.door_locked() == locking);
break; // the retries
} }
} }
None // FIXME None // FIXME
...@@ -72,20 +64,25 @@ impl OpenState { ...@@ -72,20 +64,25 @@ impl OpenState {
} }
impl State for OpenState { impl State for OpenState {
fn run(&mut self, actors: &mut Actors, env: &mut Environment) -> Option<Box<State>> { fn run(&mut self, mut actors: Actors, env: &mut Environment) -> Option<Box<State>> {
scope(|scope| { let mut buzzer = prepare_wakeable(actors.buzz, move |t, mut buzz_actor, d| {
let r = handle_events!(env, buzz_actor.act(true);
EventHandler::new_safe(|ev| { t.sleep(d);
if let Event::Bell(true) = ev { buzz_actor.act(false);
// TODO buzz the door. without blocking. buzz_actor
} });
}) let r = handle_events!(env,
); EventHandler::new_safe(|ev| {
if r == EventsResult::ShuttingDown { if let Event::Bell(true) = ev {
return None; buzzer.spawn(Duration::from_secs(2));
} }
None // FIXME })
}) );
actors.buzz = buzzer.terminate();
if r == EventsResult::ShuttingDown {
return None;
}
None // FIXME
} }
} }
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate chan; extern crate chan;
extern crate crossbeam;
mod logger; mod logger;
......
use std::{mem, thread};
use std::time::Duration; use std::time::Duration;
use std::sync::{Arc, Mutex, Condvar}; use std::sync::{Arc, Mutex, Condvar};
use crossbeam::Scope;
#[derive(PartialEq,Eq,Debug,Copy,Clone)] #[derive(PartialEq,Eq,Debug,Copy,Clone)]
pub enum ShouldThreadRun { pub enum ShouldThreadRun {
...@@ -8,53 +8,121 @@ pub enum ShouldThreadRun { ...@@ -8,53 +8,121 @@ pub enum ShouldThreadRun {
No, No,
} }
struct ThreadState {
mutex: Mutex<ShouldThreadRun>,
cond: Condvar,
}
enum WakeableState<T> {
Lazy(T),
Running(thread::JoinHandle<T>, Arc<ThreadState>),
Empty,
}
pub struct WakeableHandle<T, F> {
w_state: WakeableState<T>, // invariant: This is never empty when control is to the user
fun: Arc<F>
}
pub struct WakeableThread { pub struct WakeableThread {
state: Arc<ThreadState>, state: Arc<ThreadState>,
} }
struct ThreadState { pub fn prepare_wakeable<D, F, T>(t: T, f: F) -> WakeableHandle<T, F> where
mutex: Mutex<ShouldThreadRun>, D: Send + 'static,
cond: Condvar, T: Send + 'static,
F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static
{
WakeableHandle { w_state: WakeableState::Lazy(t), fun: Arc::new(f) }
} }
pub trait SpawnWakeable<'a> { pub fn spawn_wakeable<D, F, T>(t: T, d: D, f: F) -> WakeableHandle<T, F> where
fn spawn_wakeable<F>(&self, f: F) where F: FnOnce(WakeableThread) + Send + 'a; D: Send + 'static,
T: Send + 'static,
F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static,
{
let mut handle = prepare_wakeable(t, f);
handle.spawn(d);
handle
} }
impl<'a> SpawnWakeable<'a> for Scope<'a> { impl<T, F> WakeableHandle<T, F>
fn spawn_wakeable<F>(&self, f: F) {
where F: FnOnce(WakeableThread) + Send + 'a pub fn spawn<D>(&mut self, d: D) where
D: Send + 'static,
T: Send + 'static,
F: Fn(WakeableThread, T, D) -> T + Send + Sync + 'static,
{ {
// get ahand of the t, wherever it is
let t = self.get_t();
// prepare the state mgmt for this thread
let state = Arc::new(ThreadState { let state = Arc::new(ThreadState {
mutex: Mutex::new(ShouldThreadRun::Yes), mutex: Mutex::new(ShouldThreadRun::Yes),
cond: Condvar::new(), cond: Condvar::new(),
}); });
let thread = WakeableThread { state: state.clone() }; let thread = WakeableThread { state: state.clone() };
// spawn the next thread with this
let fun = self.fun.clone();
let handle = thread::spawn(move || fun(thread, t, d));
// store everything back into our state
self.w_state = WakeableState::Running(handle, state);
}
/// This is "unsafe" because it leaves the handle in the empty state, which
/// can cause panics.
fn get_t(&mut self) -> T {
let w_state = mem::replace(&mut self.w_state, WakeableState::Empty);
match w_state {
WakeableState::Lazy(t) => t,
WakeableState::Running(handle, state) => {
state.wakeup();
handle.join().unwrap() // panics only if the thread paniced
},
WakeableState::Empty => panic!("The WakeableState can not be empty"),
}
}
pub fn terminate(mut self) -> T {
self.get_t() // This is fine to call, because the handle will be destroyed
}
}
self.spawn(move || f(thread)); impl<T, F> Drop for WakeableHandle<F, T> {
fn drop(&mut self) {
// Now defer the action that we want to wake up this thread. match self.w_state {
// This will be executed *before* the join deferred by spawn. WakeableState::Empty => {}, // all right, the user called "terminate"
WakeableState::Lazy(_) => {}, // we were never used :( TODO: emit a warning?
self.defer(move || { WakeableState::Running(_, ref state) => {
// We panic if the lock is poisened, which only happens if someone paniced // the thread could still be running. emit a warning? Also, speed up the shutdown
let mut should_run = state.mutex.lock().unwrap(); state.wakeup();
*should_run = ShouldThreadRun::No; },
state.cond.notify_one(); };
});
} }
} }
impl WakeableThread { impl ThreadState {
fn wakeup(&self) {
// We panic if the lock is poisened, which only happens if someone paniced
let mut should_run = self.mutex.lock().unwrap();
*should_run = ShouldThreadRun::No;
self.cond.notify_one();
}
pub fn sleep(&self, d: Duration) -> ShouldThreadRun { pub fn sleep(&self, d: Duration) -> ShouldThreadRun {
// We panic if the lock is poisened, which only happens if someone paniced // We panic if the lock is poisened, which only happens if someone paniced
let should_run = self.state.mutex.lock().unwrap(); let should_run = self.mutex.lock().unwrap();
// check if we have to terminate early // check if we have to terminate early
if *should_run == ShouldThreadRun::No { if *should_run == ShouldThreadRun::No {
return ShouldThreadRun::No; return ShouldThreadRun::No;
} }
// We panic if the lock is poisened, which only happens if someone paniced // We panic if the lock is poisened, which only happens if someone paniced
let (should_run, _) = self.state.cond.wait_timeout(should_run, d).unwrap(); let (should_run, _) = self.cond.wait_timeout(should_run, d).unwrap();
return *should_run; return *should_run;
} }
} }
impl WakeableThread {
pub fn sleep(&self, d: Duration) -> ShouldThreadRun {
self.state.sleep(d)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment