Commit 4cca3a77 authored by jamesbeilby's avatar jamesbeilby Committed by Alex Hultman

Support external event loops via non-blocking poll() alt. to run() (#762)

* Support external event loops via non-blocking poll() alternative to run()

* Removed tabs

* Reinstated setting of timepoint in Loop::run()/poll() for consistency with libuv
parent 8b0a89ca
...@@ -22,6 +22,10 @@ struct Loop : boost::asio::io_service { ...@@ -22,6 +22,10 @@ struct Loop : boost::asio::io_service {
void run() { void run() {
boost::asio::io_service::run(); boost::asio::io_service::run();
} }
void poll() {
boost::asio::io_service::poll();
}
}; };
struct Timer { struct Timer {
......
...@@ -9,55 +9,69 @@ std::recursive_mutex cbMutex; ...@@ -9,55 +9,69 @@ std::recursive_mutex cbMutex;
void (*callbacks[16])(Poll *, int, int); void (*callbacks[16])(Poll *, int, int);
int cbHead = 0; int cbHead = 0;
void Loop::run() { void Loop::doEpoll(int epollTimeout) {
timepoint = std::chrono::system_clock::now(); for (std::pair<Poll *, void (*)(Poll *)> c : closing) {
while (numPolls) { numPolls--;
for (std::pair<Poll *, void (*)(Poll *)> c : closing) {
numPolls--;
c.second(c.first); c.second(c.first);
if (!numPolls) { if (!numPolls) {
closing.clear(); closing.clear();
return; return;
}
} }
closing.clear(); }
closing.clear();
int numFdReady = epoll_wait(epfd, readyEvents, 1024, delay); int numFdReady = epoll_wait(epfd, readyEvents, 1024, epollTimeout);
timepoint = std::chrono::system_clock::now(); timepoint = std::chrono::system_clock::now();
if (preCb) { if (preCb) {
preCb(preCbData); preCb(preCbData);
} }
for (int i = 0; i < numFdReady; i++) { for (int i = 0; i < numFdReady; i++) {
Poll *poll = (Poll *) readyEvents[i].data.ptr; Poll *poll = (Poll *) readyEvents[i].data.ptr;
int status = -bool(readyEvents[i].events & EPOLLERR); int status = -bool(readyEvents[i].events & EPOLLERR);
callbacks[poll->state.cbIndex](poll, status, readyEvents[i].events); callbacks[poll->state.cbIndex](poll, status, readyEvents[i].events);
} }
while (timers.size() && timers[0].timepoint < timepoint) {
Timer *timer = timers[0].timer;
cancelledLastTimer = false;
timers[0].cb(timers[0].timer);
while (timers.size() && timers[0].timepoint < timepoint) { if (cancelledLastTimer) {
Timer *timer = timers[0].timer; continue;
cancelledLastTimer = false;
timers[0].cb(timers[0].timer);
if (cancelledLastTimer) {
continue;
}
int repeat = timers[0].nextDelay;
auto cb = timers[0].cb;
timers.erase(timers.begin());
if (repeat) {
timer->start(cb, repeat, repeat);
}
} }
if (postCb) { int repeat = timers[0].nextDelay;
postCb(postCbData); auto cb = timers[0].cb;
timers.erase(timers.begin());
if (repeat) {
timer->start(cb, repeat, repeat);
} }
} }
if (postCb) {
postCb(postCbData);
}
}
void Loop::run() {
// updated for consistency with libuv impl. behaviour
timepoint = std::chrono::system_clock::now();
while (numPolls) {
doEpoll(delay);
}
}
void Loop::poll() {
if (numPolls) {
doEpoll(0);
} else {
// updated for consistency with libuv impl. behaviour
timepoint = std::chrono::system_clock::now();
}
} }
} }
......
...@@ -34,7 +34,7 @@ struct Loop { ...@@ -34,7 +34,7 @@ struct Loop {
int epfd; int epfd;
int numPolls = 0; int numPolls = 0;
bool cancelledLastTimer; bool cancelledLastTimer;
int delay = -1; int delay = -1; // delay to next timer expiry, or -1 if no timers pending
epoll_event readyEvents[1024]; epoll_event readyEvents[1024];
std::chrono::system_clock::time_point timepoint; std::chrono::system_clock::time_point timepoint;
std::vector<Timepoint> timers; std::vector<Timepoint> timers;
...@@ -58,8 +58,12 @@ struct Loop { ...@@ -58,8 +58,12 @@ struct Loop {
delete this; delete this;
} }
void doEpoll(int epollTimeout);
void run(); void run();
void poll();
int getEpollFd() { int getEpollFd() {
return epfd; return epfd;
} }
......
...@@ -67,6 +67,7 @@ public: ...@@ -67,6 +67,7 @@ public:
} }
using uS::Node::run; using uS::Node::run;
using uS::Node::poll;
using uS::Node::getLoop; using uS::Node::getLoop;
using Group<SERVER>::onConnection; using Group<SERVER>::onConnection;
using Group<CLIENT>::onConnection; using Group<CLIENT>::onConnection;
......
...@@ -24,6 +24,10 @@ struct Loop : uv_loop_t { ...@@ -24,6 +24,10 @@ struct Loop : uv_loop_t {
void run() { void run() {
uv_run(this, UV_RUN_DEFAULT); uv_run(this, UV_RUN_DEFAULT);
} }
void poll() {
uv_run(this, UV_RUN_NOWAIT);
}
}; };
struct Async { struct Async {
......
...@@ -64,6 +64,10 @@ void Node::run() { ...@@ -64,6 +64,10 @@ void Node::run() {
loop->run(); loop->run();
} }
void Node::poll() {
loop->poll();
}
Node::~Node() { Node::~Node() {
delete [] nodeData->recvBufferMemoryBlock; delete [] nodeData->recvBufferMemoryBlock;
SSL_CTX_free(nodeData->clientContext); SSL_CTX_free(nodeData->clientContext);
......
...@@ -78,8 +78,13 @@ protected: ...@@ -78,8 +78,13 @@ protected:
public: public:
Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false); Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false);
~Node(); ~Node();
/* Blocking */
void run(); void run();
/* Non-blocking */
void poll();
Loop *getLoop() { Loop *getLoop() {
return loop; return loop;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment