1 /** 2 nanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use. 3 Implemented in C, it works on a wide range of operating systems with no further dependencies. 4 5 This module implements a convenience wrapper API for nanomsg 6 7 Authors: Laeeth Isharc and Atila Neves (Kaleidic Associates Advisory Limited) 8 9 Example code: 10 http://nanomsg.code.kaleidic.io/nanomsg.wrap.responder.html 11 http://nanomsg.code.kaleidic.io/nanomsg.wrap.checkNanoSocket.html 12 http://nanomsg.code.kaleidic.io/examples/nanomsg.examples.html 13 14 */ 15 module nanomsg.wrap; 16 17 18 import nanomsg.bindings; 19 import concepts: models; 20 public import std.typecons: Yes, No; // to facilitate using send, receive 21 22 23 /// wrapper for a string uri to connect to 24 struct ConnectTo { 25 string uri; 26 } 27 28 /// wrapper for a string uri to bind to 29 struct BindTo { 30 31 this(in string uri) @safe pure { 32 this([uri]); 33 } 34 35 this(in string[] uris) @safe pure { 36 this.uris = uris.dup; 37 } 38 39 string[] uris; 40 } 41 42 struct TotalDuration { 43 import std.datetime: Duration; 44 Duration value; 45 alias value this; 46 } 47 48 struct RetryDuration { 49 import std.datetime: Duration; 50 Duration value; 51 alias value this; 52 } 53 54 55 /** 56 57 NanoSocket - high level wrapper for a nanomsg socket 58 59 */ 60 @models!(NanoSocket, isNanoSocket) 61 struct NanoSocket { 62 63 import std.traits: isArray; 64 import std.typecons: Flag; 65 import std.datetime: Duration; 66 67 /// nanomsg protocol 68 enum Protocol { 69 request, 70 response, 71 subscribe, 72 publish, 73 pull, 74 push, 75 pair, 76 surveyor, 77 respondent, 78 bus, 79 } 80 81 /// nanomsg socket options 82 enum Option { 83 lingerMs, /// How long to try and send pending messages after nn_close. -1 means infinite 84 sendBufferSize, // Size of the send buffer in bytes 85 receiveBufferSize, // Size of the receive buffer in bytes 86 receiveMaxSize, /// Maximum message size that can be received, in bytes 87 sendTimeoutMs, /// How long in milliseconds it takes for send to timeout 88 receiveTimeoutMs, /// How long in milliseconds it takes for receive to timeout 89 reconnectIntervalMs, /// How long to wait to re-establish connection 90 reconnectIntervalMax, /// Maximum reconnect interval 91 sendPriority, /// Outbound priority for endpoints added to socket 92 receivePriority, /// Inbout priority for endpoints added to socket 93 ipv4Only, /// Self-explanatory 94 socketName, /// Socket name for error reporting and statistics 95 timeToLive, /// Number of hops before message is dropped 96 subscribeTopic, /// Subscribe to topic 97 unsubscribeTopic, /// Unsubscribe to topic 98 tcpNoDelay, /// Disables Nagle's algorithm 99 surveyorDeadlineMs, /// How long to wait for responses in milliseconds 100 } 101 102 /// this(this) disabled to avoid sockets being destroyed 103 @disable this(this); 104 105 /// invalid FD 106 enum INVALID_FD = -1; 107 108 /// constructor 109 this(in Protocol protocol, in int domain = AF_SP) @safe { 110 initialize(protocol, domain); 111 } 112 113 /// constructor 114 this(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @safe { 115 initialize(protocol, bindTo, domain); 116 } 117 118 /// constructor 119 this(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @safe { 120 initialize(protocol, connectTo, domain); 121 } 122 123 /// destructor 124 ~this() @safe nothrow { 125 close; 126 } 127 128 /// Same as the revelant constructor, can be used on exisiting objects 129 void initialize(in Protocol protocol, in int domain = AF_SP) @trusted { 130 int protocolToInt(Protocol protocol) { 131 final switch(protocol) with(Protocol) { 132 case request: 133 return NN_REQ; 134 case response: 135 return NN_REP; 136 case publish: 137 return NN_PUB; 138 case subscribe: 139 return NN_SUB; 140 case pull: 141 return NN_PULL; 142 case push: 143 return NN_PUSH; 144 case pair: 145 return NN_PAIR; 146 case surveyor: 147 return NN_SURVEYOR; 148 case respondent: 149 return NN_RESPONDENT; 150 case bus: 151 return NN_BUS; 152 } 153 } 154 155 close; // init can be called twice 156 _nanoSock = nn_socket(domain, protocolToInt(protocol)); 157 _protocol = protocol; 158 enforceNanoMsgRet(_nanoSock); 159 } 160 161 /// Same as the revelant constructor, can be used on exisiting objects 162 void initialize(in Protocol protocol, in BindTo bindTo, int domain = AF_SP) @trusted { 163 import std.string: replace; 164 165 initialize(protocol, domain); 166 167 // this is so it's easy to specify the same string 168 // for both ends of the socket 169 foreach(uri; bindTo.uris) { 170 bind(uri.replace("localhost", "*")); 171 } 172 } 173 174 /// Same as the revelant constructor, can be used on exisiting objects 175 void initialize(in Protocol protocol, in ConnectTo connectTo, int domain = AF_SP) @trusted { 176 initialize(protocol, domain); 177 connect(connectTo.uri); 178 179 version(Windows) { 180 // on Windows sometimes the socket tries to send before the TCP handshake 181 import core.thread; 182 Thread.sleep(100.msecs); 183 } 184 } 185 186 /// close socket 187 void close() @trusted nothrow { 188 if(_nanoSock != INVALID_FD) { 189 _nanoSock.nn_close; 190 } 191 } 192 193 /// set socket option to a value 194 ref inout(NanoSocket) setOption(T)(Option option, T val) inout { 195 const optionC = toOptionC(option); 196 setOption(optionC.level, optionC.option, val); 197 return this; 198 } 199 200 /// get socket option value 201 T getOption(T)(Option option) const { 202 const optionC = toOptionC(option); 203 return getOption!T(optionC.level, optionC.option); 204 } 205 206 /** 207 Receive bytes on this socket. 208 Memory is allocated by nanomsg and deleted in the `NanoBuffer` destructor. 209 */ 210 NanoBuffer receive(Flag!"blocking" blocking = Yes.blocking, 211 in string file = __FILE__, 212 in size_t line = __LINE__) 213 @safe scope return const 214 { 215 static void[] buffer; 216 return receiveImpl(buffer, blocking, file, line); 217 } 218 219 /** 220 A version of `receive` that takes a user supplied buffer to fill 221 */ 222 void[] receive(return scope void[] buffer, 223 Flag!"blocking" blocking = Yes.blocking, 224 in string file = __FILE__, 225 in size_t line = __LINE__) 226 const @safe 227 { 228 auto ptr = &buffer[0]; 229 return receiveImpl(buffer, blocking, file, line).bytes; 230 } 231 232 233 /** 234 Sends the bytes as expected. If the protocol is Request, then returns 235 the response, otherwise returns an empty array. 236 */ 237 NanoBuffer send(T)(T[] data, 238 Flag!"blocking" blocking = Yes.blocking, 239 in string file = __FILE__, 240 in size_t line = __LINE__) 241 const 242 { 243 import std.conv: text; 244 245 const sent = () @trusted { return nn_send(_nanoSock, data.ptr, data.length, flags(blocking)); }(); 246 enforceNanoMsgRet(sent, file, line); 247 248 void[] empty; 249 return () @trusted { return _protocol == Protocol.request 250 ? receive(blocking) 251 : NanoBuffer(cast(void[]) data, false /*shouldDelete*/); 252 }(); 253 } 254 255 /** 256 Tries to send bytes to the other side. 257 duration is how long to try for 258 recvBlocking controls whether or not to block on reception of a response. 259 This only matters when the protocol is request/response 260 Returns the response if in request mode, otherwise an empty byte slice. 261 */ 262 auto trySend(T)(T[] data, Duration totalDuration, Flag!"blocking" recvBlocking = Yes.blocking) { 263 import std.datetime: msecs; 264 return trySend(data, TotalDuration(totalDuration), RetryDuration(10.msecs), recvBlocking); 265 } 266 267 /** 268 Tries to send bytes to the other side. 269 duration is how long to try for 270 recvBlocking controls whether or not to block on reception of a response. 271 This only matters when the protocol is request/response 272 Returns the response if in request mode, otherwise an empty byte slice. 273 */ 274 NanoBuffer trySend(T)(T[] data, 275 TotalDuration totalDuration, 276 RetryDuration retryDuration, 277 Flag!"blocking" recvBlocking = Yes.blocking) 278 { 279 import std.exception: enforce; 280 static if(__VERSION__ >= 2077) 281 import std.datetime.stopwatch: StopWatch, AutoStart; 282 else 283 import std.datetime: StopWatch, AutoStart; 284 import std.datetime: msecs; 285 import std.conv: text; 286 import core.thread: Thread; 287 288 int sent; 289 auto sw = StopWatch(AutoStart.yes); 290 do { 291 sent = () @trusted { return nn_send(_nanoSock, &data[0], data.length, flags(No.blocking)); }(); 292 if(sent != data.length) () @trusted { Thread.sleep(retryDuration); }(); 293 } while(sent != data.length && cast(Duration) sw.peek < totalDuration); 294 295 enforce(sent == data.length, 296 text("Expected to send ", data.length, " bytes but sent ", sent)); 297 298 return _protocol == Protocol.request ? receive(recvBlocking) : NanoBuffer(); 299 } 300 301 /// connect 302 void connect(in string uri, in string file = __FILE__, in size_t line = __LINE__) { 303 import std.string: toStringz; 304 enforceNanoMsgRet(nn_connect(_nanoSock, uri.toStringz), file, line); 305 _uri = uri; 306 _connection = Connection.connected; 307 } 308 309 /// bind 310 void bind(in string uri, in string file = __FILE__, in size_t line = __LINE__) { 311 import std.string: toStringz; 312 enforceNanoMsgRet(nn_bind(_nanoSock, uri.toStringz), file, line); 313 _uri = uri; 314 _connection = Connection.bound; 315 } 316 317 /// get protocol 318 Protocol protocol() @safe @nogc pure const nothrow { 319 return _protocol; 320 } 321 322 /// get URI 323 string uri() @safe @nogc pure const nothrow { 324 return _uri; 325 } 326 327 /// toString 328 string toString() @safe pure const { 329 import std.conv: text; 330 331 if(_connection == Connection.none) 332 return text(protocol); 333 334 const connText = _connection == Connection.bound ? "@" : "@@"; 335 return text(_protocol, connText, _uri); 336 } 337 338 private: 339 340 enum Connection { 341 none, 342 bound, 343 connected, 344 } 345 346 int _nanoSock = INVALID_FD; 347 Protocol _protocol; 348 string _uri; 349 Connection _connection; 350 351 NanoBuffer receiveImpl(return scope void[] buffer, 352 Flag!"blocking" blocking = Yes.blocking, 353 in string file = __FILE__, 354 in size_t line = __LINE__) 355 @safe return scope const 356 { 357 import std.algorithm: min; 358 static import core.stdc.errno; 359 360 void* nanomsgBuffer = null; 361 // can't use &buffer[0] here since it might be empty 362 const haveBuffer = () @trusted { return buffer.ptr !is null; }(); 363 const shouldDelete = !haveBuffer; 364 365 auto recvPointer = () @trusted { 366 return haveBuffer ? &buffer[0] : cast(void*) &nanomsgBuffer; 367 }(); 368 369 370 const length = haveBuffer ? buffer.length : NN_MSG; 371 const numBytes = () @trusted { return nn_recv(_nanoSock, recvPointer, length, flags(blocking)); }(); 372 373 bool isErrnoEagain() @trusted { 374 return nn_errno == core.stdc.errno.EAGAIN; 375 } 376 377 if(blocking || (numBytes < 0 && !isErrnoEagain)) { 378 enforceNanoMsgRet(numBytes, file, line); 379 } 380 381 auto pointer = haveBuffer ? &buffer[0] : nanomsgBuffer; 382 const retSliceLength = haveBuffer ? min(numBytes, buffer.length) : numBytes; 383 384 return numBytes >= 0 385 ? NanoBuffer(() @trusted { return pointer[0 .. retSliceLength]; }(), shouldDelete) 386 : NanoBuffer(); 387 } 388 389 void enforceNanoMsgRet(E)(lazy E expr, string file = __FILE__, size_t line = __LINE__) @trusted const { 390 import std.conv: text; 391 const value = expr(); 392 if(value < 0) { 393 version(NanomsgWrapperNoGcException) { 394 import nogc: NoGcException; 395 NoGcException.throwNewWithFileAndLine( 396 file, line, "nanomsg expression failed with value ", numBytes, 397 " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)); 398 } else { 399 throw new Exception(text("nanomsg expression failed with value ", value, 400 " errno ", nn_errno, ", error: ", nn_strerror(nn_errno)), 401 file, 402 line); 403 } 404 } 405 } 406 407 // the int level and option values needed by the nanomsg C API 408 static struct OptionC { 409 int level; 410 int option; 411 } 412 413 static OptionC toOptionC(Option option) @safe { 414 final switch(option) with(Option) { 415 case lingerMs: 416 return OptionC(NN_SOL_SOCKET, NN_LINGER); 417 418 case sendBufferSize: 419 return OptionC(NN_SOL_SOCKET, NN_SNDBUF); 420 421 case receiveBufferSize: 422 return OptionC(NN_SOL_SOCKET, NN_RCVBUF); 423 424 case receiveMaxSize: 425 return OptionC(NN_SOL_SOCKET, NN_RCVMAXSIZE); 426 427 case sendTimeoutMs: 428 return OptionC(NN_SOL_SOCKET, NN_SNDTIMEO); 429 430 case receiveTimeoutMs: 431 return OptionC(NN_SOL_SOCKET, NN_RCVTIMEO); 432 433 case reconnectIntervalMs: 434 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL); 435 436 case reconnectIntervalMax: 437 return OptionC(NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX); 438 439 case sendPriority: 440 return OptionC(NN_SOL_SOCKET, NN_SNDPRIO); 441 442 case receivePriority: 443 return OptionC(NN_SOL_SOCKET, NN_RCVPRIO); 444 445 case ipv4Only: 446 return OptionC(NN_SOL_SOCKET, NN_IPV4ONLY); 447 448 case socketName: 449 return OptionC(NN_SOL_SOCKET, NN_SOCKET_NAME); 450 451 case timeToLive: 452 return OptionC(NN_SOL_SOCKET, NN_TTL); 453 454 case subscribeTopic: 455 return OptionC(NN_SUB, NN_SUB_SUBSCRIBE); 456 457 case unsubscribeTopic: 458 return OptionC(NN_SUB, NN_SUB_UNSUBSCRIBE); 459 460 case tcpNoDelay: 461 return OptionC(NN_TCP, NN_TCP_NODELAY); 462 463 case surveyorDeadlineMs: 464 return OptionC(NN_SURVEYOR, NN_SURVEYOR_DEADLINE); 465 } 466 } 467 468 void setOption(T)(int level, int option, ref T val) const if(isArray!T) { 469 const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, val.ptr, val.length); }(); 470 enforceNanoMsgRet(ret); 471 } 472 473 void setOption(T)(int level, int option, T val) const if(!isArray!T) { 474 const ret = () @trusted { return nn_setsockopt(_nanoSock, level, option, &val, val.sizeof); }(); 475 enforceNanoMsgRet(ret); 476 } 477 478 T getOption(T)(int level, int option) const if(isArray!T) { 479 import std.traits: Unqual; 480 import std.conv: to; 481 482 // ElementType!string is dchar, and we don't want that, 483 // so instead we use this typeof 484 alias U = Unqual!(typeof(T.init[0]))[1000]; 485 U val; 486 ulong length = val.length; 487 const ret = () @trusted { 488 return nn_getsockopt(_nanoSock, level, option, cast(void*) val.ptr, &length); 489 }(); 490 enforceNanoMsgRet(ret); 491 return val[0 .. length].to!T; 492 } 493 494 T getOption(T)(int level, int option) const if(!isArray!T) { 495 import std.exception: enforce; 496 import std.conv: text; 497 498 T val; 499 size_t length = T.sizeof; 500 const ret = () @trusted { return nn_getsockopt(_nanoSock, level, option, cast(void*) &val, &length); }(); 501 enforceNanoMsgRet(ret); 502 enforce(length == T.sizeof, 503 text("getsockopt returned ", length, " but sizeof(", T.stringof, ") is ", T.sizeof)); 504 return val; 505 } 506 507 static int flags(Flag!"blocking" blocking) @safe @nogc pure nothrow { 508 return blocking ? 0 : NN_DONTWAIT; 509 } 510 } 511 512 /// check nanomsg socket 513 void checkNanoSocket(T)() { 514 T s = T(NanoSocket.Protocol.subscribe, ConnectTo("foobar")); 515 s.send("foobar"); 516 s.setOption(NanoSocket.Option.subscribeTopic, "topic"); 517 s.setOption(NanoSocket.Option.receiveTimeoutMs, 100); 518 auto msg = s.receive(Yes.blocking); 519 void[] bytes = msg.bytes; 520 s.send(bytes); 521 } 522 523 enum isNanoSocket(T) = is(typeof(checkNanoSocket!T)); 524 525 526 /// RAII struct for nn_freemsg 527 struct NanoBuffer { 528 529 /// Could be allocated by nanomsg 530 void[] bytes; 531 private bool shouldDelete; 532 533 private this(void[] bytes, bool shouldDelete) @safe @nogc pure nothrow scope { 534 this.bytes = bytes; 535 this.shouldDelete = shouldDelete; 536 } 537 538 @disable this(this); 539 540 ~this() @trusted @nogc scope { 541 import nanomsg.bindings: nn_freemsg; 542 if(shouldDelete && bytes.length > 0) nn_freemsg(&bytes[0]); 543 } 544 }