Commit e405db41 authored by Alex Hultman's avatar Alex Hultman

Initial v0.15 work

parent 59939287
[submodule "uSockets"]
path = uSockets
url = https://github.com/uNetworking/uSockets.git
#ifndef ASIO_H
#define ASIO_H
#include <boost/asio.hpp>
typedef boost::asio::ip::tcp::socket::native_type uv_os_sock_t;
static const int UV_READABLE = 1;
static const int UV_WRITABLE = 2;
struct Loop : boost::asio::io_service {
static Loop *createLoop(bool defaultLoop = true) {
return new Loop;
}
void destroy() {
delete this;
}
void run() {
boost::asio::io_service::run();
}
};
struct Timer {
boost::asio::deadline_timer asio_timer;
void *data;
Timer(Loop *loop) : asio_timer(*loop) {
}
void start(void (*cb)(Timer *), int first, int repeat) {
asio_timer.expires_from_now(boost::posix_time::milliseconds(first));
asio_timer.async_wait([this, cb, repeat](const boost::system::error_code &ec) {
if (ec != boost::asio::error::operation_aborted) {
if (repeat) {
start(cb, repeat, repeat);
}
cb(this);
}
});
}
void setData(void *data) {
this->data = data;
}
void *getData() {
return data;
}
// bug: cancel does not cancel expired timers!
// it has to guarantee that the timer is not called after
// stop is called! ffs boost!
void stop() {
asio_timer.cancel();
}
void close() {
asio_timer.get_io_service().post([this]() {
delete this;
});
}
};
struct Async {
Loop *loop;
void (*cb)(Async *);
void *data;
boost::asio::io_service::work asio_work;
Async(Loop *loop) : loop(loop), asio_work(*loop) {
}
void start(void (*cb)(Async *)) {
this->cb = cb;
}
void send() {
loop->post([this]() {
cb(this);
});
}
void close() {
loop->post([this]() {
delete this;
});
}
void setData(void *data) {
this->data = data;
}
void *getData() {
return data;
}
};
struct Poll {
boost::asio::posix::stream_descriptor *socket;
void (*cb)(Poll *p, int status, int events);
Poll(Loop *loop, uv_os_sock_t fd) {
socket = new boost::asio::posix::stream_descriptor(*loop, fd);
socket->non_blocking(true);
}
bool isClosed() {
return !socket;
}
boost::asio::ip::tcp::socket::native_type getFd() {
return socket ? socket->native_handle() : -1;
}
void setCb(void (*cb)(Poll *p, int status, int events)) {
this->cb = cb;
}
void (*getCb())(Poll *, int, int) {
return cb;
}
void reInit(Loop *loop, uv_os_sock_t fd) {
delete socket;
socket = new boost::asio::posix::stream_descriptor(*loop, fd);
socket->non_blocking(true);
}
void start(Loop *, Poll *self, int events) {
if (events & UV_READABLE) {
socket->async_read_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) {
if (ec != boost::asio::error::operation_aborted) {
self->start(nullptr, self, UV_READABLE);
self->cb(self, ec ? -1 : 0, UV_READABLE);
}
});
}
if (events & UV_WRITABLE) {
socket->async_write_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) {
if (ec != boost::asio::error::operation_aborted) {
self->start(nullptr, self, UV_WRITABLE);
self->cb(self, ec ? -1 : 0, UV_WRITABLE);
}
});
}
}
void change(Loop *, Poll *self, int events) {
socket->cancel();
start(nullptr, self, events);
}
bool fastTransfer(Loop *loop, Loop *newLoop, int events) {
return false;
}
// todo: asio is thread safe, use it!
bool threadSafeChange(Loop *loop, Poll *self, int events) {
return false;
}
void stop(Loop *) {
socket->cancel();
}
// this is not correct, but it works for now
// think about transfer - should allow one to not delete
// but in this case it doesn't matter at all
void close(Loop *loop, void (*cb)(Poll *)) {
socket->release();
socket->get_io_service().post([cb, this]() {
cb(this);
});
delete socket;
socket = nullptr;
}
};
#endif // ASIO_H
#ifndef BACKEND_H
#define BACKEND_H
// Default to Epoll if nothing specified and on Linux
// Default to Libuv if nothing specified and not on Linux
#ifdef USE_ASIO
#include "Asio.h"
#elif !defined(__linux__) || defined(USE_LIBUV)
#include "Libuv.h"
#else
#define USE_EPOLL
#include "Epoll.h"
#endif
#endif // BACKEND_H
#include "Backend.h"
#ifdef USE_EPOLL
// todo: remove this mutex, have callbacks set at program start
std::recursive_mutex cbMutex;
void (*callbacks[16])(Poll *, int, int);
int cbHead = 0;
void Loop::run() {
timepoint = std::chrono::system_clock::now();
while (numPolls) {
for (std::pair<Poll *, void (*)(Poll *)> c : closing) {
numPolls--;
c.second(c.first);
if (!numPolls) {
closing.clear();
return;
}
}
closing.clear();
int numFdReady = epoll_wait(epfd, readyEvents, 1024, delay);
timepoint = std::chrono::system_clock::now();
if (preCb) {
preCb(preCbData);
}
for (int i = 0; i < numFdReady; i++) {
Poll *poll = (Poll *) readyEvents[i].data.ptr;
int status = -bool(readyEvents[i].events & EPOLLERR);
callbacks[poll->state.cbIndex](poll, status, readyEvents[i].events);
}
while (timers.size() && timers[0].timepoint < timepoint) {
Timer *timer = timers[0].timer;
cancelledLastTimer = false;
timers[0].cb(timers[0].timer);
if (cancelledLastTimer) {
continue;
}
int repeat = timers[0].nextDelay;
auto cb = timers[0].cb;
timers.erase(timers.begin());
if (repeat) {
timer->start(cb, repeat, repeat);
}
}
if (postCb) {
postCb(postCbData);
}
}
}
#endif
#ifndef EPOLL_H
#define EPOLL_H
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <chrono>
#include <algorithm>
#include <vector>
#include <mutex>
typedef int uv_os_sock_t;
static const int UV_READABLE = EPOLLIN;
static const int UV_WRITABLE = EPOLLOUT;
struct Poll;
struct Timer;
extern std::recursive_mutex cbMutex;
extern void (*callbacks[16])(Poll *, int, int);
extern int cbHead;
struct Timepoint {
void (*cb)(Timer *);
Timer *timer;
std::chrono::system_clock::time_point timepoint;
int nextDelay;
};
struct Loop {
int epfd;
int numPolls = 0;
bool cancelledLastTimer;
int delay = -1;
epoll_event readyEvents[1024];
std::chrono::system_clock::time_point timepoint;
std::vector<Timepoint> timers;
std::vector<std::pair<Poll *, void (*)(Poll *)>> closing;
void (*preCb)(void *) = nullptr;
void (*postCb)(void *) = nullptr;
void *preCbData, *postCbData;
Loop(bool defaultLoop) {
epfd = epoll_create1(EPOLL_CLOEXEC);
timepoint = std::chrono::system_clock::now();
}
static Loop *createLoop(bool defaultLoop = true) {
return new Loop(defaultLoop);
}
void destroy() {
::close(epfd);
delete this;
}
void run();
int getEpollFd() {
return epfd;
}
};
struct Timer {
Loop *loop;
void *data;
Timer(Loop *loop) {
this->loop = loop;
}
void start(void (*cb)(Timer *), int timeout, int repeat) {
loop->timepoint = std::chrono::system_clock::now();
std::chrono::system_clock::time_point timepoint = loop->timepoint + std::chrono::milliseconds(timeout);
Timepoint t = {cb, this, timepoint, repeat};
loop->timers.insert(
std::upper_bound(loop->timers.begin(), loop->timers.end(), t, [](const Timepoint &a, const Timepoint &b) {
return a.timepoint < b.timepoint;
}),
t
);
loop->delay = -1;
if (loop->timers.size()) {
loop->delay = std::max<int>(std::chrono::duration_cast<std::chrono::milliseconds>(loop->timers[0].timepoint - loop->timepoint).count(), 0);
}
}
void setData(void *data) {
this->data = data;
}
void *getData() {
return data;
}
// always called before destructor
void stop() {
auto pos = loop->timers.begin();
for (Timepoint &t : loop->timers) {
if (t.timer == this) {
loop->timers.erase(pos);
break;
}
pos++;
}
loop->cancelledLastTimer = true;
loop->delay = -1;
if (loop->timers.size()) {
loop->delay = std::max<int>(std::chrono::duration_cast<std::chrono::milliseconds>(loop->timers[0].timepoint - loop->timepoint).count(), 0);
}
}
void close() {
delete this;
}
};
// 4 bytes
struct Poll {
protected:
struct {
int fd : 28;
unsigned int cbIndex : 4;
} state = {-1, 0};
Poll(Loop *loop, uv_os_sock_t fd) {
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
state.fd = fd;
loop->numPolls++;
}
// todo: pre-set all of callbacks up front and remove mutex
void setCb(void (*cb)(Poll *p, int status, int events)) {
cbMutex.lock();
state.cbIndex = cbHead;
for (int i = 0; i < cbHead; i++) {
if (callbacks[i] == cb) {
state.cbIndex = i;
break;
}
}
if (state.cbIndex == cbHead) {
callbacks[cbHead++] = cb;
}
cbMutex.unlock();
}
void (*getCb())(Poll *, int, int) {
return callbacks[state.cbIndex];
}
void reInit(Loop *loop, uv_os_sock_t fd) {
state.fd = fd;
loop->numPolls++;
}
void start(Loop *loop, Poll *self, int events) {
epoll_event event;
event.events = events;
event.data.ptr = self;
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, state.fd, &event);
}
void change(Loop *loop, Poll *self, int events) {
epoll_event event;
event.events = events;
event.data.ptr = self;
epoll_ctl(loop->epfd, EPOLL_CTL_MOD, state.fd, &event);
}
void stop(Loop *loop) {
epoll_event event;
epoll_ctl(loop->epfd, EPOLL_CTL_DEL, state.fd, &event);
}
bool fastTransfer(Loop *loop, Loop *newLoop, int events) {
stop(loop);
start(newLoop, this, events);
loop->numPolls--;
// needs to lock the newLoop's numPolls!
newLoop->numPolls++;
return true;
}
bool threadSafeChange(Loop *loop, Poll *self, int events) {
change(loop, self, events);
return true;
}
void close(Loop *loop, void (*cb)(Poll *)) {
state.fd = -1;
loop->closing.push_back({this, cb});
}
public:
bool isClosed() {
return state.fd == -1;
}
uv_os_sock_t getFd() {
return state.fd;
}
friend struct Loop;
};
// this should be put in the Loop as a general "post" function always available
struct Async : Poll {
void (*cb)(Async *);
Loop *loop;
void *data;
Async(Loop *loop) : Poll(loop, ::eventfd(0, EFD_CLOEXEC)) {
this->loop = loop;
}
void start(void (*cb)(Async *)) {
this->cb = cb;
Poll::setCb([](Poll *p, int, int) {
uint64_t val;
if (::read(((Async *) p)->state.fd, &val, 8) == 8) {
((Async *) p)->cb((Async *) p);
}
});
Poll::start(loop, this, UV_READABLE);
}
void send() {
uint64_t one = 1;
if (::write(state.fd, &one, 8) != 8) {
return;
}
}
void close() {
Poll::stop(loop);
::close(state.fd);
Poll::close(loop, [](Poll *p) {
delete p;
});
}
void setData(void *data) {
this->data = data;
}
void *getData() {
return data;
}
};
#endif // EPOLL_H
......@@ -13,11 +13,29 @@ void *Group<isServer>::getUserData() {
return userData;
}
// kills connect and shutdown sockets (can use userData as tracker)
template <bool isServer>
void Group<isServer>::timerCallback(Timer *timer) {
Group<isServer> *group = (Group<isServer> *) timer->getData();
void Group<isServer>::limboTimerCallback(uS::Loop::Timer *timer) {
Group<isServer> *group = static_cast<Group<isServer> *>(timer->getData());
group->forEach([](uWS::WebSocket<isServer> *webSocket) {
// for each limboSocket
// group->forEach([](uWS::WebSocket<isServer> *webSocket) {
// if (webSocket->hasOutstandingPong) {
// webSocket->terminate();
// } else {
// webSocket->hasOutstandingPong = true;
// }
// });
}
// this callback should be the main tick of all sockets in this group
// including any timer in connect or shutdown state
template <bool isServer>
void Group<isServer>::timerCallback(uS::Loop::Timer *timer) {
Group<isServer> *group = static_cast<Group<isServer> *>(timer->getData());
group->forEach(Group<isServer>::WEBSOCKET, [](uS::Socket *socket) {
uWS::WebSocket<isServer> *webSocket = static_cast<uWS::WebSocket<isServer> *>(socket);
if (webSocket->hasOutstandingPong) {
webSocket->terminate();
} else {
......@@ -33,92 +51,68 @@ void Group<isServer>::timerCallback(Timer *timer) {
}
template <bool isServer>
void Group<isServer>::startAutoPing(int intervalMs, std::string userMessage) {
timer = new Timer(loop);
timer->setData(this);
timer->start(timerCallback, intervalMs, intervalMs);
userPingMessage = userMessage;
}
template <bool isServer>
void Group<isServer>::addHttpSocket(HttpSocket<isServer> *httpSocket) {
if (httpSocketHead) {
httpSocketHead->prev = httpSocket;
httpSocket->next = httpSocketHead;
void Group<isServer>::add(int chainIndex, uS::Socket *socket) {
Socket *&head = chainHead[chainIndex];
if (head) {
head->prev = socket;
socket->next = head;
} else {
httpSocket->next = nullptr;
// start timer
httpTimer = new Timer(hub->getLoop());
httpTimer->setData(this);
httpTimer->start([](Timer *httpTimer) {
Group<isServer> *group = (Group<isServer> *) httpTimer->getData();
group->forEachHttpSocket([](HttpSocket<isServer> *httpSocket) {
if (httpSocket->missedDeadline) {
httpSocket->terminate();
} else if (!httpSocket->outstandingResponsesHead) {
httpSocket->missedDeadline = true;
}
});
}, 1000, 1000);
socket->next = nullptr;
if (chainIndex == HTTPSOCKET) {
httpTimer = new uS::Loop::Timer(getLoop());
httpTimer->setData(this);
httpTimer->start([](uS::Loop::Timer *httpTimer) {
Group<isServer> *group = static_cast<Group<isServer> *>(httpTimer->getData());
group->forEach(Group<isServer>::HTTPSOCKET, [](uS::Socket *socket) {
HttpSocket<isServer> *httpSocket = static_cast<HttpSocket<isServer> *>(socket);
if (httpSocket->missedDeadline) {
httpSocket->terminate();
} else if (!httpSocket->outstandingResponsesHead) {
httpSocket->missedDeadline = true;
}
});
}, 1000, 1000);
}
}
httpSocketHead = httpSocket;
httpSocket->prev = nullptr;
head = socket;
socket->prev = nullptr;
}
template <bool isServer>
void Group<isServer>::removeHttpSocket(HttpSocket<isServer> *httpSocket) {
void Group<isServer>::remove(int chainIndex, uS::Socket *socket) {
if (iterators.size()) {
iterators.top() = httpSocket->next;
iterators.top() = socket->next;
}
if (httpSocket->prev == httpSocket->next) {
httpSocketHead = nullptr;
httpTimer->stop();
httpTimer->close();
if (socket->prev == socket->next) {
chainHead[chainIndex] = nullptr;
if (chainIndex == HTTPSOCKET) {
httpTimer->stop();
httpTimer->close();
}
} else {
if (httpSocket->prev) {
((HttpSocket<isServer> *) httpSocket->prev)->next = httpSocket->next;
if (socket->prev) {
static_cast<uS::Socket *>(socket->prev)->next = socket->next;
} else {
httpSocketHead = (HttpSocket<isServer> *) httpSocket->next;
chainHead[chainIndex] = socket->next;
}
if (httpSocket->next) {
((HttpSocket<isServer> *) httpSocket->next)->prev = httpSocket->prev;
if (socket->next) {
static_cast<uS::Socket *>(socket->next)->prev = socket->prev;
}
}
}
template <bool isServer>
void Group<isServer>::addWebSocket(WebSocket<isServer> *webSocket) {
if (webSocketHead) {
webSocketHead->prev = webSocket;
webSocket->next = webSocketHead;
} else {
webSocket->next = nullptr;
}
webSocketHead = webSocket;
webSocket->prev = nullptr;
}
template <bool isServer>
void Group<isServer>::removeWebSocket(WebSocket<isServer> *webSocket) {
if (iterators.size()) {
iterators.top() = webSocket->next;
}
if (webSocket->prev == webSocket->next) {
webSocketHead = nullptr;
} else {
if (webSocket->prev) {
((WebSocket<isServer> *) webSocket->prev)->next = webSocket->next;
} else {
webSocketHead = (WebSocket<isServer> *) webSocket->next;
}
if (webSocket->next) {
((WebSocket<isServer> *) webSocket->next)->prev = webSocket->prev;
}
}
void Group<isServer>::startAutoPing(int intervalMs, std::string userMessage) {
timer = new uS::Loop::Timer(getLoop());
timer->setData(this);
timer->start(timerCallback, intervalMs, intervalMs);
userPingMessage = userMessage;
}
template <bool isServer>
Group<isServer>::Group(int extensionOptions, Hub *hub, uS::NodeData *nodeData) : uS::NodeData(*nodeData), hub(hub), extensionOptions(extensionOptions) {
Group<isServer>::Group(int extensionOptions, uS::Loop *loop) : extensionOptions(extensionOptions), uS::Context(loop) {
connectionHandler = [](WebSocket<isServer> *, HttpRequest) {};
transferHandler = [](WebSocket<isServer> *) {};
messageHandler = [](WebSocket<isServer> *, char *, size_t, OpCode) {};
......@@ -134,28 +128,17 @@ Group<isServer>::Group(int extensionOptions, Hub *hub, uS::NodeData *nodeData) :
this->extensionOptions |= CLIENT_NO_CONTEXT_TAKEOVER | SERVER_NO_CONTEXT_TAKEOVER;
}
// this is really just implemented in the Context!
template <bool isServer>
void Group<isServer>::stopListening() {
if (isServer) {
if (user) {
// todo: we should allow one group to listen to many ports!
uS::ListenSocket *listenSocket = (uS::ListenSocket *) user;
if (listenSocket->timer) {
listenSocket->timer->stop();
listenSocket->timer->close();
}
listenSocket->closeSocket<uS::ListenSocket>();
uS::Context::stopListening();
// mark as stopped listening (extra care?)
user = nullptr;
}
}
// stop context
if (async) {
async->close();
}
// if (async) {
// async->close();
// }
}