From 4e84fa68e3ea2a11e85d09860f2d6440d91e27d1 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 11:04:47 -0400 Subject: don't truncate over-large output; swap to temp files --- src/Utility/ConcurrentOutput.hs | 78 ++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 33 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index cf1d166e..186f881f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,7 +1,4 @@ -- | Concurrent output handling. --- --- When two threads both try to display a message concurrently, --- the messages will be displayed sequentially. module Utility.ConcurrentOutput ( lockOutput, @@ -10,6 +7,7 @@ module Utility.ConcurrentOutput ( import System.IO import System.Posix.IO +import System.Directory import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative @@ -17,7 +15,6 @@ import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent import Control.Concurrent.Async import Data.Maybe -import Data.Char import Data.List import Data.Monoid import qualified Data.ByteString as B @@ -122,7 +119,7 @@ updateOutputLocker l = do -- -- The first process is allowed to write to stdout and stderr in the usual way. -- --- However, if another process runs concurrently with the +-- However, if another process is run concurrently with the -- first, any stdout or stderr that would have been displayed by it is -- instead buffered. The buffered output will be displayed the next time it -- is safe to do so (ie, after the first process exits). @@ -171,7 +168,13 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from -type Buffer = [(Handle, Maybe B.ByteString)] +type Buffer = [(Handle, BufferedActivity)] + +data BufferedActivity + = ReachedEnd + | Output B.ByteString + | InTempFile FilePath + deriving (Eq) -- Drain output from the handle, and buffer it in memory. outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () @@ -179,10 +182,10 @@ outputDrainer fromh toh buf = do v <- tryIO $ B.hGetSome fromh 1024 case v of Right b | not (B.null b) -> do - modifyMVar_ buf (pure . addBuffer (toh, Just b)) + modifyMVar_ buf $ addBuffer (toh, Output b) outputDrainer fromh toh buf _ -> do - modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) hClose fromh -- Wait to lock output, and once we can, display everything @@ -194,31 +197,40 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) go [] = return () go hs = do l <- takeMVar buf - forM_ l $ \(h, mb) -> do - maybe noop (B.hPut h) mb - hFlush h - let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + forM_ l $ \(h, ba) -> case ba of + Output b -> do + B.hPut h b + hFlush h + InTempFile tmp -> do + B.hPut h =<< B.readFile tmp + void $ tryWhenExists $ removeFile tmp + ReachedEnd -> return () + let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs putMVar buf [] go hs' - --- The buffer can grow up to 1 mb in size, but after that point, --- it's truncated to avoid propellor using unbounded memory --- when a process outputs a whole lot of stuff. -bufsz :: Int -bufsz = 1000000 - -addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer -addBuffer v@(_, Nothing) buf = buf ++ [v] -addBuffer (toh, Just b) buf = (toh, Just b') : other + +-- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper +-- to combine it with any already buffered Output to that same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer +addBuffer (toh, Output b) buf + | B.length b' <= 1000000 = return ((toh, Output b') : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + B.hPut h b' + hClose h + return ((toh, InTempFile tmp) : other) where - (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf - b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b - --- Truncate a buffer by removing lines from the front until it's --- small enough. -truncateBuffer :: B.ByteString -> B.ByteString -truncateBuffer b - | B.length b <= bufsz = b - | otherwise = truncateBuffer $ snd $ B.breakByte nl b - where - nl = fromIntegral (ord '\n') + b' = B.concat (mapMaybe getOutput this) <> b + (this, other) = partition same buf + same v = fst v == toh && case snd v of + Output _ -> True + _ -> False + getOutput v = case snd v of + Output b'' -> Just b'' + _ -> Nothing +addBuffer v buf = return (buf ++ [v]) -- cgit v1.2.3