3 #include <boost/asio.hpp> 5 #include <websocketpp/config/asio_no_tls.hpp> 7 #include <websocketpp/server.hpp> 11 #include <DataChannelServer/server-src/internal-api.h> 20 template <
typename T>
class Inner {
22 Inner(boost::asio::io_service &service, std::unique_ptr<T> impl)
23 : service_(service), impl_(std::move(impl)) {}
25 template <
typename F>
void Wrap(F functor) {
26 service_.dispatch([
this, functor]() { functor(impl_.get()); });
30 boost::asio::io_service &service_;
31 std::unique_ptr<T> impl_;
35 PeerConnectionObserver
36 WrapObserverImplementation(boost::asio::io_service &service,
37 std::unique_ptr<T> impl) {
38 PeerConnectionObserver result;
40 result.data =
new Inner<T>(service, std::move(impl));
42 result.Deleter = [](
void *data) {
43 Inner<T> *inner = (Inner<T> *)(data);
44 inner->Wrap([inner](T *) {
delete inner; });
47 result.OnOpen = [](
void *data) {
48 Inner<T> *inner = (Inner<T> *)(data);
49 inner->Wrap([](T *impl) { impl->OnOpen(); });
52 result.OnClose = [](
void *data) {
53 Inner<T> *inner = (Inner<T> *)(data);
54 inner->Wrap([](T *impl) { impl->OnClose(); });
57 result.ProcessWebsocketMessage =
58 [](
void *data,
const char *message,
int message_length) {
59 Inner<T> *inner = (Inner<T> *)(data);
60 std::string message_str(message, message_length);
61 inner->Wrap([message_str](T *impl) {
62 impl->ProcessWebsocketMessage(message_str);
66 result.ProcessDataChannelMessage =
67 [](
void *data,
const char *message,
int message_length) {
68 Inner<T> *inner = (Inner<T> *)(data);
69 std::string message_str(message, message_length);
70 inner->Wrap([message_str](T *impl) {
71 impl->ProcessDataChannelMessage(message_str);
// Defaults: ordered delivery enabled; both retransmit limits start at -1
// (presumably "not set" -- TODO confirm against the code that consumes them).
86 : ordered(
true), maxRetransmits(-1), maxRetransmitTime(-1) {}
125 typedef websocketpp::server<websocketpp::config::asio> server;
// Creates the processing thread, stores the settings, then configures the
// websocket endpoint (logging, io_service, handlers, listening socket).
134 : thread(CreateProcessingThread()), settings_(settings) {
// Enable every access-log channel, then switch frame payload logging off.
137 echo_server_.set_access_channels(websocketpp::log::alevel::all);
138 echo_server_.clear_access_channels(
139 websocketpp::log::alevel::frame_payload);
// Run the endpoint on this object's own io_service (service_).
142 echo_server_.init_asio(&service_);
// The handlers below capture `this`, so they must not be invoked after this
// Server has been destroyed.
145 echo_server_.set_message_handler(
146 [
this](websocketpp::connection_hdl hdl, server::message_ptr msg) {
147 on_message(hdl, msg);
149 echo_server_.set_open_handler(
150 [
this](websocketpp::connection_hdl hdl) { on_open(hdl); });
151 echo_server_.set_close_handler(
152 [
this](websocketpp::connection_hdl hdl) { on_close(hdl); });
// Bind to the requested port and queue the first accept.
155 echo_server_.listen(port);
158 echo_server_.start_accept();
159 }
catch (websocketpp::exception
const &e) {
// websocketpp failures are reported on stdout; no rethrow is visible here.
160 std::cout << e.what() << std::endl;
// Fallback report for any other exception type.
162 std::cout <<
"other exception" << std::endl;
// handler: callback invoked with the DataChannel created for a connection
// (see the connection_handler_ call in on_open).
172 std::function<
void(std::shared_ptr<DataChannel>)> handler) {
// Stored by copy; replaces whatever handler was set before.
173 connection_handler_ = handler;
// Ask the endpoint to stop listening for new connections.
// NOTE(review): any further shutdown steps are not visible here -- confirm.
188 echo_server_.stop_listening();
// Translate the public settings into the internal DataChannelOptions:
// the `ordered` flag is copied across unchanged.
193 DataChannelOptions options;
194 options.ordered = settings.
ordered;
201 struct PeerConnectionDeleter {
202 explicit PeerConnectionDeleter(ProcessingThread *a_thread)
203 : thread(a_thread) {}
205 void operator()(PeerConnection *peer) { DeletePeerConnection(thread, peer); }
207 ProcessingThread *thread;
// Per-connection state (the value type of connection_info_). The constructor
// creates the PeerConnection via CreatePeerConnection and wraps it in a
// unique_ptr whose deleter calls DeletePeerConnection with the same thread.
211 ServerData(ProcessingThread *a_thread, PeerConnectionObserver observer,
212 std::shared_ptr<std::weak_ptr<DataChannel>> a_channel_holder,
213 DataChannelOptions options)
// `thread` in the peer initializer is the member, not a parameter. It is
// already initialized there because it is declared before `peer`; keep that
// declaration order.
214 : thread(a_thread), peer(CreatePeerConnection(thread, observer, options),
215 PeerConnectionDeleter(thread)),
216 channel_holder(a_channel_holder) {}
// Raw pointer to the processing thread; not deleted by this struct.
218 ProcessingThread *thread;
// Owning handle to the peer connection.
220 std::unique_ptr<PeerConnection, PeerConnectionDeleter> peer;
// Shared slot holding a weak reference to this connection's DataChannel.
221 std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder;
225 struct ProcessingThreadDeleter {
226 void operator()(ProcessingThread *peer) { DeleteProcessingThread(peer); }
229 void on_message(websocketpp::connection_hdl hdl, server::message_ptr msg) {
230 ServerData &channel = get_data(hdl);
232 SendWebsocketMessage(thread.get(), channel.peer.get(),
233 msg->get_payload().data(), msg->get_payload().size());
// Looks up the ServerData registered for `hdl` in connection_info_.
// NOTE(review): what happens after the error log below, and the success-path
// return, are not visible here -- confirm before relying on either.
236 ServerData &get_data(websocketpp::connection_hdl hdl) {
237 auto iter = connection_info_.find(hdl);
238 if (iter == connection_info_.end()) {
// Unknown handle: report it on stdout together with the locked handle value.
239 std::cout <<
"Internal Error: Unable to find connection " << hdl.lock();
// Websocket open callback: defines the observer implementation for this
// connection, wraps it with impl::WrapObserverImplementation, and registers
// the resulting per-connection state in connection_info_ under `hdl`.
246 void on_open(websocketpp::connection_hdl hdl) {
// Local observer type. Its OnOpen/OnClose/Process* members are the ones
// WrapObserverImplementation's callbacks call into.
247 class PeerConnectionObserverImplementation {
249 PeerConnectionObserverImplementation(
250 Server *server, websocketpp::connection_hdl hdl,
251 std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder)
252 : server_(server), hdl_(hdl), channel_holder_(channel_holder) {}
// Outgoing-message callback given to the DataChannel: forwards the bytes to
// SendDataChannelMessage for this connection's peer. Captures `this`.
255 auto send_message_callback = [
this](
const std::string &message) {
256 ServerData &channel = server_->get_data(hdl_);
257 SendDataChannelMessage(server_->thread.get(), channel.peer.get(),
258 message.data(), message.size());
// Close callback given to the DataChannel: closes the websocket with status
// `normal`, then removes this connection's entry from connection_info_
// (which destroys its ServerData). Captures `this`.
261 auto close_channel_callback = [
this]() {
262 std::error_code error_code;
263 server_->echo_server_.close(hdl_, websocketpp::close::status::normal,
264 "server requested to quit", error_code);
// Close failures are reported on stdout.
266 std::cout <<
"Had error while closing " << error_code.message()
269 server_->connection_info_.erase(hdl_);
// The holder keeps only a weak_ptr, so the channel stays alive only while
// the receiver of connection_handler_ (or someone else) holds the shared_ptr.
272 auto channel = std::make_shared<DataChannel>(send_message_callback,
273 close_channel_callback);
274 *channel_holder_ = channel;
// NOTE(review): invoked unconditionally; an empty std::function would throw
// std::bad_function_call here.
275 server_->connection_handler_(channel);
// Close notification: run the channel's close handler only if the channel
// is still alive.
279 auto actual_channel = channel_holder_->lock();
280 if (actual_channel !=
nullptr) {
281 actual_channel->GetOnCloseHandler()();
// Sends `message` to the websocket peer as a text frame; a failure message
// is written to stdout.
285 void ProcessWebsocketMessage(
const std::string &message) {
286 std::error_code error_code;
287 server_->echo_server_.send(
288 hdl_, message, websocketpp::frame::opcode::text, error_code);
290 std::cout <<
"Had error while sending " << error_code.message()
// Delivers an incoming data-channel message to the channel's message
// handler, provided the channel is still alive.
295 void ProcessDataChannelMessage(
const std::string &message) {
296 auto actual_channel = channel_holder_->lock();
297 if (actual_channel !=
nullptr) {
298 actual_channel->GetOnMessageHandler()(message);
304 websocketpp::connection_hdl hdl_;
305 std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder_;
// Shared slot for the channel: starts empty and is filled by the observer
// at `*channel_holder_ = channel`. The same holder is handed to both the
// observer and the per-connection state built below.
308 auto channel_holder = std::make_shared<std::weak_ptr<DataChannel>>();
// The wrapped observer's callbacks are dispatched on service_.
312 impl::WrapObserverImplementation(
313 service_, std::make_unique<PeerConnectionObserverImplementation>(
314 this, hdl, channel_holder)),
315 channel_holder, ConvertSettings(settings_));
// Register the connection; `data` is moved into the map.
317 connection_info_.emplace(hdl, std::move(data));
// Websocket close callback: notifies the connection's DataChannel (if it is
// still alive) and removes the connection's entry from connection_info_.
320 void on_close(websocketpp::connection_hdl hdl) {
321 std::cout <<
"on_close called with hdl: " << hdl.lock().get() << std::endl;
322 auto iter = connection_info_.find(hdl);
// The entry can already be gone: close_channel_callback in on_open erases it
// when the channel is closed from the server side.
// NOTE(review): the early exit of this branch is not visible here -- confirm.
323 if (iter == connection_info_.end()) {
324 std::cout <<
"Already closed the connection" << std::endl;
327 ServerData &channel = iter->second;
// lock() yields a shared_ptr, so the channel cannot be destroyed while its
// close handler runs.
328 auto actual_channel = channel.channel_holder->lock();
329 if (actual_channel !=
nullptr) {
330 actual_channel->GetOnCloseHandler()();
// Erase by key rather than through `iter`: the close handler above is
// external code and could already have removed this entry, which would
// invalidate the iterator.
332 connection_info_.erase(hdl);
// Processing thread created by CreateProcessingThread() in the constructor;
// released through ProcessingThreadDeleter (DeleteProcessingThread).
336 std::unique_ptr<ProcessingThread, ProcessingThreadDeleter> thread;
// Per-connection state keyed by websocket connection handle, ordered with
// std::owner_less.
340 std::map<websocketpp::connection_hdl, ServerData,
341 std::owner_less<websocketpp::connection_hdl>> connection_info_;
// User callback assigned in SetConnectionHandler; called from on_open with
// each newly created DataChannel.
343 std::function<void(std::shared_ptr<DataChannel>)> connection_handler_;
// io_service given to echo_server_.init_asio and to
// impl::WrapObserverImplementation.
345 boost::asio::io_service service_;
void SetConnectionHandler(std::function< void(std::shared_ptr< DataChannel >)> handler)
Get notified whenever there is a connection.
Definition: server.h:171
DataChannelSettings specifies the settings for the created DataChannel.
Definition: server.h:84
void Stop()
Stop the server.
Definition: server.h:187
int maxRetransmits
How many retransmits before giving up.
Definition: server.h:92
The connection API for both the server and client.
bool ordered
True to force ordered delivery.
Definition: server.h:89
int maxRetransmitTime
How long before giving up on retransmission.
Definition: server.h:95
void Start()
Run the server.
Definition: server.h:179
Server(int port, DataChannelSettings settings=DataChannelSettings())
Create the server listening on a certain port.
Definition: server.h:133
A DataChannel server.
Definition: server.h:122