12 #include <condition_variable>
/// Forward declaration of the rpc_call class template, which takes two type
/// parameters. The only definition visible in this listing is the partial
/// specialization whose second argument is a function signature,
/// rpc_call<_DeclT, _ReturnT(_ArgTs...)>.
22 template <
typename _DeclT,
typename _CallT>
class rpc_call;
/// Forward declaration of the marshaler class template.
/// _SkipByVal is a compile-time boolean and the remaining parameters are a
/// pack of types. Partial specializations for both values of the flag, keyed
/// on the form of the leading type, appear later in this listing.
27 template <
bool _SkipByVal,
typename ...>
class marshaler;
/// Forward declaration of the server_impl class template: a transport type,
/// a _DeclT type and a pack of call types. The server class template passes
/// itself as _DeclT when deriving from server_impl. Specializations for an
/// empty call pack and for a head/tail call pack appear later in this listing.
33 template <
class _TransportT,
class _DeclT,
class ... _Calls>
class server_impl;
/// Forward declaration of the client class template: a transport type plus a
/// pack of call types. The head/tail specialization later in this listing
/// derives from client<_TransportT, _TailT...>.
34 template <
typename _TransportT,
class ... _Calls>
class client;
/// Variadic constructor: perfect-forwards every argument to the _super_t
/// constructor and does nothing else.
56 template <
typename ... _ArgTs>
bad_call(_ArgTs&&...oArgs) :
_super_t(std::forward<_ArgTs>(oArgs)...){}
62 class payload :
public std::vector<uint8_t>{
65 using _super_t = std::vector<uint8_t>;
66 using invoke_handler_type = std::function<bool(payload&)>;
/// Variadic constructor: perfect-forwards every argument to the _super_t
/// (std::vector<uint8_t>) constructor and does nothing else.
68 template <
typename ... _ArgTs>
payload(_ArgTs&&...oArgs) : _super_t(std::forward<_ArgTs>(oArgs)...){}
70 template <
typename _Ty> _Ty peek()
const{
72 return _Ty(*reinterpret_cast<const _Ty*>(&_super_t::at(0)));
/// Overload that takes only the payload. The body is empty, so the payload is
/// left untouched.
/// NOTE(review): presumably the terminating case once no values remain to be
/// marshaled — confirm against the callers.
79 static void marshal(
payload&){ }
/// Overload that takes only the payload. The body is empty, so the payload is
/// left untouched.
/// NOTE(review): presumably the terminating case once no values remain to be
/// unmarshaled — confirm against the callers.
80 static void unmarshal(
payload&){}
87 template <
typename _Ty>
90 static void marshal(
payload& oPayload,
const _Ty& val){
91 static_assert(std::is_pod<_Ty>::value,
"Invalid specialization");
92 oPayload.insert(oPayload.end(),
reinterpret_cast<const uint8_t*
>(&val),
sizeof(_Ty) +
reinterpret_cast<const uint8_t*
>(&val));
94 static _Ty unmarshal(
payload& oPayload){
95 static_assert(std::is_pod<_Ty>::value,
"Invalid specialization");
96 _Ty oRet(*reinterpret_cast<_Ty*>(&oPayload[0]));
97 oPayload.erase(oPayload.begin(), oPayload.begin() +
sizeof(_Ty));
103 template <
typename _Ty,
size_t _Len>
106 static void marshal(
payload& oPayload,
const _Ty(&val)[_Len]){
116 static void marshal(
payload& oPayload,
const std::vector<_Ty>& val){
120 static std::vector < _Ty> unmarshal(
payload& oPayload){
121 std::vector < _Ty> oRet;
123 for (std::size_t i = 0; i < iSize; i++){
135 static void marshal(
payload & oPayload,
const std::string& val){
137 oPayload.insert(oPayload.end(), val.cbegin(), val.cend());
139 static std::string unmarshal(
payload& oPayload){
142 std::string oRet(oPayload.begin(), oPayload.begin() + iSize);
143 oPayload.erase(oPayload.begin(), oPayload.begin() + iSize);
/// Overload that takes only the payload. The body is empty, so nothing is
/// written to the payload.
/// NOTE(review): presumably ends the recursion over the argument pack —
/// confirm against the marshaler specializations that call it.
151 static void marshal(
payload&){}
/// Overload that takes only the payload. The body is empty, so nothing is
/// read from the payload.
/// NOTE(review): presumably ends the recursion over the argument pack —
/// confirm against the marshaler specializations that call it.
152 static void unmarshal(
payload&){}
158 template <
typename _Ty,
typename..._ArgTs>
class marshaler<true, const _Ty&, _ArgTs...>{
160 static void marshal(
payload& oPayload,
const _Ty&, _ArgTs&&...oArgs){
163 static void unmarshal(
payload& oPayload,
const _Ty&, _ArgTs&&...oArgs){
172 template <
typename _Ty,
typename ..._ArgTs>
class marshaler<true, _Ty, _ArgTs...>{
174 static void marshal(
payload& oPayload,
const _Ty&, _ArgTs&&...oArgs){
177 static void unmarshal(
payload& oPayload,
const _Ty&, _ArgTs&&...oArgs){
186 template <
typename _Ty,
typename ..._ArgTs>
class marshaler<false, _Ty&, _ArgTs...>{
188 static void marshal(
payload& oPayload,
const _Ty& value, _ArgTs&&...oArgs){
192 static void unmarshal(
payload& oPayload, _Ty& value, _ArgTs&&...oArgs){
201 template <
typename _Ty,
typename ..._ArgTs>
class marshaler<false, _Ty, _ArgTs...>{
203 static void marshal(
payload& oPayload,
const _Ty& value, _ArgTs&&...oArgs){
207 static void unmarshal(
payload& oPayload, _Ty& value, _ArgTs&&...oArgs){
220 template <
typename _DeclT,
typename _ReturnT,
typename ... _ArgTs>
class rpc_call<_DeclT, _ReturnT(_ArgTs...)>{
223 using return_type = _ReturnT;
224 using function_type = std::function<_ReturnT(_ArgTs...)>;
235 std::unique_ptr<std::thread> _ServerThread;
237 xtd::concurrent::hash_map<std::thread::id, std::thread> _Clients;
238 bool _ClientConnected;
241 using pointer_type = std::shared_ptr<tcp_transport>;
245 void start_server(payload::invoke_handler_type oHandler){
247 std::shared_ptr<std::promise<void>> oServerStarted(
new std::promise<void>);
248 std::shared_ptr<payload::invoke_handler_type> oInvokeHandler(
new payload::invoke_handler_type(oHandler));
249 _ServerThread = xtd::make_unique<std::thread>([&, oInvokeHandler, oServerStarted]{
250 oServerStarted->set_value();
251 _Socket.bind(_Address);
252 bool ExitThread =
false;
254 while (!_StopServer && !ExitThread){
258 std::thread oClientThread([&, oClientSocket, oInvokeHandler](){
259 oClientSocket->onError.connect([&ExitThread](){
263 oClientSocket->onRead.connect([&](){
264 oClientSocket->read<payload::_super_t>(oPayload);
265 if (!oHandler(oPayload)){
269 oClientSocket->onWrite.connect([&](){
270 if (oPayload.size()){
271 oClientSocket->write<payload::_super_t>(oPayload);
276 oClientSocket->select(250);
279 oClientThread.detach();
282 oServerStarted->get_future().get();
286 TODO(
"implement stop_server")
288 void transact(
payload& oPayload){
289 if (!_ClientConnected){
290 _Socket.connect(_Address);
291 _ClientConnected =
true;
293 _Socket.write<
typename payload::_super_t>(oPayload);
294 _Socket.read<
typename payload::_super_t>(oPayload);
/// Thread object that start_server assigns and that is later joined.
305 std::thread _ServerThread;
/// Mutex locked around pushes to, and the conditional pop from, _TransportInfo.
306 std::mutex _CallLock;
/// Condition variable the server thread waits on until try_pop succeeds;
/// notified after each push to _TransportInfo.
307 std::condition_variable _CallCheck;
/// Promise fulfilled by the server thread as its first action; its future is
/// waited on after the thread has been created.
308 std::promise<void> _ServerThreadStarted;
/// Loop flag for the server thread: the thread keeps running while this is
/// false, and it is set to true when the server is being shut down.
/// NOTE(review): this is a plain bool that is written and read by different
/// threads — confirm the surrounding locking is sufficient.
309 bool _ServerThreadExit =
false;
311 struct transport_info{
312 using pointer = std::shared_ptr<transport_info>;
314 std::promise<void> _Processed;
318 transport_info::stack _TransportInfo;
320 static globals&
get(){
321 static globals _globals;
328 using pointer_type = std::shared_ptr<null_transport>;
330 void start_server(payload::invoke_handler_type oHandler){
331 std::shared_ptr<payload::invoke_handler_type> oInvokeHandler(
new payload::invoke_handler_type(oHandler));
332 globals::get()._ServerThread = std::thread([&, oInvokeHandler]{
333 globals::get()._ServerThreadStarted.set_value();
334 globals::transport_info::pointer pTransportInfo;
335 while (!globals::get()._ServerThreadExit){
337 std::unique_lock<std::mutex> oLock(globals::get()._CallLock);
338 globals::get()._CallCheck.wait(oLock, [&]{
return globals::get()._TransportInfo.try_pop(pTransportInfo); });
340 globals::get()._CallCheck.notify_one();
342 if (pTransportInfo->oPayload.size()){
343 (*oInvokeHandler)(pTransportInfo->oPayload);
345 pTransportInfo->_Processed.set_value();
348 globals::get()._ServerThreadStarted.get_future().get();
351 globals::get()._ServerThreadExit =
true;
352 globals::transport_info::pointer pTransportInfo(
new globals::transport_info);
354 std::lock_guard<std::mutex> oLock(globals::get()._CallLock);
355 globals::get()._TransportInfo.push(pTransportInfo);
356 globals::get()._CallCheck.notify_one();
358 pTransportInfo->_Processed.get_future().get();
360 globals::get()._ServerThread.join();
363 void transact(
payload& oPayload){
364 globals::transport_info::pointer pTransportInfo(
new globals::transport_info);
365 pTransportInfo->oPayload = oPayload;
367 std::lock_guard<std::mutex> oLock(globals::get()._CallLock);
368 globals::get()._TransportInfo.push(pTransportInfo);
369 globals::get()._CallCheck.notify_one();
371 pTransportInfo->_Processed.get_future().get();
380 template <
class _TransportT,
class _DeclT>
class server_impl<_TransportT, _DeclT>{
384 virtual bool call_handler(
payload&){
393 ERR(
"An unhandled xtd::exception occured while stopping the transport: ", ex.
what());
398 typename _TransportT::pointer_type _Transport;
/// Constructs the server around an existing transport handle.
/// @param oTransport handle that is copied into the _Transport member.
400 server_impl(
typename _TransportT::pointer_type& oTransport) : _Transport(oTransport){}
/// Constructs the server together with its own transport: allocates a new
/// _TransportT, perfect-forwarding all arguments to its constructor, and
/// initializes _Transport with the resulting pointer.
/// @param oParams arguments passed through to the _TransportT constructor.
402 template <
typename ... _XportCtorTs>
server_impl(_XportCtorTs&&...oParams) : _Transport(
new _TransportT(std::forward<_XportCtorTs>(oParams)...)){}
405 _Transport->start_server([
this](
payload& oPayload)->
bool{
return static_cast<_DeclT*
>(
this)->call_handler(oPayload); });
409 _Transport->stop_server();
416 template <
class _TransportT,
class _DeclT,
class _HeadT,
class ... _TailT>
418 :
public server_impl<_TransportT, _DeclT, _TailT...>{
421 using function_type =
typename _HeadT::function_type;
422 function_type _Function;
424 template <
typename _CallT,
typename _Param>
void _attach(
typename std::enable_if<std::is_same<_CallT, _HeadT>::value, _Param>::type oParam){
/// Delegating overload of _attach: hands oCallImpl to the base class
/// (_super_t) _attach with the same _CallT and _ParamT template arguments.
/// @param oCallImpl call implementation, passed on by value.
428 template <
typename _CallT,
typename _ParamT>
void _attach(_ParamT oCallImpl){ _super_t::template _attach<_CallT, _ParamT>(oCallImpl); }
/// Variadic constructor: perfect-forwards every argument to the _super_t
/// constructor and does nothing else.
435 template <
typename ... _XportCtorTs>
server_impl(_XportCtorTs&&...oParams) :
_super_t(std::forward<_XportCtorTs>(oParams)...){}
437 virtual bool call_handler(
payload& oPayload)
override{
438 if (
typeid(_HeadT).hash_code() != oPayload.peek<
size_t>()){
439 return _super_t::call_handler(oPayload);
/// Attaches an implementation for the call type _CallT by forwarding
/// oCallImpl to _attach with the same template arguments.
/// @param oCallImpl call implementation, passed on by value.
445 template <
typename _CallT,
typename _ParamT>
void attach(_ParamT oCallImpl){ _attach<_CallT, _ParamT>(oCallImpl); }
449 template <
class _TransportT,
class ... _Calls>
class server :
public server_impl<_TransportT, server<_TransportT, _Calls...>, _Calls...>{
/// Variadic constructor: perfect-forwards every argument to the _super_t
/// constructor and does nothing else.
453 template <
typename ... _XportCtorTs>
server(_XportCtorTs&&...oParams) :
_super_t(std::forward<_XportCtorTs>(oParams)...){}
458 template<
typename _TransportT >
461 _TransportT _Transport;
/// Explicit variadic constructor: constructs the _Transport member in place,
/// perfect-forwarding every argument to the _TransportT constructor.
465 template<
typename ... _XportCtorTs>
explicit client(_XportCtorTs &&...oArgs) : _Transport(std::forward<_XportCtorTs>(oArgs)...){}
/// @return a mutable reference to the _Transport member.
467 _TransportT& transport(){
return _Transport; }
/// @return a read-only reference to the _Transport member.
468 const _TransportT& transport()
const {
return _Transport; }
/// Calls connect() on the _Transport member.
470 void connect(){ _Transport.connect(); }
475 template<
typename _TransportT,
class _HeadT,
class ..._TailT >
476 class client<_TransportT, _HeadT, _TailT...> :
public client<_TransportT, _TailT...>{
480 template <
typename _CallT,
typename ..._ParamTs>
481 typename _HeadT::return_type _call(
typename std::enable_if<std::is_same<_CallT, _HeadT>::value,
const std::type_info&>::type, _ParamTs&&... oParams) {
483 typename _HeadT::return_type oRet;
484 _HeadT::upload_marshaler_type::marshal(oPayload,
typeid(_CallT).hash_code(), std::forward<_ParamTs>(oParams)...);
485 _super_t::_Transport.transact(oPayload);
486 _HeadT::download_marshaler_type::unmarshal(oPayload, std::forward<_ParamTs>(oParams)..., oRet);
490 template <
typename _CallT,
typename ..._ParamTs>
491 typename _CallT::return_type _call(
typename std::enable_if<!std::is_same<_CallT, _HeadT>::value,
const std::type_info&>::type oType, _ParamTs&&... oParams) {
492 return _super_t::template _call<_CallT>(oType, std::forward<_ParamTs>(oParams)...);
/// Explicit variadic constructor: perfect-forwards every argument to the base
/// class client<_TransportT, _TailT...> and does nothing else.
497 template<
typename ... _XportCtorTs>
explicit client(_XportCtorTs&&...oArgs) : client<_TransportT, _TailT...>(std::forward<_XportCtorTs>(oArgs)...){}
501 template <
typename _Ty,
typename ... _ParamTs>
502 typename _Ty::return_type call(_ParamTs&&...oParams) {
503 return _call<_Ty>(
typeid(_Ty), std::forward<_ParamTs>(oParams)...);
std::exception _super_t
shortcut typedef of the super class
Represents an exception in the RPC protocol; it typically occurs as the result of failing to parse payl...
generic-form marshaler_base. Used to recursively marshal a parameter pack of data into a payload ...
socket_base< ipv4address, socket_type::stream, socket_protocol::tcp, socket_options, ip_options, tcp_options, connectable_socket, bindable_socket, listening_socket, selectable_socket > ipv4_tcp_stream
General purpose IPV4 client and server socket type.
const char * what() const noexcept override
}@
contains the packed call data on the wire that is transported between clients and servers ...
general purpose socket communication
Host, target and build configurations and settings. Various components are purpose built for specific ...
IPv4 address wrapper around sockaddr_in.
concurrently insert, query and delete items in an unordered hash map
Dummy transport used for debugging.
generic-form marshaler used by clients and servers