1 module tristanable.watcher; 2 3 import std.socket : Socket; 4 import core.sync.mutex : Mutex; 5 import bmessage : receiveMessage; 6 import tristanable.queue : Queue; 7 import tristanable.queueitem : QueueItem; 8 import tristanable.manager : Manager; 9 import core.thread : Thread; 10 import tristanable.encoding; 11 import tristanable.exceptions; 12 13 public final class Watcher : Thread 14 { 15 /* The manager */ 16 private Manager manager; 17 18 /* The socket to read from */ 19 private Socket socket; 20 21 private bool running; 22 23 this(Manager manager, Socket endpoint) 24 { 25 super(&run); 26 this.manager = manager; 27 socket = endpoint; 28 29 running = true; 30 start(); 31 } 32 33 public void shutdown() 34 { 35 running=false; 36 37 /* Close the socket, causing an error, breaking the event loop */ 38 socket.close(); 39 40 } 41 42 private void run() 43 { 44 /* Continuously dequeue tristanable packets from socket */ 45 while(true) 46 { 47 /* Receive payload (tag+data) */ 48 byte[] receivedPayload; 49 50 /* Block for socket response */ 51 bool recvStatus = receiveMessage(socket, receivedPayload); 52 53 /* If the receive was successful */ 54 if(recvStatus) 55 { 56 /* Decode the ttag-encoded message */ 57 DataMessage message = DataMessage.decode(receivedPayload); 58 59 /* TODO: Remove isTag, improve later, oneshot */ 60 61 /* The matching queue (if any) */ 62 Queue queue = manager.getQueue(message.getTag()); 63 64 /* If the tag belongs to a queue */ 65 if(queue) 66 { 67 /* Add an item to this queue */ 68 queue.enqueue(new QueueItem(message.getData())); 69 } 70 /* If the tag is unknwon */ 71 else 72 { 73 /* TODO: Add to dropped queue? */ 74 75 /* Do nothing */ 76 } 77 } 78 /* If the receive failed */ 79 else 80 { 81 /* TODO: depending on `running`, different error */ 82 83 /* TODO: Stop everything */ 84 break; 85 } 86 87 /** 88 * Like in `dequeue` we don't want the possibility 89 * of racing back to the top of the loop and locking 90 * the mutex again right before a thread switch, 91 * so we make sure that a switch occurs to a different 92 * thread 93 */ 94 Thread.getThis().yield(); 95 } 96 97 /* Check if we had an error */ 98 if(running) 99 { 100 throw new TristanableException(manager, "bformat socket error"); 101 } 102 else 103 { 104 /* Actual shut down, do nothing */ 105 } 106 } 107 }