1 module tristanable.watcher;
2 
3 import std.socket : Socket;
4 import core.sync.mutex : Mutex;
5 import bmessage : receiveMessage;
6 import tristanable.queue : Queue;
7 import tristanable.queueitem : QueueItem;
8 import tristanable.manager : Manager;
9 import core.thread : Thread;
10 import tristanable.encoding;
11 
12 public final class Watcher : Thread
13 {
14 	/* The manager */
15 	private Manager manager;
16 
17 	/* The socket to read from */
18 	private Socket socket;
19 
20 	this(Manager manager, Socket endpoint)
21 	{
22 		super(&run);
23 		this.manager = manager;
24 		socket = endpoint;
25 
26 		start();
27 	}
28 
29 	private void run()
30 	{
31 		/* Continuously dequeue tristanable packets from socket */
32 		while(true)
33 		{
34 			/* Receive payload (tag+data) */
35 			byte[] receivedPayload;
36 			
37 			/* Block for socket response */
38 			bool recvStatus = receiveMessage(socket, receivedPayload);
39 
40 			/* If the receive was successful */
41 			if(recvStatus)
42 			{
43 				/* Decode the ttag-encoded message */
44 				DataMessage message = DataMessage.decode(receivedPayload);
45 
46 				/* TODO: Remove isTag, improve later, oneshot */
47 
48 				/* The matching queue (if any) */
49 				Queue queue = manager.getQueue(message.getTag());
50 
51 				/* If the tag belongs to a queue */
52 				if(queue)
53 				{
54 					/* Add an item to this queue */
55 					queue.enqueue(new QueueItem(message.getData()));
56 				}
57 				/* If the tag is unknwon */
58 				else
59 				{
60 					/* TODO: Add to dropped queue? */
61 
62 					/* Do nothing */
63 				}
64 			}
65 			/* If the receive failed */
66 			else
67 			{
68 				/* TODO: Stop everything */
69 				break;
70 			}
71 		
72 			/**
73 			* Like in `dequeue` we don't want the possibility
74 			* of racing back to the top of the loop and locking
75 			* the mutex again right before a thread switch,
76 			* so we make sure that a switch occurs to a different
77 			* thread
78 			*/
79 			Thread.getThis().yield();
80 		}
81 	}
82 }