module tristanable.manager;

import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import tristanable.queue : Queue;
import tristanable.watcher;
import std.container.dlist;
import tristanable.exceptions;

/**
 * Manager
 *
 * This is the core class that is to be instantiated
 * that represents an instance of the tristanable
 * framework. It is passed a Socket from which it
 * reads from (using a bformat block reader).
 *
 * It contains a Watcher which does the reading and
 * appending to respective queues (the user need not
 * worry about this detail).
 *
 * The functions provided allow users to wait in a
 * tight loop to dequeue ("receive" in a blocking manner)
 * from a specified queue.
 *
 * All access to the list of queues is guarded by
 * `queuesLock`. Methods that hold the lock call other
 * locking methods of this class, so the mutex must be
 * recursive (which `core.sync.mutex.Mutex` is).
 */
public final class Manager
{
    /* All queues */
    private DList!(Queue) queues;

    /* Guards every read and write of `queues` */
    private Mutex queuesLock;

    /* TODO Add drop queue? */

    /**
     * The remote host
     */
    private Socket socket;

    /* The watcher created for this manager and its socket */
    private Watcher watcher;

    /**
     * Constructs a new Manager with the given
     * endpoint Socket
     *
     * Params:
     *   socket = the Socket handed to the Watcher and
     *            returned by `getSocket()`
     */
    this(Socket socket)
    {
        /* TODO: Make sure the socket is in STREAM mode */

        /* Set the socket */
        this.socket = socket;

        /* Initialize the queues mutex */
        queuesLock = new Mutex();

        /* Initialize the watcher */
        watcher = new Watcher(this, socket);
    }

    /**
     * Looks up the queue registered under the given tag
     *
     * Params:
     *   tag = the tag to search for
     *
     * Returns: the first Queue whose `getTag()` equals
     * `tag`, or null if no such queue has been added
     */
    public Queue getQueue(ulong tag)
    {
        queuesLock.lock();

        /* Release the lock on every exit path, including a throw */
        scope(exit) queuesLock.unlock();

        foreach(Queue queue; queues)
        {
            if(queue.getTag() == tag)
            {
                return queue;
            }
        }

        return null;
    }

    /* TODO: Probably remove this or keep it */
    /**
     * Returns: true if a queue with the given tag is
     * currently added to this manager, false otherwise
     */
    public bool isValidTag(ulong tag)
    {
        return getQueue(tag) !is null;
    }

    /**
     * Returns a new queue with a new ID,
     * if all IDs are used then it returns
     * null
     *
     * Use this if you don't care about reserving
     * queues IDs and just want a throwaway queue
     *
     * The ID chosen is the lowest one, counting up
     * from 0, that is not in use. The new queue is
     * added to this manager before being returned.
     */
    public Queue generateQueue()
    {
        queuesLock.lock();

        /* Release the lock on every exit path, including a throw */
        scope(exit) queuesLock.unlock();

        /* Walk upwards from 0 until a free id is found */
        ulong candidate = 0;
        while(isValidTag(candidate))
        {
            /**
             * Every id up to and including the last one is
             * taken; incrementing would wrap around to 0 and
             * loop forever, so report exhaustion instead
             */
            if(candidate == ulong.max)
            {
                return null;
            }

            candidate++;
        }

        /* Create the new queue with the free id found */
        Queue newQueue = new Queue(candidate);

        /* Add the queue (recursive mutex) */
        addQueue(newQueue);

        return newQueue;
    }

    /**
     * Returns: a newly built array holding every queue
     * currently added to this manager, in list order
     */
    public Queue[] getQueues()
    {
        Queue[] snapshot;

        queuesLock.lock();

        /* Release the lock on every exit path, including a throw */
        scope(exit) queuesLock.unlock();

        foreach(Queue queue; this.queues)
        {
            snapshot ~= queue;
        }

        return snapshot;
    }

    /**
     * Removes the given Queue, `queue`, from the manager
     *
     * Throws a TristanableException if the id of the
     * queue wanting to be removed is not in use by any
     * queue already added
     *
     * NOTE(review): the check is by tag but the removal is
     * by element (`linearRemoveElement(queue)`); presumably
     * passing a different Queue object that shares a tag
     * with an added queue removes nothing - confirm
     */
    public void removeQueue(Queue queue)
    {
        queuesLock.lock();

        /* Release the lock on every exit path, including the throw below */
        scope(exit) queuesLock.unlock();

        /* Make sure such a tag exists */
        if(!isValidTag(queue.getTag()))
        {
            throw new TristanableException(this, "Cannot remove a queue with an id not in use");
        }

        queues.linearRemoveElement(queue);
    }

    /**
     * Adds the given Queue, `queue`, to the manager
     *
     * Throws a TristanableException if the id of the
     * queue wanting to be added is already in use by
     * another already added queue
     */
    public void addQueue(Queue queue)
    {
        queuesLock.lock();

        /* Release the lock on every exit path, including the throw below */
        scope(exit) queuesLock.unlock();

        /* Make sure such a tag does not exist already */
        if(isValidTag(queue.getTag()))
        {
            throw new TristanableException(this, "Cannot add queue with id already in use");
        }

        queues ~= queue;
    }

    /**
     * Returns: the Socket this manager was constructed with
     */
    public Socket getSocket()
    {
        return socket;
    }

    /**
     * Asks the watcher to shut down and then waits
     * for it to finish
     *
     * TODO: Testing
     */
    public void shutdown()
    {
        /* TODO: Implement me */

        /* Make the loop stop whenever it does */
        watcher.shutdown();

        /* Wait for the thread to end */
        watcher.join();
    }
}