/**
* Pipe
*
* Copyright:
* (C) 1999-2007 Jack Lloyd
* (C) 2014-2015 Etienne Cimon
*      2012 Markus Wanner
*
* License:
* Botan is released under the Simplified BSD License (see LICENSE.md)
*/
module botan.filters.pipe;

import botan.constants;
import botan.filters.data_src;
import botan.filters.filter;
import botan.utils.exceptn;
//static if (BOTAN_HAS_PIPE_UNIXFD_IO && false)
//      import botan.fd_unix;

import botan.filters.out_buf;
import botan.filters.secqueue;
import botan.utils.parsing;
import botan.utils.types;
import std.conv : to;
import std.array : Appender;


/**
* This class represents pipe objects.
* A set of filters can be placed into a pipe, and information flows
* through the pipe until it reaches the end, where the output is
* collected for retrieval.  If you're familiar with the Unix shell
* environment, this design will sound quite familiar.
*/
struct Pipe
{
public:

    /**
    * An opaque type that identifies a message in this Pipe
    */
    alias message_id = size_t;

    /**
    * Exception thrown when an invalid message id is passed to
    * read, remaining, peek, etc.
    */
    class InvalidMessageNumber : InvalidArgument
    {
        /**
        * Params:
        *  where = the name of the operation in which the error occurred
        *  msg = the invalid message id that was used
        */
        this(in string where, message_id msg) {
            super("Pipe:" ~ where ~ ": Invalid message number " ~ to!string(msg));
        }
    }

    /**
    * A meta-id for whatever the last message is
    */
    static const message_id LAST_MESSAGE = cast(message_id)(-2);

    /**
    * A meta-id for the default message (set with setDefaultMsg)
    */
    static const message_id DEFAULT_MESSAGE = cast(message_id)(-1);

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the ubyte array to write
    *  length = the length of the ubyte array input
    *
    * Throws: InvalidState if no message is currently open
    *         (startMsg() has not been called).
    */
    void write(const(ubyte)* input, size_t length)
    {
        if (!m_inside_msg)
            throw new InvalidState("Cannot write to a Pipe while it is not processing");
        m_pipe_to.write(input, length);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the reference-counted Vector containing the data to write
    */
    void write(T, ALLOC)(auto const ref RefCounted!(Vector!(T, ALLOC), ALLOC) input)
    { write(input.ptr, input.length); }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the Vector containing the data to write
    */
    void write(T, ALLOC)(auto const ref Vector!(T, ALLOC) input)
    { write(input.ptr, input.length); }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the string containing the data to write
    */
    void write(string input)
    {
        write(cast(const(ubyte)*)input.ptr, input.length);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    * The source is drained in chunks of DEFAULT_BUFFERSIZE bytes until
    * it reports endOfData().
    *
    * Params:
    *  source = the DataSource to read the data from
    */
    void write(DataSource source)
    {
        SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
        while (!source.endOfData())
        {
            size_t got = source.read(buffer.ptr, buffer.length);
            write(buffer.ptr, got);
        }
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = a single ubyte to be written
    */
    void write(ubyte input)
    {
        write(&input, 1);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = a ubyte array to be written
    */
    void write(ubyte[] input)
    {
        write(cast(const(ubyte)*)input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the ubyte array containing the data to write
    *  length = the length of the ubyte array to write
    */
    void processMsg(const(ubyte)* input, size_t length)
    {
        startMsg();
        write(input, length);
        endMsg();
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the Vector containing the data to write
    */
    void processMsg(ALLOC)(auto const ref Vector!(ubyte, ALLOC) input)
    {
        processMsg(input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the reference-counted Vector containing the data to write
    */
    void processMsg(ALLOC)(auto const ref RefCounted!(Vector!(ubyte, ALLOC), ALLOC) input)
    {
        processMsg(input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the string containing the data to write
    */
    void processMsg(string input)
    {
        processMsg(cast(const(ubyte)*)(input.ptr), input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the DataSource providing the data to write
    */
    void processMsg(DataSource input)
    {
        startMsg();
        write(input);
        endMsg();
    }

    /**
    * Find out how many bytes are ready to read.
    *
    * Params:
    *  msg = the number identifying the message
    *          for which the information is desired
    * Returns: number of bytes that can still be read
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t remaining(message_id msg = DEFAULT_MESSAGE) const
    {
        return m_outputs.remaining(getMessageNo("remaining", msg));
    }

    /**
    * Read the default message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  length = the length of the ubyte array output
    * Returns: number of bytes actually read into output
    */
    size_t read(ubyte* output, size_t length)
    {
        return read(output, length, DEFAULT_MESSAGE);
    }

    /**
    * Read a specified message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  length = the length of the ubyte array output
    *  msg = the number identifying the message to read from
    * Returns: number of bytes actually read into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t read(ubyte* output, size_t length, message_id msg)
    {
        return m_outputs.read(output, length, getMessageNo("read", msg));
    }

    /**
    * Read a specified message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  msg = the number identifying the message to read from
    * Returns: number of bytes actually read into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t read(ref ubyte[] output, message_id msg = DEFAULT_MESSAGE)
    {
        return m_outputs.read(output.ptr, output.length, getMessageNo("read", msg));
    }

    /**
    * Read a single ubyte from the pipe. Moves the internal offset so
    * that every call to read will return a new portion of the
    * message.
    *
    * Params:
    *  output = the ubyte to write the result to
    *  msg = the message to read from
    * Returns: number of bytes actually read into output
    */
    size_t read(ref ubyte output, message_id msg = DEFAULT_MESSAGE)
    {
        return read(&output, 1, msg);
    }

    /**
    * Read the full contents of the pipe.
    *
    * Params:
    *  msg = the number identifying the message to read from
    * Returns: SecureVector holding the contents of the pipe
    */
    SecureVector!ubyte readAll(message_id msg = DEFAULT_MESSAGE)
    {
        msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
        SecureArray!ubyte buffer = SecureVector!ubyte(remaining(msg));
        size_t got = read(buffer.ptr, buffer.length, msg);
        // Shrink to what was actually read in case it was less than remaining().
        buffer.resize(got);
        return buffer.move();
    }

    /**
    * Read the full contents of the pipe.
    *
    * Params:
    *  msg = the number identifying the message to read from
    * Returns: string holding the contents of the pipe
    */
    string toString(message_id msg = DEFAULT_MESSAGE)
    {
        msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
        SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
        Appender!string str;
        str.reserve(remaining(msg));

        // Drain the message chunk by chunk until a read yields nothing.
        while (true)
        {
            size_t got = read(buffer.ptr, buffer.length, msg);
            if (got == 0)
                break;
            str ~= buffer.ptr[0 .. got];
        }

        return str.data;
    }

    /** Read from the specified message but do not modify the internal
    * offset. Consecutive calls to peek() will return portions of
    * the message starting at the same position.
    *
    * Params:
    *  output = the ubyte array to write the peeked message part to
    *  length = the length of the ubyte array output
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t peek(ubyte* output, size_t length, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        return m_outputs.peek(output, length, offset, getMessageNo("peek", msg));
    }

    /** Read from the specified message but do not modify the
    * internal offset. Consecutive calls to peek() will return
    * portions of the message starting at the same position.
    *
    * Params:
    *  output = the ubyte array to write the peeked message part to
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    */
    size_t peek(ref ubyte[] output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        // Forward the caller's message id; previously DEFAULT_MESSAGE was
        // passed unconditionally, silently ignoring the msg argument.
        return peek(output.ptr, output.length, offset, msg);
    }

    /** Read a single ubyte from the specified message but do not
    * modify the internal offset. Consecutive calls to peek() will
    * return portions of the message starting at the same position.
    *
    * Params:
    *  output = the ubyte to write the peeked message ubyte to
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    */
    size_t peek(ref ubyte output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        return peek(&output, 1, offset, msg);
    }

    /**
    * Read one ubyte from the default message.
    *
    * Params:
    *  output = the ubyte to read to
    * Returns: length in bytes that was actually read and put
    * into output
    */
    size_t readByte(ref ubyte output)
    {
        return read(&output, 1);
    }


    /**
    * Peek at one ubyte of the default message.
    *
    * Params:
    *  output = an output ubyte
    * Returns: length in bytes that was actually read and put
    * into output
    */
    size_t peekByte(ref ubyte output) const
    {
        return peek(&output, 1, 0);
    }


    /**
    * Discard the next N bytes of the data
    * Params:
    *  n = the number of bytes to discard
    * Returns: number of bytes actually discarded
    */
    size_t discardNext(size_t n)
    {
        size_t discarded = 0;
        ubyte dummy;
        foreach (size_t j; 0 .. n)
        {
            const size_t got = readByte(dummy);
            // Nothing is written to the pipe between iterations, so once a
            // read comes back empty the remaining iterations cannot discard
            // anything either; stop instead of spinning up to n times.
            if (got == 0)
                break;
            discarded += got;
        }
        return discarded;
    }

    /**
    * Returns: the number of bytes read from the default message.
    */
    size_t getBytesRead() const
    {
        // DEFAULT_MESSAGE is only a meta-id; resolve it to the concrete
        // default message id before querying the output buffers.
        return m_outputs.getBytesRead(defaultMsg());
    }

    /**
    * Params:
    *  msg = the number identifying the message; DEFAULT_MESSAGE selects
    *        the currently set default message
    * Returns: the number of bytes read from the specified message.
    */
    size_t getBytesRead(message_id msg = DEFAULT_MESSAGE) const
    {
        // Resolve the DEFAULT_MESSAGE meta-id the same way readAll() and
        // toString() do; any other id is passed through unchanged.
        return m_outputs.getBytesRead((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
    }

    /**
    * Returns: currently set default message
    */
    size_t defaultMsg() const { return m_default_read; }

    /**
    * Set the default message
    * Params:
    *  msg = the number identifying the message which is going to
    * be the new default message
    * Throws: InvalidArgument if msg is not less than messageCount()
    */
    void setDefaultMsg(message_id msg)
    {
        if (msg >= messageCount())
            throw new InvalidArgument("Pipe::setDefaultMsg: msg number is too high");
        m_default_read = msg;
    }

    /**
    * Get the number of messages that are in this pipe.
    * Returns: number of messages that are in this pipe
    */
    message_id messageCount() const
    {
        return m_outputs.messageCount();
    }


    /**
    * Test whether the default message has been fully consumed.
    * Returns: true if there is no more data to read from the default
    *          message, false if data remains
    */
    bool endOfData() const
    {
        return (remaining() == 0);
    }

    /**
    * Start a new message in the pipe. A potential other message in this pipe
    * must be closed with endMsg() before this function may be called.
    *
    * Throws: InvalidState if a message is already in progress
    */
    void startMsg()
    {
        if (m_inside_msg)
            throw new InvalidState("Pipe::startMsg: Message was already started");
        // An empty pipe still needs a filter to carry data to the output
        // queue; a temporary NullFilter is installed and removed in endMsg().
        if (!m_pipe_to)
            m_pipe_to = new NullFilter;
        findEndpoints(m_pipe_to);
        m_pipe_to.newMsg();
        m_inside_msg = true;
    }

    /**
    * End the current message.
    *
    * Throws: InvalidState if no message is in progress
    */
    void endMsg()
    {
        if (!m_inside_msg)
            throw new InvalidState("Pipe::endMsg: Message was already ended");
        m_pipe_to.finishMsg();
        clearEndpoints(m_pipe_to);
        // Drop the placeholder filter that startMsg() installed for an
        // otherwise empty pipe.
        if (cast(NullFilter)(m_pipe_to))
        {
            destroy(m_pipe_to);
            m_pipe_to = null;
        }
        m_inside_msg = false;

        m_outputs.retire();
    }

    /**
    * Insert a new filter at the front of the pipe
    * Params:
    *  filter = the new filter to insert; null is ignored
    * Throws: InvalidState if a message is in progress;
    *         InvalidArgument if filter is a SecureQueue or is already owned
    */
    void prepend(Filter filter)
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot prepend to a Pipe while it is processing");
        if (!filter)
            return;
        if (cast(SecureQueue)(filter))
            throw new InvalidArgument("Pipe::prepend: SecureQueue cannot be used");
        if (filter.m_owned)
            throw new InvalidArgument("Filters cannot be shared among multiple Pipes");

        filter.m_owned = true;

        if (m_pipe_to) filter.attach(m_pipe_to);
        m_pipe_to = filter;
    }

    /**
    * Insert a new filter at the back of the pipe
    * Params:
    *  filter = the new filter to insert; null is ignored
    * Throws: InvalidState if a message is in progress;
    *         InvalidArgument if filter is a SecureQueue or is already owned
    */
    void append(Filter filter)
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot append to a Pipe while it is processing");
        if (!filter)
            return;
        if (cast(SecureQueue)(filter))
            throw new InvalidArgument("Pipe::append: SecureQueue cannot be used");
        if (filter.m_owned)
            throw new InvalidArgument("Filters cannot be shared among multiple Pipes");

        filter.m_owned = true;

        if (!m_pipe_to) m_pipe_to = filter;
        else            m_pipe_to.attach(filter);
    }


    /**
    * Remove the first filter at the front of the pipe.
    *
    * Throws: InvalidState if a message is in progress or the first
    *         filter has more than one port
    */
    void pop()
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot pop off a Pipe while it is processing");

        if (!m_pipe_to)
            return;

        if (m_pipe_to.totalPorts() > 1)
            throw new InvalidState("Cannot pop off a Filter with multiple ports");

        Filter f = m_pipe_to;
        // Read owns() before f is destroyed; it gives the number of
        // following filters that are removed together with f.
        size_t owns = f.owns();
        m_pipe_to = m_pipe_to.m_next[0];
        destroy(f);

        while (owns--)
        {
            f = m_pipe_to;
            m_pipe_to = m_pipe_to.m_next[0];
            destroy(f);
        }
    }


    /**
    * Reset this pipe to an empty pipe.
    */
    void reset()
    {
        destruct(m_pipe_to);
        m_pipe_to = null;
        m_inside_msg = false;
    }


    /**
    * Construct a Pipe of up to four filters. The filters are set up
    * in the same order as the arguments.
    */
    this(Filter f1, Filter f2 = null, Filter f3 = null, Filter f4 = null)
    {
        init();
        append(f1);
        append(f2);
        append(f3);
        append(f4);
    }

    /**
    * Construct a Pipe from a list of filters
    * Params:
    *  filters = the set of filters to use
    */
    this(Filter[] filters)
    {
        init();

        foreach (filter; filters)
            append(filter);
    }

    ~this()
    {
        destruct(m_pipe_to);
    }

private:
    /*
    * Initialize the Pipe
    */
    void init()
    {
        m_pipe_to = null;
        m_default_read = 0;
        m_inside_msg = false;
    }

    /*
    * Destroy the filter chain starting at to_kill. SecureQueue
    * endpoints are skipped rather than destroyed here.
    */
    void destruct(Filter to_kill)
    {
        if (!to_kill || cast(SecureQueue)(to_kill))
            return;
        for (size_t j = 0; j != to_kill.totalPorts(); ++j)
            if (to_kill.m_next[j]) destruct(to_kill.m_next[j]);
        destroy(to_kill);
    }

    /*
    * Find the endpoints of the Pipe: every port that has no successor
    * (or whose successor is a SecureQueue) gets a fresh SecureQueue,
    * which is registered with the output buffers.
    */
    void findEndpoints(Filter f)
    {
        for (size_t j = 0; j != f.totalPorts(); ++j)
            if (f.m_next[j] && !cast(SecureQueue)(f.m_next[j]))
                findEndpoints(f.m_next[j]);
            else
            {
                SecureQueue q = new SecureQueue;
                f.m_next[j] = q;
                m_outputs.add(q);
            }
    }

    /*
    * Remove the SecureQueues attached to the Filter
    */
    void clearEndpoints(Filter f)
    {
        if (!f) return;
        for (size_t j = 0; j != f.totalPorts(); ++j)
        {
            if (f.m_next[j] && cast(SecureQueue)(f.m_next[j]))
                f.m_next[j] = null;
            // Safe when the slot was just cleared: the callee returns
            // immediately on null.
            clearEndpoints(f.m_next[j]);
        }
    }

    /*
    * Look up the canonical ID for a queue: resolves the DEFAULT_MESSAGE
    * and LAST_MESSAGE meta-ids and rejects ids that are out of range.
    *
    * Throws: InvalidMessageNumber if the resolved id is not less than
    *         messageCount()
    */
    message_id getMessageNo(in string func_name,
                            message_id msg) const
    {
        if (msg == DEFAULT_MESSAGE)
            msg = defaultMsg();
        else if (msg == LAST_MESSAGE)
            msg = messageCount() - 1;

        if (msg >= messageCount())
            throw new InvalidMessageNumber(func_name, msg);

        return msg;
    }

    Filter m_pipe_to;
    OutputBuffers m_outputs;
    message_id m_default_read;
    bool m_inside_msg;

}

/*
* A Filter that does nothing: it forwards its input unchanged via send().
*/
final class NullFilter : Filter, Filterable
{
public:
    override void write(const(ubyte)* input, size_t length)
    { send(input, length); }

    override @property string name() const { return "Null"; }

    // Interface fallthrough
    override bool attachable() { return super.attachable(); }
    override void startMsg() { super.startMsg(); }
    override void endMsg() { super.endMsg(); }
    override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); }
}