1 /** 2 * Queue 3 * 4 * Represents a queue with a tag. 5 * 6 * Any messages that are received with 7 * the matching tag (to this queue) are 8 * then enqueued to this queue 9 */ 10 11 module tristanable.queue; 12 13 import tristanable.queueitem : QueueItem; 14 import std.socket : Socket; 15 import core.sync.mutex : Mutex; 16 import bmessage : bSendMessage = sendMessage; 17 import core.thread : Thread; 18 import std.container.dlist; 19 import std.range : walkLength; 20 21 public enum QueuePolicy : ubyte 22 { 23 LENGTH_CAP = 1 24 } 25 26 public final class Queue 27 { 28 /* This queue's tag */ 29 private ulong tag; 30 31 /* The queue */ 32 private DList!(QueueItem) queue; 33 34 /* The queue mutex */ 35 private Mutex queueLock; 36 37 /** 38 * Construct a new queue with the given 39 * tag 40 */ 41 this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0) 42 { 43 this.tag = tag; 44 45 /* Initialize the mutex */ 46 queueLock = new Mutex(); 47 48 this.flags = flags; 49 } 50 51 public void setLengthCap(ulong lengthCap) 52 { 53 this.lengthCap = lengthCap; 54 } 55 56 public ulong getLengthCap(ulong lengthCap) 57 { 58 return lengthCap; 59 } 60 61 /** 62 * Queue policy settings 63 */ 64 private ulong lengthCap = 1; 65 private QueuePolicy flags; 66 67 68 public void enqueue(QueueItem item) 69 { 70 /* Lock the queue */ 71 queueLock.lock(); 72 73 /** 74 * Check to see if the queue has a length cap 75 * 76 * If so then determine whether to drop or 77 * keep dependent on current capacity 78 */ 79 if(flags & QueuePolicy.LENGTH_CAP) 80 { 81 if(walkLength(queue[]) == lengthCap) 82 { 83 goto unlock; 84 } 85 } 86 87 /* Add it to the queue */ 88 queue ~= item; 89 90 unlock: 91 92 /* Unlock the queue */ 93 queueLock.unlock(); 94 } 95 96 /** 97 * Returns true if this queue has items ready 98 * to be dequeued, false otherwise 99 */ 100 public bool poll() 101 { 102 /* Status */ 103 bool status; 104 105 /* Lock the queue */ 106 queueLock.lock(); 107 108 status = !queue.empty(); 109 110 /* Unlock the queue */ 111 queueLock.unlock(); 112 113 return status; 114 } 115 116 /** 117 * Attempts to coninuously dequeue the 118 * head of the queue 119 * 120 * TODO: Add a timeout capability 121 * TODO: Add tryLock, yield on failure (with loop for recheck ofc) 122 * TODO: Possible multiple dequeue feature? Like .receive 123 */ 124 public QueueItem dequeue() 125 { 126 /* The head of the queue */ 127 QueueItem queueHead; 128 129 while(!queueHead) 130 { 131 /* Lock the queue */ 132 queueLock.lock(); 133 134 /* Check if we can dequeue anything */ 135 if(!queue.empty()) 136 { 137 /* If we can then dequeue */ 138 queueHead = queue.front(); 139 queue.removeFront(); 140 141 /* Chop off the head */ 142 // offWithTheHead(); 143 } 144 145 /* Unlock the queue */ 146 queueLock.unlock(); 147 148 149 /** 150 * Move away from this thread, let 151 * the watcher (presumably) try 152 * access our queue (successfully) 153 * by getting a lock on it 154 * 155 * Prevents us possibly racing back 156 * and locking queue again hence 157 * starving the system 158 */ 159 Thread.getThis().yield(); 160 } 161 162 return queueHead; 163 } 164 165 /** 166 * Returns the tag for this queue 167 */ 168 public ulong getTag() 169 { 170 return tag; 171 } 172 }