1 module cassandra.cql.utils;
2 
3 import std.array;
4 import std.bitmanip : bitfields;
5 import std.conv;
6 import std.exception : enforce;
7 import std.format : formattedWrite;
8 import std.range : isOutputRange;
9 import std.stdint;
10 import std.traits;
11 
12 import cassandra.internal.utils;
13 import cassandra.internal.tcpconnection;
14 
15 
16 /*
17  *                             CQL BINARY PROTOCOL v1
18  *
19  *
20  *Table of Contents
21  *
22  *  1. Overview
23  *  2. Frame header
24  *    2.1. version
25  *    2.2. flags
26  *    2.3. stream
27  *    2.4. opcode
28  *    2.5. length
29  *  3. Notations
30  *  4. Messages
31  *    4.1. Requests
32  *      4.1.1. STARTUP
33  *      4.1.2. CREDENTIALS
34  *      4.1.3. OPTIONS
35  *      4.1.4. QUERY
36  *      4.1.5. PREPARE
37  *      4.1.6. EXECUTE
38  *      4.1.7. REGISTER
39  *    4.2. Responses
40  *      4.2.1. ERROR
41  *      4.2.2. READY
42  *      4.2.3. AUTHENTICATE
43  *      4.2.4. SUPPORTED
44  *      4.2.5. RESULT
45  *        4.2.5.1. Void
46  *        4.2.5.2. Rows
47  *        4.2.5.3. Set_keyspace
48  *        4.2.5.4. Prepared
49  *        4.2.5.5. Schema_change
50  *      4.2.6. EVENT
51  *  5. Compression
52  *  6. Collection types
53  *  7. Error codes
54  *
55  *
56  *1. Overview
57  *
58  *  The CQL binary protocol is a frame based protocol. Frames are defined as:
59  *
60  *      0         8        16        24        32
61  *      +---------+---------+---------+---------+
62  *      | version |  flags  | stream  | opcode  |
63  *      +---------+---------+---------+---------+
64  *      |                length                 |
65  *      +---------+---------+---------+---------+
66  *      |                                       |
67  *      .            ...  body ...              .
68  *      .                                       .
69  *      .                                       .
70  *      +----------------------------------------
71  *
72  *  The protocol is big-endian (network byte order).
73  *
74  *  Each frame contains a fixed size header (8 bytes) followed by a variable size
75  *  body. The header is described in Section 2. The content of the body depends
76  *  on the header opcode value (the body can in particular be empty for some
 *  opcode values). The list of allowed opcodes is defined in Section 2.4 and the
78  *  details of each corresponding message is described Section 4.
79  *
80  *  The protocol distinguishes 2 types of frames: requests and responses. Requests
81  *  are those frame sent by the clients to the server, response are the ones sent
82  *  by the server. Note however that while communication are initiated by the
83  *  client with the server responding to request, the protocol may likely add
84  *  server pushes in the future, so responses does not obligatory come right after
85  *  a client request.
86  *
87  *  Note to client implementors: clients library should always assume that the
88  *  body of a given frame may contain more data than what is described in this
89  *  document. It will however always be safe to ignore the remaining of the frame
90  *  body in such cases. The reason is that this may allow to sometimes extend the
91  *  protocol with optional features without needing to change the protocol
92  *  version.
93  *
94  */
95 
96 /**
97  *2. Frame header
98  */
package struct FrameHeader {

	/**
	 *2.1. version
	 *
	 *  The version is a single byte that indicate both the direction of the message
	 *  (request or response) and the version of the protocol in use. The up-most bit
	 *  of version is used to define the direction of the message: 0 indicates a
	 *  request, 1 indicates a responses. This can be useful for protocol analyzers to
	 *  distinguish the nature of the packet from the direction which it is moving.
	 *  The rest of that byte is the protocol version (1 for the protocol defined in
	 *  this document). In other words, for this version of the protocol, version will
	 *  have one of:
	 *    0x01    Request frame for this protocol version
	 *    0x81    Response frame for this protocol version
	 */
	enum Version : ubyte {
		V1Request = 0x01,
		V1Response =  0x81,
		V2Request = 0x02,
		V2Response = 0x82
	}
	Version version_;

	/**
	 *2.2. flags
	 *
	 *  Flags applying to this frame. The flags have the following meaning (described
	 *  by the mask that allow to select them):
	 *    0x01: Compression flag. If set, the frame body is compressed. The actual
	 *          compression to use should have been set up beforehand through the
	 *          Startup message (which thus cannot be compressed; Section 4.1.1).
	 *    0x02: Tracing flag. For a request frame, this indicate the client requires
	 *          tracing of the request. Note that not all requests support tracing.
	 *          Currently, only QUERY, PREPARE and EXECUTE queries support tracing.
	 *          Other requests will simply ignore the tracing flag if set. If a
	 *          request support tracing and the tracing flag was set, the response to
	 *          this request will have the tracing flag set and contain tracing
	 *          information.
	 *          If a response frame has the tracing flag set, its body contains
	 *          a tracing ID. The tracing ID is a [uuid] and is the first thing in
	 *          the frame body. The rest of the body will then be the usual body
	 *          corresponding to the response opcode.
	 *
	 *  The rest of the flags is currently unused and ignored.
	 */
	mixin(bitfields!(
		bool,"compress", 1,
		bool,"trace", 1,
		uint, "", 6
		));

	/// True when the tracing flag of this header is set.
	bool hasTracing() const pure nothrow { return trace; }

	/**2.3. stream
	 *
	 *  A frame has a stream id (one signed byte). When sending request messages, this
	 *  stream id must be set by the client to a positive byte (negative stream id
	 *  are reserved for streams initiated by the server; currently all EVENT messages
	 *  (section 4.2.6) have a streamId of -1). If a client sends a request message
	 *  with the stream id X, it is guaranteed that the stream id of the response to
	 *  that message will be X.
	 *
	 *  This allow to deal with the asynchronous nature of the protocol. If a client
	 *  sends multiple messages simultaneously (without waiting for responses), there
	 *  is no guarantee on the order of the responses. For instance, if the client
	 *  writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
	 *  respond to REQ_3 (or REQ_2) first. Assigning different stream id to these 3
	 *  requests allows the client to distinguish to which request an received answer
	 *  respond to. As there can only be 128 different simultaneous stream, it is up
	 *  to the client to reuse stream id.
	 *
	 *  Note that clients are free to use the protocol synchronously (i.e. wait for
	 *  the response to REQ_N before sending REQ_N+1). In that case, the stream id
	 *  can be safely set to 0. Clients should also feel free to use only a subset of
	 *  the 128 maximum possible stream ids if it is simpler for those
	 *  implementation.
	 */
	byte streamid;

	/// True when the stream id is negative.
	bool isServerStream() const pure nothrow { return streamid < 0; }

	/// True when the stream id is exactly -1.
	bool isEvent() const pure nothrow { return streamid == -1; }

	/**2.4. opcode
	 *
	 *  An integer byte that distinguish the actual message:
	 *    0x00    ERROR
	 *    0x01    STARTUP
	 *    0x02    READY
	 *    0x03    AUTHENTICATE
	 *    0x04    CREDENTIALS
	 *    0x05    OPTIONS
	 *    0x06    SUPPORTED
	 *    0x07    QUERY
	 *    0x08    RESULT
	 *    0x09    PREPARE
	 *    0x0A    EXECUTE
	 *    0x0B    REGISTER
	 *    0x0C    EVENT
	 *
	 *  Messages are described in Section 4.
	 */
	enum OpCode : ubyte {
		error         = 0x00,
		startup       = 0x01,
		ready         = 0x02,
		authenticate  = 0x03,
		credentials   = 0x04,
		options       = 0x05,
		supported     = 0x06,
		query         = 0x07,
		result        = 0x08,
		prepare       = 0x09,
		execute       = 0x0A,
		register      = 0x0B,
		event         = 0x0C
	}
	OpCode opcode;

	// One predicate per opcode; each compares `opcode` with the matching member.
	bool isERROR() const pure nothrow { return opcode == OpCode.error; }
	bool isSTARTUP() const pure nothrow { return opcode == OpCode.startup; }
	bool isREADY() const pure nothrow { return opcode == OpCode.ready; }
	bool isAUTHENTICATE() const pure nothrow { return opcode == OpCode.authenticate; }
	bool isCREDENTIALS() const pure nothrow { return opcode == OpCode.credentials; }
	bool isOPTIONS() const pure nothrow { return opcode == OpCode.options; }
	bool isSUPPORTED() const pure nothrow { return opcode == OpCode.supported; }
	bool isQUERY() const pure nothrow { return opcode == OpCode.query; }
	bool isRESULT() const pure nothrow { return opcode == OpCode.result; }
	bool isPREPARE() const pure nothrow { return opcode == OpCode.prepare; }
	bool isEXECUTE() const pure nothrow { return opcode == OpCode.execute; }
	bool isREGISTER() const pure nothrow { return opcode == OpCode.register; }
	bool isEVENT() const pure nothrow { return opcode == OpCode.event; }


	/**
	 *2.5. length
	 *
	 *  A 4 byte integer representing the length of the body of the frame (note:
	 *  currently a frame is limited to 256MB in length).
	 */
	int length;


	/**
	 * Serializes the header fields, in declaration order, as big-endian bytes.
	 *
	 * Every field whose type implicitly converts to `int` is written with its
	 * own size; this includes the byte that backs the `compress`/`trace`
	 * bitfields.
	 *
	 * Returns: a newly built array holding the encoded header.
	 */
	ubyte[] bytes() {
		import std.bitmanip : write;
		import std.array : appender;
		auto buffer = appender!(ubyte[])();
		foreach (v; this.tupleof) {
			// The condition is known at compile time, so select the fields
			// statically instead of branching at run time.
			static if (is( typeof(v) : int )) {
				ubyte[typeof(v).sizeof] buf;
				buf[].write!(typeof(v))(v, 0);
				buffer.put(buf[]);
			}
		}
		return buffer.data;
	}
}
252 
/**
 * Reads `T.sizeof` bytes from `conn` and decodes them with
 * `std.bitmanip.read` (big-endian by default).
 *
 * Params:
 *   conn    = connection to read from
 *   counter = decremented by `T.sizeof`
 *
 * Returns: the decoded value.
 */
package T readBigEndian(T)(TCPConnection conn, ref int counter)
{
	static import std.bitmanip;

	ubyte[T.sizeof] storage;
	ubyte[] window = storage;
	conn.read(window);

	counter -= T.sizeof;
	return std.bitmanip.read!T(window);
}
262 
263 
/**
 * Returns the number of bytes collected in `appender` as an `int`.
 *
 * Throws: an exception (via `enforce`) when the length is not below `int.max`.
 */
package int getIntLength(Appender!(ubyte[]) appender) {
	immutable size = appender.data.length;
	enforce(size < int.max);
	return cast(int) size;
}
268 
/**
 * Reads the 8-byte frame header from `s`.
 *
 * Params:
 *   s       = connection to read from
 *   counter = must be 0 on entry (asserted); on return it holds the body
 *             length announced by the header
 *
 * Returns: the decoded header, with the `compress` and `trace` flags set
 *          from the flags byte.
 *
 * Throws: `Exception` (from `readIntNotNULL`) when the length field is -1.
 */
package FrameHeader readFrameHeader(TCPConnection s, ref int counter) {
	assert(counter == 0, to!string(counter) ~" bytes unread from last Frame");
	// The body length is unknown until the header is parsed, so park the
	// counter at its maximum while the header bytes are consumed.
	counter = int.max;
	log("===================read frame header========================");
	auto fh = FrameHeader();
	fh.version_ = cast(FrameHeader.Version)readByte(s, counter);

	// Flags byte: mask 0x01 is compression, mask 0x02 is tracing (Section 2.2).
	immutable ubyte flags = cast(ubyte)readByte(s, counter);
	fh.compress = (flags & 0x01) != 0;
	fh.trace = (flags & 0x02) != 0;

	fh.streamid = readByte(s, counter);
	fh.opcode = cast(FrameHeader.OpCode)readByte(s, counter);
	readIntNotNULL(fh.length, s, counter);

	counter = fh.length;
	log("=================== end read frame header===================");

	return fh;
}
286 
/**
 * Reads a single byte from `s` and decrements `counter` by one.
 *
 * Returns: the byte read, as a signed `byte`.
 */
package byte readByte(TCPConnection s, ref int counter) {
	ubyte[1] cell;
	auto dst = cell[0..$];
	s.read(dst);
	--counter;
	return cell[0];
}
294 
295 /**
296  *3. Notations
297  *
298  *  To describe the layout of the frame body for the messages in Section 4, we
299  *  define the following:
300  *
301  *    [int]          A 4 bytes integer
302  */
/**
 * Reads a 4-byte integer from `s` into `ptr` and decrements `counter` by 4.
 *
 * Params:
 *   ptr     = receives the decoded value
 *   s       = connection to read from
 *   counter = decremented by `int.sizeof`
 *
 * Returns: `null` when the decoded value is -1, otherwise the address of
 *          `ptr`.
 */
private int* readInt(ref int ptr, TCPConnection s, ref int counter) {
	import std.bitmanip : read;

	ubyte[int.sizeof] raw;
	auto dst = raw[0..$];
	s.read(dst);

	// Decode from a fresh slice over the same storage.
	auto view = raw[0..int.sizeof];
	ptr = view.read!int();

	counter -= int.sizeof;
	return ptr == -1 ? null : &ptr;
}
/**
 * Like `readInt`, but a null result is an error.
 *
 * Returns: the integer stored into `ptr`.
 *
 * Throws: `Exception("NullException")` when `readInt` yields `null`.
 */
package int readIntNotNULL(ref int ptr, TCPConnection s, ref int counter) {
	if (auto value = readInt(ptr, s, counter))
		return *value;
	throw new Exception("NullException");
}
331 /*void write(TCPConnection s, int n) {
332 	import std.bitmanip : write;
333 
334 	ubyte[] buffer = [0,0,0,0,0,0,0,0];
335 	buffer.write!int(n,0);
336 
337 	if (s.send(buffer[0..n.sizeof]) != n.sizeof) {
338 		throw new Exception("send failed", s.getErrorText);
339 	}
340 	writefln("wrote int %s vs %d", buffer, n);
341 }*/
342 
343 ///    [short]        A 2 bytes unsigned integer
/**
 * Reads two bytes from `s`, decodes them as a `ushort` and returns the value
 * cast to `short`, so wire values above `short.max` come back negative.
 * `counter` is decremented by 2.
 */
package short readShort(TCPConnection s, ref int counter) {
	import std.bitmanip : read;

	ubyte[short.sizeof] raw;
	auto dst = raw[];
	s.read(dst);

	// Decode from a fresh slice over the same storage.
	auto view = raw[0..short.sizeof];
	ushort wire = view.read!ushort();

	counter -= short.sizeof;
	return cast(short) wire;
}
355 /*void write(TCPConnection s, short n) {
356 	import std.bitmanip : write;
357 
358 	ubyte[] buffer = [0,0,0,0,0,0,0,0];
359 	buffer.write!short(n,0);
360 
361 	if (s.send(buffer[0..n.sizeof]) != n.sizeof) {
362 		throw new Exception("send failed", s.getErrorText);
363 	}
364 	writefln("wrote short %s vs %d", buffer, n);
365 }*/
366  /**    [string]       A [short] n, followed by n bytes representing an UTF-8
367  *                   string.
368  */
/**
 * Allocates a buffer of `len` bytes, fills it from `s` and decrements
 * `counter` by the buffer length.
 *
 * Returns: the freshly allocated buffer.
 */
package ubyte[] readRawBytes(TCPConnection s, ref int counter, int len) {
	auto payload = new ubyte[](len);
	auto dst = payload[];
	s.read(dst);
	counter -= payload.length;
	return payload;
}
/**
 * Reads a [string]: a [short] length n followed by n bytes.
 *
 * The [short] length is unsigned, but `readShort` hands it back as a signed
 * `short`. It is reinterpreted as `ushort` here so that strings longer than
 * `short.max` bytes are read in full. Treating them as null would leave the
 * payload unread on the connection.
 *
 * Returns: the bytes read, reinterpreted as a `string` (not validated).
 */
package string readShortString(TCPConnection s, ref int counter) {
	immutable int len = cast(ushort)readShort(s, counter);

	auto bytes = readRawBytes(s, counter, len);
	return cast(string)bytes;
}
385 /*void write(TCPConnection s, string str) {
386 	writeln("writing string");
387 	if (str.length < short.max) {
388 		write(s, cast(short)str.length);
389 	} else if (str.length < int.max) {
390 		write(s, cast(int)str.length);
391 	}
392 	if (s.send(cast(ubyte[])str.ptr[0..str.length]) != str.length) {
393 		throw new Exception("send failed", s.getErrorText);
394 	}
395 	writeln("wrote string");
396 }*/
397 
398 ///    [long string]  An [int] n, followed by n bytes representing an UTF-8 string.
/**
 * Reads a [long string]: an [int] length n followed by n bytes.
 *
 * Returns: `null` when the length is negative, otherwise the bytes read,
 *          reinterpreted as a `string` (not validated). `readInt` only reports
 *          -1 as null, so other negative lengths are rejected here before they
 *          reach the allocation.
 */
package string readLongString(TCPConnection s, ref int counter) {
	int len;
	auto tmp = readInt(len, s, counter);
	if (tmp is null || len < 0) { return null; }

	log("readString %d @ %d", len, counter);
	auto bytes = readRawBytes(s, counter, len);
	return cast(string)bytes;
}
409 
410 
411 /**    [uuid]         A 16 bytes long uuid.
412  *    [string list]  A [short] n, followed by n [string].
413  */
alias string[] StringList;

/**
 * Reads a [string list]: a [short] count followed by that many [string]s.
 *
 * Reading stops early once `counter` is no longer positive.
 *
 * Throws: `Exception("ran out of data")` when fewer strings than announced
 *         were read and `counter` is exhausted.
 */
package StringList readStringList(TCPConnection s, ref int counter) {
	StringList items;
	auto expected = readShort(s, counter);

	int fetched = 0;
	while (fetched < expected && counter > 0) {
		items ~= readShortString(s, counter);
		++fetched;
	}

	immutable exhausted = counter <= 0;
	if (items.length < expected && exhausted)
		throw new Exception("ran out of data");
	return items;
}
426 
427 /**    [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
428  *                   no byte should follow and the value represented is `null`.
429  */
/**
 * Reads a [bytes] value: an [int] length n followed by n bytes.
 *
 * Returns: `null` when n is negative, otherwise a new buffer with the n
 *          bytes read. `readInt` only reports -1 as null, so other negative
 *          lengths are handled here, as the [bytes] notation requires.
 */
package ubyte[] readIntBytes(TCPConnection s, ref int counter) {
	int len;
	auto tmp = readInt(len, s, counter);
	if (tmp is null || len < 0) {
		return null;
	}

	ubyte[] buf = new ubyte[](len);
	s.read(buf);
	counter -= buf.length;
	return buf;
}
/**
 * Appends `data` as a [bytes] value: an [int] length followed by the payload.
 *
 * Supported types, in the order they are tested:
 *   - `string`, `ubyte[]`, `byte[]`: the raw bytes.
 *   - other arrays: a `ushort` element count, then each element via
 *     `appendShortBytes`.
 *   - associative arrays: a `ushort` pair count, then each key and value via
 *     `appendShortBytes`.
 *   - integral types, `float`, `double`: the big-endian encoding.
 *   - `bool`: a single byte, 1 or 0.
 *
 * Any other type is rejected at compile time.
 */
package void appendIntBytes(T, R)(R appender, T data)
	if (isOutputRange!(R, ubyte))
{
	static if (is(T == string) || is(T == ubyte[]) || is(T == byte[])) {
		assert(data.length < int.max);
		append(appender, cast(int)data.length);
		append(appender, cast(ubyte[])data.ptr[0..data.length]);
	} else static if (isArray!T) {
		// The count goes on the wire as a ushort, so that is the real limit.
		assert(data.length <= ushort.max);
		auto tmpapp = std.array.appender!(ubyte[])();
		tmpapp.appendRawBytes(cast(ushort)data.length);
		foreach (item; data) {
			assert(item.length < ushort.max);
			tmpapp.appendShortBytes(item);
		}
		appender.appendIntBytes(tmpapp.data);
	} else static if (isAssociativeArray!T) {
		// The count goes on the wire as a ushort, so that is the real limit.
		assert(data.length <= ushort.max);
		auto tmpapp = std.array.appender!(ubyte[])();
		tmpapp.appendRawBytes(cast(ushort)data.length);
		foreach (key,value; data) {
			tmpapp.appendShortBytes(key);
			tmpapp.appendShortBytes(value);
		}
		appender.appendIntBytes(tmpapp.data);
	} else static if (isIntegral!T || is(T == double) || is(T == float)) {
		import std.bitmanip : write;

		ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0];
		append(appender, cast(int)T.sizeof);
		buffer.write!(T)(data,0);
		appender.append(buffer[0..T.sizeof]);
	} else static if (isBoolean!T) {
		import std.bitmanip : write;

		ubyte[] buffer = [0, 0, 0, 0, 0, 0, 0, 0];
		append(appender, cast(int)1);
		if (data)
			buffer.write!(byte)(1,0);
		else
			buffer.write!(byte)(0,0);
		appender.append(buffer[0 .. 1]);
	} else {
		static assert(0, "can't append raw bytes for type: "~ T.stringof);
	}
}
493 
494 /**    [short bytes]  A [short] n, followed by n bytes if n >= 0.
495  */
/**
 * Reads a [short bytes] value: a [short] length n followed by n bytes.
 *
 * `readShort` returns the length as a signed `short`. It is reinterpreted as
 * `ushort` here so that a length above `short.max` is not turned into a
 * negative allocation size.
 *
 * Returns: `null` when n is 0, otherwise a new buffer with the n bytes read.
 */
package ubyte[] readShortBytes(TCPConnection s, ref int counter) {
	immutable ushort len = cast(ushort)readShort(s, counter);
	if (len==0) { return null; }

	ubyte[] buf = new ubyte[](len);
	s.read(buf);
	counter -= buf.length;
	return buf;
}
/**
 * Appends `data` as a [short bytes] value: a `short` length followed by the
 * raw bytes. Only `string` and types convertible to `const(ubyte)[]` are
 * accepted; anything else is rejected at compile time.
 */
package void appendShortBytes(T, R)(R appender, T data)
	if (isOutputRange!(R, ubyte))
{
	static if (is (T == string) || is (T : const(ubyte)[])) {
		assert(data.length < short.max);
		// Length prefix and payload go out in a single variadic call.
		append(appender,
			cast(short)data.length,
			cast(ubyte[])data.ptr[0..data.length]);
	} else {
		static assert(0, "appendShortBytes can't append type: "~ T.stringof);
	}
}
518 
519 /**
520  *    [option]       A pair of <id><value> where <id> is a [short] representing
521  *                   the option id and <value> depends on that option (and can be
522  *                   of size 0). The supported id (and the corresponding <value>)
523  *                   will be described when this is used.
524  */
struct Option {
	/// See Section: 4.2.5.2.
	enum Type {
		custom = 0x0000,
		ascii = 0x0001,
		bigInt = 0x0002,
		blob = 0x0003,
		boolean = 0x0004,
		counter = 0x0005,
		decimal = 0x0006,
		double_ = 0x0007,
		float_ = 0x0008,
		int_ = 0x0009,
		text = 0x000A,
		timestamp = 0x000B,
		uuid = 0x000C,
		varChar = 0x000D,
		varInt = 0x000E,
		timeUUID = 0x000F,
		inet = 0x0010,
		list = 0x0020,
		map = 0x0021,
		set = 0x0022
	}
	Type id;

	// Which member is meaningful depends on `id`; `toString` reads
	// `string_value` for custom, `option_value` for list/set and
	// `key_values_option_value` for map.
	union {
		string string_value;
		Option* option_value;
		Option*[2] key_values_option_value;
	}

	/**
	 * Renders the option as its type name followed by a space and, for
	 * custom, list, set and map options, the payload.
	 *
	 * Nested options are rendered recursively. Formatting the pointers
	 * themselves would only print their addresses.
	 */
	string toString() {
		auto buf = appender!string();
		formattedWrite(buf, "%s ", id);
		switch (id) {
			case Type.custom:
				formattedWrite(buf, "%s", string_value);
				break;
			case Type.list, Type.set:
				formattedWrite(buf, "%s", describe(option_value));
				break;
			case Type.map:
				formattedWrite(buf, "%s[%s]",
					describe(key_values_option_value[1]),
					describe(key_values_option_value[0]));
				break;
			default:
				break;
		}
		return buf.data;
	}

	/// Text for a nested option; a null pointer is rendered as "null".
	private static string describe(Option* nested) {
		return nested is null ? "null" : nested.toString();
	}

}
570 
571 
572 /**    [option list]  A [short] n, followed by n [option].
573  *    [inet]         An address (ip and port) to a node. It consists of one
574  *                   [byte] n, that represents the address size, followed by n
575  *                   [byte] representing the IP address (in practice n can only be
576  *                   either 4 (IPv4) or 16 (IPv6)), following by one [int]
577  *                   representing the port.
578  *    [consistency]  A consistency level specification. This is a [short]
579  *                   representing a consistency level with the following
580  *                   correspondance:
581  *                     0x0000    ANY
582  *                     0x0001    ONE
583  *                     0x0002    TWO
584  *                     0x0003    THREE
585  *                     0x0004    QUORUM
586  *                     0x0005    ALL
587  *                     0x0006    LOCAL_QUORUM
588  *                     0x0007    EACH_QUORUM
589  */
// Consistency levels, backed by `ushort`. The numeric values are the
// [consistency] codes 0x0000 through 0x0007 from the notation list.
enum Consistency : ushort  {
	any         = 0x0000, // ANY
	one         = 0x0001, // ONE
	two         = 0x0002, // TWO
	three       = 0x0003, // THREE
	quorum      = 0x0004, // QUORUM
	all         = 0x0005, // ALL
	localQuorum = 0x0006, // LOCAL_QUORUM
	eachQuorum  = 0x0007  // EACH_QUORUM
}
600 
601 /**
602  *    [string map]      A [short] n, followed by n pair <k><v> where <k> and <v> are [string].
603  */
// Associative array from string keys to string values; the
// appendOverride overload taking a StringMap writes it out.
alias string[string] StringMap;
605 
/**
 * Appends each argument to `appender`, in order.
 *
 * Handling per argument type:
 *   - `ubyte[]`: the bytes as-is.
 *   - `short`, `int`, `long`, `ulong`, `double`: the big-endian encoding.
 *   - `string`: a `short` length followed by the string's bytes.
 *   - any type for which `appendOverride(appender, arg)` compiles: delegated
 *     to that overload.
 *
 * Any other argument type is rejected at compile time.
 */
package void append(R, Args...)(R appender, Args args)
	if (isOutputRange!(R, ubyte))
{
	import std.bitmanip : write;

	// Scratch space for the widest fixed-size type handled below (8 bytes).
	ubyte[long.sizeof] scratch;
	foreach (arg; args) {
		static if (is(typeof(arg) == ubyte[])) {
			appender.put(arg);
		} else static if (is(typeof(arg) == short) || is(typeof(arg) == int) || is(typeof(arg) == long) || is(typeof(arg) == ulong) || is(typeof(arg) == double)) {
			scratch[].write!(typeof(arg))(arg,0);
			appender.put(scratch[0..typeof(arg).sizeof]);
		} else static if (is(typeof(arg) == string)) {
			assert(arg.length < short.max);
			appender.append(cast(short)arg.length);

			appender.append(cast(ubyte[])arg[0..arg.length]);
		} else static if (__traits(compiles, mixin("appendOverride(appender, arg)"))) {
			// No bookkeeping local here: the output range does not need to
			// expose `.data` for this branch.
			appendOverride(appender, arg);
		} else {
			static assert(0, "cannot handle append of "~ typeof(arg).stringof);
		}
	}
}
637 //todo: add all the append functions features to append!T(appender,T)
/**
 * Appends the big-endian encoding of `data` with no length prefix.
 * Only `ushort`, `uint` and `int` are accepted; anything else is rejected at
 * compile time.
 */
package void appendRawBytes(T, R)(R appender, T data)
	if (isOutputRange!(R, ubyte))
{
	import std.bitmanip : write;

	enum supported = is(T == int) || is(T == uint) || is(T == ushort);
	static if (supported) {
		ubyte[] scratch = [0, 0, 0, 0, 0, 0, 0, 0];
		scratch.write!(T)(data, 0);
		auto encoded = scratch[0..T.sizeof];
		appender.append(encoded);
	} else {
		static assert(0, "can't use appendRawBytes on type: "~T.stringof);
	}
}
650 
/**
 * Appends `data` as a [long string]: an `int` length followed by the
 * string's bytes.
 */
package void appendLongString(R)(R appender, string data)
	if (isOutputRange!(R, ubyte))
{
	assert(data.length < int.max);
	// Length prefix and payload go out in a single variadic call.
	append(appender,
		cast(int)data.length,
		cast(ubyte[])data.ptr[0..data.length]);
}
658 
/**
 * Appends a string map: a `short` pair count followed by each key and value,
 * each written the way `append` writes a `string`.
 */
package void appendOverride(R)(R appender, StringMap sm)
	if (isOutputRange!(R, ubyte))
{
	assert(sm.length < short.max);

	short pairs = cast(short)sm.length;
	appender.append(pairs);
	foreach (key, value; sm)
		appender.append(key, value);
}
/// Appends a consistency level as its numeric value cast to `short`.
package void appendOverride(R)(R appender, Consistency c)
	if (isOutputRange!(R, ubyte))
{
	short wireValue = cast(short)c;
	appender.append(wireValue);
}
675 
/// Appends a boolean as a 4-byte `int`: 1 for true, 0 for false.
package void appendOverride(R)(R appender, bool b)
	if (isOutputRange!(R, ubyte))
{
	int flag = b ? 0x00000001 : 0x00000000;
	appender.append(flag);
}
/**
 * Appends every string of `strs` in order, each the way `append` writes a
 * `string`. No element count is written by this overload.
 */
package void appendOverride(R)(R appender, string[] strs)
	if (isOutputRange!(R, ubyte))
{
	for (size_t i = 0; i < strs.length; ++i) {
		appender.append(strs[i]);
	}
}
691 
692 /*auto append(Appender!(ubyte[]) appender, ubyte[] data) {
693 	appender.put(data);
694 	return appender;
695 }
696 auto append(Appender!(ubyte[]) appender, short data) {
697 	import std.bitmanip : write;
698 
699 	ubyte[] buffer = [0,0,0,0,0,0,0,0];
700 	buffer.write!short(data,0);
701 	appender.put(buffer[0..short.sizeof]);
702 	return appender;
703 }
704 auto append(Appender!(ubyte[]) appender, int data) {
705 	import std.bitmanip : write;
706 
707 	ubyte[] buffer = [0,0,0,0,0,0,0,0];
708 	buffer.write!int(data,0);
709 	appender.put(buffer[0..int.sizeof]);
710 	return appender;
711 }
712 auto append(Appender!(ubyte[]) appender, string data) {
713 	assert(data.length < short.max);
714 	append(appender, cast(short)data.length);
715 
716 
717 	append(appender, cast(ubyte[])data.ptr[0..data.length]);
718 	return appender;
719 }
720 
721 auto append(Appender!(ubyte[]) appender, StringMap data) {
722 	assert(data.length < short.max);
723 	append(appender, cast(short)data.length);
724 	foreach (k,v; data) {
725 		append(appender, k);
726 		append(appender, v);
727 	}
728 
729 	return appender;
730 }*/
731 
732 /**   [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
733  *                      [string] and <v> is a [string list].
734  */
alias string[][string] StringMultiMap;

/**
 * Reads a [string multimap]: a [short] count followed by that many pairs of
 * a [string] key and a [string list] value.
 *
 * Reading stops early once `counter` is no longer positive. A repeated key
 * overwrites the earlier entry.
 *
 * Throws: `Exception("ran out of data to read")` when fewer pairs than
 *         announced could be read.
 */
package StringMultiMap readStringMultiMap(TCPConnection s, ref int counter) {
	StringMultiMap smm;
	auto count = readShort(s, counter);

	// Count the pairs actually read. The map's own length under-counts when
	// a key repeats, which would look like truncation once `counter` hits 0.
	int pairsRead = 0;
	while (pairsRead < count && counter > 0) {
		auto key = readShortString(s, counter);
		auto values = readStringList(s, counter);
		smm[key] = values;
		++pairsRead;
	}
	// The loop can only stop short of `count` because `counter` ran out.
	if (pairsRead < count)
		throw new Exception("ran out of data to read");

	return smm;
}
750 
751 
/// Base authenticator; `getCredentials` returns an empty credential map.
class Authenticator {
	/// Returns a default-initialized (empty) `StringMap`.
	StringMap getCredentials() {
		return StringMap.init;
	}
}
758 
/**
 * Returns a new base `Authenticator`. The `type` argument is currently
 * ignored.
 */
Authenticator getAuthenticator(string type) {
	// TODO: provide real authenticator types that work with the ones in Cassandra
	auto fallback = new Authenticator();
	return fallback;
}
763 
764 
765 
766 
767 
/**
 * Base class of the exceptions declared in this module.
 *
 * `line` is `size_t`, matching `Exception`'s constructor, and an optional
 * `next` allows chaining an underlying cause.
 */
class CQLException : Exception {
	this(string s = "", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(s, file, line, next);
	}
}

/// A `CQLException` subtype for protocol errors.
class CQLProtocolException : CQLException {
	this(string s = "", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(s, file, line, next);
	}
}

/// A `CQLException` subtype whose default message is "Not Implemented".
class NotImplementedException : CQLException {
	this(string s = "Not Implemented", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
		super(s, file, line, next);
	}
}
784 
785 
/**
 * Maps a D type to the name of a CQL type, as a compile-time string.
 * Types without a mapping are rejected at compile time.
 */
template bestCassandraType(T)
{
	import std.datetime;

	static if (is (T == bool)) {
		enum bestCassandraType = "boolean";
	} else static if (is (T == int)) {
		enum bestCassandraType = "int";
	} else static if (is (T == long)) {
		enum bestCassandraType = "bigint";
	} else static if (is (T == float)) {
		enum bestCassandraType = "float";
	} else static if (is (T == double)) {
		enum bestCassandraType = "double";
	} else static if (is (T == string)) {
		enum bestCassandraType = "text";
	} else static if (is (T == ubyte[])) {
		enum bestCassandraType = "blob";
	} else static if (is (T == InetAddress) || is (T == InetAddress6)) {
		// Both address types map to the same CQL type.
		enum bestCassandraType = "inet";
	} else static if (is (T == DateTime)) {
		enum bestCassandraType = "timestamp";
	} else {
		static assert(0, "Can't suggest a cassandra cql type for storing: "~T.stringof);
	}
}
801 
/**
 * Renders `value` as text for use in a CQL statement.
 *
 * `bool` and types convertible to `long` or `real` go through `to!string`.
 * String types are wrapped in single quotes, with every embedded single quote
 * doubled. Any other type is rejected at compile time.
 */
string toCQLString(T)(T value)
{
	static if (is (T == bool) || is (T : long) || is (T : real)) {
		return to!string(value);
	} else static if (isSomeString!T) {
		enum dchar quote = '\'';

		auto quoted = appender!string();
		quoted.reserve(value.length+2);
		quoted.put('\'');
		foreach (dchar c; value) {
			quoted.put(c);
			// An embedded quote is emitted twice to escape it.
			if (c == quote) quoted.put(c);
		}
		quoted.put('\'');
		return quoted.data;
	} else {
		static assert(false, "Type "~T.stringof~" isn't implemented.");
	}
}
818 
/**
 * Emits a formatted message: through vibe.d's `logDebug` when the
 * `Have_vibe_d` version is set, otherwise through `writefln`.
 */
protected void log(Args...)(string s, Args args)
{
	version (Have_vibe_d) {
		import vibe.core.log : logDebug;
		logDebug(s, args);
	} else {
		import std.stdio : writefln;
		writefln(s, args);
	}
}