summaryrefslogtreecommitdiff
path: root/src/Utility
diff options
context:
space:
mode:
authorJoey Hess2015-10-28 14:46:17 -0400
committerJoey Hess2015-10-28 14:46:17 -0400
commit111ea88d4d7c54e9ab7950962ad22528d54dd959 (patch)
tree69053673ab24be9eb61f0ce87c23d2d84b108ae7 /src/Utility
parent68dbfe1b08c9cf1d976ac84ea53817c54fcd3479 (diff)
fix bad MVar use, use STM
I had 2 MVars both involved in the same lock, and it seemed intractable to avoid deadlocks with them. STM makes it easy. At this point, the concurrent process stuff seems to work pretty well, but I'm not 100% sure it's not got some bugs.
Diffstat (limited to 'src/Utility')
-rw-r--r--src/Utility/ConcurrentOutput.hs173
1 files changed, 96 insertions, 77 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index c6550b84..5535066f 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE BangPatterns #-}
+
-- | Concurrent output handling.
module Utility.ConcurrentOutput (
@@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
@@ -25,21 +28,23 @@ import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
- { outputLock :: MVar () -- ^ empty when locked
- , outputLockedBy :: MVar Locker
+ { outputLock :: TMVar (Maybe Locker)
}
data Locker
= GeneralLock
- | ProcessLock P.ProcessHandle
+ | ProcessLock P.ProcessHandle String
+
+instance Show Locker where
+ show GeneralLock = "GeneralLock"
+ show (ProcessLock _ cmd) = "ProcessLock " ++ cmd
-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: MVar OutputHandle
globalOutputHandle = unsafePerformIO $
newMVar =<< OutputHandle
- <$> newMVar ()
- <*> newEmptyMVar
+ <$> newTMVarIO Nothing
-- | Gets the global OutputHandle.
getOutputHandle :: IO OutputHandle
@@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
+withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a
+withLock a = do
lck <- outputLock <$> getOutputHandle
- go =<< tryTakeMVar lck
+ atomically (a lck)
+
+-- The lock TMVar is kept full normally, even if only with Nothing,
+-- so if we take it here, that blocks anyone else from trying
+-- to take the lock while we are checking it.
+takeOutputLock' :: Bool -> IO Bool
+takeOutputLock' block = go =<< withLock tryTakeTMVar
where
- -- lck was full, and we've emptied it, so we hold the lock now.
- go (Just ()) = havelock
- -- lck is empty, so someone else is holding the lock.
- go Nothing = do
- lcker <- outputLockedBy <$> getOutputHandle
- v' <- tryTakeMVar lcker
- case v' of
- Just (ProcessLock h) ->
- -- if process has exited, lock is stale
- ifM (isJust <$> P.getProcessExitCode h)
- ( havelock
- , if block
- then do
- void $ P.waitForProcess h
- havelock
- else do
- putMVar lcker (ProcessLock h)
- return False
- )
- Just GeneralLock -> do
- putMVar lcker GeneralLock
- whenblock waitlock
- Nothing -> whenblock waitlock
+ go Nothing = whenblock waitlock
+ -- Something has the lock. It may be stale, so check it.
+ -- We must always be sure to fill the TMVar back with Just or Nothing.
+ go (Just orig) = case orig of
+ Nothing -> havelock
+ (Just (ProcessLock h _)) ->
+ -- when process has exited, lock is stale
+ ifM (isJust <$> P.getProcessExitCode h)
+ ( havelock
+ , if block
+ then do
+ hPutStr stderr "WAITFORPROCESS in lock"
+ hFlush stderr
+ void $ P.waitForProcess h
+ hPutStr stderr "WAITFORPROCESS in lock done"
+ hFlush stderr
+ havelock
+ else do
+ withLock (`putTMVar` orig)
+ return False
+ )
+ (Just GeneralLock) -> do
+ withLock (`putTMVar` orig)
+ whenblock waitlock
havelock = do
- updateOutputLocker GeneralLock
+ withLock (`putTMVar` Just GeneralLock)
return True
- waitlock = do
- -- Wait for current lock holder to relinquish
- -- it and take the lock.
- lck <- outputLock <$> getOutputHandle
- takeMVar lck
- havelock
+
+ -- Wait for current lock holder (if any) to relinquish
+ -- it and take the lock for ourselves.
+ waitlock = withLock $ \l -> do
+ v <- tryTakeTMVar l
+ case v of
+ Just (Just _) -> retry
+ _ -> do
+ putTMVar l (Just GeneralLock)
+ return True
+
whenblock a = if block then a else return False
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
-dropOutputLock = do
- lcker <- outputLockedBy <$> getOutputHandle
- lck <- outputLock <$> getOutputHandle
- void $ takeMVar lcker
- putMVar lck ()
+dropOutputLock = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l Nothing
-- | Only safe to call after takeOutputLock; updates the Locker.
updateOutputLocker :: Locker -> IO ()
-updateOutputLocker l = do
- lcker <- outputLockedBy <$> getOutputHandle
- void $ tryTakeMVar lcker
- putMVar lcker l
- modifyMVar_ lcker (const $ return l)
+updateOutputLocker locker = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l (Just locker)
-- | Use this around any IO actions that use `outputConcurrent`
-- or `createProcessConcurrent`
@@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain
where
-- Just taking the output lock is enough to ensure that anything
-- that was buffering output has had a chance to flush its buffer.
- drain = lockOutput (return ())
+ drain = lockOutput noop
-- | Displays a string to stdout, and flush output so it's displayed.
--
@@ -158,28 +171,25 @@ outputConcurrent s = do
-- as the output lock becomes free.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
- | willoutput (P.std_out p) || willoutput (P.std_err p) =
+ | willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
- ( do
- hPutStrLn stderr "IS NOT CONCURRENT"
- firstprocess
- , do
- hPutStrLn stderr "IS CONCURRENT"
- concurrentprocess
+ ( firstprocess
+ , concurrentprocess
)
| otherwise = P.createProcess p
where
- willoutput P.Inherit = True
- willoutput _ = False
+ rediroutput ss h
+ | willOutput ss = P.UseHandle h
+ | otherwise = ss
- rediroutput str h
- | willoutput str = P.UseHandle h
- | otherwise = str
+ cmd = case P.cmdspec p of
+ P.ShellCommand s -> s
+ P.RawCommand c ps -> unwords (c:ps)
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
- updateOutputLocker (ProcessLock h)
+ updateOutputLocker (ProcessLock h cmd)
-- Output lock is still held as we return; the process
-- is running now, and once it exits the output lock will
-- be stale and can then be taken by something else.
@@ -196,8 +206,8 @@ createProcessConcurrent p
hClose toouth
hClose toerrh
buf <- newMVar []
- void $ async $ outputDrainer fromouth stdout buf
- void $ async $ outputDrainer fromerrh stderr buf
+ void $ async $ outputDrainer (P.std_out p) fromouth stdout buf
+ void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf
void $ async $ bufferWriter buf
return r
@@ -205,6 +215,10 @@ createProcessConcurrent p
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
+willOutput :: P.StdStream -> Bool
+willOutput P.Inherit = True
+willOutput _ = False
+
type Buffer = [(Handle, BufferedActivity)]
data BufferedActivity
@@ -213,17 +227,22 @@ data BufferedActivity
| InTempFile FilePath
deriving (Eq)
--- Drain output from the handle, and buffer it in memory.
-outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
-outputDrainer fromh toh buf = do
- v <- tryIO $ B.hGetSome fromh 1024
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (toh, Output b)
- outputDrainer fromh toh buf
- _ -> do
- modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
- hClose fromh
+-- Drain output from the handle, and buffer it.
+outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO ()
+outputDrainer ss fromh toh buf
+ | willOutput ss = go
+ | otherwise = atend
+ where
+ go = do
+ v <- tryIO $ B.hGetSome fromh 1024
+ case v of
+ Right b | not (B.null b) -> do
+ modifyMVar_ buf $ addBuffer (toh, Output b)
+ go
+ _ -> atend
+ atend = do
+ modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
+ hClose fromh
-- Wait to lock output, and once we can, display everything
-- that's put into buffer, until the end is signaled by Nothing
@@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf
hClose h
return ((toh, InTempFile tmp) : other)
where
- b' = B.concat (mapMaybe getOutput this) <> b
- (this, other) = partition same buf
+ !b' = B.concat (mapMaybe getOutput this) <> b
+ !(this, other) = partition same buf
same v = fst v == toh && case snd v of
Output _ -> True
_ -> False