Commit 42b76b8a authored by Alex Hultman's avatar Alex Hultman
Browse files

Introduce per-Node uS::Context to wrap mtcp_context

parent f79b85a6
......@@ -145,7 +145,7 @@ void Group<isServer>::stopListening() {
} else if (listenData->listenTimer) {
uv_os_sock_t fd = listenData->sock;
listenData->listenTimer->stop();
uS::Networking::closeSocket(fd);
listenData->nodeData->netContext->closeSocket(fd);
SSL *ssl = listenData->ssl;
if (ssl) {
......
......@@ -75,9 +75,27 @@ inline SOCKET dup(SOCKET socket) {
namespace uS {
// todo: mark sockets nonblocking in these functions
struct Networking {
struct Context {
#ifdef USE_MTCP
mtcp_context *mctx;
#endif
Context() {
// mtcp_create_context
#ifdef USE_MTCP
mctx = mtcp_create_context(0); // cpu index?
#endif
}
~Context() {
#ifdef USE_MTCP
mtcp_destroy_context(mctx);
#endif
}
// returns INVALID_SOCKET on error
static uv_os_sock_t acceptSocket(uv_os_sock_t fd) {
uv_os_sock_t acceptSocket(uv_os_sock_t fd) {
#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK)
// Linux, FreeBSD
return accept4(fd, nullptr, nullptr, SOCK_CLOEXEC | SOCK_NONBLOCK);
......@@ -88,7 +106,7 @@ struct Networking {
}
// returns INVALID_SOCKET on error
static uv_os_sock_t createSocket(int domain, int type, int protocol) {
uv_os_sock_t createSocket(int domain, int type, int protocol) {
int flags = 0;
#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK)
flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
......@@ -97,7 +115,7 @@ struct Networking {
return socket(domain, type | flags, protocol);
}
static void closeSocket(uv_os_sock_t fd) {
void closeSocket(uv_os_sock_t fd) {
#ifdef _WIN32
closesocket(fd);
#else
......@@ -152,6 +170,7 @@ struct WIN32_EXPORT NodeData {
char *recvBuffer;
int recvLength;
Loop *loop;
uS::Context *netContext;
void *user = nullptr;
static const int preAllocMaxSize = 1024;
char **preAlloc;
......
......@@ -34,6 +34,9 @@ Node::Node(int recvLength, int prePadding, int postPadding, bool useDefaultLoop)
nodeData->tid = pthread_self();
loop = Loop::createLoop(useDefaultLoop);
// each node has a context
nodeData->netContext = new Context();
nodeData->loop = loop;
nodeData->asyncMutex = &asyncMutex;
......@@ -63,6 +66,7 @@ Node::~Node() {
}
}
delete [] nodeData->preAlloc;
delete nodeData->netContext;
delete nodeData;
loop->destroy();
}
......
......@@ -85,7 +85,8 @@ public:
template <void A(Socket *s), bool TIMER>
static void accept_cb(ListenData *listenData) {
uv_os_sock_t serverFd = listenData->sock;
uv_os_sock_t clientFd = Networking::acceptSocket(serverFd);
Context *netContext = listenData->nodeData->netContext;
uv_os_sock_t clientFd = netContext->acceptSocket(serverFd);
// if (clientFd == INVALID_SOCKET) {
// /*
// * If accept is failing, the pending connection won't be removed and the
......@@ -129,7 +130,7 @@ public:
Socket *socket = new Socket(listenData->nodeData, listenData->nodeData->loop, clientFd, ssl);
socket->setPoll(UV_READABLE);
A(socket);
} while ((clientFd = Networking::acceptSocket(serverFd)) != INVALID_SOCKET);
} while ((clientFd = netContext->acceptSocket(serverFd)) != INVALID_SOCKET);
}
// todo: hostname, backlog
......@@ -142,6 +143,8 @@ public:
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
Context *netContext = nodeData->netContext;
if (getaddrinfo(host, std::to_string(port).c_str(), &hints, &result)) {
return true;
}
......@@ -151,7 +154,7 @@ public:
if ((options & uS::ONLY_IPV4) == 0) {
for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) {
if (a->ai_family == AF_INET6) {
listenFd = Networking::createSocket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenAddr = a;
}
}
......@@ -159,7 +162,7 @@ public:
for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) {
if (a->ai_family == AF_INET) {
listenFd = Networking::createSocket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenAddr = a;
}
}
......@@ -182,7 +185,7 @@ public:
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled));
if (bind(listenFd, listenAddr->ai_addr, listenAddr->ai_addrlen) || ::listen(listenFd, 512)) {
Networking::closeSocket(listenFd);
netContext->closeSocket(listenFd);
freeaddrinfo(result);
return true;
}
......
......@@ -324,8 +324,9 @@ struct WIN32_EXPORT Socket : Poll {
template <class T>
void closeSocket() {
uv_os_sock_t fd = getFd();
Context *netContext = nodeData->netContext;
stop(nodeData->loop);
Networking::closeSocket(fd);
netContext->closeSocket(fd);
if (ssl) {
SSL_free(ssl);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment