From 632a12dac2b56212bdd92009ecdd31090bbf1f5f Mon Sep 17 00:00:00 2001 From: Gemini Lasswell Date: Mon, 8 Oct 2018 12:35:00 -0700 Subject: [PATCH] Add thread-safe messages and thread-safe queues * lisp/thread.el (thread--message): New cl-defstruct. (thread-message-value, thread-message-send) (thread-message-cancel, thread-message-wait): New functions. (thread--queue): New cl-defstruct. (thread-queue-empty-p, thread-queue-full-p) (thread-queue-length, thread-queue-remove-all) (thread-queue-put, thread-queue-get): New functions. --- lisp/thread.el | 115 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/lisp/thread.el b/lisp/thread.el index 7974a2603cb..65825102922 100644 --- a/lisp/thread.el +++ b/lisp/thread.el @@ -196,5 +196,120 @@ Ask for user confirmation before signaling the thread." (and (eq thread main-thread) "Main") (prin1-to-string thread))) +;;; Thread-safe messages + +(cl-defstruct + (thread--message + (:constructor + thread-make-message (&optional name + &aux + (mutex (make-mutex name)) + (condition + (make-condition-variable mutex name))))) + name mutex value condition) + +(defun thread-message-value (message) + "Return the value of MESSAGE." + (thread--message-value message)) + +(defun thread-message-send (message value) + "Set the VALUE of MESSAGE, and awaken all threads waiting for it." + (with-mutex (thread--message-mutex message) + (setf (thread--message-value message) value) + (condition-notify (thread--message-condition message) t))) + +(defun thread-message-cancel (message) + "Cancel MESSAGE by setting its value to nil." + (with-mutex (thread--message-mutex message) + (setf (thread--message-value message) nil))) + +(defun thread-message-wait (message &optional cancel) + "If MESSAGE's value is nil, block until it is set to something else. +Return the value of MESSAGE. If CANCEL is non-nil, clear MESSAGE +by setting its value to nil. If multiple threads are waiting on +the same message, and all pass a non-nil CANCEL, then only one +thread will unblock and receive the message's value, and the +others will continue to block." + (with-mutex (thread--message-mutex message) + (while (not (thread--message-value message)) + (condition-wait (thread--message-condition message))) + (let ((value (thread--message-value message))) + (when cancel + (setf (thread--message-value message) nil)) + value))) + +;;; Thread-safe queues + +(cl-defstruct (thread--queue + (:constructor + thread-make-queue (&optional + size-limit + type + &aux + (fifo (eq type 'fifo)) + (limit (when (natnump size-limit) size-limit)) + (mutex (make-mutex)) + (not-full (make-condition-variable mutex)) + (not-empty (make-condition-variable mutex))))) + fifo + limit + items + mutex + not-full + not-empty) + +(defun thread-queue-empty-p (queue) + "Return non-nil if QUEUE is empty. +There is no guarantee that QUEUE will contain the same number of +items the next time you access it." + (with-mutex (thread--queue-mutex queue) + (null (thread--queue-items queue)))) + +(defun thread-queue-full-p (queue) + "Return non-nil if QUEUE is full. +There is no guarantee that QUEUE will contain the same number of +items the next time you access it." + (when (thread--queue-limit queue) + (with-mutex (thread--queue-mutex queue) + (= (length (thread--queue-items queue)) (thread--queue-limit queue))))) + +(defun thread-queue-length (queue) + "Return the number of items in QUEUE. +There is no guarantee that QUEUE will contain the same number of +items the next time you access it." + (with-mutex (thread--queue-mutex queue) + (length (thread--queue-items queue)))) + +(defun thread-queue-remove-all (queue) + "Discard any items in QUEUE." + (with-mutex (thread--queue-mutex queue) + (setf (thread--queue-items queue) nil) + (condition-notify (thread--queue-not-full queue)))) + +(defun thread-queue-put (item queue) + "Put ITEM into QUEUE. +If QUEUE was created with a size limit, and already contains that many items, +block until one is removed." + (with-mutex (thread--queue-mutex queue) + (while (and (thread--queue-limit queue) + (= (length (thread--queue-items queue)) (thread--queue-limit queue))) + (condition-wait (thread--queue-not-full queue))) + (if (thread--queue-fifo queue) + (setf (thread--queue-items queue) + (nconc (thread--queue-items queue) (list item))) + (push item (thread--queue-items queue))) + (condition-notify (thread--queue-not-empty queue)))) + +(defun thread-queue-get (queue) + "Remove an item from QUEUE and return it. +If there are no items in QUEUE, block until one is added." + (with-mutex (thread--queue-mutex queue) + (while (null (thread--queue-items queue)) + (condition-wait (thread--queue-not-empty queue))) + (let ((item (pop (thread--queue-items queue)))) + (condition-notify (thread--queue-not-full queue)) + item))) + + (provide 'thread) ;;; thread.el ends here -- 2.39.5