From 213cfc8f7253d6055883f6b3fb213cb4e0dffdc0 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 19:38:41 -0400 Subject: better lock taking using STM, and wait for concurrent processes writer threads on shutdown --- src/Utility/ConcurrentOutput.hs | 158 +++++++++++++++++++++------------------- 1 file changed, 83 insertions(+), 75 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 091513d0..a8de8ed1 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -15,6 +15,7 @@ module Utility.ConcurrentOutput ( import System.IO import System.Posix.IO import System.Directory +import System.Exit import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative @@ -27,25 +28,28 @@ import Data.List import Data.Monoid import qualified Data.ByteString as B import qualified System.Process as P -import System.Exit +import qualified Data.Set as S import Utility.Monad import Utility.Exception +import Utility.FileSystemEncoding data OutputHandle = OutputHandle - { outputLock :: TMVar (Maybe Locker) + { outputLock :: TMVar Lock + , outputBuffer :: TMVar Buffer + , outputThreads :: TMVar (S.Set (Async ())) } -data Locker - = GeneralLock - | ProcessLock P.ProcessHandle +data Lock = Locked -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newTMVarIO Nothing + <$> newEmptyTMVarIO + <*> newTMVarIO [] + <*> newTMVarIO S.empty -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -64,60 +68,34 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock :: (TMVar Lock -> STM a) -> IO a withLock a = do lck <- outputLock <$> getOutputHandle atomically (a lck) --- The lock TMVar is kept full normally, even if only with Nothing, --- so if we take it here, that blocks anyone else from trying --- to take the lock while we are checking it. takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = go =<< withLock tryTakeTMVar - where - go Nothing = whenblock waitlockchange - -- Something has the lock. It may be stale, so check it. - -- We must always be sure to fill the TMVar back with Just or Nothing. - go (Just orig) = case orig of - Nothing -> havelock - (Just (ProcessLock h)) -> - -- when process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ waitForProcessConcurrent h - havelock - else do - withLock (`putTMVar` orig) - return False - ) - (Just GeneralLock) -> do - withLock (`putTMVar` orig) - whenblock waitlockchange - - havelock = do - withLock (`putTMVar` Just GeneralLock) - return True - - -- Wait for the lock to change, and try again. - waitlockchange = do - void $ withLock readTMVar - takeOutputLock' block - - whenblock a = if block then a else return False +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + bv <- outputBuffer <$> getOutputHandle + buf <- atomically $ swapTMVar bv [] + emitBuffer stdout buf + return locked -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = withLock $ \l -> do - void $ takeTMVar l - putTMVar l Nothing - --- | Only safe to call after takeOutputLock; updates the Locker. -updateOutputLocker :: Locker -> IO () -updateOutputLocker locker = withLock $ \l -> do - void $ takeTMVar l - putTMVar l (Just locker) +dropOutputLock = withLock $ void . takeTMVar -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -127,9 +105,17 @@ updateOutputLocker locker = withLock $ \l -> do withConcurrentOutput :: IO a -> IO a withConcurrentOutput a = a `finally` drain where - -- Just taking the output lock is enough to ensure that anything - -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput noop + -- Wait for all outputThreads to finish. Then, take the output lock + -- to ensure that nothing is currently generating output, and flush + -- any buffered output. + drain = do + v <- outputThreads <$> getOutputHandle + atomically $ do + r <- takeTMVar v + if r == S.empty + then return () + else retry + lockOutput $ return () -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -140,10 +126,19 @@ withConcurrentOutput a = a `finally` drain -- not block. It buffers the string, so it will be displayed once the other -- writer is done. outputConcurrent :: String -> IO () -outputConcurrent s = do - putStr s - hFlush stdout - -- TODO +outputConcurrent s = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + putStr s + hFlush stdout + go False = do + bv <- outputBuffer <$> getOutputHandle + oldbuf <- atomically $ takeTMVar bv + newbuf <- addBuffer (Output (B.pack (decodeW8NUL s))) oldbuf + atomically $ putTMVar bv newbuf -- | This must be used to wait for processes started with -- `createProcessConcurrent`. @@ -191,10 +186,10 @@ createProcessConcurrent p firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) - -- Output lock is still held as we return; the process - -- is running now, and once it exits the output lock will - -- be stale and can then be taken by something else. + -- Wait for the process to exit and drop the lock. + void $ async $ do + void $ tryIO $ waitForProcessConcurrent h + dropOutputLock return r concurrentprocess = do @@ -218,6 +213,7 @@ willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False +-- Built up with newest seen output first. type Buffer = [BufferedActivity] data BufferedActivity @@ -257,27 +253,39 @@ outputDrainer ss fromh buf bufsig putTMVar bufsig () -- Wait to lock output, and once we can, display everything --- that's put into the buffers. +-- that's put into the buffers, until the end. bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () -bufferWriter = void . lockOutput . mapConcurrently go +bufferWriter l = do + worker <- async $ void $ lockOutput $ mapConcurrently go l + v <- outputThreads <$> getOutputHandle + atomically $ do + s <- takeTMVar v + putTMVar v (S.insert worker s) + void $ async $ do + void $ waitCatch worker + atomically $ do + s <- takeTMVar v + putTMVar v (S.delete worker s) where go v@(outh, buf, bufsig) = do - atomically $ takeTMVar bufsig + void $ atomically $ takeTMVar bufsig l <- takeMVar buf putMVar buf [] - forM_ (reverse l) $ \ba -> case ba of - Output b -> do - B.hPut outh b - hFlush outh - return () - InTempFile tmp -> do - B.hPut outh =<< B.readFile tmp - void $ tryWhenExists $ removeFile tmp - ReachedEnd -> return () + emitBuffer outh l if any (== ReachedEnd) l then return () else go v +emitBuffer :: Handle -> Buffer -> IO () +emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of + Output b -> do + B.hPut outh b + hFlush outh + InTempFile tmp -> do + B.hPut outh =<< B.readFile tmp + void $ tryWhenExists $ removeFile tmp + ReachedEnd -> return () + -- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper -- to combine it with any already buffered Output to that same Handle. -- -- cgit v1.2.3