1 module ut.wrap;
2 
3 
4 import nanomsg.wrap;
5 import unit_threaded;
6 
7 
8 @("send.try")
9 @safe unittest {
10     import std.datetime: seconds, msecs;
11 
12     enum uri = "ipc://try_send_test";
13     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));
14     auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo(uri));
15     push.trySend("foo", TotalDuration(1.seconds), RetryDuration(10.msecs));
16 }
17 
18 
19 @("option")
20 @safe unittest {
21     auto sock = NanoSocket(NanoSocket.Protocol.subscribe);
22     sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(-1);
23     sock.setOption(NanoSocket.Option.sendTimeoutMs, 42);
24     sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(42);
25 }
26 
27 
28 @("pubsub")
29 @safe unittest {
30     const uri = "inproc://test_pubsub";
31     auto pub = NanoSocket(NanoSocket.Protocol.publish, const BindTo(uri));
32     auto sub = NanoSocket(NanoSocket.Protocol.subscribe, ConnectTo(uri));
33     sub.setOption(NanoSocket.Option.subscribeTopic, "foo");
34 
35     // messages that start with the subscription topic should be received
36     pub.send("foo/hello");
37     sub.receive(No.blocking).bytes.shouldEqual("foo/hello");
38 
39     // but not messages that don't
40     pub.send("bar/oops");
41     sub.receive(No.blocking).bytes.length.should == 0;
42 
43     // after unsubscribing, messages are no longer received
44     sub.setOption(NanoSocket.Option.unsubscribeTopic, "foo");
45     pub.send("foo/hello");
46     sub.receive(No.blocking).bytes.length.should == 0;
47 }
48 
49 
50 // ASAN doesn't like D threads
51 version(nanomsg_wrapper_asan) {}
52 else {
53     @("reqrep")
54         @safe unittest {
55         import std.concurrency: spawnLinked, send;
56 
57         const uri = "inproc://test_reqrep";
58         const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri));
59 
60         enum timeoutMs = 50;
61         requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);
62 
63         auto tid = () @trusted { return spawnLinked(&responder, uri, timeoutMs); }();
64         requester.send("shake?").bytes.shouldEqual("shake? yep!");
65         () @trusted { tid.send(Stop()); }();
66     }
67 }
68 
69 
70 private struct Respond { string value; }
71 private struct Stop {}
72 
73 /// utility function for unit tests/examples
74 private void responder(in string uri, in int timeoutMs) {
75     import std.concurrency: receiveTimeout;
76     import std.datetime: msecs;
77 
78     const socket = NanoSocket(NanoSocket.Protocol.response, const BindTo(uri));
79     socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);
80 
81     for(bool done; !done;) {
82         receiveTimeout(10.msecs,
83                        (Stop _) {
84                            done = true;
85                        },
86             );
87 
88         const resp = socket.receive(No.blocking);
89         if(resp.bytes.length) socket.send(resp.bytes ~ cast(ubyte[])" yep!");
90     }
91 }
92 
93 
94 version(Windows) {} //FIXME
95 else {
96     @("push.TCP")
97     @safe unittest {
98         import core.thread: Thread, msecs;
99 
100         auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("tcp://localhost:13248"));
101         auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("tcp://localhost:13248"));
102 
103         enum numTimes = 10;
104 
105         foreach(i; 0 .. numTimes)
106             push.send("foo");
107 
108         () @trusted { Thread.sleep(50.msecs); }();
109 
110         foreach(i; 0 .. numTimes)
111             pull.receive(No.blocking).bytes.shouldEqual("foo");
112     }
113 }
114 
115 
116 @HiddenTest /// it's here to show that this can fail, but it doesn't always
117 @("push.IPC")
118 @safe unittest {
119     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_push_pull_test"));
120     auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_test"));
121 
122     enum numTimes = 5;
123 
124     foreach(i; 0 .. numTimes)
125         push.send("foo");
126 
127     foreach(i; 0 .. numTimes)
128         pull.receive(No.blocking).bytes.shouldEqual("foo");
129 }
130 
131 
132 @("bind.several addresses at once")
133 @safe unittest {
134     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(["ipc://nanomsg_ipc_push_pull_1",
135                                                              "ipc://nanomsg_ipc_push_pull_2"]));
136     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
137 
138     auto push1 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_1"));
139     auto push2 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_2"));
140 
141     push1.setOption(NanoSocket.Option.sendTimeoutMs, 10);
142     push2.setOption(NanoSocket.Option.sendTimeoutMs, 10);
143 
144     push1.send("foo");
145     push2.send("bar");
146 
147     pull.receive.bytes.shouldEqual("foo");
148     pull.receive.bytes.shouldEqual("bar");
149 }
150 
151 
152 @("init.after.construction")
153 @safe unittest {
154     NanoSocket pull;
155     NanoSocket push;
156 
157     pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_after"));
158     push.initialize(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_init_after"));
159 
160     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
161     push.setOption(NanoSocket.Option.sendTimeoutMs, 10);
162 
163     push.send("foo");
164     push.send("bar");
165 
166     pull.receive.bytes.shouldEqual("foo");
167     pull.receive.bytes.shouldEqual("bar");
168 }
169 
170 
171 @("init.twice")
172 @safe unittest {
173     NanoSocket pull;
174     pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
175     pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
176 }
177 
178 
179 @("init.send throws if not initialised")
180 @safe unittest {
181     enum uri = "ipc://nanomsg_init_send_throws";
182     auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));
183     NanoSocket push;
184     push.send("foo").shouldThrow;
185 }
186 
187 
188 @("receive.buffer.nothing")
189 @safe unittest {
190     NanoSocket pull;
191     pull.initialize(NanoSocket.Protocol.pull, BindTo("inproc://nanomsg_receive_buffer"));
192     ubyte[1024] buf;
193     scope bytes = pull.receive(buf, No.blocking);
194     bytes.length.should == 0;
195 }
196 
197 
198 @("receive.buffer.something")
199 @safe unittest {
200 
201     import std.range: repeat, take;
202     NanoSocket pull, push;
203 
204     enum uri = "inproc://nanomsg_big_receive";
205     pull.initialize(NanoSocket.Protocol.pull, BindTo(uri));
206     push.initialize(NanoSocket.Protocol.push, ConnectTo(uri));
207 
208     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
209     push.setOption(NanoSocket.Option.sendTimeoutMs, 10);
210 
211     enum numBytes = 32_000;
212     push.send(new ubyte[numBytes]);
213     pull.receive.bytes.toBytes.shouldEqual(0.repeat.take(numBytes));
214 }
215 
216 
217 @("receive.nogc.implicit")
218 @safe unittest {
219     enum uri = "inproc://nanomsg_receive_nogc";
220 
221     NanoSocket pull;
222     pull.initialize(NanoSocket.Protocol.pull, BindTo(uri));
223     pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
224 
225     NanoSocket push;
226     push.initialize(NanoSocket.Protocol.push, ConnectTo(uri));
227     push.setOption(NanoSocket.Option.sendTimeoutMs, 10);
228 
229     push.send("Don't need the GC to receive");
230     const buf = pull.receive;
231     const str = () @trusted { return cast(const(char)[]) buf.bytes.dup; }();
232     str.shouldEqual("Don't need the GC to receive");
233 }
234 
235 
236 ubyte[] toBytes(T)(T bytes) @trusted {
237     return cast(ubyte[]) bytes.dup;
238 }