/**
    nanomsg is a socket library that provides several common communication patterns.
    It aims to make the networking layer fast, scalable, and easy to use.
    Implemented in C, it works on a wide range of operating systems with no further dependencies.

    This module implements a convenience wrapper API for nanomsg

    Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited)

    Example code:
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html
        http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html
        http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html

 */
module nanomsg.wrap;


import nanomsg.bindings;
import concepts: models;
public import std.typecons: Yes, No; // to facilitate using send, receive


/// wrapper for a string uri to connect to
struct ConnectTo {
    string uri;
}

/// wrapper for one or more string uris to bind to
struct BindTo {

    /// bind to a single uri
    this(in string uri) @safe pure {
        this([uri]);
    }

    /// bind to several uris; the array is duplicated
    this(in string[] uris) @safe pure {
        this.uris = uris.dup;
    }

    string[] uris;
}

/// Strongly typed wrapper: the total time `trySend` keeps trying for
struct TotalDuration {
    import std.datetime: Duration;
    Duration value;
    alias value this;
}

/// Strongly typed wrapper: the time `trySend` sleeps between attempts
struct RetryDuration {
    import std.datetime: Duration;
    Duration value;
    alias value this;
}


/**

    NanoSocket - high level wrapper for a nanomsg socket

*/
@models!(NanoSocket, isNanoSocket)
struct NanoSocket {

    import std.traits: isArray;
    import std.typecons: Flag;
    import std.datetime: Duration;

    /// nanomsg protocol
    enum Protocol {
        request,
        response,
        subscribe,
        publish,
        pull,
        push,
        pair,
        surveyor,
        respondent,
        bus,
    }

    /// nanomsg socket options
    enum Option {
        lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite
        sendBufferSize, /// Size of the send buffer in bytes
        receiveBufferSize, /// Size of the receive buffer in bytes
        receiveMaxSize, /// Maximum message size that can be received, in bytes
        sendTimeoutMs, /// How long in milliseconds it takes for send to timeout
        receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout
        reconnectIntervalMs, /// How long to wait to re-establish connection
        reconnectIntervalMax, /// Maximum reconnect interval
        sendPriority, /// Outbound priority for endpoints added to socket
        receivePriority, /// Inbound priority for endpoints added to socket
        ipv4Only, /// Self-explanatory
        socketName, /// Socket name for error reporting and statistics
        timeToLive, /// Number of hops before message is dropped
        subscribeTopic, /// Subscribe to topic
        unsubscribeTopic, /// Unsubscribe to topic
        tcpNoDelay, /// Disables Nagle's algorithm
        surveyorDeadlineMs, /// How long to wait for responses in milliseconds
    }

    /// this(this) disabled to avoid sockets being destroyed
    @disable this(this);

    /// invalid FD
    enum INVALID_FD = -1;

    /// constructor: creates the socket without binding or connecting
    this(in Protocol protocol, in int domain = AF_SP) @safe {
        initialize(protocol, domain);
    }

    /// constructor: creates the socket and binds it to every uri in `bindTo`
    this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe {
        initialize(protocol, bindTo, domain);
    }

    /// constructor: creates the socket and connects it to `connectTo.uri`
    this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe {
        initialize(protocol, connectTo, domain);
    }

    /// destructor: closes the socket if it is still open
    ~this() @safe nothrow {
        close;
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in int domain = AF_SP) @trusted {
        int protocolToInt(Protocol protocol) {
            final switch(protocol) with(Protocol) {
                case request:
                    return NN_REQ;
                case response:
                    return NN_REP;
                case publish:
                    return NN_PUB;
                case subscribe:
                    return NN_SUB;
                case pull:
                    return NN_PULL;
                case push:
                    return NN_PUSH;
                case pair:
                    return NN_PAIR;
                case surveyor:
                    return NN_SURVEYOR;
                case respondent:
                    return NN_RESPONDENT;
                case bus:
                    return NN_BUS;
            }
        }

        close; // init can be called twice

        // a freshly created socket is neither bound nor connected, so
        // forget whatever a previous initialisation recorded
        _uri = null;
        _connection = Connection.none;

        _nanoSock = nn_socket(domain, protocolToInt(protocol));
        _protocol = protocol;
        enforceNanoMsgRet(_nanoSock);
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted {
        import std.string: replace;

        initialize(protocol, domain);

        // this is so it's easy to specify the same string
        // for both ends of the socket
        foreach(uri; bindTo.uris) {
            bind(uri.replace("localhost", "*"));
        }
    }

    /// Same as the relevant constructor, can be used on existing objects
    void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted {
        initialize(protocol, domain);
        connect(connectTo.uri);

        version(Windows) {
            // on Windows sometimes the socket tries to send before the TCP handshake
            import core.thread;
            Thread.sleep(100.msecs);
        }
    }

    /**
       Close the socket. Safe to call more than once: the descriptor is
       reset to `INVALID_FD` afterwards so that neither a second call nor
       the destructor closes the same descriptor again.
     */
    void close() @trusted nothrow {
        if(_nanoSock != INVALID_FD) {
            _nanoSock.nn_close;
            _nanoSock = INVALID_FD;
        }
    }

    /// set socket option to a value; returns this socket to allow chaining
    ref inout(NanoSocket) setOption(T)(Option option, T val) inout {
        const optionC = toOptionC(option);
        setOption(optionC.level, optionC.option, val);
        return this;
    }

    /// get socket option value
    T getOption(T)(Option option) const {
        const optionC = toOptionC(option);
        return getOption!T(optionC.level, optionC.option);
    }

    /**
       Receive bytes on this socket.
       Memory is allocated by nanomsg and deleted in the `NanoBuffer` destructor.
     */
    NanoBuffer receive(Flag!"blocking" blocking = Yes.blocking,
                       in string file = __FILE__,
                       in size_t line = __LINE__)
        @safe scope return const
    {
        // a null slice tells receiveImpl to let nanomsg allocate the message
        static void[] buffer;
        return receiveImpl(buffer, blocking, file, line);
    }

    /**
       A version of `receive` that takes a user supplied buffer to fill.
       Returns the slice of `buffer` that was filled.
     */
    void[] receive(return scope void[] buffer,
                   Flag!"blocking" blocking = Yes.blocking,
                   in string file = __FILE__,
                   in size_t line = __LINE__)
        const @safe
    {
        // not otherwise used: indexing acts as a bounds check that rejects an
        // empty buffer before it reaches receiveImpl
        auto ptr = &buffer[0];
        return receiveImpl(buffer, blocking, file, line).bytes;
    }


    /**
       Sends the bytes as expected. If the protocol is Request, then returns
       the response, otherwise returns a view of the input data.
       Throws if nanomsg reports an error; `file` and `line` are used for
       the exception location.
     */
    NanoBuffer send(T)(T[] data,
                       Flag!"blocking" blocking = Yes.blocking,
                       in string file = __FILE__,
                       in size_t line = __LINE__)
        const
    {
        const sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(blocking)); }();
        enforceNanoMsgRet(sent, file, line);

        return () @trusted { return _protocol == Protocol.request
                ? receive(blocking, file, line)
                : NanoBuffer(cast(void[]) data, false /*shouldDelete*/);
        }();
    }

    /**
       Tries to send bytes to the other side.
       duration is how long to try for
       recvBlocking controls whether or not to block on reception of a response.
       This only matters when the protocol is request/response
       Returns the response if in request mode, otherwise an empty byte slice.
       Retries every 10 milliseconds.
     */
    auto trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) {
        import std.datetime: msecs;
        return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking);
    }

    /**
       Tries to send bytes to the other side.
       totalDuration is how long to try for, retryDuration is how long to
       sleep between attempts.
       recvBlocking controls whether or not to block on reception of a response.
       This only matters when the protocol is request/response
       Returns the response if in request mode, otherwise an empty byte slice.
       Throws if the data could not be sent within totalDuration.
     */
    NanoBuffer trySend(T)(T[] data,
                          TotalDuration totalDuration,
                          RetryDuration retryDuration,
                          Flag!"blocking" recvBlocking = Yes.blocking)
    {
        import std.exception: enforce;
        static if(__VERSION__ >= 2077)
            import std.datetime.stopwatch: StopWatch, AutoStart;
        else
            import std.datetime: StopWatch, AutoStart;
        import std.datetime: msecs;
        import std.conv: text;
        import core.thread: Thread;

        int sent;
        auto sw = StopWatch(AutoStart.yes);
        do {
            // data.ptr (as in `send`) rather than &data[0] so that an empty
            // slice does not trigger a range error
            sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(No.blocking)); }();
            if(sent != data.length) () @trusted { Thread.sleep(retryDuration); }();
        } while(sent != data.length && cast(Duration) sw.peek < totalDuration);

        enforce(sent == data.length,
                text("Expected to send ", data.length, " bytes but sent ", sent));

        return _protocol == Protocol.request ? receive(recvBlocking) : NanoBuffer();
    }

    /// connect to `uri` and remember it as this socket's URI
    void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.connected;
    }

    /// bind to `uri` and remember it as this socket's URI
    void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) {
        import std.string: toStringz;
        enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line);
        _uri = uri;
        _connection = Connection.bound;
    }

    /// get protocol
    Protocol protocol() @safe @nogc pure const nothrow {
        return _protocol;
    }

    /// get URI (the one most recently bound or connected to)
    string uri() @safe @nogc pure const nothrow {
        return _uri;
    }

    /// toString: the protocol, followed by "@uri" when bound or "@@uri" when connected
    string toString() @safe pure const {
        import std.conv: text;

        if(_connection == Connection.none)
            return text(protocol);

        const connText = _connection == Connection.bound ? "@" : "@@";
        return text(_protocol, connText, _uri);
    }

    /// get the underlying nanomsg socket file descriptor.
    /// Bear in mind that NanoSocket contains state about
    /// the socket options, so changing these from the
    /// raw fd will result in unpredictable behaviour
    int nanoSock() @system pure const @nogc nothrow {
        return _nanoSock;
    }

private:

    enum Connection {
        none,
        bound,
        connected,
    }

    int _nanoSock = INVALID_FD;
    Protocol _protocol;
    string _uri;
    Connection _connection;

    // Shared implementation of both `receive` overloads. With a null
    // `buffer`, nanomsg allocates the message (NN_MSG) and the returned
    // NanoBuffer frees it; otherwise the caller's buffer is filled and the
    // returned NanoBuffer is just a view of it.
    NanoBuffer receiveImpl(return scope void[] buffer,
                           Flag!"blocking" blocking = Yes.blocking,
                           in string file = __FILE__,
                           in size_t line = __LINE__)
        @safe return scope const
    {
        import std.algorithm: min;
        static import core.stdc.errno;

        void* nanomsgBuffer = null;
        // can't use &buffer[0] here since it might be empty
        const haveBuffer = () @trusted { return buffer.ptr !is null; }();
        const shouldDelete = !haveBuffer;

        auto recvPointer = () @trusted {
            return haveBuffer ? &buffer[0] : cast(void*) &nanomsgBuffer;
        }();


        const length = haveBuffer ? buffer.length : NN_MSG;
        const numBytes = () @trusted { return nn_recv(_nanoSock, recvPointer, length, flags(blocking)); }();

        bool isErrnoEagain() @trusted {
            return nn_errno == core.stdc.errno.EAGAIN;
        }

        // a non-blocking receive with nothing to read (EAGAIN) is not an
        // error: it yields an empty NanoBuffer below
        if(blocking || (numBytes < 0 && !isErrnoEagain)) {
            enforceNanoMsgRet(numBytes, file, line);
        }

        auto pointer = haveBuffer ? &buffer[0] : nanomsgBuffer;
        const retSliceLength = haveBuffer ? min(numBytes, buffer.length) : numBytes;

        return numBytes >= 0
            ? NanoBuffer(() @trusted { return pointer[0 .. retSliceLength]; }(), shouldDelete)
            : NanoBuffer();
    }

    // the int level and option values needed by the nanomsg C API
    static struct OptionC {
        int level;
        int option;
    }

    // maps the high level Option onto the nanomsg (level, option) pair
    static OptionC toOptionC(Option option) @safe {
        final switch(option) with(Option) {
            case lingerMs:
                return OptionC(NN_SOL_SOCKET, NN_LINGER);

            case sendBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_SNDBUF);

            case receiveBufferSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVBUF);

            case receiveMaxSize:
                return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE);

            case sendTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO);

            case receiveTimeoutMs:
                return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO);

            case reconnectIntervalMs:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL);

            case reconnectIntervalMax:
                return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX);

            case sendPriority:
                return OptionC(NN_SOL_SOCKET, NN_SNDPRIO);

            case receivePriority:
                return OptionC(NN_SOL_SOCKET, NN_RCVPRIO);

            case ipv4Only:
                return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY);

            case socketName:
                return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME);

            case timeToLive:
                return OptionC(NN_SOL_SOCKET, NN_TTL);

            case subscribeTopic:
                return OptionC(NN_SUB, NN_SUB_SUBSCRIBE);

            case unsubscribeTopic:
                return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE);

            case tcpNoDelay:
                return OptionC(NN_TCP, NN_TCP_NODELAY);

            case surveyorDeadlineMs:
                return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE);
        }
    }

    // array-valued options (e.g. strings): pass pointer and length
    void setOption(T)(int level, int option, ref T val) const if(isArray!T) {
        const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, val.ptr, val.length); }();
        enforceNanoMsgRet(ret);
    }

    // scalar options: pass the address and size of the value
    void setOption(T)(int level, int option, T val) const if(!isArray!T) {
        const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, &val, val.sizeof); }();
        enforceNanoMsgRet(ret);
    }

    // array-valued options: read into a fixed 1000-element scratch buffer
    // and convert the filled part to T
    T getOption(T)(int level, int option) const if(isArray!T) {
        import std.traits: Unqual;
        import std.conv: to;

        // ElementType!string is dchar, and we don't want that,
        // so instead we use this typeof
        alias U = Unqual!(typeof(T.init[0]))[1000];
        U val;
        // size_t to match the pointer type nn_getsockopt expects on every
        // platform (ulong is only the same type on 64-bit targets)
        size_t length = val.length;
        const ret = () @trusted {
            return nn_getsockopt(_nanoSock, level, option, cast(void*) val.ptr, &length);
        }();
        enforceNanoMsgRet(ret);
        return val[0 .. length].to!T;
    }

    // scalar options: the size reported by nanomsg must match T.sizeof
    T getOption(T)(int level, int option) const if(!isArray!T) {
        import std.exception: enforce;
        import std.conv: text;

        T val;
        size_t length = T.sizeof;
        const ret = () @trusted { return nn_getsockopt(_nanoSock, level, option, cast(void*) &val, &length); }();
        enforceNanoMsgRet(ret);
        enforce(length == T.sizeof,
                text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof));
        return val;
    }

    // translates the blocking flag into the nanomsg flags argument
    static int flags(Flag!"blocking" blocking) @safe @nogc pure nothrow {
        return blocking ? 0 : NN_DONTWAIT;
    }
}

/**
   Evaluates `expr` and throws if the result is negative, reporting the
   nanomsg errno and its description. Returns the value otherwise.
   `file` and `line` are used as the location of the thrown exception.
 */
int enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) @trusted {
    import std.conv: text;
    const value = expr();
    if(value < 0) {
        version(NanomsgWrapperNoGcException) {
            import nogc: NoGcException;
            NoGcException.throwNewWithFileAndLine(
                file, line, "nanomsg expression failed with value ", value,
                " errno ", nn_errno, ", error: ", nn_strerror(nn_errno));
        } else {
            throw new Exception(text("nanomsg expression failed with value ", value,
                                     " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)),
                                file,
                                line);
        }
    }
    return value;
}

/// check nanomsg socket: compiles only if T offers the NanoSocket interface
void checkNanoSocket(T)() {
    T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar"));
    s.send("foobar");
    s.setOption(NanoSocket.Option.subscribeTopic, "topic");
    s.setOption(NanoSocket.Option.receiveTimeoutMs, 100);
    auto msg = s.receive(Yes.blocking);
    void[] bytes = msg.bytes;
    s.send(bytes);
}

/// true if `checkNanoSocket!T` compiles
enum isNanoSocket(T) = is(typeof(checkNanoSocket!T));


/// RAII struct for nn_freemsg
struct NanoBuffer {

    /// Could be allocated by nanomsg
    void[] bytes;
    private bool shouldDelete;

    private this(void[] bytes, bool shouldDelete) @safe @nogc pure nothrow scope {
        this.bytes = bytes;
        this.shouldDelete = shouldDelete;
    }

    @disable this(this);

    /// frees the message with nn_freemsg when this buffer is marked for deletion
    ~this() @trusted @nogc scope {
        import nanomsg.bindings: nn_freemsg;
        if(shouldDelete && bytes.length > 0) nn_freemsg(&bytes[0]);
    }
}