1 module cassandra.cql.utils; 2 3 import std.array; 4 import std.bitmanip : bitfields; 5 import std.conv; 6 import std.exception : enforce; 7 import std.format : formattedWrite; 8 import std.range : isOutputRange; 9 import std.stdint; 10 import std.traits; 11 12 import cassandra.internal.utils; 13 import cassandra.internal.tcpconnection; 14 15 16 /* 17 * CQL BINARY PROTOCOL v1 18 * 19 * 20 *Table of Contents 21 * 22 * 1. Overview 23 * 2. Frame header 24 * 2.1. version 25 * 2.2. flags 26 * 2.3. stream 27 * 2.4. opcode 28 * 2.5. length 29 * 3. Notations 30 * 4. Messages 31 * 4.1. Requests 32 * 4.1.1. STARTUP 33 * 4.1.2. CREDENTIALS 34 * 4.1.3. OPTIONS 35 * 4.1.4. QUERY 36 * 4.1.5. PREPARE 37 * 4.1.6. EXECUTE 38 * 4.1.7. REGISTER 39 * 4.2. Responses 40 * 4.2.1. ERROR 41 * 4.2.2. READY 42 * 4.2.3. AUTHENTICATE 43 * 4.2.4. SUPPORTED 44 * 4.2.5. RESULT 45 * 4.2.5.1. Void 46 * 4.2.5.2. Rows 47 * 4.2.5.3. Set_keyspace 48 * 4.2.5.4. Prepared 49 * 4.2.5.5. Schema_change 50 * 4.2.6. EVENT 51 * 5. Compression 52 * 6. Collection types 53 * 7. Error codes 54 * 55 * 56 *1. Overview 57 * 58 * The CQL binary protocol is a frame based protocol. Frames are defined as: 59 * 60 * 0 8 16 24 32 61 * +---------+---------+---------+---------+ 62 * | version | flags | stream | opcode | 63 * +---------+---------+---------+---------+ 64 * | length | 65 * +---------+---------+---------+---------+ 66 * | | 67 * . ... body ... . 68 * . . 69 * . . 70 * +---------------------------------------- 71 * 72 * The protocol is big-endian (network byte order). 73 * 74 * Each frame contains a fixed size header (8 bytes) followed by a variable size 75 * body. The header is described in Section 2. The content of the body depends 76 * on the header opcode value (the body can in particular be empty for some 77 * opcode values). The list of allowed opcode is defined Section 2.3 and the 78 * details of each corresponding message is described Section 4. 79 * 80 * The protocol distinguishes 2 types of frames: requests and responses. Requests 81 * are those frame sent by the clients to the server, response are the ones sent 82 * by the server. Note however that while communication are initiated by the 83 * client with the server responding to request, the protocol may likely add 84 * server pushes in the future, so responses does not obligatory come right after 85 * a client request. 86 * 87 * Note to client implementors: clients library should always assume that the 88 * body of a given frame may contain more data than what is described in this 89 * document. It will however always be safe to ignore the remaining of the frame 90 * body in such cases. The reason is that this may allow to sometimes extend the 91 * protocol with optional features without needing to change the protocol 92 * version. 93 * 94 */ 95 96 /** 97 *2. Frame header 98 */ 99 package struct FrameHeader { 100 101 /** 102 *2.1. version 103 * 104 * The version is a single byte that indicate both the direction of the message 105 * (request or response) and the version of the protocol in use. The up-most bit 106 * of version is used to define the direction of the message: 0 indicates a 107 * request, 1 indicates a responses. This can be useful for protocol analyzers to 108 * distinguish the nature of the packet from the direction which it is moving. 109 * The rest of that byte is the protocol version (1 for the protocol defined in 110 * this document). In other words, for this version of the protocol, version will 111 * have one of: 112 * 0x01 Request frame for this protocol version 113 * 0x81 Response frame for this protocol version 114 */ 115 enum Version : ubyte { 116 V1Request = 0x01, 117 V1Response = 0x81, 118 V2Request = 0x02, 119 V2Response = 0x82 120 } 121 Version version_; 122 123 /** 124 *2.2. flags 125 * 126 * Flags applying to this frame. The flags have the following meaning (described 127 * by the mask that allow to select them): 128 * 0x01: Compression flag. If set, the frame body is compressed. The actual 129 * compression to use should have been set up beforehand through the 130 * Startup message (which thus cannot be compressed; Section 4.1.1). 131 * 0x02: Tracing flag. For a request frame, this indicate the client requires 132 * tracing of the request. Note that not all requests support tracing. 133 * Currently, only QUERY, PREPARE and EXECUTE queries support tracing. 134 * Other requests will simply ignore the tracing flag if set. If a 135 * request support tracing and the tracing flag was set, the response to 136 * this request will have the tracing flag set and contain tracing 137 * information. 138 * If a response frame has the tracing flag set, its body contains 139 * a tracing ID. The tracing ID is a [uuid] and is the first thing in 140 * the frame body. The rest of the body will then be the usual body 141 * corresponding to the response opcode. 142 * 143 * The rest of the flags is currently unused and ignored. 144 */ 145 mixin(bitfields!( 146 bool,"compress", 1, 147 bool,"trace", 1, 148 uint, "", 6 149 )); 150 bool hasTracing() { if (this.trace) return true; return false; } 151 152 /**2.3. stream 153 * 154 * A frame has a stream id (one signed byte). When sending request messages, this 155 * stream id must be set by the client to a positive byte (negative stream id 156 * are reserved for streams initiated by the server; currently all EVENT messages 157 * (section 4.2.6) have a streamId of -1). If a client sends a request message 158 * with the stream id X, it is guaranteed that the stream id of the response to 159 * that message will be X. 160 * 161 * This allow to deal with the asynchronous nature of the protocol. If a client 162 * sends multiple messages simultaneously (without waiting for responses), there 163 * is no guarantee on the order of the responses. For instance, if the client 164 * writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might 165 * respond to REQ_3 (or REQ_2) first. Assigning different stream id to these 3 166 * requests allows the client to distinguish to which request an received answer 167 * respond to. As there can only be 128 different simultaneous stream, it is up 168 * to the client to reuse stream id. 169 * 170 * Note that clients are free to use the protocol synchronously (i.e. wait for 171 * the response to REQ_N before sending REQ_N+1). In that case, the stream id 172 * can be safely set to 0. Clients should also feel free to use only a subset of 173 * the 128 maximum possible stream ids if it is simpler for those 174 * implementation. 175 */ 176 byte streamid; bool isServerStream() { if (streamid < 0) return true; return false; } bool isEvent() { if (streamid==-1) return true; return false;} 177 178 /**2.4. opcode 179 * 180 * An integer byte that distinguish the actual message: 181 * 0x00 ERROR 182 * 0x01 STARTUP 183 * 0x02 READY 184 * 0x03 AUTHENTICATE 185 * 0x04 CREDENTIALS 186 * 0x05 OPTIONS 187 * 0x06 SUPPORTED 188 * 0x07 QUERY 189 * 0x08 RESULT 190 * 0x09 PREPARE 191 * 0x0A EXECUTE 192 * 0x0B REGISTER 193 * 0x0C EVENT 194 * 195 * Messages are described in Section 4. 196 */ 197 enum OpCode : ubyte { 198 error = 0x00, 199 startup = 0x01, 200 ready = 0x02, 201 authenticate = 0x03, 202 credentials = 0x04, 203 options = 0x05, 204 supported = 0x06, 205 query = 0x07, 206 result = 0x08, 207 prepare = 0x09, 208 execute = 0x0A, 209 register = 0x0B, 210 event = 0x0C 211 } 212 OpCode opcode; 213 214 bool isERROR() const pure nothrow { return opcode == OpCode.error; } 215 bool isSTARTUP() const pure nothrow { return opcode == OpCode.startup; } 216 bool isREADY() const pure nothrow { return opcode == OpCode.ready; } 217 bool isAUTHENTICATE() const pure nothrow { return opcode == OpCode.authenticate; } 218 bool isCREDENTIALS() const pure nothrow { return opcode == OpCode.credentials; } 219 bool isOPTIONS() const pure nothrow { return opcode == OpCode.options; } 220 bool isSUPPORTED() const pure nothrow { return opcode == OpCode.supported; } 221 bool isQUERY() const pure nothrow { return opcode == OpCode.query; } 222 bool isRESULT() const pure nothrow { return opcode == OpCode.result; } 223 bool isPREPARE() const pure nothrow { return opcode == OpCode.prepare; } 224 bool isEXECUTE() const pure nothrow { return opcode == OpCode.execute; } 225 bool isREGISTER() const pure nothrow { return opcode == OpCode.register; } 226 bool isEVENT() const pure nothrow { return opcode == OpCode.event; } 227 228 229 /** 230 *2.5. length 231 * 232 * A 4 byte integer representing the length of the body of the frame (note: 233 * currently a frame is limited to 256MB in length). 234 */ 235 int length; 236 237 238 ubyte[] bytes() { 239 import std.bitmanip : write; 240 import std.array : appender; 241 auto buffer = appender!(ubyte[])(); 242 foreach (i,v; this.tupleof) { 243 if (is( typeof(v) : int )) { 244 ubyte[] buf = [0,0,0,0,0,0,0,0]; 245 buf.write!(typeof(v))(v, 0); 246 buffer.put(buf[0..typeof(v).sizeof]); 247 } 248 } 249 return buffer.data; 250 } 251 } 252 253 package T readBigEndian(T)(TCPConnection conn, ref int counter) 254 { 255 import std.bitmanip : read; 256 ubyte[T.sizeof] buf; 257 ubyte[] rng = buf; 258 conn.read(rng); 259 counter -= T.sizeof; 260 return std.bitmanip.read!T(rng); 261 } 262 263 264 package int getIntLength(Appender!(ubyte[]) appender) { 265 enforce(appender.data.length < int.max); 266 return cast(int)appender.data.length; 267 } 268 269 package FrameHeader readFrameHeader(TCPConnection s, ref int counter) { 270 assert(counter == 0, to!string(counter) ~" bytes unread from last Frame"); 271 counter = int.max; 272 log("===================read frame header========================"); 273 auto fh = FrameHeader(); 274 fh.version_ = cast(FrameHeader.Version)readByte(s, counter); 275 readByte(s, counter); // FIXME: this should load into flags 276 fh.streamid = readByte(s, counter); 277 fh.opcode = cast(FrameHeader.OpCode)readByte(s, counter); 278 readIntNotNULL(fh.length, s, counter); 279 280 counter = fh.length; 281 log("=================== end read frame header==================="); 282 //writefln("go %d data to play", counter); 283 284 return fh; 285 } 286 287 package byte readByte(TCPConnection s, ref int counter) { 288 ubyte[1] buf; 289 auto tmp = buf[0..$]; 290 s.read(tmp); 291 counter--; 292 return buf[0]; 293 } 294 295 /** 296 *3. Notations 297 * 298 * To describe the layout of the frame body for the messages in Section 4, we 299 * define the following: 300 * 301 * [int] A 4 bytes integer 302 */ 303 private int* readInt(ref int ptr, TCPConnection s, ref int counter) { 304 import std.bitmanip : read; 305 ubyte[int.sizeof] buffer; 306 auto tmp = buffer[0..$]; 307 s.read(tmp); 308 auto buf = buffer[0..int.sizeof]; 309 310 ptr = buf.read!int(); 311 //writefln("readInt %d %s", ptr, buffer); 312 /*if (r >= int.max) while (true) { 313 buffer[] = [0,0,0,0]; 314 auto n1 = s.read(buffer); 315 writefln("readInt bork %s bytes:%d", buffer, n1); 316 buf = buffer[0..int.sizeof]; 317 r = buf.read!uint(); 318 }*/ 319 counter -= int.sizeof; 320 if (ptr == -1) { 321 //throw new Exception("NULL"); 322 return null; 323 } 324 return &ptr; 325 } 326 package int readIntNotNULL(ref int ptr, TCPConnection s, ref int counter) { 327 auto tmp = readInt(ptr, s, counter); 328 if (tmp is null) throw new Exception("NullException"); 329 return *tmp; 330 } 331 /*void write(TCPConnection s, int n) { 332 import std.bitmanip : write; 333 334 ubyte[] buffer = [0,0,0,0,0,0,0,0]; 335 buffer.write!int(n,0); 336 337 if (s.send(buffer[0..n.sizeof]) != n.sizeof) { 338 throw new Exception("send failed", s.getErrorText); 339 } 340 writefln("wrote int %s vs %d", buffer, n); 341 }*/ 342 343 /// [short] A 2 bytes unsigned integer 344 package short readShort(TCPConnection s, ref int counter) { 345 import std.bitmanip : read; 346 ubyte[short.sizeof] buffer; 347 auto tmp = buffer[]; 348 s.read(tmp); 349 auto buf = buffer[0..short.sizeof]; 350 auto r = buf.read!ushort(); 351 //writefln("readShort %d @ %d", r, counter); 352 counter -= short.sizeof; 353 return cast(short)r; 354 } 355 /*void write(TCPConnection s, short n) { 356 import std.bitmanip : write; 357 358 ubyte[] buffer = [0,0,0,0,0,0,0,0]; 359 buffer.write!short(n,0); 360 361 if (s.send(buffer[0..n.sizeof]) != n.sizeof) { 362 throw new Exception("send failed", s.getErrorText); 363 } 364 writefln("wrote short %s vs %d", buffer, n); 365 }*/ 366 /** [string] A [short] n, followed by n bytes representing an UTF-8 367 * string. 368 */ 369 package ubyte[] readRawBytes(TCPConnection s, ref int counter, int len) { 370 ubyte[] buf = new ubyte[](len); 371 auto tmp = buf[]; 372 s.read(tmp); 373 counter -= buf.length; 374 return buf; 375 } 376 package string readShortString(TCPConnection s, ref int counter) { 377 auto len = readShort(s, counter); 378 if (len < 0) { return null; } 379 380 //writefln("readString %d @ %d", len, counter); 381 auto bytes = readRawBytes(s, counter, len); 382 string str = cast(string)bytes[0..len]; 383 return str; 384 } 385 /*void write(TCPConnection s, string str) { 386 writeln("writing string"); 387 if (str.length < short.max) { 388 write(s, cast(short)str.length); 389 } else if (str.length < int.max) { 390 write(s, cast(int)str.length); 391 } 392 if (s.send(cast(ubyte[])str.ptr[0..str.length]) != str.length) { 393 throw new Exception("send failed", s.getErrorText); 394 } 395 writeln("wrote string"); 396 }*/ 397 398 /// [long string] An [int] n, followed by n bytes representing an UTF-8 string. 399 package string readLongString(TCPConnection s, ref int counter) { 400 int len; 401 auto tmp = readInt(len, s, counter); 402 if (tmp is null) { return null; } 403 404 log("readString %d @ %d", len, counter); 405 auto bytes = readRawBytes(s, counter, len); 406 string str = cast(string)bytes[0..len]; 407 return str; 408 } 409 410 411 /** [uuid] A 16 bytes long uuid. 412 * [string list] A [short] n, followed by n [string]. 413 */ 414 alias string[] StringList; 415 package StringList readStringList(TCPConnection s, ref int counter) { 416 StringList ret; 417 auto len = readShort(s, counter); 418 419 for (int i=0; i<len && counter>0; i++) { 420 ret ~= readShortString(s, counter); 421 } 422 if (ret.length < len && counter <= 0) 423 throw new Exception("ran out of data"); 424 return ret; 425 } 426 427 /** [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0, 428 * no byte should follow and the value represented is `null`. 429 */ 430 package ubyte[] readIntBytes(TCPConnection s, ref int counter) { 431 int len; 432 auto tmp = readInt(len, s, counter); 433 if (tmp is null) { 434 //writefln("reading (null) bytes"); 435 return null; 436 } 437 //writefln("reading int(%d) bytes", len); 438 439 440 ubyte[] buf = new ubyte[](len); 441 s.read(buf); 442 //writefln("got bytes: %s", cast(char[])buf); 443 counter -= buf.length; 444 return buf; 445 } 446 package void appendIntBytes(T, R)(R appender, T data) 447 if (isOutputRange!(R, ubyte)) 448 { 449 static if (is(T == string) || is(T == ubyte[]) || is(T == byte[])) { 450 assert(data.length < int.max); 451 append(appender, cast(int)data.length); 452 append(appender, cast(ubyte[])data.ptr[0..data.length]); 453 } else static if (isArray!T) { 454 assert(data.length < uint.max); 455 auto tmpapp = std.array.appender!(ubyte[])(); 456 tmpapp.appendRawBytes(cast(ushort)data.length); 457 foreach (item; data) { 458 assert(item.length < ushort.max); 459 tmpapp.appendShortBytes(item); 460 } 461 appender.appendIntBytes(tmpapp.data); 462 } else static if (isAssociativeArray!T) { 463 assert(data.length < uint.max); 464 auto tmpapp = std.array.appender!(ubyte[])(); 465 tmpapp.appendRawBytes(cast(ushort)data.length); 466 foreach (key,value; data) { 467 tmpapp.appendShortBytes(key); 468 tmpapp.appendShortBytes(value); 469 } 470 appender.appendIntBytes(tmpapp.data); 471 } else static if (isIntegral!T || is(T == double) || is(T == float) || is(T == ushort)) { 472 import std.bitmanip : write; 473 474 assert(T.sizeof < int.max); 475 ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0]; 476 append(appender, cast(int)T.sizeof); 477 buffer.write!(T)(data,0); 478 appender.append(buffer[0..T.sizeof]); 479 } else static if (isBoolean!T) { 480 import std.bitmanip : write; 481 482 ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0]; 483 append(appender, cast(int)1); 484 if (data) 485 buffer.write!(byte)(1,0); 486 else 487 buffer.write!(byte)(0,0); 488 appender.append(buffer[0 .. 1]); 489 } else { 490 static assert(0, "can't append raw bytes for type: "~ T.stringof); 491 } 492 } 493 494 /** [short bytes] A [short] n, followed by n bytes if n >= 0. 495 */ 496 package ubyte[] readShortBytes(TCPConnection s, ref int counter) { 497 auto len = readShort(s, counter); 498 if (len==0) { return null; } 499 500 ubyte[] buf = new ubyte[](len); 501 s.read(buf); 502 counter -= buf.length; 503 return buf; 504 } 505 package void appendShortBytes(T, R)(R appender, T data) 506 if (isOutputRange!(R, ubyte)) 507 { 508 static if (is (T : const(ubyte)[]) || is (T == string)) { 509 assert(data.length < short.max); 510 append(appender, cast(short)data.length); 511 512 513 append(appender, cast(ubyte[])data.ptr[0..data.length]); 514 } else { 515 static assert(0, "appendShortBytes can't append type: "~ T.stringof); 516 } 517 } 518 519 /** 520 * [option] A pair of <id><value> where <id> is a [short] representing 521 * the option id and <value> depends on that option (and can be 522 * of size 0). The supported id (and the corresponding <value>) 523 * will be described when this is used. 524 */ 525 struct Option { 526 /// See Section: 4.2.5.2. 527 enum Type { 528 custom = 0x0000, 529 ascii = 0x0001, 530 bigInt = 0x0002, 531 blob = 0x0003, 532 boolean = 0x0004, 533 counter = 0x0005, 534 decimal = 0x0006, 535 double_ = 0x0007, 536 float_ = 0x0008, 537 int_ = 0x0009, 538 text = 0x000A, 539 timestamp = 0x000B, 540 uuid = 0x000C, 541 varChar = 0x000D, 542 varInt = 0x000E, 543 timeUUID = 0x000F, 544 inet = 0x0010, 545 list = 0x0020, 546 map = 0x0021, 547 set = 0x0022 548 } 549 Type id; 550 union { 551 string string_value; 552 Option* option_value; 553 Option*[2] key_values_option_value; 554 } 555 556 string toString() { 557 auto buf = appender!string(); 558 formattedWrite(buf, "%s ", id); 559 if (id == Option.Type.custom) { 560 formattedWrite(buf, "%s", string_value); 561 } else if (id == Option.Type.list || id == Option.Type.set) { 562 formattedWrite(buf, "%s", option_value); 563 } else if (id == Option.Type.map) { 564 formattedWrite(buf, "%s[%s]", key_values_option_value[1], key_values_option_value[0]); 565 } 566 return buf.data; 567 } 568 569 } 570 571 572 /** [option list] A [short] n, followed by n [option]. 573 * [inet] An address (ip and port) to a node. It consists of one 574 * [byte] n, that represents the address size, followed by n 575 * [byte] representing the IP address (in practice n can only be 576 * either 4 (IPv4) or 16 (IPv6)), following by one [int] 577 * representing the port. 578 * [consistency] A consistency level specification. This is a [short] 579 * representing a consistency level with the following 580 * correspondance: 581 * 0x0000 ANY 582 * 0x0001 ONE 583 * 0x0002 TWO 584 * 0x0003 THREE 585 * 0x0004 QUORUM 586 * 0x0005 ALL 587 * 0x0006 LOCAL_QUORUM 588 * 0x0007 EACH_QUORUM 589 */ 590 enum Consistency : ushort { 591 any = 0x0000, 592 one = 0x0001, 593 two = 0x0002, 594 three = 0x0003, 595 quorum = 0x0004, 596 all = 0x0005, 597 localQuorum = 0x0006, 598 eachQuorum = 0x0007 599 } 600 601 /** 602 * [string map] A [short] n, followed by n pair <k><v> where <k> and <v> are [string]. 603 */ 604 alias string[string] StringMap; 605 606 package void append(R, Args...)(R appender, Args args) 607 if (isOutputRange!(R, ubyte)) 608 { 609 import std.bitmanip : write; 610 611 ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0]; 612 //writeln(typeof(args).stringof); 613 foreach (arg; args) { 614 //writeln(typeof(arg).stringof); 615 static if (is(typeof(arg) == ubyte[])) { 616 appender.put(arg); 617 //writefln("appended type: %s as: %s", typeof(arg).stringof, appender.data[appender.data.length-arg.length..$]); 618 } else static if (is(typeof(arg) == short) || is(typeof(arg) == int) || is(typeof(arg) == long) || is(typeof(arg) == ulong) || is(typeof(arg) == double)) { 619 buffer.write!(typeof(arg))(arg,0); 620 appender.put(buffer[0..typeof(arg).sizeof]); 621 //writefln("appended type: %s as: %s", typeof(arg).stringof, appender.data[appender.data.length-typeof(arg).sizeof..$]); 622 } else static if (is(typeof(arg) == string)) { 623 assert(arg.length < short.max); 624 appender.append(cast(short)arg.length); 625 626 appender.append(cast(ubyte[])arg[0..arg.length]); 627 //writefln("appended type: %s as: %s", typeof(arg).stringof, appender.data[appender.data.length-arg.length..$]); 628 } else static if (__traits(compiles, mixin("appendOverride(appender, arg)"))) {//hasUFCSmember!(typeof(arg),"toBytes")) { 629 auto oldlen = appender.data.length; 630 appendOverride(appender, arg); 631 //writefln("appended type: %s as: %s", typeof(arg).stringof, appender.data[oldlen..$]); 632 } else { 633 static assert(0, "cannot handle append of "~ typeof(arg).stringof); 634 } 635 } 636 } 637 //todo: add all the append functions features to append!T(appender,T) 638 package void appendRawBytes(T, R)(R appender, T data) 639 if (isOutputRange!(R, ubyte)) 640 { 641 import std.bitmanip : write; 642 static if (is (T == ushort) || is(T==uint) || is(T==int)) { 643 ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0]; 644 buffer.write!(T)(data,0); 645 appender.append(buffer[0..T.sizeof]); 646 } else { 647 static assert(0, "can't use appendRawBytes on type: "~T.stringof); 648 } 649 } 650 651 package void appendLongString(R)(R appender, string data) 652 if (isOutputRange!(R, ubyte)) 653 { 654 assert(data.length < int.max); 655 append(appender, cast(int)data.length); 656 append(appender, cast(ubyte[])data.ptr[0..data.length]); 657 } 658 659 package void appendOverride(R)(R appender, StringMap sm) 660 if (isOutputRange!(R, ubyte)) 661 { 662 assert(sm.length < short.max); 663 664 appender.append(cast(short)sm.length); 665 foreach (k,v; sm) { 666 appender.append(k); 667 appender.append(v); 668 } 669 } 670 package void appendOverride(R)(R appender, Consistency c) 671 if (isOutputRange!(R, ubyte)) 672 { 673 appender.append(cast(short)c); 674 } 675 676 package void appendOverride(R)(R appender, bool b) 677 if (isOutputRange!(R, ubyte)) 678 { 679 if (b) 680 appender.append(cast(int)0x00000001); 681 else 682 appender.append(cast(int)0x00000000); 683 } 684 package void appendOverride(R)(R appender, string[] strs) 685 if (isOutputRange!(R, ubyte)) 686 { 687 foreach (str; strs) { 688 appender.append(str); 689 } 690 } 691 692 /*auto append(Appender!(ubyte[]) appender, ubyte[] data) { 693 appender.put(data); 694 return appender; 695 } 696 auto append(Appender!(ubyte[]) appender, short data) { 697 import std.bitmanip : write; 698 699 ubyte[] buffer = [0,0,0,0,0,0,0,0]; 700 buffer.write!short(data,0); 701 appender.put(buffer[0..short.sizeof]); 702 return appender; 703 } 704 auto append(Appender!(ubyte[]) appender, int data) { 705 import std.bitmanip : write; 706 707 ubyte[] buffer = [0,0,0,0,0,0,0,0]; 708 buffer.write!int(data,0); 709 appender.put(buffer[0..int.sizeof]); 710 return appender; 711 } 712 auto append(Appender!(ubyte[]) appender, string data) { 713 assert(data.length < short.max); 714 append(appender, cast(short)data.length); 715 716 717 append(appender, cast(ubyte[])data.ptr[0..data.length]); 718 return appender; 719 } 720 721 auto append(Appender!(ubyte[]) appender, StringMap data) { 722 assert(data.length < short.max); 723 append(appender, cast(short)data.length); 724 foreach (k,v; data) { 725 append(appender, k); 726 append(appender, v); 727 } 728 729 return appender; 730 }*/ 731 732 /** [string multimap] A [short] n, followed by n pair <k><v> where <k> is a 733 * [string] and <v> is a [string list]. 734 */ 735 alias string[][string] StringMultiMap; 736 package StringMultiMap readStringMultiMap(TCPConnection s, ref int counter) { 737 //writefln("got %d to read", counter); 738 StringMultiMap smm; 739 auto count = readShort(s, counter); 740 for (int i=0; i<count && counter>0; i++) { 741 auto key = readShortString(s, counter); 742 auto values = readStringList(s, counter); 743 smm[key] = values; 744 } 745 if (smm.length < count && counter <= 0) 746 throw new Exception("ran out of data to read"); 747 748 return smm; 749 } 750 751 752 class Authenticator { 753 StringMap getCredentials() { 754 StringMap ret; 755 return ret; 756 } 757 } 758 759 Authenticator getAuthenticator(string type) { 760 // TODO: provide real authenticator types that work with the ones in Cassandra 761 return new Authenticator(); 762 } 763 764 765 766 767 768 class CQLException : Exception { 769 this(string s = "", string file=__FILE__, int line=__LINE__) { 770 super(s, file, line); 771 } 772 } 773 774 class CQLProtocolException : CQLException { 775 this(string s = "", string file=__FILE__, int line=__LINE__) { 776 super(s, file, line); 777 } 778 } 779 class NotImplementedException : CQLException { 780 this(string s = "Not Implemented", string file=__FILE__, int line=__LINE__) { 781 super(s, file, line); 782 } 783 } 784 785 786 template bestCassandraType(T) 787 { 788 import std.datetime; 789 static if (is (T == bool)) enum bestCassandraType = "boolean"; 790 else static if (is (T == int)) enum bestCassandraType = "int"; 791 else static if (is (T == long)) enum bestCassandraType = "bigint"; 792 else static if (is (T == float)) enum bestCassandraType = "float"; 793 else static if (is (T == double)) enum bestCassandraType = "double"; 794 else static if (is (T == string)) enum bestCassandraType = "text"; 795 else static if (is (T == ubyte[])) enum bestCassandraType = "blob"; 796 else static if (is (T == InetAddress)) enum bestCassandraType = "inet"; 797 else static if (is (T == InetAddress6)) enum bestCassandraType = "inet"; 798 else static if (is (T == DateTime)) enum bestCassandraType = "timestamp"; 799 else static assert(0, "Can't suggest a cassandra cql type for storing: "~T.stringof); 800 } 801 802 string toCQLString(T)(T value) 803 { 804 static if (is (T == bool) || is (T : long) || is (T : real)) 805 return to!string(value); 806 else static if (isSomeString!T) { 807 auto ret = appender!string(); 808 ret.reserve(value.length+2); 809 ret.put('\''); 810 foreach (dchar ch; value) { 811 if (ch == '\'') ret.put("''"); 812 else ret.put(ch); 813 } 814 ret.put('\''); 815 return ret.data; 816 } else static assert(false, "Type "~T.stringof~" isn't implemented."); 817 } 818 819 protected void log(Args...)(string s, Args args) 820 { 821 version (Have_vibe_d) { 822 import vibe.core.log; 823 logDebug(s, args); 824 } else { 825 import std.stdio; 826 writefln(s, args); 827 } 828 }