Commit fac23e65 authored by Alex Hultman's avatar Alex Hultman

Lots of Group add/remove/forEach changes, closeSocket, etc

parent 58e04f6b
......@@ -13,11 +13,29 @@ void *Group<isServer>::getUserData() {
return userData;
}
// kills connect and shutdown sockets (can use userData as tracker)
template <bool isServer>
void Group<isServer>::limboTimerCallback(uS::Loop::Timer *timer) {
Group<isServer> *group = static_cast<Group<isServer> *>(timer->getData());
// for each limboSocket
// group->forEach([](uWS::WebSocket<isServer> *webSocket) {
// if (webSocket->hasOutstandingPong) {
// webSocket->terminate();
// } else {
// webSocket->hasOutstandingPong = true;
// }
// });
}
// this callback should be the main tick of all sockets in this group
// including any timer in connect or shutdown state
template <bool isServer>
void Group<isServer>::timerCallback(uS::Loop::Timer *timer) {
Group<isServer> *group = (Group<isServer> *) timer->getData();
Group<isServer> *group = static_cast<Group<isServer> *>(timer->getData());
group->forEach([](uWS::WebSocket<isServer> *webSocket) {
group->forEach(Group<isServer>::WEBSOCKET, [](uS::Socket *socket) {
uWS::WebSocket<isServer> *webSocket = static_cast<uWS::WebSocket<isServer> *>(socket);
if (webSocket->hasOutstandingPong) {
webSocket->terminate();
} else {
......@@ -33,88 +51,64 @@ void Group<isServer>::timerCallback(uS::Loop::Timer *timer) {
}
template <bool isServer>
void Group<isServer>::startAutoPing(int intervalMs, std::string userMessage) {
timer = new uS::Loop::Timer(getLoop());
timer->setData(this);
timer->start(timerCallback, intervalMs, intervalMs);
userPingMessage = userMessage;
}
template <bool isServer>
void Group<isServer>::addHttpSocket(HttpSocket<isServer> *httpSocket) {
if (httpSocketHead) {
httpSocketHead->prev = httpSocket;
httpSocket->next = httpSocketHead;
void Group<isServer>::add(int chainIndex, uS::Socket *socket) {
Socket *&head = chainHead[chainIndex];
if (head) {
head->prev = socket;
socket->next = head;
} else {
httpSocket->next = nullptr;
// start timer
httpTimer = new uS::Loop::Timer(getLoop());
httpTimer->setData(this);
httpTimer->start([](uS::Loop::Timer *httpTimer) {
Group<isServer> *group = (Group<isServer> *) httpTimer->getData();
group->forEachHttpSocket([](HttpSocket<isServer> *httpSocket) {
if (httpSocket->missedDeadline) {
httpSocket->terminate();
} else if (!httpSocket->outstandingResponsesHead) {
httpSocket->missedDeadline = true;
}
});
}, 1000, 1000);
socket->next = nullptr;
if (chainIndex == HTTPSOCKET) {
httpTimer = new uS::Loop::Timer(getLoop());
httpTimer->setData(this);
httpTimer->start([](uS::Loop::Timer *httpTimer) {
Group<isServer> *group = static_cast<Group<isServer> *>(httpTimer->getData());
group->forEach(Group<isServer>::HTTPSOCKET, [](uS::Socket *socket) {
HttpSocket<isServer> *httpSocket = static_cast<HttpSocket<isServer> *>(socket);
if (httpSocket->missedDeadline) {
httpSocket->terminate();
} else if (!httpSocket->outstandingResponsesHead) {
httpSocket->missedDeadline = true;
}
});
}, 1000, 1000);
}
}
httpSocketHead = httpSocket;
httpSocket->prev = nullptr;
head = socket;
socket->prev = nullptr;
}
template <bool isServer>
void Group<isServer>::removeHttpSocket(HttpSocket<isServer> *httpSocket) {
void Group<isServer>::remove(int chainIndex, uS::Socket *socket) {
if (iterators.size()) {
iterators.top() = httpSocket->next;
iterators.top() = socket->next;
}
if (httpSocket->prev == httpSocket->next) {
httpSocketHead = nullptr;
httpTimer->stop();
httpTimer->close();
if (socket->prev == socket->next) {
chainHead[chainIndex] = nullptr;
if (chainIndex == HTTPSOCKET) {
httpTimer->stop();
httpTimer->close();
}
} else {
if (httpSocket->prev) {
((HttpSocket<isServer> *) httpSocket->prev)->next = httpSocket->next;
if (socket->prev) {
static_cast<uS::Socket *>(socket->prev)->next = socket->next;
} else {
httpSocketHead = (HttpSocket<isServer> *) httpSocket->next;
chainHead[chainIndex] = socket->next;
}
if (httpSocket->next) {
((HttpSocket<isServer> *) httpSocket->next)->prev = httpSocket->prev;
if (socket->next) {
static_cast<uS::Socket *>(socket->next)->prev = socket->prev;
}
}
}
template <bool isServer>
void Group<isServer>::addWebSocket(WebSocket<isServer> *webSocket) {
if (webSocketHead) {
webSocketHead->prev = webSocket;
webSocket->next = webSocketHead;
} else {
webSocket->next = nullptr;
}
webSocketHead = webSocket;
webSocket->prev = nullptr;
}
template <bool isServer>
void Group<isServer>::removeWebSocket(WebSocket<isServer> *webSocket) {
if (iterators.size()) {
iterators.top() = webSocket->next;
}
if (webSocket->prev == webSocket->next) {
webSocketHead = nullptr;
} else {
if (webSocket->prev) {
((WebSocket<isServer> *) webSocket->prev)->next = webSocket->next;
} else {
webSocketHead = (WebSocket<isServer> *) webSocket->next;
}
if (webSocket->next) {
((WebSocket<isServer> *) webSocket->next)->prev = webSocket->prev;
}
}
void Group<isServer>::startAutoPing(int intervalMs, std::string userMessage) {
timer = new uS::Loop::Timer(getLoop());
timer->setData(this);
timer->start(timerCallback, intervalMs, intervalMs);
userPingMessage = userMessage;
}
template <bool isServer>
......@@ -134,24 +128,11 @@ Group<isServer>::Group(int extensionOptions, uS::Loop *loop) : extensionOptions(
this->extensionOptions |= CLIENT_NO_CONTEXT_TAKEOVER | SERVER_NO_CONTEXT_TAKEOVER;
}
// this is really just implemented in the Context!
template <bool isServer>
void Group<isServer>::stopListening() {
// if (isServer) {
// if (user) {
// // todo: we should allow one group to listen to many ports!
// uS::ListenSocket *listenSocket = (uS::ListenSocket *) user;
// if (listenSocket->timer) {
// listenSocket->timer->stop();
// listenSocket->timer->close();
// }
// listenSocket->closeSocket<uS::ListenSocket>();
// // mark as stopped listening (extra care?)
// user = nullptr;
// }
// }
// stop context
// if (async) {
// async->close();
......@@ -231,24 +212,24 @@ void Group<isServer>::broadcast(const char *message, size_t length, OpCode opCod
#endif
typename WebSocket<isServer>::PreparedMessage *preparedMessage = WebSocket<isServer>::prepareMessage((char *) message, length, opCode, false);
forEach([preparedMessage](uWS::WebSocket<isServer> *ws) {
ws->sendPrepared(preparedMessage);
forEach(WEBSOCKET, [preparedMessage](uS::Socket *socket) {
static_cast<uWS::WebSocket<isServer> *>(socket)->sendPrepared(preparedMessage);
});
WebSocket<isServer>::finalizeMessage(preparedMessage);
}
template <bool isServer>
void Group<isServer>::terminate() {
forEach([](uWS::WebSocket<isServer> *ws) {
ws->terminate();
forEach(WEBSOCKET, [](uS::Socket *socket) {
static_cast<uWS::WebSocket<isServer> *>(socket)->terminate();
});
stopListening();
}
template <bool isServer>
void Group<isServer>::close(int code, char *message, size_t length) {
forEach([code, message, length](uWS::WebSocket<isServer> *ws) {
ws->close(code, message, length);
forEach(WEBSOCKET, [code, message, length](uS::Socket *socket) {
static_cast<uWS::WebSocket<isServer> *>(socket)->close(code, message, length);
});
stopListening();
if (timer) {
......@@ -257,7 +238,7 @@ void Group<isServer>::close(int code, char *message, size_t length) {
}
}
template struct Group<true>;
template struct Group<false>;
template struct Group<SERVER>;
template struct Group<CLIENT>;
}
......@@ -47,16 +47,18 @@ protected:
void *userData = nullptr;
static void timerCallback(uS::Loop::Timer *timer);
WebSocket<isServer> *webSocketHead = nullptr;
HttpSocket<isServer> *httpSocketHead = nullptr;
void addWebSocket(WebSocket<isServer> *webSocket);
void removeWebSocket(WebSocket<isServer> *webSocket);
// todo: remove these, template
void addHttpSocket(HttpSocket<isServer> *httpSocket);
void removeHttpSocket(HttpSocket<isServer> *httpSocket);
static void limboTimerCallback(uS::Loop::Timer *timer);
enum {
WEBSOCKET,
HTTPSOCKET,
WEBSOCKET_SHUTDOWN,
HTTPSOCKET_CONNECT,
SIZE
};
Socket *chainHead[SIZE] = {};
void add(int chainIndex, uS::Socket *socket);
void remove(int chainIndex, uS::Socket *socket);
Group(int extensionOptions, uS::Loop *loop);
void stopListening();
......@@ -93,39 +95,16 @@ public:
// }
}
// void listen(ListenOptions listenOptions) {
//// if (listenOptions == TRANSFERS && !async) {
//// addAsync();
//// }
// }
template <class F>
void forEach(const F &cb) {
WebSocket<isServer> *iterator = webSocketHead;
iterators.push(iterator);
while (iterator) {
WebSocket<isServer> *lastIterator = iterator;
cb((WebSocket<isServer> *) iterator);
iterator = (WebSocket<isServer> *) iterators.top();
if (lastIterator == iterator) {
iterator = (WebSocket<isServer> *) iterator->next;
iterators.top() = iterator;
}
}
iterators.pop();
}
// duplicated code for now!
template <class F>
void forEachHttpSocket(const F &cb) {
HttpSocket<isServer> *iterator = httpSocketHead;
void forEach(int chainIndex, const F &cb) {
uS::Socket *iterator = chainHead[chainIndex];
iterators.push(iterator);
while (iterator) {
HttpSocket<isServer> *lastIterator = iterator;
cb((HttpSocket<isServer> *) iterator);
iterator = (HttpSocket<isServer> *) iterators.top();
uS::Socket *lastIterator = iterator;
cb(iterator);
iterator = static_cast<uS::Socket *>(iterators.top());
if (lastIterator == iterator) {
iterator = (HttpSocket<isServer> *) iterator->next;
iterator = static_cast<uS::Socket *>(iterator->next);
iterators.top() = iterator;
}
}
......
......@@ -108,16 +108,15 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
bool perMessageDeflate;
httpSocket->upgrade(secKey.value, extensions.value, extensions.valueLength,
subprotocol.value, subprotocol.valueLength, &perMessageDeflate);
Group<isServer>::from(httpSocket)->removeHttpSocket(httpSocket);
Group<isServer> *group = Group<isServer>::from(httpSocket);
group->remove(Group<isServer>::HTTPSOCKET, httpSocket);
// Warning: changes socket, needs to inform the stack of Poll address change!
WebSocket<isServer> *webSocket = new WebSocket<isServer>(perMessageDeflate, httpSocket);
webSocket->setDerivative(WEB_SOCKET_SERVER);
Group<isServer>::from(webSocket)->addWebSocket(webSocket);
// httpSocket is already corked!
//webSocket->cork(true);
Group<isServer>::from(webSocket)->connectionHandler(webSocket, req);
group->add(Group<isServer>::WEBSOCKET, webSocket);
group->connectionHandler(webSocket, req);
if (!webSocket->isClosed()) {
webSocket->cork(false);
}
......@@ -166,7 +165,7 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
webSocket->setDerivative(WEB_SOCKET_CLIENT);
// httpSocket->cancelTimeout();
webSocket->setUserData(httpSocket->httpUser);
Group<isServer>::from(webSocket)->addWebSocket(webSocket);
//Group<isServer>::from(webSocket)->addWebSocket(webSocket);
webSocket->cork(true);
Group<isServer>::from(webSocket)->connectionHandler(webSocket, req);
......@@ -266,14 +265,16 @@ void HttpSocket<isServer>::onEnd(uS::Socket *s) {
if (!httpSocket->isShuttingDown()) {
if (isServer) {
Group<isServer>::from(httpSocket)->removeHttpSocket(httpSocket);
Group<isServer>::from(httpSocket)->remove(Group<isServer>::HTTPSOCKET, httpSocket);
Group<isServer>::from(httpSocket)->httpDisconnectionHandler(httpSocket);
}
} else {
//httpSocket->cancelTimeout();
}
//httpSocket->template closeSocket<HttpSocket<isServer>>();
httpSocket->close([](uS::Socket *socket) {
delete static_cast<HttpSocket<isServer> *>(socket);
});
while (!httpSocket->messageQueue.empty()) {
Queue::Message *message = httpSocket->messageQueue.front();
......
......@@ -68,7 +68,7 @@ bool Hub::listen(const char *host, int port, uS::TLS::Context sslContext, int op
bool listening = eh->listen(host, port, options, [](Socket *socket) {
HttpSocket<SERVER> *httpSocket = static_cast<HttpSocket<SERVER> *>(socket);
httpSocket->setDerivative(HTTP_SOCKET_SERVER);
Group<SERVER>::from(httpSocket)->addHttpSocket(httpSocket);
Group<SERVER>::from(httpSocket)->add(Group<SERVER>::HTTPSOCKET, httpSocket);
Group<SERVER>::from(httpSocket)->httpConnectionHandler(httpSocket);
}, [](uS::Context *context) -> Socket * {
return new HttpSocket<SERVER>(context);
......
......@@ -251,9 +251,11 @@ void WebSocket<isServer>::close(int code, const char *message, size_t length) {
static const int MAX_CLOSE_PAYLOAD = 123;
length = std::min<size_t>(MAX_CLOSE_PAYLOAD, length);
Group<isServer>::from(this)->removeWebSocket(this);
Group<isServer>::from(this)->remove(Group<isServer>::WEBSOCKET, this);
Group<isServer>::from(this)->disconnectionHandler(this, code, (char *) message, length);
//setShuttingDown(true);
// this should be autoamtic
setShuttingDown(true);
// todo: using the shared timer in the group, we can skip creating a new timer per socket
// only this line and the one in Hub::connect uses the timeout feature
......@@ -266,6 +268,8 @@ void WebSocket<isServer>::close(int code, const char *message, size_t length) {
p->shutdown();
}
});
// shutdown();
}
template <bool isServer>
......@@ -273,13 +277,15 @@ void WebSocket<isServer>::onEnd(uS::Socket *s) {
WebSocket<isServer> *webSocket = static_cast<WebSocket<isServer> *>(s);
if (!webSocket->isShuttingDown()) {
Group<isServer>::from(webSocket)->removeWebSocket(webSocket);
Group<isServer>::from(webSocket)->remove(Group<isServer>::WEBSOCKET, webSocket);
Group<isServer>::from(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0);
} else {
//webSocket->cancelTimeout();
}
//webSocket->template closeSocket<WebSocket<isServer>>();
webSocket->uS::Socket::close([](uS::Socket *socket) {
delete static_cast<WebSocket<isServer> *>(socket);
});
while (!webSocket->messageQueue.empty()) {
Queue::Message *message = webSocket->messageQueue.front();
......
......@@ -1007,7 +1007,7 @@
void serveHttp() {
uWS::Hub h;
std::string document = "Hello World!";//"<h2>Well hello there, this is a basic test!</h2>";
std::string document = "<h2>Well hello there, this is a basic test!</h2>";
h.onHttpRequest([&document](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t length, size_t remainingBytes) {
res->end(document.data(), document.length());
......
......@@ -23,9 +23,9 @@ HEADERS += ../src/WebSocketProtocol.h \
../uSockets/Epoll.h \
../src/uSockets.h
LIBS += -lssl -lcrypto -lz -lpthread -luv -lboost_system
LIBS += -lasan -lssl -lcrypto -lz -lpthread -luv -lboost_system
QMAKE_CXXFLAGS += -Wno-unused-parameter
QMAKE_CXXFLAGS += -fsanitize=address -Wno-unused-parameter
QMAKE_CXXFLAGS_RELEASE -= -O1
QMAKE_CXXFLAGS_RELEASE -= -O2
QMAKE_CXXFLAGS_RELEASE *= -O3 -g
......
Subproject commit 4f236b9064513bd44fffdc7cdcbd7d9a800d8412
Subproject commit b28c3c6317b4b9ffb2bf1c36268e4f20de74e9cf
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment