Commit 927579d0 authored by Hubert Denkmair's avatar Hubert Denkmair
Browse files

port to uWebSockets

parent cc829b68
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -4,3 +4,7 @@
[submodule "lib/msgpack-c"]
	path = lib/msgpack-c
	url = https://github.com/msgpack/msgpack-c.git
[submodule "lib/uWebSockets"]
	path = lib/uWebSockets
	url = git@git.bingo-ev.de:GPN18Programmierspiel/uWebSockets.git
	branch = cmake
+1 −0
Original line number Diff line number Diff line
@@ -2,4 +2,5 @@ cmake_minimum_required (VERSION 3.2)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_FLAGS "-Wall -pedantic")
add_subdirectory(lib/TcpServer/TcpServer)
add_subdirectory(lib/uWebSockets)
add_subdirectory(relayserver)

uWebSockets @ 3322671f

Original line number Diff line number Diff line
Subproject commit 3322671f50d0ae902a3e22708e4c98d475893961
+7 −2
Original line number Diff line number Diff line
@@ -6,8 +6,8 @@ include_directories(${EIGEN3_INCLUDE_DIR})

include_directories(
	${PROJECT_NAME}
	lib/msgpack-c/include/
	lib/TcpServer/TcpServer/include/
	../lib/uWebSockets/src
	../lib/msgpack-c/include/
)

add_executable(
@@ -22,4 +22,9 @@ add_executable(
target_link_libraries(
	${PROJECT_NAME}
	tcpserver
	uWebSockets
	pthread
	ssl
	crypto
	z
)
+45 −121
Original line number Diff line number Diff line
#include "RelayServer.h"
#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <TcpServer/TcpSocket.h>
#include <TcpServer/EPoll.h>
#include <msgpack.hpp>

RelayServer::RelayServer()
{
	_tcpServer.AddConnectionEstablishedListener(
		[this](TcpSocket& socket)
		{
			return OnConnectionEstablished(socket);
		}
	);

	_tcpServer.AddConnectionClosedListener(
		[this](TcpSocket& socket)
		{
			return OnConnectionClosed(socket);
		}
	);

	_tcpServer.AddDataAvailableListener(
		[this](TcpSocket& socket)
		{
			return OnDataAvailable(socket);
		}
	);

	_tcpProtocol.SetFrameCompleteCallback(
		[this](uint64_t frame_id)
		{
			//std::cout << "frame " << frame_id << " complete." << std::endl;

			for (auto& it: _connections)
			{
				it.second.FrameComplete(frame_id, _tcpProtocol);
			}
		}
	);

	_websocketServer.clear_access_channels(websocketpp::log::alevel::all);
	_websocketServer.set_access_channels(websocketpp::log::alevel::connect);
	_websocketServer.set_access_channels(websocketpp::log::alevel::disconnect);
	_websocketServer.set_access_channels(websocketpp::log::alevel::app);
	// websocketServer.set_message_handler()...
}

int RelayServer::Run()
{
	uWS::Hub h;
	EPoll epoll;
	if(!_tcpServer.Listen(9009))
	{
		return -1;
	}
	epoll.AddFileDescriptor(_tcpServer.GetEPoll().GetFileDescriptor(), EPOLLIN|EPOLLPRI|EPOLLERR|EPOLLRDHUP|EPOLLHUP);

	_clientSocket = socket(AF_INET, SOCK_STREAM, 0);
	struct sockaddr_in serv_addr;
@@ -69,96 +21,68 @@ int RelayServer::Run()
		perror("connect to server failed");
		return -1;
	}
	epoll.AddFileDescriptor(_clientSocket, EPOLLIN|EPOLLPRI|EPOLLERR);

	while(true)
	{
		epoll.Poll(1000,
			[this](const epoll_event& ev)
	_tcpProtocol.SetFrameCompleteCallback(
		[this, &h](uint64_t frame_id)
		{
				if (ev.data.fd == _clientSocket)
			h.getDefaultGroup<uWS::SERVER>().forEach(
				[this, frame_id](uWS::WebSocket<uWS::SERVER>* sock)
				{
					return _tcpProtocol.Read(_clientSocket);
				}
				else
				{
					_tcpServer.Poll(0);
				}
				return true;
					auto con = static_cast<WebsocketConnection*>(sock->getUserData());
					con->FrameComplete(frame_id, _tcpProtocol);
				}
			);
			std::cout << "frame " << frame_id << " complete." << std::endl;
		}
}
	);
	epoll.AddFileDescriptor(_clientSocket, EPOLLIN|EPOLLPRI|EPOLLERR);

bool RelayServer::OnConnectionEstablished(TcpSocket &socket)
{
	std::cerr << "connection established to " << socket.GetPeer() << std::endl;
	auto con = _websocketServer.get_connection();
	con->set_write_handler(
		[&socket](websocketpp::connection_hdl, char const* data, size_t size)
		{
			if (socket.Write(data, size, false) != static_cast<ssize_t>(size))
	h.onConnection(
		[](uWS::WebSocket<uWS::SERVER> *ws, uWS::HttpRequest req)
		{
				return websocketpp::transport::iostream::error::make_error_code(
					websocketpp::transport::iostream::error::general
				);
			}
			return websocketpp::lib::error_code();
			ws->setUserData(new WebsocketConnection(ws));
		}
	);

	con->set_shutdown_handler(
		[&socket](websocketpp::connection_hdl)
	h.onDisconnection(
		[](uWS::WebSocket<uWS::SERVER> *ws, int code, const char *message, size_t length)
		{
			socket.Close();
			return websocketpp::lib::error_code();
			auto *con = static_cast<WebsocketConnection*>(ws->getUserData());
			delete con;
		}
	);

	con->start();
	_connections.emplace(socket.GetFileDescriptor(), WebsocketConnection { socket.GetFileDescriptor(), con });
	return true;
}

bool RelayServer::OnConnectionClosed(TcpSocket &socket)
	h.onMessage([](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode)
	{
	std::cerr << "connection to " << socket.GetPeer() << " closed." << std::endl;
		//ws->send(message, length, opCode);
	});

	auto it = _connections.find(socket.GetFileDescriptor());
	if (it == _connections.end())
	std::string response = "Hello!";
	h.onHttpRequest([&](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t length, size_t remainingBytes)
	{
		return false;
	}
		res->end(response.data(), response.length());
	});

	it->second.Eof();
	_connections.erase(socket.GetFileDescriptor());
	return true;
	if (!h.listen(9009))
	{
		return -1;
	}

bool RelayServer::OnDataAvailable(TcpSocket &socket)
	epoll.AddFileDescriptor(h.getLoop()->getEpollFd(), EPOLLIN|EPOLLPRI|EPOLLERR|EPOLLRDHUP|EPOLLHUP); // TODO check which events are neccessary
	while (true)
	{
		epoll.Poll(1000,
			[this, &h](const epoll_event& ev)
			{
	auto it = _connections.find(socket.GetFileDescriptor());
	if (it == _connections.end())
				if (ev.data.fd == _clientSocket)
				{
		return false;
					return _tcpProtocol.Read(_clientSocket);
				}

	char data[1024];
	ssize_t count = socket.Read(data, sizeof(data));
	if (count > 0)
				else
				{
		it->second.DataReceived(data, static_cast<size_t>(count));
					h.poll();
				}
				return true;
			}

bool RelayServer::OnServerDataReceived(const epoll_event &ev)
{
	std::array<char, 102400> buf;
	if (read(_clientSocket, buf.data(), buf.size()) <= 0)
	{
		return false;
		);
	}

	return true;
}
Loading