summaryrefslogtreecommitdiff
path: root/src/Utility
diff options
context:
space:
mode:
authorJoey Hess2015-10-28 17:03:17 -0400
committerJoey Hess2015-10-28 17:03:17 -0400
commit5cde1ed21cc912db0b53846196f920fe52835dbc (patch)
treeee6ffa3c3044daca1b25bc08db9c653dbb774f0b /src/Utility
parentd61e3866d794635de5875d7292861fb49ad0340a (diff)
fix memory leak, and optimise when command output is very large
Diffstat (limited to 'src/Utility')
-rw-r--r--src/Utility/ConcurrentOutput.hs113
1 files changed, 66 insertions, 47 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index 5bf973de..be1562ac 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -1,8 +1,11 @@
{-# LANGUAGE BangPatterns #-}
+{-# OPTIONS_GHC -fno-warn-tabs #-}
-- | Concurrent output handling.
module Utility.ConcurrentOutput (
+ takeOutputLock,
+ dropOutputLock,
withConcurrentOutput,
outputConcurrent,
createProcessConcurrent,
@@ -146,6 +149,20 @@ outputConcurrent s = do
hFlush stdout
-- TODO
+-- | This must be used to wait for processes started with
+-- `createProcessConcurrent`.
+--
+-- This is necessary because `System.Process.waitForProcess` has a
+-- race condition when two threads check the same process. If the race
+-- is triggered, one thread will successfully wait, but the other
+-- throws a DoesNotExist exception.
+waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
+waitForProcessConcurrent h = do
+ v <- tryWhenExists (P.waitForProcess h)
+ case v of
+ Just r -> return r
+ Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
+
-- | Wrapper around `System.Process.createProcess` that prevents
-- multiple processes that are running concurrently from writing
-- to stdout/stderr at the same time.
@@ -196,37 +213,20 @@ createProcessConcurrent p
, P.std_err = rediroutput (P.std_err p) toerrh
}
r <- P.createProcess p'
- hClose toouth
- hClose toerrh
- buf <- newMVar []
- void $ async $ outputDrainer (P.std_out p) fromouth stdout buf
- void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf
- void $ async $ bufferWriter buf
+ outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth
+ errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh
+ void $ async $ bufferWriter [outbuf, errbuf]
return r
pipe = do
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
--- | This must be used to wait for processes started with
--- `createProcessConcurrent`.
---
--- This is necessary because `System.Process.waitForProcess` has a
--- race condition when two threads check the same process. If the race
--- is triggered, one thread will successfully wait, but the other
--- throws a DoesNotExist exception.
-waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
-waitForProcessConcurrent h = do
- v <- tryWhenExists (P.waitForProcess h)
- case v of
- Just r -> return r
- Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
-
willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False
-type Buffer = [(Handle, BufferedActivity)]
+type Buffer = [BufferedActivity]
data BufferedActivity
= ReachedEnd
@@ -234,43 +234,62 @@ data BufferedActivity
| InTempFile FilePath
deriving (Eq)
+instance Show BufferedActivity where
+ show ReachedEnd = "ReachedEnd"
+ show (Output b) = "Output " ++ show (B.length b)
+ show (InTempFile t) = "InTempFile " ++ t
+
+setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ())
+setupBuffer h toh ss fromh = do
+ hClose toh
+ buf <- newMVar []
+ bufsig <- atomically newEmptyTMVar
+ void $ async $ outputDrainer ss fromh buf bufsig
+ return (h, buf, bufsig)
+
-- Drain output from the handle, and buffer it.
-outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO ()
-outputDrainer ss fromh toh buf
+outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO ()
+outputDrainer ss fromh buf bufsig
| willOutput ss = go
| otherwise = atend
where
go = do
- v <- tryIO $ B.hGetSome fromh 1024
+ v <- tryIO $ B.hGetSome fromh 1048576
case v of
Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (toh, Output b)
+ modifyMVar_ buf $ addBuffer (Output b)
+ changed
go
_ -> atend
atend = do
- modifyMVar_ buf $ pure . ((toh, ReachedEnd) :)
+ modifyMVar_ buf $ pure . (ReachedEnd :)
+ changed
hClose fromh
+ changed = atomically $ do
+ void $ tryTakeTMVar bufsig
+ putTMVar bufsig ()
-- Wait to lock output, and once we can, display everything
--- that's put into buffer, until the end is signaled by Nothing
--- for both stdout and stderr.
-bufferWriter :: MVar Buffer -> IO ()
-bufferWriter buf = lockOutput (go [stdout, stderr])
+-- that's put into the buffers.
+bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
+bufferWriter = void . lockOutput . mapConcurrently go
where
- go [] = return ()
- go hs = do
+ go v@(outh, buf, bufsig) = do
+ atomically $ takeTMVar bufsig
l <- takeMVar buf
- forM_ (reverse l) $ \(h, ba) -> case ba of
+ putMVar buf []
+ forM_ (reverse l) $ \ba -> case ba of
Output b -> do
- B.hPut h b
- hFlush h
+ B.hPut outh b
+ hFlush outh
+ return ()
InTempFile tmp -> do
- B.hPut h =<< B.readFile tmp
+ B.hPut outh =<< B.readFile tmp
void $ tryWhenExists $ removeFile tmp
ReachedEnd -> return ()
- let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs
- putMVar buf []
- go hs'
+ if any (== ReachedEnd) l
+ then return ()
+ else go v
-- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper
-- to combine it with any already buffered Output to that same Handle.
@@ -278,22 +297,22 @@ bufferWriter buf = lockOutput (go [stdout, stderr])
-- When the total buffered Output exceeds 1 mb in size, it's moved out of
-- memory, to a temp file. This should only happen rarely, but is done to
-- avoid some verbose process unexpectedly causing excessive memory use.
-addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer
-addBuffer (toh, Output b) buf
- | B.length b' <= 1000000 = return ((toh, Output b') : other)
+addBuffer :: BufferedActivity -> Buffer -> IO Buffer
+addBuffer (Output b) buf
+ | B.length b' <= 1048576 = return (Output b' : other)
| otherwise = do
tmpdir <- getTemporaryDirectory
(tmp, h) <- openTempFile tmpdir "output.tmp"
B.hPut h b'
hClose h
- return ((toh, InTempFile tmp) : other)
+ return (InTempFile tmp : other)
where
!b' = B.concat (mapMaybe getOutput this) <> b
- !(this, other) = partition same buf
- same v = fst v == toh && case snd v of
+ !(this, other) = partition isOutput buf
+ isOutput v = case v of
Output _ -> True
_ -> False
- getOutput v = case snd v of
+ getOutput v = case v of
Output b'' -> Just b''
_ -> Nothing
-addBuffer v buf = return (buf ++ [v])
+addBuffer v buf = return (v:buf)