1 module ut.wrap;
2 
3 
4 import nanomsg.wrap;
5 import unit_threaded;
6 
7 
// Smoke test: trySend on a connected push socket completes within the allotted time.
@("send.try")
@safe unittest {
    import std.datetime: seconds, msecs;

    enum address = "ipc://try_send_test";

    // the receiving end has to exist for the push side to have a peer
    auto receiver = NanoSocket(NanoSocket.Protocol.pull, BindTo(address));
    auto sender = NanoSocket(NanoSocket.Protocol.push, ConnectTo(address));

    auto overall = TotalDuration(1.seconds);
    auto betweenAttempts = RetryDuration(10.msecs);
    sender.trySend("foo", overall, betweenAttempts);
}
17 
18 
// An option reads back as -1 before it is set and as the written value afterwards.
@("option")
@safe unittest {
    enum which = NanoSocket.Option.sendTimeoutMs;

    auto socket = NanoSocket(NanoSocket.Protocol.subscribe);

    // value before anything has been written
    socket.getOption!int(which).shouldEqual(-1);

    // write, then read back
    socket.setOption(which, 42);
    socket.getOption!int(which).shouldEqual(42);
}
26 
27 
// Publish/subscribe: topic filtering on subscribe and on unsubscribe.
@("pubsub")
@safe unittest {
    alias Proto = NanoSocket.Protocol;
    alias Opt = NanoSocket.Option;

    const address = "inproc://test_pubsub";
    auto publisher = NanoSocket(Proto.publish, const BindTo(address));
    auto subscriber = NanoSocket(Proto.subscribe, ConnectTo(address));
    subscriber.setOption(Opt.subscribeTopic, "foo");

    // a message prefixed with the subscribed topic gets delivered
    publisher.send("foo/hello");
    subscriber.receive(No.blocking).bytes.shouldEqual("foo/hello");

    // a message with any other prefix is filtered out
    publisher.send("bar/oops");
    subscriber.receive(No.blocking).bytes.length.should == 0;

    // once the topic is dropped, even matching messages stop arriving
    subscriber.setOption(Opt.unsubscribeTopic, "foo");
    publisher.send("foo/hello");
    subscriber.receive(No.blocking).bytes.length.should == 0;
}
48 
49 
// This test spawns a D thread, and ASAN doesn't like D threads,
// so it is compiled out when building with the ASAN version flag.
version(nanomsg_wrapper_asan) {}
else {
    @("reqrep")
    @safe unittest {
        import std.concurrency: spawnLinked, send;

        const address = "inproc://test_reqrep";
        const client = NanoSocket(NanoSocket.Protocol.request, ConnectTo(address));

        enum receiveTimeout = 50;
        client.setOption(NanoSocket.Option.receiveTimeoutMs, receiveTimeout);

        // the replying side runs `responder` in its own thread
        auto worker = () @trusted { return spawnLinked(&responder, address, receiveTimeout); }();

        // the test expects the request socket's send to yield the reply
        client.send("shake?").bytes.shouldEqual("shake? yep!");

        // tell the worker thread to leave its loop
        () @trusted { worker.send(Stop()); }();
    }
}
68 
69 
/// Message type carrying a single string.
/// NOTE(review): not referenced anywhere in the visible part of this file - confirm whether it is still needed.
private struct Respond { string value; }
/// Empty message type; the "reqrep" test sends it to the `responder` thread, which stops looping when it arrives.
private struct Stop {}
72 
/**
   Utility for unit tests/examples: binds a response socket to `uri` and
   answers every non-empty request with the request bytes followed by " yep!".
   Keeps going until a `Stop` message arrives in this thread's mailbox.

   Params:
       uri = the address to bind the response socket to
       timeoutMs = value used for the socket's receive timeout option
 */
private void responder(in string uri, in int timeoutMs) {
    import std.concurrency: receiveTimeout;
    import std.datetime: msecs;

    const replier = NanoSocket(NanoSocket.Protocol.response, const BindTo(uri));
    replier.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

    bool stopRequested = false;

    while(!stopRequested) {
        // wait up to 10ms for a stop request from another thread
        receiveTimeout(10.msecs, (Stop _) { stopRequested = true; });

        // poll the socket without blocking; an empty result means there was no request
        const request = replier.receive(No.blocking);
        if(request.bytes.length == 0) continue;

        replier.send(request.bytes ~ cast(ubyte[])" yep!");
    }
}
92 
93 
version(Windows) {} //FIXME
else {
    // Push/pull over TCP: everything pushed is available to the puller shortly afterwards.
    @("push.TCP")
    @safe unittest {
        import core.thread: Thread, msecs;

        enum address = "tcp://localhost:13248";
        enum messageCount = 10;

        auto receiver = NanoSocket(NanoSocket.Protocol.pull, BindTo(address));
        auto sender = NanoSocket(NanoSocket.Protocol.push, ConnectTo(address));

        foreach(_; 0 .. messageCount) {
            sender.send("foo");
        }

        // pause before the non-blocking reads below
        () @trusted { Thread.sleep(50.msecs); }();

        foreach(_; 0 .. messageCount) {
            receiver.receive(No.blocking).bytes.shouldEqual("foo");
        }
    }
}
114 
115 
// Kept (hidden) to show that this can fail, although it doesn't always:
// the non-blocking reads happen immediately after the sends.
@HiddenTest
@("push.IPC")
@safe unittest {
    enum address = "ipc://nanomsg_ipc_push_pull_test";
    enum messageCount = 5;

    auto receiver = NanoSocket(NanoSocket.Protocol.pull, BindTo(address));
    auto sender = NanoSocket(NanoSocket.Protocol.push, ConnectTo(address));

    foreach(_; 0 .. messageCount) {
        sender.send("foo");
    }

    foreach(_; 0 .. messageCount) {
        receiver.receive(No.blocking).bytes.shouldEqual("foo");
    }
}
130 
131 
// One pull socket bound to two addresses receives from a pusher on each of them.
@("bind.several addresses at once")
@safe unittest {
    alias Proto = NanoSocket.Protocol;
    alias Opt = NanoSocket.Option;

    enum firstAddress = "ipc://nanomsg_ipc_push_pull_1";
    enum secondAddress = "ipc://nanomsg_ipc_push_pull_2";

    auto receiver = NanoSocket(Proto.pull, BindTo([firstAddress, secondAddress]));
    receiver.setOption(Opt.receiveTimeoutMs, 10);

    auto firstSender = NanoSocket(Proto.push, ConnectTo(firstAddress));
    auto secondSender = NanoSocket(Proto.push, ConnectTo(secondAddress));

    firstSender.setOption(Opt.sendTimeoutMs, 10);
    secondSender.setOption(Opt.sendTimeoutMs, 10);

    firstSender.send("foo");
    secondSender.send("bar");

    receiver.receive().bytes.shouldEqual("foo");
    receiver.receive().bytes.shouldEqual("bar");
}
150 
151 
// Default-constructed sockets can be set up later through `initialize` and then used normally.
@("init.after.construction")
@safe unittest {
    enum address = "ipc://nanomsg_ipc_init_after";

    NanoSocket receiver;
    NanoSocket sender;

    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));
    sender.initialize(NanoSocket.Protocol.push, ConnectTo(address));

    receiver.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
    sender.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    sender.send("foo");
    sender.send("bar");

    // messages come out in the order they went in
    receiver.receive().bytes.shouldEqual("foo");
    receiver.receive().bytes.shouldEqual("bar");
}
169 
170 
// Calling `initialize` a second time on the same socket, with the same address, must not throw.
@("init.twice")
@safe unittest {
    enum address = "ipc://nanomsg_ipc_init_twice";

    NanoSocket receiver;
    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));
    // same protocol, same address, same socket
    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));
}
177 
178 
// Sending on a socket that was never initialised must throw.
@Tags("notravis")
@("init.send throws if not initialised")
@safe unittest {
    enum address = "ipc://nanomsg_init_send_throws";

    auto listener = NanoSocket(NanoSocket.Protocol.pull, BindTo(address));

    // default-constructed and deliberately left without a call to `initialize`
    NanoSocket uninitialised;
    uninitialised.send("foo").shouldThrow;
}
187 
188 
// A non-blocking receive into a caller-supplied buffer yields nothing when no message was sent.
@("receive.buffer.nothing")
@safe unittest {
    enum address = "inproc://nanomsg_receive_buffer";

    NanoSocket receiver;
    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));

    ubyte[1024] storage;
    scope received = receiver.receive(storage, No.blocking);
    received.length.should == 0;
}
197 
198 
// A 32000-byte, all-zero message arrives complete and unchanged.
@("receive.buffer.something")
@safe unittest {
    import std.range: repeat;

    enum address = "inproc://nanomsg_big_receive";
    enum payloadSize = 32_000;

    NanoSocket receiver, sender;
    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));
    sender.initialize(NanoSocket.Protocol.push, ConnectTo(address));

    receiver.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
    sender.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    // a freshly allocated ubyte array is zero-initialised
    sender.send(new ubyte[payloadSize]);
    receiver.receive().bytes.toBytes.shouldEqual(repeat(0, payloadSize));
}
216 
217 
// Receiving without passing a buffer still hands back the bytes that were sent.
@("receive.nogc.implicit")
@safe unittest {
    enum address = "inproc://nanomsg_receive_nogc";
    enum message = "Don't need the GC to receive";

    NanoSocket receiver;
    receiver.initialize(NanoSocket.Protocol.pull, BindTo(address));
    receiver.setOption(NanoSocket.Option.receiveTimeoutMs, 10);

    NanoSocket sender;
    sender.initialize(NanoSocket.Protocol.push, ConnectTo(address));
    sender.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    sender.send(message);
    const incoming = receiver.receive;

    // copy the received bytes and view the copy as text for the comparison
    const text = () @trusted { return cast(const(char)[]) incoming.bytes.dup; }();
    text.shouldEqual(message);
}
235 
// The raw nanomsg descriptor exposed by `nanoSock` can be used directly with the C API.
// Not @safe: it works with raw pointers handed out by the C bindings.
@("access underlying socket")
unittest {

    import std.range: iota;
    import std.array : array;
    import nanomsg.bindings : nn_send, nn_recv, nn_freemsg, NN_MSG;

    NanoSocket pull, push;

    enum uri = "inproc://nanomsg_raw_sock";
    pull.initialize(NanoSocket.Protocol.pull, BindTo(uri));
    push.initialize(NanoSocket.Protocol.push, ConnectTo(uri));

    pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
    push.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    // single source of truth for the payload size: the bytes 0, 1, ..., 254
    enum numBytes = 255;
    auto sourceData = iota(ubyte(0), ubyte(numBytes)).array;

    // check the send result, so a failed send is reported here rather than as a receive problem
    const sent = push.nanoSock.nn_send(sourceData.ptr, sourceData.length, 0);
    assert(sent == numBytes);

    void* res;
    // registered before any check can fail, so a received message is always released;
    // `res` stays null if nothing was received, and then there is nothing to free
    scope (exit) { if(res !is null) nn_freemsg(res); }

    const len = pull.nanoSock.nn_recv(&res, NN_MSG, 0);
    assert(len == numBytes);
    (cast(ubyte*)res)[0 .. numBytes].shouldEqual(sourceData);
}
260 
/// Duplicates `bytes` and returns the copy cast to a mutable `ubyte[]`.
ubyte[] toBytes(T)(T bytes) @trusted {
    auto copy = bytes.dup;
    return cast(ubyte[]) copy;
}
264