XTL  0.1
eXtended Template Library
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
rpc.hpp
Go to the documentation of this file.
1 
6 #pragma once
7 
8 #include <xtd/xtd.hpp>
9 
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <future>
14 
15 #include <xtd/socket.hpp>
17 #include <xtd/memory.hpp>
18 #include <xtd/debug.hpp>
19 
20 namespace xtd{
21  namespace rpc{
22  template <typename _DeclT, typename _CallT> class rpc_call;
27  template <bool _SkipByVal, typename ...> class marshaler;
32  template <typename...> class marshaler_base;
33  template <class _TransportT, class _DeclT, class ... _Calls> class server_impl;
34  template <typename _TransportT, class ... _Calls> class client;
35 
36 
37 
38 
39 
44  public:
45  using _super_t = xtd::exception;
46  template <typename ... _ArgTs> protocol_exception(_ArgTs&&...oArgs) : _super_t(std::forward<_ArgTs>(oArgs)...){}
47  };
49  public:
51  template <typename ... _ArgTs> malformed_payload(_ArgTs&&...oArgs) : _super_t(std::forward<_ArgTs>(oArgs)...){}
52  };
53  class bad_call : public protocol_exception{
54  public:
56  template <typename ... _ArgTs> bad_call(_ArgTs&&...oArgs) : _super_t(std::forward<_ArgTs>(oArgs)...){}
57  };
58 
59 
62  class payload : public std::vector<uint8_t>{
63  public:
64 
65  using _super_t = std::vector<uint8_t>;
66  using invoke_handler_type = std::function<bool(payload&)>;
67 
68  template <typename ... _ArgTs> payload(_ArgTs&&...oArgs) : _super_t(std::forward<_ArgTs>(oArgs)...){}
69 
70  template <typename _Ty> _Ty peek() const{
71  if (_super_t::size() < sizeof(_Ty)) throw rpc::malformed_payload(here(), "Malformed payload");
72  return _Ty(*reinterpret_cast<const _Ty*>(&_super_t::at(0)));
73  }
74  };
75 
77  template <> class marshaler_base<>{
78  public:
79  static void marshal(payload&){ /*this specialization is a recursion terminator so has no implementation*/}
80  static void unmarshal(payload&){/*this specialization is a recursion terminator so has no implementation*/}
81  };
82 
87  template <typename _Ty>
88  class marshaler_base<_Ty>{
89  public:
90  static void marshal(payload& oPayload, const _Ty& val){
91  static_assert(std::is_pod<_Ty>::value, "Invalid specialization");
92  oPayload.insert(oPayload.end(), reinterpret_cast<const uint8_t*>(&val), sizeof(_Ty) + reinterpret_cast<const uint8_t*>(&val));
93  }
94  static _Ty unmarshal(payload& oPayload){
95  static_assert(std::is_pod<_Ty>::value, "Invalid specialization");
96  _Ty oRet(*reinterpret_cast<_Ty*>(&oPayload[0]));
97  oPayload.erase(oPayload.begin(), oPayload.begin() + sizeof(_Ty));
98  return oRet;
99  }
100  };
101 
103  template <typename _Ty, size_t _Len>
104  class marshaler_base<_Ty(&)[_Len] >{
105  public:
106  static void marshal(payload& oPayload, const _Ty(&val)[_Len]){
107  marshaler_base<size_t>::marshal(oPayload, _Len);
108  for (const auto & oItem : val) marshaler_base<_Ty>::marshal(oPayload, oItem);
109  }
110  };
111 
112 
114  template <typename _Ty> class marshaler_base<std::vector < _Ty>> {
115  public:
116  static void marshal(payload& oPayload, const std::vector<_Ty>& val){
117  marshaler_base<size_t>::marshal(oPayload, val.size());
118  for (const auto & oItem : val) marshaler_base<_Ty>::marshal(oPayload, oItem);
119  }
120  static std::vector < _Ty> unmarshal(payload& oPayload){
121  std::vector < _Ty> oRet;
122  auto iSize = marshaler_base<std::size_t>::unmarshal(oPayload);
123  for (std::size_t i = 0; i < iSize; i++){
124  oRet.push_back(marshaler_base<_Ty>::unmarshal(oPayload));
125  }
126  return oRet;
127  }
128  };
129 
130 
132  template <>
133  class marshaler_base<std::string>{
134  public:
135  static void marshal(payload & oPayload, const std::string& val){
136  marshaler_base<std::size_t>::marshal(oPayload, val.size());
137  oPayload.insert(oPayload.end(), val.cbegin(), val.cend());
138  }
139  static std::string unmarshal(payload& oPayload){
140 
141  auto iSize = marshaler_base<size_t>::unmarshal(oPayload);
142  std::string oRet(oPayload.begin(), oPayload.begin() + iSize);
143  oPayload.erase(oPayload.begin(), oPayload.begin() + iSize);
144  return oRet;
145  }
146  };
147 
149  template <bool _SkipByVal> class marshaler<_SkipByVal>{
150  public:
151  static void marshal(payload&){/*this specialization is a recursion terminator so has no implementation*/}
152  static void unmarshal(payload&){/*this specialization is a recursion terminator so has no implementation*/}
153  };
154 
158  template <typename _Ty, typename..._ArgTs> class marshaler<true, const _Ty&, _ArgTs...>{
159  public:
160  static void marshal(payload& oPayload, const _Ty&, _ArgTs&&...oArgs){
161  marshaler<true, _ArgTs...>::marshal(oPayload, std::forward<_ArgTs>(oArgs)...);
162  }
163  static void unmarshal(payload& oPayload, const _Ty&, _ArgTs&&...oArgs){
164  marshaler<true, _ArgTs...>::unmarshal(oPayload, std::forward<_ArgTs>(oArgs)...);
165  }
166  };
167 
168 
172  template <typename _Ty, typename ..._ArgTs> class marshaler<true, _Ty, _ArgTs...>{
173  public:
174  static void marshal(payload& oPayload, const _Ty&, _ArgTs&&...oArgs){
175  marshaler<true, _ArgTs...>::marshal(oPayload, std::forward<_ArgTs>(oArgs)...);
176  }
177  static void unmarshal(payload& oPayload, const _Ty&, _ArgTs&&...oArgs){
178  marshaler<true, _ArgTs...>::unmarshal(oPayload, std::forward<_ArgTs>(oArgs)...);
179  }
180  };
181 
182 
186  template <typename _Ty, typename ..._ArgTs> class marshaler<false, _Ty&, _ArgTs...>{
187  public:
188  static void marshal(payload& oPayload, const _Ty& value, _ArgTs&&...oArgs){
189  marshaler_base<_Ty>::marshal(oPayload, value);
190  marshaler<false, _ArgTs...>::marshal(oPayload, std::forward<_ArgTs>(oArgs)...);
191  }
192  static void unmarshal(payload& oPayload, _Ty& value, _ArgTs&&...oArgs){
193  value = marshaler_base<_Ty>::unmarshal(oPayload);
194  marshaler<false, _ArgTs...>::unmarshal(oPayload, std::forward<_ArgTs>(oArgs)...);
195  }
196  };
197 
201  template <typename _Ty, typename ..._ArgTs> class marshaler<false, _Ty, _ArgTs...>{
202  public:
203  static void marshal(payload& oPayload, const _Ty& value, _ArgTs&&...oArgs){
204  marshaler_base<_Ty>::marshal(oPayload, value);
205  marshaler<false, _ArgTs...>::marshal(oPayload, std::forward<_ArgTs>(oArgs)...);
206  }
207  static void unmarshal(payload& oPayload, _Ty& value, _ArgTs&&...oArgs){
208  value = marshaler_base<_Ty>::unmarshal(oPayload);
209  marshaler<false, _ArgTs...>::unmarshal(oPayload, std::forward<_ArgTs>(oArgs)...);
210  }
211  };
212 
213 
220  template <typename _DeclT, typename _ReturnT, typename ... _ArgTs> class rpc_call<_DeclT, _ReturnT(_ArgTs...)>{
221 
222  public:
223  using return_type = _ReturnT;
224  using function_type = std::function<_ReturnT(_ArgTs...)>;
225  using upload_marshaler_type = marshaler<false, size_t, _ArgTs...>;
226  using download_marshaler_type = marshaler<true, _ArgTs..., _ReturnT&>;
227  };
228 
229 
233  socket::ipv4address _Address;
235  std::unique_ptr<std::thread> _ServerThread;
236  bool _StopServer;
237  xtd::concurrent::hash_map<std::thread::id, std::thread> _Clients;
238  bool _ClientConnected;
239 
240  public:
241  using pointer_type = std::shared_ptr<tcp_transport>;
242 
243  tcp_transport(const socket::ipv4address& oAddress) : _Address(oAddress), _Socket(), _ServerThread(), _ClientConnected(false){}
244 
245  void start_server(payload::invoke_handler_type oHandler){
246  _StopServer = false;
247  std::shared_ptr<std::promise<void>> oServerStarted(new std::promise<void>);
248  std::shared_ptr<payload::invoke_handler_type> oInvokeHandler(new payload::invoke_handler_type(oHandler));
249  _ServerThread = xtd::make_unique<std::thread>([&, oInvokeHandler, oServerStarted]{
250  oServerStarted->set_value();
251  _Socket.bind(_Address);
252  bool ExitThread = false;
253  payload oPayload;
254  while (!_StopServer && !ExitThread){
255  _Socket.listen();
256  auto oClient = _Socket.accept<xtd::socket::ipv4_tcp_stream>();
257  std::shared_ptr<xtd::socket::ipv4_tcp_stream> oClientSocket(new xtd::socket::ipv4_tcp_stream(std::move(oClient)));
258  std::thread oClientThread([&, oClientSocket, oInvokeHandler](){
259  oClientSocket->onError.connect([&ExitThread](){
260  ERR("Socket error");
261  ExitThread = true;
262  });
263  oClientSocket->onRead.connect([&](){
264  oClientSocket->read<payload::_super_t>(oPayload);
265  if (!oHandler(oPayload)){
266  ExitThread = true;
267  }
268  });
269  oClientSocket->onWrite.connect([&](){
270  if (oPayload.size()){
271  oClientSocket->write<payload::_super_t>(oPayload);
272  }
273  });
274  payload oPayload;
275  for (;;){
276  oClientSocket->select(250);
277  }
278  });
279  oClientThread.detach();
280  }
281  });
282  oServerStarted->get_future().get();
283  }
284  void stop_server(){
285  //TODO: implement stop_server
286  TODO("implement stop_server")
287  }
288  void transact(payload& oPayload){
289  if (!_ClientConnected){
290  _Socket.connect(_Address);
291  _ClientConnected = true;
292  }
293  _Socket.write<typename payload::_super_t>(oPayload);
294  _Socket.read<typename payload::_super_t>(oPayload);
295 
296  }
297 
298  };
299 
303 
304  struct globals{
305  std::thread _ServerThread;
306  std::mutex _CallLock;
307  std::condition_variable _CallCheck;
308  std::promise<void> _ServerThreadStarted;
309  bool _ServerThreadExit = false;
310 
311  struct transport_info{
312  using pointer = std::shared_ptr<transport_info>;
313  using stack = xtd::concurrent::stack<pointer>;
314  std::promise<void> _Processed;
315  payload oPayload;
316  };
317 
318  transport_info::stack _TransportInfo;
319 
320  static globals& get(){
321  static globals _globals;
322  return _globals;
323  }
324  };
325 
326  public:
327 
328  using pointer_type = std::shared_ptr<null_transport>;
329 
330  void start_server(payload::invoke_handler_type oHandler){
331  std::shared_ptr<payload::invoke_handler_type> oInvokeHandler(new payload::invoke_handler_type(oHandler));
332  globals::get()._ServerThread = std::thread([&, oInvokeHandler]{
333  globals::get()._ServerThreadStarted.set_value();
334  globals::transport_info::pointer pTransportInfo;
335  while (!globals::get()._ServerThreadExit){
336  {
337  std::unique_lock<std::mutex> oLock(globals::get()._CallLock);
338  globals::get()._CallCheck.wait(oLock, [&]{ return globals::get()._TransportInfo.try_pop(pTransportInfo); });
339  oLock.unlock();
340  globals::get()._CallCheck.notify_one();
341  }
342  if (pTransportInfo->oPayload.size()){
343  (*oInvokeHandler)(pTransportInfo->oPayload);
344  }
345  pTransportInfo->_Processed.set_value();
346  }
347  });
348  globals::get()._ServerThreadStarted.get_future().get();
349  }
350  void stop_server(){
351  globals::get()._ServerThreadExit = true;
352  globals::transport_info::pointer pTransportInfo(new globals::transport_info);
353  {
354  std::lock_guard<std::mutex> oLock(globals::get()._CallLock);
355  globals::get()._TransportInfo.push(pTransportInfo);
356  globals::get()._CallCheck.notify_one();
357  }
358  pTransportInfo->_Processed.get_future().get();
359 
360  globals::get()._ServerThread.join();
361  }
362 
363  void transact(payload& oPayload){
364  globals::transport_info::pointer pTransportInfo(new globals::transport_info);
365  pTransportInfo->oPayload = oPayload;
366  {
367  std::lock_guard<std::mutex> oLock(globals::get()._CallLock);
368  globals::get()._TransportInfo.push(pTransportInfo);
369  globals::get()._CallCheck.notify_one();
370  }
371  pTransportInfo->_Processed.get_future().get();
372  }
373  };
374 
375 
380  template <class _TransportT, class _DeclT> class server_impl<_TransportT, _DeclT>{
381  protected:
382 
383  public:
384  virtual bool call_handler(payload&){
385  return false;
386  }
387 
388  ~server_impl(){
389  try{
390  stop_server();
391  }
392  catch (const xtd::exception& ex){
393  ERR("An unhandled xtd::exception occured while stopping the transport: ", ex.what());
394  }
395  }
396 
397 
398  typename _TransportT::pointer_type _Transport;
399 
400  server_impl(typename _TransportT::pointer_type& oTransport) : _Transport(oTransport){}
401 
402  template <typename ... _XportCtorTs> server_impl(_XportCtorTs&&...oParams) : _Transport(new _TransportT(std::forward<_XportCtorTs>(oParams)...)){}
403 
404  void start_server(){
405  _Transport->start_server([this](payload& oPayload)->bool{ return static_cast<_DeclT*>(this)->call_handler(oPayload); });
406  }
407 
408  void stop_server(){
409  _Transport->stop_server();
410  }
411 
412 
413  };
414 
415 
416  template <class _TransportT, class _DeclT, class _HeadT, class ... _TailT>
417  class server_impl<_TransportT, _DeclT, _HeadT, _TailT...>
418  : public server_impl<_TransportT, _DeclT, _TailT...>{
419  protected:
420  using _super_t = server_impl<_TransportT, _DeclT, _TailT...>;
421  using function_type = typename _HeadT::function_type;
422  function_type _Function;
423 
424  template <typename _CallT, typename _Param> void _attach(typename std::enable_if<std::is_same<_CallT, _HeadT>::value, _Param>::type oParam){
425  _Function = oParam;
426  }
427 
428  template <typename _CallT, typename _ParamT> void _attach(_ParamT oCallImpl){ _super_t::template _attach<_CallT, _ParamT>(oCallImpl); }
429 
430 
431 
432  public:
433 
434 
435  template <typename ... _XportCtorTs> server_impl(_XportCtorTs&&...oParams) : _super_t(std::forward<_XportCtorTs>(oParams)...){}
436 
437  virtual bool call_handler(payload& oPayload) override{
438  if (typeid(_HeadT).hash_code() != oPayload.peek<size_t>()){
439  return _super_t::call_handler(oPayload);
440  }
441  TODO("invoke call")
442  return true;
443  }
444 
445  template <typename _CallT, typename _ParamT> void attach(_ParamT oCallImpl){ _attach<_CallT, _ParamT>(oCallImpl); }
446 
447  };
448 
449  template <class _TransportT, class ... _Calls> class server : public server_impl<_TransportT, server<_TransportT, _Calls...>, _Calls...>{
450  using _super_t = server_impl<_TransportT, server<_TransportT, _Calls...>, _Calls...>;
451  public:
452  using client_type = client<_TransportT, _Calls...>;
453  template <typename ... _XportCtorTs> server(_XportCtorTs&&...oParams) : _super_t(std::forward<_XportCtorTs>(oParams)...){}
454 
455  };
456 
457 
458  template<typename _TransportT >
459  class client<_TransportT>{
460  protected:
461  _TransportT _Transport;
462 
463  public:
465  template<typename ... _XportCtorTs> explicit client(_XportCtorTs &&...oArgs) : _Transport(std::forward<_XportCtorTs>(oArgs)...){}
466 
467  _TransportT& transport(){ return _Transport; }
468  const _TransportT& transport() const { return _Transport; }
469 
470  void connect(){ _Transport.connect(); }
471 
472  };
473 
474 
475  template<typename _TransportT, class _HeadT, class ..._TailT >
476  class client<_TransportT, _HeadT, _TailT...> : public client<_TransportT, _TailT...>{
477  protected:
478  using _super_t = client<_TransportT, _TailT...>;
479 
480  template <typename _CallT, typename ..._ParamTs>
481  typename _HeadT::return_type _call(typename std::enable_if<std::is_same<_CallT, _HeadT>::value, const std::type_info&>::type, _ParamTs&&... oParams) {
482  payload oPayload;
483  typename _HeadT::return_type oRet;
484  _HeadT::upload_marshaler_type::marshal(oPayload, typeid(_CallT).hash_code(), std::forward<_ParamTs>(oParams)...);
485  _super_t::_Transport.transact(oPayload);
486  _HeadT::download_marshaler_type::unmarshal(oPayload, std::forward<_ParamTs>(oParams)..., oRet);
487  return oRet;
488  }
489 
490  template <typename _CallT, typename ..._ParamTs>
491  typename _CallT::return_type _call(typename std::enable_if<!std::is_same<_CallT, _HeadT>::value, const std::type_info&>::type oType, _ParamTs&&... oParams) {
492  return _super_t::template _call<_CallT>(oType, std::forward<_ParamTs>(oParams)...);
493  }
494 
495  public:
496  using server_type = server<_TransportT, _HeadT, _TailT...>;
497  template<typename ... _XportCtorTs> explicit client(_XportCtorTs&&...oArgs) : client<_TransportT, _TailT...>(std::forward<_XportCtorTs>(oArgs)...){}
498 
499 
500 
501  template <typename _Ty, typename ... _ParamTs>
502  typename _Ty::return_type call(_ParamTs&&...oParams) {
503  return _call<_Ty>(typeid(_Ty), std::forward<_ParamTs>(oParams)...);
504  }
505 
506 
507  };
508 
509  }
510 }
std::exception _super_t
shortcut typedef of the super class
Definition: exception.hpp:30
tcp/ip transport
Definition: rpc.hpp:232
represents an exception with the RPC protocol typically occurs as the result of failing to parse payl...
Definition: rpc.hpp:43
generic-form marshaler_base Used to recursively marshal a parameter pack of data into a payload ...
Definition: rpc.hpp:32
socket_base< ipv4address, socket_type::stream, socket_protocol::tcp, socket_options, ip_options, tcp_options, connectable_socket, bindable_socket, listening_socket, selectable_socket > ipv4_tcp_stream
General purpose IPV4 client and server socket type.
Definition: socket.hpp:610
const char * what() const noexceptoverride
}@
Definition: exception.hpp:69
contains the packed call data on the wire that is transported between clients and servers ...
Definition: rpc.hpp:62
general purpose socket communication
host, target and build configurations and settings Various components are purpose built for specific ...
Debugging.
Base exception for XTL.
Definition: exception.hpp:27
IPv4 address wrapper around sockaddr_in.
Definition: socket.hpp:114
concurrently insert, query and delete items in an unordered hash map
Dummy transport used for debugging.
Definition: rpc.hpp:302
memory related methods
generic-form marshaler used by clients and servers
Definition: rpc.hpp:27