Commit 53d391e0 authored by Alex Hultman's avatar Alex Hultman

Fix transfer, listen(TRANSFERS), some comments, Group::from

parent cf67d5a3
......@@ -117,6 +117,16 @@ struct Poll {
this->cb = cb;
}
void (*getCb())(Poll *, int, int) {
return cb;
}
void reInit(Loop *loop, uv_os_sock_t fd) {
delete socket;
socket = new boost::asio::posix::stream_descriptor(*loop, fd);
socket->non_blocking(true);
}
void start(Loop *, Poll *self, int events) {
if (events & UV_READABLE) {
socket->async_read_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) {
......@@ -155,6 +165,9 @@ struct Poll {
socket->cancel();
}
// this is not correct, but it works for now
// think about transfer - should allow one to not delete
// but in this case it doesn't matter at all
void close(Loop *loop, void (*cb)(Poll *)) {
socket->release();
socket->get_io_service().post([cb, this]() {
......
......@@ -2,6 +2,8 @@
#ifdef USE_EPOLL
// todo: remove this mutex, have callbacks set at program start
std::recursive_mutex cbMutex;
void (*callbacks[16])(Poll *, int, int);
int cbHead = 0;
......
......@@ -8,6 +8,7 @@
#include <chrono>
#include <algorithm>
#include <vector>
#include <mutex>
typedef int uv_os_sock_t;
static const int UV_READABLE = EPOLLIN;
......@@ -16,6 +17,7 @@ static const int UV_WRITABLE = EPOLLOUT;
struct Poll;
struct Timer;
extern std::recursive_mutex cbMutex;
extern void (*callbacks[16])(Poll *, int, int);
extern int cbHead;
......@@ -130,7 +132,9 @@ protected:
loop->numPolls++;
}
// todo: pre-set all of callbacks up front and remove mutex
void setCb(void (*cb)(Poll *p, int status, int events)) {
cbMutex.lock();
state.cbIndex = cbHead;
for (int i = 0; i < cbHead; i++) {
if (callbacks[i] == cb) {
......@@ -141,6 +145,16 @@ protected:
if (state.cbIndex == cbHead) {
callbacks[cbHead++] = cb;
}
cbMutex.unlock();
}
void (*getCb())(Poll *, int, int) {
return callbacks[state.cbIndex];
}
void reInit(Loop *loop, uv_os_sock_t fd) {
state.fd = fd;
loop->numPolls++;
}
void start(Loop *loop, Poll *self, int events) {
......
......@@ -9,10 +9,14 @@
namespace uWS {
enum ListenOptions {
TRANSFERS
};
struct Hub;
template <bool isServer>
struct WIN32_EXPORT Group : uS::NodeData {
struct WIN32_EXPORT Group : private uS::NodeData {
protected:
friend struct Hub;
friend struct WebSocket<isServer>;
......@@ -82,7 +86,19 @@ public:
void terminate();
void close(int code = 1000, char *message = nullptr, size_t length = 0);
void startAutoPing(int intervalMs, std::string userMessage = "");
using NodeData::addAsync;
// same as listen(TRANSFERS), backwards compatible API for now
void addAsync() {
if (!async) {
NodeData::addAsync();
}
}
void listen(ListenOptions listenOptions) {
if (listenOptions == TRANSFERS && !async) {
addAsync();
}
}
template <class F>
void forEach(const F &cb) {
......@@ -116,12 +132,11 @@ public:
}
iterators.pop();
}
};
template <bool isServer>
Group<isServer> *getGroup(uS::Socket *s) {
return static_cast<Group<isServer> *>(s->getNodeData());
}
static Group<isServer> *from(uS::Socket *s) {
return static_cast<Group<isServer> *>(s->getNodeData());
}
};
}
......
......@@ -61,10 +61,10 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
if (httpSocket->contentLength) {
httpSocket->missedDeadline = false;
if (httpSocket->contentLength >= length) {
getGroup<isServer>(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, length, httpSocket->contentLength -= length);
Group<isServer>::from(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, length, httpSocket->contentLength -= length);
return httpSocket;
} else {
getGroup<isServer>(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, httpSocket->contentLength, 0);
Group<isServer>::from(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, httpSocket->contentLength, 0);
data += httpSocket->contentLength;
length -= httpSocket->contentLength;
httpSocket->contentLength = 0;
......@@ -96,8 +96,8 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
headers->valueLength = std::max<int>(0, headers->valueLength - 9);
httpSocket->missedDeadline = false;
if (req.getHeader("upgrade", 7)) {
if (getGroup<SERVER>(httpSocket)->httpUpgradeHandler) {
getGroup<SERVER>(httpSocket)->httpUpgradeHandler((HttpSocket<SERVER> *) httpSocket, req);
if (Group<SERVER>::from(httpSocket)->httpUpgradeHandler) {
Group<SERVER>::from(httpSocket)->httpUpgradeHandler((HttpSocket<SERVER> *) httpSocket, req);
} else {
Header secKey = req.getHeader("sec-websocket-key", 17);
Header extensions = req.getHeader("sec-websocket-extensions", 24);
......@@ -106,16 +106,16 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
bool perMessageDeflate;
httpSocket->upgrade(secKey.value, extensions.value, extensions.valueLength,
subprotocol.value, subprotocol.valueLength, &perMessageDeflate);
getGroup<isServer>(httpSocket)->removeHttpSocket(httpSocket);
Group<isServer>::from(httpSocket)->removeHttpSocket(httpSocket);
// Warning: changes socket, needs to inform the stack of Poll address change!
WebSocket<isServer> *webSocket = new WebSocket<isServer>(perMessageDeflate, httpSocket);
webSocket->template setState<WebSocket<isServer>>();
webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE));
getGroup<isServer>(webSocket)->addWebSocket(webSocket);
Group<isServer>::from(webSocket)->addWebSocket(webSocket);
webSocket->cork(true);
getGroup<isServer>(webSocket)->connectionHandler(webSocket, req);
Group<isServer>::from(webSocket)->connectionHandler(webSocket, req);
// todo: should not uncork if closed!
webSocket->cork(false);
delete httpSocket;
......@@ -127,7 +127,7 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
}
return httpSocket;
} else {
if (getGroup<SERVER>(httpSocket)->httpRequestHandler) {
if (Group<SERVER>::from(httpSocket)->httpRequestHandler) {
HttpResponse *res = HttpResponse::allocateResponse(httpSocket);
if (httpSocket->outstandingResponsesTail) {
......@@ -141,10 +141,10 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
if (req.getMethod() != HttpMethod::METHOD_GET && (contentLength = req.getHeader("content-length", 14))) {
httpSocket->contentLength = atoi(contentLength.value);
size_t bytesToRead = std::min<int>(httpSocket->contentLength, end - cursor);
getGroup<SERVER>(httpSocket)->httpRequestHandler(res, req, cursor, bytesToRead, httpSocket->contentLength -= bytesToRead);
Group<SERVER>::from(httpSocket)->httpRequestHandler(res, req, cursor, bytesToRead, httpSocket->contentLength -= bytesToRead);
cursor += bytesToRead;
} else {
getGroup<SERVER>(httpSocket)->httpRequestHandler(res, req, nullptr, 0, 0);
Group<SERVER>::from(httpSocket)->httpRequestHandler(res, req, nullptr, 0, 0);
}
if (httpSocket->isClosed() || httpSocket->isShuttingDown()) {
......@@ -164,10 +164,10 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
webSocket->setUserData(httpSocket->httpUser);
webSocket->template setState<WebSocket<isServer>>();
webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE));
getGroup<isServer>(webSocket)->addWebSocket(webSocket);
Group<isServer>::from(webSocket)->addWebSocket(webSocket);
webSocket->cork(true);
getGroup<isServer>(webSocket)->connectionHandler(webSocket, req);
Group<isServer>::from(webSocket)->connectionHandler(webSocket, req);
if (!(webSocket->isClosed() || webSocket->isShuttingDown())) {
WebSocketProtocol<isServer, WebSocket<isServer>>::consume(cursor, end - cursor, webSocket);
}
......@@ -209,7 +209,7 @@ void HttpSocket<isServer>::upgrade(const char *secKey, const char *extensions, s
*perMessageDeflate = false;
std::string extensionsResponse;
if (extensionsLength) {
Group<isServer> *group = getGroup<isServer>(this);
Group<isServer> *group = Group<isServer>::from(this);
ExtensionsNegotiator<uWS::SERVER> extensionsNegotiator(group->extensionOptions);
extensionsNegotiator.readOffer(std::string(extensions, extensionsLength));
extensionsResponse = extensionsNegotiator.generateOffer();
......@@ -268,8 +268,8 @@ void HttpSocket<isServer>::onEnd(uS::Socket *s) {
if (!httpSocket->isShuttingDown()) {
if (isServer) {
getGroup<isServer>(httpSocket)->removeHttpSocket(httpSocket);
getGroup<isServer>(httpSocket)->httpDisconnectionHandler(httpSocket);
Group<isServer>::from(httpSocket)->removeHttpSocket(httpSocket);
Group<isServer>::from(httpSocket)->httpDisconnectionHandler(httpSocket);
}
} else {
httpSocket->cancelTimeout();
......@@ -286,7 +286,7 @@ void HttpSocket<isServer>::onEnd(uS::Socket *s) {
}
while (httpSocket->outstandingResponsesHead) {
getGroup<isServer>(httpSocket)->httpCancelledRequestHandler(httpSocket->outstandingResponsesHead);
Group<isServer>::from(httpSocket)->httpCancelledRequestHandler(httpSocket->outstandingResponsesHead);
HttpResponse *next = httpSocket->outstandingResponsesHead->next;
delete httpSocket->outstandingResponsesHead;
httpSocket->outstandingResponsesHead = next;
......@@ -298,7 +298,7 @@ void HttpSocket<isServer>::onEnd(uS::Socket *s) {
if (!isServer) {
httpSocket->cancelTimeout();
getGroup<CLIENT>(httpSocket)->errorHandler(httpSocket->httpUser);
Group<CLIENT>::from(httpSocket)->errorHandler(httpSocket->httpUser);
}
}
......
......@@ -5,8 +5,6 @@
#include <string>
// #include <experimental/string_view>
#include <iostream>
namespace uWS {
struct Header {
......
......@@ -49,8 +49,8 @@ void Hub::onServerAccept(uS::Socket *s) {
httpSocket->setState<HttpSocket<SERVER>>();
httpSocket->start(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE));
httpSocket->setNoDelay(true);
getGroup<SERVER>(httpSocket)->addHttpSocket(httpSocket);
getGroup<SERVER>(httpSocket)->httpConnectionHandler(httpSocket);
Group<SERVER>::from(httpSocket)->addHttpSocket(httpSocket);
Group<SERVER>::from(httpSocket)->httpConnectionHandler(httpSocket);
}
void Hub::onClientConnection(uS::Socket *s, bool error) {
......
......@@ -128,6 +128,16 @@ struct Poll {
this->cb = cb;
}
void (*getCb())(Poll *, int, int) {
return cb;
}
void reInit(Loop *loop, uv_os_sock_t fd) {
delete uv_poll;
uv_poll = new uv_poll_t;
uv_poll_init_socket(loop, uv_poll, fd);
}
void start(Loop *, Poll *self, int events) {
uv_poll->data = self;
uv_poll_start(uv_poll, events, [](uv_poll_t *p, int status, int events) {
......
......@@ -210,16 +210,8 @@ struct WIN32_EXPORT NodeData {
Async *async = nullptr;
pthread_t tid;
struct TransferData {
Poll *p;
uv_os_sock_t fd;
Socket *socketData;
void (*pollCb)(Poll *, int, int);
void (*cb)(Poll *);
};
std::recursive_mutex *asyncMutex;
std::vector<TransferData> transferQueue;
std::vector<Poll *> transferQueue;
std::vector<Poll *> changePollQueue;
static void asyncCallback(Async *async);
......
......@@ -8,15 +8,20 @@ void NodeData::asyncCallback(Async *async)
NodeData *nodeData = (NodeData *) async->getData();
nodeData->asyncMutex->lock();
for (TransferData transferData : nodeData->transferQueue) {
for (Poll *p : nodeData->transferQueue) {
Socket *s = (Socket *) p;
TransferData *transferData = (TransferData *) s->getUserData();
s->reInit(nodeData->loop, transferData->fd);
s->setCb(transferData->pollCb);
s->start(nodeData->loop, s, s->setPoll(transferData->pollEvents));
Socket *s = (Socket *) transferData.p;
s->nodeData = transferData->destination;
s->setUserData(transferData->userData);
auto *transferCb = transferData->transferCb;
//transferData.p->init(nodeData->loop, transferData.fd);
s->setCb(transferData.pollCb);
//transferData.p->start(transferData.socketData->nodeData->loop, transferData.socketData->getPoll());
transferData.socketData->nodeData = nodeData;
transferData.cb(transferData.p);
delete transferData;
transferCb(s);
}
for (Poll *p : nodeData->changePollQueue) {
......
......@@ -5,6 +5,23 @@
namespace uS {
struct TransferData {
// Connection state
uv_os_sock_t fd;
SSL *ssl;
// Poll state
void (*pollCb)(Poll *, int, int);
int pollEvents;
// User state
void *userData;
// Destination
NodeData *destination;
void (*transferCb)(Poll *);
};
// perfectly 64 bytes (4 + 60)
struct WIN32_EXPORT Socket : Poll {
protected:
......@@ -69,30 +86,23 @@ protected:
state.shuttingDown = shuttingDown;
}
// todo: needs to lock newLoop's numPolls when doing fastTransfer
void transfer(NodeData *nodeData, void (*cb)(Poll *)) {
//nodeData->asyncMutex->lock();
if (fastTransfer(this->nodeData->loop, nodeData->loop, getPoll())) {
//nodeData->asyncMutex->unlock();
this->nodeData = nodeData;
cb(this);
//return;
} else {
// // todo: libuv is not thread safe
// nodeData->asyncMutex->lock();
// //nodeData->transferQueue.push_back({new Poll, getFd(), socketData, getPollCallback(), cb});
// nodeData->asyncMutex->unlock();
// if (nodeData->tid != nodeData->tid) {
// nodeData->async->send();
// } else {
// NodeData::asyncCallback(nodeData->async);
// }
// stop(nodeData->loop);
// close(nodeData->loop);
}
//nodeData->asyncMutex->unlock();
// userData is invalid from now on till onTransfer
setUserData(new TransferData({getFd(), ssl, getCb(), getPoll(), getUserData(), nodeData, cb}));
stop(this->nodeData->loop);
close(this->nodeData->loop, [](Poll *p) {
Socket *s = (Socket *) p;
TransferData *transferData = (TransferData *) s->getUserData();
transferData->destination->asyncMutex->lock();
bool wasEmpty = transferData->destination->transferQueue.empty();
transferData->destination->transferQueue.push_back(s);
transferData->destination->asyncMutex->unlock();
if (wasEmpty) {
transferData->destination->async->send();
}
});
}
void changePoll(Socket *socket) {
......@@ -403,7 +413,8 @@ protected:
public:
Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), ssl(ssl), nodeData(nodeData) {
if (ssl) {
SSL_set_fd(ssl, fd);
// OpenSSL treats SOCKETs as int
SSL_set_fd(ssl, (int) fd);
SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS);
}
}
......
......@@ -4,6 +4,15 @@
namespace uWS {
/*
* Frames and sends a WebSocket message.
*
* Hints: Consider using any of the prepare function if any of their
* use cases match what you are trying to achieve (pub/sub, broadcast)
*
* Thread safe
*
*/
template <bool isServer>
void WebSocket<isServer>::send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket<isServer> *webSocket, void *data, bool cancelled, void *reserved), void *callbackData) {
......@@ -36,6 +45,15 @@ void WebSocket<isServer>::send(const char *message, size_t length, OpCode opCode
sendTransformed<WebSocketTransformer>((char *) message, length, (void(*)(void *, void *, bool, void *)) callback, callbackData, transformData);
}
/*
* Prepares a single message for use with sendPrepared.
*
* Hints: Useful in cases where you need to send the same message to many
* recipients. Do not use when only sending one message.
*
* Thread safe
*
*/
template <bool isServer>
typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket<isServer> *webSocket, void *data, bool cancelled, void *reserved)) {
PreparedMessage *preparedMessage = new PreparedMessage;
......@@ -46,6 +64,15 @@ typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessa
return preparedMessage;
}
/*
* Prepares a batch of messages to send as one single TCP packet / syscall.
*
* Hints: Useful when doing pub/sub-like broadcasts where many recipients should receive many
* messages. Do not use if only sending one message.
*
* Thread safe
*
*/
template <bool isServer>
typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessageBatch(std::vector<std::string> &messages, std::vector<int> &excludedMessages, OpCode opCode, bool compressed, void (*callback)(WebSocket<isServer> *, void *, bool, void *))
{
......@@ -68,9 +95,21 @@ typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessa
return preparedMessage;
}
// todo: see if this can be made a transformer instead
/*
* Sends a prepared message.
*
* Hints: Used to improve broadcasting and similar use cases where the same
* message is sent to multiple recipients. Do not used if only sending one message
* in total.
*
* Warning: Modifies passed PreparedMessage and is thus not thread safe. Other
* data is also modified and it makes sense to not make this function thread-safe
* since it is a central part in broadcasting and other high-perf code paths.
*
*/
template <bool isServer>
void WebSocket<isServer>::sendPrepared(typename WebSocket<isServer>::PreparedMessage *preparedMessage, void *callbackData) {
// todo: see if this can be made a transformer instead
preparedMessage->references++;
void (*callback)(void *webSocket, void *userData, bool cancelled, void *reserved) = [](void *webSocket, void *userData, bool cancelled, void *reserved) {
PreparedMessage *preparedMessage = (PreparedMessage *) userData;
......@@ -114,6 +153,15 @@ void WebSocket<isServer>::sendPrepared(typename WebSocket<isServer>::PreparedMes
}
}
/*
* Decrements the reference count of passed PreparedMessage. On zero references
* the memory will be deleted.
*
* Hints: Used together with prepareMessage, prepareMessageBatch and similar calls.
*
* Warning: Will modify passed PrepareMessage and is thus not thread safe by itself.
*
*/
template <bool isServer>
void WebSocket<isServer>::finalizeMessage(typename WebSocket<isServer>::PreparedMessage *preparedMessage) {
if (!--preparedMessage->references) {
......@@ -138,6 +186,12 @@ uS::Socket *WebSocket<isServer>::onData(uS::Socket *s, char *data, size_t length
return webSocket;
}
/*
* Immediately terminates this WebSocket. Will call onDisconnection of its Group.
*
* Hints: Close code will be 1006 and message will be empty.
*
*/
template <bool isServer>
void WebSocket<isServer>::terminate() {
......@@ -151,6 +205,45 @@ void WebSocket<isServer>::terminate() {
WebSocket<isServer>::onEnd(this);
}
/*
* Transfers this WebSocket from its current Group to specified Group.
*
* Receiving Group has to have called listen(uWS::TRANSFERS) prior.
*
* Hints: Useful to implement subprotocols on the same thread and Loop
* or to transfer WebSockets between threads at any point (dynamic load balancing).
*
* Warning: From the point of call to the point of onTransfer, this WebSocket
* is invalid and cannot be used. What you put in is not guaranteed to be what you
* get in onTransfer, the only guaranteed consistency is passed userData is the userData
* of given WebSocket in onTransfer. Use setUserData and getUserData to identify the WebSocket.
*/
template <bool isServer>
void WebSocket<isServer>::transfer(Group<isServer> *group) {
Group<isServer>::from(this)->removeWebSocket(this);
if (group->loop == Group<isServer>::from(this)->loop) {
// fast path
this->nodeData = group;
Group<isServer>::from(this)->addWebSocket(this);
Group<isServer>::from(this)->transferHandler(this);
} else {
// slow path
uS::Socket::transfer((uS::NodeData *) group, [](Poll *p) {
WebSocket<isServer> *webSocket = (WebSocket<isServer> *) p;
Group<isServer>::from(webSocket)->addWebSocket(webSocket);
Group<isServer>::from(webSocket)->transferHandler(webSocket);
});
}
}
/*
* Immediately calls onDisconnection of its Group and begins a passive
* WebSocket closedown handshake in the background (might succeed or not,
* we don't care).
*
* Hints: Close code and message will be what you pass yourself.
*
*/
template <bool isServer>
void WebSocket<isServer>::close(int code, const char *message, size_t length) {
......@@ -158,8 +251,8 @@ void WebSocket<isServer>::close(int code, const char *message, size_t length) {
static const int MAX_CLOSE_PAYLOAD = 123;
length = std::min<size_t>(MAX_CLOSE_PAYLOAD, length);
getGroup<isServer>(this)->removeWebSocket(this);
getGroup<isServer>(this)->disconnectionHandler(this, code, (char *) message, length);
Group<isServer>::from(this)->removeWebSocket(this);
Group<isServer>::from(this)->disconnectionHandler(this, code, (char *) message, length);
setShuttingDown(true);
// todo: using the shared timer in the group, we can skip creating a new timer per socket
......@@ -180,8 +273,8 @@ void WebSocket<isServer>::onEnd(uS::Socket *s) {
WebSocket<isServer> *webSocket = static_cast<WebSocket<isServer> *>(s);
if (!webSocket->isShuttingDown()) {
getGroup<isServer>(webSocket)->removeWebSocket(webSocket);
getGroup<isServer>(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0);
Group<isServer>::from(webSocket)->removeWebSocket(webSocket);
Group<isServer>::from(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0);
} else {
webSocket->cancelTimeout();
}
......
......@@ -65,14 +65,7 @@ public:
void sendPrepared(PreparedMessage *preparedMessage, void *callbackData = nullptr);
static void finalizeMessage(PreparedMessage *preparedMessage);
void close(int code = 1000, const char *message = nullptr, size_t length = 0);
void transfer(Group<isServer> *group) {
((Group<isServer> *) nodeData)->removeWebSocket(this);
uS::Socket::transfer((uS::NodeData *) group, [](Poll *p) {
WebSocket<isServer> *webSocket = (WebSocket<isServer> *) p;
((Group<isServer> *) webSocket->nodeData)->addWebSocket(webSocket);
((Group<isServer> *) webSocket->nodeData)->transferHandler(webSocket);
});
}
void transfer(Group<isServer> *group);
// Thread safe
void terminate();
......
......@@ -472,7 +472,7 @@ void testReusePort() {
delete group2;
}
void testMultithreading() {
void testTransfers() {
for (int ssl = 0; ssl < 2; ssl++) {
uWS::Group<uWS::SERVER> *tServerGroup = nullptr;
uWS::Group<uWS::CLIENT> *clientGroup = nullptr;
......@@ -486,7 +486,23 @@ void testMultithreading() {
uWS::Hub th;
tServerGroup = &th.getDefaultGroup<uWS::SERVER>();
th.onMessage([&tServerGroup, &client, &receivedMessages, &clientGroup, &m](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
bool transferred = false;
th.onTransfer([&transferred](uWS::WebSocket<uWS::SERVER> *ws) {
if (ws->getUserData() != (void *) 12345) {
std::cout << "onTransfer called with websocket with invalid user data set!" << std::endl;
exit(-1);
}
transferred = true;
});
th.onMessage([&tServerGroup, &client, &receivedMessages, &clientGroup, &m, &transferred](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
if (!transferred) {
std::cout << "FAILURE: onTransfer was not triggered in time" << std::endl;
exit(-1);
}
switch(++receivedMessages) {
case 1:
m.lock();
......@@ -518,7 +534,7 @@ void testMultithreading() {
}
});
th.getDefaultGroup<uWS::SERVER>().addAsync();
th.getDefaultGroup<uWS::SERVER>().listen(uWS::TRANSFERS);
th.run();
});
......@@ -529,9 +545,10 @@ void testMultithreading() {
clientGroup = &h.getDefaultGroup<uWS::CLIENT>();
clientGroup->addAsync();
clientGroup->listen(uWS::TRANSFERS);
h.onConnection([&tServerGroup](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req) {
ws->setUserData((void *) 12345);
ws->transfer(tServerGroup);
});
......@@ -1148,10 +1165,10 @@ int main(int argc, char *argv[])
testMessageBatch();
testAutoPing();
testConnections();
testTransfers();
// These are not working yet / not tested
#ifndef __APPLE__
testMultithreading();
// Linux-only feature
#ifdef __linux__
testReusePort();
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment