From 111ea88d4d7c54e9ab7950962ad22528d54dd959 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 14:46:17 -0400 Subject: fix bad MVar use, use STM I had 2 MVars both involved in the same lock, and it seemed intractable to avoid deadlocks with them. STM makes it easy. At this point, the concurrent process stuff seems to work pretty well, but I'm not 100% sure it's not got some bugs. --- src/Utility/ConcurrentOutput.hs | 173 ++++++++++++++++++++++------------------ 1 file changed, 96 insertions(+), 77 deletions(-) (limited to 'src/Utility/ConcurrentOutput.hs') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index c6550b84..5535066f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + -- | Concurrent output handling. module Utility.ConcurrentOutput ( @@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import Control.Concurrent.STM import Control.Concurrent.Async import Data.Maybe import Data.List @@ -25,21 +28,23 @@ import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle - { outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker + { outputLock :: TMVar (Maybe Locker) } data Locker = GeneralLock - | ProcessLock P.ProcessHandle + | ProcessLock P.ProcessHandle String + +instance Show Locker where + show GeneralLock = "GeneralLock" + show (ProcessLock _ cmd) = "ProcessLock " ++ cmd -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newMVar () - <*> newEmptyMVar + <$> newTMVarIO Nothing -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do +withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock a = do lck <- outputLock <$> getOutputHandle - go =<< tryTakeMVar lck + atomically (a lck) + +-- The lock TMVar is kept full normally, even if only with Nothing, +-- so if we take it here, that blocks anyone else from trying +-- to take the lock while we are checking it. +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = go =<< withLock tryTakeTMVar where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getOutputHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock + go Nothing = whenblock waitlock + -- Something has the lock. It may be stale, so check it. + -- We must always be sure to fill the TMVar back with Just or Nothing. + go (Just orig) = case orig of + Nothing -> havelock + (Just (ProcessLock h _)) -> + -- when process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + hPutStr stderr "WAITFORPROCESS in lock" + hFlush stderr + void $ P.waitForProcess h + hPutStr stderr "WAITFORPROCESS in lock done" + hFlush stderr + havelock + else do + withLock (`putTMVar` orig) + return False + ) + (Just GeneralLock) -> do + withLock (`putTMVar` orig) + whenblock waitlock havelock = do - updateOutputLocker GeneralLock + withLock (`putTMVar` Just GeneralLock) return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getOutputHandle - takeMVar lck - havelock + + -- Wait for current lock holder (if any) to relinquish + -- it and take the lock for ourselves. + waitlock = withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just (Just _) -> retry + _ -> do + putTMVar l (Just GeneralLock) + return True + whenblock a = if block then a else return False -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getOutputHandle - lck <- outputLock <$> getOutputHandle - void $ takeMVar lcker - putMVar lck () +dropOutputLock = withLock $ \l -> do + void $ takeTMVar l + putTMVar l Nothing -- | Only safe to call after takeOutputLock; updates the Locker. updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getOutputHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) +updateOutputLocker locker = withLock $ \l -> do + void $ takeTMVar l + putTMVar l (Just locker) -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain where -- Just taking the output lock is enough to ensure that anything -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput (return ()) + drain = lockOutput noop -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -158,28 +171,25 @@ outputConcurrent s = do -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p - | willoutput (P.std_out p) || willoutput (P.std_err p) = + | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock - ( do - hPutStrLn stderr "IS NOT CONCURRENT" - firstprocess - , do - hPutStrLn stderr "IS CONCURRENT" - concurrentprocess + ( firstprocess + , concurrentprocess ) | otherwise = P.createProcess p where - willoutput P.Inherit = True - willoutput _ = False + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss - rediroutput str h - | willoutput str = P.UseHandle h - | otherwise = str + cmd = case P.cmdspec p of + P.ShellCommand s -> s + P.RawCommand c ps -> unwords (c:ps) firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) + updateOutputLocker (ProcessLock h cmd) -- Output lock is still held as we return; the process -- is running now, and once it exits the output lock will -- be stale and can then be taken by something else. @@ -196,8 +206,8 @@ createProcessConcurrent p hClose toouth hClose toerrh buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf + void $ async $ outputDrainer (P.std_out p) fromouth stdout buf + void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf void $ async $ bufferWriter buf return r @@ -205,6 +215,10 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + type Buffer = [(Handle, BufferedActivity)] data BufferedActivity @@ -213,17 +227,22 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) - hClose fromh +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () +outputDrainer ss fromh toh buf + | willOutput ss = go + | otherwise = atend + where + go = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf $ addBuffer (toh, Output b) + go + _ -> atend + atend = do + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) + hClose fromh -- Wait to lock output, and once we can, display everything -- that's put into buffer, until the end is signaled by Nothing @@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf hClose h return ((toh, InTempFile tmp) : other) where - b' = B.concat (mapMaybe getOutput this) <> b - (this, other) = partition same buf + !b' = B.concat (mapMaybe getOutput this) <> b + !(this, other) = partition same buf same v = fst v == toh && case snd v of Output _ -> True _ -> False -- cgit v1.2.3