/**
    nanomsg is a socket library that provides several common communication patterns.
    It aims to make the networking layer fast, scalable, and easy to use.
    Implemented in C, it works on a wide range of operating systems with no further dependencies.

    This module implements a convenience wrapper API for nanomsg

    Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)

    Example code:
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
        http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html

 */
module nanomsg.wrap;


import nanomsg.bindings;
public import std.typecons: Yes, No; // to facilitate using send, receive

version(unittest)
    import unit_threaded;
else
    enum HiddenTest;


/// wrapper for a string uri to connect to
struct ConnectTo {
    string uri;
}

/// wrapper for one or more string uris to bind to
struct BindTo {

    /// bind to a single uri
    this(inout(string) uri) inout @safe pure {
        this([uri]);
    }

    /// bind to several uris at once
    this(inout(string)[] uris) inout @safe pure {
        this.uris = uris;
    }

    string[] uris;
}

/// strongly typed wrapper: the total time trySend keeps retrying for
struct TotalDuration {
    import std.datetime: Duration;
    Duration value;
    alias value this;
}

/// strongly typed wrapper: the pause between two trySend attempts
struct RetryDuration {
    import std.datetime: Duration;
    Duration value;
    alias value this;
}

/**

    NanoSocket - high level wrapper for a nanomsg socket

*/
struct NanoSocket {

    import std.traits: isArray;
    import std.typecons: Flag;
    import std.datetime: Duration;

    /// nanomsg protocol
    enum Protocol {
        request,
        response,
        subscribe,
        publish,
        pull,
        push,
        pair,
        surveyor,
        respondent,
        bus,
    }

    /// nanomsg socket options
    enum Option {
        lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
        sendBufferSize, /// Size of the send buffer in bytes
        receiveBufferSize, /// Size of the receive buffer in bytes
        receiveMaxSize, /// Maximum message size that can be received, in bytes
        sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
        receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
        reconnectIntervalMs, /// How long to wait to re-establish connection
        reconnectIntervalMax, /// Maximum reconnect interval
        sendPriority, /// Outbound priority for endpoints added to socket
        receivePriority, /// Inbound priority for endpoints added to socket
        ipv4Only, /// Self-explanatory
        socketName, /// Socket name for error reporting and statistics
        timeToLive, /// Number of hops before message is dropped
        subscribeTopic, /// Subscribe to topic
        unsubscribeTopic, /// Unsubscribe to topic
        tcpNoDelay, /// Disables Nagle's algorithm
        surveyorDeadlineMs, /// How long to wait for responses in milliseconds
    }

    /// this(this) disabled to avoid sockets being destroyed
    @disable this(this);

    /// invalid FD
    enum INVALID_FD = -1;

    /// constructor: creates the socket without binding or connecting it
    this(in Protocol protocol, in int domain = AF_SP) @safe {
        initialize(protocol, domain);
    }

    /// constructor: creates the socket and binds it to every uri in bindTo
    this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
        initialize(protocol, bindTo, domain);
    }

    /// constructor: creates the socket and connects it to connectTo.uri
    this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
        initialize(protocol, connectTo, domain);
    }

    /// destructor: closes the socket if it is still open
    ~this() @safe nothrow {
        close;
    }

    /**
        Same as the relevant constructor, can be used on existing objects.
        Any socket currently held is closed first and the recorded
        URI/connection state is cleared.
        Throws: Exception if nn_socket fails.
    */
    void initialize(in Protocol protocol, in int domain = AF_SP) @trusted {
        int protocolToInt(Protocol proto) {
            final switch(proto) with(Protocol) {
                case request:
                    return NN_REQ;
                case response:
                    return NN_REP;
                case publish:
                    return NN_PUB;
                case subscribe:
                    return NN_SUB;
                case pull:
                    return NN_PULL;
                case push:
                    return NN_PUSH;
                case pair:
                    return NN_PAIR;
                case surveyor:
                    return NN_SURVEYOR;
                case respondent:
                    return NN_RESPONDENT;
                case bus:
                    return NN_BUS;
            }
        }

        close; // init can be called twice

        // forget the endpoint of the previous socket so that uri/toString
        // never describe a connection the new socket does not have
        _uri = null;
        _connection = Connection.none;

        _nanoSock = nn_socket(domain, protocolToInt(protocol));
        _protocol = protocol;
        enforceNanoMsgRet(_nanoSock);
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
        import std.string: replace;

        initialize(protocol, domain);

        // this is so it's easy to specify the same string
        // for both ends of the socket
        foreach(uri; bindTo.uris) {
            bind(uri.replace("localhost", "*"));
        }
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
        initialize(protocol, domain);
        connect(connectTo.uri);

        version(Windows) {
            // on Windows sometimes the socket tries to send before the TCP handshake
            import core.thread;
            Thread.sleep(100.msecs);
        }
    }

    /// close socket. Safe to call more than once: later calls are no-ops.
    void close() @trusted nothrow {
        if(_nanoSock != INVALID_FD) {
            _nanoSock.nn_close;
            // mark as closed so neither a second close nor the destructor
            // calls nn_close again on a stale descriptor
            _nanoSock = INVALID_FD;
        }
    }

    /// set socket option to a value; returns this to allow chaining
    ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
        const optionC = toOptionC(option);
        setOption(optionC.level, optionC.option, val);
        return this;
    }

    /// get socket option value
    T getOption(T)(Option option) const {
        const optionC = toOptionC(option);
        return getOption!T(optionC.level, optionC.option);
    }

    /**
        Receives one message into a stack buffer of BUF_SIZE bytes and returns
        a heap copy of it. Messages longer than BUF_SIZE are truncated to
        BUF_SIZE bytes.
        When blocking, throws an Exception if nn_recv fails. When not
        blocking, any failure (including "nothing to read") yields an empty
        array.
    */
    ubyte[] receive(int BUF_SIZE = 1024)(Flag!"blocking" blocking = Yes.blocking) const {
        ubyte[BUF_SIZE] buf;
        const numBytes = nn_recv(_nanoSock, buf.ptr, buf.length, flags(blocking));

        if(blocking) enforceNanoMsgRet(numBytes);

        if(numBytes < 0) return [];

        // nn_recv reports the size of the message, which can be larger than
        // the buffer when the message was truncated; never slice past the buffer
        const received = cast(size_t) numBytes;
        const length = received < buf.length ? received : buf.length;
        return buf[0 .. length].dup;
    }

    /**
        Sends the bytes of data.
        If the protocol is request, returns the response obtained via receive.
        Otherwise returns the sent data reinterpreted as bytes when
        everything was sent, or an empty array when it was not.
        Throws: Exception when blocking and not all bytes were sent.
     */
    ubyte[] send(T)(T[] data,
                    Flag!"blocking" blocking = Yes.blocking,
                    in string file = __FILE__,
                    in size_t line = __LINE__)
        const
    {
        import std.conv: text;

        // nn_send takes a size in bytes, not a number of elements
        const numBytes = data.length * T.sizeof;
        const sent = nn_send(_nanoSock, data.ptr, numBytes, flags(blocking));
        if(blocking) {
            if(sent != numBytes)
                throw new Exception(text("Expected to send ", numBytes, " bytes but sent ", sent), file, line);
        }

        ubyte[] empty;
        return () @trusted { return _protocol == Protocol.request
                ? receive(blocking)
                : (sent == numBytes ? cast(ubyte[])data : empty); }();
    }

    /**
     Tries to send bytes to the other side.
     totalDuration is how long to try for; attempts are 10ms apart.
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     */
    ubyte[] trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) {
        import std.datetime: msecs;
        return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking);
    }

    /**
     Tries to send bytes to the other side.
     totalDuration is how long to try for
     retryDuration is how long to sleep after each failed attempt
     recvBlocking controls whether or not to block on reception of a response.
     This only matters when the protocol is request/response
     Returns the response if in request mode, otherwise an empty byte slice.
     Throws: Exception if the data could not be sent within totalDuration.
     */
    ubyte[] trySend(T)(T[] data,
                       TotalDuration totalDuration,
                       RetryDuration retryDuration,
                       Flag!"blocking" recvBlocking = Yes.blocking)
    {
        import std.exception: enforce;
        import std.conv: text;
        import core.thread: Thread;
        import core.time: MonoTime;

        // nn_send takes a size in bytes, not a number of elements
        const numBytes = data.length * T.sizeof;
        const start = MonoTime.currTime;
        int sent;

        do {
            // data.ptr rather than &data[0]: the latter is a range error for empty input
            sent = nn_send(_nanoSock, data.ptr, numBytes, flags(No.blocking));
            if(sent != numBytes) Thread.sleep(retryDuration.value); // play nice with other threads and the CPU
        } while(sent != numBytes && MonoTime.currTime - start < totalDuration.value);

        enforce(sent == numBytes,
                text("Expected to send ", numBytes, " bytes but sent ", sent));

        return _protocol == Protocol.request ? receive(recvBlocking) : [];
    }

    @("trySend")
    unittest {
        import std.datetime: seconds, msecs;

        enum uri = "ipc://try_send_test";
        auto pull = NanoSocket(Protocol.pull, BindTo(uri));
        auto push = NanoSocket(Protocol.push, ConnectTo(uri));
        push.trySend("foo", TotalDuration(1.seconds), RetryDuration(10.msecs));
    }

    /// connect to uri and remember it; throws an Exception if nn_connect fails
    void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.connected;
    }

    /// bind to uri and remember it; throws an Exception if nn_bind fails
    void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.bound;
    }

    /// get protocol
    Protocol protocol() @safe @nogc pure const nothrow {
        return _protocol;
    }

    /// get URI (the one most recently bound or connected to)
    string uri() @safe @nogc pure const nothrow {
        return _uri;
    }

    /// toString: the protocol, followed by "@uri" when bound or "@@uri" when connected
    string toString() @safe pure const {
        import std.conv: text;

        if(_connection == Connection.none)
            return text(protocol);

        const connText = _connection == Connection.bound ? "@" : "@@";
        return text(_protocol, connText, _uri);
    }

private:

    enum Connection {
        none,
        bound,
        connected,
    }

    int _nanoSock = INVALID_FD;
    Protocol _protocol;
    string _uri;
    Connection _connection;

    // evaluates expr and throws an Exception describing the nanomsg error
    // if the result is negative
    void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) const {
        import std.conv: text;
        const value = expr();
        if(value < 0)
            throw new Exception(text("nanomsg expression failed with value ", value,
                                     " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)),
                                file,
                                line);
    }

    // the int level and option values needed by the nanomsg C API
    static struct OptionC {
        int level;
        int option;
    }

    // maps the high level Option to the level/option pair of the C API
    static OptionC toOptionC(Option option) @safe {
        final switch(option) with(Option) {
            case lingerMs:
                return OptionC(NN_SOL_SOCKET, NN_LINGER);

            case sendBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_SNDBUF);

            case receiveBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVBUF);

            case receiveMaxSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);

            case sendTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);

            case receiveTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);

            case reconnectIntervalMs:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);

            case reconnectIntervalMax:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);

            case sendPriority:
                return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);

            case receivePriority:
                return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);

            case ipv4Only:
                return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);

            case socketName:
                return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);

            case timeToLive:
                return OptionC(NN_SOL_SOCKET, NN_TTL);

            case subscribeTopic:
                return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);

            case unsubscribeTopic:
                return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);

            case tcpNoDelay:
                return OptionC(NN_TCP, NN_TCP_NODELAY);

            case surveyorDeadlineMs:
                return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
        }
    }

    // array-valued options (e.g. strings): pass the contents and their size in bytes
    void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
        const numBytes = val.length * typeof(T.init[0]).sizeof;
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, val.ptr, numBytes));
    }

    // scalar options: pass the address of the value and its size
    void setOption(T)(int level, int option, T val) const if(!isArray!T) {
        enforceNanoMsgRet(nn_setsockopt(_nanoSock, level, option, &val, val.sizeof));
    }

    // array-valued options: read into a fixed 1000 element buffer and convert
    T getOption(T)(int level, int option) const if(isArray!T) {
        import std.traits: Unqual;
        import std.conv: to;

        // ElementType!string is dchar, and we don't want that,
        // so instead we use this typeof
        alias Elem = Unqual!(typeof(T.init[0]));
        Elem[1000] val;

        // in: capacity of the buffer in bytes, out: size of the option in bytes.
        // size_t (not ulong) so that &length matches the C API on 32 bit targets too
        size_t length = val.sizeof;
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)val.ptr, &length));

        // never trust a reported size larger than what the buffer can hold
        if(length > val.sizeof) length = val.sizeof;
        return val[0 .. length / Elem.sizeof].to!T;
    }

    // scalar options: throws if the size reported by nanomsg is not T.sizeof
    T getOption(T)(int level, int option) const if(!isArray!T) {
        import std.exception: enforce;
        import std.conv: text;

        T val;
        size_t length = T.sizeof;
        enforceNanoMsgRet(nn_getsockopt(_nanoSock, level, option, cast(void*)&val, &length));
        enforce(length == T.sizeof,
                text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
        return val;
    }

    // translates the blocking flag into the flags argument of nn_send/nn_recv
    static int flags(Flag!"blocking" blocking) @safe pure {
        return blocking ? 0 : NN_DONTWAIT;
    }
}

/// check nanomsg socket: compiles only if T offers the NanoSocket API used here
void checkNanoSocket(T)() {
    T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
    s.send("foobar");
    s.setOption(NanoSocket.Option.subscribeTopic, "topic");
    s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
    ubyte[] msg = s.receive(Yes.blocking);
    s.send(msg);
}
enum isNanoSocket(T) = is(typeof(checkNanoSocket!T));
static assert(isNanoSocket!NanoSocket);


/**
    Examples:
*/
/// set/get option
///
@("set/get option")
unittest {
    auto sock = NanoSocket(NanoSocket.Protocol.subscribe);
    sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(-1);
    sock.setOption(NanoSocket.Option.sendTimeoutMs, 42);
    sock.getOption!int(NanoSocket.Option.sendTimeoutMs).shouldEqual(42);
}

/// publish/subscribe
///
@("pub/sub")
unittest {
    const uri = "inproc://test_pubsub";
    auto pub = NanoSocket(NanoSocket.Protocol.publish, const BindTo(uri));
    auto sub = NanoSocket(NanoSocket.Protocol.subscribe, ConnectTo(uri));
    sub.setOption(NanoSocket.Option.subscribeTopic, "foo");

    // messages that start with the subscription topic should be received
    pub.send("foo/hello");
    sub.receive(No.blocking).shouldEqual("foo/hello");

    // but not messages that don't
    pub.send("bar/oops");
    sub.receive(No.blocking).shouldBeEmpty;

    // after unsubscribing, messages are no longer received
    sub.setOption(NanoSocket.Option.unsubscribeTopic, "foo");
    pub.send("foo/hello");
    sub.receive(No.blocking).shouldBeEmpty;
}

/// request/response
///
@("req/rep")
unittest {
    import std.concurrency: spawnLinked, send;

    const uri = "inproc://test_reqrep";
    const requester = NanoSocket(NanoSocket.Protocol.request, ConnectTo(uri));

    enum timeoutMs = 50;
    requester.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

    auto tid = spawnLinked(&responder, uri, timeoutMs);
    requester.send("shake?").shouldEqual("shake? yep!");
    tid.send(Stop());
}

/**
    Example:
        utility function
*/
version(unittest) {
    import std.concurrency: Tid;

    /// utility struct for unit test
    struct Respond { string value; }
    /// utility struct for unit test
    struct Stop {}

    /// utility function for unit tests/examples
    void responder(in string uri, in int timeoutMs) {
        import std.concurrency: receiveTimeout;
        import std.datetime: msecs;

        const socket = NanoSocket(NanoSocket.Protocol.response, const BindTo(uri));
        socket.setOption(NanoSocket.Option.receiveTimeoutMs, timeoutMs);

        for(bool done; !done;) {
            receiveTimeout(10.msecs,
                (Stop _) {
                    done = true;
                },
            );

            const bytes = socket.receive(No.blocking);
            if(bytes.length) socket.send(bytes ~ cast(ubyte[])" yep!");
        }
    }
}

/**
    Example:
        push/pull over TCP
*/
version(Windows) {} //FIXME
else {
    @("push/pull over TCP")
    unittest {
        import core.thread: Thread, msecs;

        auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("tcp://localhost:13248"));
        auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("tcp://localhost:13248"));

        enum numTimes = 10;

        foreach(i; 0 .. numTimes)
            push.send("foo");

        Thread.sleep(50.msecs);

        foreach(i; 0 .. numTimes)
            pull.receive(No.blocking).shouldEqual("foo");
    }
}

/**
    Example:
        push/pull over IPC
*/
@HiddenTest /// it's here to show that this can fail, but it doesn't always
@("push/pull over IPC")
unittest {
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_push_pull_test"));
    auto push = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_test"));

    enum numTimes = 5;

    foreach(i; 0 .. numTimes)
        push.send("foo");

    foreach(i; 0 .. numTimes)
        pull.receive(No.blocking).shouldEqual("foo");
}


@("bind to several addresses at once")
unittest {
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(["ipc://nanomsg_ipc_push_pull_1",
                                                             "ipc://nanomsg_ipc_push_pull_2"]));
    pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);

    auto push1 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_1"));
    auto push2 = NanoSocket(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_push_pull_2"));

    push1.setOption(NanoSocket.Option.sendTimeoutMs, 10);
    push2.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    push1.send("foo");
    push2.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}


@("init NanoSocket after construction")
unittest {
    NanoSocket pull;
    NanoSocket push;

    pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_after"));
    push.initialize(NanoSocket.Protocol.push, ConnectTo("ipc://nanomsg_ipc_init_after"));

    pull.setOption(NanoSocket.Option.receiveTimeoutMs, 10);
    push.setOption(NanoSocket.Option.sendTimeoutMs, 10);

    push.send("foo");
    push.send("bar");

    pull.receive.shouldEqual("foo");
    pull.receive.shouldEqual("bar");
}

@("Can init twice")
unittest {
    NanoSocket pull;
    pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
    pull.initialize(NanoSocket.Protocol.pull, BindTo("ipc://nanomsg_ipc_init_twice"));
}

@("Non-initialised NanoSocket throws on send")
unittest {
    enum uri = "ipc://nanomsg_init_send_throws";
    auto pull = NanoSocket(NanoSocket.Protocol.pull, BindTo(uri));
    NanoSocket push;
    push.send("foo").shouldThrow;
}

@("close is idempotent")
unittest {
    auto sock = NanoSocket(NanoSocket.Protocol.pull);
    sock.close;
    sock._nanoSock.shouldEqual(NanoSocket.INVALID_FD);
    sock.close; // must be a no-op, as must the destructor that follows
    sock._nanoSock.shouldEqual(NanoSocket.INVALID_FD);
}