module tristanable.watcher;

import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : receiveMessage;
import tristanable.queue : Queue;
import tristanable.queueitem : QueueItem;
import tristanable.manager : Manager;
import core.thread : Thread;
import tristanable.encoding;

/**
 * Background thread that reads tagged messages from a socket
 * and hands each payload to the queue registered for its tag.
 *
 * The thread is started from the constructor and stops as soon
 * as a receive on the socket fails.
 */
public final class Watcher : Thread
{
	/* Manager used to look up the queue for a given tag */
	private Manager manager;

	/* Socket the messages are read from */
	private Socket socket;

	/**
	 * Creates the watcher and immediately starts its thread.
	 *
	 * Params:
	 *   manager = the manager queried for the queue matching each tag
	 *   endpoint = the socket to receive messages from
	 */
	this(Manager manager, Socket endpoint)
	{
		super(&run);
		socket = endpoint;
		this.manager = manager;

		/* Begin receiving right away */
		start();
	}

	/**
	 * Thread body: receive, decode and dispatch messages until
	 * a receive fails.
	 */
	private void run()
	{
		for(;;)
		{
			/* Filled in with the raw payload (tag+data) */
			byte[] payload;

			/* Wait for the next message; leave the loop on failure */
			/* TODO: Stop everything */
			if(!receiveMessage(socket, payload))
			{
				break;
			}

			/* Decode the tag-encoded message */
			DataMessage decoded = DataMessage.decode(payload);

			/* TODO: Remove isTag, improve later, oneshot */

			/* Look up the queue (if any) registered for this tag */
			Queue target = manager.getQueue(decoded.getTag());

			/**
			 * Only messages whose tag has a queue are delivered;
			 * messages with an unknown tag are currently ignored
			 * (TODO: Add to dropped queue?)
			 */
			if(target !is null)
			{
				target.enqueue(new QueueItem(decoded.getData()));
			}

			/**
			 * As in `dequeue`, give other threads a chance to run
			 * before going back to the top of the loop, rather than
			 * racing straight into the next iteration
			 */
			Thread.yield();
		}
	}
}