1 /**
2     nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.
3     Implemented in C, it works on a wide range of operating systems with no further dependencies.
4 
5     This module implements a convenience wrapper API for nanomsg
6 
7     Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)
8 
9     Example code:
10         http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
11         http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
12         http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html
13 
14  */
15 module nanomsg.wrap;
16 
17 
18 import nanomsg.bindings;
19 public import std.typecons: Yes, No; // to facilitate using send, receive
20 
21 version(unittest)
22     import unit_threaded;
23 else
24     enum HiddenTest;
25 
26 
/// Strongly typed wrapper around the URI of an endpoint to connect to,
/// so that it cannot be confused with a URI to bind to (see BindTo).
struct ConnectTo {
    /// the endpoint URI
    string uri;
}
31 
/// Strongly typed wrapper around one or more URIs to bind to.
struct BindTo {

    /// every URI the socket should be bound to
    string[] uris;

    /// Bind to a single URI (stored as a one element array).
    this(inout(string) uri) inout @safe pure {
        this([uri]);
    }

    /// Bind to several URIs at once.
    this(inout(string)[] uris) inout @safe pure {
        this.uris = uris;
    }
}
45 
/// Strongly typed Duration: the overall time budget given to NanoSocket.trySend.
/// Converts implicitly to a Duration.
struct TotalDuration {
    import std.datetime: Duration;

    alias value this;

    /// the wrapped duration
    Duration value;
}
51 
/// Strongly typed Duration: how long NanoSocket.trySend sleeps between attempts.
/// Converts implicitly to a Duration.
struct RetryDuration {
    import std.datetime: Duration;

    alias value this;

    /// the wrapped duration
    Duration value;
}
57 
/**

    NanoSocket - high level wrapper for a nanomsg socket

    The struct owns the underlying nanomsg socket descriptor: it is closed by
    the destructor (or by an explicit call to `close`), and copying is disabled
    so that two objects never refer to the same descriptor.

*/
struct NanoSocket {

    import std.traits: isArray;
    import std.typecons: Flag;
    import std.datetime: Duration;

    /// nanomsg protocol
    enum Protocol {
        request,
        response,
        subscribe,
        publish,
        pull,
        push,
        pair,
        surveyor,
        respondent,
        bus,
    }

    /// nanomsg socket options
    enum Option {
        lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
        sendBufferSize, /// Size of the send buffer in bytes
        receiveBufferSize, /// Size of the receive buffer in bytes
        receiveMaxSize, /// Maximum message size that can be received, in bytes
        sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
        receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
        reconnectIntervalMs, /// How long to wait to re-establish connection
        reconnectIntervalMax, /// Maximum reconnect interval
        sendPriority, /// Outbound priority for endpoints added to socket
        receivePriority, /// Inbound priority for endpoints added to socket
        ipv4Only, /// Self-explanatory
        socketName, /// Socket name for error reporting and statistics
        timeToLive, /// Number of hops before message is dropped
        subscribeTopic, /// Subscribe to topic
        unsubscribeTopic, /// Unsubscribe from topic
        tcpNoDelay, /// Disables Nagle's algorithm
        surveyorDeadlineMs, /// How long to wait for responses in milliseconds
    }

    /// this(this) disabled to avoid sockets being destroyed
    @disable this(this);

    /// invalid FD
    enum INVALID_FD = -1;

    /// Creates a socket for the given protocol without binding or connecting it.
    this(in Protocol protocol, in int domain = AF_SP) @safe {
        initialize(protocol, domain);
    }

    /// Creates a socket for the given protocol and binds it to every URI in bindTo.
    this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
        initialize(protocol, bindTo, domain);
    }

    /// Creates a socket for the given protocol and connects it to connectTo.
    this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
        initialize(protocol, connectTo, domain);
    }

    /// Closes the socket if it is still open.
    ~this() @safe nothrow {
        close;
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Any socket currently owned by this object is closed first.
       Throws: Exception if nanomsg fails to create the socket.
     */
    void initialize(in Protocol protocol, in int domain = AF_SP) @trusted {
        close; // initialize can be called on an object that already owns a socket
        _nanoSock = nn_socket(domain, toProtocolC(protocol));
        _protocol = protocol;
        enforceNanoMsgRet(_nanoSock);
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Binds to every URI in bindTo, with "localhost" replaced by "*".
       Throws: Exception if the socket cannot be created or bound.
     */
    void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
        import std.string: replace;

        initialize(protocol, domain);

        // this is so it's easy to specify the same string
        // for both ends of the socket
        foreach(uri; bindTo.uris) {
            bind(uri.replace("localhost", "*"));
        }
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Throws: Exception if the socket cannot be created or connected.
     */
    void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
        initialize(protocol, domain);
        connect(connectTo.uri);

        version(Windows) {
            // on Windows sometimes the socket tries to send before the TCP handshake
            import core.thread;
            Thread.sleep(100.msecs);
        }
    }

    /**
       Closes the socket if it is open. Safe to call more than once: the
       descriptor is forgotten after the first call so that it is never handed
       to nn_close twice (e.g. explicit close followed by the destructor).
     */
    void close() @trusted nothrow {
        if(_nanoSock == INVALID_FD) return;

        _nanoSock.nn_close;
        _nanoSock = INVALID_FD;
        // a closed socket no longer has any endpoint
        _uri = null;
        _connection = Connection.none;
    }

    /**
       Sets a socket option to a value.
       Returns: this socket, so that calls can be chained.
       Throws: Exception if nanomsg rejects the option or value.
     */
    ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
        const optionC = toOptionC(option);
        setOption(optionC.level, optionC.option, val);
        return this;
    }

    /**
       Gets the value of a socket option as a T.
       Throws: Exception if nanomsg fails to return the option.
     */
    T getOption(T)(Option option) const {
        const optionC = toOptionC(option);
        return getOption!T(optionC.level, optionC.option);
    }

    /**
       Receives one message into a buffer of BUF_SIZE bytes and returns a copy
       of the bytes received. Messages longer than BUF_SIZE are truncated to
       BUF_SIZE bytes.

       When blocking, a failure throws an Exception. When not blocking, any
       failure (including "nothing to read yet") yields an empty array.
     */
    ubyte[] receive(int BUF_SIZE = 1024)(Flag!"blocking" blocking = Yes.blocking) const {
        ubyte[BUF_SIZE] buf;
        const numBytes = nn_recv(_nanoSock, buf.ptr, buf.length, flags(blocking));

        if(blocking) enforceNanoMsgRet(numBytes);
        if(numBytes < 0) return [];

        // nn_recv reports the size of the whole message even when it had to be
        // truncated to fit the buffer, so never slice past the buffer's end
        const size_t messageSize = numBytes;
        const numCopied = messageSize < buf.length ? messageSize : buf.length;
        return buf[0 .. numCopied].dup;
    }

    /**
       Sends the bytes making up data.

       If the protocol is request, returns the response (received with the same
       blocking mode). Otherwise returns the bytes of data if all of them were
       sent, or an empty array if they were not.

       Throws: Exception (reported at file/line) if blocking and the message
       could not be sent in full.
     */
    ubyte[] send(T)(T[] data,
                    Flag!"blocking" blocking = Yes.blocking,
                    in string file = __FILE__,
                    in size_t line = __LINE__)
        const
    {
        import std.conv: text;

        // nanomsg wants a length in bytes, not in elements of T
        const(void)[] bytes = data;
        const sent = nn_send(_nanoSock, bytes.ptr, bytes.length, flags(blocking));
        const allSent = sent >= 0 && sent == bytes.length;

        if(blocking && !allSent)
            throw new Exception(text("Expected to send ", bytes.length, " bytes but sent ", sent), file, line);

        ubyte[] empty;
        // trusted: the cast only reinterprets the caller's own data as bytes
        return () @trusted { return _protocol == Protocol.request
                ? receive(blocking)
                : (allSent ? cast(ubyte[])data : empty); }();
    }

    /**
     Tries to send bytes to the other side, retrying every 10 milliseconds.
     totalDuration is how long to try for
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     Throws: Exception if the message could not be sent in time.
     */
    ubyte[] trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) {
        import std.datetime: msecs;
        return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking);
    }

    /**
     Tries to send bytes to the other side using non-blocking sends.
     totalDuration is how long to try for
     retryDuration is how long to sleep after each failed attempt
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     Throws: Exception if the message could not be sent in time.
     */
    ubyte[] trySend(T)(T[] data,
                       TotalDuration totalDuration,
                       RetryDuration retryDuration,
                       Flag!"blocking" recvBlocking = Yes.blocking)
    {
        import std.exception: enforce;
        import std.datetime: StopWatch, AutoStart, msecs;
        import std.conv: text;
        import core.thread: Thread;

        // length in bytes; going through .ptr also makes empty messages valid
        // (indexing data[0] would fail for an empty array)
        const(void)[] bytes = data;

        int sent;
        bool allSent() { return sent >= 0 && sent == bytes.length; }

        auto sw = StopWatch(AutoStart.yes);
        do {
            sent = nn_send(_nanoSock, bytes.ptr, bytes.length, flags(No.blocking));
            if(!allSent) Thread.sleep(retryDuration); // play nice with other threads and the CPU
        } while(!allSent && cast(Duration)sw.peek < totalDuration);

        enforce(allSent,
                text("Expected to send ", bytes.length, " bytes but sent ", sent));

        return _protocol == Protocol.request ? receive(recvBlocking) : [];
    }

    @("trySend")
    unittest {
        import std.datetime: seconds, msecs;

        enum uri = "ipc://try_send_test";
        auto pull = NanoSocket(Protocol.pull, BindTo(uri));
        auto push = NanoSocket(Protocol.push, ConnectTo(uri));
        push.trySend("foo", TotalDuration(1.seconds), RetryDuration(10.msecs));
    }

    /**
       Connects the socket to uri and remembers it as this socket's URI.
       Throws: Exception (reported at file/line) if nanomsg fails to connect.
     */
    void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.connected;
    }

    /**
       Binds the socket to uri and remembers it as this socket's URI.
       Throws: Exception (reported at file/line) if nanomsg fails to bind.
     */
    void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.bound;
    }

    /// get protocol
    Protocol protocol() @safe @nogc pure const nothrow {
        return _protocol;
    }

    /// get the URI most recently bound or connected to
    string uri() @safe @nogc pure const nothrow {
        return _uri;
    }

    /// The protocol, followed by "@uri" if bound or "@@uri" if connected.
    string toString() @safe pure const {
        import std.conv: text;

        final switch(_connection) with(Connection) {
            case none:
                return text(_protocol);
            case bound:
                return text(_protocol, "@", _uri);
            case connected:
                return text(_protocol, "@@", _uri);
        }
    }

private:

    // whether (and how) the socket has been attached to an endpoint
    enum Connection {
        none,
        bound,
        connected,
    }

    int _nanoSock = INVALID_FD;
    Protocol _protocol;
    string _uri;
    Connection _connection;

    // Evaluates expr and throws an Exception describing the nanomsg error
    // if the result is negative.
    void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) const {
        import std.conv: text;
        const value = expr();
        if(value < 0) {
            // read the error number once so that the number and its
            // description always refer to the same error
            const errorNumber = nn_errno();
            throw new Exception(text("nanomsg expression failed with value ", value,
                                     " errno ", errorNumber, ", error: ", nn_strerror(errorNumber)),
                                file,
                                line);
        }
    }

    // the protocol constant needed by the nanomsg C API
    static int toProtocolC(Protocol protocol) {
        final switch(protocol) with(Protocol) {
            case request:
                return NN_REQ;
            case response:
                return NN_REP;
            case publish:
                return NN_PUB;
            case subscribe:
                return NN_SUB;
            case pull:
                return NN_PULL;
            case push:
                return NN_PUSH;
            case pair:
                return NN_PAIR;
            case surveyor:
                return NN_SURVEYOR;
            case respondent:
                return NN_RESPONDENT;
            case bus:
                return NN_BUS;
        }
    }

    // the int level and option values needed by the nanomsg C API
    static struct OptionC {
        int level;
        int option;
    }

    static OptionC toOptionC(Option option) @safe {
        final switch(option) with(Option) {
            case lingerMs:
                return OptionC(NN_SOL_SOCKET, NN_LINGER);

            case sendBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_SNDBUF);

            case receiveBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVBUF);

            case receiveMaxSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);

            case sendTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);

            case receiveTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);

            case reconnectIntervalMs:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);

            case reconnectIntervalMax:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);

            case sendPriority:
                return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);

            case receivePriority:
                return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);

            case ipv4Only:
                return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);

            case socketName:
                return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);

            case timeToLive:
                return OptionC(NN_SOL_SOCKET, NN_TTL);

            case subscribeTopic:
                return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);

            case unsubscribeTopic:
                return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);

            case tcpNoDelay:
                return OptionC(NN_TCP, NN_TCP_NODELAY);

            case surveyorDeadlineMs:
                return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
        }
    }

    // array valued options (e.g. strings): the contents of the array are passed
    void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
        // converting to a void slice yields the length in bytes, which is
        // what nanomsg expects, whatever the element type
        const(void)[] bytes = val[];
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, bytes.ptr, bytes.length));
    }

    // scalar options: the value itself is passed
    void setOption(T)(int level, int option, T val) const if(!isArray!T) {
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, &val, val.sizeof));
    }

    // array valued options, read into a temporary buffer of 1000 elements
    T getOption(T)(int level, int option) const if(isArray!T) {
        import std.traits: Unqual;
        import std.conv: to;

        // ElementType!string is dchar, and we don't want that,
        // so instead we use this typeof
        alias Element = Unqual!(typeof(T.init[0]));
        Element[1000] val;
        size_t numBytes = val.sizeof; // in: buffer size in bytes, out: bytes written
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)val.ptr, &numBytes));

        auto written = val[0 .. numBytes / Element.sizeof];
        // `to` hands back its argument unchanged when no conversion is needed,
        // which would leak a slice of this function's stack buffer
        static if(is(typeof(written) : T))
            return written.dup;
        else
            return written.to!T;
    }

    // scalar options
    T getOption(T)(int level, int option) const if(!isArray!T) {
        import std.exception: enforce;
        import std.conv: text;

        T val;
        size_t length = T.sizeof;
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)&val, &length));
        enforce(length == T.sizeof,
                text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
        return val;
    }

    // the flags argument for nn_send/nn_recv
    static int flags(Flag!"blocking" blocking) @safe pure {
        return blocking ? 0 : NN_DONTWAIT;
    }
}
457 
/// Exercises the operations a type must support to be usable as a nanomsg
/// socket. It is only meant to be type-checked (see isNanoSocket), not run.
void checkNanoSocket(T)() {
    alias Option = NanoSocket.Option;

    T socket = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
    socket.send("foobar");
    socket.setOption(Option.subscribeTopic, "topic");
    socket.setOption(Option.receiveTimeoutMs, 100);
    ubyte[] received = socket.receive(Yes.blocking);
    socket.send(received);
}
// true if checkNanoSocket type-checks for T
template isNanoSocket(T) {
    enum isNanoSocket = is(typeof(checkNanoSocket!T));
}
static assert(isNanoSocket!NanoSocket);
469 
470 
471 /**
472         Examples:
473 */
/// set/get option
///
@("set/get option")
unittest {
    alias Option = NanoSocket.Option;

    auto socket = NanoSocket(NanoSocket.Protocol.subscribe);

    // value of a freshly created socket
    socket.getOption!int(Option.sendTimeoutMs).shouldEqual(-1);

    // what is set can be read back
    socket.setOption(Option.sendTimeoutMs, 42);
    socket.getOption!int(Option.sendTimeoutMs).shouldEqual(42);
}
483 
/// publish/subscribe
///
@("pub/sub")
unittest {
    alias Protocol = NanoSocket.Protocol;
    alias Option = NanoSocket.Option;

    const uri = "inproc://test_pubsub";
    auto publisher = NanoSocket(Protocol.publish, const BindTo(uri));
    auto subscriber = NanoSocket(Protocol.subscribe, ConnectTo(uri));
    subscriber.setOption(Option.subscribeTopic, "foo");

    // messages that start with the subscription topic should be received
    publisher.send("foo/hello");
    subscriber.receive(No.blocking).shouldEqual("foo/hello");

    // but not messages that don't
    publisher.send("bar/oops");
    subscriber.receive(No.blocking).shouldBeEmpty;

    // after unsubscribing, messages are no longer received
    subscriber.setOption(Option.unsubscribeTopic, "foo");
    publisher.send("foo/hello");
    subscriber.receive(No.blocking).shouldBeEmpty;
}
506 
/// request/response
///
@("req/rep")
unittest {
    import std.concurrency: spawnLinked, send;

    enum timeoutMs = 50;
    const uri = "inproc://test_reqrep";

    const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri));
    requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

    // the responder runs in its own thread until it is told to stop
    auto responderTid = spawnLinked(&responder, uri, timeoutMs);
    requester.send("shake?").shouldEqual("shake? yep!");
    responderTid.send(Stop());
}
523 
524 /**
525     Example:
526         utility function
527 */
version(unittest) {
    import std.concurrency: Tid;

    /// utility struct for unit test
    struct Respond { string value; }
    /// utility struct for unit test: tells responder to finish
    struct Stop {}

    /// Utility function for unit tests/examples: binds a response socket to
    /// uri and answers every request with the request followed by " yep!",
    /// until a Stop message is received.
    void responder(in string uri, in int timeoutMs) {
        import std.concurrency: receiveTimeout;
        import std.datetime: msecs;

        const socket = NanoSocket(NanoSocket.Protocol.response, const BindTo(uri));
        socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

        bool done;
        while(!done) {
            // check whether we've been asked to stop
            receiveTimeout(10.msecs,
                (Stop _) {
                    done = true;
                },
            );

            // then answer a request, if there is one
            const request = socket.receive(No.blocking);
            if(request.length) socket.send(request ~ cast(ubyte[])" yep!");
        }
    }
}
556 
557 /**
558     Example:
559         push/pull over TCP
560 */
version(Windows) {} //FIXME
else {
    @("push/pull over TCP")
    unittest {
        import core.thread: Thread, msecs;

        enum uri = "tcp://localhost:13248";
        enum numTimes = 10;

        auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));
        auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo(uri));

        foreach(_; 0 .. numTimes) {
            push.send("foo");
        }

        // give the messages time to arrive before polling for them
        Thread.sleep(50.msecs);

        foreach(_; 0 .. numTimes) {
            pull.receive(No.blocking).shouldEqual("foo");
        }
    }
}
581 
582 /**
583     Example:
584         push/pull over IPC
585 */
@HiddenTest /// it's here to show that this can fail, but it doesn't always
@("push/pull over IPC")
unittest {
    enum uri = "ipc://nanomsg_ipc_push_pull_test";
    enum numTimes = 5;

    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));
    auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo(uri));

    foreach(_; 0 .. numTimes) {
        push.send("foo");
    }

    // no pause before polling: this is what makes the test unreliable
    foreach(_; 0 .. numTimes) {
        pull.receive(No.blocking).shouldEqual("foo");
    }
}
600 
601 
@("bind to several addresses at once")
unittest {
    alias Protocol = NanoSocket.Protocol;
    alias Option = NanoSocket.Option;

    enum uri1 = "ipc://nanomsg_ipc_push_pull_1";
    enum uri2 = "ipc://nanomsg_ipc_push_pull_2";

    // one pull socket bound to both addresses...
    auto pull = NanoSocket(Protocol.pull, BindTo([uri1, uri2]));
    pull.setOption(Option.receiveTimeoutMs, 10);

    // ...and one push socket connected to each of them
    auto push1 = NanoSocket(Protocol.push, ConnectTo(uri1));
    auto push2 = NanoSocket(Protocol.push, ConnectTo(uri2));

    push1.setOption(Option.sendTimeoutMs, 10);
    push2.setOption(Option.sendTimeoutMs, 10);

    push1.send("foo");
    push2.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}
620 
621 
@("init NanoSocket after construction")
unittest {
    alias Protocol = NanoSocket.Protocol;
    alias Option = NanoSocket.Option;

    enum uri = "ipc://nanomsg_ipc_init_after";

    // default constructed sockets own no descriptor until initialized
    NanoSocket pull;
    NanoSocket push;

    pull.initialize(Protocol.pull, BindTo(uri));
    push.initialize(Protocol.push, ConnectTo(uri));

    pull.setOption(Option.receiveTimeoutMs, 10);
    push.setOption(Option.sendTimeoutMs, 10);

    push.send("foo");
    push.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}
639 
@("Can init twice")
unittest {
    enum uri = "ipc://nanomsg_ipc_init_twice";

    NanoSocket pull;
    // initializing again must release the first socket so the address can be bound again
    foreach(_; 0 .. 2) {
        pull.initialize(NanoSocket.Protocol.pull, BindTo(uri));
    }
}
646 
@("Non-initialised NanoSocket throws on send")
unittest {
    enum uri = "ipc://nanomsg_init_send_throws";
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));

    // never initialized, so there is no socket to send on
    NanoSocket uninitialised;
    uninitialised.send("foo").shouldThrow;
}