1 /** 2 * Pipe 3 * 4 * Copyright: 5 * (C) 1999-2007 Jack Lloyd 6 * (C) 2014-2015 Etienne Cimon 7 * 2012 Markus Wanner 8 * 9 * License: 10 * Botan is released under the Simplified BSD License (see LICENSE.md) 11 */ 12 module botan.filters.pipe; 13 14 import botan.constants; 15 import botan.filters.data_src; 16 import botan.filters.filter; 17 import botan.utils.exceptn; 18 //static if (BOTAN_HAS_PIPE_UNIXFD_IO && false) 19 // import botan.fd_unix; 20 21 import botan.filters.out_buf; 22 import botan.filters.secqueue; 23 import botan.utils.parsing; 24 import botan.utils.types; 25 import std.conv : to; 26 import std.array : Appender; 27 28 /** 29 * An opaque type that identifies a message in this Pipe 30 */ 31 alias message_id = size_t; 32 33 /** 34 * Exception if you use an invalid message as an argument to 35 * read, remaining, etc 36 */ 37 class InvalidMessageNumber : InvalidArgument 38 { 39 /** 40 * Params: 41 * where = the error occured 42 * msg = the invalid message id that was used 43 */ 44 this(in string where, message_id msg) { 45 super("Pipe:" ~ where ~ ": Invalid message number " ~ to!string(msg)); 46 } 47 } 48 49 /** 50 * This class represents pipe objects. 51 * A set of filters can be placed into a pipe, and information flows 52 * through the pipe until it reaches the end, where the output is 53 * collected for retrieval. If you're familiar with the Unix shell 54 * environment, this design will sound quite familiar. 55 */ 56 struct Pipe 57 { 58 public: 59 60 alias message_id = size_t; 61 62 /** 63 * A meta-id for whatever the last message is 64 */ 65 static const message_id LAST_MESSAGE = cast(message_id)(-2); 66 67 /** 68 * A meta-id for the default message (set with set_defaultMsg) 69 */ 70 static const message_id DEFAULT_MESSAGE = cast(message_id)(-1); 71 72 /** 73 * Write input to the pipe, i.e. to its first filter. 74 * 75 * Params: 76 * input = the ubyte array to write 77 * length = the length of the ubyte array in 78 */ 79 void write(const(ubyte)* input, size_t length) 80 { 81 if (!m_inside_msg) 82 throw new InvalidState("Cannot write to a Pipe while it is not processing"); 83 m_pipe_to.write(input, length); 84 } 85 86 /** 87 * Write input to the pipe, i.e. to its first filter. 88 * 89 * Params: 90 * input = the SecureVector containing the data to write 91 */ 92 void write(T, ALLOC)(auto const ref RefCounted!(Vector!(T, ALLOC), ALLOC) input) 93 { write(input.ptr, input.length); } 94 95 /** 96 * Write input to the pipe, i.e. to its first filter. 97 * 98 * Params: 99 * input = the std::vector containing the data to write 100 */ 101 void write(T, ALLOC)(auto const ref Vector!(T, ALLOC) input) 102 { write(input.ptr, input.length); } 103 104 /** 105 * Write input to the pipe, i.e. to its first filter. 106 * 107 * Params: 108 * input = the string containing the data to write 109 */ 110 void write(string input) 111 { 112 write(cast(const(ubyte)*)input.ptr, input.length); 113 } 114 115 /** 116 * Write input to the pipe, i.e. to its first filter. 117 * 118 * Params: 119 * source = the DataSource to read the data from 120 */ 121 void write(DataSource source) 122 { 123 SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE); 124 while (!source.endOfData()) 125 { 126 size_t got = source.read(buffer.ptr, buffer.length); 127 write(buffer.ptr, got); 128 } 129 } 130 131 /** 132 * Write input to the pipe, i.e. to its first filter. 133 * 134 * Params: 135 * input = a single ubyte to be written 136 */ 137 void write(ubyte input) 138 { 139 write(&input, 1); 140 } 141 142 /** 143 * Write input to the pipe, i.e. to its first filter. 144 * 145 * Params: 146 * input = a ubyte array to be written 147 */ 148 void write(ubyte[] input) 149 { 150 write(cast(const(ubyte)*)input.ptr, input.length); 151 } 152 153 /** 154 * Perform startMsg(), write() and endMsg() sequentially. 155 * 156 * Params: 157 * input = the ubyte array containing the data to write 158 * length = the length of the ubyte array to write 159 */ 160 void processMsg(const(ubyte)* input, size_t length) 161 { 162 startMsg(); 163 write(input, length); 164 endMsg(); 165 } 166 167 /** 168 * Perform startMsg(), write() and endMsg() sequentially. 169 * 170 * Params: 171 * input = the SecureVector containing the data to write 172 */ 173 void processMsg(ALLOC)(auto const ref Vector!(ubyte, ALLOC) input) 174 { 175 processMsg(input.ptr, input.length); 176 } 177 178 /** 179 * Perform startMsg(), write() and endMsg() sequentially. 180 * 181 * Params: 182 * input = the SecureVector containing the data to write 183 */ 184 void processMsg(ALLOC)(auto const ref RefCounted!(Vector!(ubyte, ALLOC), ALLOC) input) 185 { 186 processMsg(input.ptr, input.length); 187 } 188 189 /** 190 * Perform startMsg(), write() and endMsg() sequentially. 191 * 192 * Params: 193 * input = the string containing the data to write 194 */ 195 void processMsg(string input) 196 { 197 processMsg(cast(const(ubyte)*)(input.ptr), input.length); 198 } 199 200 /** 201 * Perform startMsg(), write() and endMsg() sequentially. 202 * 203 * Params: 204 * input = the DataSource providing the data to write 205 */ 206 void processMsg(DataSource input) 207 { 208 startMsg(); 209 write(input); 210 endMsg(); 211 } 212 213 /** 214 * Find out how many bytes are ready to read. 215 * 216 * Params: 217 * msg = the number identifying the message 218 * for which the information is desired 219 * Returns: number of bytes that can still be read 220 */ 221 size_t remaining(message_id msg = DEFAULT_MESSAGE) const 222 { 223 return m_outputs.remaining(getMessageNo("remaining", msg)); 224 } 225 226 /** 227 * Read the default message from the pipe. Moves the internal 228 * offset so that every call to read will return a new portion of 229 * the message. 230 * 231 * Params: 232 * output = the ubyte array to write the read bytes to 233 * length = the length of the ubyte array output 234 * Returns: number of bytes actually read into output 235 */ 236 size_t read(ubyte* output, size_t length) 237 { 238 return read(output, length, DEFAULT_MESSAGE); 239 } 240 241 /** 242 * Read a specified message from the pipe. Moves the internal 243 * offset so that every call to read will return a new portion of 244 * the message. 245 * 246 * Params: 247 * output = the ubyte array to write the read bytes to 248 * length = the length of the ubyte array output 249 * msg = the number identifying the message to read from 250 * Returns: number of bytes actually read into output 251 */ 252 size_t read(ubyte* output, size_t length, message_id msg) 253 { 254 return m_outputs.read(output, length, getMessageNo("read", msg)); 255 } 256 257 /** 258 * Read a specified message from the pipe. Moves the internal 259 * offset so that every call to read will return a new portion of 260 * the message. 261 * 262 * Params: 263 * output = the ubyte array to write the read bytes to 264 * msg = the number identifying the message to read from 265 * Returns: number of bytes actually read into output 266 */ 267 size_t read(ref ubyte[] output, message_id msg = DEFAULT_MESSAGE) 268 { 269 return m_outputs.read(output.ptr, output.length, getMessageNo("read", msg)); 270 } 271 272 /** 273 * Read a single ubyte from the pipe. Moves the internal offset so 274 * that every call to read will return a new portion of the 275 * message. 276 * 277 * Params: 278 * output = the ubyte to write the result to 279 * msg = the message to read from 280 * Returns: number of bytes actually read into output 281 */ 282 size_t read(ref ubyte output, message_id msg = DEFAULT_MESSAGE) 283 { 284 return read(&output, 1, msg); 285 } 286 287 /** 288 * Read the full contents of the pipe. 289 * 290 * Params: 291 * msg = the number identifying the message to read from 292 * Returns: SecureVector holding the contents of the pipe 293 */ 294 SecureVector!ubyte readAll(message_id msg = DEFAULT_MESSAGE) 295 { 296 msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg()); 297 SecureArray!ubyte buffer = SecureVector!ubyte(remaining(msg)); 298 size_t got = read(buffer.ptr, buffer.length, msg); 299 buffer.resize(got); 300 return buffer.move(); 301 } 302 303 /** 304 * Read the full contents of the pipe. 305 * 306 * Params: 307 * msg = the number identifying the message to read from 308 * Returns: string holding the contents of the pipe 309 */ 310 string toString(message_id msg = DEFAULT_MESSAGE) 311 { 312 msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg()); 313 SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE); 314 Appender!string str; 315 str.reserve(remaining(msg)); 316 317 while (true) 318 { 319 size_t got = read(buffer.ptr, buffer.length, msg); 320 if (got == 0) 321 break; 322 str ~= buffer.ptr[0 .. got]; 323 } 324 325 return str.data; 326 } 327 328 /** Read from the default message but do not modify the internal 329 * offset. Consecutive calls to peek() will return portions of 330 * the message starting at the same position. 331 * 332 * Params: 333 * output = the ubyte array to write the peeked message part to 334 * length = the length of the ubyte array output 335 * offset = the offset from the current position in message 336 * msg = the number identifying the message to peek from 337 * Returns: number of bytes actually peeked and written into output 338 */ 339 size_t peek(ubyte* output, size_t length, size_t offset, message_id msg = DEFAULT_MESSAGE) const 340 { 341 return m_outputs.peek(output, length, offset, getMessageNo("peek", msg)); 342 } 343 344 /** Read from the specified message but do not modify the 345 * internal offset. Consecutive calls to peek() will return 346 * portions of the message starting at the same position. 347 * 348 * Params: 349 * output = the ubyte array to write the peeked message part to 350 * offset = the offset from the current position in message 351 * msg = the number identifying the message to peek from 352 * Returns: number of bytes actually peeked and written into output 353 */ 354 size_t peek(ref ubyte[] output, size_t offset, message_id msg = DEFAULT_MESSAGE) const 355 { 356 return peek(output.ptr, output.length, offset, DEFAULT_MESSAGE); 357 } 358 359 /** Read a single ubyte from the specified message but do not 360 * modify the internal offset. Consecutive calls to peek() will 361 * return portions of the message starting at the same position. 362 * 363 * Params: 364 * output = the ubyte to write the peeked message ubyte to 365 * offset = the offset from the current position in message 366 * msg = the number identifying the message to peek from 367 * Returns: number of bytes actually peeked and written into output 368 */ 369 size_t peek(ref ubyte output, size_t offset, message_id msg = DEFAULT_MESSAGE) const 370 { 371 return peek(&output, 1, offset, msg); 372 } 373 374 /** 375 * Read one ubyte. 376 * 377 * Params: 378 * output = the ubyte to read to 379 * Returns: length in bytes that was actually read and put 380 * into out 381 */ 382 size_t readByte(ref ubyte output) 383 { 384 return read(&output, 1); 385 } 386 387 388 /** 389 * Peek at one ubyte. 390 * 391 * Params: 392 * output = an output ubyte 393 * Returns: length in bytes that was actually read and put 394 * into out 395 */ 396 size_t peekByte(ref ubyte output) const 397 { 398 return peek(&output, 1, 0); 399 } 400 401 402 /** 403 * Discard the next N bytes of the data 404 * Params: 405 * n = the number of bytes to discard 406 * Returns: number of bytes actually discarded 407 */ 408 size_t discardNext(size_t n) 409 { 410 size_t discarded = 0; 411 ubyte dummy; 412 foreach (size_t j; 0 .. n) 413 discarded += readByte(dummy); 414 return discarded; 415 } 416 417 /** 418 * Returns: the number of bytes read from the default message. 419 */ 420 size_t getBytesRead() const 421 { 422 return m_outputs.getBytesRead(DEFAULT_MESSAGE); 423 } 424 425 /** 426 * Returns: the number of bytes read from the specified message. 427 */ 428 size_t getBytesRead(message_id msg = DEFAULT_MESSAGE) const 429 { 430 return m_outputs.getBytesRead(msg); 431 } 432 433 /** 434 * Returns: currently set default message 435 */ 436 size_t defaultMsg() const { return m_default_read; } 437 438 /** 439 * Set the default message 440 * Params: 441 * msg = the number identifying the message which is going to 442 * be the new default message 443 */ 444 void setDefaultMsg(message_id msg) 445 { 446 if (msg >= messageCount()) 447 throw new InvalidArgument("Pipe::setDefaultMsg: msg number is too high"); 448 m_default_read = msg; 449 } 450 451 /** 452 * Get the number of messages the are in this pipe. 453 * Returns: number of messages the are in this pipe 454 */ 455 message_id messageCount() const 456 { 457 return m_outputs.messageCount(); 458 } 459 460 461 /** 462 * Test whether this pipe has any data that can be read from. 463 * Returns: true if there is more data to read, false otherwise 464 */ 465 bool endOfData() const 466 { 467 return (remaining() == 0); 468 } 469 470 /** 471 * Start a new message in the pipe. A potential other message in this pipe 472 * must be closed with endMsg() before this function may be called. 473 */ 474 void startMsg() 475 { 476 if (m_inside_msg) 477 throw new InvalidState("Pipe::startMsg: Message was already started"); 478 if (!m_pipe_to) 479 m_pipe_to = new NullFilter; 480 findEndpoints(m_pipe_to); 481 m_pipe_to.newMsg(); 482 m_inside_msg = true; 483 } 484 485 /** 486 * End the current message. 487 */ 488 void endMsg() 489 { 490 if (!m_inside_msg) 491 throw new InvalidState("Pipe::endMsg: Message was already ended"); 492 m_pipe_to.finishMsg(); 493 clearEndpoints(m_pipe_to); 494 if (cast(NullFilter)(m_pipe_to)) 495 { 496 destroy(m_pipe_to); 497 m_pipe_to = null; 498 } 499 m_inside_msg = false; 500 501 m_outputs.retire(); 502 } 503 504 /** 505 * Insert a new filter at the front of the pipe 506 * Params: 507 * filter = the new filter to insert 508 */ 509 void prepend(Filter filter) 510 { 511 if (m_inside_msg) 512 throw new InvalidState("Cannot prepend to a Pipe while it is processing"); 513 if (!filter) 514 return; 515 if (cast(SecureQueue)(filter)) 516 throw new InvalidArgument("Pipe::prepend: SecureQueue cannot be used"); 517 if (filter.m_owned) 518 throw new InvalidArgument("Filters cannot be shared among multiple Pipes"); 519 520 filter.m_owned = true; 521 522 if (m_pipe_to) filter.attach(m_pipe_to); 523 m_pipe_to = filter; 524 } 525 526 /** 527 * Insert a new filter at the back of the pipe 528 * Params: 529 * filter = the new filter to insert 530 */ 531 void append(Filter filter) 532 { 533 if (m_inside_msg) 534 throw new InvalidState("Cannot append to a Pipe while it is processing"); 535 if (!filter) 536 return; 537 if (cast(SecureQueue)(filter)) 538 throw new InvalidArgument("Pipe::append: SecureQueue cannot be used"); 539 if (filter.m_owned) 540 throw new InvalidArgument("Filters cannot be shared among multiple Pipes"); 541 542 filter.m_owned = true; 543 544 if (!m_pipe_to) m_pipe_to = filter; 545 else m_pipe_to.attach(filter); 546 } 547 548 549 /** 550 * Remove the first filter at the front of the pipe. 551 */ 552 void pop() 553 { 554 if (m_inside_msg) 555 throw new InvalidState("Cannot pop off a Pipe while it is processing"); 556 557 if (!m_pipe_to) 558 return; 559 560 if (m_pipe_to.totalPorts() > 1) 561 throw new InvalidState("Cannot pop off a Filter with multiple ports"); 562 563 Filter f = m_pipe_to; 564 size_t owns = f.owns(); 565 m_pipe_to = m_pipe_to.m_next[0]; 566 destroy(f); 567 568 while (owns--) 569 { 570 f = m_pipe_to; 571 m_pipe_to = m_pipe_to.m_next[0]; 572 destroy(f); 573 } 574 } 575 576 577 /** 578 * Reset this pipe to an empty pipe. 579 */ 580 void reset() 581 { 582 destruct(m_pipe_to); 583 m_pipe_to = null; 584 m_inside_msg = false; 585 } 586 587 588 /** 589 * Construct a Pipe of up to four filters. The filters are set up 590 * in the same order as the arguments. 591 */ 592 this(Filter f1, Filter f2 = null, Filter f3 = null, Filter f4 = null) 593 { 594 init(); 595 append(f1); 596 append(f2); 597 append(f3); 598 append(f4); 599 } 600 601 /** 602 * Construct a Pipe from a list of filters 603 * Params: 604 * filters = the set of filters to use 605 */ 606 this(Filter[] filters) 607 { 608 init(); 609 610 foreach (filter; filters) 611 append(filter); 612 } 613 614 ~this() 615 { 616 destruct(m_pipe_to); 617 } 618 619 private: 620 /* 621 * Initialize the Pipe 622 */ 623 void init() 624 { 625 m_pipe_to = null; 626 m_default_read = 0; 627 m_inside_msg = false; 628 } 629 630 /* 631 * Destroy the Pipe 632 */ 633 void destruct(Filter to_kill) 634 { 635 if (!to_kill || cast(SecureQueue)(to_kill)) 636 return; 637 for (size_t j = 0; j != to_kill.totalPorts(); ++j) 638 if (to_kill.m_next[j]) destruct(to_kill.m_next[j]); 639 destroy(to_kill); 640 } 641 642 /* 643 * Find the endpoints of the Pipe 644 */ 645 void findEndpoints(Filter f) 646 { 647 for (size_t j = 0; j != f.totalPorts(); ++j) 648 if (f.m_next[j] && !cast(SecureQueue)(f.m_next[j])) 649 findEndpoints(f.m_next[j]); 650 else 651 { 652 SecureQueue q = new SecureQueue; 653 f.m_next[j] = q; 654 m_outputs.add(q); 655 } 656 } 657 658 /* 659 * Remove the SecureQueues attached to the Filter 660 */ 661 void clearEndpoints(Filter f) 662 { 663 if (!f) return; 664 for (size_t j = 0; j != f.totalPorts(); ++j) 665 { 666 if (f.m_next[j] && cast(SecureQueue)(f.m_next[j])) 667 f.m_next[j] = null; 668 clearEndpoints(f.m_next[j]); 669 } 670 } 671 672 /* 673 * Look up the canonical ID for a queue 674 */ 675 message_id getMessageNo(in string func_name, 676 message_id msg) const 677 { 678 if (msg == DEFAULT_MESSAGE) 679 msg = defaultMsg(); 680 else if (msg == LAST_MESSAGE) 681 msg = messageCount() - 1; 682 683 if (msg >= messageCount()) 684 throw new InvalidMessageNumber(func_name, msg); 685 686 return msg; 687 } 688 689 Filter m_pipe_to; 690 OutputBuffers m_outputs; 691 message_id m_default_read; 692 bool m_inside_msg; 693 694 } 695 696 /* 697 * A Filter that does nothing 698 */ 699 final class NullFilter : Filter, Filterable 700 { 701 public: 702 override void write(const(ubyte)* input, size_t length) 703 { send(input, length); } 704 705 override @property string name() const { return "Null"; } 706 707 // Interface fallthrough 708 override bool attachable() { return super.attachable(); } 709 override void startMsg() { super.startMsg(); } 710 override void endMsg() { super.endMsg(); } 711 override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); } 712 }