1 module ut.wrap; 2 3 4 import nanomsg.wrap; 5 import unit_threaded; 6 7 8 @("send.try") 9 @safe unittest { 10 import std.datetime: seconds, msecs; 11 12 enum uri = "ipc://try_send_test"; 13 auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri)); 14 auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo(uri)); 15 push.trySend("foo", TotalDuration(1.seconds), RetryDuration(10.msecs)); 16 } 17 18 19 @("option") 20 @safe unittest { 21 auto sock = NanoSocket(NanoSocket.Protocol.subscribe); 22 sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(-1); 23 sock.setOption(NanoSocket.Option.sendTimeoutMs, 42); 24 sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(42); 25 } 26 27 28 @("pubsub") 29 @safe unittest { 30 const uri = "inproc://test_pubsub"; 31 auto pub = NanoSocket(NanoSocket.Protocol.publish, const BindTo(uri)); 32 auto sub = NanoSocket(NanoSocket.Protocol.subscribe, ConnectTo(uri)); 33 sub.setOption(NanoSocket.Option.subscribeTopic, "foo"); 34 35 // messages that start with the subscription topic should be received 36 pub.send("foo/hello"); 37 sub.receive(No.blocking).bytes.shouldEqual("foo/hello"); 38 39 // but not messages that don't 40 pub.send("bar/oops"); 41 sub.receive(No.blocking).bytes.length.should == 0; 42 43 // after unsubscribing, messages are no longer received 44 sub.setOption(NanoSocket.Option.unsubscribeTopic, "foo"); 45 pub.send("foo/hello"); 46 sub.receive(No.blocking).bytes.length.should == 0; 47 } 48 49 50 // ASAN doesn't like D threads 51 version(nanomsg_wrapper_asan) {} 52 else { 53 @("reqrep") 54 @safe unittest { 55 import std.concurrency: spawnLinked, send; 56 57 const uri = "inproc://test_reqrep"; 58 const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri)); 59 60 enum timeoutMs = 50; 61 requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs); 62 63 auto tid = () @trusted { return spawnLinked(&responder, uri, timeoutMs); }(); 64 requester.send("shake?").bytes.shouldEqual("shake? yep!"); 65 () @trusted { tid.send(Stop()); }(); 66 } 67 } 68 69 70 private struct Respond { string value; } 71 private struct Stop {} 72 73 /// utility function for unit tests/examples 74 private void responder(in string uri, in int timeoutMs) { 75 import std.concurrency: receiveTimeout; 76 import std.datetime: msecs; 77 78 const socket = NanoSocket(NanoSocket.Protocol.response, const BindTo(uri)); 79 socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs); 80 81 for(bool done; !done;) { 82 receiveTimeout(10.msecs, 83 (Stop _) { 84 done = true; 85 }, 86 ); 87 88 const resp = socket.receive(No.blocking); 89 if(resp.bytes.length) socket.send(resp.bytes ~ cast(ubyte[])" yep!"); 90 } 91 } 92 93 94 version(Windows) {} //FIXME 95 else { 96 @("push.TCP") 97 @safe unittest { 98 import core.thread: Thread, msecs; 99 100 auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("tcp://localhost:13248")); 101 auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("tcp://localhost:13248")); 102 103 enum numTimes = 10; 104 105 foreach(i; 0 .. numTimes) 106 push.send("foo"); 107 108 () @trusted { Thread.sleep(50.msecs); }(); 109 110 foreach(i; 0 .. numTimes) 111 pull.receive(No.blocking).bytes.shouldEqual("foo"); 112 } 113 } 114 115 116 @HiddenTest /// it's here to show that this can fail, but it doesn't always 117 @("push.IPC") 118 @safe unittest { 119 auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_push_pull_test")); 120 auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_test")); 121 122 enum numTimes = 5; 123 124 foreach(i; 0 .. numTimes) 125 push.send("foo"); 126 127 foreach(i; 0 .. numTimes) 128 pull.receive(No.blocking).bytes.shouldEqual("foo"); 129 } 130 131 132 @("bind.several addresses at once") 133 @safe unittest { 134 auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(["ipc://nanomsg_ipc_push_pull_1", 135 "ipc://nanomsg_ipc_push_pull_2"])); 136 pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10); 137 138 auto push1 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_1")); 139 auto push2 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_2")); 140 141 push1.setOption(NanoSocket.Option.sendTimeoutMs, 10); 142 push2.setOption(NanoSocket.Option.sendTimeoutMs, 10); 143 144 push1.send("foo"); 145 push2.send("bar"); 146 147 pull.receive.bytes.shouldEqual("foo"); 148 pull.receive.bytes.shouldEqual("bar"); 149 } 150 151 152 @("init.after.construction") 153 @safe unittest { 154 NanoSocket pull; 155 NanoSocket push; 156 157 pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_after")); 158 push.initialize(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_init_after")); 159 160 pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10); 161 push.setOption(NanoSocket.Option.sendTimeoutMs, 10); 162 163 push.send("foo"); 164 push.send("bar"); 165 166 pull.receive.bytes.shouldEqual("foo"); 167 pull.receive.bytes.shouldEqual("bar"); 168 } 169 170 171 @("init.twice") 172 @safe unittest { 173 NanoSocket pull; 174 pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice")); 175 pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice")); 176 } 177 178 179 @Tags("notravis") 180 @("init.send throws if not initialised") 181 @safe unittest { 182 enum uri = "ipc://nanomsg_init_send_throws"; 183 auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri)); 184 NanoSocket push; 185 push.send("foo").shouldThrow; 186 } 187 188 189 @("receive.buffer.nothing") 190 @safe unittest { 191 NanoSocket pull; 192 pull.initialize(NanoSocket.Protocol.pull, BindTo("inproc://nanomsg_receive_buffer")); 193 ubyte[1024] buf; 194 scope bytes = pull.receive(buf, No.blocking); 195 bytes.length.should == 0; 196 } 197 198 199 @("receive.buffer.something") 200 @safe unittest { 201 202 import std.range: repeat, take; 203 NanoSocket pull, push; 204 205 enum uri = "inproc://nanomsg_big_receive"; 206 pull.initialize(NanoSocket.Protocol.pull, BindTo(uri)); 207 push.initialize(NanoSocket.Protocol.push, ConnectTo(uri)); 208 209 pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10); 210 push.setOption(NanoSocket.Option.sendTimeoutMs, 10); 211 212 enum numBytes = 32_000; 213 push.send(new ubyte[numBytes]); 214 pull.receive.bytes.toBytes.shouldEqual(0.repeat.take(numBytes)); 215 } 216 217 218 @("receive.nogc.implicit") 219 @safe unittest { 220 enum uri = "inproc://nanomsg_receive_nogc"; 221 222 NanoSocket pull; 223 pull.initialize(NanoSocket.Protocol.pull, BindTo(uri)); 224 pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10); 225 226 NanoSocket push; 227 push.initialize(NanoSocket.Protocol.push, ConnectTo(uri)); 228 push.setOption(NanoSocket.Option.sendTimeoutMs, 10); 229 230 push.send("Don't need the GC to receive"); 231 const buf = pull.receive; 232 const str = () @trusted { return cast(const(char)[]) buf.bytes.dup; }(); 233 str.shouldEqual("Don't need the GC to receive"); 234 } 235 236 @("access underlying socket") 237 unittest { 238 239 import std.range: iota; 240 import std.array : array; 241 import nanomsg.bindings : nn_send, nn_recv, nn_freemsg, NN_MSG; 242 243 NanoSocket pull, push; 244 245 enum uri = "inproc://nanomsg_raw_sock"; 246 pull.initialize(NanoSocket.Protocol.pull, BindTo(uri)); 247 push.initialize(NanoSocket.Protocol.push, ConnectTo(uri)); 248 249 pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10); 250 push.setOption(NanoSocket.Option.sendTimeoutMs, 10); 251 252 auto sourceData = iota(ubyte(0), ubyte(255)).array; 253 push.nanoSock.nn_send(sourceData.ptr, sourceData.length, 0); 254 void* res; 255 auto len = pull.nanoSock.nn_recv(&res, NN_MSG, 0); 256 assert(len == 255); 257 scope (exit) nn_freemsg(res); 258 (cast(ubyte*)res)[0 .. 255].shouldEqual(sourceData); 259 } 260 261 ubyte[] toBytes(T)(T bytes) @trusted { 262 return cast(ubyte[]) bytes.dup; 263 } 264