1 module tasky.engine; 2 3 import std.container.dlist : DList; 4 import core.sync.mutex : Mutex; 5 import tristanable.manager; 6 import std.socket; 7 import tristanable.queue : Queue; 8 import tristanable.queueitem; 9 import tristanable.encoding : DataMessage, encodeForSend; 10 import eventy; 11 12 import core.thread : Thread; 13 14 import std.stdio; 15 16 unittest 17 { 18 import std.stdio; 19 20 /** 21 * Server process 22 */ 23 Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); 24 servSocket.bind(parseAddress("::1", 0)); 25 servSocket.listen(0); 26 27 auto serverThread = new class Thread 28 { 29 this() 30 { 31 super(&worker); 32 } 33 34 private void worker() 35 { 36 37 while(true) 38 { 39 Socket client = servSocket.accept(); 40 41 import bmessage; 42 43 byte[] data; 44 receiveMessage(client, data); 45 writeln("Server received: ", data); 46 47 byte[] dataOut = [65,66,66,65]; 48 DataMessage dOut = new DataMessage(0, dataOut); 49 client.send(bmessage.encodeBformat(dOut.encode())); 50 51 52 /* Wait for a single byte (for preparation) */ 53 // byte[] k = [1]; 54 // client.receive(k); 55 } 56 } 57 }; 58 59 /* Start the server thread */ 60 serverThread.start(); 61 62 /* Open a socket to the server */ 63 Socket conn = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); 64 conn.connect(servSocket.localAddress); 65 66 /* Start the task manager */ 67 TaskManager taskManager = new TaskManager(conn); 68 69 /* Create a Task to submit as a job */ 70 TestTask testTask = new TestTask("Hello, world, this is a test message"); 71 taskManager.submitTask(testTask); 72 73 74 } 75 76 public final class TaskManager : Thread 77 { 78 /** 79 * Job queue 80 */ 81 private DList!(Job) jobs; 82 private Mutex jobsLock; 83 84 /* 85 * Tristanable queue filter 86 */ 87 private Manager manager; 88 89 /** 90 * Event-loop 91 */ 92 private Engine eventEngine; 93 94 this(Socket socket) 95 { 96 super(&worker); 97 98 /* Initialize tristanable */ 99 manager = new Manager(socket); 100 101 /* Initialize the event-loop */ 102 eventEngine = new Engine(); 103 104 /* Start the event engine */ 105 eventEngine.start(); 106 107 /* Initialize job queue lock */ 108 jobsLock = new Mutex(); 109 110 /* Start the thread */ 111 start(); 112 } 113 114 private void worker() 115 { 116 while(true) 117 { 118 /* Lock the job queue */ 119 jobsLock.lock(); 120 121 /* Clean list (list of jobs to be removed) */ 122 Job[] cleanList; 123 124 foreach(Job job; jobs) 125 { 126 /* If the job is fulfilled */ 127 if(job.isFulfilled()) 128 { 129 /* Get the Event for dispatching */ 130 Event dispatchEvent = job.getEventForDispatch(); 131 132 /* Dispatch the event */ 133 eventEngine.push(dispatchEvent); 134 135 /* Free the tristanable tag for this job */ 136 job.complete(); 137 138 /* Add job to the deletion queue */ 139 cleanList ~= job; 140 } 141 142 143 } 144 145 /* Delete tje jobs */ 146 foreach(Job job; cleanList) 147 { 148 jobs.linearRemoveElement(job); 149 } 150 151 /* Unlock the job queue */ 152 jobsLock.unlock(); 153 } 154 } 155 156 157 /** 158 * Job 159 * 160 * Represents an enqueued (in-progress) task with 161 * an associated tristanable tag 162 * 163 * Created by the task manager and not to be used 164 * by the user at all 165 */ 166 private final class Job 167 { 168 private Task task; 169 private Queue tristanableTag; 170 171 this(Task task, Queue tristanableTag) 172 { 173 this.task = task; 174 this.tristanableTag = tristanableTag; 175 } 176 177 public Task getTask() 178 { 179 return task; 180 } 181 182 public DataMessage encode() 183 { 184 /* Get the Task's data to be sent */ 185 byte[] taskPayload = task.getData(); 186 187 /* Encode into tristanable format */ 188 DataMessage tEncoded = new DataMessage(tristanableTag.getTag(), taskPayload); 189 190 return tEncoded; 191 } 192 193 public Event getEventForDispatch() 194 { 195 /* Dequeue the data from the tristanable queue */ 196 QueueItem queueItem = tristanableTag.dequeue(); 197 byte[] receivedData = queueItem.getData(); 198 199 /* Parse into Event (based on the Job's task type) and return */ 200 Event eventToDispatch = task.getEvent(receivedData); 201 202 return eventToDispatch; 203 } 204 205 public bool isFulfilled() 206 { 207 return tristanableTag.poll(); 208 } 209 210 public void complete() 211 { 212 manager.removeQueue(tristanableTag); 213 } 214 } 215 216 /* 217 * Registers the type of Task by the Event it returns 218 * 219 * This is always called by `submitTask` but is only 220 * ever used once to 221 */ 222 public void registerTaskType(Task task) 223 { 224 /* Task typeID */ 225 ulong typeID = task.getTypeID(); 226 227 /* Get the EventHandler */ 228 EventHandler handler = task.getHandler(); 229 230 /* Check if there is already such a handler */ 231 /* FIXME: This should (in eventy) take a ulong, semantics of taking in EVent give it a weird meaning */ 232 bool signalExists = eventEngine.getSignalsForEvent(new Event(typeID)).length > 0; 233 234 /* If no such signal handler exists, then add it */ 235 if(!signalExists) 236 { 237 Signal signalHandler = new Signal([typeID], handler); 238 eventEngine.addSignalHandler(signalHandler); 239 240 /* Because this happens at the same time and a queue for this type would exist add that too */ 241 /* TODO: Make eventy crash if typeID for non-existent queue */ 242 eventEngine.addQueue(typeID); 243 } 244 245 } 246 247 /** 248 * Submits a new Task, enqueues it as a job, 249 * sends the payload 250 */ 251 public void submitTask(Task task) 252 { 253 /* Get a unique tristanable ID for the new job */ 254 Queue newQueue = manager.generateQueue(); 255 256 /* If the queue generation was successful */ 257 if(newQueue) 258 { 259 /* Register the task (if not already done) */ 260 registerTaskType(task); 261 262 /* Create a new job */ 263 Job newJob = new Job(task, newQueue); 264 265 /* Lock the job queue */ 266 jobsLock.lock(); 267 268 /* Enqueue the job */ 269 jobs ~= newJob; 270 271 /* Unlock the job queue */ 272 jobsLock.unlock(); 273 274 /* Get the DataMessage of the job */ 275 DataMessage jobDMessage = newJob.encode(); 276 277 /* Encode for sending (bformat) */ 278 byte[] bEncoded = encodeForSend(jobDMessage); 279 280 /* Send the payload */ 281 manager.getSocket().send(bEncoded); 282 } 283 /* If unsuccessful, throw exception */ 284 else 285 { 286 /* TODO: Add an exception */ 287 } 288 289 /* Lock the jobs */ 290 } 291 292 } 293 294 295 296 /** 297 * Represents a Task 298 */ 299 public abstract class Task 300 { 301 private byte[] data; 302 private ulong typeID; 303 private EventHandler handler; 304 305 /* 306 * Constructs a new Task with the given data to be 307 * sent and a typeID that reoresents which Signal 308 * handler to call 309 */ 310 this(byte[] data, ulong typeID, EventHandler handler) 311 { 312 this.data = data; 313 this.typeID = typeID; 314 this.handler = handler; 315 } 316 317 public byte[] getData() 318 { 319 return data; 320 } 321 322 public ulong getTypeID() 323 { 324 return typeID; 325 } 326 327 public EventHandler getHandler() 328 { 329 return handler; 330 } 331 332 /** 333 * Intended to take the received data from the Job's 334 * tristanable queue and decode it as per this Task's 335 * type 336 */ 337 public abstract Event getEvent(byte[] dataIn); 338 } 339 340 public final class TestTask : Task 341 { 342 this(string payloadOut) 343 { 344 super(cast(byte[])payloadOut, 69, &TestTaskHandlerFunc); 345 } 346 347 private static void TestTaskHandlerFunc(Event e) 348 { 349 import std.stdio; 350 writeln("<<<<Tasky task diapatched>>>>", e); 351 } 352 353 public override Event getEvent(byte[] dataIn) 354 { 355 auto event = new class Event 356 { 357 this() 358 { 359 /* TestTask is of type 69 for signal dispatching in Eventy */ 360 super(getTypeID()); 361 } 362 }; 363 364 return event; 365 } 366 }