1 /**
2 * Basic Filters
3 * 
4 * Copyright:
5 * (C) 1999-2007 Jack Lloyd
6 * (C) 2014-2015 Etienne Cimon
7 * (C) 2013 Joel Low
8 *
9 * License:
10 * Botan is released under the Simplified BSD License (see LICENSE.md)
11 */
12 module botan.filters.basefilt;
13 
14 import botan.filters.filter;
15 import std.concurrency;
16 import memutils.refcounted;
17 import botan.filters.key_filt;
18 import botan.utils.types;
19 import botan.utils.semaphore;
20 
/**
* BitBucket is a sink filter: whatever is written to it is simply discarded.
*/
final class BitBucket : Filter, Filterable
{
    /// Returns: the name of this filter, "BitBucket".
    override @property string name() const
    {
        return "BitBucket";
    }

    /// Accepts the input and drops it; the body is intentionally empty.
    override void write(const(ubyte)*, size_t)
    {
    }

    /// Interface fallthrough: forwards unchanged to the base class.
    override void setNext(Filter* filters, size_t sz)
    {
        super.setNext(filters, sz);
    }
}
32 
/**
* This class represents Filter chains. A Filter chain is an ordered
* concatenation of Filters: input written to a Chain passes sequentially
* through all the Filters it contains.
*/
final class Chain : FanoutFilter, Filterable
{
public:
    /// Returns: the name of this filter, "Chain".
    override @property string name() const { return "Chain"; }

    /// Hands the input straight to send().
    override void write(const(ubyte)* input, size_t length)
    {
        send(input, length);
    }

    /**
    * Construct a chain of up to four filters. The filters are set
    * up in the same order as the arguments; null arguments are skipped.
    */
    this(Filter f1 = null, Filter f2 = null,
            Filter f3 = null, Filter f4 = null)
    {
        Filter[4] candidates = [ f1, f2, f3, f4 ];
        foreach (candidate; candidates)
        {
            if (!candidate)
                continue;
            // Each non-null filter is attached and then counted via incrOwns().
            attach(candidate);
            incrOwns();
        }
    }

    /**
    * Construct a chain from a range of filters. Null entries are skipped.
    * Params:
    *  filter_arr = pointer to the first filter of the list
    *  length = how many filters filter_arr points to
    */
    this(Filter* filter_arr, size_t length)
    {
        foreach (entry; filter_arr[0 .. length])
        {
            if (!entry)
                continue;
            attach(entry);
            incrOwns();
        }
    }

    // Interface fallthrough: each of these forwards to the base class.
    override bool attachable()
    {
        return super.attachable();
    }

    override void startMsg()
    {
        super.startMsg();
    }

    override void endMsg()
    {
        super.endMsg();
    }

    override void setNext(Filter* filters, size_t sz)
    {
        super.setNext(filters, sz);
    }
}
84 
/**
* This class represents a fork filter, whose purpose is to fork the
* flow of data. It causes an input message to result in n messages at
* the end of the filter, where n is the number of forks.
*/
class Fork : FanoutFilter, Filterable
{
public:
    /// Returns: the name of this filter, "Fork".
    override @property string name() const { return "Fork"; }

    /// Hands the input straight to send().
    override void write(const(ubyte)* input, size_t length)
    {
        send(input, length);
    }

    /// Interface fallthrough: forwards to the base class setPort().
    override final void setPort(size_t n)
    {
        super.setPort(n);
    }

    /**
    * Construct a Fork filter with up to four forks.
    * All four slots (including any left null) are handed to setNext().
    */
    this(Filter f1, Filter f2, Filter f3 = null, Filter f4 = null)
    {
        Filter[4] branches = [ f1, f2, f3, f4 ];
        setNext(branches.ptr, branches.length);
    }

    /**
    * Construct a Fork from a range of filters.
    * Params:
    *  filter_arr = pointer to the first filter of the list
    *  length = how many filters filter_arr points to
    */
    this(Filter* filter_arr, size_t length)
    {
        setNext(filter_arr, length);
    }

    // Interface fallthrough: each of these forwards to the base class.
    override bool attachable()
    {
        return super.attachable();
    }

    override void startMsg()
    {
        super.startMsg();
    }

    override void endMsg()
    {
        super.endMsg();
    }

    override void setNext(Filter* filters, size_t sz)
    {
        super.setNext(filters, sz);
    }
}
127 
/**
* This class is a threaded version of the Fork filter. While this uses
* threads, the class itself is NOT thread-safe. This is meant as a drop-
* in replacement for Fork where performance gains are possible.
*
* One worker is spawned per entry of m_next (see setNext). Work is handed to
* the workers through m_thread_data: the caller publishes an input pointer
* and length, releases the "input ready" semaphore once per port, and then
* acquires the "input complete" semaphore once per port. A null input
* pointer is reserved as the signal for the workers to quit.
*/
class ThreadedFork : Fork, Filterable
{
public:
    /// Returns: the name of this filter, "Threaded Fork".
    override @property string name() const
    {
        return "Threaded Fork";
    }

    /**
    * Construct a ThreadedFork filter with up to four forks.
    */
    this(Filter f1, Filter f2, Filter f3 = null, Filter f4 = null)
    { 
        // Start the base class with no filters so that the shared thread data
        // is allocated before setNext() below spawns any worker.
        super(null, cast(size_t)(0));
        m_thread_data = new ThreadedForkData;
        Filter[4] filters = [ f1, f2, f3, f4 ];
        setNext(filters.ptr, 4);
    }

    /**
    * Construct a ThreadedFork from range of filters
    * Params:
    *  filter_arr = the list of filters
    *  length = how many filters
    */
    this(Filter* filter_arr, size_t length)
    {
        // Same ordering as above: base first (empty), then thread data,
        // then the real filter list.
        super(null, cast(size_t)(0));
        m_thread_data = new ThreadedForkData;
        setNext(filter_arr, length);
    }

    /**
    * Publishes the quit signal (null input) and wakes every worker once.
    * NOTE: the workers are not joined here; the join loop is disabled.
    */
    ~this()
    {
        m_thread_data.m_input = null;
        m_thread_data.m_input_length = 0;
        
        m_thread_data.m_input_ready_semaphore.release(m_threads.length);
        /*
        foreach (ref thread; m_threads)
            thread.join();*/
    }

    // Interface fallthrough
    override bool attachable() { return super.attachable(); }
    override void startMsg() { super.startMsg(); }
    override void endMsg() { super.endMsg(); }
    override void write(const(ubyte)* input, size_t len) { super.write(input, len); }
protected:
    /**
    * Installs the next filters through the base class, then adjusts the
    * worker list to match m_next.length: the list is shrunk if it is too
    * long, otherwise one worker is spawned for each new entry of m_next.
    * Params:
    *  f = the list of filters
    *  n = how many filters
    */
    override void setNext(Filter* f, size_t n)
    {
        super.setNext(f, n);
        // From here on n is the number of ports the base class actually kept.
        n = m_next.length;
        
        if (n < m_threads.length)
            m_threads.resize(n);
        else
        {
            m_threads.reserve(n);
            foreach (size_t i; m_threads.length .. n)
            {
                m_threads.pushBack(spawn(&threadEntry, cast(shared)this, cast(shared)m_next[i]));
            }
        }
    }

    /**
    * Runs any queued data and then the new input through the workers.
    * If no port has a filter attached, the input is appended to the write
    * queue; otherwise the write queue is cleared.
    * Params:
    *  input = the data to process
    *  length = number of bytes at input
    */
    override void send(const(ubyte)* input, size_t length)
    {
        if (m_write_queue.length)
            threadDelegateWork(m_write_queue.ptr, m_write_queue.length);
        threadDelegateWork(input, length);
        
        bool nothing_attached = true;
        foreach (size_t j; 0 .. totalPorts())
            if (m_next[j])
                nothing_attached = false;
        
        if (nothing_attached)
            m_write_queue ~= input[0 .. length];
        else
            m_write_queue.clear();
    }

private:
    /**
    * Publishes one piece of work to the workers and waits until each port
    * has signalled completion.
    * Params:
    *  input = the data to process; a null pointer is ignored
    *  length = number of bytes at input
    */
    void threadDelegateWork(const(ubyte)* input, size_t length)
    {
        // A null input pointer is the quit sentinel understood by
        // threadEntry: a worker that sees it leaves its loop without
        // releasing the completion semaphore. Publishing null as "work"
        // would therefore stop the workers and block this thread forever
        // in the acquire loop below. There is nothing to process anyway.
        if (input is null)
            return;

        //Set the data to do.
        m_thread_data.m_input = input;
        m_thread_data.m_input_length = length;
        
        //Let the workers start processing.
        m_thread_data.m_input_ready_semaphore.release(totalPorts());
        
        //Wait for all the filters to finish processing.
        foreach (size_t i; 0 .. totalPorts())
            m_thread_data.m_input_complete_semaphore.acquire();
        
        //Reset the thread data
        m_thread_data.m_input = null;
        m_thread_data.m_input_length = 0;
    }

    /**
    * Worker loop run by each spawned thread: waits for the "input ready"
    * semaphore, exits if the published input is null, otherwise writes the
    * published input to its filter and signals completion.
    * Params:
    *  This_ = the owning ThreadedFork, passed as shared for spawn()
    *  filter_ = the filter this worker feeds, passed as shared for spawn()
    */
    static void threadEntry(shared(ThreadedFork) This_, shared(Filter) filter_)
    {
        // The shared qualifier is only there to satisfy spawn(); strip it.
        ThreadedFork This = cast(ThreadedFork) This_;
        Filter filter = cast(Filter) filter_;
        while (true)
        {
            This.m_thread_data.m_input_ready_semaphore.acquire();
            
            // Null input is the quit signal (set by the destructor).
            if (!This.m_thread_data.m_input)
                break;
            
            filter.write(This.m_thread_data.m_input[0 .. This.m_thread_data.m_input_length]);
            This.m_thread_data.m_input_complete_semaphore.release();
        }
    }

    /// Handles of the spawned workers, one per entry of m_next.
    Vector!(Tid) m_threads;
    /// Semaphores and work description shared with the workers.
    Unique!ThreadedForkData m_thread_data;
}
255 
/**
* State shared between a ThreadedFork and its worker threads.
*/
struct ThreadedForkData
{
    /// Signals the workers that there is work to be done (or that they should quit).
    Semaphore m_input_ready_semaphore;

    /// Used to make sure all threads have completed processing the data.
    Semaphore m_input_complete_semaphore;

    /**
    * The work that needs to be done. This should only be modified while the
    * threads are NOT running (i.e. before notifying the work condition, and
    * after the input-complete semaphore has been completely reset).
    */
    const(ubyte)* m_input;

    /// The length of the work that needs to be done (default-initialized to 0).
    size_t m_input_length;
}