Commit 786b0f30 authored by Hubert Denkmair's avatar Hubert Denkmair

Merge tag 'v0.14.8' into cmake

parents 3322671f 6d80b42e
......@@ -90,8 +90,15 @@ std::string ExtensionsNegotiator<isServer>::generateOffer() {
extensionsOffer += "; client_no_context_takeover";
}
// we do not support accepting this yet
// todo: if we agree on this, do not allocate a compressor
// per socket!
// It is RECOMMENDED that a server supports the
// "server_no_context_takeover" extension parameter in an extension
// negotiation offer.
if (options & Options::SERVER_NO_CONTEXT_TAKEOVER) {
extensionsOffer += "; server_no_context_takeover";
//extensionsOffer += "; server_no_context_takeover";
}
}
......
......@@ -8,9 +8,10 @@ namespace uWS {
enum Options : unsigned int {
NO_OPTIONS = 0,
PERMESSAGE_DEFLATE = 1,
SERVER_NO_CONTEXT_TAKEOVER = 2,
CLIENT_NO_CONTEXT_TAKEOVER = 4,
NO_DELAY = 8
SERVER_NO_CONTEXT_TAKEOVER = 2, // remove this
CLIENT_NO_CONTEXT_TAKEOVER = 4, // remove this
NO_DELAY = 8,
SLIDING_DEFLATE_WINDOW = 16
};
template <bool isServer>
......
......@@ -5,23 +5,30 @@
namespace uWS {
char *Hub::deflate(char *data, size_t &length) {
z_stream *Hub::allocateDefaultCompressor(z_stream *zStream) {
deflateInit2(zStream, 1, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
return zStream;
}
char *Hub::deflate(char *data, size_t &length, z_stream *slidingDeflateWindow) {
dynamicZlibBuffer.clear();
deflationStream.next_in = (Bytef *) data;
deflationStream.avail_in = (unsigned int) length;
z_stream *compressor = slidingDeflateWindow ? slidingDeflateWindow : &deflationStream;
compressor->next_in = (Bytef *) data;
compressor->avail_in = (unsigned int) length;
// note: zlib requires more than 6 bytes with Z_SYNC_FLUSH
const int DEFLATE_OUTPUT_CHUNK = LARGE_BUFFER_SIZE;
int err;
do {
deflationStream.next_out = (Bytef *) zlibBuffer;
deflationStream.avail_out = DEFLATE_OUTPUT_CHUNK;
compressor->next_out = (Bytef *) zlibBuffer;
compressor->avail_out = DEFLATE_OUTPUT_CHUNK;
err = ::deflate(&deflationStream, Z_SYNC_FLUSH);
if (Z_OK == err && deflationStream.avail_out == 0) {
dynamicZlibBuffer.append(zlibBuffer, DEFLATE_OUTPUT_CHUNK - deflationStream.avail_out);
err = ::deflate(compressor, Z_SYNC_FLUSH);
if (Z_OK == err && compressor->avail_out == 0) {
dynamicZlibBuffer.append(zlibBuffer, DEFLATE_OUTPUT_CHUNK - compressor->avail_out);
continue;
} else {
break;
......@@ -29,16 +36,18 @@ char *Hub::deflate(char *data, size_t &length) {
} while (true);
// note: should not change avail_out
deflateReset(&deflationStream);
if (!slidingDeflateWindow) {
deflateReset(compressor);
}
if (dynamicZlibBuffer.length()) {
dynamicZlibBuffer.append(zlibBuffer, DEFLATE_OUTPUT_CHUNK - deflationStream.avail_out);
dynamicZlibBuffer.append(zlibBuffer, DEFLATE_OUTPUT_CHUNK - compressor->avail_out);
length = dynamicZlibBuffer.length() - 4;
return (char *) dynamicZlibBuffer.data();
}
length = DEFLATE_OUTPUT_CHUNK - deflationStream.avail_out - 4;
length = DEFLATE_OUTPUT_CHUNK - compressor->avail_out - 4;
return zlibBuffer;
}
......
......@@ -18,8 +18,10 @@ protected:
Group<CLIENT> *group;
};
static z_stream *allocateDefaultCompressor(z_stream *zStream);
z_stream inflationStream = {}, deflationStream = {};
char *deflate(char *data, size_t &length);
char *deflate(char *data, size_t &length, z_stream *slidingDeflateWindow);
char *inflate(char *data, size_t &length, size_t maxPayload);
char *zlibBuffer;
std::string dynamicZlibBuffer;
......@@ -49,7 +51,7 @@ public:
inflateInit2(&inflationStream, -15);
zlibBuffer = new char[LARGE_BUFFER_SIZE];
deflateInit2(&deflationStream, 1, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
allocateDefaultCompressor(&deflationStream);
#ifdef UWS_THREADSAFE
getLoop()->preCbData = nodeData;
......
......@@ -4,6 +4,16 @@
namespace uWS {
template <bool isServer>
WebSocket<isServer>::WebSocket(bool perMessageDeflate, uS::Socket *socket) : uS::Socket(std::move(*socket)) {
compressionStatus = perMessageDeflate ? CompressionStatus::ENABLED : CompressionStatus::DISABLED;
// if we are created in a group with sliding deflate window allocate it here
if (Group<isServer>::from(this)->extensionOptions & SLIDING_DEFLATE_WINDOW) {
slidingDeflateWindow = Hub::allocateDefaultCompressor(new z_stream{});
}
}
/*
* Frames and sends a WebSocket message.
*
......@@ -31,8 +41,8 @@ void WebSocket<isServer>::send(const char *message, size_t length, OpCode opCode
struct TransformData {
OpCode opCode;
bool compress;
Socket *s;
} transformData = {opCode, compress && compressionStatus == WebSocket<isServer>::CompressionStatus::ENABLED, this};
WebSocket<isServer> *s;
} transformData = {opCode, compress && compressionStatus == WebSocket<isServer>::CompressionStatus::ENABLED && opCode < 3, this};
struct WebSocketTransformer {
static size_t estimate(const char *data, size_t length) {
......@@ -41,7 +51,7 @@ void WebSocket<isServer>::send(const char *message, size_t length, OpCode opCode
static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) {
if (transformData.compress) {
char *deflated = Group<isServer>::from(transformData.s)->hub->deflate((char *) src, length);
char *deflated = Group<isServer>::from(transformData.s)->hub->deflate((char *) src, length, (z_stream *) transformData.s->slidingDeflateWindow);
return WebSocketProtocol<isServer, WebSocket<isServer>>::formatMessage(dst, deflated, length, transformData.opCode, length, true);
}
......@@ -297,6 +307,14 @@ void WebSocket<isServer>::onEnd(uS::Socket *s) {
}
webSocket->nodeData->clearPendingPollChanges(webSocket);
// remove any per-websocket zlib memory
if (webSocket->slidingDeflateWindow) {
// this relates to Hub::allocateDefaultCompressor
deflateEnd((z_stream *) webSocket->slidingDeflateWindow);
delete (z_stream *) webSocket->slidingDeflateWindow;
webSocket->slidingDeflateWindow = nullptr;
}
}
template <bool isServer>
......
......@@ -23,9 +23,9 @@ protected:
} compressionStatus;
unsigned char controlTipLength = 0, hasOutstandingPong = false;
WebSocket(bool perMessageDeflate, uS::Socket *socket) : uS::Socket(std::move(*socket)) {
compressionStatus = perMessageDeflate ? CompressionStatus::ENABLED : CompressionStatus::DISABLED;
}
void *slidingDeflateWindow = nullptr;
WebSocket(bool perMessageDeflate, uS::Socket *socket);
static uS::Socket *onData(uS::Socket *s, char *data, size_t length);
static void onEnd(uS::Socket *s);
......
......@@ -24,8 +24,9 @@ int countOccurrences(std::string word, std::string &document) {
void testAutobahn() {
uWS::Hub h;
// let's test pmd + ssl and then pmd + sliding window
uWS::Group<uWS::SERVER> *sslGroup = h.createGroup<uWS::SERVER>(uWS::PERMESSAGE_DEFLATE);
uWS::Group<uWS::SERVER> *group = h.createGroup<uWS::SERVER>(uWS::PERMESSAGE_DEFLATE);
uWS::Group<uWS::SERVER> *group = h.createGroup<uWS::SERVER>(uWS::PERMESSAGE_DEFLATE | uWS::SLIDING_DEFLATE_WINDOW);
auto messageHandler = [](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
ws->send(message, length, opCode, nullptr, nullptr, true);
......@@ -1156,6 +1157,25 @@ void testThreadSafety() {
}
}
void testAsync() {
uWS::Hub h;
uS::Async *a = new uS::Async(h.getLoop());
a->start([](uS::Async *a) {
a->close();
});
std::thread t([&h]() {
h.run();
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
a->send();
t.join();
std::cout << "Falling through Async test" << std::endl;
}
int main(int argc, char *argv[])
{
//serveEventSource();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment