summaryrefslogtreecommitdiff
path: root/src/Utility/ConcurrentOutput.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Utility/ConcurrentOutput.hs')
-rw-r--r--src/Utility/ConcurrentOutput.hs348
1 files changed, 0 insertions, 348 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
deleted file mode 100644
index c24744a3..00000000
--- a/src/Utility/ConcurrentOutput.hs
+++ /dev/null
@@ -1,348 +0,0 @@
-{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-}
-{-# OPTIONS_GHC -fno-warn-tabs #-}
-
--- |
--- Copyright: 2013 Joey Hess <id@joeyh.name>
--- License: BSD-2-clause
---
--- Concurrent output handling.
---
--- > import Control.Concurrent.Async
--- > import Control.Concurrent.Output
--- >
--- > main = withConcurrentOutput $
--- > outputConcurrent "washed the car\n"
--- > `concurrently`
--- > outputConcurrent "walked the dog\n"
--- > `concurrently`
--- > createProcessConcurrent (proc "ls" [])
-
-module Utility.ConcurrentOutput (
- withConcurrentOutput,
- flushConcurrentOutput,
- Outputable(..),
- outputConcurrent,
- createProcessConcurrent,
- waitForProcessConcurrent,
- lockOutput,
-) where
-
-import System.IO
-import System.Posix.IO
-import System.Directory
-import System.Exit
-import Control.Monad
-import Control.Monad.IO.Class (liftIO, MonadIO)
-import Control.Applicative
-import System.IO.Unsafe (unsafePerformIO)
-import Control.Concurrent
-import Control.Concurrent.STM
-import Control.Concurrent.Async
-import Data.Maybe
-import Data.List
-import Data.Monoid
-import qualified System.Process as P
-import qualified Data.Set as S
-import qualified Data.ByteString as B
-import qualified Data.Text as T
-import Data.Text.Encoding (encodeUtf8)
-
-import Utility.Monad
-import Utility.Exception
-
-data OutputHandle = OutputHandle
- { outputLock :: TMVar Lock
- , outputBuffer :: TMVar Buffer
- , outputThreads :: TMVar (S.Set (Async ()))
- }
-
-data Lock = Locked
-
--- | A shared global variable for the OutputHandle.
-{-# NOINLINE globalOutputHandle #-}
-globalOutputHandle :: MVar OutputHandle
-globalOutputHandle = unsafePerformIO $
- newMVar =<< OutputHandle
- <$> newEmptyTMVarIO
- <*> newTMVarIO []
- <*> newTMVarIO S.empty
-
--- | Gets the global OutputHandle.
-getOutputHandle :: IO OutputHandle
-getOutputHandle = readMVar globalOutputHandle
-
--- | Holds a lock while performing an action that will display output.
--- While this is running, other threads that try to lockOutput will block,
--- and calls to `outputConcurrent` and `createProcessConcurrent`
--- will result in that concurrent output being buffered and not
--- displayed until the action is done.
-lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
-lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
-
--- | Blocks until we have the output lock.
-takeOutputLock :: IO ()
-takeOutputLock = void $ takeOutputLock' True
-
--- | Tries to take the output lock, without blocking.
-tryTakeOutputLock :: IO Bool
-tryTakeOutputLock = takeOutputLock' False
-
-withLock :: (TMVar Lock -> STM a) -> IO a
-withLock a = do
- lck <- outputLock <$> getOutputHandle
- atomically (a lck)
-
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
- locked <- withLock $ \l -> do
- v <- tryTakeTMVar l
- case v of
- Just Locked
- | block -> retry
- | otherwise -> do
- -- Restore value we took.
- putTMVar l Locked
- return False
- Nothing -> do
- putTMVar l Locked
- return True
- when locked $ do
- bv <- outputBuffer <$> getOutputHandle
- buf <- atomically $ swapTMVar bv []
- emitBuffer stdout buf
- return locked
-
--- | Only safe to call after taking the output lock.
-dropOutputLock :: IO ()
-dropOutputLock = withLock $ void . takeTMVar
-
--- | Use this around any IO actions that use `outputConcurrent`
--- or `createProcessConcurrent`
---
--- This is necessary to ensure that buffered concurrent output actually
--- gets displayed before the program exits.
-withConcurrentOutput :: IO a -> IO a
-withConcurrentOutput a = a `finally` flushConcurrentOutput
-
--- | Blocks until any processes started by `createProcessConcurrent` have
--- finished, and any buffered output is displayed.
-flushConcurrentOutput :: IO ()
-flushConcurrentOutput = do
- -- Wait for all outputThreads to finish.
- v <- outputThreads <$> getOutputHandle
- atomically $ do
- r <- takeTMVar v
- if r == S.empty
- then putTMVar v r
- else retry
- -- Take output lock to ensure that nothing else is currently
- -- generating output, and flush any buffered output.
- lockOutput $ return ()
-
--- | Values that can be output.
-class Outputable v where
- toOutput :: v -> B.ByteString
-
-instance Outputable B.ByteString where
- toOutput = id
-
-instance Outputable T.Text where
- toOutput = encodeUtf8
-
-instance Outputable String where
- toOutput = toOutput . T.pack
-
--- | Displays a value to stdout, and flush output so it's displayed.
---
--- Uses locking to ensure that the whole output occurs atomically
--- even when other threads are concurrently generating output.
---
--- When something else is writing to the console at the same time, this does
--- not block. It buffers the value, so it will be displayed once the other
--- writer is done.
-outputConcurrent :: Outputable v => v -> IO ()
-outputConcurrent v = bracket setup cleanup go
- where
- setup = tryTakeOutputLock
- cleanup False = return ()
- cleanup True = dropOutputLock
- go True = do
- B.hPut stdout (toOutput v)
- hFlush stdout
- go False = do
- bv <- outputBuffer <$> getOutputHandle
- oldbuf <- atomically $ takeTMVar bv
- newbuf <- addBuffer (Output (toOutput v)) oldbuf
- atomically $ putTMVar bv newbuf
-
--- | This must be used to wait for processes started with
--- `createProcessConcurrent`.
---
--- This is necessary because `System.Process.waitForProcess` has a
--- race condition when two threads check the same process. If the race
--- is triggered, one thread will successfully wait, but the other
--- throws a DoesNotExist exception.
-waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
-waitForProcessConcurrent h = do
- v <- tryWhenExists (P.waitForProcess h)
- case v of
- Just r -> return r
- Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
-
--- | Wrapper around `System.Process.createProcess` that prevents
--- multiple processes that are running concurrently from writing
--- to stdout/stderr at the same time.
---
--- If the process does not output to stdout or stderr, it's run
--- by createProcess entirely as usual. Only processes that can generate
--- output are handled specially:
---
--- A process is allowed to write to stdout and stderr in the usual
--- way, assuming it can successfully take the output lock.
---
--- When the output lock is held (by another concurrent process,
--- or because `outputConcurrent` is being called at the same time),
--- the process is instead run with its stdout and stderr
--- redirected to a buffer. The buffered output will be displayed as soon
--- as the output lock becomes free.
-createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
-createProcessConcurrent p
- | willOutput (P.std_out p) || willOutput (P.std_err p) =
- ifM tryTakeOutputLock
- ( firstprocess
- , concurrentprocess
- )
- | otherwise = P.createProcess p
- where
- rediroutput ss h
- | willOutput ss = P.UseHandle h
- | otherwise = ss
-
- firstprocess = do
- r@(_, _, _, h) <- P.createProcess p
- `onException` dropOutputLock
- -- Wait for the process to exit and drop the lock.
- void $ async $ do
- void $ tryIO $ waitForProcessConcurrent h
- dropOutputLock
- return r
-
- concurrentprocess = do
- (toouth, fromouth) <- pipe
- (toerrh, fromerrh) <- pipe
- let p' = p
- { P.std_out = rediroutput (P.std_out p) toouth
- , P.std_err = rediroutput (P.std_err p) toerrh
- }
- r <- P.createProcess p'
- outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth
- errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh
- void $ async $ bufferWriter [outbuf, errbuf]
- return r
-
- pipe = do
- (from, to) <- createPipe
- (,) <$> fdToHandle to <*> fdToHandle from
-
-willOutput :: P.StdStream -> Bool
-willOutput P.Inherit = True
-willOutput _ = False
-
--- Built up with newest seen output first.
-type Buffer = [BufferedActivity]
-
-data BufferedActivity
- = ReachedEnd
- | Output B.ByteString
- | InTempFile FilePath
- deriving (Eq)
-
-setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ())
-setupBuffer h toh ss fromh = do
- hClose toh
- buf <- newMVar []
- bufsig <- atomically newEmptyTMVar
- void $ async $ outputDrainer ss fromh buf bufsig
- return (h, buf, bufsig)
-
--- Drain output from the handle, and buffer it.
-outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO ()
-outputDrainer ss fromh buf bufsig
- | willOutput ss = go
- | otherwise = atend
- where
- go = do
- v <- tryIO $ B.hGetSome fromh 1048576
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (Output b)
- changed
- go
- _ -> atend
- atend = do
- modifyMVar_ buf $ pure . (ReachedEnd :)
- changed
- hClose fromh
- changed = atomically $ do
- void $ tryTakeTMVar bufsig
- putTMVar bufsig ()
-
--- Wait to lock output, and once we can, display everything
--- that's put into the buffers, until the end.
-bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
-bufferWriter ts = do
- worker <- async $ void $ lockOutput $ mapConcurrently go ts
- v <- outputThreads <$> getOutputHandle
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.insert worker s)
- void $ async $ do
- void $ waitCatch worker
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.delete worker s)
- where
- go v@(outh, buf, bufsig) = do
- void $ atomically $ takeTMVar bufsig
- l <- takeMVar buf
- putMVar buf []
- emitBuffer outh l
- if any (== ReachedEnd) l
- then return ()
- else go v
-
-emitBuffer :: Handle -> Buffer -> IO ()
-emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of
- Output b -> do
- B.hPut outh b
- hFlush outh
- InTempFile tmp -> do
- B.hPut outh =<< B.readFile tmp
- void $ tryWhenExists $ removeFile tmp
- ReachedEnd -> return ()
-
--- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper
--- to combine it with any already buffered Output to that same Handle.
---
--- When the total buffered Output exceeds 1 mb in size, it's moved out of
--- memory, to a temp file. This should only happen rarely, but is done to
--- avoid some verbose process unexpectedly causing excessive memory use.
-addBuffer :: BufferedActivity -> Buffer -> IO Buffer
-addBuffer (Output b) buf
- | B.length b' <= 1048576 = return (Output b' : other)
- | otherwise = do
- tmpdir <- getTemporaryDirectory
- (tmp, h) <- openTempFile tmpdir "output.tmp"
- B.hPut h b'
- hClose h
- return (InTempFile tmp : other)
- where
- !b' = B.concat (mapMaybe getOutput this) <> b
- !(this, other) = partition isOutput buf
- isOutput v = case v of
- Output _ -> True
- _ -> False
- getOutput v = case v of
- Output b'' -> Just b''
- _ -> Nothing
-addBuffer v buf = return (v:buf)