1 /**
2 * Pipe
3 * 
4 * Copyright:
5 * (C) 1999-2007 Jack Lloyd
6 * (C) 2014-2015 Etienne Cimon
7 *     2012 Markus Wanner
8 *
9 * License:
10 * Botan is released under the Simplified BSD License (see LICENSE.md)
11 */
12 module botan.filters.pipe;
13 
14 import botan.constants;
15 import botan.filters.data_src;
16 import botan.filters.filter;
17 import botan.utils.exceptn;
18 //static if (BOTAN_HAS_PIPE_UNIXFD_IO && false)
19  //   import botan.fd_unix;
20 
21 import botan.filters.out_buf;
22 import botan.filters.secqueue;
23 import botan.utils.parsing;
24 import botan.utils.types;
25 import std.conv : to;
26 import std.array : Appender;
27 
28 
29 /**
30 * This class represents pipe objects.
31 * A set of filters can be placed into a pipe, and information flows
32 * through the pipe until it reaches the end, where the output is
33 * collected for retrieval.  If you're familiar with the Unix shell
34 * environment, this design will sound quite familiar.
35 */
36 struct Pipe
37 {
38 public:
39 
40     /**
41     * An opaque type that identifies a message in this Pipe
42     */
43     alias message_id = size_t;
44 
45     /**
46     * Exception if you use an invalid message as an argument to
47     * read, remaining, etc
48     */
49     class InvalidMessageNumber : InvalidArgument
50     {
51         /**
52         * Params:
53         *  where = the error occured
54         *  msg = the invalid message id that was used
55         */
56         this(in string where, message_id msg) {
57             super("Pipe:" ~ where ~ ": Invalid message number " ~ to!string(msg));
58         }
59     }
60 
61     /**
62     * A meta-id for whatever the last message is
63     */
64     static const message_id LAST_MESSAGE = cast(message_id)(-2);
65 
66     /**
67     * A meta-id for the default message (set with set_defaultMsg)
68     */
69     static const message_id DEFAULT_MESSAGE = cast(message_id)(-1);
70 
71     /**
72     * Write input to the pipe, i.e. to its first filter.
73     *
74     * Params:
75     *  input = the ubyte array to write
76     *  length = the length of the ubyte array in
77     */
78     void write(const(ubyte)* input, size_t length)
79     {
80         if (!m_inside_msg)
81             throw new InvalidState("Cannot write to a Pipe while it is not processing");
82         m_pipe_to.write(input, length);
83     }
84 
85     /**
86     * Write input to the pipe, i.e. to its first filter.
87     *
88     * Params:
89     *  input = the SecureVector containing the data to write
90     */
91     void write(T, ALLOC)(auto const ref RefCounted!(Vector!(T, ALLOC), ALLOC) input)
92     { write(input.ptr, input.length); }
93 
94     /**
95     * Write input to the pipe, i.e. to its first filter.
96     *
97     * Params:
98     *  input = the std::vector containing the data to write
99     */
100     void write(T, ALLOC)(auto const ref Vector!(T, ALLOC) input)
101     { write(input.ptr, input.length); }
102 
103     /**
104     * Write input to the pipe, i.e. to its first filter.
105     *
106     * Params:
107     *  input = the string containing the data to write
108     */
109     void write(string input)
110     {
111         write(cast(const(ubyte)*)input.ptr, input.length);
112     }
113 
114     /**
115     * Write input to the pipe, i.e. to its first filter.
116     *
117     * Params:
118     *  source = the DataSource to read the data from
119     */
120     void write(DataSource source)
121     {
122         SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
123         while (!source.endOfData())
124         {
125             size_t got = source.read(buffer.ptr, buffer.length);
126             write(buffer.ptr, got);
127         }
128     }
129 
130     /**
131     * Write input to the pipe, i.e. to its first filter.
132     *
133     * Params:
134     *  input = a single ubyte to be written
135     */
136     void write(ubyte input)
137     {
138         write(&input, 1);
139     }
140 
141     /**
142     * Write input to the pipe, i.e. to its first filter.
143     *
144     * Params:
145     *  input = a ubyte array to be written
146     */
147     void write(ubyte[] input)
148     {
149         write(cast(const(ubyte)*)input.ptr, input.length);
150     }
151 
152     /**
153     * Perform startMsg(), write() and endMsg() sequentially.
154     *
155     * Params:
156     *  input = the ubyte array containing the data to write
157     *  length = the length of the ubyte array to write
158     */
159     void processMsg(const(ubyte)* input, size_t length)
160     {
161         startMsg();
162         write(input, length);
163         endMsg();
164     }
165 
166     /**
167     * Perform startMsg(), write() and endMsg() sequentially.
168     *
169     * Params:
170     *  input = the SecureVector containing the data to write
171     */
172     void processMsg(ALLOC)(auto const ref Vector!(ubyte, ALLOC) input)
173     {
174         processMsg(input.ptr, input.length);
175     }
176 
177     /**
178     * Perform startMsg(), write() and endMsg() sequentially.
179     *
180     * Params:
181     *  input = the SecureVector containing the data to write
182     */
183     void processMsg(ALLOC)(auto const ref RefCounted!(Vector!(ubyte, ALLOC), ALLOC) input)
184     {
185         processMsg(input.ptr, input.length);
186     }
187 
188     /**
189     * Perform startMsg(), write() and endMsg() sequentially.
190     *
191     * Params:
192     *  input = the string containing the data to write
193     */
194     void processMsg(string input)
195     {
196         processMsg(cast(const(ubyte)*)(input.ptr), input.length);
197     }
198 
199     /**
200     * Perform startMsg(), write() and endMsg() sequentially.
201     *
202     * Params:
203     *  input = the DataSource providing the data to write
204     */
205     void processMsg(DataSource input)
206     {
207         startMsg();
208         write(input);
209         endMsg();
210     }
211 
212     /**
213     * Find out how many bytes are ready to read.
214     *
215     * Params:
216     *  msg = the number identifying the message
217     * for which the information is desired
218     * Returns: number of bytes that can still be read
219     */
220     size_t remaining(message_id msg = DEFAULT_MESSAGE) const
221     {
222         return m_outputs.remaining(getMessageNo("remaining", msg));
223     }
224 
225     /**
226     * Read the default message from the pipe. Moves the internal
227     * offset so that every call to read will return a new portion of
228     * the message.
229     *
230     * Params:
231     *  output = the ubyte array to write the read bytes to
232     *  length = the length of the ubyte array output
233     * Returns: number of bytes actually read into output
234     */
235     size_t read(ubyte* output, size_t length)
236     {
237         return read(output, length, DEFAULT_MESSAGE);
238     }
239 
240     /**
241     * Read a specified message from the pipe. Moves the internal
242     * offset so that every call to read will return a new portion of
243     * the message.
244     *
245     * Params:
246     *  output = the ubyte array to write the read bytes to
247     *  length = the length of the ubyte array output
248     *  msg = the number identifying the message to read from
249     * Returns: number of bytes actually read into output
250     */
251     size_t read(ubyte* output, size_t length, message_id msg)
252     {
253         return m_outputs.read(output, length, getMessageNo("read", msg));
254     }
255 
256     /**
257     * Read a specified message from the pipe. Moves the internal
258     * offset so that every call to read will return a new portion of
259     * the message.
260     *
261     * Params:
262     *  output = the ubyte array to write the read bytes to
263     *  msg = the number identifying the message to read from
264     * Returns: number of bytes actually read into output
265     */
266     size_t read(ref ubyte[] output, message_id msg = DEFAULT_MESSAGE)
267     {
268         return m_outputs.read(output.ptr, output.length, getMessageNo("read", msg));
269     }
270 
271     /**
272     * Read a single ubyte from the pipe. Moves the internal offset so
273     * that every call to read will return a new portion of the
274     * message.
275     *
276     * Params:
277     *  output = the ubyte to write the result to
278     *  msg = the message to read from
279     * Returns: number of bytes actually read into output
280     */
281     size_t read(ref ubyte output, message_id msg = DEFAULT_MESSAGE)
282     {
283         return read(&output, 1, msg);
284     }
285 
286     /**
287     * Read the full contents of the pipe.
288     *
289     * Params:
290     *  msg = the number identifying the message to read from
291     * Returns: SecureVector holding the contents of the pipe
292     */
293     SecureVector!ubyte readAll(message_id msg = DEFAULT_MESSAGE)
294     {
295         msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
296         SecureArray!ubyte buffer = SecureVector!ubyte(remaining(msg));
297         size_t got = read(buffer.ptr, buffer.length, msg);
298         buffer.resize(got);
299         return buffer.move();
300     }
301 
302     /**
303     * Read the full contents of the pipe.
304     *
305     * Params:
306     *  msg = the number identifying the message to read from
307     * Returns: string holding the contents of the pipe
308     */
309     string toString(message_id msg = DEFAULT_MESSAGE)
310     {
311         msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
312         SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
313         Appender!string str;
314         str.reserve(remaining(msg));
315         
316         while (true)
317         {
318             size_t got = read(buffer.ptr, buffer.length, msg);
319             if (got == 0)
320                 break;
321             str ~= buffer.ptr[0 .. got];
322         }
323         
324         return str.data;
325     }
326 
327     /** Read from the default message but do not modify the internal
328     * offset. Consecutive calls to peek() will return portions of
329     * the message starting at the same position.
330     *
331     * Params:
332     *  output = the ubyte array to write the peeked message part to
333     *  length = the length of the ubyte array output
334     *  offset = the offset from the current position in message
335     *  msg = the number identifying the message to peek from
336     * Returns: number of bytes actually peeked and written into output
337     */
338     size_t peek(ubyte* output, size_t length, size_t offset, message_id msg = DEFAULT_MESSAGE) const
339     {
340         return m_outputs.peek(output, length, offset, getMessageNo("peek", msg));
341     }
342 
343     /** Read from the specified message but do not modify the
344     * internal offset. Consecutive calls to peek() will return
345     * portions of the message starting at the same position.
346     *
347     * Params:
348     *  output = the ubyte array to write the peeked message part to
349     *  offset = the offset from the current position in message
350     *  msg = the number identifying the message to peek from
351     * Returns: number of bytes actually peeked and written into output
352     */
353     size_t peek(ref ubyte[] output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
354     {
355         return peek(output.ptr, output.length, offset, DEFAULT_MESSAGE);
356     }
357 
358     /** Read a single ubyte from the specified message but do not
359     * modify the internal offset. Consecutive calls to peek() will
360     * return portions of the message starting at the same position.
361     *
362     * Params:
363     *  output = the ubyte to write the peeked message ubyte to
364     *  offset = the offset from the current position in message
365     *  msg = the number identifying the message to peek from
366     * Returns: number of bytes actually peeked and written into output
367     */
368     size_t peek(ref ubyte output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
369     {
370         return peek(&output, 1, offset, msg);
371     }
372 
373     /**
374     * Read one ubyte.
375     *
376     * Params:
377     *  output = the ubyte to read to
378     * Returns: length in bytes that was actually read and put
379     * into out
380     */
381     size_t readByte(ref ubyte output)
382     {
383         return read(&output, 1);
384     }
385     
386     
387     /**
388     * Peek at one ubyte.
389     *
390     * Params:
391     *  output = an output ubyte
392     * Returns: length in bytes that was actually read and put
393     * into out
394     */
395     size_t peekByte(ref ubyte output) const
396     {
397         return peek(&output, 1, 0);
398     }
399     
400     
401     /**
402     * Discard the next N bytes of the data
403     * Params:
404     *  n = the number of bytes to discard
405     * Returns: number of bytes actually discarded
406     */
407     size_t discardNext(size_t n)
408     {
409         size_t discarded = 0;
410         ubyte dummy;
411         foreach (size_t j; 0 .. n)
412             discarded += readByte(dummy);
413         return discarded;
414     }
415 
416     /**
417     * Returns: the number of bytes read from the default message.
418     */
419     size_t getBytesRead() const
420     {
421         return m_outputs.getBytesRead(DEFAULT_MESSAGE);
422     }
423 
424     /**
425     * Returns: the number of bytes read from the specified message.
426     */
427     size_t getBytesRead(message_id msg = DEFAULT_MESSAGE) const
428     {
429         return m_outputs.getBytesRead(msg);
430     }
431 
432     /**
433     * Returns: currently set default message
434     */
435     size_t defaultMsg() const { return m_default_read; }
436 
437     /**
438     * Set the default message
439     * Params:
440     *  msg = the number identifying the message which is going to
441     * be the new default message
442     */
443     void setDefaultMsg(message_id msg)
444     {
445         if (msg >= messageCount())
446             throw new InvalidArgument("Pipe::setDefaultMsg: msg number is too high");
447         m_default_read = msg;
448     }
449 
450     /**
451     * Get the number of messages the are in this pipe.
452     * Returns: number of messages the are in this pipe
453     */
454     message_id messageCount() const
455     {
456         return m_outputs.messageCount();
457     }
458 
459 
460     /**
461     * Test whether this pipe has any data that can be read from.
462     * Returns: true if there is more data to read, false otherwise
463     */
464     bool endOfData() const
465     {
466         return (remaining() == 0);
467     }
468 
469     /**
470     * Start a new message in the pipe. A potential other message in this pipe
471     * must be closed with endMsg() before this function may be called.
472     */
473     void startMsg()
474     {
475         if (m_inside_msg)
476             throw new InvalidState("Pipe::startMsg: Message was already started");
477         if (!m_pipe_to)
478             m_pipe_to = new NullFilter;
479         findEndpoints(m_pipe_to);
480         m_pipe_to.newMsg();
481         m_inside_msg = true;
482     }
483 
484     /**
485     * End the current message.
486     */
487     void endMsg()
488     {
489         if (!m_inside_msg)
490             throw new InvalidState("Pipe::endMsg: Message was already ended");
491         m_pipe_to.finishMsg();
492         clearEndpoints(m_pipe_to);
493         if (cast(NullFilter)(m_pipe_to))
494         {
495             destroy(m_pipe_to);
496             m_pipe_to = null;
497         }
498         m_inside_msg = false;
499         
500         m_outputs.retire();
501     }
502 
503     /**
504     * Insert a new filter at the front of the pipe
505     * Params:
506     *  filter = the new filter to insert
507     */
508     void prepend(Filter filter)
509     {
510         if (m_inside_msg)
511             throw new InvalidState("Cannot prepend to a Pipe while it is processing");
512         if (!filter)
513             return;
514         if (cast(SecureQueue)(filter))
515             throw new InvalidArgument("Pipe::prepend: SecureQueue cannot be used");
516         if (filter.m_owned)
517             throw new InvalidArgument("Filters cannot be shared among multiple Pipes");
518         
519         filter.m_owned = true;
520         
521         if (m_pipe_to) filter.attach(m_pipe_to);
522         m_pipe_to = filter;
523     }
524 
525     /**
526     * Insert a new filter at the back of the pipe
527     * Params:
528     *  filter = the new filter to insert
529     */
530     void append(Filter filter)
531     {
532         if (m_inside_msg)
533             throw new InvalidState("Cannot append to a Pipe while it is processing");
534         if (!filter)
535             return;
536         if (cast(SecureQueue)(filter))
537             throw new InvalidArgument("Pipe::append: SecureQueue cannot be used");
538         if (filter.m_owned)
539             throw new InvalidArgument("Filters cannot be shared among multiple Pipes");
540         
541         filter.m_owned = true;
542         
543         if (!m_pipe_to) m_pipe_to = filter;
544         else        m_pipe_to.attach(filter);
545     }
546 
547 
548     /**
549     * Remove the first filter at the front of the pipe.
550     */
551     void pop()
552     {
553         if (m_inside_msg)
554             throw new InvalidState("Cannot pop off a Pipe while it is processing");
555         
556         if (!m_pipe_to)
557             return;
558         
559         if (m_pipe_to.totalPorts() > 1)
560             throw new InvalidState("Cannot pop off a Filter with multiple ports");
561         
562         Filter f = m_pipe_to;
563         size_t owns = f.owns();
564         m_pipe_to = m_pipe_to.m_next[0];
565         destroy(f);
566         
567         while (owns--)
568         {
569             f = m_pipe_to;
570             m_pipe_to = m_pipe_to.m_next[0];
571             destroy(f);
572         }
573     }
574 
575 
576     /**
577     * Reset this pipe to an empty pipe.
578     */
579     void reset()
580     {
581         destruct(m_pipe_to);
582         m_pipe_to = null;
583         m_inside_msg = false;
584     }
585 
586 
587     /**
588     * Construct a Pipe of up to four filters. The filters are set up
589     * in the same order as the arguments.
590     */
591     this(Filter f1, Filter f2 = null, Filter f3 = null, Filter f4 = null)
592     {
593         init();
594         append(f1);
595         append(f2);
596         append(f3);
597         append(f4);
598     }
599 
600     /**
601     * Construct a Pipe from a list of filters
602     * Params:
603     *  filters = the set of filters to use
604     */
605     this(Filter[] filters)
606     {
607         init();
608         
609         foreach (filter; filters)
610             append(filter);
611     }
612 
613     ~this()
614     {
615         destruct(m_pipe_to);
616     }
617 
618 private:
619     /*
620     * Initialize the Pipe
621     */
622     void init()
623     {
624         m_pipe_to = null;
625         m_default_read = 0;
626         m_inside_msg = false;
627     }
628 
629     /*
630     * Destroy the Pipe
631     */
632     void destruct(Filter to_kill)
633     {
634         if (!to_kill || cast(SecureQueue)(to_kill))
635             return;
636         for (size_t j = 0; j != to_kill.totalPorts(); ++j)
637             if (to_kill.m_next[j]) destruct(to_kill.m_next[j]);
638         destroy(to_kill);
639     }
640 
641     /*
642     * Find the endpoints of the Pipe
643     */
644     void findEndpoints(Filter f)
645     {
646         for (size_t j = 0; j != f.totalPorts(); ++j)
647             if (f.m_next[j] && !cast(SecureQueue)(f.m_next[j]))
648                 findEndpoints(f.m_next[j]);
649             else
650         {
651             SecureQueue q = new SecureQueue;
652             f.m_next[j] = q;
653             m_outputs.add(q);
654         }
655     }
656 
657     /*
658     * Remove the SecureQueues attached to the Filter
659     */
660     void clearEndpoints(Filter f)
661     {
662         if (!f) return;
663         for (size_t j = 0; j != f.totalPorts(); ++j)
664         {
665             if (f.m_next[j] && cast(SecureQueue)(f.m_next[j]))
666                 f.m_next[j] = null;
667             clearEndpoints(f.m_next[j]);
668         }
669     }
670 
671     /*
672     * Look up the canonical ID for a queue
673     */
674     message_id getMessageNo(in string func_name,
675                               message_id msg) const
676     {
677         if (msg == DEFAULT_MESSAGE)
678             msg = defaultMsg();
679         else if (msg == LAST_MESSAGE)
680             msg = messageCount() - 1;
681         
682         if (msg >= messageCount())
683             throw new InvalidMessageNumber(func_name, msg);
684         
685         return msg;
686     }
687 
688     Filter m_pipe_to;
689     OutputBuffers m_outputs;
690     message_id m_default_read;
691     bool m_inside_msg;
692 
693 }
694 
695 /*
696 * A Filter that does nothing
697 */
698 final class NullFilter : Filter, Filterable
699 {
700 public:
701     override void write(const(ubyte)* input, size_t length)
702     { send(input, length); }
703     
704     override @property string name() const { return "Null"; }
705 
706     // Interface fallthrough
707     override bool attachable() { return super.attachable(); }
708     override void startMsg() { super.startMsg(); }
709     override void endMsg() { super.endMsg(); }
710     override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); }
711 }