1 /**
2     nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.
3     Implemented in C, it works on a wide range of operating systems with no further dependencies.
4 
5     This module implements a convenience wrapper API for nanomsg
6 
7     Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)
8 
9     Example code:
10         http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
11         http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
12         http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html
13 
14  */
15 module nanomsg.wrap;
16 
17 
18 import nanomsg.bindings;
19 import concepts: models;
20 public import std.typecons: Yes, No; // to facilitate using send, receive
21 
22 
23 /// wrapper for a string uri to connect to
24 struct ConnectTo {
25     string uri;
26 }
27 
28 /// wrapper for a string uri to bind to
29 struct BindTo {
30 
31     this(in string uri) @safe pure {
32         this([uri]);
33     }
34 
35     this(in string[] uris) @safe pure {
36         this.uris = uris.dup;
37     }
38 
39     string[] uris;
40 }
41 
42 struct TotalDuration {
43     import std.datetime: Duration;
44     Duration value;
45     alias value this;
46 }
47 
48 struct RetryDuration {
49     import std.datetime: Duration;
50     Duration value;
51     alias value this;
52 }
53 
54 
55 /**
56 
57     NanoSocket - high level wrapper for a nanomsg socket
58 
59 */
60 @models!(NanoSocket, isNanoSocket)
61 struct NanoSocket {
62 
63     import std.traits: isArray;
64     import std.typecons: Flag;
65     import std.datetime: Duration;
66 
67     /// nanomsg protocol
68     enum Protocol {
69         request,
70         response,
71         subscribe,
72         publish,
73         pull,
74         push,
75         pair,
76         surveyor,
77         respondent,
78         bus,
79     }
80 
81     /// nanomsg socket options
82     enum Option {
83         lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
84         sendBufferSize, // Size of the send buffer in bytes
85         receiveBufferSize, // Size of the receive buffer in bytes
86         receiveMaxSize, /// Maximum message size that can be received, in bytes
87         sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
88         receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
89         reconnectIntervalMs, /// How long to wait to re-establish connection
90         reconnectIntervalMax, /// Maximum reconnect interval
91         sendPriority, /// Outbound priority for endpoints added to socket
92         receivePriority, /// Inbout priority for endpoints added to socket
93         ipv4Only, /// Self-explanatory
94         socketName, /// Socket name for error reporting and statistics
95         timeToLive, /// Number of hops before message is dropped
96         subscribeTopic, /// Subscribe to topic
97         unsubscribeTopic, /// Unsubscribe to topic
98         tcpNoDelay, /// Disables Nagle's algorithm
99         surveyorDeadlineMs, /// How long to wait for responses in milliseconds
100     }
101 
102     /// this(this) disabled to avoid sockets being destroyed
103     @disable this(this);
104 
105     /// invalid FD
106     enum INVALID_FD = -1;
107 
108     /// constructor
109     this(in Protocol protocol, in int domain = AF_SP) @safe {
110         initialize(protocol, domain);
111     }
112 
113     /// constructor
114     this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
115         initialize(protocol, bindTo, domain);
116     }
117 
118     /// constructor
119     this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
120         initialize(protocol, connectTo, domain);
121     }
122 
123     /// destructor
124     ~this() @safe nothrow {
125         close;
126     }
127 
128     /// Same as the revelant constructor, can be used on exisiting objects
129     void initialize(in Protocol protocol, in int domain = AF_SP) @trusted {
130         int protocolToInt(Protocol protocol) {
131             final switch(protocol) with(Protocol) {
132                 case request:
133                     return NN_REQ;
134                 case response:
135                     return NN_REP;
136                 case publish:
137                     return NN_PUB;
138                 case subscribe:
139                     return NN_SUB;
140                 case pull:
141                     return NN_PULL;
142                 case push:
143                     return NN_PUSH;
144                 case pair:
145                     return NN_PAIR;
146                 case surveyor:
147                     return NN_SURVEYOR;
148                 case respondent:
149                     return NN_RESPONDENT;
150                 case bus:
151                     return NN_BUS;
152                 }
153         }
154 
155         close; // init can be called twice
156         _nanoSock = nn_socket(domain, protocolToInt(protocol));
157         _protocol = protocol;
158         enforceNanoMsgRet(_nanoSock);
159     }
160 
161     /// Same as the revelant constructor, can be used on exisiting objects
162     void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
163         import std.string: replace;
164 
165         initialize(protocol, domain);
166 
167         // this is so it's easy to specify the same string
168         // for both ends of the socket
169         foreach(uri; bindTo.uris) {
170             bind(uri.replace("localhost", "*"));
171         }
172     }
173 
174     /// Same as the revelant constructor, can be used on exisiting objects
175     void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
176         initialize(protocol, domain);
177         connect(connectTo.uri);
178 
179         version(Windows) {
180             // on Windows sometimes the socket tries to send before the TCP handshake
181             import core.thread;
182             Thread.sleep(100.msecs);
183         }
184     }
185 
186     /// close socket
187     void close() @trusted nothrow {
188         if(_nanoSock != INVALID_FD) {
189             _nanoSock.nn_close;
190         }
191     }
192 
193     /// set socket option to a value
194     ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
195         const optionC = toOptionC(option);
196         setOption(optionC.level, optionC.option, val);
197         return this;
198     }
199 
200     /// get socket option value
201     T getOption(T)(Option option) const {
202         const optionC = toOptionC(option);
203         return getOption!T(optionC.level, optionC.option);
204     }
205 
206     /**
207        Receive bytes on this socket.
208        Memory is allocated by nanomsg and deleted in the `NanoBuffer` destructor.
209      */
210     NanoBuffer receive(Flag!"blocking" blocking = Yes.blocking,
211                        in string file = __FILE__,
212                        in size_t line = __LINE__)
213         @safe scope return const
214     {
215         static void[] buffer;
216         return receiveImpl(buffer, blocking, file, line);
217     }
218 
219     /**
220        A version of `receive` that takes a user supplied buffer to fill
221      */
222     void[] receive(return scope void[] buffer,
223                    Flag!"blocking" blocking = Yes.blocking,
224                    in string file = __FILE__,
225                    in size_t line = __LINE__)
226         const @safe
227     {
228         auto ptr = &buffer[0];
229         return receiveImpl(buffer, blocking, file, line).bytes;
230     }
231 
232 
233     /**
234        Sends the bytes as expected. If the protocol is Request, then returns
235        the response, otherwise returns an empty array.
236      */
237     NanoBuffer send(T)(T[] data,
238                        Flag!"blocking" blocking = Yes.blocking,
239                        in string file = __FILE__,
240                        in size_t line = __LINE__)
241         const
242     {
243         import std.conv: text;
244 
245         const sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(blocking)); }();
246         enforceNanoMsgRet(sent, file, line);
247 
248         void[] empty;
249         return () @trusted { return _protocol == Protocol.request
250                 ? receive(blocking)
251                 : NanoBuffer(cast(void[]) data, false /*shouldDelete*/);
252         }();
253     }
254 
255     /**
256      Tries to send bytes to the other side.
257      duration is how long to try for
258      recvBlocking controls whether or not to block on reception of a response.
259      This only matters when the protocol is request/response
260      Returns the response if in request mode, otherwise an empty byte slice.
261      */
262     auto trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) {
263         import std.datetime: msecs;
264         return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking);
265     }
266 
267     /**
268      Tries to send bytes to the other side.
269      duration is how long to try for
270      recvBlocking controls whether or not to block on reception of a response.
271      This only matters when the protocol is request/response
272      Returns the response if in request mode, otherwise an empty byte slice.
273      */
274     NanoBuffer trySend(T)(T[] data,
275                           TotalDuration totalDuration,
276                           RetryDuration retryDuration,
277                           Flag!"blocking" recvBlocking = Yes.blocking)
278     {
279         import std.exception: enforce;
280         static if(__VERSION__ >= 2077)
281             import std.datetime.stopwatch: StopWatch, AutoStart;
282         else
283             import std.datetime: StopWatch, AutoStart;
284         import std.datetime: msecs;
285         import std.conv: text;
286         import core.thread: Thread;
287 
288         int sent;
289         auto sw = StopWatch(AutoStart.yes);
290         do {
291             sent = () @trusted { return nn_send(_nanoSock, &data[0], data.length, flags(No.blocking)); }();
292             if(sent != data.length) () @trusted { Thread.sleep(retryDuration); }();
293         } while(sent != data.length && cast(Duration) sw.peek < totalDuration);
294 
295         enforce(sent == data.length,
296                 text("Expected to send ", data.length, " bytes but sent ", sent));
297 
298         return _protocol == Protocol.request ? receive(recvBlocking) : NanoBuffer();
299     }
300 
301     /// connect
302     void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
303         import std.string: toStringz;
304         enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
305         _uri = uri;
306         _connection = Connection.connected;
307     }
308 
309     /// bind
310     void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
311         import std.string: toStringz;
312         enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
313         _uri = uri;
314         _connection = Connection.bound;
315     }
316 
317     /// get protocol
318     Protocol protocol() @safe @nogc pure const nothrow {
319         return _protocol;
320     }
321 
322     /// get URI
323     string uri() @safe @nogc pure const nothrow {
324         return _uri;
325     }
326 
327     /// toString
328     string toString() @safe pure const {
329         import std.conv: text;
330 
331         if(_connection == Connection.none)
332             return text(protocol);
333 
334         const connText = _connection == Connection.bound ? "@" : "@@";
335         return text(_protocol, connText, _uri);
336     }
337 
338 private:
339 
340     enum Connection {
341         none,
342         bound,
343         connected,
344     }
345 
346     int _nanoSock = INVALID_FD;
347     Protocol _protocol;
348     string _uri;
349     Connection _connection;
350 
351     NanoBuffer receiveImpl(return scope void[] buffer,
352                            Flag!"blocking" blocking = Yes.blocking,
353                            in string file = __FILE__,
354                            in size_t line = __LINE__)
355         @safe return scope const
356     {
357         import std.algorithm: min;
358         static import core.stdc.errno;
359 
360         void* nanomsgBuffer = null;
361         // can't use &buffer[0] here since it might be empty
362         const haveBuffer = () @trusted { return buffer.ptr !is null; }();
363         const shouldDelete = !haveBuffer;
364 
365         auto recvPointer = () @trusted {
366             return haveBuffer ? &buffer[0] : cast(void*) &nanomsgBuffer;
367         }();
368 
369 
370         const length = haveBuffer ? buffer.length : NN_MSG;
371         const numBytes = () @trusted { return nn_recv(_nanoSock, recvPointer, length, flags(blocking)); }();
372 
373         bool isErrnoEagain() @trusted {
374             return nn_errno == core.stdc.errno.EAGAIN;
375         }
376 
377         if(blocking || (numBytes < 0 && !isErrnoEagain)) {
378             enforceNanoMsgRet(numBytes, file, line);
379         }
380 
381         auto pointer = haveBuffer ? &buffer[0] : nanomsgBuffer;
382         const retSliceLength = haveBuffer ? min(numBytes, buffer.length) : numBytes;
383 
384         return numBytes >= 0
385             ? NanoBuffer(() @trusted { return pointer[0 .. retSliceLength]; }(), shouldDelete)
386             : NanoBuffer();
387     }
388 
389     void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) @trusted const {
390         import std.conv: text;
391         const value = expr();
392         if(value < 0) {
393             version(NanomsgWrapperNoGcException) {
394                 import nogc: NoGcException;
395                 NoGcException.throwNewWithFileAndLine(
396                     file, line, "nanomsg expression failed with value ", numBytes,
397                     " errno ", nn_errno, ", error: ", nn_strerror(nn_errno));
398             } else {
399                 throw new Exception(text("nanomsg expression failed with value ", value,
400                                          " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)),
401                                     file,
402                                     line);
403             }
404         }
405     }
406 
407     // the int level and option values needed by the nanomsg C API
408     static struct OptionC {
409         int level;
410         int option;
411     }
412 
413     static OptionC toOptionC(Option option) @safe {
414         final switch(option) with(Option) {
415             case lingerMs:
416                 return OptionC(NN_SOL_SOCKET, NN_LINGER);
417 
418             case sendBufferSize:
419                 return OptionC(NN_SOL_SOCKET, NN_SNDBUF);
420 
421             case receiveBufferSize:
422                 return OptionC(NN_SOL_SOCKET, NN_RCVBUF);
423 
424             case receiveMaxSize:
425                 return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);
426 
427             case sendTimeoutMs:
428                 return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);
429 
430             case receiveTimeoutMs:
431                 return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);
432 
433             case reconnectIntervalMs:
434                 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);
435 
436             case reconnectIntervalMax:
437                 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);
438 
439             case sendPriority:
440                 return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);
441 
442             case receivePriority:
443                 return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);
444 
445             case ipv4Only:
446                 return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);
447 
448             case socketName:
449                 return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);
450 
451             case timeToLive:
452                 return OptionC(NN_SOL_SOCKET, NN_TTL);
453 
454             case subscribeTopic:
455                 return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);
456 
457             case unsubscribeTopic:
458                 return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);
459 
460             case tcpNoDelay:
461                 return OptionC(NN_TCP, NN_TCP_NODELAY);
462 
463             case surveyorDeadlineMs:
464                 return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
465         }
466     }
467 
468     void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
469         const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, val.ptr, val.length); }();
470         enforceNanoMsgRet(ret);
471     }
472 
473     void setOption(T)(int level, int option, T val) const if(!isArray!T) {
474         const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, &val, val.sizeof); }();
475         enforceNanoMsgRet(ret);
476     }
477 
478     T getOption(T)(int level, int option) const if(isArray!T) {
479         import std.traits: Unqual;
480         import std.conv: to;
481 
482         // ElementType!string is dchar, and we don't want that,
483         // so instead we use this typeof
484         alias U = Unqual!(typeof(T.init[0]))[1000];
485         U val;
486         ulong length = val.length;
487         const ret = () @trusted {
488             return nn_getsockopt(_nanoSock, level, option, cast(void*) val.ptr, &length);
489         }();
490         enforceNanoMsgRet(ret);
491         return val[0 .. length].to!T;
492     }
493 
494     T getOption(T)(int level, int option) const if(!isArray!T) {
495         import std.exception: enforce;
496         import std.conv: text;
497 
498         T val;
499         size_t length = T.sizeof;
500         const ret = () @trusted { return nn_getsockopt(_nanoSock, level, option, cast(void*) &val, &length); }();
501         enforceNanoMsgRet(ret);
502         enforce(length == T.sizeof,
503                 text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
504         return val;
505     }
506 
507     static int flags(Flag!"blocking" blocking) @safe @nogc pure nothrow {
508         return blocking ? 0 : NN_DONTWAIT;
509     }
510 }
511 
512 /// check nanomsg socket
513 void checkNanoSocket(T)() {
514     T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
515     s.send("foobar");
516     s.setOption(NanoSocket.Option.subscribeTopic, "topic");
517     s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
518     auto msg = s.receive(Yes.blocking);
519     void[] bytes = msg.bytes;
520     s.send(bytes);
521 }
522 
523 enum isNanoSocket(T) = is(typeof(checkNanoSocket!T));
524 
525 
526 /// RAII struct for nn_freemsg
527 struct NanoBuffer {
528 
529     /// Could be allocated by nanomsg
530     void[] bytes;
531     private bool shouldDelete;
532 
533     private this(void[] bytes, bool shouldDelete) @safe @nogc pure nothrow scope {
534         this.bytes = bytes;
535         this.shouldDelete = shouldDelete;
536     }
537 
538     @disable this(this);
539 
540     ~this() @trusted @nogc scope {
541         import nanomsg.bindings: nn_freemsg;
542         if(shouldDelete && bytes.length > 0) nn_freemsg(&bytes[0]);
543     }
544 }