1 module tasky.engine;
2 
3 import std.container.dlist : DList;
4 import core.sync.mutex : Mutex;
5 import tristanable.manager;
6 import std.socket;
7 import tristanable.queue : Queue;
8 import tristanable.queueitem;
9 import tristanable.encoding : DataMessage, encodeForSend;
10 import eventy;
11 
12 import core.thread : Thread;
13 
14 import std.stdio;
15 
16 unittest
17 {
18     import std.stdio;
19 
20     /**
21     * Server process
22     */
23     Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
24     servSocket.bind(parseAddress("::1", 0));
25     servSocket.listen(0);
26 
27     auto serverThread = new class Thread
28     {
29         this()
30         {
31             super(&worker);
32         }
33 
34         private void worker()
35         {
36             
37             while(true)
38             {
39                 Socket client = servSocket.accept();
40 
41                 import bmessage;
42 
43                 byte[] data;
44                 receiveMessage(client, data);
45                 writeln("Server received: ", data);
46 
47                 byte[] dataOut = [65,66,66,65];
48                 DataMessage dOut = new DataMessage(0, dataOut);
49                 client.send(bmessage.encodeBformat(dOut.encode()));
50                 
51                 
52                 /* Wait for a single byte (for preparation) */
53                 // byte[] k = [1];
54                 // client.receive(k);
55             }
56         }
57     };
58 
59     /* Start the server thread */
60     serverThread.start();
61 
62     /* Open a socket to the server */
63     Socket conn = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
64     conn.connect(servSocket.localAddress);
65 
66     /* Start the task manager */
67     TaskManager taskManager = new TaskManager(conn);
68 
69     /* Create a Task to submit as a job */
70     TestTask testTask = new TestTask("Hello, world, this is a test message");
71     taskManager.submitTask(testTask);
72 
73     
74 }
75 
76 public final class TaskManager : Thread
77 {
78     /**
79     * Job queue
80     */
81     private DList!(Job) jobs;
82     private Mutex jobsLock;
83 
84     /*
85     * Tristanable queue filter
86     */
87     private Manager manager;
88 
89     /**
90     * Event-loop
91     */
92     private Engine eventEngine;
93 
94     this(Socket socket)
95     {
96         super(&worker);
97 
98         /* Initialize tristanable */
99         manager = new Manager(socket);
100 
101         /* Initialize the event-loop */
102         eventEngine = new Engine();
103 
104         /* Start the event engine */
105         eventEngine.start();
106 
107         /* Initialize job queue lock */
108         jobsLock =  new Mutex();
109 
110         /* Start the thread */
111         start();
112     }
113 
114     private void worker()
115     {
116         while(true)
117         {
118             /* Lock the job queue */
119             jobsLock.lock();
120 
121             /* Clean list (list of jobs to be removed) */
122             Job[] cleanList;
123 
124             foreach(Job job; jobs)
125             {
126                 /* If the job is fulfilled */
127                 if(job.isFulfilled())
128                 {
129                     /* Get the Event for dispatching */
130                     Event dispatchEvent = job.getEventForDispatch();
131 
132                     /* Dispatch the event */
133                     eventEngine.push(dispatchEvent);
134 
135                     /* Free the tristanable tag for this job */
136                     job.complete();
137 
138                     /* Add job to the deletion queue */
139                     cleanList ~= job;
140                 }
141 
142                 
143             }
144 
145             /* Delete tje jobs */
146             foreach(Job job; cleanList)
147             {
148                 jobs.linearRemoveElement(job);
149             }
150 
151             /* Unlock the job queue */
152             jobsLock.unlock();
153         }
154     }
155 
156 
157     /**
158     * Job
159     *
160     * Represents an enqueued (in-progress) task with
161     * an associated tristanable tag
162     *
163     * Created by the task manager and not to be used
164     * by the user at all
165     */
166     private final class Job
167     {
168         private Task task;
169         private Queue tristanableTag;
170 
171         this(Task task, Queue tristanableTag)
172         {
173             this.task = task;
174             this.tristanableTag = tristanableTag;
175         }
176 
177         public Task getTask()
178         {
179             return task;
180         }
181 
182         public DataMessage encode()
183         {
184             /* Get the Task's data to be sent */
185             byte[] taskPayload = task.getData();
186 
187             /* Encode into tristanable format */
188             DataMessage tEncoded = new DataMessage(tristanableTag.getTag(), taskPayload);
189 
190             return tEncoded;
191         }
192 
193         public Event getEventForDispatch()
194         {
195             /* Dequeue the data from the tristanable queue */
196             QueueItem queueItem = tristanableTag.dequeue();
197             byte[] receivedData = queueItem.getData();
198 
199             /* Parse into Event (based on the Job's task type) and return */
200             Event eventToDispatch = task.getEvent(receivedData);
201 
202             return eventToDispatch;
203         }
204 
205         public bool isFulfilled()
206         {
207             return tristanableTag.poll();
208         }
209 
210         public void complete()
211         {
212             manager.removeQueue(tristanableTag);
213         }
214     }
215 
216     /*
217     * Registers the type of Task by the Event it returns
218     *
219     * This is always called by `submitTask` but is only
220     * ever used once to
221     */
222     public void registerTaskType(Task task)
223     {
224         /* Task typeID */
225         ulong typeID = task.getTypeID();
226 
227         /* Get the EventHandler */
228         EventHandler handler = task.getHandler();
229 
230         /* Check if there is already such a handler */
231         /* FIXME: This should (in eventy) take a ulong, semantics of taking in EVent give it a weird meaning */
232         bool signalExists = eventEngine.getSignalsForEvent(new Event(typeID)).length > 0;
233 
234         /* If no such signal handler exists, then add it */
235         if(!signalExists)
236         {
237             Signal signalHandler = new Signal([typeID], handler);
238             eventEngine.addSignalHandler(signalHandler);
239 
240             /* Because this happens at the same time and a queue for this type would exist add that too */
241             /* TODO: Make eventy crash if typeID for non-existent queue */
242             eventEngine.addQueue(typeID);
243         }
244         
245     }
246 
247     /**
248     * Submits a new Task, enqueues it as a job,
249     * sends the payload
250     */
251     public void submitTask(Task task)
252     {
253         /* Get a unique tristanable ID for the new job */
254         Queue newQueue = manager.generateQueue();
255 
256         /* If the queue generation was successful */
257         if(newQueue)
258         {
259             /* Register the task (if not already done) */
260             registerTaskType(task);
261 
262             /* Create a new job */
263             Job newJob = new Job(task, newQueue);
264 
265             /* Lock the job queue */
266             jobsLock.lock();
267 
268             /* Enqueue the job */
269             jobs ~= newJob;
270 
271             /* Unlock the job queue */
272             jobsLock.unlock();
273 
274             /* Get the DataMessage of the job */
275             DataMessage jobDMessage = newJob.encode();
276 
277             /* Encode for sending (bformat) */
278             byte[] bEncoded = encodeForSend(jobDMessage);
279 
280             /* Send the payload */
281             manager.getSocket().send(bEncoded);
282         }
283         /* If unsuccessful, throw exception */
284         else
285         {
286             /* TODO: Add an exception */
287         }
288 
289         /* Lock the jobs */
290     }
291     
292 }
293 
294 
295 
296 /**
297 * Represents a Task
298 */
299 public abstract class Task
300 {
301     private byte[] data;
302     private ulong typeID;
303     private EventHandler handler;
304 
305     /*
306     * Constructs a new Task with the given data to be
307     * sent and a typeID that reoresents which Signal
308     * handler to call
309     */
310     this(byte[] data, ulong typeID, EventHandler handler)
311     {
312         this.data = data;
313         this.typeID = typeID;
314         this.handler = handler;
315     }
316 
317     public byte[] getData()
318     {
319         return data;
320     }
321 
322     public ulong getTypeID()
323     {
324         return typeID;
325     }
326 
327     public EventHandler getHandler()
328     {
329         return handler;
330     }
331 
332     /**
333     * Intended to take the received data from the Job's
334     * tristanable queue and decode it as per this Task's
335     * type
336     */
337     public abstract Event getEvent(byte[] dataIn);
338 }
339 
340 public final class TestTask : Task
341 {
342     this(string payloadOut)
343     {
344         super(cast(byte[])payloadOut, 69, &TestTaskHandlerFunc);
345     }
346 
347     private static void TestTaskHandlerFunc(Event e)
348     {
349         import std.stdio;
350         writeln("<<<<Tasky task diapatched>>>>", e);
351     }
352 
353     public override Event getEvent(byte[] dataIn)
354     {
355         auto event = new class Event
356         {
357             this()
358             {
359                 /* TestTask is of type 69 for signal dispatching in Eventy */
360                 super(getTypeID());
361             }
362         };
363 
364         return event;
365     }
366 }