1 module cassandra.cql.result;
2 
3 public import cassandra.cql.utils;
4 
5 import cassandra.cql.connection;
6 import cassandra.internal.utils;
7 import cassandra.internal.tcpconnection;
8 
9 import std.array;
10 import std.bigint;
11 import std.bitmanip : bitfields;
12 import std.conv;
13 import std.exception : enforce;
14 import std.format : formattedWrite;
15 import std.range : isOutputRange;
16 import std.stdint;
17 import std..string : format;
18 import std.traits;
19 
20 
21 struct CassandraResult {
22 	import std.typecons;
23 
24 	/*
25 	 *4.2.5. RESULT
26 	 *
27 	 *  The result to a query (QUERY, PREPARE or EXECUTE messages).
28 	 *
29 	 *  The first element of the body of a RESULT message is an [int] representing the
30 	 *  `kind` of result. The rest of the body depends on the kind. The kind can be
31 	 *  one of:
32 	 *    0x0001    Void: for results carrying no information.
33 	 *    0x0002    Rows: for results to select queries, returning a set of rows.
34 	 *    0x0003    Set_keyspace: the result to a `use` query.
35 	 *    0x0004    Prepared: result to a PREPARE message.
36 	 *    0x0005    Schema_change: the result to a schema altering query.
37 	 *
38 	 *  The body for each kind (after the [int] kind) is defined below.
39 	 */
40 	enum Kind : short {
41 		void_        = 0x0001,
42 		rows         = 0x0002,
43 		setKeyspace  = 0x0003,
44 		prepared     = 0x0004,
45 		schemaChange = 0x0005
46 	}
47 
48 	/*
49 	 *4.2.5.5. Schema_change
50 	 *
51 	 *  The result to a schema altering query (creation/update/drop of a
52 	 *  keyspace/table/index). The body (after the kind [int]) is composed of 3
53 	 *  [string]:
54 	 *    <change><keyspace><table>
55 	 *  where:
56 	 *    - <change> describe the type of change that has occured. It can be one of
57 	 *      "CREATED", "UPDATED" or "DROPPED".
58 	 *    - <keyspace> is the name of the affected keyspace or the keyspace of the
59 	 *      affected table.
60 	 *    - <table> is the name of the affected table. <table> will be empty (i.e.
61 	 *      the empty string "") if the change was affecting a keyspace and not a
62 	 *      table.
63 	 *
64 	 *  Note that queries to create and drop an index are considered as change
65 	 *  updating the table the index is on.
66 	 */
67 	enum Change : string {
68 		created = "CREATED",
69 		updated = "UPDATED",
70 		dropped = "DROPPED"
71 	}
72 
73 	/*  where:
74 	 *    - <metadata> is composed of:
75 	 *        <flags><columns_count><global_table_spec>?<col_spec_1>...<col_spec_n>
76 	 *      where:
77 	 *        - <flags> is an [int]. The bits of <flags> provides information on the
78 	 *          formatting of the remaining informations. A flag is set if the bit
79 	 *          corresponding to its `mask` is set. Supported flags are, given there
80 	 *          mask:
81 	 *            0x0001    Global_tables_spec: if set, only one table spec (keyspace
82 	 *                      and table name) is provided as <global_table_spec>. If not
83 	 *                      set, <global_table_spec> is not present.
84 	 *        - <columns_count> is an [int] representing the number of columns selected
85 	 *          by the query this result is of. It defines the number of <col_spec_i>
86 	 *          elements in and the number of element for each row in <rows_content>.
87 	 *        - <global_table_spec> is present if the Global_tables_spec is set in
88 	 *          <flags>. If present, it is composed of two [string] representing the
89 	 *          (unique) keyspace name and table name the columns return are of.
90 	 */
91 	struct MetaData {
92 		int flags; enum GLOBAL_TABLES_SPEC = 0x0001; @property bool hasGlobalTablesSpec() { return flags & MetaData.GLOBAL_TABLES_SPEC ? true : false; }
93 		int columns_count;
94 		string[2] global_table_spec;
95 		ColumnSpecification[] column_specs;
96 	}
97 
98 	private {
99 		// conceptually immutable:
100 		Change m_lastChange;
101 		string m_currentKeyspace;
102 		string m_currentTable;
103 		FrameHeader m_fh;
104 		Kind m_kind;
105 		TCPConnection m_sock;
106 		MetaData m_metadata;
107 
108 		// counters
109 		int* m_counterP;
110 		@property ref int m_counter() { return *m_counterP; }
111 		RefCounted!int m_rowCount; // keep the remaining row count consistent when CassandraResult gets copied
112 		Connection.Lock m_lock;
113 	}
114 
115 	this(Connection.Lock lock, FrameHeader fh, TCPConnection sock, ref int counter)
116 	{
117 		m_fh = fh;
118 		m_sock = sock;
119 		m_counterP = &counter;
120 		
121 		int tmp;
122 		m_kind = cast(Kind)readIntNotNULL(tmp, sock, m_counter);
123 		
124 		final switch (m_kind) {
125 			case Kind.void_: readVoid(); break;
126 			case Kind.rows:
127 				readRowMetaData();
128 				readIntNotNULL(m_rowCount, m_sock, m_counter);
129 				break;
130 			case Kind.setKeyspace: readSet_keyspace(); break;
131 			case Kind.prepared: break; // ignored and handled by PreparedStatement later
132 			case Kind.schemaChange: readSchema_change(); break;
133 		}
134 
135 		if (m_rowCount) {
136 			assert(lock !is Connection.Lock.init);
137 			m_lock = lock;
138 		}
139 	}
140 
141 	~this()
142 	{
143 		while (!empty) dropRow();
144 	}
145 
146 	@property Kind kind() { return m_kind; }
147 	@property string lastchange() { return m_lastChange; }
148 	@property string keyspace() { return m_currentKeyspace; }
149 	@property string table() { return m_currentTable; }
150 	@property MetaData metadata() { return m_metadata; }
151 
152 	bool empty() const { return m_kind != Kind.rows || m_rowCount == 0; }
153 
154 	void readRow(ROW)(ref ROW dst)
155 		if (is(ROW == struct))
156 	{
157 		import std.typecons : Nullable;
158 
159 		assert(!empty);
160 
161 		foreach (ref col; m_metadata.column_specs) {
162 			auto tp = col.type.id;
163 			switch (col.name) {
164 				default:
165 					log("Ignoring unknown column %s in result.", col.name);
166 					break;
167 				foreach (mname; __traits(allMembers, ROW))
168 					static if (is(typeof(__traits(getMember, dst, mname) = __traits(getMember, dst, mname)))) {
169 						case mname:
170 							try {
171 								if (!readField(__traits(getMember, dst, mname), tp)) {
172 									static if (isInstanceOf!(Nullable, typeof(__traits(getMember, dst, mname))))
173 										__traits(getMember, dst, mname).nullify();
174 									else __traits(getMember, dst, mname) = __traits(getMember, ROW.init, mname);
175 								}
176 							} catch (Exception e) {
177 								throw new Exception("Failed to read field "~mname~" of type "~to!string(tp)~": "~e.msg);
178 							}
179 							break;
180 					}
181 
182 			}
183 		}
184 
185 		if (!--m_rowCount) m_lock = Connection.Lock.init;
186 	}
187 
188 	private bool readField(T)(ref T dst, Option.Type type)
189 	{
190 		import std.typecons : Nullable;
191 		import std.bitmanip : read;
192 		import std.bigint;
193 		import std.datetime;
194 		import std.uuid;
195 
196 		auto len = readBigEndian!int(m_sock, m_counter);
197 		if (len < 0) return false;
198 
199 		static if (isInstanceOf!(Nullable, T)) alias FT = typeof(dst.get());
200 		else alias FT = T;
201 
202 		auto buf = new ubyte[len];
203 		m_sock.read(buf);
204 		m_counter -= len;
205 
206 		// TODO:
207 		//   Option.Type.inet:
208 		//   Option.Type.list:
209 		//   Option.Type.map:
210 		//   Option.Type.set:
211 
212 		static if (is(FT == bool)) {
213 			enforce(type == Option.Type.boolean, "Expected boolean");
214 			dst = buf[0] != 0;
215 		} else static if (is(FT == int)) {
216 			enforce(type == Option.Type.int_, "Expected 32-bit integer");
217 			dst = read!int(buf);
218 		} else static if (is(FT == long)) {
219 			enforce(type == Option.Type.bigInt, "Expected 64-bit integer");
220 			dst = read!long(buf);
221 		} else static if (is(FT : const(string))) {
222 			with (Option.Type)
223 				enforce(type == varChar || type == text || type == ascii, "Expected string");
224 			dst = cast(FT)buf;
225 		} else static if (is(FT : const(ubyte)[])) {
226 			//enforce(type == Option.Type.blob, "Expected binary blob");
227 			dst = cast(FT)buf;
228 		} else static if (is(FT == float)) {
229 			enforce(type == Option.Type.float_, "Expected 32-bit float");
230 			dst = read!float(buf);
231 		} else static if (is(FT == double)) {
232 			enforce(type == Option.Type.double_, "Expected 64-bit float");
233 			dst = read!double(buf);
234 		} else static if (is(FT == SysTime)) {
235 			enforce(type == Option.Type.timestamp, "Expected timestamp");
236 			enum unix_base = unixTimeToStdTime(0);
237 			dst = SysTime(buf.read!long * 10_000 + unix_base, UTC());
238 		} else static if (is(FT == UUID)) {
239 			with (Option.Type)
240 				enforce(type == uuid || type == timeUUID, "Expected string");
241 			dst = UUID(buf[0 .. 16]);
242 		} else static if (is(FT == BigInt)) {
243 			enforce(type == Option.Type.varInt, "Expected timestamp");
244 			readBigInt(dst, buf);
245 		} else static if (is(FT == Decimal)) {
246 			enforce(type == Option.Type.decimal, "Expected decimal number");
247 			dst.exponent = read!int(buf);
248 			readBigInt(dst.number, buf);
249 		} //else static assert(false, "Unsupported result type: "~FT.stringof);
250 
251 		return true;
252 	}
253 
254 	void dropRow()
255 	{
256 		readRowContent();
257 		if (!--m_rowCount) m_lock = Connection.Lock.init;
258 	}
259 
260 	/*
261 	 *4.2.5.1. Void
262 	 *
263 	 *  The rest of the body for a Void result is empty. It indicates that a query was
264 	 *  successful without providing more information.
265 	 */
266 	private void readVoid() { assert(m_kind == Kind.void_); }
267 
268 	/*
269 	 *4.2.5.2. Rows
270 	 *
271 	 *  Indicates a set of rows. The rest of body of a Rows result is:
272 	 *    <metadata><rows_count><rows_content>
273 	 */
274 	private void readRowMetaData()
275 	{
276 		assert(m_kind == Kind.rows || m_kind == Kind.prepared);
277 		auto md = MetaData();
278 		md.flags.readIntNotNULL(m_sock, m_counter);
279 		md.columns_count.readIntNotNULL(m_sock, m_counter);
280 		if (md.flags & MetaData.GLOBAL_TABLES_SPEC) {
281 			md.global_table_spec[0] = readShortString(m_sock, m_counter);
282 			md.global_table_spec[1] = readShortString(m_sock, m_counter);
283 		}
284 		md.column_specs = readColumnSpecifications(md.flags & MetaData.GLOBAL_TABLES_SPEC, md.columns_count);
285 		log("got spec: %s", md);
286 		m_metadata = md;
287 	}
288 
289 	/*       - <col_spec_i> specifies the columns returned in the query. There is
290 	 *          <column_count> such column specification that are composed of:
291 	 *            (<ksname><tablename>)?<column_name><type>
292 	 *          The initial <ksname> and <tablename> are two [string] are only present
293 	 *          if the Global_tables_spec flag is not set. The <column_name> is a
294 	 *          [string] and <type> is an [option] that correspond to the column name
295 	 *          and type. The option for <type> is either a native type (see below),
296 	 *          in which case the option has no value, or a 'custom' type, in which
297 	 *          case the value is a [string] representing the full qualified class
298 	 *          name of the type represented. Valid option ids are:
299 	 *            0x0000    Custom: the value is a [string], see above.
300 	 *            0x0001    Ascii
301 	 *            0x0002    Bigint
302 	 *            0x0003    Blob
303 	 *            0x0004    Boolean
304 	 *            0x0005    Counter
305 	 *            0x0006    Decimal
306 	 *            0x0007    Double
307 	 *            0x0008    Float
308 	 *            0x0009    Int
309 	 *            0x000A    Text
310 	 *            0x000B    Timestamp
311 	 *            0x000C    Uuid
312 	 *            0x000D    Varchar
313 	 *            0x000E    Varint
314 	 *            0x000F    Timeuuid
315 	 *            0x0010    Inet
316 	 *            0x0020    List: the value is an [option], representing the type
317 	 *                            of the elements of the list.
318 	 *            0x0021    Map: the value is two [option], representing the types of the
319 	 *                           keys and values of the map
320 	 *            0x0022    Set: the value is an [option], representing the type
321 	 *                            of the elements of the set
322 	 */
323 	private Option* readOption()
324 	{
325 		auto ret = new Option();
326 		ret.id = cast(Option.Type)readShort(m_sock, m_counter);
327 		final switch (ret.id) {
328 			case Option.Type.custom: ret.string_value = readShortString(m_sock, m_counter); break;
329 			case Option.Type.ascii: break;
330 			case Option.Type.bigInt: break;
331 			case Option.Type.blob: break;
332 			case Option.Type.boolean: break;
333 			case Option.Type.counter: break;
334 			case Option.Type.decimal: break;
335 			case Option.Type.double_: break;
336 			case Option.Type.float_: break;
337 			case Option.Type.int_: break;
338 			case Option.Type.text: break;
339 			case Option.Type.timestamp: break;
340 			case Option.Type.uuid: break;
341 			case Option.Type.varChar: break;
342 			case Option.Type.varInt: break;
343 			case Option.Type.timeUUID: break;
344 			case Option.Type.inet: break;
345 			case Option.Type.list: ret.option_value = readOption(); break;
346 			case Option.Type.map:
347 				ret.key_values_option_value[0] = readOption();
348 				ret.key_values_option_value[1] = readOption();
349 				break;
350 			case Option.Type.set: ret.option_value = readOption(); break;
351 		}
352 		return ret;
353 	}
354 
355 	struct ColumnSpecification {
356 		string ksname;
357 		string tablename;
358 
359 		string name;
360 		Option type;
361 	}
362 
363 	string toString() const { return format("Result(%s)", m_kind); }
364 
365 	private auto readColumnSpecification(bool hasGlobalTablesSpec) {
366 		ColumnSpecification ret;
367 		if (!hasGlobalTablesSpec) {
368 			ret.ksname = readShortString(m_sock, m_counter);
369 			ret.tablename = readShortString(m_sock, m_counter);
370 		}
371 		ret.name = readShortString(m_sock, m_counter);
372 		ret.type = *readOption();
373 		return ret;
374 	}
375 	private auto readColumnSpecifications(bool hasGlobalTablesSpec, int column_count) {
376 		ColumnSpecification[] ret;
377 		for (int i=0; i<column_count; i++) {
378 			ret ~= readColumnSpecification(hasGlobalTablesSpec);
379 		}
380 		return ret;
381 	}
382 
383 	/**    - <rows_count> is an [int] representing the number of rows present in this
384 	 *      result. Those rows are serialized in the <rows_content> part.
385 	 *    - <rows_content> is composed of <row_1>...<row_m> where m is <rows_count>.
386 	 *      Each <row_i> is composed of <value_1>...<value_n> where n is
387 	 *      <columns_count> and where <value_j> is a [bytes] representing the value
388 	 *      returned for the jth column of the ith row. In other words, <rows_content>
389 	 *      is composed of (<rows_count> * <columns_count>) [bytes].
390 	 */
391 	private auto readRowContent()
392 	{
393 		ubyte[][] ret;
394 		foreach (i; 0 .. m_metadata.columns_count) {
395 			//log("reading index[%d], %s", i, md.column_specs[i]);
396 			final switch (m_metadata.column_specs[i].type.id) {
397 				case Option.Type.custom:
398 					log("warning column %s has custom type", m_metadata.column_specs[i].name);
399 					ret ~= readIntBytes(m_sock, m_counter);
400 					break;
401 				case Option.Type.counter:
402 					ret ~= readIntBytes(m_sock, m_counter);
403 					throw new Exception("Read Counter Type has not been checked this is what we got: "~ cast(string)ret[$-1]);
404 					//break;
405 				case Option.Type.decimal:
406 					auto twobytes = readRawBytes(m_sock, m_counter, 2);
407 					if (twobytes == [0xff,0xff]) {
408 						twobytes = readRawBytes(m_sock, m_counter, 2);
409 						ret ~= null;
410 						break;
411 					}
412 					if (twobytes[0]==0x01 && twobytes[1]==0x01) {
413 						ret ~= readIntBytes(m_sock, m_counter);
414 					} else {
415 						auto writer = appender!string();
416 						formattedWrite(writer, "%s", twobytes);
417 						throw new Exception("New kind of decimal encountered"~ writer.data);
418 					}
419 					break;
420 				case Option.Type.boolean:
421 					ret ~= readRawBytes(m_sock, m_counter, int.sizeof);
422 					break;
423 				case Option.Type.ascii:
424 				case Option.Type.bigInt:
425 				case Option.Type.blob:
426 				case Option.Type.double_:
427 				case Option.Type.float_:
428 				case Option.Type.int_:
429 				case Option.Type.text:
430 				case Option.Type.timestamp:
431 				case Option.Type.uuid:
432 				case Option.Type.varChar:
433 				case Option.Type.varInt:
434 				case Option.Type.timeUUID:
435 				case Option.Type.inet:
436 				case Option.Type.list:
437 				case Option.Type.map:
438 				case Option.Type.set:
439 					ret ~= readIntBytes(m_sock, m_counter);
440 					break;
441 			}
442 		}
443 		return ret;
444 	}
445 
446 	/**4.2.5.3. Set_keyspace
447 	 *
448 	 *  The result to a `use` query. The body (after the kind [int]) is a single
449 	 *  [string] indicating the name of the keyspace that has been set.
450 	 */
451 	private string readSet_keyspace() {
452 		assert(m_kind is Kind.setKeyspace);
453 		return readShortString(m_sock, m_counter);
454 	}
455 
456 	private void readSchema_change()
457 	{
458 		assert(m_kind is Kind.schemaChange);
459 		m_lastChange = cast(Change)readShortString(m_sock, m_counter);
460 		m_currentKeyspace = readShortString(m_sock, m_counter);
461 		m_currentTable = readShortString(m_sock, m_counter);
462 	}
463 }
464 
465 struct PreparedStatement {
466 	private {
467 		ubyte[] m_id;
468 		Consistency m_consistency = Consistency.any;
469 	}
470 
471 	this(ref CassandraResult result)
472 	{
473 		if (result.kind != CassandraResult.Kind.prepared)
474 			throw new Exception("CQLProtocolException, Unknown result type for PREPARE command");
475 
476 		/*
477 		 *4.2.5.4. Prepared
478 		 *
479 		 *  The result to a PREPARE message. The rest of the body of a Prepared result is:
480 		 *    <id><metadata>
481 		 *  where:
482 		 *    - <id> is [short bytes] representing the prepared query ID.
483 		 *    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2).
484 		 *
485 		 *  Note that prepared query ID return is global to the node on which the query
486 		 *  has been prepared. It can be used on any connection to that node and this
487 		 *  until the node is restarted (after which the query must be reprepared).
488 		 */
489 		assert(result.kind is CassandraResult.Kind.prepared);
490 		log("reading id");
491 		m_id = readShortBytes(result.m_sock, result.m_counter);
492 		log("reading metadata");
493 		/*m_metadata =*/ result.readRowMetaData();
494 		log("done reading prepared stmt");
495 	}
496 
497 	@property const(ubyte)[] id() const { return m_id; }
498 
499 	@property Consistency consistency() const { return m_consistency; }
500 	@property void consistency(Consistency value) { m_consistency = value; }
501 
502 	string toString() const { return format("PreparedStatement(%s)", id.hex); }
503 }
504 
505 struct Decimal {
506 	import std.bigint;
507 	int exponent; // 10 ^ -exp
508 	BigInt number;
509 
510 	string toString() const {
511 		auto buf = appender!string();
512 		number.toString(str => buf.put(str), "%d");
513 		if (exponent <= 0) return buf.data;
514 		else return buf.data[0 .. $-exponent] ~ "." ~ buf.data[$-exponent .. $];
515 	}
516 }
517 
518 private void readBigInt(ref BigInt dst, in ubyte[] bytes)
519 {
520 	auto strbuf = appender!string();
521 	strbuf.reserve(bytes.length*2 + 3);
522 	if (bytes[0] < 0x80) {
523 		strbuf.put("0x");
524 		foreach (b; bytes) strbuf.formattedWrite("%02X", b);
525 	} else {
526 		strbuf.put("-0x");
527 		foreach (b; bytes) strbuf.formattedWrite("%02X", ~b);
528 	}
529 	log(strbuf.data);
530 	dst = BigInt(strbuf.data);
531 	if (bytes[0] >= 0x80) dst--; // FIXME: this is not efficient
532 }