From 592c65d02bf07d053d2fbe8a568f88d1b28e1a65 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 1 Nov 2015 16:53:25 -0400 Subject: merge from concurrent-output --- src/System/Console/Concurrent/Internal.hs | 522 ++++++++++++++++++++++++++++++ 1 file changed, 522 insertions(+) create mode 100644 src/System/Console/Concurrent/Internal.hs (limited to 'src/System/Console/Concurrent/Internal.hs') diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs new file mode 100644 index 00000000..caef9833 --- /dev/null +++ b/src/System/Console/Concurrent/Internal.hs @@ -0,0 +1,522 @@ +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} + +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling, internals. +-- +-- May change at any time. + +module System.Console.Concurrent.Internal where + +import System.IO +import System.Posix.IO +import System.Directory +import System.Exit +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.Async +import Data.Maybe +import Data.List +import Data.Monoid +import qualified System.Process as P +import qualified Data.Text as T +import qualified Data.Text.IO as T + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: TMVar Lock + , outputBuffer :: TMVar OutputBuffer + , errorBuffer :: TMVar OutputBuffer + , outputThreads :: TMVar Integer + , processWaiters :: TMVar [Async ()] + , waitForProcessLock :: TMVar () + } + +data Lock = Locked + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: OutputHandle +globalOutputHandle = unsafePerformIO $ OutputHandle + <$> newEmptyTMVarIO + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO 0 + <*> newTMVarIO [] + <*> newEmptyTMVarIO + +-- | Holds a lock while performing an action. This allows the action to +-- perform its own output to the console, without using functions from this +-- module. +-- +-- While this is running, other threads that try to lockOutput will block. +-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not +-- block, but the output will be buffered and displayed only once the +-- action is done. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +withLock :: (TMVar Lock -> STM a) -> IO a +withLock a = atomically $ a (outputLock globalOutputHandle) + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + (outbuf, errbuf) <- atomically $ (,) + <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) + <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) + emitOutputBuffer StdOut outbuf + emitOutputBuffer StdErr errbuf + return locked + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = withLock $ void . takeTMVar + +-- | Use this around any actions that use `outputConcurrent` +-- or `createProcessConcurrent` +-- +-- This is necessary to ensure that buffered concurrent output actually +-- gets displayed before the program exits. +withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a +withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput + +-- | Blocks until any processes started by `createProcessConcurrent` have +-- finished, and any buffered output is displayed. +-- +-- `withConcurrentOutput` calls this at the end; you can call it anytime +-- you want to flush output. +flushConcurrentOutput :: IO () +flushConcurrentOutput = do + -- Wait for all outputThreads to finish. + let v = outputThreads globalOutputHandle + atomically $ do + r <- takeTMVar v + if r <= 0 + then putTMVar v r + else retry + -- Take output lock to ensure that nothing else is currently + -- generating output, and flush any buffered output. + lockOutput $ return () + +-- | Values that can be output. +class Outputable v where + toOutput :: v -> T.Text + +instance Outputable T.Text where + toOutput = id + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout. +-- +-- No newline is appended to the value, so if you want a newline, be sure +-- to include it yourself. +-- +-- Uses locking to ensure that the whole output occurs atomically +-- even when other threads are concurrently generating output. +-- +-- When something else is writing to the console at the same time, this does +-- not block. It buffers the value, so it will be displayed once the other +-- writer is done. +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent v = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + T.hPutStr stdout (toOutput v) + hFlush stdout + go False = do + let bv = outputBuffer globalOutputHandle + oldbuf <- atomically $ takeTMVar bv + newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf + atomically $ putTMVar bv newbuf + +newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle + +toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) + +-- | Use this to wait for processes started with +-- `createProcessConcurrent` and `createProcessForeground`, and get their +-- exit status. +-- +-- Note that such processes are actually automatically waited for +-- internally, so not calling this exiplictly will not result +-- in zombie processes. This behavior differs from `P.waitForProcess` +waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode +waitForProcessConcurrent (ConcurrentProcessHandle h) = checkexit + where + checkexit = maybe waitsome return =<< P.getProcessExitCode h + waitsome = maybe checkexit return =<< bracket lock unlock go + lck = waitForProcessLock globalOutputHandle + lock = atomically $ tryPutTMVar lck () + unlock True = atomically $ takeTMVar lck + unlock False = return () + go True = do + let v = processWaiters globalOutputHandle + l <- atomically $ readTMVar v + if null l + -- Avoid waitAny [] which blocks forever; + then Just <$> P.waitForProcess h + else do + -- Wait for any of the running + -- processes to exit. It may or may not + -- be the one corresponding to the + -- ProcessHandle. If it is, + -- getProcessExitCode will succeed. + void $ tryIO $ waitAny l + hFlush stdout + return Nothing + go False = do + -- Another thread took the lck first. Wait for that thread to + -- wait for one of the running processes to exit. + atomically $ do + putTMVar lck () + takeTMVar lck + return Nothing + +-- Registers an action that waits for a process to exit, +-- adding it to the processWaiters list, and removing it once the action +-- completes. +asyncProcessWaiter :: IO () -> IO () +asyncProcessWaiter waitaction = do + regdone <- newEmptyTMVarIO + waiter <- async $ do + self <- atomically (takeTMVar regdone) + waitaction `finally` unregister self + register waiter regdone + where + v = processWaiters globalOutputHandle + register waiter regdone = atomically $ do + l <- takeTMVar v + putTMVar v (waiter:l) + putTMVar regdone waiter + unregister waiter = atomically $ do + l <- takeTMVar v + putTMVar v (filter (/= waiter) l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- If the process does not output to stdout or stderr, it's run +-- by createProcess entirely as usual. Only processes that can generate +-- output are handled specially: +-- +-- A process is allowed to write to stdout and stderr in the usual +-- way, assuming it can successfully take the output lock. +-- +-- When the output lock is held (ie, by another concurrent process, +-- or because `outputConcurrent` is being called at the same time), +-- the process is instead run with its stdout and stderr +-- redirected to a buffer. The buffered output will be displayed as soon +-- as the output lock becomes free. +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessConcurrent p + | willOutput (P.std_out p) || willOutput (P.std_err p) = + ifM tryTakeOutputLock + ( fgProcess p + , bgProcess p + ) + | otherwise = do + r@(_, _, _, h) <- P.createProcess p + asyncProcessWaiter $ do + void $ P.waitForProcess h + return (toConcurrentProcessHandle r) + +-- | Wrapper around `System.Process.createProcess` that makes sure a process +-- is run in the foreground, with direct access to stdout and stderr. +-- Useful when eg, running an interactive process. +createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessForeground p = do + takeOutputLock + fgProcess p + +fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +fgProcess p = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + -- Wait for the process to exit and drop the lock. + asyncProcessWaiter $ do + void $ P.waitForProcess h + dropOutputLock + return (toConcurrentProcessHandle r) + +bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +bgProcess p = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = rediroutput (P.std_out p) toouth + , P.std_err = rediroutput (P.std_err p) toerrh + } + registerOutputThread + r@(_, _, _, h) <- P.createProcess p' + `onException` unregisterOutputThread + asyncProcessWaiter $ void $ P.waitForProcess h + outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth + errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] + return (toConcurrentProcessHandle r) + where + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss + +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + +-- | Buffered output. +data OutputBuffer = OutputBuffer [OutputBufferedActivity] + deriving (Eq) + +data StdHandle = StdOut | StdErr + +toHandle :: StdHandle -> Handle +toHandle StdOut = stdout +toHandle StdErr = stderr + +bufferFor :: StdHandle -> TMVar OutputBuffer +bufferFor StdOut = outputBuffer globalOutputHandle +bufferFor StdErr = errorBuffer globalOutputHandle + +data OutputBufferedActivity + = Output T.Text + | InTempFile + { tempFile :: FilePath + , endsInNewLine :: Bool + } + deriving (Eq) + +data AtEnd = AtEnd + deriving Eq + +data BufSig = BufSig + +setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) +setupOutputBuffer h toh ss fromh = do + hClose toh + buf <- newMVar (OutputBuffer []) + bufsig <- atomically newEmptyTMVar + bufend <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig bufend + return (h, buf, bufsig, bufend) + +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () +outputDrainer ss fromh buf bufsig bufend + | willOutput ss = go + | otherwise = atend + where + go = do + t <- T.hGetChunk fromh + if T.null t + then atend + else do + modifyMVar_ buf $ addOutputBuffer (Output t) + changed + go + atend = do + atomically $ putTMVar bufend AtEnd + hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig BufSig + +registerOutputThread :: IO () +registerOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . succ =<< takeTMVar v + +unregisterOutputThread :: IO () +unregisterOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . pred =<< takeTMVar v + +-- Wait to lock output, and once we can, display everything +-- that's put into the buffers, until the end. +-- +-- If end is reached before lock is taken, instead add the command's +-- buffers to the global outputBuffer and errorBuffer. +bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () +bufferWriter ts = do + activitysig <- atomically newEmptyTMVar + worker1 <- async $ lockOutput $ + ifM (atomically $ tryPutTMVar activitysig ()) + ( void $ mapConcurrently displaybuf ts + , noop -- buffers already moved to global + ) + worker2 <- async $ void $ globalbuf activitysig + void $ async $ do + void $ waitCatch worker1 + void $ waitCatch worker2 + unregisterOutputThread + where + displaybuf v@(outh, buf, bufsig, bufend) = do + change <- atomically $ + (Right <$> takeTMVar bufsig) + `orElse` + (Left <$> takeTMVar bufend) + l <- takeMVar buf + putMVar buf (OutputBuffer []) + emitOutputBuffer outh l + case change of + Right BufSig -> displaybuf v + Left AtEnd -> return () + globalbuf activitysig = do + ok <- atomically $ do + -- signal we're going to handle it + -- (returns false if the displaybuf already did) + ok <- tryPutTMVar activitysig () + -- wait for end of all buffers + when ok $ + mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts + return ok + when ok $ do + -- add all of the command's buffered output to the + -- global output buffer, atomically + bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> + (outh,) <$> takeMVar buf + atomically $ + forM_ bs $ \(outh, b) -> + bufferOutputSTM' outh b + +-- Adds a value to the OutputBuffer. When adding Output to a Handle, +-- it's cheaper to combine it with any already buffered Output to that +-- same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer +addOutputBuffer (Output t) (OutputBuffer buf) + | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + let !endnl = endsNewLine t' + let i = InTempFile + { tempFile = tmp + , endsInNewLine = endnl + } + T.hPutStr h t' + hClose h + return $ OutputBuffer (i : other) + where + !t' = T.concat (mapMaybe getOutput this) <> t + !(this, other) = partition isOutput buf + isOutput v = case v of + Output _ -> True + _ -> False + getOutput v = case v of + Output t'' -> Just t'' + _ -> Nothing +addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) + +-- | Adds a value to the output buffer for later display. +-- +-- Note that buffering large quantities of data this way will keep it +-- resident in memory until it can be displayed. While `outputConcurrent` +-- uses temp files if the buffer gets too big, this STM function cannot do +-- so. +bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () +bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) + +bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () +bufferOutputSTM' h (OutputBuffer newbuf) = do + (OutputBuffer buf) <- takeTMVar bv + putTMVar bv (OutputBuffer (newbuf ++ buf)) + where + bv = bufferFor h + +-- | A STM action that waits for some buffered output to become +-- available, and returns it. +-- +-- The function can select a subset of output when only some is desired; +-- the fst part is returned and the snd is left in the buffer. +-- +-- This will prevent it from being displayed in the usual way, so you'll +-- need to use `emitOutputBuffer` to display it yourself. +outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)] +outputBufferWaiterSTM selector = do + bs <- forM hs $ \h -> do + let bv = bufferFor h + (selected, rest) <- selector <$> takeTMVar bv + putTMVar bv rest + return selected + if all (== OutputBuffer []) bs + then retry + else do + return (zip hs bs) + where + hs = [StdOut, StdErr] + +waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitAnyBuffer b = (b, OutputBuffer []) + +-- | Use with `outputBufferWaiterSTM` to make it only return buffered +-- output that ends with a newline. Anything buffered without a newline +-- is left in the buffer. +waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitCompleteLines (OutputBuffer l) = + let (selected, rest) = span completeline l + in (OutputBuffer selected, OutputBuffer rest) + where + completeline (v@(InTempFile {})) = endsInNewLine v + completeline (Output b) = endsNewLine b + +endsNewLine :: T.Text -> Bool +endsNewLine t = not (T.null t) && T.last t == '\n' + +-- | Emits the content of the OutputBuffer to the Handle +-- +-- If you use this, you should use `lockOutput` to ensure you're the only +-- thread writing to the console. +emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () +emitOutputBuffer stdh (OutputBuffer l) = + forM_ (reverse l) $ \ba -> case ba of + Output t -> emit t + InTempFile tmp _ -> do + emit =<< T.readFile tmp + void $ tryWhenExists $ removeFile tmp + where + outh = toHandle stdh + emit t = void $ tryIO $ do + T.hPutStr outh t + hFlush outh -- cgit v1.2.3