1 module tristanable.watcher;
2 
3 import std.socket : Socket;
4 import core.sync.mutex : Mutex;
5 import bmessage : receiveMessage;
6 import tristanable.queue : Queue;
7 import tristanable.queueitem : QueueItem;
8 import tristanable.manager : Manager;
9 import core.thread : Thread;
10 import tristanable.encoding;
11 import tristanable.exceptions;
12 
13 public final class Watcher : Thread
14 {
15 	/* The manager */
16 	private Manager manager;
17 
18 	/* The socket to read from */
19 	private Socket socket;
20 
21 	private bool running;
22 
23 	this(Manager manager, Socket endpoint)
24 	{
25 		super(&run);
26 		this.manager = manager;
27 		socket = endpoint;
28 
29 		running = true;
30 		start();
31 	}
32 
33 	public void shutdown()
34 	{
35 		running=false;
36 
37 		/* Close the socket, causing an error, breaking the event loop */
38 		socket.close();
39 		
40 	}
41 
42 	private void run()
43 	{
44 		/* Continuously dequeue tristanable packets from socket */
45 		while(true)
46 		{
47 			/* Receive payload (tag+data) */
48 			byte[] receivedPayload;
49 			
50 			/* Block for socket response */
51 			bool recvStatus = receiveMessage(socket, receivedPayload);
52 
53 			/* If the receive was successful */
54 			if(recvStatus)
55 			{
56 				/* Decode the ttag-encoded message */
57 				DataMessage message = DataMessage.decode(receivedPayload);
58 
59 				/* TODO: Remove isTag, improve later, oneshot */
60 
61 				/* The matching queue (if any) */
62 				Queue queue = manager.getQueue(message.getTag());
63 
64 				/* If the tag belongs to a queue */
65 				if(queue)
66 				{
67 					/* Add an item to this queue */
68 					queue.enqueue(new QueueItem(message.getData()));
69 				}
70 				/* If the tag is unknwon */
71 				else
72 				{
73 					/* TODO: Add to dropped queue? */
74 
75 					/* Do nothing */
76 				}
77 			}
78 			/* If the receive failed */
79 			else
80 			{
81 				/* TODO: depending on `running`, different error */
82 
83 				/* TODO: Stop everything */
84 				break;
85 			}
86 		
87 			/**
88 			* Like in `dequeue` we don't want the possibility
89 			* of racing back to the top of the loop and locking
90 			* the mutex again right before a thread switch,
91 			* so we make sure that a switch occurs to a different
92 			* thread
93 			*/
94 			Thread.getThis().yield();
95 		}
96 
97 		/* Check if we had an error */
98 		if(running)
99 		{
100 			throw new TristanableException(manager, "bformat socket error");
101 		}
102 		else
103 		{
104 			/* Actual shut down, do nothing */
105 		}
106 	}
107 }