1 /** 2 * Basic Filters 3 * 4 * Copyright: 5 * (C) 1999-2007 Jack Lloyd 6 * (C) 2014-2015 Etienne Cimon 7 * (C) 2013 Joel Low 8 * 9 * License: 10 * Botan is released under the Simplified BSD License (see LICENSE.md) 11 */ 12 module botan.filters.basefilt; 13 14 import botan.filters.filter; 15 import std.concurrency; 16 import memutils.refcounted; 17 import botan.filters.key_filt; 18 import botan.utils.types; 19 import botan.utils.semaphore; 20 21 /** 22 * BitBucket is a filter which simply discards all inputs 23 */ 24 final class BitBucket : Filter, Filterable 25 { 26 override void write(const(ubyte)*, size_t) {} 27 28 override @property string name() const { return "BitBucket"; } 29 30 override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); } 31 } 32 33 /** 34 * This class represents Filter chains. A Filter chain is an ordered 35 * concatenation of Filters, the input to a Chain sequentially passes 36 * through all the Filters contained in the Chain. 37 */ 38 39 final class Chain : FanoutFilter, Filterable 40 { 41 public: 42 override void write(const(ubyte)* input, size_t length) { send(input, length); } 43 44 override @property string name() const 45 { 46 return "Chain"; 47 } 48 49 /** 50 * Construct a chain of up to four filters. The filters are set 51 * up in the same order as the arguments. 52 */ 53 this(Filter f1 = null, Filter f2 = null, 54 Filter f3 = null, Filter f4 = null) 55 { 56 if (f1) { attach(f1); incrOwns(); } 57 if (f2) { attach(f2); incrOwns(); } 58 if (f3) { attach(f3); incrOwns(); } 59 if (f4) { attach(f4); incrOwns(); } 60 } 61 62 /** 63 * Construct a chain from range of filters 64 * Params: 65 * filter_arr = the list of filters 66 * length = how many filters 67 */ 68 this(Filter* filter_arr, size_t length) { 69 foreach (size_t j; 0 .. length) { 70 if (filter_arr[j]) 71 { 72 attach(filter_arr[j]); 73 incrOwns(); 74 } 75 } 76 } 77 78 // Interface fallthrough 79 override bool attachable() { return super.attachable(); } 80 override void startMsg() { super.startMsg(); } 81 override void endMsg() { super.endMsg(); } 82 override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); } 83 } 84 85 /** 86 * This class represents a fork filter, whose purpose is to fork the 87 * flow of data. It causes an input message to result in n messages at 88 * the end of the filter, where n is the number of forks. 89 */ 90 class Fork : FanoutFilter, Filterable 91 { 92 public: 93 override void write(const(ubyte)* input, size_t length) { send(input, length); } 94 override final void setPort(size_t n) { super.setPort(n); } 95 96 override @property string name() const 97 { 98 return "Fork"; 99 } 100 101 /** 102 * Construct a Fork filter with up to four forks. 103 */ 104 this(Filter f1, Filter f2, Filter f3 = null, Filter f4 = null) 105 { 106 Filter[4] filters = [ f1, f2, f3, f4 ]; 107 setNext(filters.ptr, 4); 108 } 109 110 /** 111 * Construct a Fork from range of filters 112 * Params: 113 * filter_arr = the list of filters 114 * length = how many filters 115 */ 116 this(Filter* filter_arr, size_t length) 117 { 118 setNext(filter_arr, length); 119 } 120 121 // Interface fallthrough 122 override bool attachable() { return super.attachable(); } 123 override void startMsg() { super.startMsg(); } 124 override void endMsg() { super.endMsg(); } 125 override void setNext(Filter* filters, size_t sz) { super.setNext(filters, sz); } 126 } 127 128 /** 129 * This class is a threaded version of the Fork filter. While this uses 130 * threads, the class itself is NOT thread-safe. This is meant as a drop- 131 * in replacement for Fork where performance gains are possible. 132 */ 133 class ThreadedFork : Fork, Filterable 134 { 135 public: 136 override @property string name() const 137 { 138 return "Threaded Fork"; 139 } 140 141 /** 142 * Construct a Threaded_Fork filter with up to four forks. 143 */ 144 this(Filter f1, Filter f2, Filter f3 = null, Filter f4 = null) 145 { 146 super(null, cast(size_t)(0)); 147 m_thread_data = new ThreadedForkData; 148 Filter[4] filters = [ f1, f2, f3, f4 ]; 149 setNext(filters.ptr, 4); 150 } 151 152 /** 153 * Construct a Threaded_Fork from range of filters 154 * Params: 155 * filter_arr = the list of filters 156 * length = how many filters 157 */ 158 this(Filter* filter_arr, size_t length) 159 { 160 161 super(null, cast(size_t)(0)); 162 m_thread_data = new ThreadedForkData; 163 setNext(filter_arr, length); 164 } 165 166 ~this() 167 { 168 m_thread_data.m_input = null; 169 m_thread_data.m_input_length = 0; 170 171 m_thread_data.m_input_ready_semaphore.release(m_threads.length); 172 /* 173 foreach (ref thread; m_threads) 174 thread.join();*/ 175 } 176 177 // Interface fallthrough 178 override bool attachable() { return super.attachable(); } 179 override void startMsg() { super.startMsg(); } 180 override void endMsg() { super.endMsg(); } 181 override void write(const(ubyte)* input, size_t len) { super.write(input, len); } 182 protected: 183 override void setNext(Filter* f, size_t n) 184 { 185 super.setNext(f, n); 186 n = m_next.length; 187 188 if (n < m_threads.length) 189 m_threads.resize(n); 190 else 191 { 192 m_threads.reserve(n); 193 foreach (size_t i; m_threads.length .. n) 194 { 195 m_threads.pushBack(spawn(&threadEntry, cast(shared)this, cast(shared)m_next[i])); 196 } 197 } 198 } 199 200 override void send(const(ubyte)* input, size_t length) 201 { 202 if (m_write_queue.length) 203 threadDelegateWork(m_write_queue.ptr, m_write_queue.length); 204 threadDelegateWork(input, length); 205 206 bool nothing_attached = true; 207 foreach (size_t j; 0 .. totalPorts()) 208 if (m_next[j]) 209 nothing_attached = false; 210 211 if (nothing_attached) 212 m_write_queue ~= input[0 .. length]; 213 else 214 m_write_queue.clear(); 215 } 216 217 private: 218 void threadDelegateWork(const(ubyte)* input, size_t length) 219 { 220 //Set the data to do. 221 m_thread_data.m_input = input; 222 m_thread_data.m_input_length = length; 223 224 //Let the workers start processing. 225 m_thread_data.m_input_ready_semaphore.release(totalPorts()); 226 227 //Wait for all the filters to finish processing. 228 foreach (size_t i; 0 .. totalPorts()) 229 m_thread_data.m_input_complete_semaphore.acquire(); 230 231 //Reset the thread data 232 m_thread_data.m_input = null; 233 m_thread_data.m_input_length = 0; 234 } 235 236 static void threadEntry(shared(ThreadedFork) This_, shared(Filter) filter_) 237 { 238 ThreadedFork This = cast(ThreadedFork) This_; 239 Filter filter = cast(Filter) filter_; 240 while (true) 241 { 242 This.m_thread_data.m_input_ready_semaphore.acquire(); 243 244 if (!This.m_thread_data.m_input) 245 break; 246 247 filter.write(This.m_thread_data.m_input[0 .. This.m_thread_data.m_input_length]); 248 This.m_thread_data.m_input_complete_semaphore.release(); 249 } 250 } 251 252 Vector!(Tid) m_threads; 253 Unique!ThreadedForkData m_thread_data; 254 } 255 256 struct ThreadedForkData 257 { 258 /* 259 * Semaphore for indicating that there is work to be done (or to 260 * quit) 261 */ 262 Semaphore m_input_ready_semaphore; 263 264 /* 265 * Ensures that all threads have completed processing data. 266 */ 267 Semaphore m_input_complete_semaphore; 268 269 /* 270 * The work that needs to be done. This should be only when the threads 271 * are NOT running (i.e. before notifying the work condition, after 272 * the input_complete_semaphore is completely reset.) 273 */ 274 const(ubyte)* m_input; 275 276 /* 277 * The length of the work that needs to be done. 278 */ 279 size_t m_input_length = 0; 280 }