1 module tristanable.manager;
2 
3 import std.socket : Socket;
4 import core.sync.mutex : Mutex;
5 import bmessage : bSendMessage = sendMessage;
6 import tristanable.queue : Queue;
7 import tristanable.watcher;
8 import std.container.dlist;
9 import tristanable.exceptions;
10 
11 /**
12 * Manager
13 *
14 * This is the core class that is to be instantiated
15 * that represents an instance of the tristanable
16 * framework. It is passed a Socket from which it
17 * reads from (using a bformat block reader).
18 *
19 * It contains a Watcher which does the reading and
20 * appending to respective queues (the user need not
21 * worry about this factum).
22 *
23 * The functions provided allow users to wait in a
* tight loop to dequeue ("receive" in a blocking manner)
25 * from a specified queue.
26 */
public final class Manager
{
	/* All queues currently registered; access is guarded by `queuesLock` */
	private DList!(Queue) queues;

	/* Guards `queues`. Public methods call one another while holding it,
	 * which relies on core.sync.mutex.Mutex being recursive. */
	private Mutex queuesLock;

	/* TODO Add drop queue? */

	/**
	* The remote host
	*/
	private Socket socket;

	/* The Watcher created for this manager and its socket */
	private Watcher watcher;

	/**
	* Constructs a new Manager with the given
	* endpoint Socket
	*
	* Params:
	*   socket = the endpoint Socket; it is stored and also
	*            handed to the newly created Watcher
	*/
	this(Socket socket)
	{
		/* TODO: Make sure the socket is in STREAM mode */

		/* Set the socket */
		this.socket = socket;

		/* Initialize the queues mutex */
		queuesLock = new Mutex();

		/* Initialize the watcher */
		watcher = new Watcher(this, socket);
	}

	/**
	* Looks up the registered queue whose tag equals `tag`
	*
	* Params:
	*   tag = the tag to search for
	*
	* Returns: the first matching Queue, or null if no
	* registered queue carries that tag
	*/
	public Queue getQueue(ulong tag)
	{
		queuesLock.lock();

		/* Release the lock on every exit path, including exceptions */
		scope(exit) queuesLock.unlock();

		foreach(Queue queue; queues)
		{
			if(queue.getTag() == tag)
			{
				return queue;
			}
		}

		return null;
	}

	/**
	* Tells whether a queue with the given tag is registered
	*
	* TODO: Probably remove this or keep it
	*
	* Params:
	*   tag = the tag to check
	*
	* Returns: true if `getQueue(tag)` finds a queue, false otherwise
	*/
	public bool isValidTag(ulong tag)
	{
		return getQueue(tag) !is null;
	}

	/**
	* Returns a new queue with a new ID,
	* if all IDs are used then it returns
	* null
	*
	* Use this if you don't care about reserving
	* queues IDs and just want a throwaway queue
	*
	* The ID chosen is the lowest tag, counting up from 0,
	* that is not in use. The new queue is registered with
	* this manager before it is returned.
	*
	* Returns: the newly created and registered Queue, or
	* null if every tag up to and including `ulong.max` is
	* already in use
	*/
	public Queue generateQueue()
	{
		queuesLock.lock();

		/* Release the lock on every exit path, including exceptions */
		scope(exit) queuesLock.unlock();

		/* Probe upwards from 0 for the first tag not in use */
		ulong candidate = 0;
		while(isValidTag(candidate))
		{
			/* Stop rather than wrap around to 0 and spin forever */
			if(candidate == ulong.max)
			{
				return null;
			}

			candidate++;
		}

		/* Create the new queue with the free id found */
		Queue newQueue = new Queue(candidate);

		/* Add the queue (recursive mutex) */
		addQueue(newQueue);

		return newQueue;
	}

	/**
	* Takes a snapshot of all registered queues
	*
	* Returns: a newly built array holding every queue
	* registered at the time of the call, in list order
	*/
	public Queue[] getQueues()
	{
		/* Named differently from the `queues` member to avoid shadowing it */
		Queue[] snapshot;

		queuesLock.lock();

		/* Release the lock on every exit path, including exceptions */
		scope(exit) queuesLock.unlock();

		foreach(Queue queue; queues)
		{
			snapshot ~= queue;
		}

		return snapshot;
	}

	/**
	* Removes the given Queue, `queue`, from the manager
	*
	* Throws a TristanableException if the id of the
	* queue wanting to be removed is not in use by any
	* queue already added
	*
	* NOTE(review): the existence check is by tag but the
	* removal is by element. If a different queue carrying
	* the same tag is the one registered, nothing appears to
	* be removed and no exception is thrown - confirm whether
	* that is intended.
	*
	* Params:
	*   queue = the queue to remove
	*/
	public void removeQueue(Queue queue)
	{
		queuesLock.lock();

		/* Release the lock on every exit path, including the throw below */
		scope(exit) queuesLock.unlock();

		/* Make sure such a tag exists */
		if(!isValidTag(queue.getTag()))
		{
			throw new TristanableException(this, "Cannot remove a queue with an id not in use");
		}

		queues.linearRemoveElement(queue);
	}

	/**
	* Adds the given Queue, `queue`, to the manager
	*
	* Throws a TristanableException if the id of the
	* queue wanting to be added is already in use by
	* another already added queue
	*
	* Params:
	*   queue = the queue to register
	*/
	public void addQueue(Queue queue)
	{
		queuesLock.lock();

		/* Release the lock on every exit path, including the throw below */
		scope(exit) queuesLock.unlock();

		/* Make sure such a tag does not exist already */
		if(isValidTag(queue.getTag()))
		{
			throw new TristanableException(this, "Cannot add queue with id already in use");
		}

		queues ~= queue;
	}

	/**
	* Returns: the Socket this manager was constructed with
	*/
	public Socket getSocket()
	{
		return socket;
	}

	/**
	* Shuts the manager down by asking the watcher to
	* shut down and then joining it
	*
	* TODO: Testing
	*/
	public void shutdown()
	{
		/* TODO: Implement me */

		/* Make the loop stop whenever it does */
		watcher.shutdown();

		/* Wait for the thread to end */
		watcher.join();
	}
}