1 module cassandra.cql.connection;
2 
3 public import cassandra.cql.result;
4 
import std.array;
import std.bitmanip : bitfields;
import std.conv;
import std.exception : enforce;
import std.format : formattedWrite;
import std.range : isOutputRange;
import std.stdint;
import std.string : startsWith;
import std.traits;
14 
15 import cassandra.internal.utils;
16 import cassandra.internal.tcpconnection;
17 
18 
/**
 * A client connection that speaks the CQL binary protocol over a TCP socket.
 *
 * The socket is opened on demand: the request methods (`requestOptions`,
 * `query`, `prepare`, `execute`, `listen`) each call `connect()` before
 * writing anything.
 */
class Connection {
	// `Lock` is the handle type accepted by `query`/`execute` and forwarded to
	// `CassandraResult`. With vibe.d it is a `LockedConnection!Connection`;
	// otherwise it is just a `Connection` reference.
	version (Have_vibe_d) {
		import vibe.core.connectionpool : LockedConnection;
		alias Lock = LockedConnection!Connection;
	} else {
		Connection m_connection;
		alias Lock = Connection;
	}

	private {
		TCPConnection sock;      // null until connect() runs, and again after close()
		string m_host;           // endpoint given to the constructor
		ushort m_port;
		string m_usedKeyspace;   // last keyspace selected via useKeyspace(); reset by connect()

		// Unread-byte counter handed to every read helper; close() expects it
		// to be back at zero once a response has been fully consumed.
		int m_counter; // keeps track of data left after each read

		// The next four values are copied into every outgoing header by
		// makeHeader(). Nothing in this class assigns them, so they keep their
		// default values unless set elsewhere.
		bool m_compressionEnabled;
		bool m_tracingEnabled;
		byte m_streamID;
		FrameHeader.Version m_transportVersion; // this is just the protocol version number
	}

	// Port used when the constructor is called without an explicit one.
	enum defaultPort = 9042;
43 
44 	this(string host, ushort port = defaultPort)
45 	{
46 		m_host = host;
47 		m_port = port;
48 	}
49 
50 	void connect()
51 	{
52 		if (!sock || !sock.connected) {
53 			log("connecting");
54 			sock = connectTCP(m_host, m_port);
55 			log("connected. doing handshake...");
56 			startup();
57 			log("handshake completed.");
58 			m_usedKeyspace = null;
59 		}
60 	}
61 
62 	void close()
63 	{
64 		if (m_counter > 0) {
65 			auto buf = readRawBytes(sock, m_counter, m_counter);
66 			log("buf:", buf);
67 		}
68 		assert(m_counter == 0, "Did not complete reading of stream: "~ to!string(m_counter) ~" bytes left");
69 		sock.close();
70 		sock = null;
71 	}
72 
73 	void useKeyspace(string name)
74 	{
75 		if (name == m_usedKeyspace) return;
76 		enforceValidIdentifier(name);
77 		query(Lock.init, `USE `~name, Consistency.any);
78 		m_usedKeyspace = name;
79 	}
80 
81 
82 	//vvvvvvvvvvvvvvvvvvvvv CQL Implementation vvvvvvvvvvvvvvvvvv
83 	/// Make a FrameHeader corresponding to this Stream
84 	FrameHeader makeHeader(FrameHeader.OpCode opcode) {
85 		FrameHeader fh;
86 		switch (m_transportVersion) {
87 			case 1: fh.version_ = FrameHeader.Version.V1Request; break;
88 			case 2: fh.version_ = FrameHeader.Version.V2Request; break;
89 			default: assert(0, "invalid transport_version");
90 		}
91 		fh.compress = m_compressionEnabled;
92 		fh.trace = m_tracingEnabled;
93 		fh.streamid = m_streamID;
94 		fh.opcode = opcode;
95 		return fh;
96 	}
97 
98 	/**4. Messages
99 	 *
100 	 *4.1. Requests
101 	 *
102 	 *  Note that outside of their normal responses (described below), all requests
103 	 *  can get an ERROR message (Section 4.2.1) as response.
104 	 *
105 	 *4.1.1. STARTUP
106 	 *
107 	 *  Initialize the connection. The server will respond by either a READY message
108 	 *  (in which case the connection is ready for queries) or an AUTHENTICATE message
109 	 *  (in which case credentials will need to be provided using CREDENTIALS).
110 	 *
111 	 *  This must be the first message of the connection, except for OPTIONS that can
112 	 *  be sent before to find out the options supported by the server. Once the
113 	 *  connection has been initialized, a client should not send any more STARTUP
114 	 *  message.
115 	 *
116 	 *  The body is a [string map] of options. Possible options are:
117 	 *    - "CQL_VERSION": the version of CQL to use. This option is mandatory and
	 *      currently, the only version supported is "3.0.0". Note that this is
119 	 *      different from the protocol version.
120 	 *    - "COMPRESSION": the compression algorithm to use for frames (See section 5).
121 	 *      This is optional, if not specified no compression will be used.
122 	 */
123 	 private void startup(string compression_algorithm = "") {
124 		StringMap data;
125 		data["CQL_VERSION"] = "3.0.0";
126 		if (compression_algorithm.length > 0)
127 			data["COMPRESSION"] = compression_algorithm;
128 
129 		auto fh = makeHeader(FrameHeader.OpCode.startup);
130 
131 		auto bytebuf = appender!(ubyte[])();
132 		bytebuf.append(data);
133 		fh.length = bytebuf.getIntLength();
134 		sock.write(fh.bytes);
135 		sock.write(bytebuf.data);
136 
137 		fh = readFrameHeader(sock, m_counter);
138 		if (fh.isAUTHENTICATE) {
139 			fh = authenticate(fh);
140 		}
141 		throwOnError(fh);
142 	}
143 
144 	private FrameHeader authenticate(FrameHeader fh) {
145 		auto authenticatorname = readAuthenticate(fh);
146 		auto authenticator = getAuthenticator(authenticatorname);
147 		sendCredentials(authenticator.getCredentials());
148 		throw new Exception("NotImplementedException Authentication: "~ authenticatorname);
149 	}
150 
151 
152 	/**
153 	 *4.1.2. CREDENTIALS
154 	 *
155 	 *  Provides credentials information for the purpose of identification. This
156 	 *  message comes as a response to an AUTHENTICATE message from the server, but
157 	 *  can be use later in the communication to change the authentication
158 	 *  information.
159 	 *
160 	 *  The body is a list of key/value informations. It is a [short] n, followed by n
161 	 *  pair of [string]. These key/value pairs are passed as is to the Cassandra
162 	 *  IAuthenticator and thus the detail of which informations is needed depends on
163 	 *  that authenticator.
164 	 *
165 	 *  The response to a CREDENTIALS is a READY message (or an ERROR message).
166 	 */
167 	private void sendCredentials(StringMap data) {
168 		auto fh = makeHeader(FrameHeader.OpCode.credentials);
169 		auto bytebuf = appender!(ubyte[])();
170 		bytebuf.append(data);
171 		fh.length = bytebuf.getIntLength;
172 		sock.write(fh.bytes);
173 		sock.write(bytebuf.data);
174 		
175 		assert(false, "todo: read credentials response");
176 	}
177 
178 	/**
179 	 *4.1.3. OPTIONS
180 	 *
181 	 *  Asks the server to return what STARTUP options are supported. The body of an
182 	 *  OPTIONS message should be empty and the server will respond with a SUPPORTED
183 	 *  message.
184 	 */
185 	StringMultiMap requestOptions() {
186 		connect();
187 		auto fh = makeHeader(FrameHeader.OpCode.options);
188 		sock.write(fh.bytes);
189 		fh = readFrameHeader(sock, m_counter);
190 		if (!fh.isSUPPORTED) {
191 			throw new Exception("CQLProtocolException, Unknown response to OPTIONS request");
192 		}
193 		return readSupported(fh);
194 	}
195 
196 	 /**
197 	 *4.1.4. QUERY
198 	 *
199 	 *  Performs a CQL query. The body of the message consists of a CQL query as a [long
200 	 *  string] followed by the [consistency] for the operation.
201 	 *
202 	 *  Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
203 	 *  TRUNCATE, ...).
204 	 *
205 	 *  The server will respond to a QUERY message with a RESULT message, the content
206 	 *  of which depends on the query.
207 	 */
208 	CassandraResult query(Connection.Lock lock, string q, Consistency consistency = Consistency.one)
209 	{
210 		assert(!q.startsWith("PREPARE"), "use Connection.prepare to issue PREPARE statements.");
211 		connect();
212 		auto fh = makeHeader(FrameHeader.OpCode.query);
213 		auto bytebuf = appender!(ubyte[])();
214 		log("-----------");
215 		bytebuf.appendLongString(q);
216 		//print(bytebuf.data);
217 		log("-----------");
218 		bytebuf.append(consistency);
219 		fh.length = bytebuf.getIntLength;
220 		sock.write(fh.bytes);
221 		sock.write(bytebuf.data);
222 
223 		fh = readFrameHeader(sock, m_counter);
224 		throwOnError(fh);
225 		auto ret = CassandraResult(lock, fh, sock, m_counter);
226 		assert(ret.kind != CassandraResult.Kind.prepared, "use Connection.prepare to issue PREPARE statements.");
227 		return ret;
228 	}
229 
230 	 /**
231 	 *4.1.5. PREPARE
232 	 *
233 	 *  Prepare a query for later execution (through EXECUTE). The body consists of
234 	 *  the CQL query to prepare as a [long string].
235 	 *
236 	 *  The server will respond with a RESULT message with a `prepared` kind (0x0004,
237 	 *  see Section 4.2.5).
238 	 */
239 	PreparedStatement prepare(string q) {
240 		connect();
241 		auto fh = makeHeader(FrameHeader.OpCode.prepare);
242 		auto bytebuf = appender!(ubyte[])();
243 		log("---------=-");
244 		bytebuf.appendLongString(q);
245 		fh.length = bytebuf.getIntLength;
246 		sock.write(fh.bytes);
247 		sock.write(bytebuf.data);
248 		log("---------=-");
249 
250 		fh = readFrameHeader(sock, m_counter);
251 		throwOnError(fh);
252 		if (!fh.isRESULT) {
253 			throw new Exception("CQLProtocolException, Unknown response to PREPARE command");
254 		}
255 		auto result = CassandraResult(Lock.init, fh, sock, m_counter);
256 		return PreparedStatement(result);
257 	}
258 
259 	/**
260 	 *4.1.6. EXECUTE
261 	 *
262 	 *  Executes a prepared query. The body of the message must be:
263 	 *    <id><n><value_1>....<value_n><consistency>
264 	 *  where:
265 	 *    - <id> is the prepared query ID. It's the [short bytes] returned as a
266 	 *      response to a PREPARE message.
267 	 *    - <n> is a [short] indicating the number of following values.
268 	 *    - <value_1>...<value_n> are the [bytes] to use for bound variables in the
269 	 *      prepared query.
270 	 *    - <consistency> is the [consistency] level for the operation.
271 	 *
272 	 *  Note that the consistency is ignored by some (prepared) queries (USE, CREATE,
273 	 *  ALTER, TRUNCATE, ...).
274 	 *
275 	 *  The response from the server will be a RESULT message.
276 	 */
277 	auto execute(Args...)(Connection.Lock lock, PreparedStatement stmt, Args args)
278 	{
279 	//private auto execute(Args...)(ubyte[] preparedStatementID, Consistency consistency, Args args) {
280 		connect();
281 		auto fh = makeHeader(FrameHeader.OpCode.execute);
282 		auto bytebuf = appender!(ubyte[])();
283 		log("-----=----=-");
284 		bytebuf.appendShortBytes(stmt.id);
285 		assert(args.length < short.max);
286 		bytebuf.append(cast(short)args.length);
287 		foreach (arg; args) {
288 			bytebuf.appendIntBytes(arg);
289 		}
290 		bytebuf.append(stmt.consistency);
291 
292 		fh.length = bytebuf.getIntLength;
293 		sock.write(fh.bytes);
294 		log("Sending: %s", bytebuf.data);
295 		sock.write(bytebuf.data);
296 		log("-----=----=-");
297 
298 		fh = readFrameHeader(sock, m_counter);
299 		throwOnError(fh);
300 		if (!fh.isRESULT) {
301 			throw new Exception("CQLProtocolException, Unknown response to Execute command: "~ to!string(fh.opcode));
302 		}
303 		return CassandraResult(lock, fh, sock, m_counter);
304 	}
305 
306 	/**
307 	 *4.1.7. REGISTER
308 	 *
309 	 *  Register this connection to receive some type of events. The body of the
310 	 *  message is a [string list] representing the event types to register to. See
311 	 *  section 4.2.6 for the list of valid event types.
312 	 *
313 	 *  The response to a REGISTER message will be a READY message.
314 	 *
315 	 *  Please note that if a client driver maintains multiple connections to a
316 	 *  Cassandra node and/or connections to multiple nodes, it is advised to
317 	 *  dedicate a handful of connections to receive events, but to *not* register
318 	 *  for events on all connections, as this would only result in receiving
319 	 *  multiple times the same event messages, wasting bandwidth.
320 	 */
321 	void listen(Event[] events...) {
322 		connect();
323 		auto fh = makeHeader(FrameHeader.OpCode.register);
324 		auto bytebuf = appender!(ubyte[])();
325 		auto tmpbuf = appender!(ubyte[])();
326 		//tmpbuf.append(events); // FIXME: append(Event) isn't implemented
327 		fh.length = tmpbuf.getIntLength;
328 		bytebuf.put(fh.bytes);
329 		bytebuf.append(tmpbuf.data);
330 		sock.write(bytebuf.data);
331 
332 		fh = readFrameHeader(sock, m_counter);
333 		if (!fh.isREADY) {
334 			throw new Exception("CQLProtocolException, Unknown response to REGISTER command");
335 		}
336 		assert(false, "Untested: setup of event listening");
337 	}
338 
339 	/**
340 	 *4.2. Responses
341 	 *
342 	 *  This section describes the content of the frame body for the different
343 	 *  responses. Please note that to make room for future evolution, clients should
344 	 *  support extra informations (that they should simply discard) to the one
345 	 *  described in this document at the end of the frame body.
346 	 *
347 	 *4.2.1. ERROR
348 	 *
349 	 *  Indicates an error processing a request. The body of the message will be an
350 	 *  error code ([int]) followed by a [string] error message. Then, depending on
351 	 *  the exception, more content may follow. The error codes are defined in
352 	 *  Section 7, along with their additional content if any.
353 	 */
354 	protected void throwOnError(FrameHeader fh) {
355 		if (!fh.isERROR) return;
356 		int tmp;
357 		Error code;
358 		readIntNotNULL(tmp, sock, m_counter);
359 		code = cast(Error)tmp;
360 
361 		auto msg = readShortString(sock, m_counter);
362 
363 		auto spec_msg = toString(code);
364 
365 		final switch (code) {
366 			case Error.serverError:
367 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
368 			case Error.protocolError:
369 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
370 			case Error.badCredentials:
371 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
372 			case Error.unavailableException:
373 				auto cs = cast(Consistency)readShort(sock, m_counter);
374 				auto required = readIntNotNULL(tmp, sock, m_counter);
375 				auto alive = readIntNotNULL(tmp, sock, m_counter);
376 				throw new Exception("CQL Exception, "~ spec_msg ~ msg ~" consistency:"~ .to!string(cs) ~" required:"~ to!string(required) ~" alive:"~ to!string(alive));
377 			case Error.overloaded:
378 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
379 			case Error.isBootstrapping:
380 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
381 			case Error.truncateError:
382 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
383 			case Error.writeTimeout:
384 				auto cl = cast(Consistency)readShort(sock, m_counter);
385 				auto received = readIntNotNULL(tmp, sock, m_counter);
386 				auto blockfor = readIntNotNULL(tmp, sock, m_counter); // WARN: the type for blockfor does not seem to be in the spec!!!
387 				auto writeType = cast(WriteType)readShortString(sock, m_counter);
388 				throw new Exception("CQL Exception, "~ spec_msg ~ msg ~" consistency:"~ to!string(cl) ~" received:"~ to!string(received) ~" blockfor:"~ to!string(blockfor) ~" writeType:"~ toString(writeType));
389 			case Error.readTimeout:
390 				auto cl = cast(Consistency)readShort(sock, m_counter);
391 				auto received = readIntNotNULL(tmp, sock, m_counter);
392 				auto blockfor = readIntNotNULL(tmp, sock, m_counter); // WARN: the type for blockfor does not seem to be in the spec!!!
393 				auto data_present = readByte(sock, m_counter);
394 				throw new Exception("CQL Exception, "~ spec_msg ~ msg ~" consistency:"~ to!string(cl) ~" received:"~ to!string(received) ~" blockfor:"~ to!string(blockfor) ~" data_present:"~ (data_present==0x00?"false":"true"));
395 			case Error.syntaxError:
396 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
397 			case Error.unauthorized:
398 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
399 			case Error.invalid:
400 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
401 			case Error.configError:
402 				throw new Exception("CQL Exception, "~ spec_msg ~ msg);
403 			case Error.alreadyExists:
404 				auto ks = readShortString(sock, m_counter);
405 				auto table = readShortString(sock, m_counter);
406 				throw new Exception("CQL Exception, "~ spec_msg ~ msg ~", keyspace:"~ks ~", table:"~ table );
407 			case Error.unprepared:
408 				auto unknown_id = readShort(sock, m_counter);
409 				throw new Exception("CQL Exception, "~ spec_msg ~ msg ~":"~ to!string(unknown_id));
410 		}
411 	}
412 
413 	 /*
414 	 *4.2.2. READY
415 	 *
416 	 *  Indicates that the server is ready to process queries. This message will be
417 	 *  sent by the server either after a STARTUP message if no authentication is
418 	 *  required, or after a successful CREDENTIALS message.
419 	 *
420 	 *  The body of a READY message is empty.
421 	 *
422 	 *
423 	 *4.2.3. AUTHENTICATE
424 	 *
425 	 *  Indicates that the server require authentication. This will be sent following
426 	 *  a STARTUP message and must be answered by a CREDENTIALS message from the
427 	 *  client to provide authentication informations.
428 	 *
429 	 *  The body consists of a single [string] indicating the full class name of the
430 	 *  IAuthenticator in use.
431 	 */
432 	protected string readAuthenticate(FrameHeader fh) {
433 		assert(fh.isAUTHENTICATE);
434 		return readShortString(sock, m_counter);
435 	}
436 
437 	/**
438 	 *4.2.4. SUPPORTED
439 	 *
440 	 *  Indicates which startup options are supported by the server. This message
441 	 *  comes as a response to an OPTIONS message.
442 	 *
443 	 *  The body of a SUPPORTED message is a [string multimap]. This multimap gives
444 	 *  for each of the supported STARTUP options, the list of supported values.
445 	 */
446 	protected StringMultiMap readSupported(FrameHeader fh) {
447 		return readStringMultiMap(sock, m_counter);
448 	}
449 
450 
451 	/**
452 	 *4.2.6. EVENT
453 	 *
	 *  An event pushed by the server. A client will only receive events for the
	 *  types it has REGISTERed for. The body of an EVENT message will start with a
456 	 *  [string] representing the event type. The rest of the message depends on the
457 	 *  event type. The valid event types are:
458 	 *    - "TOPOLOGY_CHANGE": events related to change in the cluster topology.
459 	 *      Currently, events are sent when new nodes are added to the cluster, and
460 	 *      when nodes are removed. The body of the message (after the event type)
461 	 *      consists of a [string] and an [inet], corresponding respectively to the
462 	 *      type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of
463 	 *      the new/removed node.
464 	 *    - "STATUS_CHANGE": events related to change of node status. Currently,
465 	 *      up/down events are sent. The body of the message (after the event type)
466 	 *      consists of a [string] and an [inet], corresponding respectively to the
467 	 *      type of status change ("UP" or "DOWN") followed by the address of the
468 	 *      concerned node.
469 	 *    - "SCHEMA_CHANGE": events related to schema change. The body of the message
470 	 *      (after the event type) consists of 3 [string] corresponding respectively
471 	 *      to the type of schema change ("CREATED", "UPDATED" or "DROPPED"),
472 	 *      followed by the name of the affected keyspace and the name of the
473 	 *      affected table within that keyspace. For changes that affect a keyspace
474 	 *      directly, the table name will be empty (i.e. the empty string "").
475 	 *
476 	 *  All EVENT message have a streamId of -1 (Section 2.3).
477 	 *
478 	 *  Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
479 	 *  communication and as such may be sent a short delay before the binary
480 	 *  protocol server on the newly up node is fully started. Clients are thus
481 	 *  advise to wait a short time before trying to connect to the node (1 seconds
482 	 *  should be enough), otherwise they may experience a connection refusal at
483 	 *  first.
484 	 */
	// String constants used with the EVENT/REGISTER messages; each member's
	// value is the literal protocol string.
	enum Event : string {
		// Event type names, as listed in the EVENT section of the spec comment.
		topologyChange = "TOPOLOGY_CHANGE",
		statusChange = "STATUS_CHANGE",
		schemaChange = "SCHEMA_CHANGE",
		// NOTE(review): per that same spec comment these two are change kinds
		// reported inside a TOPOLOGY_CHANGE / STATUS_CHANGE body, not event
		// types one registers for. Confirm whether they belong in this enum.
		newNode = "NEW_NODE",
		up = "UP"
	}
	// Placeholder for EVENT handling: it only asserts that `fh` is an EVENT
	// frame and does not read or interpret the event body.
	protected void readEvent(FrameHeader fh) {
		assert(fh.isEVENT);
	}
495 	/*void writeEvents(Appender!(ubyte[]) appender, Event e...) {
496 		appender.append(e);
497 	}*/
498 
499 
500 	/**5. Compression
501 	 *
502 	 *  Frame compression is supported by the protocol, but then only the frame body
503 	 *  is compressed (the frame header should never be compressed).
504 	 *
505 	 *  Before being used, client and server must agree on a compression algorithm to
506 	 *  use, which is done in the STARTUP message. As a consequence, a STARTUP message
	 *  must never be compressed.  However, once the STARTUP frame has been received
	 *  by the server, frames can be compressed (including the response to the STARTUP
	 *  request). Frames do not have to be compressed however, even if compression has
	 *  been agreed upon (a server may only compress frames above a certain size at its
	 *  discretion). A frame body should be compressed if and only if the compressed
512 	 *  flag (see Section 2.2) is set.
513 	 */
514 
515 	/**
516 	 *6. Collection types
517 	 *
518 	 *  This section describe the serialization format for the collection types:
519 	 *  list, map and set. This serialization format is both useful to decode values
520 	 *  returned in RESULT messages but also to encode values for EXECUTE ones.
521 	 *
522 	 *  The serialization formats are:
523 	 *     List: a [short] n indicating the size of the list, followed by n elements.
524 	 *           Each element is [short bytes] representing the serialized element
525 	 *           value.
526 	 */
527 	protected auto readList(T)(FrameHeader fh) {
528 		auto size = readShort(fh);
529 		T[] ret;
530 		foreach (i; 0..size) {
531 			ret ~= readBytes!T(sock, m_counter);
532 		}
533 		return ret;
534 
535 	}
536 
537 	/**     Map: a [short] n indicating the size of the map, followed by n entries.
538 	 *          Each entry is composed of two [short bytes] representing the key and
539 	 *          the value of the entry map.
540 	 */
541 	protected auto readMap(T,U)(FrameHeader fh) {
542 		auto size = readShort(fh);
543 		T[U] ret;
544 		foreach (i; 0..size) {
545 			ret[readShortBytes!T(sock, m_counter)] = readShortBytes!U(sock, m_counter);
546 
547 		}
548 		return ret;
549 	}
550 
551 	/**     Set: a [short] n indicating the size of the set, followed by n elements.
552 	 *          Each element is [short bytes] representing the serialized element
553 	 *          value.
554 	 */
555 	protected auto readSet(T)(FrameHeader fh) {
556 		auto size = readShort(fh);
557 		T[] ret;
558 		foreach (i; 0..size) {
559 			ret[] ~= readBytes!T(sock, m_counter);
560 
561 		}
562 		return ret;
563 	}
564 
565 	/**
566 	 *7. Error codes
567 	 *
568 	 *  The supported error codes are described below:
569 	 *    0x0000    Server error: something unexpected happened. This indicates a
570 	 *              server-side bug.
571 	 *    0x000A    Protocol error: some client message triggered a protocol
572 	 *              violation (for instance a QUERY message is sent before a STARTUP
573 	 *              one has been sent)
574 	 *    0x0100    Bad credentials: CREDENTIALS request failed because Cassandra
575 	 *              did not accept the provided credentials.
576 	 *
577 	 *    0x1000    Unavailable exception. The rest of the ERROR message body will be
578 	 *                <cl><required><alive>
579 	 *              where:
580 	 *                <cl> is the [consistency] level of the query having triggered
581 	 *                     the exception.
582 	 *                <required> is an [int] representing the number of node that
583 	 *                           should be alive to respect <cl>
584 	 *                <alive> is an [int] representing the number of replica that
585 	 *                        were known to be alive when the request has been
586 	 *                        processed (since an unavailable exception has been
587 	 *                        triggered, there will be <alive> < <required>)
588 	 *    0x1001    Overloaded: the request cannot be processed because the
589 	 *              coordinator node is overloaded
590 	 *    0x1002    Is_bootstrapping: the request was a read request but the
591 	 *              coordinator node is bootstrapping
	 *    0x1003    Truncate_error: an error occurred during a truncation.
593 	 *    0x1100    Write_timeout: Timeout exception during a write request. The rest
594 	 *              of the ERROR message body will be
595 	 *                <cl><received><blockfor><writeType>
596 	 *              where:
597 	 *                <cl> is the [consistency] level of the query having triggered
598 	 *                     the exception.
599 	 *                <received> is an [int] representing the number of nodes having
600 	 *                           acknowledged the request.
601 	 *                <blockfor> is the number of replica whose acknowledgement is
602 	 *                           required to achieve <cl>.
	 *                <writeType> is a [string] that describes the type of the write
	 *                            that timed out. The value of that string can be one
605 	 *                            of:
606 	 *                             - "SIMPLE": the write was a non-batched
607 	 *                               non-counter write.
608 	 *                             - "BATCH": the write was a (logged) batch write.
609 	 *                               If this type is received, it means the batch log
610 	 *                               has been successfully written (otherwise a
611 	 *                               "BATCH_LOG" type would have been send instead).
612 	 *                             - "UNLOGGED_BATCH": the write was an unlogged
	 *                               batch. No batch log write has been attempted.
614 	 *                             - "COUNTER": the write was a counter write
615 	 *                               (batched or not).
	 *                             - "BATCH_LOG": the timeout occurred during the
617 	 *                               write to the batch log when a (logged) batch
618 	 *                               write was requested.
619 	 */
620 	 alias string WriteType;
621 	 string toString(WriteType wt) {
622 		final switch (cast(string)wt) {
623 			case "SIMPLE":
624 				return "SIMPLE: the write was a non-batched non-counter write.";
625 			case "BATCH":
626 				return "BATCH: the write was a (logged) batch write. If this type is received, it means the batch log has been successfully written (otherwise a \"BATCH_LOG\" type would have been send instead).";
627 			case "UNLOGGED_BATCH":
628 				return "UNLOGGED_BATCH: the write was an unlogged batch. Not batch log write has been attempted.";
629 			case "COUNTER":
630 				return "COUNTER: the write was a counter write (batched or not).";
631 			case "BATCH_LOG":
632 				return "BATCH_LOG: the timeout occured during the write to the batch log when a (logged) batch write was requested.";
633 		 }
634 	 }
635 	 /**    0x1200    Read_timeout: Timeout exception during a read request. The rest
636 	 *              of the ERROR message body will be
637 	 *                <cl><received><blockfor><data_present>
638 	 *              where:
639 	 *                <cl> is the [consistency] level of the query having triggered
640 	 *                     the exception.
641 	 *                <received> is an [int] representing the number of nodes having
642 	 *                           answered the request.
643 	 *                <blockfor> is the number of replica whose response is
644 	 *                           required to achieve <cl>. Please note that it is
645 	 *                           possible to have <received> >= <blockfor> if
646 	 *                           <data_present> is false. And also in the (unlikely)
647 	 *                           case were <cl> is achieved but the coordinator node
648 	 *                           timeout while waiting for read-repair
649 	 *                           acknowledgement.
650 	 *                <data_present> is a single byte. If its value is 0, it means
651 	 *                               the replica that was asked for data has not
652 	 *                               responded. Otherwise, the value is != 0.
653 	 *
654 	 *    0x2000    Syntax_error: The submitted query has a syntax error.
655 	 *    0x2100    Unauthorized: The logged user doesn't have the right to perform
656 	 *              the query.
657 	 *    0x2200    Invalid: The query is syntactically correct but invalid.
658 	 *    0x2300    Config_error: The query is invalid because of some configuration issue
659 	 *    0x2400    Already_exists: The query attempted to create a keyspace or a
660 	 *              table that was already existing. The rest of the ERROR message
661 	 *              body will be <ks><table> where:
662 	 *                <ks> is a [string] representing either the keyspace that
663 	 *                     already exists, or the keyspace in which the table that
664 	 *                     already exists is.
665 	 *                <table> is a [string] representing the name of the table that
666 	 *                        already exists. If the query was attempting to create a
667 	 *                        keyspace, <table> will be present but will be the empty
668 	 *                        string.
669 	 *    0x2500    Unprepared: Can be thrown while a prepared statement tries to be
670 	 *              executed if the provide prepared statement ID is not known by
671 	 *              this host. The rest of the ERROR message body will be [short
672 	 *              bytes] representing the unknown ID.
673 	 **/
674 	 enum Error : ushort {
675 		serverError = 0x0000,
676 		protocolError = 0x000A,
677 		badCredentials = 0x0100,
678 		unavailableException = 0x1000,
679 		overloaded = 0x1001,
680 		isBootstrapping = 0x1002,
681 		truncateError = 0x1003,
682 		writeTimeout = 0x1100,
683 		readTimeout = 0x1200,
684 		syntaxError = 0x2000,
685 		unauthorized = 0x2100,
686 		invalid = 0x2200,
687 		configError = 0x2300,
688 		alreadyExists = 0x2400,
689 		unprepared = 0x2500
690 	 }
691 
692 	 static string toString(Error err)
693 	 {
694 		switch (err) {
695 			case Error.serverError:
696 				return "Server error: something unexpected happened. This indicates a server-side bug.";
697 			case Error.protocolError:
698 				return "Protocol error: some client message triggered a protocol violation (for instance a QUERY message is sent before a STARTUP one has been sent)";
699 			case Error.badCredentials:
700 				return "Bad credentials: CREDENTIALS request failed because Cassandra did not accept the provided credentials.";
701 			case Error.unavailableException:
702 				return "Unavailable exception.";
703 			case Error.overloaded:
704 				return "Overloaded: the request cannot be processed because the coordinator node is overloaded";
705 			case Error.isBootstrapping:
706 				return "Is_bootstrapping: the request was a read request but the coordinator node is bootstrapping";
707 			case Error.truncateError:
708 				return "Truncate_error: error during a truncation error.";
709 			case Error.writeTimeout:
710 				return "Write_timeout: Timeout exception during a write request.";
711 			case Error.readTimeout:
712 				return "Read_timeout: Timeout exception during a read request.";
713 			case Error.syntaxError:
714 				return "Syntax_error: The submitted query has a syntax error.";
715 			case Error.unauthorized:
716 				return "Unauthorized: The logged user doesn't have the right to perform the query.";
717 			case Error.invalid:
718 				return "Invalid: The query is syntactically correct but invalid.";
719 			case Error.configError:
720 				return "Config_error: The query is invalid because of some configuration issue.";
721 			case Error.alreadyExists:
722 				return "Already_exists: The query attempted to create a keyspace or a table that was already existing.";
723 			case Error.unprepared:
724 				return "Unprepared: Can be thrown while a prepared statement tries to be executed if the provide prepared statement ID is not known by this host.";
725 			default:
726 				assert(false);
727 		}
728 	 }
729 }