From b4f941ea7beaaaae8a4f0e13381a30e7af5ec3aa Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 1 Nov 2015 11:38:39 -0400 Subject: merge changes from concurrent-output --- src/Utility/ConcurrentOutput.hs | 349 +++++++++++++++++++++++++++------------- 1 file changed, 240 insertions(+), 109 deletions(-) diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index c24744a3..4676c2fa 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,5 +1,4 @@ -{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-} -{-# OPTIONS_GHC -fno-warn-tabs #-} +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} -- | -- Copyright: 2013 Joey Hess @@ -8,7 +7,7 @@ -- Concurrent output handling. -- -- > import Control.Concurrent.Async --- > import Control.Concurrent.Output +-- > import System.Console.Concurrent -- > -- > main = withConcurrentOutput $ -- > outputConcurrent "washed the car\n" @@ -18,13 +17,22 @@ -- > createProcessConcurrent (proc "ls" []) module Utility.ConcurrentOutput ( + -- * Concurrent output withConcurrentOutput, - flushConcurrentOutput, Outputable(..), outputConcurrent, createProcessConcurrent, waitForProcessConcurrent, + flushConcurrentOutput, lockOutput, + -- * Low level access to the output buffer + OutputBuffer, + StdHandle(..), + bufferOutputSTM, + outputBufferWaiterSTM, + waitAnyBuffer, + waitCompleteLines, + emitOutputBuffer, ) where import System.IO @@ -42,40 +50,38 @@ import Data.Maybe import Data.List import Data.Monoid import qualified System.Process as P -import qualified Data.Set as S -import qualified Data.ByteString as B import qualified Data.Text as T -import Data.Text.Encoding (encodeUtf8) +import qualified Data.Text.IO as T import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle { outputLock :: TMVar Lock - , outputBuffer :: TMVar Buffer - , outputThreads :: TMVar (S.Set (Async ())) + , outputBuffer :: TMVar OutputBuffer + , errorBuffer :: TMVar OutputBuffer + , outputThreads :: TMVar Integer } data Lock = Locked -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} -globalOutputHandle :: MVar OutputHandle -globalOutputHandle = unsafePerformIO $ - newMVar =<< OutputHandle - <$> newEmptyTMVarIO - <*> newTMVarIO [] - <*> newTMVarIO S.empty - --- | Gets the global OutputHandle. -getOutputHandle :: IO OutputHandle -getOutputHandle = readMVar globalOutputHandle - --- | Holds a lock while performing an action that will display output. --- While this is running, other threads that try to lockOutput will block, --- and calls to `outputConcurrent` and `createProcessConcurrent` --- will result in that concurrent output being buffered and not --- displayed until the action is done. +globalOutputHandle :: OutputHandle +globalOutputHandle = unsafePerformIO $ OutputHandle + <$> newEmptyTMVarIO + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO 0 + +-- | Holds a lock while performing an action. This allows the action to +-- perform its own output to the console, without using functions from this +-- module. +-- +-- While this is running, other threads that try to lockOutput will block. +-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not +-- block, but the output will be buffered and displayed only once the +-- action is done. lockOutput :: (MonadIO m, MonadMask m) => m a -> m a lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) @@ -88,9 +94,7 @@ tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False withLock :: (TMVar Lock -> STM a) -> IO a -withLock a = do - lck <- outputLock <$> getOutputHandle - atomically (a lck) +withLock a = atomically $ a (outputLock globalOutputHandle) takeOutputLock' :: Bool -> IO Bool takeOutputLock' block = do @@ -107,32 +111,37 @@ takeOutputLock' block = do putTMVar l Locked return True when locked $ do - bv <- outputBuffer <$> getOutputHandle - buf <- atomically $ swapTMVar bv [] - emitBuffer stdout buf + (outbuf, errbuf) <- atomically $ (,) + <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) + <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) + emitOutputBuffer StdOut outbuf + emitOutputBuffer StdErr errbuf return locked -- | Only safe to call after taking the output lock. dropOutputLock :: IO () dropOutputLock = withLock $ void . takeTMVar --- | Use this around any IO actions that use `outputConcurrent` +-- | Use this around any actions that use `outputConcurrent` -- or `createProcessConcurrent` -- -- This is necessary to ensure that buffered concurrent output actually -- gets displayed before the program exits. -withConcurrentOutput :: IO a -> IO a -withConcurrentOutput a = a `finally` flushConcurrentOutput +withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a +withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput -- | Blocks until any processes started by `createProcessConcurrent` have --- finished, and any buffered output is displayed. +-- finished, and any buffered output is displayed. +-- +-- `withConcurrentOutput` calls this at the end; you can call it anytime +-- you want to flush output. flushConcurrentOutput :: IO () flushConcurrentOutput = do -- Wait for all outputThreads to finish. - v <- outputThreads <$> getOutputHandle + let v = outputThreads globalOutputHandle atomically $ do r <- takeTMVar v - if r == S.empty + if r <= 0 then putTMVar v r else retry -- Take output lock to ensure that nothing else is currently @@ -141,18 +150,18 @@ flushConcurrentOutput = do -- | Values that can be output. class Outputable v where - toOutput :: v -> B.ByteString - -instance Outputable B.ByteString where - toOutput = id + toOutput :: v -> T.Text instance Outputable T.Text where - toOutput = encodeUtf8 + toOutput = id instance Outputable String where toOutput = toOutput . T.pack --- | Displays a value to stdout, and flush output so it's displayed. +-- | Displays a value to stdout. +-- +-- No newline is appended to the value, so if you want a newline, be sure +-- to include it yourself. -- -- Uses locking to ensure that the whole output occurs atomically -- even when other threads are concurrently generating output. @@ -167,12 +176,12 @@ outputConcurrent v = bracket setup cleanup go cleanup False = return () cleanup True = dropOutputLock go True = do - B.hPut stdout (toOutput v) + T.hPutStr stdout (toOutput v) hFlush stdout go False = do - bv <- outputBuffer <$> getOutputHandle + let bv = outputBuffer globalOutputHandle oldbuf <- atomically $ takeTMVar bv - newbuf <- addBuffer (Output (toOutput v)) oldbuf + newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf atomically $ putTMVar bv newbuf -- | This must be used to wait for processes started with @@ -200,11 +209,11 @@ waitForProcessConcurrent h = do -- A process is allowed to write to stdout and stderr in the usual -- way, assuming it can successfully take the output lock. -- --- When the output lock is held (by another concurrent process, +-- When the output lock is held (ie, by another concurrent process, -- or because `outputConcurrent` is being called at the same time), -- the process is instead run with its stdout and stderr -- redirected to a buffer. The buffered output will be displayed as soon --- as the output lock becomes free. +-- as the output lock becomes free, or after the command has finished. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p | willOutput (P.std_out p) || willOutput (P.std_err p) = @@ -234,9 +243,11 @@ createProcessConcurrent p { P.std_out = rediroutput (P.std_out p) toouth , P.std_err = rediroutput (P.std_err p) toerrh } + registerOutputThread r <- P.createProcess p' - outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth - errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh + `onException` unregisterOutputThread + outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth + errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh void $ async $ bufferWriter [outbuf, errbuf] return r @@ -248,101 +259,221 @@ willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False --- Built up with newest seen output first. -type Buffer = [BufferedActivity] +-- | Buffered output. +data OutputBuffer = OutputBuffer [OutputBufferedActivity] + deriving (Eq) + +data StdHandle = StdOut | StdErr + +toHandle :: StdHandle -> Handle +toHandle StdOut = stdout +toHandle StdErr = stderr + +bufferFor :: StdHandle -> TMVar OutputBuffer +bufferFor StdOut = outputBuffer globalOutputHandle +bufferFor StdErr = errorBuffer globalOutputHandle -data BufferedActivity - = ReachedEnd - | Output B.ByteString - | InTempFile FilePath +data OutputBufferedActivity + = Output T.Text + | InTempFile + { tempFile :: FilePath + , endsInNewLine :: Bool + } deriving (Eq) -setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) -setupBuffer h toh ss fromh = do +data AtEnd = AtEnd + deriving Eq + +data BufSig = BufSig + +setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) +setupOutputBuffer h toh ss fromh = do hClose toh - buf <- newMVar [] + buf <- newMVar (OutputBuffer []) bufsig <- atomically newEmptyTMVar - void $ async $ outputDrainer ss fromh buf bufsig - return (h, buf, bufsig) + bufend <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig bufend + return (h, buf, bufsig, bufend) -- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO () -outputDrainer ss fromh buf bufsig +outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () +outputDrainer ss fromh buf bufsig bufend | willOutput ss = go | otherwise = atend where go = do - v <- tryIO $ B.hGetSome fromh 1048576 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (Output b) + t <- T.hGetChunk fromh + if T.null t + then atend + else do + modifyMVar_ buf $ addOutputBuffer (Output t) changed go - _ -> atend atend = do - modifyMVar_ buf $ pure . (ReachedEnd :) - changed + atomically $ putTMVar bufend AtEnd hClose fromh changed = atomically $ do void $ tryTakeTMVar bufsig - putTMVar bufsig () + putTMVar bufsig BufSig + +registerOutputThread :: IO () +registerOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . succ =<< takeTMVar v + +unregisterOutputThread :: IO () +unregisterOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . pred =<< takeTMVar v -- Wait to lock output, and once we can, display everything -- that's put into the buffers, until the end. -bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () +-- +-- If end is reached before lock is taken, instead add the command's +-- buffers to the global outputBuffer and errorBuffer. +bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () bufferWriter ts = do - worker <- async $ void $ lockOutput $ mapConcurrently go ts - v <- outputThreads <$> getOutputHandle - atomically $ do - s <- takeTMVar v - putTMVar v (S.insert worker s) + activitysig <- atomically newEmptyTMVar + worker1 <- async $ lockOutput $ + ifM (atomically $ tryPutTMVar activitysig ()) + ( void $ mapConcurrently displaybuf ts + , noop -- buffers already moved to global + ) + worker2 <- async $ void $ globalbuf activitysig void $ async $ do - void $ waitCatch worker - atomically $ do - s <- takeTMVar v - putTMVar v (S.delete worker s) + void $ waitCatch worker1 + void $ waitCatch worker2 + unregisterOutputThread where - go v@(outh, buf, bufsig) = do - void $ atomically $ takeTMVar bufsig + displaybuf v@(outh, buf, bufsig, bufend) = do + change <- atomically $ + (Right <$> takeTMVar bufsig) + `orElse` + (Left <$> takeTMVar bufend) l <- takeMVar buf - putMVar buf [] - emitBuffer outh l - if any (== ReachedEnd) l - then return () - else go v - -emitBuffer :: Handle -> Buffer -> IO () -emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of - Output b -> do - B.hPut outh b - hFlush outh - InTempFile tmp -> do - B.hPut outh =<< B.readFile tmp - void $ tryWhenExists $ removeFile tmp - ReachedEnd -> return () - --- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper --- to combine it with any already buffered Output to that same Handle. + putMVar buf (OutputBuffer []) + emitOutputBuffer outh l + case change of + Right BufSig -> displaybuf v + Left AtEnd -> return () + globalbuf activitysig = do + ok <- atomically $ do + -- signal we're going to handle it + -- (returns false if the displaybuf already did) + ok <- tryPutTMVar activitysig () + -- wait for end of all buffers + when ok $ + mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts + return ok + when ok $ do + -- add all of the command's buffered output to the + -- global output buffer, atomically + bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> + (outh,) <$> takeMVar buf + atomically $ + forM_ bs $ \(outh, b) -> + bufferOutputSTM' outh b + +-- Adds a value to the OutputBuffer. When adding Output to a Handle, +-- it's cheaper to combine it with any already buffered Output to that +-- same Handle. -- -- When the total buffered Output exceeds 1 mb in size, it's moved out of -- memory, to a temp file. This should only happen rarely, but is done to -- avoid some verbose process unexpectedly causing excessive memory use. -addBuffer :: BufferedActivity -> Buffer -> IO Buffer -addBuffer (Output b) buf - | B.length b' <= 1048576 = return (Output b' : other) +addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer +addOutputBuffer (Output t) (OutputBuffer buf) + | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) | otherwise = do tmpdir <- getTemporaryDirectory (tmp, h) <- openTempFile tmpdir "output.tmp" - B.hPut h b' + let !endnl = endsNewLine t' + let i = InTempFile + { tempFile = tmp + , endsInNewLine = endnl + } + T.hPutStr h t' hClose h - return (InTempFile tmp : other) + return $ OutputBuffer (i : other) where - !b' = B.concat (mapMaybe getOutput this) <> b + !t' = T.concat (mapMaybe getOutput this) <> t !(this, other) = partition isOutput buf isOutput v = case v of Output _ -> True _ -> False getOutput v = case v of - Output b'' -> Just b'' + Output t'' -> Just t'' _ -> Nothing -addBuffer v buf = return (v:buf) +addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) + +-- | Adds a value to the output buffer for later display. +-- +-- Note that buffering large quantities of data this way will keep it +-- resident in memory until it can be displayed. While `outputConcurrent` +-- uses temp files if the buffer gets too big, this STM function cannot do +-- so. +bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () +bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) + +bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () +bufferOutputSTM' h (OutputBuffer newbuf) = do + (OutputBuffer buf) <- takeTMVar bv + putTMVar bv (OutputBuffer (newbuf ++ buf)) + where + bv = bufferFor h + +-- | A STM action that waits for some buffered output to become +-- available, and returns it. +-- +-- The function can select a subset of output when only some is desired; +-- the fst part is returned and the snd is left in the buffer. +-- +-- This will prevent it from being displayed in the usual way, so you'll +-- need to use `emitOutputBuffer` to display it yourself. +outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)] +outputBufferWaiterSTM selector = do + bs <- forM hs $ \h -> do + let bv = bufferFor h + (selected, rest) <- selector <$> takeTMVar bv + putTMVar bv rest + return selected + if all (== OutputBuffer []) bs + then retry + else do + return (zip hs bs) + where + hs = [StdOut, StdErr] + +waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitAnyBuffer b = (b, OutputBuffer []) + +-- | Use with `outputBufferWaiterSTM` to make it only return buffered +-- output that ends with a newline. Anything buffered without a newline +-- is left in the buffer. +waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitCompleteLines (OutputBuffer l) = + let (selected, rest) = span completeline l + in (OutputBuffer selected, OutputBuffer rest) + where + completeline (v@(InTempFile {})) = endsInNewLine v + completeline (Output b) = endsNewLine b + +endsNewLine :: T.Text -> Bool +endsNewLine t = not (T.null t) && T.last t == '\n' + +-- | Emits the content of the OutputBuffer to the Handle +-- +-- If you use this, you should use `lockOutput` to ensure you're the only +-- thread writing to the console. +emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () +emitOutputBuffer stdh (OutputBuffer l) = + forM_ (reverse l) $ \ba -> case ba of + Output t -> emit t + InTempFile tmp _ -> do + emit =<< T.readFile tmp + void $ tryWhenExists $ removeFile tmp + where + outh = toHandle stdh + emit t = void $ tryIO $ do + T.hPutStr outh t + hFlush outh -- cgit v1.2.3