1 /**
2 * Pipe
3 * 
4 * Copyright:
5 * (C) 1999-2007 Jack Lloyd
6 * (C) 2014-2015 Etienne Cimon
7 *     2012 Markus Wanner
8 *
9 * License:
10 * Botan is released under the Simplified BSD License (see LICENSE.md)
11 */
12 module botan.filters.pipe;
13 
14 import botan.constants;
15 import botan.filters.data_src;
16 import botan.filters.filter;
17 import botan.utils.exceptn;
18 //static if (BOTAN_HAS_PIPE_UNIXFD_IO && false)
19  //   import botan.fd_unix;
20 
21 import botan.filters.out_buf;
22 import botan.filters.secqueue;
23 import botan.utils.parsing;
24 import botan.utils.types;
25 import std.conv : to;
26 import std.array : Appender;
27 
28 /**
29   * An opaque type that identifies a message in this Pipe
30   */
31 alias message_id = size_t;
32 
33 /**
34   * Exception if you use an invalid message as an argument to
35   * read, remaining, etc
36   */
class InvalidMessageNumber : InvalidArgument
{
    /**
    * Build the exception text from the failing operation and the bad id.
    *
    * The text uses the same "Pipe::<operation>:" prefix as the other
    * errors raised by Pipe in this module.
    *
    * Params:
    *  where = name of the Pipe operation in which the error occurred
    *  msg = the invalid message id that was used
    */
    this(in string where, message_id msg) {
        super("Pipe::" ~ where ~ ": Invalid message number " ~ to!string(msg));
    }
}
48 
49 /**
50 * This class represents pipe objects.
51 * A set of filters can be placed into a pipe, and information flows
52 * through the pipe until it reaches the end, where the output is
53 * collected for retrieval.  If you're familiar with the Unix shell
54 * environment, this design will sound quite familiar.
55 */
struct Pipe
{
public:

    /// Identifier of a message held by this Pipe.
    alias message_id = size_t;

    /**
    * A meta-id for whatever the last message is
    */
    static const message_id LAST_MESSAGE = cast(message_id)(-2);

    /**
    * A meta-id for the default message (set with setDefaultMsg)
    */
    static const message_id DEFAULT_MESSAGE = cast(message_id)(-1);

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = pointer to the bytes to write
    *  length = the number of bytes to write from input
    * Throws: InvalidState if no message is currently in progress
    */
    void write(const(ubyte)* input, size_t length)
    {
        if (!m_inside_msg)
            throw new InvalidState("Cannot write to a Pipe while it is not processing");
        m_pipe_to.write(input, length);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the reference-counted Vector containing the data to write
    */
    void write(T, ALLOC)(auto const ref RefCounted!(Vector!(T, ALLOC), ALLOC) input)
    { write(input.ptr, input.length); }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the Vector containing the data to write
    */
    void write(T, ALLOC)(auto const ref Vector!(T, ALLOC) input)
    { write(input.ptr, input.length); }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = the string containing the data to write
    */
    void write(string input)
    {
        write(cast(const(ubyte)*)input.ptr, input.length);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    * The source is drained in chunks of DEFAULT_BUFFERSIZE bytes until it
    * reports endOfData().
    *
    * Params:
    *  source = the DataSource to read the data from
    */
    void write(DataSource source)
    {
        SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
        while (!source.endOfData())
        {
            size_t got = source.read(buffer.ptr, buffer.length);
            write(buffer.ptr, got);
        }
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = a single ubyte to be written
    */
    void write(ubyte input)
    {
        write(&input, 1);
    }

    /**
    * Write input to the pipe, i.e. to its first filter.
    *
    * Params:
    *  input = a ubyte array to be written
    */
    void write(ubyte[] input)
    {
        write(cast(const(ubyte)*)input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = pointer to the bytes to write
    *  length = the number of bytes to write from input
    */
    void processMsg(const(ubyte)* input, size_t length)
    {
        startMsg();
        write(input, length);
        endMsg();
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the Vector containing the data to write
    */
    void processMsg(ALLOC)(auto const ref Vector!(ubyte, ALLOC) input)
    {
        processMsg(input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the reference-counted Vector containing the data to write
    */
    void processMsg(ALLOC)(auto const ref RefCounted!(Vector!(ubyte, ALLOC), ALLOC) input)
    {
        processMsg(input.ptr, input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the string containing the data to write
    */
    void processMsg(string input)
    {
        processMsg(cast(const(ubyte)*)(input.ptr), input.length);
    }

    /**
    * Perform startMsg(), write() and endMsg() sequentially.
    *
    * Params:
    *  input = the DataSource providing the data to write
    */
    void processMsg(DataSource input)
    {
        startMsg();
        write(input);
        endMsg();
    }

    /**
    * Find out how many bytes are ready to read.
    *
    * Params:
    *  msg = the number identifying the message
    * for which the information is desired
    * Returns: number of bytes that can still be read
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t remaining(message_id msg = DEFAULT_MESSAGE) const
    {
        return m_outputs.remaining(getMessageNo("remaining", msg));
    }

    /**
    * Read the default message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  length = the length of the ubyte array output
    * Returns: number of bytes actually read into output
    */
    size_t read(ubyte* output, size_t length)
    {
        return read(output, length, DEFAULT_MESSAGE);
    }

    /**
    * Read a specified message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  length = the length of the ubyte array output
    *  msg = the number identifying the message to read from
    * Returns: number of bytes actually read into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t read(ubyte* output, size_t length, message_id msg)
    {
        return m_outputs.read(output, length, getMessageNo("read", msg));
    }

    /**
    * Read a specified message from the pipe. Moves the internal
    * offset so that every call to read will return a new portion of
    * the message.
    *
    * Params:
    *  output = the ubyte array to write the read bytes to
    *  msg = the number identifying the message to read from
    * Returns: number of bytes actually read into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t read(ref ubyte[] output, message_id msg = DEFAULT_MESSAGE)
    {
        return m_outputs.read(output.ptr, output.length, getMessageNo("read", msg));
    }

    /**
    * Read a single ubyte from the pipe. Moves the internal offset so
    * that every call to read will return a new portion of the
    * message.
    *
    * Params:
    *  output = the ubyte to write the result to
    *  msg = the message to read from
    * Returns: number of bytes actually read into output
    */
    size_t read(ref ubyte output, message_id msg = DEFAULT_MESSAGE)
    {
        return read(&output, 1, msg);
    }

    /**
    * Read the full contents of the pipe.
    *
    * Params:
    *  msg = the number identifying the message to read from
    * Returns: SecureVector holding the contents of the pipe
    */
    SecureVector!ubyte readAll(message_id msg = DEFAULT_MESSAGE)
    {
        msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
        SecureArray!ubyte buffer = SecureVector!ubyte(remaining(msg));
        size_t got = read(buffer.ptr, buffer.length, msg);
        // Shrink to what was actually delivered.
        buffer.resize(got);
        return buffer.move();
    }

    /**
    * Read the full contents of the pipe.
    *
    * Params:
    *  msg = the number identifying the message to read from
    * Returns: string holding the contents of the pipe
    */
    string toString(message_id msg = DEFAULT_MESSAGE)
    {
        msg = ((msg != DEFAULT_MESSAGE) ? msg : defaultMsg());
        SecureVector!ubyte buffer = SecureVector!ubyte(DEFAULT_BUFFERSIZE);
        Appender!string str;
        str.reserve(remaining(msg));

        // Drain the message chunk by chunk until a read delivers nothing.
        while (true)
        {
            size_t got = read(buffer.ptr, buffer.length, msg);
            if (got == 0)
                break;
            str ~= buffer.ptr[0 .. got];
        }

        return str.data;
    }

    /** Read from the specified message but do not modify the internal
    * offset. Consecutive calls to peek() will return portions of
    * the message starting at the same position.
    *
    * Params:
    *  output = the ubyte array to write the peeked message part to
    *  length = the length of the ubyte array output
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    * Throws: InvalidMessageNumber if msg does not identify a message
    */
    size_t peek(ubyte* output, size_t length, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        return m_outputs.peek(output, length, offset, getMessageNo("peek", msg));
    }

    /** Read from the specified message but do not modify the
    * internal offset. Consecutive calls to peek() will return
    * portions of the message starting at the same position.
    *
    * Params:
    *  output = the ubyte array to write the peeked message part to
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    */
    size_t peek(ref ubyte[] output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        // Forward the caller's msg; it used to be replaced by DEFAULT_MESSAGE.
        return peek(output.ptr, output.length, offset, msg);
    }

    /** Read a single ubyte from the specified message but do not
    * modify the internal offset. Consecutive calls to peek() will
    * return portions of the message starting at the same position.
    *
    * Params:
    *  output = the ubyte to write the peeked message ubyte to
    *  offset = the offset from the current position in message
    *  msg = the number identifying the message to peek from
    * Returns: number of bytes actually peeked and written into output
    */
    size_t peek(ref ubyte output, size_t offset, message_id msg = DEFAULT_MESSAGE) const
    {
        return peek(&output, 1, offset, msg);
    }

    /**
    * Read one ubyte from the default message.
    *
    * Params:
    *  output = the ubyte to read to
    * Returns: length in bytes that was actually read and put
    * into output
    */
    size_t readByte(ref ubyte output)
    {
        return read(&output, 1);
    }


    /**
    * Peek at one ubyte of the default message.
    *
    * Params:
    *  output = an output ubyte
    * Returns: length in bytes that was actually read and put
    * into output
    */
    size_t peekByte(ref ubyte output) const
    {
        return peek(&output, 1, 0);
    }


    /**
    * Discard the next N bytes of the default message.
    * Params:
    *  n = the number of bytes to discard
    * Returns: number of bytes actually discarded
    */
    size_t discardNext(size_t n)
    {
        size_t discarded = 0;
        ubyte dummy;
        // Stop at the first empty read: once the message is exhausted the
        // remaining iterations could not discard anything anyway.
        while (discarded < n && readByte(dummy) == 1)
            ++discarded;
        return discarded;
    }

    /**
    * Returns: the number of bytes read from the default message.
    */
    size_t getBytesRead() const
    {
        // Resolve the default message to its real id, as the other accessors
        // do, instead of handing the DEFAULT_MESSAGE meta-id to the buffers.
        return m_outputs.getBytesRead(defaultMsg());
    }

    /**
    * Params:
    *  msg = the number identifying the message; DEFAULT_MESSAGE selects
    * the current default message
    * Returns: the number of bytes read from the specified message.
    */
    size_t getBytesRead(message_id msg = DEFAULT_MESSAGE) const
    {
        if (msg == DEFAULT_MESSAGE)
            msg = defaultMsg();
        return m_outputs.getBytesRead(msg);
    }

    /**
    * Returns: currently set default message
    */
    size_t defaultMsg() const { return m_default_read; }

    /**
    * Set the default message
    * Params:
    *  msg = the number identifying the message which is going to
    * be the new default message
    * Throws: InvalidArgument if msg is not below messageCount()
    */
    void setDefaultMsg(message_id msg)
    {
        if (msg >= messageCount())
            throw new InvalidArgument("Pipe::setDefaultMsg: msg number is too high");
        m_default_read = msg;
    }

    /**
    * Get the number of messages that are in this pipe.
    * Returns: number of messages that are in this pipe
    */
    message_id messageCount() const
    {
        return m_outputs.messageCount();
    }


    /**
    * Test whether the default message has been read completely.
    * Returns: true if no bytes remain to be read from the default
    * message, false if there is more data to read
    */
    bool endOfData() const
    {
        return (remaining() == 0);
    }

    /**
    * Start a new message in the pipe. A potential other message in this pipe
    * must be closed with endMsg() before this function may be called.
    * Throws: InvalidState if a message is already in progress
    */
    void startMsg()
    {
        if (m_inside_msg)
            throw new InvalidState("Pipe::startMsg: Message was already started");
        // An empty pipe gets a temporary pass-through filter for this message.
        if (!m_pipe_to)
            m_pipe_to = new NullFilter;
        findEndpoints(m_pipe_to);
        m_pipe_to.newMsg();
        m_inside_msg = true;
    }

    /**
    * End the current message.
    * Throws: InvalidState if no message is in progress
    */
    void endMsg()
    {
        if (!m_inside_msg)
            throw new InvalidState("Pipe::endMsg: Message was already ended");
        m_pipe_to.finishMsg();
        clearEndpoints(m_pipe_to);
        // Drop the temporary NullFilter that startMsg() installed on an empty pipe.
        if (cast(NullFilter)(m_pipe_to))
        {
            destroy(m_pipe_to);
            m_pipe_to = null;
        }
        m_inside_msg = false;

        m_outputs.retire();
    }

    /**
    * Insert a new filter at the front of the pipe. A null filter is ignored.
    * Params:
    *  filter = the new filter to insert
    * Throws: InvalidState if a message is in progress; InvalidArgument if
    * filter is a SecureQueue or is already owned
    */
    void prepend(Filter filter)
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot prepend to a Pipe while it is processing");
        if (!filter)
            return;
        if (cast(SecureQueue)(filter))
            throw new InvalidArgument("Pipe::prepend: SecureQueue cannot be used");
        if (filter.m_owned)
            throw new InvalidArgument("Filters cannot be shared among multiple Pipes");

        filter.m_owned = true;

        if (m_pipe_to) filter.attach(m_pipe_to);
        m_pipe_to = filter;
    }

    /**
    * Insert a new filter at the back of the pipe. A null filter is ignored.
    * Params:
    *  filter = the new filter to insert
    * Throws: InvalidState if a message is in progress; InvalidArgument if
    * filter is a SecureQueue or is already owned
    */
    void append(Filter filter)
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot append to a Pipe while it is processing");
        if (!filter)
            return;
        if (cast(SecureQueue)(filter))
            throw new InvalidArgument("Pipe::append: SecureQueue cannot be used");
        if (filter.m_owned)
            throw new InvalidArgument("Filters cannot be shared among multiple Pipes");

        filter.m_owned = true;

        if (!m_pipe_to) m_pipe_to = filter;
        else        m_pipe_to.attach(filter);
    }


    /**
    * Remove the first filter at the front of the pipe, together with the
    * filters it reports through owns().
    * Throws: InvalidState if a message is in progress or the first filter
    * has more than one port
    */
    void pop()
    {
        if (m_inside_msg)
            throw new InvalidState("Cannot pop off a Pipe while it is processing");

        if (!m_pipe_to)
            return;

        if (m_pipe_to.totalPorts() > 1)
            throw new InvalidState("Cannot pop off a Filter with multiple ports");

        Filter f = m_pipe_to;
        size_t owns = f.owns();
        m_pipe_to = m_pipe_to.m_next[0];
        destroy(f);

        // Remove the owned filters as well, but stop if the chain ends early
        // rather than dereferencing a null filter.
        for (; owns != 0 && m_pipe_to !is null; --owns)
        {
            f = m_pipe_to;
            m_pipe_to = m_pipe_to.m_next[0];
            destroy(f);
        }
    }


    /**
    * Reset this pipe to an empty pipe.
    */
    void reset()
    {
        destruct(m_pipe_to);
        m_pipe_to = null;
        m_inside_msg = false;
    }


    /**
    * Construct a Pipe of up to four filters. The filters are set up
    * in the same order as the arguments; null arguments are skipped.
    */
    this(Filter f1, Filter f2 = null, Filter f3 = null, Filter f4 = null)
    {
        init();
        append(f1);
        append(f2);
        append(f3);
        append(f4);
    }

    /**
    * Construct a Pipe from a list of filters
    * Params:
    *  filters = the set of filters to use
    */
    this(Filter[] filters)
    {
        init();

        foreach (filter; filters)
            append(filter);
    }

    /// Destroys the filter chain still attached to this Pipe.
    ~this()
    {
        destruct(m_pipe_to);
    }

private:
    /*
    * Initialize the Pipe
    */
    void init()
    {
        m_pipe_to = null;
        m_default_read = 0;
        m_inside_msg = false;
    }

    /*
    * Destroy a filter and, recursively, everything attached to its ports.
    * SecureQueue endpoints are left alone.
    */
    void destruct(Filter to_kill)
    {
        if (!to_kill || cast(SecureQueue)(to_kill))
            return;
        for (size_t j = 0; j != to_kill.totalPorts(); ++j)
            if (to_kill.m_next[j]) destruct(to_kill.m_next[j]);
        destroy(to_kill);
    }

    /*
    * Find the endpoints of the Pipe: every port that has no filter (or only
    * a SecureQueue) attached gets a fresh SecureQueue, which is registered
    * with m_outputs.
    */
    void findEndpoints(Filter f)
    {
        for (size_t j = 0; j != f.totalPorts(); ++j)
        {
            if (f.m_next[j] && !cast(SecureQueue)(f.m_next[j]))
            {
                findEndpoints(f.m_next[j]);
            }
            else
            {
                SecureQueue q = new SecureQueue;
                f.m_next[j] = q;
                m_outputs.add(q);
            }
        }
    }

    /*
    * Remove the SecureQueues attached to the Filter
    */
    void clearEndpoints(Filter f)
    {
        if (!f) return;
        for (size_t j = 0; j != f.totalPorts(); ++j)
        {
            if (f.m_next[j] && cast(SecureQueue)(f.m_next[j]))
                f.m_next[j] = null;
            // A port that was just detached is null and returns immediately.
            clearEndpoints(f.m_next[j]);
        }
    }

    /*
    * Look up the canonical ID for a queue: translate the DEFAULT_MESSAGE and
    * LAST_MESSAGE meta-ids, then reject anything not below messageCount().
    */
    message_id getMessageNo(in string func_name,
                              message_id msg) const
    {
        if (msg == DEFAULT_MESSAGE)
            msg = defaultMsg();
        else if (msg == LAST_MESSAGE)
            msg = messageCount() - 1;

        if (msg >= messageCount())
            throw new InvalidMessageNumber(func_name, msg);

        return msg;
    }

    Filter m_pipe_to;           // first filter of the chain, null when empty
    OutputBuffers m_outputs;    // collects the endpoint queues of each message
    message_id m_default_read;  // message used for DEFAULT_MESSAGE
    bool m_inside_msg;          // true between startMsg() and endMsg()

}
695 
696 /*
697 * A Filter that does nothing
698 */
final class NullFilter : Filter, Filterable
{
public:
    /// Passes input and length straight on to send().
    override void write(const(ubyte)* input, size_t length)
    {
        send(input, length);
    }

    /// Returns: the fixed name "Null".
    override @property string name() const
    {
        return "Null";
    }

    // The remaining overrides only forward to the base class implementation.
    override bool attachable()
    {
        return super.attachable();
    }

    override void startMsg()
    {
        super.startMsg();
    }

    override void endMsg()
    {
        super.endMsg();
    }

    override void setNext(Filter* filters, size_t sz)
    {
        super.setNext(filters, sz);
    }
}