From 592c65d02bf07d053d2fbe8a568f88d1b28e1a65 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 1 Nov 2015 16:53:25 -0400 Subject: merge from concurrent-output --- src/Propellor/Gpg.hs | 5 +- src/Propellor/Message.hs | 2 +- src/Propellor/PrivData.hs | 5 +- src/Propellor/Property/Chroot.hs | 2 +- src/Propellor/Property/Docker.hs | 2 +- src/System/Console/Concurrent.hs | 39 +++ src/System/Console/Concurrent/Internal.hs | 522 +++++++++++++++++++++++++++++ src/System/Process/Concurrent.hs | 34 ++ src/Utility/ConcurrentOutput.hs | 526 ------------------------------ src/Utility/Process/Shim.hs | 10 +- 10 files changed, 605 insertions(+), 542 deletions(-) create mode 100644 src/System/Console/Concurrent.hs create mode 100644 src/System/Console/Concurrent/Internal.hs create mode 100644 src/System/Process/Concurrent.hs delete mode 100644 src/Utility/ConcurrentOutput.hs (limited to 'src') diff --git a/src/Propellor/Gpg.hs b/src/Propellor/Gpg.hs index 9c58a5d1..960c70d3 100644 --- a/src/Propellor/Gpg.hs +++ b/src/Propellor/Gpg.hs @@ -7,6 +7,8 @@ import System.Directory import Data.Maybe import Data.List.Utils import Control.Monad +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) import Propellor.PrivData.Paths import Propellor.Message @@ -16,7 +18,6 @@ import Utility.Monad import Utility.Misc import Utility.Tmp import Utility.FileSystemEncoding -import Utility.ConcurrentOutput type KeyId = String @@ -129,7 +130,7 @@ gitCommit msg ps = do ps'' <- gpgSignParams ps' if isNothing msg then do - (_, _, _, p) <- createProcessForeground $ + (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc "git" (toCommand ps'') checkSuccessProcess p else boolSystem "git" ps'' diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 7df5104a..e964c664 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -25,9 +25,9 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import System.Console.Concurrent import Propellor.Types -import Utility.ConcurrentOutput import Utility.PartialPrelude import Utility.Monad import Utility.Exception diff --git a/src/Propellor/PrivData.hs b/src/Propellor/PrivData.hs index 6b77f782..a1e34abc 100644 --- a/src/Propellor/PrivData.hs +++ b/src/Propellor/PrivData.hs @@ -36,6 +36,8 @@ import "mtl" Control.Monad.Reader import qualified Data.Map as M import qualified Data.Set as S import qualified Data.ByteString.Lazy as L +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) import Propellor.Types import Propellor.Types.PrivData @@ -54,7 +56,6 @@ import Utility.FileMode import Utility.Env import Utility.Table import Utility.FileSystemEncoding -import Utility.ConcurrentOutput import Utility.Process -- | Allows a Property to access the value of a specific PrivDataField, @@ -194,7 +195,7 @@ editPrivData field context = do hClose th maybe noop (\p -> writeFileProtected' f (`L.hPut` privDataByteString p)) v editor <- getEnvDefault "EDITOR" "vi" - (_, _, _, p) <- createProcessForeground $ proc editor [f] + (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc editor [f] unlessM (checkSuccessProcess p) $ error "Editor failed; aborting." PrivData <$> readFile f diff --git a/src/Propellor/Property/Chroot.hs b/src/Propellor/Property/Chroot.hs index 0c00e8f4..8d1a2388 100644 --- a/src/Propellor/Property/Chroot.hs +++ b/src/Propellor/Property/Chroot.hs @@ -27,11 +27,11 @@ import qualified Propellor.Property.Systemd.Core as Systemd import qualified Propellor.Property.File as File import qualified Propellor.Shim as Shim import Propellor.Property.Mount -import Utility.ConcurrentOutput import qualified Data.Map as M import Data.List.Utils import System.Posix.Directory +import System.Console.Concurrent -- | Specification of a chroot. Normally you'll use `debootstrapped` or -- `bootstrapped` to construct a Chroot value. diff --git a/src/Propellor/Property/Docker.hs b/src/Propellor/Property/Docker.hs index f2dbaaf5..0cc8212b 100644 --- a/src/Propellor/Property/Docker.hs +++ b/src/Propellor/Property/Docker.hs @@ -56,7 +56,6 @@ import qualified Propellor.Property.Cmd as Cmd import qualified Propellor.Shim as Shim import Utility.Path import Utility.ThreadScheduler -import Utility.ConcurrentOutput import Control.Concurrent.Async hiding (link) import System.Posix.Directory @@ -65,6 +64,7 @@ import Prelude hiding (init) import Data.List hiding (init) import Data.List.Utils import qualified Data.Map as M +import System.Console.Concurrent installed :: Property NoInfo installed = Apt.installed ["docker.io"] diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs new file mode 100644 index 00000000..efbfaa15 --- /dev/null +++ b/src/System/Console/Concurrent.hs @@ -0,0 +1,39 @@ +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling. +-- +-- > import Control.Concurrent.Async +-- > import System.Console.Concurrent +-- > +-- > main = withConcurrentOutput $ +-- > outputConcurrent "washed the car\n" +-- > `concurrently` +-- > outputConcurrent "walked the dog\n" +-- > `concurrently` +-- > createProcessConcurrent (proc "ls" []) + +module System.Console.Concurrent ( + -- * Concurrent output + withConcurrentOutput, + Outputable(..), + outputConcurrent, + ConcurrentProcessHandle, + createProcessConcurrent, + waitForProcessConcurrent, + createProcessForeground, + flushConcurrentOutput, + lockOutput, + -- * Low level access to the output buffer + OutputBuffer, + StdHandle(..), + bufferOutputSTM, + outputBufferWaiterSTM, + waitAnyBuffer, + waitCompleteLines, + emitOutputBuffer, +) where + +import System.Console.Concurrent.Internal + diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs new file mode 100644 index 00000000..caef9833 --- /dev/null +++ b/src/System/Console/Concurrent/Internal.hs @@ -0,0 +1,522 @@ +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} + +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling, internals. +-- +-- May change at any time. + +module System.Console.Concurrent.Internal where + +import System.IO +import System.Posix.IO +import System.Directory +import System.Exit +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.Async +import Data.Maybe +import Data.List +import Data.Monoid +import qualified System.Process as P +import qualified Data.Text as T +import qualified Data.Text.IO as T + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: TMVar Lock + , outputBuffer :: TMVar OutputBuffer + , errorBuffer :: TMVar OutputBuffer + , outputThreads :: TMVar Integer + , processWaiters :: TMVar [Async ()] + , waitForProcessLock :: TMVar () + } + +data Lock = Locked + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: OutputHandle +globalOutputHandle = unsafePerformIO $ OutputHandle + <$> newEmptyTMVarIO + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO 0 + <*> newTMVarIO [] + <*> newEmptyTMVarIO + +-- | Holds a lock while performing an action. This allows the action to +-- perform its own output to the console, without using functions from this +-- module. +-- +-- While this is running, other threads that try to lockOutput will block. +-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not +-- block, but the output will be buffered and displayed only once the +-- action is done. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +withLock :: (TMVar Lock -> STM a) -> IO a +withLock a = atomically $ a (outputLock globalOutputHandle) + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + (outbuf, errbuf) <- atomically $ (,) + <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) + <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) + emitOutputBuffer StdOut outbuf + emitOutputBuffer StdErr errbuf + return locked + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = withLock $ void . takeTMVar + +-- | Use this around any actions that use `outputConcurrent` +-- or `createProcessConcurrent` +-- +-- This is necessary to ensure that buffered concurrent output actually +-- gets displayed before the program exits. +withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a +withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput + +-- | Blocks until any processes started by `createProcessConcurrent` have +-- finished, and any buffered output is displayed. +-- +-- `withConcurrentOutput` calls this at the end; you can call it anytime +-- you want to flush output. +flushConcurrentOutput :: IO () +flushConcurrentOutput = do + -- Wait for all outputThreads to finish. + let v = outputThreads globalOutputHandle + atomically $ do + r <- takeTMVar v + if r <= 0 + then putTMVar v r + else retry + -- Take output lock to ensure that nothing else is currently + -- generating output, and flush any buffered output. + lockOutput $ return () + +-- | Values that can be output. +class Outputable v where + toOutput :: v -> T.Text + +instance Outputable T.Text where + toOutput = id + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout. +-- +-- No newline is appended to the value, so if you want a newline, be sure +-- to include it yourself. +-- +-- Uses locking to ensure that the whole output occurs atomically +-- even when other threads are concurrently generating output. +-- +-- When something else is writing to the console at the same time, this does +-- not block. It buffers the value, so it will be displayed once the other +-- writer is done. +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent v = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + T.hPutStr stdout (toOutput v) + hFlush stdout + go False = do + let bv = outputBuffer globalOutputHandle + oldbuf <- atomically $ takeTMVar bv + newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf + atomically $ putTMVar bv newbuf + +newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle + +toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) + +-- | Use this to wait for processes started with +-- `createProcessConcurrent` and `createProcessForeground`, and get their +-- exit status. +-- +-- Note that such processes are actually automatically waited for +-- internally, so not calling this exiplictly will not result +-- in zombie processes. This behavior differs from `P.waitForProcess` +waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode +waitForProcessConcurrent (ConcurrentProcessHandle h) = checkexit + where + checkexit = maybe waitsome return =<< P.getProcessExitCode h + waitsome = maybe checkexit return =<< bracket lock unlock go + lck = waitForProcessLock globalOutputHandle + lock = atomically $ tryPutTMVar lck () + unlock True = atomically $ takeTMVar lck + unlock False = return () + go True = do + let v = processWaiters globalOutputHandle + l <- atomically $ readTMVar v + if null l + -- Avoid waitAny [] which blocks forever; + then Just <$> P.waitForProcess h + else do + -- Wait for any of the running + -- processes to exit. It may or may not + -- be the one corresponding to the + -- ProcessHandle. If it is, + -- getProcessExitCode will succeed. + void $ tryIO $ waitAny l + hFlush stdout + return Nothing + go False = do + -- Another thread took the lck first. Wait for that thread to + -- wait for one of the running processes to exit. + atomically $ do + putTMVar lck () + takeTMVar lck + return Nothing + +-- Registers an action that waits for a process to exit, +-- adding it to the processWaiters list, and removing it once the action +-- completes. +asyncProcessWaiter :: IO () -> IO () +asyncProcessWaiter waitaction = do + regdone <- newEmptyTMVarIO + waiter <- async $ do + self <- atomically (takeTMVar regdone) + waitaction `finally` unregister self + register waiter regdone + where + v = processWaiters globalOutputHandle + register waiter regdone = atomically $ do + l <- takeTMVar v + putTMVar v (waiter:l) + putTMVar regdone waiter + unregister waiter = atomically $ do + l <- takeTMVar v + putTMVar v (filter (/= waiter) l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- If the process does not output to stdout or stderr, it's run +-- by createProcess entirely as usual. Only processes that can generate +-- output are handled specially: +-- +-- A process is allowed to write to stdout and stderr in the usual +-- way, assuming it can successfully take the output lock. +-- +-- When the output lock is held (ie, by another concurrent process, +-- or because `outputConcurrent` is being called at the same time), +-- the process is instead run with its stdout and stderr +-- redirected to a buffer. The buffered output will be displayed as soon +-- as the output lock becomes free. +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessConcurrent p + | willOutput (P.std_out p) || willOutput (P.std_err p) = + ifM tryTakeOutputLock + ( fgProcess p + , bgProcess p + ) + | otherwise = do + r@(_, _, _, h) <- P.createProcess p + asyncProcessWaiter $ do + void $ P.waitForProcess h + return (toConcurrentProcessHandle r) + +-- | Wrapper around `System.Process.createProcess` that makes sure a process +-- is run in the foreground, with direct access to stdout and stderr. +-- Useful when eg, running an interactive process. +createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessForeground p = do + takeOutputLock + fgProcess p + +fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +fgProcess p = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + -- Wait for the process to exit and drop the lock. + asyncProcessWaiter $ do + void $ P.waitForProcess h + dropOutputLock + return (toConcurrentProcessHandle r) + +bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +bgProcess p = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = rediroutput (P.std_out p) toouth + , P.std_err = rediroutput (P.std_err p) toerrh + } + registerOutputThread + r@(_, _, _, h) <- P.createProcess p' + `onException` unregisterOutputThread + asyncProcessWaiter $ void $ P.waitForProcess h + outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth + errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] + return (toConcurrentProcessHandle r) + where + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss + +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + +-- | Buffered output. +data OutputBuffer = OutputBuffer [OutputBufferedActivity] + deriving (Eq) + +data StdHandle = StdOut | StdErr + +toHandle :: StdHandle -> Handle +toHandle StdOut = stdout +toHandle StdErr = stderr + +bufferFor :: StdHandle -> TMVar OutputBuffer +bufferFor StdOut = outputBuffer globalOutputHandle +bufferFor StdErr = errorBuffer globalOutputHandle + +data OutputBufferedActivity + = Output T.Text + | InTempFile + { tempFile :: FilePath + , endsInNewLine :: Bool + } + deriving (Eq) + +data AtEnd = AtEnd + deriving Eq + +data BufSig = BufSig + +setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) +setupOutputBuffer h toh ss fromh = do + hClose toh + buf <- newMVar (OutputBuffer []) + bufsig <- atomically newEmptyTMVar + bufend <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig bufend + return (h, buf, bufsig, bufend) + +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () +outputDrainer ss fromh buf bufsig bufend + | willOutput ss = go + | otherwise = atend + where + go = do + t <- T.hGetChunk fromh + if T.null t + then atend + else do + modifyMVar_ buf $ addOutputBuffer (Output t) + changed + go + atend = do + atomically $ putTMVar bufend AtEnd + hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig BufSig + +registerOutputThread :: IO () +registerOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . succ =<< takeTMVar v + +unregisterOutputThread :: IO () +unregisterOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . pred =<< takeTMVar v + +-- Wait to lock output, and once we can, display everything +-- that's put into the buffers, until the end. +-- +-- If end is reached before lock is taken, instead add the command's +-- buffers to the global outputBuffer and errorBuffer. +bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () +bufferWriter ts = do + activitysig <- atomically newEmptyTMVar + worker1 <- async $ lockOutput $ + ifM (atomically $ tryPutTMVar activitysig ()) + ( void $ mapConcurrently displaybuf ts + , noop -- buffers already moved to global + ) + worker2 <- async $ void $ globalbuf activitysig + void $ async $ do + void $ waitCatch worker1 + void $ waitCatch worker2 + unregisterOutputThread + where + displaybuf v@(outh, buf, bufsig, bufend) = do + change <- atomically $ + (Right <$> takeTMVar bufsig) + `orElse` + (Left <$> takeTMVar bufend) + l <- takeMVar buf + putMVar buf (OutputBuffer []) + emitOutputBuffer outh l + case change of + Right BufSig -> displaybuf v + Left AtEnd -> return () + globalbuf activitysig = do + ok <- atomically $ do + -- signal we're going to handle it + -- (returns false if the displaybuf already did) + ok <- tryPutTMVar activitysig () + -- wait for end of all buffers + when ok $ + mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts + return ok + when ok $ do + -- add all of the command's buffered output to the + -- global output buffer, atomically + bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> + (outh,) <$> takeMVar buf + atomically $ + forM_ bs $ \(outh, b) -> + bufferOutputSTM' outh b + +-- Adds a value to the OutputBuffer. When adding Output to a Handle, +-- it's cheaper to combine it with any already buffered Output to that +-- same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer +addOutputBuffer (Output t) (OutputBuffer buf) + | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + let !endnl = endsNewLine t' + let i = InTempFile + { tempFile = tmp + , endsInNewLine = endnl + } + T.hPutStr h t' + hClose h + return $ OutputBuffer (i : other) + where + !t' = T.concat (mapMaybe getOutput this) <> t + !(this, other) = partition isOutput buf + isOutput v = case v of + Output _ -> True + _ -> False + getOutput v = case v of + Output t'' -> Just t'' + _ -> Nothing +addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) + +-- | Adds a value to the output buffer for later display. +-- +-- Note that buffering large quantities of data this way will keep it +-- resident in memory until it can be displayed. While `outputConcurrent` +-- uses temp files if the buffer gets too big, this STM function cannot do +-- so. +bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () +bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) + +bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () +bufferOutputSTM' h (OutputBuffer newbuf) = do + (OutputBuffer buf) <- takeTMVar bv + putTMVar bv (OutputBuffer (newbuf ++ buf)) + where + bv = bufferFor h + +-- | A STM action that waits for some buffered output to become +-- available, and returns it. +-- +-- The function can select a subset of output when only some is desired; +-- the fst part is returned and the snd is left in the buffer. +-- +-- This will prevent it from being displayed in the usual way, so you'll +-- need to use `emitOutputBuffer` to display it yourself. +outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)] +outputBufferWaiterSTM selector = do + bs <- forM hs $ \h -> do + let bv = bufferFor h + (selected, rest) <- selector <$> takeTMVar bv + putTMVar bv rest + return selected + if all (== OutputBuffer []) bs + then retry + else do + return (zip hs bs) + where + hs = [StdOut, StdErr] + +waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitAnyBuffer b = (b, OutputBuffer []) + +-- | Use with `outputBufferWaiterSTM` to make it only return buffered +-- output that ends with a newline. Anything buffered without a newline +-- is left in the buffer. +waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitCompleteLines (OutputBuffer l) = + let (selected, rest) = span completeline l + in (OutputBuffer selected, OutputBuffer rest) + where + completeline (v@(InTempFile {})) = endsInNewLine v + completeline (Output b) = endsNewLine b + +endsNewLine :: T.Text -> Bool +endsNewLine t = not (T.null t) && T.last t == '\n' + +-- | Emits the content of the OutputBuffer to the Handle +-- +-- If you use this, you should use `lockOutput` to ensure you're the only +-- thread writing to the console. +emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () +emitOutputBuffer stdh (OutputBuffer l) = + forM_ (reverse l) $ \ba -> case ba of + Output t -> emit t + InTempFile tmp _ -> do + emit =<< T.readFile tmp + void $ tryWhenExists $ removeFile tmp + where + outh = toHandle stdh + emit t = void $ tryIO $ do + T.hPutStr outh t + hFlush outh diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs new file mode 100644 index 00000000..0e00e4fd --- /dev/null +++ b/src/System/Process/Concurrent.hs @@ -0,0 +1,34 @@ +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- The functions exported by this module are intended to be drop-in +-- replacements for those from System.Process, when converting a whole +-- program to use System.Console.Concurrent. + +module System.Process.Concurrent where + +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) +import System.Process hiding (createProcess, waitForProcess) +import System.IO +import System.Exit + +-- | Calls `createProcessConcurrent` +-- +-- You should use the waitForProcess in this module on the resulting +-- ProcessHandle. Using System.Process.waitForProcess instead can have +-- mildly unexpected results. +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess p = do + (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p + return (i, o, e, h) + +-- | Calls `waitForProcessConcurrent` +-- +-- You should only use this on a ProcessHandle obtained by calling +-- createProcess from this module. Using this with a ProcessHandle +-- obtained from System.Process.createProcess etc will have extremely +-- unexpected results; it can wait a very long time before returning. +waitForProcess :: ProcessHandle -> IO ExitCode +waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs deleted file mode 100644 index ca1ae7c5..00000000 --- a/src/Utility/ConcurrentOutput.hs +++ /dev/null @@ -1,526 +0,0 @@ -{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} - --- | --- Copyright: 2013 Joey Hess --- License: BSD-2-clause --- --- Concurrent output handling. --- --- > import Control.Concurrent.Async --- > import System.Console.Concurrent --- > --- > main = withConcurrentOutput $ --- > outputConcurrent "washed the car\n" --- > `concurrently` --- > outputConcurrent "walked the dog\n" --- > `concurrently` --- > createProcessConcurrent (proc "ls" []) - -module Utility.ConcurrentOutput ( - -- * Concurrent output - withConcurrentOutput, - Outputable(..), - outputConcurrent, - createProcessConcurrent, - waitForProcessConcurrent, - createProcessForeground, - flushConcurrentOutput, - lockOutput, - -- * Low level access to the output buffer - OutputBuffer, - StdHandle(..), - bufferOutputSTM, - outputBufferWaiterSTM, - waitAnyBuffer, - waitCompleteLines, - emitOutputBuffer, -) where - -import System.IO -import System.Posix.IO -import System.Directory -import System.Exit -import Control.Monad -import Control.Monad.IO.Class (liftIO, MonadIO) -import Control.Applicative -import System.IO.Unsafe (unsafePerformIO) -import Control.Concurrent -import Control.Concurrent.STM -import Control.Concurrent.Async -import Data.Maybe -import Data.List -import Data.Monoid -import qualified System.Process as P -import qualified Data.Text as T -import qualified Data.Text.IO as T - -import Utility.Monad -import Utility.Exception - -data OutputHandle = OutputHandle - { outputLock :: TMVar Lock - , outputBuffer :: TMVar OutputBuffer - , errorBuffer :: TMVar OutputBuffer - , outputThreads :: TMVar Integer - } - -data Lock = Locked - --- | A shared global variable for the OutputHandle. -{-# NOINLINE globalOutputHandle #-} -globalOutputHandle :: OutputHandle -globalOutputHandle = unsafePerformIO $ OutputHandle - <$> newEmptyTMVarIO - <*> newTMVarIO (OutputBuffer []) - <*> newTMVarIO (OutputBuffer []) - <*> newTMVarIO 0 - --- | Holds a lock while performing an action. This allows the action to --- perform its own output to the console, without using functions from this --- module. --- --- While this is running, other threads that try to lockOutput will block. --- Any calls to `outputConcurrent` and `createProcessConcurrent` will not --- block, but the output will be buffered and displayed only once the --- action is done. -lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) - --- | Blocks until we have the output lock. -takeOutputLock :: IO () -takeOutputLock = void $ takeOutputLock' True - --- | Tries to take the output lock, without blocking. -tryTakeOutputLock :: IO Bool -tryTakeOutputLock = takeOutputLock' False - -withLock :: (TMVar Lock -> STM a) -> IO a -withLock a = atomically $ a (outputLock globalOutputHandle) - -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do - locked <- withLock $ \l -> do - v <- tryTakeTMVar l - case v of - Just Locked - | block -> retry - | otherwise -> do - -- Restore value we took. - putTMVar l Locked - return False - Nothing -> do - putTMVar l Locked - return True - when locked $ do - (outbuf, errbuf) <- atomically $ (,) - <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) - <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) - emitOutputBuffer StdOut outbuf - emitOutputBuffer StdErr errbuf - return locked - --- | Only safe to call after taking the output lock. -dropOutputLock :: IO () -dropOutputLock = withLock $ void . takeTMVar - --- | Use this around any actions that use `outputConcurrent` --- or `createProcessConcurrent` --- --- This is necessary to ensure that buffered concurrent output actually --- gets displayed before the program exits. -withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a -withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput - --- | Blocks until any processes started by `createProcessConcurrent` have --- finished, and any buffered output is displayed. --- --- `withConcurrentOutput` calls this at the end; you can call it anytime --- you want to flush output. -flushConcurrentOutput :: IO () -flushConcurrentOutput = do - -- Wait for all outputThreads to finish. - let v = outputThreads globalOutputHandle - atomically $ do - r <- takeTMVar v - if r <= 0 - then putTMVar v r - else retry - -- Take output lock to ensure that nothing else is currently - -- generating output, and flush any buffered output. - lockOutput $ return () - --- | Values that can be output. -class Outputable v where - toOutput :: v -> T.Text - -instance Outputable T.Text where - toOutput = id - -instance Outputable String where - toOutput = toOutput . T.pack - --- | Displays a value to stdout. --- --- No newline is appended to the value, so if you want a newline, be sure --- to include it yourself. --- --- Uses locking to ensure that the whole output occurs atomically --- even when other threads are concurrently generating output. --- --- When something else is writing to the console at the same time, this does --- not block. It buffers the value, so it will be displayed once the other --- writer is done. -outputConcurrent :: Outputable v => v -> IO () -outputConcurrent v = bracket setup cleanup go - where - setup = tryTakeOutputLock - cleanup False = return () - cleanup True = dropOutputLock - go True = do - T.hPutStr stdout (toOutput v) - hFlush stdout - go False = do - let bv = outputBuffer globalOutputHandle - oldbuf <- atomically $ takeTMVar bv - newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf - atomically $ putTMVar bv newbuf - --- | This must be used to wait for processes started with --- `createProcessConcurrent` and `createProcessForeground`. It may also be --- used to wait for processes started by `System.Process.createProcess`. --- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h - --- | Wrapper around `System.Process.createProcess` that prevents --- multiple processes that are running concurrently from writing --- to stdout/stderr at the same time. --- --- If the process does not output to stdout or stderr, it's run --- by createProcess entirely as usual. Only processes that can generate --- output are handled specially: --- --- A process is allowed to write to stdout and stderr in the usual --- way, assuming it can successfully take the output lock. --- --- When the output lock is held (ie, by another concurrent process, --- or because `outputConcurrent` is being called at the same time), --- the process is instead run with its stdout and stderr --- redirected to a buffer. The buffered output will be displayed as soon --- as the output lock becomes free, or after the command has finished. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessConcurrent p - | willOutput (P.std_out p) || willOutput (P.std_err p) = - ifM tryTakeOutputLock - ( fgProcess p - , bgProcess p - ) - | otherwise = P.createProcess p - --- | Wrapper around `System.Process.createProcess` that makes sure a process --- is run in the foreground, with direct access to stdout and stderr. --- Useful when eg, running an interactive process. --- --- If another process is already running in the foreground, this will block --- until it finishes. Background processes may continue to run while --- this process is running, and their output will be buffered until it --- exits. --- --- The obvious reason you might need to use this is in an example like this: --- --- > main = withConcurrentOutput $ --- > createProcessConcurrent (proc "ls" []) --- > `concurrently` createProcessForeground (proc "vim" []) --- --- Since vim is an interactive program, it needs to run in the foreground. --- If it were started by `createProcessConcurrent`, it would sometimes --- run in the background. --- --- Also, there is actually a race condition when calling --- `createProcessConcurrent` sequentially like this: --- --- > main = withConcurrentOutput $ do --- > (Nothing, Nothing, Nothing, h) <- createProcessConcurrent (proc "ls" []) --- > waitForProcessConcurrent h --- > createProcessConcurrent (proc "vim" []) --- --- Here vim runs about 50% of the time as a background process! Why is --- it not always run in the foreground? The reason is that the previous --- process was run in the foreground, and still holds the output lock. --- `waitForProcessConcurrent` waits for that process, but does not clear --- the output lock immediately. By the time the output lock does clear, --- the vim process may have already started up, in the background. --- --- It would be nice to fix that race, but it can't be fixed without --- an Eq instance for `ProcessHandle`. In any case, when you're using --- this module, you're typically actually doing concurrent things, --- not sequential as in the example above, and so even if the race were --- fixed, you'd still want to use `createProcessForeground` to run vim. -createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessForeground p = do - takeOutputLock - fgProcess p - -fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -fgProcess p = do - r@(_, _, _, h) <- P.createProcess p - `onException` dropOutputLock - -- Wait for the process to exit and drop the lock. - void $ async $ do - void $ tryIO $ waitForProcessConcurrent h - dropOutputLock - return r - -bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -bgProcess p = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe - let p' = p - { P.std_out = rediroutput (P.std_out p) toouth - , P.std_err = rediroutput (P.std_err p) toerrh - } - registerOutputThread - r <- P.createProcess p' - `onException` unregisterOutputThread - outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth - errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh - void $ async $ bufferWriter [outbuf, errbuf] - return r - where - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - rediroutput ss h - | willOutput ss = P.UseHandle h - | otherwise = ss - -willOutput :: P.StdStream -> Bool -willOutput P.Inherit = True -willOutput _ = False - --- | Buffered output. -data OutputBuffer = OutputBuffer [OutputBufferedActivity] - deriving (Eq) - -data StdHandle = StdOut | StdErr - -toHandle :: StdHandle -> Handle -toHandle StdOut = stdout -toHandle StdErr = stderr - -bufferFor :: StdHandle -> TMVar OutputBuffer -bufferFor StdOut = outputBuffer globalOutputHandle -bufferFor StdErr = errorBuffer globalOutputHandle - -data OutputBufferedActivity - = Output T.Text - | InTempFile - { tempFile :: FilePath - , endsInNewLine :: Bool - } - deriving (Eq) - -data AtEnd = AtEnd - deriving Eq - -data BufSig = BufSig - -setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) -setupOutputBuffer h toh ss fromh = do - hClose toh - buf <- newMVar (OutputBuffer []) - bufsig <- atomically newEmptyTMVar - bufend <- atomically newEmptyTMVar - void $ async $ outputDrainer ss fromh buf bufsig bufend - return (h, buf, bufsig, bufend) - --- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () -outputDrainer ss fromh buf bufsig bufend - | willOutput ss = go - | otherwise = atend - where - go = do - t <- T.hGetChunk fromh - if T.null t - then atend - else do - modifyMVar_ buf $ addOutputBuffer (Output t) - changed - go - atend = do - atomically $ putTMVar bufend AtEnd - hClose fromh - changed = atomically $ do - void $ tryTakeTMVar bufsig - putTMVar bufsig BufSig - -registerOutputThread :: IO () -registerOutputThread = do - let v = outputThreads globalOutputHandle - atomically $ putTMVar v . succ =<< takeTMVar v - -unregisterOutputThread :: IO () -unregisterOutputThread = do - let v = outputThreads globalOutputHandle - atomically $ putTMVar v . pred =<< takeTMVar v - --- Wait to lock output, and once we can, display everything --- that's put into the buffers, until the end. --- --- If end is reached before lock is taken, instead add the command's --- buffers to the global outputBuffer and errorBuffer. -bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () -bufferWriter ts = do - activitysig <- atomically newEmptyTMVar - worker1 <- async $ lockOutput $ - ifM (atomically $ tryPutTMVar activitysig ()) - ( void $ mapConcurrently displaybuf ts - , noop -- buffers already moved to global - ) - worker2 <- async $ void $ globalbuf activitysig - void $ async $ do - void $ waitCatch worker1 - void $ waitCatch worker2 - unregisterOutputThread - where - displaybuf v@(outh, buf, bufsig, bufend) = do - change <- atomically $ - (Right <$> takeTMVar bufsig) - `orElse` - (Left <$> takeTMVar bufend) - l <- takeMVar buf - putMVar buf (OutputBuffer []) - emitOutputBuffer outh l - case change of - Right BufSig -> displaybuf v - Left AtEnd -> return () - globalbuf activitysig = do - ok <- atomically $ do - -- signal we're going to handle it - -- (returns false if the displaybuf already did) - ok <- tryPutTMVar activitysig () - -- wait for end of all buffers - when ok $ - mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts - return ok - when ok $ do - -- add all of the command's buffered output to the - -- global output buffer, atomically - bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> - (outh,) <$> takeMVar buf - atomically $ - forM_ bs $ \(outh, b) -> - bufferOutputSTM' outh b - --- Adds a value to the OutputBuffer. When adding Output to a Handle, --- it's cheaper to combine it with any already buffered Output to that --- same Handle. --- --- When the total buffered Output exceeds 1 mb in size, it's moved out of --- memory, to a temp file. This should only happen rarely, but is done to --- avoid some verbose process unexpectedly causing excessive memory use. -addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer -addOutputBuffer (Output t) (OutputBuffer buf) - | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) - | otherwise = do - tmpdir <- getTemporaryDirectory - (tmp, h) <- openTempFile tmpdir "output.tmp" - let !endnl = endsNewLine t' - let i = InTempFile - { tempFile = tmp - , endsInNewLine = endnl - } - T.hPutStr h t' - hClose h - return $ OutputBuffer (i : other) - where - !t' = T.concat (mapMaybe getOutput this) <> t - !(this, other) = partition isOutput buf - isOutput v = case v of - Output _ -> True - _ -> False - getOutput v = case v of - Output t'' -> Just t'' - _ -> Nothing -addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) - --- | Adds a value to the output buffer for later display. --- --- Note that buffering large quantities of data this way will keep it --- resident in memory until it can be displayed. While `outputConcurrent` --- uses temp files if the buffer gets too big, this STM function cannot do --- so. -bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () -bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) - -bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () -bufferOutputSTM' h (OutputBuffer newbuf) = do - (OutputBuffer buf) <- takeTMVar bv - putTMVar bv (OutputBuffer (newbuf ++ buf)) - where - bv = bufferFor h - --- | A STM action that waits for some buffered output to become --- available, and returns it. --- --- The function can select a subset of output when only some is desired; --- the fst part is returned and the snd is left in the buffer. --- --- This will prevent it from being displayed in the usual way, so you'll --- need to use `emitOutputBuffer` to display it yourself. -outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)] -outputBufferWaiterSTM selector = do - bs <- forM hs $ \h -> do - let bv = bufferFor h - (selected, rest) <- selector <$> takeTMVar bv - putTMVar bv rest - return selected - if all (== OutputBuffer []) bs - then retry - else do - return (zip hs bs) - where - hs = [StdOut, StdErr] - -waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) -waitAnyBuffer b = (b, OutputBuffer []) - --- | Use with `outputBufferWaiterSTM` to make it only return buffered --- output that ends with a newline. Anything buffered without a newline --- is left in the buffer. -waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) -waitCompleteLines (OutputBuffer l) = - let (selected, rest) = span completeline l - in (OutputBuffer selected, OutputBuffer rest) - where - completeline (v@(InTempFile {})) = endsInNewLine v - completeline (Output b) = endsNewLine b - -endsNewLine :: T.Text -> Bool -endsNewLine t = not (T.null t) && T.last t == '\n' - --- | Emits the content of the OutputBuffer to the Handle --- --- If you use this, you should use `lockOutput` to ensure you're the only --- thread writing to the console. -emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () -emitOutputBuffer stdh (OutputBuffer l) = - forM_ (reverse l) $ \ba -> case ba of - Output t -> emit t - InTempFile tmp _ -> do - emit =<< T.readFile tmp - void $ tryWhenExists $ removeFile tmp - where - outh = toHandle stdh - emit t = void $ tryIO $ do - T.hPutStr outh t - hFlush outh diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 08694d5d..8c9d41d0 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,12 +1,4 @@ module Utility.Process.Shim (module X, createProcess, waitForProcess) where import System.Process as X hiding (createProcess, waitForProcess) -import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent) -import System.IO -import System.Exit - -createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -createProcess = createProcessConcurrent - -waitForProcess :: ProcessHandle -> IO ExitCode -waitForProcess = waitForProcessConcurrent +import System.Process.Concurrent -- cgit v1.2.3