1 /**
2     nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.
3     Implemented in C, it works on a wide range of operating systems with no further dependencies.
4 
5     This module implements a convenience wrapper API for nanomsg
6 
7     Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)
8 
9     Example code:
10         http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
11         http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
12         http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html
13 
14  */
15 module nanomsg.wrap;
16 
17 
18 import nanomsg.bindings;
19 import concepts: models;
20 public import std.typecons: Yes, No; // to facilitate using send, receive
21 
22 
/// Strongly typed wrapper around the URI string a socket should connect to
struct ConnectTo
{
    /// the address handed to `NanoSocket.connect`
    string uri;
}
27 
/// Strongly typed wrapper around one or more URI strings a socket should bind to
struct BindTo {

    /// every address the socket will be bound to
    string[] uris;

    /// Bind to several addresses; the slice is copied so later changes
    /// made by the caller to `uris` do not affect this object
    this(in string[] uris) @safe pure {
        this.uris = uris.dup;
    }

    /// Bind to a single address (forwards to the multi-address constructor)
    this(in string uri) @safe pure {
        this([uri]);
    }
}
41 
/// Strongly typed `Duration`: the total time `NanoSocket.trySend` keeps retrying for
struct TotalDuration {
    import std.datetime: Duration;

    /// lets a `TotalDuration` be used wherever a plain `Duration` is expected
    alias value this;

    /// the wrapped duration
    Duration value;
}
47 
/// Strongly typed `Duration`: how long `NanoSocket.trySend` sleeps between attempts
struct RetryDuration {
    import std.datetime: Duration;

    /// lets a `RetryDuration` be used wherever a plain `Duration` is expected
    alias value this;

    /// the wrapped duration
    Duration value;
}
53 
54 
/**

    NanoSocket - high level wrapper for a nanomsg socket

    Owns the underlying nanomsg socket: it is closed in the destructor, and
    copying is disabled so two objects never own the same descriptor.

*/
@models!(NanoSocket, isNanoSocket)
struct NanoSocket {

    import std.traits: isArray;
    import std.typecons: Flag;
    import std.datetime: Duration;

    /// nanomsg protocol
    enum Protocol {
        request,
        response,
        subscribe,
        publish,
        pull,
        push,
        pair,
        surveyor,
        respondent,
        bus,
    }

    /// nanomsg socket options
    enum Option {
        lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
        sendBufferSize, /// Size of the send buffer in bytes
        receiveBufferSize, /// Size of the receive buffer in bytes
        receiveMaxSize, /// Maximum message size that can be received, in bytes
        sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
        receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
        reconnectIntervalMs, /// How long to wait to re-establish connection
        reconnectIntervalMax, /// Maximum reconnect interval
        sendPriority, /// Outbound priority for endpoints added to socket
        receivePriority, /// Inbound priority for endpoints added to socket
        ipv4Only, /// Self-explanatory
        socketName, /// Socket name for error reporting and statistics
        timeToLive, /// Number of hops before message is dropped
        subscribeTopic, /// Subscribe to topic
        unsubscribeTopic, /// Unsubscribe to topic
        tcpNoDelay, /// Disables Nagle's algorithm
        surveyorDeadlineMs, /// How long to wait for responses in milliseconds
    }

    /// this(this) disabled to avoid sockets being destroyed
    @disable this(this);

    /// invalid FD
    enum INVALID_FD = -1;

    /// Creates an unconnected socket speaking `protocol`
    this(in Protocol protocol, in int domain = AF_SP) @safe {
        initialize(protocol, domain);
    }

    /// Creates a socket speaking `protocol` and binds it to every URI in `bindTo`
    this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
        initialize(protocol, bindTo, domain);
    }

    /// Creates a socket speaking `protocol` and connects it to `connectTo`
    this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
        initialize(protocol, connectTo, domain);
    }

    /// destructor, closes the socket if it is still open
    ~this() @safe nothrow {
        close;
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Any socket currently held is closed first.
       Throws: Exception if nanomsg fails to create the socket.
     */
    void initialize(in Protocol protocol, in int domain = AF_SP) @trusted {
        int protocolToInt(Protocol protocol) {
            final switch(protocol) with(Protocol) {
                case request:
                    return NN_REQ;
                case response:
                    return NN_REP;
                case publish:
                    return NN_PUB;
                case subscribe:
                    return NN_SUB;
                case pull:
                    return NN_PULL;
                case push:
                    return NN_PUSH;
                case pair:
                    return NN_PAIR;
                case surveyor:
                    return NN_SURVEYOR;
                case respondent:
                    return NN_RESPONDENT;
                case bus:
                    return NN_BUS;
                }
        }

        close; // init can be called twice

        // the new socket has no endpoint yet: forget what the previous one
        // was bound/connected to so uri() and toString() don't report it
        _uri = null;
        _connection = Connection.none;

        _nanoSock = nn_socket(domain, protocolToInt(protocol));
        _protocol = protocol;
        enforceNanoMsgRet(_nanoSock);
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Every occurrence of "localhost" in a URI is replaced by "*" before
       binding, so that the same string can be used for both ends of the
       socket.
     */
    void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
        import std.string: replace;

        initialize(protocol, domain);

        foreach(uri; bindTo.uris) {
            bind(uri.replace("localhost", "*"));
        }
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
        initialize(protocol, domain);
        connect(connectTo.uri);

        version(Windows) {
            // on Windows sometimes the socket tries to send before the TCP handshake
            import core.thread;
            Thread.sleep(100.msecs);
        }
    }

    /// Closes the socket. Safe to call more than once and on a socket that
    /// was never opened.
    void close() @trusted nothrow {
        if(_nanoSock != INVALID_FD) {
            _nanoSock.nn_close;
            // mark as closed so that neither a second close() nor the
            // destructor hands the same descriptor to nn_close again
            _nanoSock = INVALID_FD;
        }
    }

    /// set socket option to a value; returns `this` so calls can be chained
    ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
        const optionC = toOptionC(option);
        setOption(optionC.level, optionC.option, val);
        return this;
    }

    /// get socket option value
    T getOption(T)(Option option) const {
        const optionC = toOptionC(option);
        return getOption!T(optionC.level, optionC.option);
    }

    /**
       Receive bytes on this socket.
       Memory is allocated by nanomsg and deleted in the `NanoBuffer` destructor.
       A non-blocking receive that finds no message (EAGAIN) returns an empty
       `NanoBuffer`; any other failure throws.
     */
    NanoBuffer receive(Flag!"blocking" blocking = Yes.blocking,
                       in string file = __FILE__,
                       in size_t line = __LINE__)
        @safe scope return const
    {
        // always null: tells receiveImpl to let nanomsg allocate the message
        static void[] buffer;
        return receiveImpl(buffer, blocking, file, line);
    }

    /**
       A version of `receive` that takes a user supplied buffer to fill.
       Returns the filled part of `buffer`; a message longer than the buffer
       yields at most `buffer.length` bytes.
       Throws: Exception if `buffer` is empty or if nanomsg reports an error.
     */
    void[] receive(return scope void[] buffer,
                   Flag!"blocking" blocking = Yes.blocking,
                   in string file = __FILE__,
                   in size_t line = __LINE__)
        const @safe
    {
        import std.exception: enforce;

        // An empty/null buffer would make receiveImpl fall back to a
        // nanomsg-allocated message that is freed as soon as the temporary
        // NanoBuffer below dies, leaving the caller with a dangling slice.
        enforce(buffer.length > 0, "receive needs a non-empty buffer to fill", file, line);
        return receiveImpl(buffer, blocking, file, line).bytes;
    }


    /**
       Sends the bytes as expected. If the protocol is Request, then returns
       the response, otherwise returns a view of the input data.
       Throws: Exception if nanomsg fails to send (or to receive the response).
     */
    NanoBuffer send(T)(T[] data,
                       Flag!"blocking" blocking = Yes.blocking,
                       in string file = __FILE__,
                       in size_t line = __LINE__)
        const
    {
        const sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(blocking)); }();
        enforceNanoMsgRet(sent, file, line);

        // forward file/line so a failed receive is reported at the caller's
        // location, same as a failed send
        return () @trusted { return _protocol == Protocol.request
                ? receive(blocking, file, line)
                : NanoBuffer(cast(void[]) data, false /*shouldDelete*/);
        }();
    }

    /**
     Tries to send bytes to the other side, retrying every 10 milliseconds.
     totalDuration is how long to try for
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     */
    auto trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) {
        import std.datetime: msecs;
        return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking);
    }

    /**
     Tries to send bytes to the other side.
     totalDuration is how long to try for
     retryDuration is how long to sleep after each failed attempt
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     Throws: Exception if the data could not be sent within totalDuration.
     */
    NanoBuffer trySend(T)(T[] data,
                          TotalDuration totalDuration,
                          RetryDuration retryDuration,
                          Flag!"blocking" recvBlocking = Yes.blocking)
    {
        import std.exception: enforce;
        static if(__VERSION__ >= 2077)
            import std.datetime.stopwatch: StopWatch, AutoStart;
        else
            import std.datetime: StopWatch, AutoStart;
        import std.conv: text;
        import core.thread: Thread;

        int sent;
        auto sw = StopWatch(AutoStart.yes);
        do {
            // data.ptr rather than &data[0]: an empty message must not trip
            // the array bounds check
            sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(No.blocking)); }();
            if(sent != data.length) () @trusted { Thread.sleep(retryDuration); }();
        } while(sent != data.length && cast(Duration) sw.peek < totalDuration);

        enforce(sent == data.length,
                text("Expected to send ", data.length, " bytes but sent ", sent));

        return _protocol == Protocol.request ? receive(recvBlocking) : NanoBuffer();
    }

    /// Connects the socket to `uri` and remembers it as the socket's URI.
    /// Throws: Exception if nanomsg rejects the connection request.
    void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.connected;
    }

    /// Binds the socket to `uri` and remembers it as the socket's URI.
    /// Throws: Exception if nanomsg rejects the bind request.
    void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.bound;
    }

    /// get protocol
    Protocol protocol() @safe @nogc pure const nothrow {
        return _protocol;
    }

    /// get the URI most recently passed to `connect` or `bind`
    string uri() @safe @nogc pure const nothrow {
        return _uri;
    }

    /// toString: the protocol alone when not connected, otherwise the
    /// protocol followed by "@" (bound) or "@@" (connected) and the URI
    string toString() @safe pure const {
        import std.conv: text;

        if(_connection == Connection.none)
            return text(protocol);

        const connText = _connection == Connection.bound ? "@" : "@@";
        return text(_protocol, connText, _uri);
    }

    /// get the underlying nanomsg socket file descriptor.
    /// Bear in mind that NanoSocket contains state about
    /// the socket options, so changing these from the
    /// raw fd will result in unpredictable behaviour
    int nanoSock() @system pure const @nogc nothrow {
        return _nanoSock;
    }

private:

    enum Connection {
        none,
        bound,
        connected,
    }

    int _nanoSock = INVALID_FD;
    Protocol _protocol;
    string _uri;
    Connection _connection;

    // Shared implementation of both `receive` overloads.
    // A buffer with a null pointer means "let nanomsg allocate the message"
    // (the returned NanoBuffer then frees it); otherwise the message is
    // written into `buffer` and the result is a view of it.
    NanoBuffer receiveImpl(return scope void[] buffer,
                           Flag!"blocking" blocking = Yes.blocking,
                           in string file = __FILE__,
                           in size_t line = __LINE__)
        @safe return scope const
    {
        import std.algorithm: min;
        static import core.stdc.errno;

        void* nanomsgBuffer = null;
        // can't use &buffer[0] here since it might be empty
        const haveBuffer = () @trusted { return buffer.ptr !is null; }();
        const shouldDelete = !haveBuffer;

        // with NN_MSG nanomsg stores the address of its own buffer
        // into the pointer we pass in
        auto recvPointer = () @trusted {
            return haveBuffer ? &buffer[0] : cast(void*) &nanomsgBuffer;
        }();


        const length = haveBuffer ? buffer.length : NN_MSG;
        const numBytes = () @trusted { return nn_recv(_nanoSock, recvPointer, length, flags(blocking)); }();

        bool isErrnoEagain() @trusted {
            return nn_errno == core.stdc.errno.EAGAIN;
        }

        // a non-blocking receive with nothing to read (EAGAIN) is not an error
        if(blocking || (numBytes < 0 && !isErrnoEagain)) {
            enforceNanoMsgRet(numBytes, file, line);
        }

        auto pointer = haveBuffer ? &buffer[0] : nanomsgBuffer;
        // nn_recv may report more bytes than fit in the user's buffer
        const retSliceLength = haveBuffer ? min(numBytes, buffer.length) : numBytes;

        return numBytes >= 0
            ? NanoBuffer(() @trusted { return pointer[0 .. retSliceLength]; }(), shouldDelete)
            : NanoBuffer();
    }

    // the int level and option values needed by the nanomsg C API
    static struct OptionC {
        int level;
        int option;
    }

    // maps the high level Option to the (level, option) pair of the C API
    static OptionC toOptionC(Option option) @safe {
        final switch(option) with(Option) {
            case lingerMs:
                return OptionC(NN_SOL_SOCKET, NN_LINGER);

            case sendBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_SNDBUF);

            case receiveBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVBUF);

            case receiveMaxSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);

            case sendTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);

            case receiveTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);

            case reconnectIntervalMs:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);

            case reconnectIntervalMax:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);

            case sendPriority:
                return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);

            case receivePriority:
                return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);

            case ipv4Only:
                return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);

            case socketName:
                return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);

            case timeToLive:
                return OptionC(NN_SOL_SOCKET, NN_TTL);

            case subscribeTopic:
                return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);

            case unsubscribeTopic:
                return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);

            case tcpNoDelay:
                return OptionC(NN_TCP, NN_TCP_NODELAY);

            case surveyorDeadlineMs:
                return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
        }
    }

    // sets an array-valued option (e.g. a string)
    void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
        // nn_setsockopt wants the size in bytes; converting to a void slice
        // rescales the length from elements to bytes
        const(void)[] raw = val[];
        const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, raw.ptr, raw.length); }();
        enforceNanoMsgRet(ret);
    }

    // sets a scalar option (e.g. an int)
    void setOption(T)(int level, int option, T val) const if(!isArray!T) {
        const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, &val, val.sizeof); }();
        enforceNanoMsgRet(ret);
    }

    // reads an array-valued option (e.g. a string) of up to 1000 elements
    T getOption(T)(int level, int option) const if(isArray!T) {
        import std.algorithm: min;
        import std.traits: Unqual;
        import std.conv: to;

        // ElementType!string is dchar, and we don't want that,
        // so instead we use this typeof
        alias Elt = Unqual!(typeof(T.init[0]));
        Elt[1000] val;
        // size_t (not ulong) to match the pointer passed in the scalar
        // overload on every platform; the size is in bytes
        size_t length = val.sizeof;
        const ret = () @trusted {
            return nn_getsockopt(_nanoSock, level, option, cast(void*) val.ptr, &length);
        }();
        enforceNanoMsgRet(ret);
        // convert the byte count back to elements, never past our buffer
        const numElements = min(length, val.sizeof) / Elt.sizeof;
        return val[0 .. numElements].to!T;
    }

    // reads a scalar option, checking nanomsg returned exactly T.sizeof bytes
    T getOption(T)(int level, int option) const if(!isArray!T) {
        import std.exception: enforce;
        import std.conv: text;

        T val;
        size_t length = T.sizeof;
        const ret = () @trusted { return nn_getsockopt(_nanoSock, level, option, cast(void*) &val, &length); }();
        enforceNanoMsgRet(ret);
        enforce(length == T.sizeof,
                text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
        return val;
    }

    // translates the blocking flag to the flags argument of nn_send/nn_recv
    static int flags(Flag!"blocking" blocking) @safe @nogc pure nothrow {
        return blocking ? 0 : NN_DONTWAIT;
    }
}
501 
/**
    Evaluates `expr` (the return value of a nanomsg call) and throws if it is
    negative; otherwise returns the value.

    The exception message contains the value, the nanomsg errno and its
    description, and the exception carries the given `file` and `line`.
    When compiled with version `NanomsgWrapperNoGcException` a
    `NoGcException` is thrown instead of a plain `Exception`.
 */
int enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) @trusted {
    import std.conv: text;
    const value = expr();
    if(value < 0) {
        // read errno once so the number and its description always agree
        const errno = nn_errno;
        version(NanomsgWrapperNoGcException) {
            import nogc: NoGcException;
            NoGcException.throwNewWithFileAndLine(
                file, line, "nanomsg expression failed with value ", value,
                " errno ", errno, ", error: ", nn_strerror(errno));
        } else {
            throw new Exception(text("nanomsg expression failed with value ", value,
                                     " errno ", errno, ", error: ", nn_strerror(errno)),
                                file,
                                line);
        }
    }
    return value;
}
520 
/// check nanomsg socket
/// Lists the operations a type `T` must support to count as a nano socket:
/// `isNanoSocket!T` is true exactly when this template compiles for `T`.
void checkNanoSocket(T)() {
    // constructible from a protocol and a ConnectTo
    T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
    // can send a string
    s.send("foobar");
    // options can be set from both a string and an int
    s.setOption(NanoSocket.Option.subscribeTopic, "topic");
    s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
    // receive takes a blocking flag and its result exposes `bytes` as void[]
    auto msg = s.receive(Yes.blocking);
    void[] bytes = msg.bytes;
    // can send raw bytes
    s.send(bytes);
}
531 
/// True if `checkNanoSocket` compiles for `T`, i.e. `T` offers the
/// operations exercised there
template isNanoSocket(T) {
    enum isNanoSocket = is(typeof(checkNanoSocket!T));
}
533 
534 
/// RAII struct for nn_freemsg: a slice of bytes that is handed back to
/// nanomsg on destruction when nanomsg was the one that allocated it
struct NanoBuffer {

    /// Could be allocated by nanomsg
    void[] bytes;
    /// whether the destructor must release `bytes` with nn_freemsg
    private bool shouldDelete;

    private this(void[] bytes, bool shouldDelete) @safe @nogc pure nothrow scope {
        this.shouldDelete = shouldDelete;
        this.bytes = bytes;
    }

    /// copying is disabled so the same message is never freed twice
    @disable this(this);

    ~this() @trusted @nogc scope {
        import nanomsg.bindings: nn_freemsg;

        // nothing to release for borrowed or empty buffers
        if(!shouldDelete || bytes.length == 0) return;

        nn_freemsg(bytes.ptr);
    }
}