summaryrefslogtreecommitdiff
path: root/src/Utility
diff options
context:
space:
mode:
authorJoey Hess2015-11-01 11:38:39 -0400
committerJoey Hess2015-11-01 11:38:39 -0400
commitb4f941ea7beaaaae8a4f0e13381a30e7af5ec3aa (patch)
tree0d69915201392ca2468acd1d089fd4350a9e17de /src/Utility
parent58dca9518993b0bcd380e91ae7080f59dd6d3245 (diff)
merge changes from concurrent-output
Diffstat (limited to 'src/Utility')
-rw-r--r--src/Utility/ConcurrentOutput.hs349
1 files changed, 240 insertions, 109 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index c24744a3..4676c2fa 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -1,5 +1,4 @@
-{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-}
-{-# OPTIONS_GHC -fno-warn-tabs #-}
+{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
-- |
-- Copyright: 2013 Joey Hess <id@joeyh.name>
@@ -8,7 +7,7 @@
-- Concurrent output handling.
--
-- > import Control.Concurrent.Async
--- > import Control.Concurrent.Output
+-- > import System.Console.Concurrent
-- >
-- > main = withConcurrentOutput $
-- > outputConcurrent "washed the car\n"
@@ -18,13 +17,22 @@
-- > createProcessConcurrent (proc "ls" [])
module Utility.ConcurrentOutput (
+ -- * Concurrent output
withConcurrentOutput,
- flushConcurrentOutput,
Outputable(..),
outputConcurrent,
createProcessConcurrent,
waitForProcessConcurrent,
+ flushConcurrentOutput,
lockOutput,
+ -- * Low level access to the output buffer
+ OutputBuffer,
+ StdHandle(..),
+ bufferOutputSTM,
+ outputBufferWaiterSTM,
+ waitAnyBuffer,
+ waitCompleteLines,
+ emitOutputBuffer,
) where
import System.IO
@@ -42,40 +50,38 @@ import Data.Maybe
import Data.List
import Data.Monoid
import qualified System.Process as P
-import qualified Data.Set as S
-import qualified Data.ByteString as B
import qualified Data.Text as T
-import Data.Text.Encoding (encodeUtf8)
+import qualified Data.Text.IO as T
import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
{ outputLock :: TMVar Lock
- , outputBuffer :: TMVar Buffer
- , outputThreads :: TMVar (S.Set (Async ()))
+ , outputBuffer :: TMVar OutputBuffer
+ , errorBuffer :: TMVar OutputBuffer
+ , outputThreads :: TMVar Integer
}
data Lock = Locked
-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
-globalOutputHandle :: MVar OutputHandle
-globalOutputHandle = unsafePerformIO $
- newMVar =<< OutputHandle
- <$> newEmptyTMVarIO
- <*> newTMVarIO []
- <*> newTMVarIO S.empty
-
--- | Gets the global OutputHandle.
-getOutputHandle :: IO OutputHandle
-getOutputHandle = readMVar globalOutputHandle
-
--- | Holds a lock while performing an action that will display output.
--- While this is running, other threads that try to lockOutput will block,
--- and calls to `outputConcurrent` and `createProcessConcurrent`
--- will result in that concurrent output being buffered and not
--- displayed until the action is done.
+globalOutputHandle :: OutputHandle
+globalOutputHandle = unsafePerformIO $ OutputHandle
+ <$> newEmptyTMVarIO
+ <*> newTMVarIO (OutputBuffer [])
+ <*> newTMVarIO (OutputBuffer [])
+ <*> newTMVarIO 0
+
+-- | Holds a lock while performing an action. This allows the action to
+-- perform its own output to the console, without using functions from this
+-- module.
+--
+-- While this is running, other threads that try to lockOutput will block.
+-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not
+-- block, but the output will be buffered and displayed only once the
+-- action is done.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
@@ -88,9 +94,7 @@ tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
withLock :: (TMVar Lock -> STM a) -> IO a
-withLock a = do
- lck <- outputLock <$> getOutputHandle
- atomically (a lck)
+withLock a = atomically $ a (outputLock globalOutputHandle)
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
@@ -107,32 +111,37 @@ takeOutputLock' block = do
putTMVar l Locked
return True
when locked $ do
- bv <- outputBuffer <$> getOutputHandle
- buf <- atomically $ swapTMVar bv []
- emitBuffer stdout buf
+ (outbuf, errbuf) <- atomically $ (,)
+ <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
+ <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
+ emitOutputBuffer StdOut outbuf
+ emitOutputBuffer StdErr errbuf
return locked
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
dropOutputLock = withLock $ void . takeTMVar
--- | Use this around any IO actions that use `outputConcurrent`
+-- | Use this around any actions that use `outputConcurrent`
-- or `createProcessConcurrent`
--
-- This is necessary to ensure that buffered concurrent output actually
-- gets displayed before the program exits.
-withConcurrentOutput :: IO a -> IO a
-withConcurrentOutput a = a `finally` flushConcurrentOutput
+withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
+withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput
-- | Blocks until any processes started by `createProcessConcurrent` have
--- finished, and any buffered output is displayed.
+-- finished, and any buffered output is displayed.
+--
+-- `withConcurrentOutput` calls this at the end; you can call it anytime
+-- you want to flush output.
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
-- Wait for all outputThreads to finish.
- v <- outputThreads <$> getOutputHandle
+ let v = outputThreads globalOutputHandle
atomically $ do
r <- takeTMVar v
- if r == S.empty
+ if r <= 0
then putTMVar v r
else retry
-- Take output lock to ensure that nothing else is currently
@@ -141,18 +150,18 @@ flushConcurrentOutput = do
-- | Values that can be output.
class Outputable v where
- toOutput :: v -> B.ByteString
-
-instance Outputable B.ByteString where
- toOutput = id
+ toOutput :: v -> T.Text
instance Outputable T.Text where
- toOutput = encodeUtf8
+ toOutput = id
instance Outputable String where
toOutput = toOutput . T.pack
--- | Displays a value to stdout, and flush output so it's displayed.
+-- | Displays a value to stdout.
+--
+-- No newline is appended to the value, so if you want a newline, be sure
+-- to include it yourself.
--
-- Uses locking to ensure that the whole output occurs atomically
-- even when other threads are concurrently generating output.
@@ -167,12 +176,12 @@ outputConcurrent v = bracket setup cleanup go
cleanup False = return ()
cleanup True = dropOutputLock
go True = do
- B.hPut stdout (toOutput v)
+ T.hPutStr stdout (toOutput v)
hFlush stdout
go False = do
- bv <- outputBuffer <$> getOutputHandle
+ let bv = outputBuffer globalOutputHandle
oldbuf <- atomically $ takeTMVar bv
- newbuf <- addBuffer (Output (toOutput v)) oldbuf
+ newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
atomically $ putTMVar bv newbuf
-- | This must be used to wait for processes started with
@@ -200,11 +209,11 @@ waitForProcessConcurrent h = do
-- A process is allowed to write to stdout and stderr in the usual
-- way, assuming it can successfully take the output lock.
--
--- When the output lock is held (by another concurrent process,
+-- When the output lock is held (ie, by another concurrent process,
-- or because `outputConcurrent` is being called at the same time),
-- the process is instead run with its stdout and stderr
-- redirected to a buffer. The buffered output will be displayed as soon
--- as the output lock becomes free.
+-- as the output lock becomes free, or after the command has finished.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
| willOutput (P.std_out p) || willOutput (P.std_err p) =
@@ -234,9 +243,11 @@ createProcessConcurrent p
{ P.std_out = rediroutput (P.std_out p) toouth
, P.std_err = rediroutput (P.std_err p) toerrh
}
+ registerOutputThread
r <- P.createProcess p'
- outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth
- errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh
+ `onException` unregisterOutputThread
+ outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
+ errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
void $ async $ bufferWriter [outbuf, errbuf]
return r
@@ -248,101 +259,221 @@ willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False
--- Built up with newest seen output first.
-type Buffer = [BufferedActivity]
+-- | Buffered output.
+data OutputBuffer = OutputBuffer [OutputBufferedActivity]
+ deriving (Eq)
+
+data StdHandle = StdOut | StdErr
+
+toHandle :: StdHandle -> Handle
+toHandle StdOut = stdout
+toHandle StdErr = stderr
+
+bufferFor :: StdHandle -> TMVar OutputBuffer
+bufferFor StdOut = outputBuffer globalOutputHandle
+bufferFor StdErr = errorBuffer globalOutputHandle
-data BufferedActivity
- = ReachedEnd
- | Output B.ByteString
- | InTempFile FilePath
+data OutputBufferedActivity
+ = Output T.Text
+ | InTempFile
+ { tempFile :: FilePath
+ , endsInNewLine :: Bool
+ }
deriving (Eq)
-setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ())
-setupBuffer h toh ss fromh = do
+data AtEnd = AtEnd
+ deriving Eq
+
+data BufSig = BufSig
+
+setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
+setupOutputBuffer h toh ss fromh = do
hClose toh
- buf <- newMVar []
+ buf <- newMVar (OutputBuffer [])
bufsig <- atomically newEmptyTMVar
- void $ async $ outputDrainer ss fromh buf bufsig
- return (h, buf, bufsig)
+ bufend <- atomically newEmptyTMVar
+ void $ async $ outputDrainer ss fromh buf bufsig bufend
+ return (h, buf, bufsig, bufend)
-- Drain output from the handle, and buffer it.
-outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO ()
-outputDrainer ss fromh buf bufsig
+outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
+outputDrainer ss fromh buf bufsig bufend
| willOutput ss = go
| otherwise = atend
where
go = do
- v <- tryIO $ B.hGetSome fromh 1048576
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (Output b)
+ t <- T.hGetChunk fromh
+ if T.null t
+ then atend
+ else do
+ modifyMVar_ buf $ addOutputBuffer (Output t)
changed
go
- _ -> atend
atend = do
- modifyMVar_ buf $ pure . (ReachedEnd :)
- changed
+ atomically $ putTMVar bufend AtEnd
hClose fromh
changed = atomically $ do
void $ tryTakeTMVar bufsig
- putTMVar bufsig ()
+ putTMVar bufsig BufSig
+
+registerOutputThread :: IO ()
+registerOutputThread = do
+ let v = outputThreads globalOutputHandle
+ atomically $ putTMVar v . succ =<< takeTMVar v
+
+unregisterOutputThread :: IO ()
+unregisterOutputThread = do
+ let v = outputThreads globalOutputHandle
+ atomically $ putTMVar v . pred =<< takeTMVar v
-- Wait to lock output, and once we can, display everything
-- that's put into the buffers, until the end.
-bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
+--
+-- If end is reached before lock is taken, instead add the command's
+-- buffers to the global outputBuffer and errorBuffer.
+bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
bufferWriter ts = do
- worker <- async $ void $ lockOutput $ mapConcurrently go ts
- v <- outputThreads <$> getOutputHandle
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.insert worker s)
+ activitysig <- atomically newEmptyTMVar
+ worker1 <- async $ lockOutput $
+ ifM (atomically $ tryPutTMVar activitysig ())
+ ( void $ mapConcurrently displaybuf ts
+ , noop -- buffers already moved to global
+ )
+ worker2 <- async $ void $ globalbuf activitysig
void $ async $ do
- void $ waitCatch worker
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.delete worker s)
+ void $ waitCatch worker1
+ void $ waitCatch worker2
+ unregisterOutputThread
where
- go v@(outh, buf, bufsig) = do
- void $ atomically $ takeTMVar bufsig
+ displaybuf v@(outh, buf, bufsig, bufend) = do
+ change <- atomically $
+ (Right <$> takeTMVar bufsig)
+ `orElse`
+ (Left <$> takeTMVar bufend)
l <- takeMVar buf
- putMVar buf []
- emitBuffer outh l
- if any (== ReachedEnd) l
- then return ()
- else go v
-
-emitBuffer :: Handle -> Buffer -> IO ()
-emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of
- Output b -> do
- B.hPut outh b
- hFlush outh
- InTempFile tmp -> do
- B.hPut outh =<< B.readFile tmp
- void $ tryWhenExists $ removeFile tmp
- ReachedEnd -> return ()
-
--- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper
--- to combine it with any already buffered Output to that same Handle.
+ putMVar buf (OutputBuffer [])
+ emitOutputBuffer outh l
+ case change of
+ Right BufSig -> displaybuf v
+ Left AtEnd -> return ()
+ globalbuf activitysig = do
+ ok <- atomically $ do
+ -- signal we're going to handle it
+ -- (returns false if the displaybuf already did)
+ ok <- tryPutTMVar activitysig ()
+ -- wait for end of all buffers
+ when ok $
+ mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts
+ return ok
+ when ok $ do
+ -- add all of the command's buffered output to the
+ -- global output buffer, atomically
+ bs <- forM ts $ \(outh, buf, _bufsig, _bufend) ->
+ (outh,) <$> takeMVar buf
+ atomically $
+ forM_ bs $ \(outh, b) ->
+ bufferOutputSTM' outh b
+
+-- Adds a value to the OutputBuffer. When adding Output to a Handle,
+-- it's cheaper to combine it with any already buffered Output to that
+-- same Handle.
--
-- When the total buffered Output exceeds 1 mb in size, it's moved out of
-- memory, to a temp file. This should only happen rarely, but is done to
-- avoid some verbose process unexpectedly causing excessive memory use.
-addBuffer :: BufferedActivity -> Buffer -> IO Buffer
-addBuffer (Output b) buf
- | B.length b' <= 1048576 = return (Output b' : other)
+addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
+addOutputBuffer (Output t) (OutputBuffer buf)
+ | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other)
| otherwise = do
tmpdir <- getTemporaryDirectory
(tmp, h) <- openTempFile tmpdir "output.tmp"
- B.hPut h b'
+ let !endnl = endsNewLine t'
+ let i = InTempFile
+ { tempFile = tmp
+ , endsInNewLine = endnl
+ }
+ T.hPutStr h t'
hClose h
- return (InTempFile tmp : other)
+ return $ OutputBuffer (i : other)
where
- !b' = B.concat (mapMaybe getOutput this) <> b
+ !t' = T.concat (mapMaybe getOutput this) <> t
!(this, other) = partition isOutput buf
isOutput v = case v of
Output _ -> True
_ -> False
getOutput v = case v of
- Output b'' -> Just b''
+ Output t'' -> Just t''
_ -> Nothing
-addBuffer v buf = return (v:buf)
+addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)
+
+-- | Adds a value to the output buffer for later display.
+--
+-- Note that buffering large quantities of data this way will keep it
+-- resident in memory until it can be displayed. While `outputConcurrent`
+-- uses temp files if the buffer gets too big, this STM function cannot do
+-- so.
+bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
+bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)])
+
+bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
+bufferOutputSTM' h (OutputBuffer newbuf) = do
+ (OutputBuffer buf) <- takeTMVar bv
+ putTMVar bv (OutputBuffer (newbuf ++ buf))
+ where
+ bv = bufferFor h
+
+-- | A STM action that waits for some buffered output to become
+-- available, and returns it.
+--
+-- The function can select a subset of output when only some is desired;
+-- the fst part is returned and the snd is left in the buffer.
+--
+-- This will prevent it from being displayed in the usual way, so you'll
+-- need to use `emitOutputBuffer` to display it yourself.
+outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)]
+outputBufferWaiterSTM selector = do
+ bs <- forM hs $ \h -> do
+ let bv = bufferFor h
+ (selected, rest) <- selector <$> takeTMVar bv
+ putTMVar bv rest
+ return selected
+ if all (== OutputBuffer []) bs
+ then retry
+ else do
+ return (zip hs bs)
+ where
+ hs = [StdOut, StdErr]
+
+waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
+waitAnyBuffer b = (b, OutputBuffer [])
+
+-- | Use with `outputBufferWaiterSTM` to make it only return buffered
+-- output that ends with a newline. Anything buffered without a newline
+-- is left in the buffer.
+waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
+waitCompleteLines (OutputBuffer l) =
+ let (selected, rest) = span completeline l
+ in (OutputBuffer selected, OutputBuffer rest)
+ where
+ completeline (v@(InTempFile {})) = endsInNewLine v
+ completeline (Output b) = endsNewLine b
+
+endsNewLine :: T.Text -> Bool
+endsNewLine t = not (T.null t) && T.last t == '\n'
+
+-- | Emits the content of the OutputBuffer to the Handle
+--
+-- If you use this, you should use `lockOutput` to ensure you're the only
+-- thread writing to the console.
+emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
+emitOutputBuffer stdh (OutputBuffer l) =
+ forM_ (reverse l) $ \ba -> case ba of
+ Output t -> emit t
+ InTempFile tmp _ -> do
+ emit =<< T.readFile tmp
+ void $ tryWhenExists $ removeFile tmp
+ where
+ outh = toHandle stdh
+ emit t = void $ tryIO $ do
+ T.hPutStr outh t
+ hFlush outh