summaryrefslogtreecommitdiff
path: root/src/Utility
diff options
context:
space:
mode:
authorJoey Hess2015-10-28 19:38:41 -0400
committerJoey Hess2015-10-28 19:38:41 -0400
commit213cfc8f7253d6055883f6b3fb213cb4e0dffdc0 (patch)
tree9199d73eb8561a57cdd6578cfc89992d24bcc3d8 /src/Utility
parent80e28e5e4e97e6b11b54c9f086601f84c2d27440 (diff)
better lock taking using STM, and wait for concurrent processes writer threads on shutdown
Diffstat (limited to 'src/Utility')
-rw-r--r--src/Utility/ConcurrentOutput.hs158
1 files changed, 83 insertions, 75 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index 091513d0..a8de8ed1 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -15,6 +15,7 @@ module Utility.ConcurrentOutput (
import System.IO
import System.Posix.IO
import System.Directory
+import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
@@ -27,25 +28,28 @@ import Data.List
import Data.Monoid
import qualified Data.ByteString as B
import qualified System.Process as P
-import System.Exit
+import qualified Data.Set as S
import Utility.Monad
import Utility.Exception
+import Utility.FileSystemEncoding
data OutputHandle = OutputHandle
- { outputLock :: TMVar (Maybe Locker)
+ { outputLock :: TMVar Lock
+ , outputBuffer :: TMVar Buffer
+ , outputThreads :: TMVar (S.Set (Async ()))
}
-data Locker
- = GeneralLock
- | ProcessLock P.ProcessHandle
+data Lock = Locked
-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: MVar OutputHandle
globalOutputHandle = unsafePerformIO $
newMVar =<< OutputHandle
- <$> newTMVarIO Nothing
+ <$> newEmptyTMVarIO
+ <*> newTMVarIO []
+ <*> newTMVarIO S.empty
-- | Gets the global OutputHandle.
getOutputHandle :: IO OutputHandle
@@ -64,60 +68,34 @@ takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
-withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a
+withLock :: (TMVar Lock -> STM a) -> IO a
withLock a = do
lck <- outputLock <$> getOutputHandle
atomically (a lck)
--- The lock TMVar is kept full normally, even if only with Nothing,
--- so if we take it here, that blocks anyone else from trying
--- to take the lock while we are checking it.
takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = go =<< withLock tryTakeTMVar
- where
- go Nothing = whenblock waitlockchange
- -- Something has the lock. It may be stale, so check it.
- -- We must always be sure to fill the TMVar back with Just or Nothing.
- go (Just orig) = case orig of
- Nothing -> havelock
- (Just (ProcessLock h)) ->
- -- when process has exited, lock is stale
- ifM (isJust <$> P.getProcessExitCode h)
- ( havelock
- , if block
- then do
- void $ waitForProcessConcurrent h
- havelock
- else do
- withLock (`putTMVar` orig)
- return False
- )
- (Just GeneralLock) -> do
- withLock (`putTMVar` orig)
- whenblock waitlockchange
-
- havelock = do
- withLock (`putTMVar` Just GeneralLock)
- return True
-
- -- Wait for the lock to change, and try again.
- waitlockchange = do
- void $ withLock readTMVar
- takeOutputLock' block
-
- whenblock a = if block then a else return False
+takeOutputLock' block = do
+ locked <- withLock $ \l -> do
+ v <- tryTakeTMVar l
+ case v of
+ Just Locked
+ | block -> retry
+ | otherwise -> do
+ -- Restore value we took.
+ putTMVar l Locked
+ return False
+ Nothing -> do
+ putTMVar l Locked
+ return True
+ when locked $ do
+ bv <- outputBuffer <$> getOutputHandle
+ buf <- atomically $ swapTMVar bv []
+ emitBuffer stdout buf
+ return locked
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
-dropOutputLock = withLock $ \l -> do
- void $ takeTMVar l
- putTMVar l Nothing
-
--- | Only safe to call after takeOutputLock; updates the Locker.
-updateOutputLocker :: Locker -> IO ()
-updateOutputLocker locker = withLock $ \l -> do
- void $ takeTMVar l
- putTMVar l (Just locker)
+dropOutputLock = withLock $ void . takeTMVar
-- | Use this around any IO actions that use `outputConcurrent`
-- or `createProcessConcurrent`
@@ -127,9 +105,17 @@ updateOutputLocker locker = withLock $ \l -> do
withConcurrentOutput :: IO a -> IO a
withConcurrentOutput a = a `finally` drain
where
- -- Just taking the output lock is enough to ensure that anything
- -- that was buffering output has had a chance to flush its buffer.
- drain = lockOutput noop
+ -- Wait for all outputThreads to finish. Then, take the output lock
+ -- to ensure that nothing is currently generating output, and flush
+ -- any buffered output.
+ drain = do
+ v <- outputThreads <$> getOutputHandle
+ atomically $ do
+ r <- takeTMVar v
+ if r == S.empty
+ then return ()
+ else retry
+ lockOutput $ return ()
-- | Displays a string to stdout, and flush output so it's displayed.
--
@@ -140,10 +126,19 @@ withConcurrentOutput a = a `finally` drain
-- not block. It buffers the string, so it will be displayed once the other
-- writer is done.
outputConcurrent :: String -> IO ()
-outputConcurrent s = do
- putStr s
- hFlush stdout
- -- TODO
+outputConcurrent s = bracket setup cleanup go
+ where
+ setup = tryTakeOutputLock
+ cleanup False = return ()
+ cleanup True = dropOutputLock
+ go True = do
+ putStr s
+ hFlush stdout
+ go False = do
+ bv <- outputBuffer <$> getOutputHandle
+ oldbuf <- atomically $ takeTMVar bv
+ newbuf <- addBuffer (Output (B.pack (decodeW8NUL s))) oldbuf
+ atomically $ putTMVar bv newbuf
-- | This must be used to wait for processes started with
-- `createProcessConcurrent`.
@@ -191,10 +186,10 @@ createProcessConcurrent p
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
- updateOutputLocker (ProcessLock h)
- -- Output lock is still held as we return; the process
- -- is running now, and once it exits the output lock will
- -- be stale and can then be taken by something else.
+ -- Wait for the process to exit and drop the lock.
+ void $ async $ do
+ void $ tryIO $ waitForProcessConcurrent h
+ dropOutputLock
return r
concurrentprocess = do
@@ -218,6 +213,7 @@ willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False
+-- Built up with newest seen output first.
type Buffer = [BufferedActivity]
data BufferedActivity
@@ -257,27 +253,39 @@ outputDrainer ss fromh buf bufsig
putTMVar bufsig ()
-- Wait to lock output, and once we can, display everything
--- that's put into the buffers.
+-- that's put into the buffers, until the end.
bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
-bufferWriter = void . lockOutput . mapConcurrently go
+bufferWriter l = do
+ worker <- async $ void $ lockOutput $ mapConcurrently go l
+ v <- outputThreads <$> getOutputHandle
+ atomically $ do
+ s <- takeTMVar v
+ putTMVar v (S.insert worker s)
+ void $ async $ do
+ void $ waitCatch worker
+ atomically $ do
+ s <- takeTMVar v
+ putTMVar v (S.delete worker s)
where
go v@(outh, buf, bufsig) = do
- atomically $ takeTMVar bufsig
+ void $ atomically $ takeTMVar bufsig
l <- takeMVar buf
putMVar buf []
- forM_ (reverse l) $ \ba -> case ba of
- Output b -> do
- B.hPut outh b
- hFlush outh
- return ()
- InTempFile tmp -> do
- B.hPut outh =<< B.readFile tmp
- void $ tryWhenExists $ removeFile tmp
- ReachedEnd -> return ()
+ emitBuffer outh l
if any (== ReachedEnd) l
then return ()
else go v
+emitBuffer :: Handle -> Buffer -> IO ()
+emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of
+ Output b -> do
+ B.hPut outh b
+ hFlush outh
+ InTempFile tmp -> do
+ B.hPut outh =<< B.readFile tmp
+ void $ tryWhenExists $ removeFile tmp
+ ReachedEnd -> return ()
+
-- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper
-- to combine it with any already buffered Output to that same Handle.
--