Commit 4ab13b07 authored by Alex Hultman's avatar Alex Hultman

Initial optional thread-safety & improved encapsulation throughout

parent 2f7a8df7
......@@ -4,6 +4,7 @@ default:
$(CXX) -std=c++11 -O3 scalability_socketio.cpp -s -o scalability_socketio -lpthread
$(CXX) -std=c++11 -O3 throughput_socketio.cpp -s -o throughput_socketio -luv
$(CXX) -std=c++11 -O3 -I ../src ../src/Extensions.cpp ../src/Group.cpp ../src/WebSocketImpl.cpp ../src/Networking.cpp ../src/Hub.cpp ../src/Node.cpp ../src/WebSocket.cpp ../src/HTTPSocket.cpp ../src/Socket.cpp ../src/Epoll.cpp uWS.cpp -o uWS_epoll -lcrypto -lssl -lz -s
$(CXX) -DUWS_THREADSAFE -std=c++11 -O3 -I ../src ../src/Extensions.cpp ../src/Group.cpp ../src/WebSocketImpl.cpp ../src/Networking.cpp ../src/Hub.cpp ../src/Node.cpp ../src/WebSocket.cpp ../src/HTTPSocket.cpp ../src/Socket.cpp ../src/Epoll.cpp uWS.cpp -o uWS_threadsafe -lcrypto -lssl -lz -s
$(CXX) -DUSE_LIBUV -std=c++11 -O3 -I ../src ../src/Extensions.cpp ../src/Group.cpp ../src/WebSocketImpl.cpp ../src/Networking.cpp ../src/Hub.cpp ../src/Node.cpp ../src/WebSocket.cpp ../src/HTTPSocket.cpp ../src/Socket.cpp ../src/Epoll.cpp uWS.cpp -o uWS_libuv -luv -lcrypto -lssl -lz -s
$(CXX) -DUSE_ASIO -std=c++11 -O3 -I ../src ../src/Extensions.cpp ../src/Group.cpp ../src/WebSocketImpl.cpp ../src/Networking.cpp ../src/Hub.cpp ../src/Node.cpp ../src/WebSocket.cpp ../src/HTTPSocket.cpp ../src/Socket.cpp ../src/Epoll.cpp uWS.cpp -o uWS_asio -lboost_system -lcrypto -lssl -lz -s
$(CXX) -std=c++11 -O3 wsPP.cpp -s -o wsPP -lpthread -lboost_system -lboost_random -lssl -lcrypto
......
......@@ -22,6 +22,11 @@ void Loop::run() {
int numFdReady = epoll_wait(epfd, readyEvents, 1024, delay);
timepoint = std::chrono::system_clock::now();
if (preCb) {
preCb(preCbData);
}
for (int i = 0; i < numFdReady; i++) {
Poll *poll = (Poll *) readyEvents[i].data.ptr;
int status = -bool(readyEvents[i].events & EPOLLERR);
......@@ -44,6 +49,10 @@ void Loop::run() {
timer->start(cb, repeat, repeat);
}
}
if (postCb) {
postCb(postCbData);
}
}
}
#endif
......@@ -36,6 +36,10 @@ struct Loop {
std::vector<Timepoint> timers;
std::vector<std::pair<Poll *, void (*)(Poll *)>> closing;
void (*preCb)(void *) = nullptr;
void (*postCb)(void *) = nullptr;
void *preCbData, *postCbData;
Loop(bool defaultLoop) {
epfd = epoll_create1(EPOLL_CLOEXEC);
timepoint = std::chrono::system_clock::now();
......@@ -114,6 +118,7 @@ struct Timer {
// 4 bytes
struct Poll {
protected:
struct {
int fd : 28;
unsigned int cbIndex : 4;
......@@ -125,14 +130,6 @@ struct Poll {
loop->numPolls++;
}
bool isClosed() {
return state.fd == -1;
}
uv_os_sock_t getFd() {
return state.fd;
}
void setCb(void (*cb)(Poll *p, int status, int events)) {
state.cbIndex = cbHead;
for (int i = 0; i < cbHead; i++) {
......@@ -183,6 +180,17 @@ struct Poll {
state.fd = -1;
loop->closing.push_back({this, cb});
}
public:
bool isClosed() {
return state.fd == -1;
}
uv_os_sock_t getFd() {
return state.fd;
}
friend struct Loop;
};
// this should be put in the Loop as a general "post" function always available
......@@ -199,7 +207,7 @@ struct Async : Poll {
this->cb = cb;
Poll::setCb([](Poll *p, int, int) {
uint64_t val;
if (::read(p->state.fd, &val, 8) == 8) {
if (::read(((Async *) p)->state.fd, &val, 8) == 8) {
((Async *) p)->cb((Async *) p);
}
});
......
......@@ -225,6 +225,11 @@ void Group<isServer>::onHttpUpgrade(std::function<void(HttpSocket<isServer> *, H
template <bool isServer>
void Group<isServer>::broadcast(const char *message, size_t length, OpCode opCode) {
#ifdef UWS_THREADSAFE
std::lock_guard<std::recursive_mutex> lockGuard(*asyncMutex);
#endif
typename WebSocket<isServer>::PreparedMessage *preparedMessage = WebSocket<isServer>::prepareMessage((char *) message, length, opCode, false);
forEach([preparedMessage](uWS::WebSocket<isServer> *ws) {
ws->sendPrepared(preparedMessage);
......
......@@ -13,7 +13,13 @@ struct Hub;
template <bool isServer>
struct WIN32_EXPORT Group : uS::NodeData {
protected:
friend struct Hub;
friend struct WebSocket<isServer>;
friend struct WebSocketProtocol<isServer>;
friend struct HttpSocket<false>;
friend struct HttpSocket<true>;
std::function<void(WebSocket<isServer> *, HttpRequest)> connectionHandler;
std::function<void(WebSocket<isServer> *)> transferHandler;
std::function<void(WebSocket<isServer> *, char *message, size_t length, OpCode opCode)> messageHandler;
......@@ -40,7 +46,6 @@ struct WIN32_EXPORT Group : uS::NodeData {
void *userData = nullptr;
void setUserData(void *user);
void *getUserData();
void startAutoPing(int intervalMs, std::string userMessage = "");
static void timerCallback(Timer *timer);
WebSocket<isServer> *webSocketHead = nullptr;
......@@ -53,7 +58,6 @@ struct WIN32_EXPORT Group : uS::NodeData {
void addHttpSocket(HttpSocket<isServer> *httpSocket);
void removeHttpSocket(HttpSocket<isServer> *httpSocket);
protected:
Group(int extensionOptions, Hub *hub, uS::NodeData *nodeData);
void stopListening();
......@@ -72,12 +76,15 @@ public:
void onCancelledHttpRequest(std::function<void(HttpResponse *)> handler);
void onHttpUpgrade(std::function<void(HttpSocket<isServer> *, HttpRequest)> handler);
// Thread safe
void broadcast(const char *message, size_t length, OpCode opCode);
// Not thread safe
void terminate();
void close(int code = 1000, char *message = nullptr, size_t length = 0);
void startAutoPing(int intervalMs, std::string userMessage = "");
using NodeData::addAsync;
// todo: handle nested forEachs with removeWebSocket
template <class F>
void forEach(const F &cb) {
Poll *iterator = webSocketHead;
......@@ -114,7 +121,7 @@ public:
template <bool isServer>
Group<isServer> *getGroup(uS::Socket *s) {
return static_cast<Group<isServer> *>(s->nodeData);
return static_cast<Group<isServer> *>(s->getNodeData());
}
}
......
......@@ -109,13 +109,13 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
getGroup<isServer>(httpSocket)->removeHttpSocket(httpSocket);
// Warning: changes socket, needs to inform the stack of Poll address change!
WebSocket<SERVER> *webSocket = new WebSocket<SERVER>(perMessageDeflate, httpSocket);
webSocket->setState<WebSocket<SERVER>>();
WebSocket<isServer> *webSocket = new WebSocket<isServer>(perMessageDeflate, httpSocket);
webSocket->setState<WebSocket<isServer>>();
webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE));
getGroup<SERVER>(webSocket)->addWebSocket(webSocket);
getGroup<isServer>(webSocket)->addWebSocket(webSocket);
webSocket->cork(true);
getGroup<SERVER>(webSocket)->connectionHandler(webSocket, req);
getGroup<isServer>(webSocket)->connectionHandler(webSocket, req);
// todo: should not uncork if closed!
webSocket->cork(false);
delete httpSocket;
......@@ -159,15 +159,15 @@ uS::Socket *HttpSocket<isServer>::onData(uS::Socket *s, char *data, size_t lengt
if (req.getHeader("upgrade", 7)) {
// Warning: changes socket, needs to inform the stack of Poll address change!
WebSocket<CLIENT> *webSocket = new WebSocket<CLIENT>(false, httpSocket);
WebSocket<isServer> *webSocket = new WebSocket<isServer>(false, httpSocket);
httpSocket->cancelTimeout();
webSocket->setUserData(httpSocket->httpUser);
webSocket->setState<WebSocket<CLIENT>>();
webSocket->setState<WebSocket<isServer>>();
webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE));
getGroup<CLIENT>(webSocket)->addWebSocket(webSocket);
getGroup<isServer>(webSocket)->addWebSocket(webSocket);
webSocket->cork(true);
getGroup<CLIENT>(webSocket)->connectionHandler(webSocket, req);
getGroup<isServer>(webSocket)->connectionHandler(webSocket, req);
if (!(webSocket->isClosed() || webSocket->isShuttingDown())) {
webSocket->consume(cursor, end - cursor, webSocket);
}
......
......@@ -120,14 +120,6 @@ struct WIN32_EXPORT HttpSocket : uS::Socket {
HttpSocket(uS::Socket *socket, bool areYouSure) : uS::Socket(std::move(*socket)) {}
using uS::Socket::getUserData;
using uS::Socket::setUserData;
using uS::Socket::getAddress;
using uS::Socket::Address;
using uS::Socket::shutdown;
using uS::Socket::close;
void terminate() {
onEnd(this);
}
......
......@@ -11,17 +11,7 @@
namespace uWS {
struct WIN32_EXPORT Hub : private uS::Node, public Group<SERVER>, public Group<CLIENT> {
template <bool isServer>
Group<isServer> *createGroup(int extensionOptions = 0) {
return new Group<isServer>(extensionOptions, this, nodeData);
}
template <bool isServer>
Group<isServer> &getDefaultGroup() {
return (Group<isServer> &) *this;
}
protected:
struct ConnectionData {
std::string path;
void *user;
......@@ -37,6 +27,17 @@ struct WIN32_EXPORT Hub : private uS::Node, public Group<SERVER>, public Group<C
static void onServerAccept(uS::Socket *s);
static void onClientConnection(uS::Socket *s, bool error);
public:
template <bool isServer>
Group<isServer> *createGroup(int extensionOptions = 0) {
return new Group<isServer>(extensionOptions, this, nodeData);
}
template <bool isServer>
Group<isServer> &getDefaultGroup() {
return (Group<isServer> &) *this;
}
bool listen(int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group<SERVER> *eh = nullptr);
bool listen(const char *host, int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group<SERVER> *eh = nullptr);
void connect(std::string uri, void *user = nullptr, std::map<std::string, std::string> extraHeaders = {}, int timeoutMs = 5000, Group<CLIENT> *eh = nullptr);
......@@ -46,6 +47,18 @@ struct WIN32_EXPORT Hub : private uS::Node, public Group<SERVER>, public Group<C
Group<SERVER>(extensionOptions, this, nodeData), Group<CLIENT>(0, this, nodeData) {
inflateInit2(&inflationStream, -15);
inflationBuffer = new char[LARGE_BUFFER_SIZE];
#ifdef UWS_THREADSAFE
getLoop()->preCbData = nodeData;
getLoop()->preCb = [](void *nodeData) {
((uS::NodeData *) nodeData)->asyncMutex->lock();
};
getLoop()->postCbData = nodeData;
getLoop()->postCb = [](void *nodeData) {
((uS::NodeData *) nodeData)->asyncMutex->unlock();
};
#endif
}
~Hub() {
......@@ -74,6 +87,9 @@ struct WIN32_EXPORT Hub : private uS::Node, public Group<SERVER>, public Group<C
using Group<SERVER>::onHttpDisconnection;
using Group<SERVER>::onHttpUpgrade;
using Group<SERVER>::onCancelledHttpRequest;
friend struct WebSocketProtocol<true>;
friend struct WebSocketProtocol<false>;
};
}
......
......@@ -218,13 +218,7 @@ struct WIN32_EXPORT NodeData {
void (*cb)(Poll *);
};
void addAsync() {
async = new Async(loop);
async->setData(this);
async->start(NodeData::asyncCallback);
}
std::mutex *asyncMutex;
std::recursive_mutex *asyncMutex;
std::vector<TransferData> transferQueue;
std::vector<Poll *> changePollQueue;
static void asyncCallback(Async *async);
......@@ -250,6 +244,13 @@ struct WIN32_EXPORT NodeData {
delete [] memory;
}
}
public:
void addAsync() {
async = new Async(loop);
async->setData(this);
async->start(NodeData::asyncCallback);
}
};
}
......
......@@ -2,22 +2,26 @@
namespace uS {
// this should be Node
void NodeData::asyncCallback(Async *async)
{
NodeData *nodeData = (NodeData *) async->getData();
nodeData->asyncMutex->lock();
for (TransferData transferData : nodeData->transferQueue) {
Socket *s = (Socket *) transferData.p;
//transferData.p->init(nodeData->loop, transferData.fd);
transferData.p->setCb(transferData.pollCb);
s->setCb(transferData.pollCb);
//transferData.p->start(transferData.socketData->nodeData->loop, transferData.socketData->getPoll());
transferData.socketData->nodeData = nodeData;
transferData.cb(transferData.p);
}
for (Poll *p : nodeData->changePollQueue) {
Socket *socketData = (Socket *) p;
p->change(socketData->nodeData->loop, socketData, socketData->getPoll());
Socket *s = (Socket *) p;
s->change(s->nodeData->loop, s, s->getPoll());
}
nodeData->changePollQueue.clear();
......
......@@ -13,60 +13,12 @@ enum ListenOptions : int {
};
class WIN32_EXPORT Node {
protected:
Loop *loop;
NodeData *nodeData;
std::mutex asyncMutex;
public:
Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false);
~Node();
void run();
Loop *getLoop() {
return loop;
}
private:
template <void C(Socket *p, bool error)>
static void connect_cb(Poll *p, int status, int events) {
C((Socket *) p, status < 0);
}
template <uS::Socket *I(Socket *s), void C(Socket *p, bool error)>
Socket *connect(const char *hostname, int port, bool secure, NodeData *nodeData) {
Context *netContext = nodeData->netContext;
addrinfo hints, *result;
memset(&hints, 0, sizeof(addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(hostname, std::to_string(port).c_str(), &hints, &result) != 0) {
return nullptr;
}
uv_os_sock_t fd = netContext->createSocket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (fd == INVALID_SOCKET) {
return nullptr;
}
::connect(fd, result->ai_addr, result->ai_addrlen);
freeaddrinfo(result);
SSL *ssl = nullptr;
if (secure) {
ssl = SSL_new(nodeData->clientContext);
SSL_set_connect_state(ssl);
SSL_set_tlsext_host_name(ssl, hostname);
}
Socket initialSocket(nodeData, getLoop(), fd, ssl);
uS::Socket *socket = I(&initialSocket);
socket->setCb(connect_cb<C>);
socket->start(loop, socket, socket->setPoll(UV_WRITABLE));
return socket;
}
template <void A(Socket *s)>
static void accept_poll_cb(Poll *p, int status, int events) {
ListenSocket *listenData = (ListenSocket *) p;
......@@ -119,6 +71,55 @@ public:
} while ((clientFd = netContext->acceptSocket(serverFd)) != INVALID_SOCKET);
}
protected:
Loop *loop;
NodeData *nodeData;
std::recursive_mutex asyncMutex;
public:
Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false);
~Node();
void run();
Loop *getLoop() {
return loop;
}
template <uS::Socket *I(Socket *s), void C(Socket *p, bool error)>
Socket *connect(const char *hostname, int port, bool secure, NodeData *nodeData) {
Context *netContext = nodeData->netContext;
addrinfo hints, *result;
memset(&hints, 0, sizeof(addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(hostname, std::to_string(port).c_str(), &hints, &result) != 0) {
return nullptr;
}
uv_os_sock_t fd = netContext->createSocket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (fd == INVALID_SOCKET) {
return nullptr;
}
::connect(fd, result->ai_addr, result->ai_addrlen);
freeaddrinfo(result);
SSL *ssl = nullptr;
if (secure) {
ssl = SSL_new(nodeData->clientContext);
SSL_set_connect_state(ssl);
SSL_set_tlsext_host_name(ssl, hostname);
}
Socket initialSocket(nodeData, getLoop(), fd, ssl);
uS::Socket *socket = I(&initialSocket);
socket->setCb(connect_cb<C>);
socket->start(loop, socket, socket->setPoll(UV_WRITABLE));
return socket;
}
// todo: hostname, backlog
template <void A(Socket *s)>
bool listen(const char *host, int port, uS::TLS::Context sslContext, int options, uS::NodeData *nodeData, void *user) {
......
......@@ -7,22 +7,15 @@ namespace uS {
// perfectly 64 bytes (4 + 60)
struct WIN32_EXPORT Socket : Poll {
protected:
struct {
int poll : 4;
int shuttingDown : 4;
} state = {0, false};
NodeData *nodeData;
SSL *ssl;
void *user = nullptr;
Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), nodeData(nodeData) {
if (ssl) {
SSL_set_fd(ssl, fd);
SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS);
}
this->ssl = ssl;
}
NodeData *nodeData;
// this is not needed by HttpSocket!
struct Queue {
......@@ -63,8 +56,6 @@ struct WIN32_EXPORT Socket : Poll {
}
} messageQueue;
Poll *next = nullptr, *prev = nullptr;
int getPoll() {
return state.poll;
}
......@@ -74,10 +65,6 @@ struct WIN32_EXPORT Socket : Poll {
return poll;
}
bool isShuttingDown() {
return state.shuttingDown;
}
void setShuttingDown(bool shuttingDown) {
state.shuttingDown = shuttingDown;
}
......@@ -121,49 +108,6 @@ struct WIN32_EXPORT Socket : Poll {
}
}
void *getUserData() {
return user;
}
void setUserData(void *user) {
this->user = user;
}
struct Address {
unsigned int port;
const char *address;
const char *family;
};
Address getAddress();
void cork(int enable) {
#if defined(TCP_CORK)
// Linux & SmartOS have proper TCP_CORK
setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, &enable, sizeof(int));
#elif defined(TCP_NOPUSH)
// Mac OS X & FreeBSD have TCP_NOPUSH
setsockopt(getFd(), IPPROTO_TCP, TCP_NOPUSH, &enable, sizeof(int));
if (!enable) {
// Tested on OS X, FreeBSD situation is unclear
::send(getFd(), "", 0, MSG_NOSIGNAL);
}
#endif
}
void setNoDelay(int enable) {
setsockopt(getFd(), IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int));
}
void shutdown() {
if (ssl) {
//todo: poll in/out - have the io_cb recall shutdown if failed
SSL_shutdown(ssl);
} else {
::shutdown(getFd(), SHUT_WR);
}
}
// clears user data!
template <void onTimeout(Socket *)>
void startTimeout(int timeoutMs = 15000) {
......@@ -208,7 +152,7 @@ struct WIN32_EXPORT Socket : Poll {
socket->messageQueue.pop();
if (socket->messageQueue.empty()) {
if ((socket->state.poll & UV_WRITABLE) && SSL_want(socket->ssl) != SSL_WRITING) {
p->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
}
break;
}
......@@ -218,7 +162,7 @@ struct WIN32_EXPORT Socket : Poll {
break;
case SSL_ERROR_WANT_WRITE:
if ((socket->getPoll() & UV_WRITABLE) == 0) {
p->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
}
break;
default:
......@@ -240,7 +184,7 @@ struct WIN32_EXPORT Socket : Poll {
break;
case SSL_ERROR_WANT_WRITE:
if ((socket->getPoll() & UV_WRITABLE) == 0) {
p->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
}
break;
default:
......@@ -283,7 +227,7 @@ struct WIN32_EXPORT Socket : Poll {
socket->messageQueue.pop();
if (socket->messageQueue.empty()) {
// todo, remove bit, don't set directly
p->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
break;
}
} else if (sent == SOCKET_ERROR) {
......@@ -322,22 +266,6 @@ struct WIN32_EXPORT Socket : Poll {
}
}
template <class T>
void closeSocket() {
uv_os_sock_t fd = getFd();
Context *netContext = nodeData->netContext;
stop(nodeData->loop);
netContext->closeSocket(fd);
if (ssl) {
SSL_free(ssl);
}
Poll::close(nodeData->loop, [](Poll *p) {
delete (T *) p;
});
}
bool hasEmptyQueue() {
return messageQueue.empty();
}
......@@ -471,6 +399,86 @@ struct WIN32_EXPORT Socket : Poll {
enqueue(messagePtr);
}
}
public:
Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), ssl(ssl), nodeData(nodeData) {
if (ssl) {
SSL_set_fd(ssl, fd);
SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS);
}
}
NodeData *getNodeData() {
return nodeData;
}
Poll *next = nullptr, *prev = nullptr;
void *getUserData() {
return user;
}
void setUserData(void *user) {
this->user = user;
}
struct Address {
unsigned int port;
const char *address;
const char *family;
};
Address getAddress();
void setNoDelay(int enable) {
setsockopt(getFd(), IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int));
}
void cork(int enable) {
#if defined(TCP_CORK)
// Linux & SmartOS have proper TCP_CORK
setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, &enable, sizeof(int));
#elif defined(TCP_NOPUSH)
// Mac OS X & FreeBSD have TCP_NOPUSH
setsockopt(getFd(), IPPROTO_TCP, TCP_NOPUSH, &enable, sizeof(int));
if (!enable) {
// Tested on OS X, FreeBSD situation is unclear
::send(getFd(), "", 0, MSG_NOSIGNAL);
}
#endif
}
void shutdown() {
if (ssl) {
//todo: poll in/out - have the io_cb recall shutdown if failed
SSL_shutdown(ssl);
} else {
::shutdown(getFd(), SHUT_WR);
}
}
template <class T>
void closeSocket() {
uv_os_sock_t fd = getFd();
Context *netContext = nodeData->netContext;
stop(nodeData->loop);
netContext->closeSocket(fd);
if (ssl) {
SSL_free(ssl);
}
Poll::close(nodeData->loop, [](Poll *p) {
delete (T *) p;
});
}
bool isShuttingDown() {