1 /**
2     nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.
3     Implemented in C, it works on a wide range of operating systems with no further dependencies.
4 
5     This module implements a convenience wrapper API for nanomsg
6 
7     Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)
8 
9     Example code:
10         http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
11         http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
12         http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html
13 
14  */
15 module nanomsg.wrap;
16 
17 
18 import nanomsg.bindings;
19 public import std.typecons: Yes, No; // to facilitate using send, receive
20 
21 version(unittest)
22     import unit_threaded;
23 else
24     enum HiddenTest;
25 
26 
27 /// wrapper for a string uri to connect to
28 struct ConnectTo {
29     string uri;
30 }
31 
32 /// wrapper for a string uri to bind to
33 struct BindTo {
34 
35     this(inout(string) uri) inout @safe pure {
36         this([uri]);
37     }
38 
39     this(inout(string)[] uris) inout @safe pure {
40         this.uris = uris;
41     }
42 
43     string[] uris;
44 }
45 
46 /**
47 
48     NanoSocket - high level wrapper for a nanomsg socket
49 
50 */
51 struct NanoSocket {
52 
53     import std.traits: isArray;
54     import std.typecons: Flag;
55     import std.datetime: Duration;
56 
57     /// nanomsg protocol
58     enum Protocol {
59         request,
60         response,
61         subscribe,
62         publish,
63         pull,
64         push,
65         pair,
66         surveyor,
67         respondent,
68         bus,
69     }
70 
71     /// nanomsg socket options
72     enum Option {
73         lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
74         sendBufferSize, // Size of the send buffer in bytes
75         receiveBufferSize, // Size of the receive buffer in bytes
76         receiveMaxSize, /// Maximum message size that can be received, in bytes
77         sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
78         receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
79         reconnectIntervalMs, /// How long to wait to re-establish connection
80         reconnectIntervalMax, /// Maximum reconnect interval
81         sendPriority, /// Outbound priority for endpoints added to socket
82         receivePriority, /// Inbout priority for endpoints added to socket
83         ipv4Only, /// Self-explanatory
84         socketName, /// Socket name for error reporting and statistics
85         timeToLive, /// Number of hops before message is dropped
86         subscribeTopic, /// Subscribe to topic
87         unsubscribeTopic, /// Unsubscribe to topic
88         tcpNoDelay, /// Disables Nagle's algorithm
89         surveyorDeadlineMs, /// How long to wait for responses in milliseconds
90     }
91 
92     /// this(this) disabled to avoid sockets being destroyed
93     @disable this(this);
94 
95     /// invalid FD
96     enum INVALID_FD = -1;
97 
98     /// constructor
99     this(in Protocol protocol, in int domain = AF_SP) @safe {
100         init(protocol, domain);
101     }
102 
103     /// constructor
104     this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
105         init(protocol, bindTo, domain);
106     }
107 
108     /// constructor
109     this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
110         init(protocol, connectTo, domain);
111     }
112 
113     /// destructor
114     ~this() @safe nothrow {
115         close;
116     }
117 
118     /// Same as the revelant constructor, can be used on exisiting objects
119     void init(in Protocol protocol, in int domain = AF_SP) @trusted {
120         int protocolToInt(Protocol protocol) {
121             final switch(protocol) with(Protocol) {
122                 case request:
123                     return NN_REQ;
124                 case response:
125                     return NN_REP;
126                 case publish:
127                     return NN_PUB;
128                 case subscribe:
129                     return NN_SUB;
130                 case pull:
131                     return NN_PULL;
132                 case push:
133                     return NN_PUSH;
134                 case pair:
135                     return NN_PAIR;
136                 case surveyor:
137                     return NN_SURVEYOR;
138                 case respondent:
139                     return NN_RESPONDENT;
140                 case bus:
141                     return NN_BUS;
142                 }
143         }
144 
145         close; // init can be called twice
146         _nanoSock = nn_socket(domain, protocolToInt(protocol));
147         _protocol = protocol;
148         enforceNanoMsgRet(_nanoSock);
149     }
150 
151     /// Same as the revelant constructor, can be used on exisiting objects
152     void init(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
153         import std.string: replace;
154 
155         init(protocol, domain);
156 
157         // this is so it's easy to specify the same string
158         // for both ends of the socket
159         foreach(uri; bindTo.uris) {
160             bind(uri.replace("localhost", "*"));
161         }
162     }
163 
164     /// Same as the revelant constructor, can be used on exisiting objects
165     void init(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
166         init(protocol, domain);
167         connect(connectTo.uri);
168 
169         version(Windows) {
170             // on Windows sometimes the socket tries to send before the TCP handshake
171             import core.thread;
172             Thread.sleep(100.msecs);
173         }
174     }
175 
176     /// close socket
177     void close() @trusted nothrow {
178         if(_nanoSock != INVALID_FD) {
179             _nanoSock.nn_close;
180         }
181     }
182 
183     /// set socket option to a value
184     ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
185         const optionC = toOptionC(option);
186         setOption(optionC.level, optionC.option, val);
187         return this;
188     }
189 
190     /// get socket option value
191     T getOption(T)(Option option) const {
192         const optionC = toOptionC(option);
193         return getOption!T(optionC.level, optionC.option);
194     }
195 
196     /// receive
197     ubyte[] receive(int BUF_SIZE = 1024)(Flag!"blocking" blocking = Yes.blocking) const {
198         import std.exception: enforce;
199         import std.conv: text;
200         import core.stdc.errno: EAGAIN, EINTR;
201 
202         ubyte[BUF_SIZE] buf;
203         const flags = blocking ? 0 : NN_DONTWAIT;
204         const numBytes = nn_recv(_nanoSock, buf.ptr, buf.length, flags);
205 
206         if(blocking) enforceNanoMsgRet(numBytes);
207 
208         return numBytes >= 0 ? buf[0 .. numBytes].dup : [];
209     }
210 
211     /**
212        Sends the bytes as expected. If the protocol is Request, then returns
213        the response, otherwise returns an empty array.
214      */
215     ubyte[] send(T)(T[] data,
216                     Flag!"blocking" blocking = Yes.blocking,
217                     in string file = __FILE__,
218                     in size_t line = __LINE__)
219         const
220     {
221         import std.conv: text;
222 
223         const sent = nn_send(_nanoSock, data.ptr, data.length, flags(blocking));
224         if(blocking) {
225             if(sent != data.length)
226                 throw new Exception(text("Expected to send ", data.length, " bytes but sent ", sent), file, line);
227         }
228 
229         return _protocol == Protocol.request ? receive(blocking) : [];
230     }
231 
232     /**
233      Tries to send bytes to the other side.
234      duration is how long to try for
235      recvBlocking controls whether or not to block on reception of a response.
236      This only matters when the protocol is request/response
237      Returns the response if in request mode, otherwise an empty byte slice.
238      */
239     ubyte[] trySend(T)(T[] data, Duration duration, Flag!"blocking" recvBlocking = Yes.blocking) {
240         import std.exception: enforce;
241         import std.datetime: StopWatch, AutoStart, msecs;
242         import std.conv: text;
243         import core.thread: Thread;
244 
245         int sent;
246         auto sw = StopWatch(AutoStart.yes);
247         do {
248             sent = nn_send(_nanoSock, data.ptr, data.length, flags(No.blocking));
249             if(sent != data.length) Thread.sleep(10.msecs); // play nice with other threads and the CPU
250         } while(sent != data.length && cast(Duration)sw.peek < duration);
251 
252         enforce(sent == data.length,
253                 text("Expected to send ", data.length, " bytes but sent ", sent));
254 
255         return _protocol == Protocol.request ? receive(recvBlocking) : [];
256     }
257 
258     /// connect
259     void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
260         import std.string: toStringz;
261         enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
262         _uri = uri;
263         _connection = Connection.connected;
264     }
265 
266     /// bind
267     void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
268         import std.string: toStringz;
269         enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
270         _uri = uri;
271         _connection = Connection.bound;
272     }
273 
274     /// get protocol
275     Protocol protocol() @safe @nogc pure const nothrow {
276         return _protocol;
277     }
278 
279     /// get URI
280     string uri() @safe @nogc pure const nothrow {
281         return _uri;
282     }
283 
284     /// toString
285     string toString() @safe pure const {
286         import std.conv: text;
287 
288         if(_connection == Connection.none)
289             return text(protocol);
290 
291         const connText = _connection == Connection.bound ? "@" : "@@";
292         return text(_protocol, connText, _uri);
293     }
294 
295 private:
296 
297     enum Connection {
298         none,
299         bound,
300         connected,
301     }
302 
303     int _nanoSock = INVALID_FD;
304     Protocol _protocol;
305     string _uri;
306     Connection _connection;
307 
308     void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) const {
309         import std.conv: text;
310         const value = expr();
311         if(value < 0)
312             throw new Exception(text("nanomsg expression failed with value ", value,
313                                      " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)),
314                                 file,
315                                 line);
316     }
317 
318     // the int level and option values needed by the nanomsg C API
319     static struct OptionC {
320         int level;
321         int option;
322     }
323 
324     static OptionC toOptionC(Option option) @safe {
325         final switch(option) with(Option) {
326             case lingerMs:
327                 return OptionC(NN_SOL_SOCKET, NN_LINGER);
328 
329             case sendBufferSize:
330                 return OptionC(NN_SOL_SOCKET, NN_SNDBUF);
331 
332             case receiveBufferSize:
333                 return OptionC(NN_SOL_SOCKET, NN_RCVBUF);
334 
335             case receiveMaxSize:
336                 return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);
337 
338             case sendTimeoutMs:
339                 return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);
340 
341             case receiveTimeoutMs:
342                 return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);
343 
344             case reconnectIntervalMs:
345                 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);
346 
347             case reconnectIntervalMax:
348                 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);
349 
350             case sendPriority:
351                 return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);
352 
353             case receivePriority:
354                 return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);
355 
356             case ipv4Only:
357                 return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);
358 
359             case socketName:
360                 return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);
361 
362             case timeToLive:
363                 return OptionC(NN_SOL_SOCKET, NN_TTL);
364 
365             case subscribeTopic:
366                 return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);
367 
368             case unsubscribeTopic:
369                 return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);
370 
371             case tcpNoDelay:
372                 return OptionC(NN_TCP, NN_TCP_NODELAY);
373 
374             case surveyorDeadlineMs:
375                 return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
376         }
377     }
378 
379     void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
380         enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, val.ptr, val.length));
381     }
382 
383     void setOption(T)(int level, int option, T val) const if(!isArray!T) {
384         enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, &val, val.sizeof));
385     }
386 
387     T getOption(T)(int level, int option) const if(isArray!T) {
388         import std.traits: Unqual;
389         import std.conv: to;
390 
391         // ElementType!string is dchar, and we don't want that,
392         // so instead we use this typeof
393         alias U = Unqual!(typeof(T.init[0]))[1000];
394         U val;
395         ulong length = val.length;
396         enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)val.ptr, &length));
397         return val[0 .. length].to!T;
398     }
399 
400     T getOption(T)(int level, int option) const if(!isArray!T) {
401         import std.exception: enforce;
402         import std.conv: text;
403 
404         T val;
405         ulong length = T.sizeof;
406         enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, &val, &length));
407         enforce(length == T.sizeof,
408                 text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
409         return val;
410     }
411 
412     static int flags(Flag!"blocking" blocking) @safe pure {
413         return blocking ? 0 : NN_DONTWAIT;
414     }
415 }
416 
417 /// check nanomsg socket
418 void checkNanoSocket(T)() {
419     T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
420     s.send("foobar");
421     s.setOption(NanoSocket.Option.subscribeTopic, "topic");
422     s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
423     ubyte[] msg = s.receive(Yes.blocking);
424     s.send(msg);
425 }
426 enum isNanoSocket(T) = is(typeof(checkNanoSocket!T));
427 static assert(isNanoSocket!NanoSocket);
428 
429 /**
430         Examples:
431 */
432 /// set/get option
433 ///
434 @("set/get option")
435 unittest {
436     auto sock = NanoSocket(NanoSocket.Protocol.subscribe);
437     sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(-1);
438     sock.setOption(NanoSocket.Option.sendTimeoutMs, 42);
439     sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(42);
440 }
441 
442 /// publish/subscribe
443 ///
444 @("pub/sub")
445 unittest {
446     const uri = "inproc://test_pubsub";
447     auto pub = NanoSocket(NanoSocket.Protocol.publish, BindTo(uri));
448     auto sub = NanoSocket(NanoSocket.Protocol.subscribe, ConnectTo(uri));
449     sub.setOption(NanoSocket.Option.subscribeTopic, "foo");
450 
451     // messages that start with the subscription topic should be received
452     pub.send("foo/hello");
453     sub.receive(No.blocking).shouldEqual("foo/hello");
454 
455     // but not messages that don't
456     pub.send("bar/oops");
457     sub.receive(No.blocking).shouldBeEmpty;
458 
459     // after unsubscribing, messages are no longer received
460     sub.setOption(NanoSocket.Option.unsubscribeTopic, "foo");
461     pub.send("foo/hello");
462     sub.receive(No.blocking).shouldBeEmpty;
463 }
464 
465 /// request/response
466 ///
467 @("req/rep")
468 unittest {
469     import std.concurrency: spawnLinked, send;
470 
471     const uri = "inproc://test_reqrep";
472     const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri));
473 
474     enum timeoutMs = 50;
475     requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);
476 
477     auto tid = spawnLinked(&responder, uri, timeoutMs);
478     requester.send("shake?").shouldEqual("shake? yep!");
479     tid.send(Stop());
480 }
481 
482 /**
483     Example:
484         utility function
485 */
486 version(unittest) {
487     import std.concurrency: Tid;
488 
489     /// utility struct for unit test
490     struct Respond { string value; }
491     /// utility struct for unit test
492     struct Stop {}
493 
494     /// utility function for unit tests/examples
495     void responder(in string uri, in int timeoutMs) {
496         import std.concurrency: receiveTimeout;
497         import std.datetime: msecs;
498 
499         const socket = NanoSocket(NanoSocket.Protocol.response, BindTo(uri));
500         socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);
501 
502         for(bool done; !done;) {
503             receiveTimeout(10.msecs,
504                 (Stop _) {
505                     done = true;
506                 },
507             );
508 
509             const bytes = socket.receive(No.blocking);
510             if(bytes.length) socket.send(bytes ~ cast(ubyte[])" yep!");
511         }
512     }
513 }
514 
515 /**
516     Example:
517         push/pull over TCP
518 */
519 @("push/pull over TCP")
520 unittest {
521     import core.thread: Thread, msecs;
522 
523     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("tcp://localhost:13248"));
524     auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("tcp://localhost:13248"));
525 
526     enum numTimes = 10;
527 
528     foreach(i; 0 .. numTimes)
529         push.send("foo");
530 
531     Thread.sleep(50.msecs);
532 
533     foreach(i; 0 .. numTimes)
534         pull.receive(No.blocking).shouldEqual("foo");
535 }
536 
537 /**
538     Example:
539         push/pull over IPC
540 */
541 @HiddenTest /// it's here to show that this can fail, but it doesn't always
542 @("push/pull over IPC")
543 unittest {
544     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_push_pull_test"));
545     auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_test"));
546 
547     enum numTimes = 5;
548 
549     foreach(i; 0 .. numTimes)
550         push.send("foo");
551 
552     foreach(i; 0 .. numTimes)
553         pull.receive(No.blocking).shouldEqual("foo");
554 }
555 
556 
557 @("bind to several addresses at once")
558 unittest {
559     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(["ipc://nanomsg_ipc_push_pull_1",
560                                                              "ipc://nanomsg_ipc_push_pull_2"]));
561     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
562 
563     auto push1 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_1"));
564     auto push2 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_2"));
565 
566     push1.setOption(NanoSocket.Option.sendTimeoutMs, 10);
567     push2.setOption(NanoSocket.Option.sendTimeoutMs, 10);
568 
569     push1.send("foo");
570     push2.send("bar");
571 
572     pull.receive.shouldEqual("foo");
573     pull.receive.shouldEqual("bar");
574 }
575 
576 
577 @("init NanoSocket after construction")
578 unittest {
579     NanoSocket pull;
580     NanoSocket push;
581 
582     pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_after"));
583     push.init(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_init_after"));
584 
585     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
586     push.setOption(NanoSocket.Option.sendTimeoutMs, 10);
587 
588     push.send("foo");
589     push.send("bar");
590 
591     pull.receive.shouldEqual("foo");
592     pull.receive.shouldEqual("bar");
593 }
594 
595 @("Can init twice")
596 unittest {
597     NanoSocket pull;
598     pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
599     pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
600 }