/**
    nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.
    Implemented in C, it works on a wide range of operating systems with no further dependencies.

    This module implements a convenience wrapper API for nanomsg

    Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)

    Example code:
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
        http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html

*/
module nanomsg.wrap;


import nanomsg.bindings;
public import std.typecons: Yes, No; // to facilitate using send, receive

version(unittest)
    import unit_threaded;
else
    enum HiddenTest;


/// wrapper for a string uri to connect to
struct ConnectTo {
    string uri;
}

/// wrapper for one or more string uris to bind to
struct BindTo {

    /// bind to a single uri
    this(inout(string) uri) inout @safe pure {
        this([uri]);
    }

    /// bind to several uris at once
    this(inout(string)[] uris) inout @safe pure {
        this.uris = uris;
    }

    string[] uris;
}

/**

    NanoSocket - high level wrapper for a nanomsg socket

    The struct owns the underlying nanomsg socket descriptor: it is created by
    a constructor or by `init`, and released by `close` or the destructor.
    Copying is disabled so that two instances never own the same descriptor.

*/
struct NanoSocket {

    import std.traits: isArray;
    import std.typecons: Flag;
    import std.datetime: Duration;

    /// nanomsg protocol
    enum Protocol {
        request,
        response,
        subscribe,
        publish,
        pull,
        push,
        pair,
        surveyor,
        respondent,
        bus,
    }

    /// nanomsg socket options
    enum Option {
        lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
        sendBufferSize, /// Size of the send buffer in bytes
        receiveBufferSize, /// Size of the receive buffer in bytes
        receiveMaxSize, /// Maximum message size that can be received, in bytes
        sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
        receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
        reconnectIntervalMs, /// How long to wait to re-establish connection
        reconnectIntervalMax, /// Maximum reconnect interval
        sendPriority, /// Outbound priority for endpoints added to socket
        receivePriority, /// Inbound priority for endpoints added to socket
        ipv4Only, /// Self-explanatory
        socketName, /// Socket name for error reporting and statistics
        timeToLive, /// Number of hops before message is dropped
        subscribeTopic, /// Subscribe to topic
        unsubscribeTopic, /// Unsubscribe from topic
        tcpNoDelay, /// Disables Nagle's algorithm
        surveyorDeadlineMs, /// How long to wait for responses in milliseconds
    }

    /// this(this) disabled to avoid sockets being destroyed
    @disable this(this);

    /// invalid FD
    enum INVALID_FD = -1;

    /// constructor: creates a socket without binding or connecting it
    this(in Protocol protocol, in int domain = AF_SP) @safe {
        init(protocol, domain);
    }

    /// constructor: creates a socket and binds it to every uri in bindTo
    this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
        init(protocol, bindTo, domain);
    }

    /// constructor: creates a socket and connects it to connectTo.uri
    this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
        init(protocol, connectTo, domain);
    }

    /// destructor: closes the socket if it is still open
    ~this() @safe nothrow {
        close;
    }

    /**
       Same as the relevant constructor, can be used on existing objects.
       Any socket previously owned by this object is closed first.
       Throws: Exception if nn_socket reports an error.
     */
    void init(in Protocol protocol, in int domain = AF_SP) @trusted {
        int protocolToInt(Protocol protocol) {
            final switch(protocol) with(Protocol) {
                case request:
                    return NN_REQ;
                case response:
                    return NN_REP;
                case publish:
                    return NN_PUB;
                case subscribe:
                    return NN_SUB;
                case pull:
                    return NN_PULL;
                case push:
                    return NN_PUSH;
                case pair:
                    return NN_PAIR;
                case surveyor:
                    return NN_SURVEYOR;
                case respondent:
                    return NN_RESPONDENT;
                case bus:
                    return NN_BUS;
            }
        }

        close; // init can be called twice
        _nanoSock = nn_socket(domain, protocolToInt(protocol));
        _protocol = protocol;
        enforceNanoMsgRet(_nanoSock);
    }

    /// Same as the relevant constructor, can be used on existing objects
    void init(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
        import std.string: replace;

        init(protocol, domain);

        // this is so it's easy to specify the same string
        // for both ends of the socket
        foreach(uri; bindTo.uris) {
            bind(uri.replace("localhost", "*"));
        }
    }

    /// Same as the relevant constructor, can be used on existing objects
    void init(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
        init(protocol, domain);
        connect(connectTo.uri);

        version(Windows) {
            // on Windows sometimes the socket tries to send before the TCP handshake
            import core.thread;
            Thread.sleep(100.msecs);
        }
    }

    /**
       Close the socket. Safe to call more than once: after the first call the
       descriptor is marked invalid, so neither a second call nor the destructor
       closes it again.
     */
    void close() @trusted nothrow {
        if(_nanoSock != INVALID_FD) {
            _nanoSock.nn_close;
            // forget the descriptor so it is never closed twice
            _nanoSock = INVALID_FD;
        }

        // a closed socket is neither bound nor connected
        _uri = null;
        _connection = Connection.none;
    }

    /// set socket option to a value; returns this socket so calls can be chained
    ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
        const optionC = toOptionC(option);
        setOption(optionC.level, optionC.option, val);
        return this;
    }

    /// get socket option value
    T getOption(T)(Option option) const {
        const optionC = toOptionC(option);
        return getOption!T(optionC.level, optionC.option);
    }

    /**
       Receive one message into a buffer of BUF_SIZE bytes.

       Params:
           blocking = whether to wait for a message. When blocking, a failed
                      receive throws; when not blocking, it yields an empty array.

       Returns: a freshly allocated copy of the received bytes. Messages longer
                than BUF_SIZE are truncated to BUF_SIZE bytes.
     */
    ubyte[] receive(int BUF_SIZE = 1024)(Flag!"blocking" blocking = Yes.blocking) const {
        ubyte[BUF_SIZE] buf;
        const numBytes = nn_recv(_nanoSock, buf.ptr, buf.length, flags(blocking));

        if(blocking) enforceNanoMsgRet(numBytes);
        if(numBytes < 0) return [];

        // nn_recv reports the size of the whole message even when it had to
        // truncate it to fit, so never slice past the end of the buffer
        size_t stored = numBytes;
        if(stored > buf.length) stored = buf.length;

        return buf[0 .. stored].dup;
    }

    /**
       Sends the bytes as expected. If the protocol is Request, then returns
       the response, otherwise returns an empty array.

       Params:
           data = the elements to send; their in-memory bytes are sent as they are
           blocking = whether to block while sending (and while receiving the
                      response in request mode)
           file = reported as the origin of a thrown exception
           line = reported as the origin of a thrown exception

       Throws: Exception if blocking and not every byte was sent.
     */
    ubyte[] send(T)(T[] data,
                    Flag!"blocking" blocking = Yes.blocking,
                    in string file = __FILE__,
                    in size_t line = __LINE__)
        const
    {
        import std.conv: text;

        // nn_send wants a size in bytes, not a number of elements
        const numBytes = data.length * T.sizeof;
        const sent = nn_send(_nanoSock, data.ptr, numBytes, flags(blocking));
        if(blocking) {
            if(sent != numBytes)
                throw new Exception(text("Expected to send ", numBytes, " bytes but sent ", sent), file, line);
        }

        return _protocol == Protocol.request ? receive(blocking) : [];
    }

    /**
       Tries to send bytes to the other side.
       duration is how long to try for
       recvBlocking controls whether or not to block on reception of a response.
       This only matters when the protocol is request/response
       Returns the response if in request mode, otherwise an empty byte slice.
       Throws if the data could not be sent within duration.
     */
    ubyte[] trySend(T)(T[] data, Duration duration, Flag!"blocking" recvBlocking = Yes.blocking) {
        import std.exception: enforce;
        import std.datetime: StopWatch, AutoStart, msecs;
        import std.conv: text;
        import core.thread: Thread;

        // nn_send wants a size in bytes, not a number of elements
        const numBytes = data.length * T.sizeof;

        int sent;
        auto sw = StopWatch(AutoStart.yes);
        do {
            sent = nn_send(_nanoSock, data.ptr, numBytes, flags(No.blocking));
            if(sent != numBytes) Thread.sleep(10.msecs); // play nice with other threads and the CPU
        } while(sent != numBytes && cast(Duration)sw.peek < duration);

        enforce(sent == numBytes,
                text("Expected to send ", numBytes, " bytes but sent ", sent));

        return _protocol == Protocol.request ? receive(recvBlocking) : [];
    }

    /// connect to uri; throws if nn_connect reports an error
    void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.connected;
    }

    /// bind to uri; throws if nn_bind reports an error
    void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.bound;
    }

    /// get protocol
    Protocol protocol() @safe @nogc pure const nothrow {
        return _protocol;
    }

    /// get URI (the one most recently bound or connected to)
    string uri() @safe @nogc pure const nothrow {
        return _uri;
    }

    /// toString: the protocol, followed by "@uri" when bound or "@@uri" when connected
    string toString() @safe pure const {
        import std.conv: text;

        if(_connection == Connection.none)
            return text(protocol);

        const connText = _connection == Connection.bound ? "@" : "@@";
        return text(_protocol, connText, _uri);
    }

private:

    enum Connection {
        none,
        bound,
        connected,
    }

    int _nanoSock = INVALID_FD;
    Protocol _protocol;
    string _uri;
    Connection _connection;

    // throws an Exception describing the nanomsg error if expr evaluates to a negative value
    void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) const {
        import std.conv: text;
        const value = expr();
        if(value < 0)
            throw new Exception(text("nanomsg expression failed with value ", value,
                                     " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)),
                                file,
                                line);
    }

    // the int level and option values needed by the nanomsg C API
    static struct OptionC {
        int level;
        int option;
    }

    // maps a high level Option to the level/option pair used by nn_setsockopt/nn_getsockopt
    static OptionC toOptionC(Option option) @safe {
        final switch(option) with(Option) {
            case lingerMs:
                return OptionC(NN_SOL_SOCKET, NN_LINGER);

            case sendBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_SNDBUF);

            case receiveBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVBUF);

            case receiveMaxSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);

            case sendTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);

            case receiveTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);

            case reconnectIntervalMs:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);

            case reconnectIntervalMax:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);

            case sendPriority:
                return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);

            case receivePriority:
                return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);

            case ipv4Only:
                return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);

            case socketName:
                return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);

            case timeToLive:
                return OptionC(NN_SOL_SOCKET, NN_TTL);

            case subscribeTopic:
                return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);

            case unsubscribeTopic:
                return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);

            case tcpNoDelay:
                return OptionC(NN_TCP, NN_TCP_NODELAY);

            case surveyorDeadlineMs:
                return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
        }
    }

    // array-valued options (e.g. strings): the value is the array's contents
    void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
        // the option length is a size in bytes, not a number of elements
        const numBytes = val.length * typeof(val[0]).sizeof;
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, val.ptr, numBytes));
    }

    // scalar options: the value is the object representation of val
    void setOption(T)(int level, int option, T val) const if(!isArray!T) {
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, &val, val.sizeof));
    }

    // array-valued options: reads up to 1000 elements and returns a copy of them
    T getOption(T)(int level, int option) const if(isArray!T) {
        import std.traits: Unqual;
        import std.conv: to;

        // ElementType!string is dchar, and we don't want that,
        // so instead we use this typeof
        alias Elem = Unqual!(typeof(T.init[0]));
        Elem[1000] val;

        // size_t to match the C API on every platform; the size is in bytes
        size_t length = val.sizeof;
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)val.ptr, &length));

        // copy out of the stack buffer before converting: when T is already a
        // slice of Elem, to!T would otherwise hand back a slice of val itself
        return val[0 .. length / Elem.sizeof].dup.to!T;
    }

    // scalar options: throws if the option's size does not match T.sizeof
    T getOption(T)(int level, int option) const if(!isArray!T) {
        import std.exception: enforce;
        import std.conv: text;

        T val;
        // size_t to match the C API on every platform
        size_t length = T.sizeof;
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, &val, &length));
        enforce(length == T.sizeof,
                text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
        return val;
    }

    // translates the blocking flag to the flags argument of nn_send/nn_recv
    static int flags(Flag!"blocking" blocking) @safe pure {
        return blocking ? 0 : NN_DONTWAIT;
    }
}

/// check nanomsg socket: compiles only if T offers the NanoSocket operations used here
void checkNanoSocket(T)() {
    T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
    s.send("foobar");
    s.setOption(NanoSocket.Option.subscribeTopic, "topic");
    s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
    ubyte[] msg = s.receive(Yes.blocking);
    s.send(msg);
}
enum isNanoSocket(T) = is(typeof(checkNanoSocket!T));
static assert(isNanoSocket!NanoSocket);

/**
    Examples:
*/
/// set/get option
///
@("set/get option")
unittest {
    auto sock = NanoSocket(NanoSocket.Protocol.subscribe);
    sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(-1);
    sock.setOption(NanoSocket.Option.sendTimeoutMs, 42);
    sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(42);
}

/// publish/subscribe
///
@("pub/sub")
unittest {
    const uri = "inproc://test_pubsub";
    auto pub = NanoSocket(NanoSocket.Protocol.publish, BindTo(uri));
    auto sub = NanoSocket(NanoSocket.Protocol.subscribe, ConnectTo(uri));
    sub.setOption(NanoSocket.Option.subscribeTopic, "foo");

    // messages that start with the subscription topic should be received
    pub.send("foo/hello");
    sub.receive(No.blocking).shouldEqual("foo/hello");

    // but not messages that don't
    pub.send("bar/oops");
    sub.receive(No.blocking).shouldBeEmpty;

    // after unsubscribing, messages are no longer received
    sub.setOption(NanoSocket.Option.unsubscribeTopic, "foo");
    pub.send("foo/hello");
    sub.receive(No.blocking).shouldBeEmpty;
}

/// request/response
///
@("req/rep")
unittest {
    import std.concurrency: spawnLinked, send;

    const uri = "inproc://test_reqrep";
    const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri));

    enum timeoutMs = 50;
    requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

    auto tid = spawnLinked(&responder, uri, timeoutMs);
    requester.send("shake?").shouldEqual("shake? yep!");
    tid.send(Stop());
}

/**
    Example:
        utility function
*/
version(unittest) {
    import std.concurrency: Tid;

    /// utility struct for unit test
    struct Respond { string value; }
    /// utility struct for unit test
    struct Stop {}

    /// utility function for unit tests/examples
    void responder(in string uri, in int timeoutMs) {
        import std.concurrency: receiveTimeout;
        import std.datetime: msecs;

        const socket = NanoSocket(NanoSocket.Protocol.response, BindTo(uri));
        socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

        for(bool done; !done;) {
            receiveTimeout(10.msecs,
                (Stop _) {
                    done = true;
                },
            );

            const bytes = socket.receive(No.blocking);
            if(bytes.length) socket.send(bytes ~ cast(ubyte[])" yep!");
        }
    }
}

/**
    Example:
        push/pull over TCP
*/
@("push/pull over TCP")
unittest {
    import core.thread: Thread, msecs;

    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("tcp://localhost:13248"));
    auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("tcp://localhost:13248"));

    enum numTimes = 10;

    foreach(i; 0 .. numTimes)
        push.send("foo");

    Thread.sleep(50.msecs);

    foreach(i; 0 .. numTimes)
        pull.receive(No.blocking).shouldEqual("foo");
}

/**
    Example:
        push/pull over IPC
*/
@HiddenTest /// it's here to show that this can fail, but it doesn't always
@("push/pull over IPC")
unittest {
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_push_pull_test"));
    auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_test"));

    enum numTimes = 5;

    foreach(i; 0 .. numTimes)
        push.send("foo");

    foreach(i; 0 .. numTimes)
        pull.receive(No.blocking).shouldEqual("foo");
}


@("bind to several addresses at once")
unittest {
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(["ipc://nanomsg_ipc_push_pull_1",
                                                             "ipc://nanomsg_ipc_push_pull_2"]));
    pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);

    auto push1 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_1"));
    auto push2 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_2"));

    push1.setOption(NanoSocket.Option.sendTimeoutMs, 10);
    push2.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    push1.send("foo");
    push2.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}


@("init NanoSocket after construction")
unittest {
    NanoSocket pull;
    NanoSocket push;

    pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_after"));
    push.init(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_init_after"));

    pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
    push.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    push.send("foo");
    push.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}

@("Can init twice")
unittest {
    NanoSocket pull;
    pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
    pull.init(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
}

@("Can close more than once")
unittest {
    auto sock = NanoSocket(NanoSocket.Protocol.pull);
    sock.close;
    sock.close; // and the destructor closes once more at scope exit
}