DataChannelServer
A C++ library for creating WebRTC DataChannel servers.
server.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <boost/asio.hpp>
4 #include <iostream>
5 #include <websocketpp/config/asio_no_tls.hpp>
6 
7 #include <websocketpp/server.hpp>
8 
9 #include <iostream>
10 
11 #include <DataChannelServer/server-src/internal-api.h>
13 
16 
17 namespace data_channel {
18 namespace impl {
19 
20  template <typename T> class Inner {
21  public:
22  Inner(boost::asio::io_service &service, std::unique_ptr<T> impl)
23  : service_(service), impl_(std::move(impl)) {}
24 
25  template <typename F> void Wrap(F functor) {
26  service_.dispatch([this, functor]() { functor(impl_.get()); });
27  }
28 
29  private:
30  boost::asio::io_service &service_;
31  std::unique_ptr<T> impl_;
32  };
33 
34  template <typename T>
35  PeerConnectionObserver
36  WrapObserverImplementation(boost::asio::io_service &service,
37  std::unique_ptr<T> impl) {
38  PeerConnectionObserver result;
39 
40  result.data = new Inner<T>(service, std::move(impl));
41 
42  result.Deleter = [](void *data) {
43  Inner<T> *inner = (Inner<T> *)(data);
44  inner->Wrap([inner](T *) { delete inner; });
45  };
46 
47  result.OnOpen = [](void *data) {
48  Inner<T> *inner = (Inner<T> *)(data);
49  inner->Wrap([](T *impl) { impl->OnOpen(); });
50  };
51 
52  result.OnClose = [](void *data) {
53  Inner<T> *inner = (Inner<T> *)(data);
54  inner->Wrap([](T *impl) { impl->OnClose(); });
55  };
56 
57  result.ProcessWebsocketMessage =
58  [](void *data, const char *message, int message_length) {
59  Inner<T> *inner = (Inner<T> *)(data);
60  std::string message_str(message, message_length);
61  inner->Wrap([message_str](T *impl) {
62  impl->ProcessWebsocketMessage(message_str);
63  });
64  };
65 
66  result.ProcessDataChannelMessage =
67  [](void *data, const char *message, int message_length) {
68  Inner<T> *inner = (Inner<T> *)(data);
69  std::string message_str(message, message_length);
70  inner->Wrap([message_str](T *impl) {
71  impl->ProcessDataChannelMessage(message_str);
72  });
73  };
74 
75  return result;
76  }
77 
78 } // namespace impl
79 
86  : ordered(true), maxRetransmits(-1), maxRetransmitTime(-1) {}
87 
89  bool ordered;
90 
93 
96 };
97 
122 class Server {
123 private:
124 
125  typedef websocketpp::server<websocketpp::config::asio> server;
126 
127 public:
134  : thread(CreateProcessingThread()), settings_(settings) {
135  try {
136  // Set logging settings
137  echo_server_.set_access_channels(websocketpp::log::alevel::all);
138  echo_server_.clear_access_channels(
139  websocketpp::log::alevel::frame_payload);
140 
141  // Initialize asio
142  echo_server_.init_asio(&service_);
143 
144  // Register our message handlers
145  echo_server_.set_message_handler(
146  [this](websocketpp::connection_hdl hdl, server::message_ptr msg) {
147  on_message(hdl, msg);
148  });
149  echo_server_.set_open_handler(
150  [this](websocketpp::connection_hdl hdl) { on_open(hdl); });
151  echo_server_.set_close_handler(
152  [this](websocketpp::connection_hdl hdl) { on_close(hdl); });
153 
154  // Listen on provided port
155  echo_server_.listen(port);
156 
157  // Start the server accept loop
158  echo_server_.start_accept();
159  } catch (websocketpp::exception const &e) {
160  std::cout << e.what() << std::endl;
161  } catch (...) {
162  std::cout << "other exception" << std::endl;
163  }
164  }
165 
172  std::function<void(std::shared_ptr<DataChannel>)> handler) {
173  connection_handler_ = handler;
174  }
175 
179  void Start() {
180  // Start the ASIO io_service run loop
181  service_.run();
182  }
183 
187  void Stop() {
188  echo_server_.stop_listening();
189  }
190 
191 private:
192  static DataChannelOptions ConvertSettings(DataChannelSettings settings) {
193  DataChannelOptions options;
194  options.ordered = settings.ordered;
195  options.maxRetransmits = settings.maxRetransmits;
196  options.maxRetransmitTime = settings.maxRetransmitTime;
197 
198  return options;
199  }
200 
201  struct PeerConnectionDeleter {
202  explicit PeerConnectionDeleter(ProcessingThread *a_thread)
203  : thread(a_thread) {}
204 
205  void operator()(PeerConnection *peer) { DeletePeerConnection(thread, peer); }
206 
207  ProcessingThread *thread;
208  };
209 
210  struct ServerData {
211  ServerData(ProcessingThread *a_thread, PeerConnectionObserver observer,
212  std::shared_ptr<std::weak_ptr<DataChannel>> a_channel_holder,
213  DataChannelOptions options)
214  : thread(a_thread), peer(CreatePeerConnection(thread, observer, options),
215  PeerConnectionDeleter(thread)),
216  channel_holder(a_channel_holder) {}
217 
218  ProcessingThread *thread;
219 
220  std::unique_ptr<PeerConnection, PeerConnectionDeleter> peer;
221  std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder;
222  };
223 
224 
225  struct ProcessingThreadDeleter {
226  void operator()(ProcessingThread *peer) { DeleteProcessingThread(peer); }
227  };
228 
229  void on_message(websocketpp::connection_hdl hdl, server::message_ptr msg) {
230  ServerData &channel = get_data(hdl);
231 
232  SendWebsocketMessage(thread.get(), channel.peer.get(),
233  msg->get_payload().data(), msg->get_payload().size());
234  }
235 
236  ServerData &get_data(websocketpp::connection_hdl hdl) {
237  auto iter = connection_info_.find(hdl);
238  if (iter == connection_info_.end()) {
239  std::cout << "Internal Error: Unable to find connection " << hdl.lock();
240  abort();
241  }
242 
243  return iter->second;
244  }
245 
246  void on_open(websocketpp::connection_hdl hdl) {
247  class PeerConnectionObserverImplementation {
248  public:
249  PeerConnectionObserverImplementation(
250  Server *server, websocketpp::connection_hdl hdl,
251  std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder)
252  : server_(server), hdl_(hdl), channel_holder_(channel_holder) {}
253 
254  void OnOpen() {
255  auto send_message_callback = [this](const std::string &message) {
256  ServerData &channel = server_->get_data(hdl_);
257  SendDataChannelMessage(server_->thread.get(), channel.peer.get(),
258  message.data(), message.size());
259  };
260 
261  auto close_channel_callback = [this]() {
262  std::error_code error_code;
263  server_->echo_server_.close(hdl_, websocketpp::close::status::normal,
264  "server requested to quit", error_code);
265  if (error_code) {
266  std::cout << "Had error while closing " << error_code.message()
267  << std::endl;
268  }
269  server_->connection_info_.erase(hdl_);
270  };
271 
272  auto channel = std::make_shared<DataChannel>(send_message_callback,
273  close_channel_callback);
274  *channel_holder_ = channel;
275  server_->connection_handler_(channel);
276  }
277 
278  void OnClose() {
279  auto actual_channel = channel_holder_->lock();
280  if (actual_channel != nullptr) {
281  actual_channel->GetOnCloseHandler()();
282  }
283  }
284 
285  void ProcessWebsocketMessage(const std::string &message) {
286  std::error_code error_code;
287  server_->echo_server_.send(
288  hdl_, message, websocketpp::frame::opcode::text, error_code);
289  if (error_code) {
290  std::cout << "Had error while sending " << error_code.message()
291  << std::endl;
292  }
293  }
294 
295  void ProcessDataChannelMessage(const std::string &message) {
296  auto actual_channel = channel_holder_->lock();
297  if (actual_channel != nullptr) {
298  actual_channel->GetOnMessageHandler()(message);
299  }
300  }
301 
302  private:
303  Server *server_;
304  websocketpp::connection_hdl hdl_;
305  std::shared_ptr<std::weak_ptr<DataChannel>> channel_holder_;
306  };
307 
308  auto channel_holder = std::make_shared<std::weak_ptr<DataChannel>>();
309 
310  ServerData data(
311  thread.get(),
312  impl::WrapObserverImplementation(
313  service_, std::make_unique<PeerConnectionObserverImplementation>(
314  this, hdl, channel_holder)),
315  channel_holder, ConvertSettings(settings_));
316 
317  connection_info_.emplace(hdl, std::move(data));
318  }
319 
320  void on_close(websocketpp::connection_hdl hdl) {
321  std::cout << "on_close called with hdl: " << hdl.lock().get() << std::endl;
322  auto iter = connection_info_.find(hdl);
323  if (iter == connection_info_.end()) {
324  std::cout << "Already closed the connection" << std::endl;
325  return;
326  }
327  ServerData &channel = iter->second;
328  auto actual_channel = channel.channel_holder->lock();
329  if (actual_channel != nullptr) {
330  actual_channel->GetOnCloseHandler()();
331  } else {
332  connection_info_.erase(hdl);
333  }
334  }
335 
336  std::unique_ptr<ProcessingThread, ProcessingThreadDeleter> thread;
337 
338  DataChannelSettings settings_;
339 
340  std::map<websocketpp::connection_hdl, ServerData,
341  std::owner_less<websocketpp::connection_hdl>> connection_info_;
342 
343  std::function<void(std::shared_ptr<DataChannel>)> connection_handler_;
344 
345  boost::asio::io_service service_;
346  server echo_server_;
347 };
348 
349 } // namespace data_channel
void SetConnectionHandler(std::function< void(std::shared_ptr< DataChannel >)> handler)
Get notified whenever there is a connection.
Definition: server.h:171
DataChannelSettings specifies the settings for the created DataChannel.
Definition: server.h:84
void Stop()
Stop the server.
Definition: server.h:187
int maxRetransmits
How many retransmits before giving up.
Definition: server.h:92
The connection API for both the server and client.
bool ordered
True to force ordered delivery.
Definition: server.h:89
int maxRetransmitTime
How long before giving up on retransmission.
Definition: server.h:95
void Start()
Run the server.
Definition: server.h:179
Definition: server.h:17
Server(int port, DataChannelSettings settings=DataChannelSettings())
Create the server listening on a certain port.
Definition: server.h:133
A DataChannel server.
Definition: server.h:122