1 module cassandra.cql.result; 2 3 public import cassandra.cql.utils; 4 5 import cassandra.cql.connection; 6 import cassandra.internal.utils; 7 import cassandra.internal.tcpconnection; 8 9 import std.array; 10 import std.bigint; 11 import std.bitmanip : bitfields; 12 import std.conv; 13 import std.exception : enforce; 14 import std.format : formattedWrite; 15 import std.range : isOutputRange; 16 import std.stdint; 17 import std..string : format; 18 import std.traits; 19 20 21 struct CassandraResult { 22 import std.typecons; 23 24 /* 25 *4.2.5. RESULT 26 * 27 * The result to a query (QUERY, PREPARE or EXECUTE messages). 28 * 29 * The first element of the body of a RESULT message is an [int] representing the 30 * `kind` of result. The rest of the body depends on the kind. The kind can be 31 * one of: 32 * 0x0001 Void: for results carrying no information. 33 * 0x0002 Rows: for results to select queries, returning a set of rows. 34 * 0x0003 Set_keyspace: the result to a `use` query. 35 * 0x0004 Prepared: result to a PREPARE message. 36 * 0x0005 Schema_change: the result to a schema altering query. 37 * 38 * The body for each kind (after the [int] kind) is defined below. 39 */ 40 enum Kind : short { 41 void_ = 0x0001, 42 rows = 0x0002, 43 setKeyspace = 0x0003, 44 prepared = 0x0004, 45 schemaChange = 0x0005 46 } 47 48 /* 49 *4.2.5.5. Schema_change 50 * 51 * The result to a schema altering query (creation/update/drop of a 52 * keyspace/table/index). The body (after the kind [int]) is composed of 3 53 * [string]: 54 * <change><keyspace><table> 55 * where: 56 * - <change> describe the type of change that has occured. It can be one of 57 * "CREATED", "UPDATED" or "DROPPED". 58 * - <keyspace> is the name of the affected keyspace or the keyspace of the 59 * affected table. 60 * - <table> is the name of the affected table. <table> will be empty (i.e. 61 * the empty string "") if the change was affecting a keyspace and not a 62 * table. 63 * 64 * Note that queries to create and drop an index are considered as change 65 * updating the table the index is on. 66 */ 67 enum Change : string { 68 created = "CREATED", 69 updated = "UPDATED", 70 dropped = "DROPPED" 71 } 72 73 /* where: 74 * - <metadata> is composed of: 75 * <flags><columns_count><global_table_spec>?<col_spec_1>...<col_spec_n> 76 * where: 77 * - <flags> is an [int]. The bits of <flags> provides information on the 78 * formatting of the remaining informations. A flag is set if the bit 79 * corresponding to its `mask` is set. Supported flags are, given there 80 * mask: 81 * 0x0001 Global_tables_spec: if set, only one table spec (keyspace 82 * and table name) is provided as <global_table_spec>. If not 83 * set, <global_table_spec> is not present. 84 * - <columns_count> is an [int] representing the number of columns selected 85 * by the query this result is of. It defines the number of <col_spec_i> 86 * elements in and the number of element for each row in <rows_content>. 87 * - <global_table_spec> is present if the Global_tables_spec is set in 88 * <flags>. If present, it is composed of two [string] representing the 89 * (unique) keyspace name and table name the columns return are of. 90 */ 91 struct MetaData { 92 int flags; enum GLOBAL_TABLES_SPEC = 0x0001; @property bool hasGlobalTablesSpec() { return flags & MetaData.GLOBAL_TABLES_SPEC ? true : false; } 93 int columns_count; 94 string[2] global_table_spec; 95 ColumnSpecification[] column_specs; 96 } 97 98 private { 99 // conceptually immutable: 100 Change m_lastChange; 101 string m_currentKeyspace; 102 string m_currentTable; 103 FrameHeader m_fh; 104 Kind m_kind; 105 TCPConnection m_sock; 106 MetaData m_metadata; 107 108 // counters 109 int* m_counterP; 110 @property ref int m_counter() { return *m_counterP; } 111 RefCounted!int m_rowCount; // keep the remaining row count consistent when CassandraResult gets copied 112 Connection.Lock m_lock; 113 } 114 115 this(Connection.Lock lock, FrameHeader fh, TCPConnection sock, ref int counter) 116 { 117 m_fh = fh; 118 m_sock = sock; 119 m_counterP = &counter; 120 121 int tmp; 122 m_kind = cast(Kind)readIntNotNULL(tmp, sock, m_counter); 123 124 final switch (m_kind) { 125 case Kind.void_: readVoid(); break; 126 case Kind.rows: 127 readRowMetaData(); 128 readIntNotNULL(m_rowCount, m_sock, m_counter); 129 break; 130 case Kind.setKeyspace: readSet_keyspace(); break; 131 case Kind.prepared: break; // ignored and handled by PreparedStatement later 132 case Kind.schemaChange: readSchema_change(); break; 133 } 134 135 if (m_rowCount) { 136 assert(lock !is Connection.Lock.init); 137 m_lock = lock; 138 } 139 } 140 141 ~this() 142 { 143 while (!empty) dropRow(); 144 } 145 146 @property Kind kind() { return m_kind; } 147 @property string lastchange() { return m_lastChange; } 148 @property string keyspace() { return m_currentKeyspace; } 149 @property string table() { return m_currentTable; } 150 @property MetaData metadata() { return m_metadata; } 151 152 bool empty() const { return m_kind != Kind.rows || m_rowCount == 0; } 153 154 void readRow(ROW)(ref ROW dst) 155 if (is(ROW == struct)) 156 { 157 import std.typecons : Nullable; 158 159 assert(!empty); 160 161 foreach (ref col; m_metadata.column_specs) { 162 auto tp = col.type.id; 163 switch (col.name) { 164 default: 165 log("Ignoring unknown column %s in result.", col.name); 166 break; 167 foreach (mname; __traits(allMembers, ROW)) 168 static if (is(typeof(__traits(getMember, dst, mname) = __traits(getMember, dst, mname)))) { 169 case mname: 170 try { 171 if (!readField(__traits(getMember, dst, mname), tp)) { 172 static if (isInstanceOf!(Nullable, typeof(__traits(getMember, dst, mname)))) 173 __traits(getMember, dst, mname).nullify(); 174 else __traits(getMember, dst, mname) = __traits(getMember, ROW.init, mname); 175 } 176 } catch (Exception e) { 177 throw new Exception("Failed to read field "~mname~" of type "~to!string(tp)~": "~e.msg); 178 } 179 break; 180 } 181 182 } 183 } 184 185 if (!--m_rowCount) m_lock = Connection.Lock.init; 186 } 187 188 private bool readField(T)(ref T dst, Option.Type type) 189 { 190 import std.typecons : Nullable; 191 import std.bitmanip : read; 192 import std.bigint; 193 import std.datetime; 194 import std.uuid; 195 196 auto len = readBigEndian!int(m_sock, m_counter); 197 if (len < 0) return false; 198 199 static if (isInstanceOf!(Nullable, T)) alias FT = typeof(dst.get()); 200 else alias FT = T; 201 202 auto buf = new ubyte[len]; 203 m_sock.read(buf); 204 m_counter -= len; 205 206 // TODO: 207 // Option.Type.inet: 208 // Option.Type.list: 209 // Option.Type.map: 210 // Option.Type.set: 211 212 static if (is(FT == bool)) { 213 enforce(type == Option.Type.boolean, "Expected boolean"); 214 dst = buf[0] != 0; 215 } else static if (is(FT == int)) { 216 enforce(type == Option.Type.int_, "Expected 32-bit integer"); 217 dst = read!int(buf); 218 } else static if (is(FT == long)) { 219 enforce(type == Option.Type.bigInt, "Expected 64-bit integer"); 220 dst = read!long(buf); 221 } else static if (is(FT : const(string))) { 222 with (Option.Type) 223 enforce(type == varChar || type == text || type == ascii, "Expected string"); 224 dst = cast(FT)buf; 225 } else static if (is(FT : const(ubyte)[])) { 226 //enforce(type == Option.Type.blob, "Expected binary blob"); 227 dst = cast(FT)buf; 228 } else static if (is(FT == float)) { 229 enforce(type == Option.Type.float_, "Expected 32-bit float"); 230 dst = read!float(buf); 231 } else static if (is(FT == double)) { 232 enforce(type == Option.Type.double_, "Expected 64-bit float"); 233 dst = read!double(buf); 234 } else static if (is(FT == SysTime)) { 235 enforce(type == Option.Type.timestamp, "Expected timestamp"); 236 enum unix_base = unixTimeToStdTime(0); 237 dst = SysTime(buf.read!long * 10_000 + unix_base, UTC()); 238 } else static if (is(FT == UUID)) { 239 with (Option.Type) 240 enforce(type == uuid || type == timeUUID, "Expected string"); 241 dst = UUID(buf[0 .. 16]); 242 } else static if (is(FT == BigInt)) { 243 enforce(type == Option.Type.varInt, "Expected timestamp"); 244 readBigInt(dst, buf); 245 } else static if (is(FT == Decimal)) { 246 enforce(type == Option.Type.decimal, "Expected decimal number"); 247 dst.exponent = read!int(buf); 248 readBigInt(dst.number, buf); 249 } //else static assert(false, "Unsupported result type: "~FT.stringof); 250 251 return true; 252 } 253 254 void dropRow() 255 { 256 readRowContent(); 257 if (!--m_rowCount) m_lock = Connection.Lock.init; 258 } 259 260 /* 261 *4.2.5.1. Void 262 * 263 * The rest of the body for a Void result is empty. It indicates that a query was 264 * successful without providing more information. 265 */ 266 private void readVoid() { assert(m_kind == Kind.void_); } 267 268 /* 269 *4.2.5.2. Rows 270 * 271 * Indicates a set of rows. The rest of body of a Rows result is: 272 * <metadata><rows_count><rows_content> 273 */ 274 private void readRowMetaData() 275 { 276 assert(m_kind == Kind.rows || m_kind == Kind.prepared); 277 auto md = MetaData(); 278 md.flags.readIntNotNULL(m_sock, m_counter); 279 md.columns_count.readIntNotNULL(m_sock, m_counter); 280 if (md.flags & MetaData.GLOBAL_TABLES_SPEC) { 281 md.global_table_spec[0] = readShortString(m_sock, m_counter); 282 md.global_table_spec[1] = readShortString(m_sock, m_counter); 283 } 284 md.column_specs = readColumnSpecifications(md.flags & MetaData.GLOBAL_TABLES_SPEC, md.columns_count); 285 log("got spec: %s", md); 286 m_metadata = md; 287 } 288 289 /* - <col_spec_i> specifies the columns returned in the query. There is 290 * <column_count> such column specification that are composed of: 291 * (<ksname><tablename>)?<column_name><type> 292 * The initial <ksname> and <tablename> are two [string] are only present 293 * if the Global_tables_spec flag is not set. The <column_name> is a 294 * [string] and <type> is an [option] that correspond to the column name 295 * and type. The option for <type> is either a native type (see below), 296 * in which case the option has no value, or a 'custom' type, in which 297 * case the value is a [string] representing the full qualified class 298 * name of the type represented. Valid option ids are: 299 * 0x0000 Custom: the value is a [string], see above. 300 * 0x0001 Ascii 301 * 0x0002 Bigint 302 * 0x0003 Blob 303 * 0x0004 Boolean 304 * 0x0005 Counter 305 * 0x0006 Decimal 306 * 0x0007 Double 307 * 0x0008 Float 308 * 0x0009 Int 309 * 0x000A Text 310 * 0x000B Timestamp 311 * 0x000C Uuid 312 * 0x000D Varchar 313 * 0x000E Varint 314 * 0x000F Timeuuid 315 * 0x0010 Inet 316 * 0x0020 List: the value is an [option], representing the type 317 * of the elements of the list. 318 * 0x0021 Map: the value is two [option], representing the types of the 319 * keys and values of the map 320 * 0x0022 Set: the value is an [option], representing the type 321 * of the elements of the set 322 */ 323 private Option* readOption() 324 { 325 auto ret = new Option(); 326 ret.id = cast(Option.Type)readShort(m_sock, m_counter); 327 final switch (ret.id) { 328 case Option.Type.custom: ret.string_value = readShortString(m_sock, m_counter); break; 329 case Option.Type.ascii: break; 330 case Option.Type.bigInt: break; 331 case Option.Type.blob: break; 332 case Option.Type.boolean: break; 333 case Option.Type.counter: break; 334 case Option.Type.decimal: break; 335 case Option.Type.double_: break; 336 case Option.Type.float_: break; 337 case Option.Type.int_: break; 338 case Option.Type.text: break; 339 case Option.Type.timestamp: break; 340 case Option.Type.uuid: break; 341 case Option.Type.varChar: break; 342 case Option.Type.varInt: break; 343 case Option.Type.timeUUID: break; 344 case Option.Type.inet: break; 345 case Option.Type.list: ret.option_value = readOption(); break; 346 case Option.Type.map: 347 ret.key_values_option_value[0] = readOption(); 348 ret.key_values_option_value[1] = readOption(); 349 break; 350 case Option.Type.set: ret.option_value = readOption(); break; 351 } 352 return ret; 353 } 354 355 struct ColumnSpecification { 356 string ksname; 357 string tablename; 358 359 string name; 360 Option type; 361 } 362 363 string toString() const { return format("Result(%s)", m_kind); } 364 365 private auto readColumnSpecification(bool hasGlobalTablesSpec) { 366 ColumnSpecification ret; 367 if (!hasGlobalTablesSpec) { 368 ret.ksname = readShortString(m_sock, m_counter); 369 ret.tablename = readShortString(m_sock, m_counter); 370 } 371 ret.name = readShortString(m_sock, m_counter); 372 ret.type = *readOption(); 373 return ret; 374 } 375 private auto readColumnSpecifications(bool hasGlobalTablesSpec, int column_count) { 376 ColumnSpecification[] ret; 377 for (int i=0; i<column_count; i++) { 378 ret ~= readColumnSpecification(hasGlobalTablesSpec); 379 } 380 return ret; 381 } 382 383 /** - <rows_count> is an [int] representing the number of rows present in this 384 * result. Those rows are serialized in the <rows_content> part. 385 * - <rows_content> is composed of <row_1>...<row_m> where m is <rows_count>. 386 * Each <row_i> is composed of <value_1>...<value_n> where n is 387 * <columns_count> and where <value_j> is a [bytes] representing the value 388 * returned for the jth column of the ith row. In other words, <rows_content> 389 * is composed of (<rows_count> * <columns_count>) [bytes]. 390 */ 391 private auto readRowContent() 392 { 393 ubyte[][] ret; 394 foreach (i; 0 .. m_metadata.columns_count) { 395 //log("reading index[%d], %s", i, md.column_specs[i]); 396 final switch (m_metadata.column_specs[i].type.id) { 397 case Option.Type.custom: 398 log("warning column %s has custom type", m_metadata.column_specs[i].name); 399 ret ~= readIntBytes(m_sock, m_counter); 400 break; 401 case Option.Type.counter: 402 ret ~= readIntBytes(m_sock, m_counter); 403 throw new Exception("Read Counter Type has not been checked this is what we got: "~ cast(string)ret[$-1]); 404 //break; 405 case Option.Type.decimal: 406 auto twobytes = readRawBytes(m_sock, m_counter, 2); 407 if (twobytes == [0xff,0xff]) { 408 twobytes = readRawBytes(m_sock, m_counter, 2); 409 ret ~= null; 410 break; 411 } 412 if (twobytes[0]==0x01 && twobytes[1]==0x01) { 413 ret ~= readIntBytes(m_sock, m_counter); 414 } else { 415 auto writer = appender!string(); 416 formattedWrite(writer, "%s", twobytes); 417 throw new Exception("New kind of decimal encountered"~ writer.data); 418 } 419 break; 420 case Option.Type.boolean: 421 ret ~= readRawBytes(m_sock, m_counter, int.sizeof); 422 break; 423 case Option.Type.ascii: 424 case Option.Type.bigInt: 425 case Option.Type.blob: 426 case Option.Type.double_: 427 case Option.Type.float_: 428 case Option.Type.int_: 429 case Option.Type.text: 430 case Option.Type.timestamp: 431 case Option.Type.uuid: 432 case Option.Type.varChar: 433 case Option.Type.varInt: 434 case Option.Type.timeUUID: 435 case Option.Type.inet: 436 case Option.Type.list: 437 case Option.Type.map: 438 case Option.Type.set: 439 ret ~= readIntBytes(m_sock, m_counter); 440 break; 441 } 442 } 443 return ret; 444 } 445 446 /**4.2.5.3. Set_keyspace 447 * 448 * The result to a `use` query. The body (after the kind [int]) is a single 449 * [string] indicating the name of the keyspace that has been set. 450 */ 451 private string readSet_keyspace() { 452 assert(m_kind is Kind.setKeyspace); 453 return readShortString(m_sock, m_counter); 454 } 455 456 private void readSchema_change() 457 { 458 assert(m_kind is Kind.schemaChange); 459 m_lastChange = cast(Change)readShortString(m_sock, m_counter); 460 m_currentKeyspace = readShortString(m_sock, m_counter); 461 m_currentTable = readShortString(m_sock, m_counter); 462 } 463 } 464 465 struct PreparedStatement { 466 private { 467 ubyte[] m_id; 468 Consistency m_consistency = Consistency.any; 469 } 470 471 this(ref CassandraResult result) 472 { 473 if (result.kind != CassandraResult.Kind.prepared) 474 throw new Exception("CQLProtocolException, Unknown result type for PREPARE command"); 475 476 /* 477 *4.2.5.4. Prepared 478 * 479 * The result to a PREPARE message. The rest of the body of a Prepared result is: 480 * <id><metadata> 481 * where: 482 * - <id> is [short bytes] representing the prepared query ID. 483 * - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2). 484 * 485 * Note that prepared query ID return is global to the node on which the query 486 * has been prepared. It can be used on any connection to that node and this 487 * until the node is restarted (after which the query must be reprepared). 488 */ 489 assert(result.kind is CassandraResult.Kind.prepared); 490 log("reading id"); 491 m_id = readShortBytes(result.m_sock, result.m_counter); 492 log("reading metadata"); 493 /*m_metadata =*/ result.readRowMetaData(); 494 log("done reading prepared stmt"); 495 } 496 497 @property const(ubyte)[] id() const { return m_id; } 498 499 @property Consistency consistency() const { return m_consistency; } 500 @property void consistency(Consistency value) { m_consistency = value; } 501 502 string toString() const { return format("PreparedStatement(%s)", id.hex); } 503 } 504 505 struct Decimal { 506 import std.bigint; 507 int exponent; // 10 ^ -exp 508 BigInt number; 509 510 string toString() const { 511 auto buf = appender!string(); 512 number.toString(str => buf.put(str), "%d"); 513 if (exponent <= 0) return buf.data; 514 else return buf.data[0 .. $-exponent] ~ "." ~ buf.data[$-exponent .. $]; 515 } 516 } 517 518 private void readBigInt(ref BigInt dst, in ubyte[] bytes) 519 { 520 auto strbuf = appender!string(); 521 strbuf.reserve(bytes.length*2 + 3); 522 if (bytes[0] < 0x80) { 523 strbuf.put("0x"); 524 foreach (b; bytes) strbuf.formattedWrite("%02X", b); 525 } else { 526 strbuf.put("-0x"); 527 foreach (b; bytes) strbuf.formattedWrite("%02X", cast(ubyte)~cast(int)b); 528 } 529 log(strbuf.data); 530 dst = BigInt(strbuf.data); 531 if (bytes[0] >= 0x80) dst--; // FIXME: this is not efficient 532 }