;;;; Basic FIFO

;; Singly-linked FIFO.  HEAD is the list of queued items (front first) and
;; TAIL is the last cons of that list, so appending never walks the list.
;; Both slots are NIL when the queue is empty.
(defstruct queue
  (head nil)
  (tail nil))

(defun queue-empty-p (queue)
  "Return true when QUEUE holds no items."
  (null (queue-head queue)))

(defun enqueue (queue item)
  "Append ITEM at the back of QUEUE.  Returns QUEUE."
  (let ((cell (list item)))
    (cond ((null (queue-head queue))
           ;; An empty queue must not have a dangling tail pointer.
           (assert (null (queue-tail queue)))
           (setf (queue-head queue) cell
                 (queue-tail queue) cell))
          (t
           ;; Splice the new cell after the current last cons.
           (setf (cdr (queue-tail queue)) cell
                 (queue-tail queue) cell))))
  queue)

(defun dequeue (queue)
  "Remove and return the item at the front of QUEUE.
Fails an assertion if QUEUE is empty."
  (assert (queue-head queue))
  (prog1 (pop (queue-head queue))
    ;; Popping the last item must also reset TAIL.
    (when (null (queue-head queue))
      (setf (queue-tail queue) nil))))

(defun clear-queue (queue)
  "Empty QUEUE and return the list of items it held, front first."
  (setf (queue-tail queue) nil)
  (shiftf (queue-head queue) nil))

(defun join-queue (destination source)
  "Move every item of SOURCE to the back of DESTINATION, preserving order.
SOURCE is left empty in every case, so the two queues never share list
structure afterwards.  Returns DESTINATION."
  (cond ((null (queue-head source))
         ;; Nothing to move.
         (assert (null (queue-tail source))))
        ((null (queue-head destination))
         (assert (null (queue-tail destination)))
         (assert (queue-tail source))
         ;; Hand the whole list over and empty SOURCE; merely copying the
         ;; pointers would leave both queues aliasing the same conses.
         (shiftf (queue-head destination) (queue-head source) nil)
         (shiftf (queue-tail destination) (queue-tail source) nil))
        (t
         (assert (queue-tail source))
         (assert (queue-tail destination))
         ;; Link SOURCE's list after DESTINATION's last cons, then adopt
         ;; SOURCE's tail; both steps also clear the SOURCE slot.
         (shiftf (cdr (queue-tail destination)) (queue-head source) nil)
         (shiftf (queue-tail destination) (queue-tail source) nil)))
  destination)

;;;; At last, interesting infrastructure

;; A future has a queue of waiters, or a known value.
;; It starts with a queue of waiters, and no value.
;; When a value is known,
;;  it's either another future, in which case waiters are forwarded to
;;  that second future (and the queue cleared),
;;  or a list of genuine return values, in which case the waiters
;;  are ready to be executed.
(defclass future ()
  (;; Waiters: a queue of callbacks while pending, or :done once the
   ;; waiters have been handed off.
   (queue :accessor queue
          :initform (make-queue)
          :initarg queue)
   ;; Result state:
   ;;   :none    -- not computed yet,
   ;;   a future -- waiters should be forwarded to that future,
   ;;   a list   -- the actual return values.
   (values :accessor future-values
           :initform :none
           :initarg values)))

;; A callback is itself a future, carrying the function to call once
;; the future it waits on has produced its values.
(defclass callback (future)
  ((function :reader callback-function
             :initarg function)))

(defvar *ready-queue* (make-queue)
  "Queue of (callback . argument list)")

;; Triggering a future: it must not have a value yet, and none of the
;; supplied values may itself be a future.
;;
;; The values are stored so that a later %ATTACH can use them directly;
;; the waiter queue is swapped for :done and every waiter is scheduled
;; on the ready queue together with the values.
(defun %trigger (future values)
  (check-type values list)
  (assert (eql (future-values future) :none))
  (assert (notany (lambda (x) (typep x 'future)) values))
  (setf (future-values future) values)
  (let ((waiters (clear-queue (shiftf (queue future) :done))))
    (loop for waiter in waiters
          do (enqueue *ready-queue* (cons waiter values)))))

;; Attaching a function to a future: first follow the chain of futures
;; that resolved to another future.  At the end of the chain, either
;; the values are already known and FUNCTION is applied to them right
;; away, or a fresh callback object is created, queued among the
;; future's waiters, and returned.
(defun %attach (future function)
  (let ((state (future-values future)))
    (cond ((typep state 'future)
           ;; Forwarded: keep walking the chain.
           (%attach state function))
          ((not (eql state :none))
           ;; Values already known: call FUNCTION directly.
           (apply function state))
          (t
           (assert (queue-p (queue future)))
           (let ((waiter (make-instance 'callback 'function function)))
             (enqueue (queue future) waiter)
             waiter)))))

;; Running a callback on a list of arguments should just be a
;; call to APPLY, and then to %TRIGGER (to mark waiters as ready)
;; ...
;; except that we have to check whether the callback evaluated to
;; another future.
;; In that case, the callback is marked as forwarding to that future
;; and its queue of waiters is joined into the new future's queue --
;; unless that future already has its values, in which case the
;; waiters can be made ready right away.
(defun run (callback arguments)
  "Apply CALLBACK's function to ARGUMENTS and publish the result.
If the function returns exactly one value and that value is a future,
CALLBACK's waiters are forwarded to it (or made ready at once when it
has already been resolved); otherwise CALLBACK is triggered with the
returned values.  Always returns NIL."
  (let ((values (multiple-value-list
                 (apply (callback-function callback) arguments))))
    (cond ((typep values '(cons future null))
           (let ((next (first values)))
             ;; The returned future may itself have been forwarded;
             ;; follow the chain to the future that owns the waiters.
             (loop while (typep (future-values next) 'future)
                   do (setf next (future-values next)))
             (cond ((eql (future-values next) :none)
                    ;; Still pending: hand our waiters over to it.
                    (let ((queue (shiftf (queue callback) :done)))
                      (setf (future-values callback) next)
                      (join-queue (queue next) queue)))
                   (t
                    ;; Already resolved: its queue is :done, so joining
                    ;; would fail.  Publish its values directly instead.
                    (%trigger callback (future-values next))))))
          (t
           (%trigger callback values))))
  nil)

;; Just keep executing callbacks that are ready for execution
;; until none is left
(defun run-ready-queue ()
  "Dequeue and RUN entries of *READY-QUEUE* until it is empty."
  (loop until (queue-empty-p *ready-queue*)
        do (destructuring-bind (callback . arguments)
               (dequeue *ready-queue*)
             (run callback arguments))))

;;;; Dummy async blocking operations: asking the user for input
(defvar *wait-queue* (make-queue))

;; The blocking operation is simply asking for user input at the REPL
(defclass user-prompt (future)
  ((prompt :initarg prompt :reader prompt-of)))

(defun prompt (string &rest arguments)
  "Create a USER-PROMPT future whose prompt text is STRING formatted with
ARGUMENTS, queue it on *WAIT-QUEUE*, and return it."
  (let ((future (make-instance 'user-prompt
                               'prompt (apply 'format nil string arguments))))
    (enqueue *wait-queue* future)
    future))

(defun run-prompts ()
  "Alternate between draining the ready queue and serving one pending
prompt, until no prompt is left waiting."
  (loop until (progn (run-ready-queue)
                     (queue-empty-p *wait-queue*))
        do (let ((prompt (dequeue *wait-queue*)))
             (format t "Prompt: ~A? 
" (prompt-of prompt))
             ;; Make sure the prompt is visible before blocking on input.
             (finish-output)
             ;; NOTE(review): READ on user input honours #. when
             ;; *READ-EVAL* is true; bind it to NIL if input is untrusted.
             (%trigger prompt (list (read))))))

;;;; Basic interface
(defmacro attach (future callback)
  "Evaluate FUTURE, then CALLBACK.  If FUTURE yields exactly one value and
it is a future, attach CALLBACK to it with %ATTACH; otherwise apply
CALLBACK to the values immediately."
  (let ((_callback (gensym "CALLBACK"))
        (_values (gensym "VALUES")))
    `(multiple-value-call
         (lambda (&rest ,_values)
           (let ((,_callback ,callback))
             ;; SBCL-optimised code
             (cond ((and (= 1 (length ,_values))
                         (typep (nth 0 ,_values) 'future))
                    (%attach (nth 0 ,_values) ,_callback))
                   (t
                    (apply ,_callback ,_values)))))
       ,future)))

;;;; Sweet interface
(defmacro mprogn (&body body)
  "Like PROGN, but each form after the first is only run once the
previous form's value (possibly a future) is available."
  (if (null (rest body))
      `(progn ,(first body))
      (let ((ignored (gensym "IGNORED")))
        `(attach ,(first body)
                 (lambda (&rest ,ignored)
                   (declare (ignore ,ignored))
                   (mprogn ,@(rest body)))))))

(defmacro mlet* ((&rest bindings) &body body)
  "Like LET*, but each value form may yield a future; the variable is
bound to the first value once it is available, and later bindings and
BODY run after that."
  (cond ((null bindings)
         `(mprogn ,@body))
        (t
         (destructuring-bind ((name value) . bindings)
             bindings
           (let ((ignored (gensym "IGNORED")))
             `(attach ,value
                      (lambda (&optional ,name &rest ,ignored)
                        (declare (ignore ,ignored))
                        (mlet* ,bindings
                          ,@body))))))))

;; this lets the values run in parallel
(defmacro mlet ((&rest bindings) &body body)
  "Like MLET*, but every value form is evaluated up front (in a plain LET
of temporaries) before any of them is waited on."
  (let ((gensyms (loop for binding in bindings
                       collect (list (gensym "TMP") (second binding)))))
    `(let ,gensyms
       (mlet* ,(mapcar (lambda (binding gensym)
                         (list (first binding) (first gensym)))
                       bindings gensyms)
         ,@body))))

(defmacro mmultiple-value-bind ((&rest variables) form &body body)
  "Like MULTIPLE-VALUE-BIND, but FORM may yield a future.  Missing values
are bound to NIL and extra values are ignored."
  (let ((ignored (gensym "IGNORED")))
    `(attach ,form
             (lambda (&optional ,@variables &rest ,ignored)
               (declare (ignore ,ignored))
               ,@body))))

(defmacro with-async-context (() &body body)
  "Run BODY with fresh ready and wait queues, then serve prompts until
nothing is left waiting."
  `(let ((*ready-queue* (make-queue))
         (*wait-queue* (make-queue)))
     ,@body
     (run-prompts)))

#|| Toy example

;; simple case: just wait for input and return it, but
;; as multiple values, because we can.
CL-USER> (defun frobnicate-values (&rest values)
           (mlet* ((header (prompt "Header for ~A" values)))
             (apply 'values header values)))
FROBNICATE-VALUES

;; slightly more complicated case: wait for a sequence of
;; input, and tail-call into another blocking function.
CL-USER> (defun ask-foo-bar ()
           (mlet* ((foo (prompt "Foo"))
                   (bar (prompt "Bar (Foo = ~A)" foo))
                   (baz 42))
             (frobnicate-values foo bar baz)))
ASK-FOO-BAR

;; receive multiple values from a function with nested blocking
;; but that is transparent to the caller.
CL-USER> (with-async-context ()
           (mmultiple-value-bind (foo bar baz quux) (ask-foo-bar)
             (format t "Foo bar: ~{~A ~}~%" (list foo bar baz quux))))
Prompt: Foo?
foo
Prompt: Bar (Foo = FOO)?
bar
Prompt: Header for (FOO BAR 42)?
header
Foo bar: HEADER FOO BAR 42
NIL
||#