From 5cde1ed21cc912db0b53846196f920fe52835dbc Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 17:03:17 -0400 Subject: fix memory leak, and optimise when command output is very large --- src/Utility/ConcurrentOutput.hs | 113 +++++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 47 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 5bf973de..be1562ac 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,8 +1,11 @@ {-# LANGUAGE BangPatterns #-} +{-# OPTIONS_GHC -fno-warn-tabs #-} -- | Concurrent output handling. module Utility.ConcurrentOutput ( + takeOutputLock, + dropOutputLock, withConcurrentOutput, outputConcurrent, createProcessConcurrent, @@ -146,6 +149,20 @@ outputConcurrent s = do hFlush stdout -- TODO +-- | This must be used to wait for processes started with +-- `createProcessConcurrent`. +-- +-- This is necessary because `System.Process.waitForProcess` has a +-- race condition when two threads check the same process. If the race +-- is triggered, one thread will successfully wait, but the other +-- throws a DoesNotExist exception. +waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode +waitForProcessConcurrent h = do + v <- tryWhenExists (P.waitForProcess h) + case v of + Just r -> return r + Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h + -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. @@ -196,37 +213,20 @@ createProcessConcurrent p , P.std_err = rediroutput (P.std_err p) toerrh } r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer (P.std_out p) fromouth stdout buf - void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf - void $ async $ bufferWriter buf + outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth + errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] return r pipe = do (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from --- | This must be used to wait for processes started with --- `createProcessConcurrent`. --- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h - willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False -type Buffer = [(Handle, BufferedActivity)] +type Buffer = [BufferedActivity] data BufferedActivity = ReachedEnd @@ -234,43 +234,62 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) +instance Show BufferedActivity where + show ReachedEnd = "ReachedEnd" + show (Output b) = "Output " ++ show (B.length b) + show (InTempFile t) = "InTempFile " ++ t + +setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) +setupBuffer h toh ss fromh = do + hClose toh + buf <- newMVar [] + bufsig <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig + return (h, buf, bufsig) + -- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () -outputDrainer ss fromh toh buf +outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO () +outputDrainer ss fromh buf bufsig | willOutput ss = go | otherwise = atend where go = do - v <- tryIO $ B.hGetSome fromh 1024 + v <- tryIO $ B.hGetSome fromh 1048576 case v of Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) + modifyMVar_ buf $ addBuffer (Output b) + changed go _ -> atend atend = do - modifyMVar_ buf $ pure . ((toh, ReachedEnd) :) + modifyMVar_ buf $ pure . (ReachedEnd :) + changed hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig () -- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) +-- that's put into the buffers. +bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () +bufferWriter = void . lockOutput . mapConcurrently go where - go [] = return () - go hs = do + go v@(outh, buf, bufsig) = do + atomically $ takeTMVar bufsig l <- takeMVar buf - forM_ (reverse l) $ \(h, ba) -> case ba of + putMVar buf [] + forM_ (reverse l) $ \ba -> case ba of Output b -> do - B.hPut h b - hFlush h + B.hPut outh b + hFlush outh + return () InTempFile tmp -> do - B.hPut h =<< B.readFile tmp + B.hPut outh =<< B.readFile tmp void $ tryWhenExists $ removeFile tmp ReachedEnd -> return () - let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs - putMVar buf [] - go hs' + if any (== ReachedEnd) l + then return () + else go v -- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper -- to combine it with any already buffered Output to that same Handle. @@ -278,22 +297,22 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) -- When the total buffered Output exceeds 1 mb in size, it's moved out of -- memory, to a temp file. This should only happen rarely, but is done to -- avoid some verbose process unexpectedly causing excessive memory use. -addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer -addBuffer (toh, Output b) buf - | B.length b' <= 1000000 = return ((toh, Output b') : other) +addBuffer :: BufferedActivity -> Buffer -> IO Buffer +addBuffer (Output b) buf + | B.length b' <= 1048576 = return (Output b' : other) | otherwise = do tmpdir <- getTemporaryDirectory (tmp, h) <- openTempFile tmpdir "output.tmp" B.hPut h b' hClose h - return ((toh, InTempFile tmp) : other) + return (InTempFile tmp : other) where !b' = B.concat (mapMaybe getOutput this) <> b - !(this, other) = partition same buf - same v = fst v == toh && case snd v of + !(this, other) = partition isOutput buf + isOutput v = case v of Output _ -> True _ -> False - getOutput v = case snd v of + getOutput v = case v of Output b'' -> Just b'' _ -> Nothing -addBuffer v buf = return (buf ++ [v]) +addBuffer v buf = return (v:buf) -- cgit v1.2.3