1 /**
2 * Queue
3 *
4 * Represents a queue with a tag.
5 *
6 * Any messages that are received with
7 * the matching tag (to this queue) are
8 * then enqueued to this queue
9 */
10 
11 module tristanable.queue;
12 
13 import tristanable.queueitem : QueueItem;
14 import std.socket : Socket;
15 import core.sync.mutex : Mutex;
16 import bmessage : bSendMessage = sendMessage;
17 import core.thread : Thread;
18 import std.container.dlist;
19 import std.range : walkLength;
20 
/**
* Policy flags that can be attached to a queue.
*
* Values are single bits so that they can be
* tested with a bitwise AND against a queue's
* flags
*/
public enum QueuePolicy : ubyte
{
	/* Limit the number of items the queue may hold */
	LENGTH_CAP = 1 << 0
}
25 
/**
* A queue of QueueItems identified by a tag.
*
* All access to the underlying list goes through
* an internal mutex. When the LENGTH_CAP policy
* flag is set, items enqueued while the queue
* already holds `lengthCap` (or more) items are
* silently dropped.
*/
public final class Queue
{
	/* This queue's tag */
	private ulong tag;

	/* The queue (head is the next item to be dequeued) */
	private DList!(QueueItem) queue;

	/**
	* Number of items currently held in `queue`
	*
	* DList does not track its own length, so it is
	* maintained here to keep the length-cap check
	* O(1) instead of walking the whole list on
	* every enqueue
	*/
	private size_t queueLength;

	/* The queue mutex (guards `queue` and `queueLength`) */
	private Mutex queueLock;

	/**
	* Queue policy settings
	*/
	private ulong lengthCap = 1;
	private QueuePolicy flags;

	/**
	* Construct a new queue with the given
	* tag
	*
	* Params:
	*   tag = the tag this queue is identified by
	*   flags = policy flags (default: none set)
	*/
	this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
	{
		this.tag = tag;
		this.flags = flags;

		/* Initialize the mutex */
		queueLock = new Mutex();
	}

	/**
	* Sets the maximum number of items the queue
	* may hold; only enforced when the LENGTH_CAP
	* policy flag is set
	*/
	public void setLengthCap(ulong lengthCap)
	{
		this.lengthCap = lengthCap;
	}

	/**
	* Returns the currently configured length cap
	*/
	public ulong getLengthCap()
	{
		return this.lengthCap;
	}

	/**
	* Returns the currently configured length cap
	*
	* Kept for source compatibility with existing
	* callers; the argument is ignored. (Previously
	* the argument shadowed the field and was simply
	* returned back to the caller.)
	*/
	public ulong getLengthCap(ulong lengthCap)
	{
		return this.lengthCap;
	}

	/**
	* Appends the given item to the tail of the queue
	*
	* If the LENGTH_CAP policy flag is set and the
	* queue already holds `lengthCap` or more items
	* then the item is dropped
	*/
	public void enqueue(QueueItem item)
	{
		/* Lock the queue, unlocking on every exit path */
		queueLock.lock();
		scope(exit) queueLock.unlock();

		/**
		* `>=` rather than `==` so that the cap still
		* holds after it has been lowered below the
		* current length
		*/
		const bool capped = (flags & QueuePolicy.LENGTH_CAP) != 0;
		if(capped && queueLength >= lengthCap)
		{
			return;
		}

		/* Add it to the queue */
		queue.insertBack(item);
		queueLength++;
	}

	/**
	* Returns true if this queue has items ready
	* to be dequeued, false otherwise
	*/
	public bool poll()
	{
		/* Lock the queue, unlocking on return */
		queueLock.lock();
		scope(exit) queueLock.unlock();

		return !queue.empty;
	}

	/**
	* Continuously attempts to dequeue the
	* head of the queue, returning it once
	* one is available
	*
	* NOTE(review): assuming QueueItem is a class,
	* a null item that was enqueued is removed and
	* skipped, as the loop only ends on a non-null
	* head
	*
	* TODO: Add a timeout capability
	* TODO: Add tryLock, yield on failure (with loop for recheck ofc)
	* TODO: Possible multiple dequeue feature? Like .receive
	*/
	public QueueItem dequeue()
	{
		/* The head of the queue */
		QueueItem queueHead;

		while(!queueHead)
		{
			{
				/* Lock the queue for this attempt only */
				queueLock.lock();
				scope(exit) queueLock.unlock();

				/* Check if we can dequeue anything */
				if(!queue.empty)
				{
					/* If we can then dequeue */
					queueHead = queue.front;
					queue.removeFront();
					queueLength--;
				}
			}

			/**
			* Nothing yet: move away from this thread
			* so that whoever enqueues can get the
			* lock, rather than us racing straight
			* back and locking the queue again
			*/
			if(!queueHead)
			{
				Thread.yield();
			}
		}

		return queueHead;
	}

	/**
	* Returns the tag for this queue
	*/
	public ulong getTag()
	{
		return tag;
	}
}