From a077594770132b4a07b168936d07385ff0c618d6 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 10 Jun 2020 15:08:44 -0400 Subject: Revert "Added dependency on concurrent-output; removed embedded copy." This reverts commit dbd3ba3400a3097498252097540ffe8075b00833. Still has the same problem as in 2018! --- src/System/Console/Concurrent.hs | 44 +++ src/System/Console/Concurrent/Internal.hs | 546 ++++++++++++++++++++++++++++++ src/System/Process/Concurrent.hs | 34 ++ 3 files changed, 624 insertions(+) create mode 100644 src/System/Console/Concurrent.hs create mode 100644 src/System/Console/Concurrent/Internal.hs create mode 100644 src/System/Process/Concurrent.hs (limited to 'src/System') diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs new file mode 100644 index 00000000..12447637 --- /dev/null +++ b/src/System/Console/Concurrent.hs @@ -0,0 +1,44 @@ +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling. +-- +-- > import Control.Concurrent.Async +-- > import System.Console.Concurrent +-- > +-- > main = withConcurrentOutput $ +-- > outputConcurrent "washed the car\n" +-- > `concurrently` +-- > outputConcurrent "walked the dog\n" +-- > `concurrently` +-- > createProcessConcurrent (proc "ls" []) + +{-# LANGUAGE CPP #-} + +module System.Console.Concurrent ( + -- * Concurrent output + withConcurrentOutput, + Outputable(..), + outputConcurrent, + errorConcurrent, + ConcurrentProcessHandle, +#ifndef mingw32_HOST_OS + createProcessConcurrent, +#endif + waitForProcessConcurrent, + createProcessForeground, + flushConcurrentOutput, + lockOutput, + -- * Low level access to the output buffer + OutputBuffer, + StdHandle(..), + bufferOutputSTM, + outputBufferWaiterSTM, + waitAnyBuffer, + waitCompleteLines, + emitOutputBuffer, +) where + +import System.Console.Concurrent.Internal + diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs new file mode 100644 index 00000000..ffe6a9e8 --- /dev/null +++ b/src/System/Console/Concurrent/Internal.hs @@ -0,0 +1,546 @@ +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS_GHC -O2 #-} +{- Building this module with -O0 causes streams not to fuse and too much + - memory to be used. -} + +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling, internals. +-- +-- May change at any time. + +module System.Console.Concurrent.Internal where + +import System.IO +#ifndef mingw32_HOST_OS +import System.Posix.IO +#endif +import System.Directory +import System.Exit +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.Async +import Data.Maybe +import Data.List +import Data.Monoid +import qualified System.Process as P +import qualified Data.Text as T +import qualified Data.Text.IO as T +import Control.Applicative +import Prelude + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: TMVar Lock + , outputBuffer :: TMVar OutputBuffer + , errorBuffer :: TMVar OutputBuffer + , outputThreads :: TMVar Integer + , processWaiters :: TMVar [Async ()] + , waitForProcessLock :: TMVar () + } + +data Lock = Locked + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: OutputHandle +globalOutputHandle = unsafePerformIO $ OutputHandle + <$> newEmptyTMVarIO + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO (OutputBuffer []) + <*> newTMVarIO 0 + <*> newTMVarIO [] + <*> newEmptyTMVarIO + +-- | Holds a lock while performing an action. This allows the action to +-- perform its own output to the console, without using functions from this +-- module. +-- +-- While this is running, other threads that try to lockOutput will block. +-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not +-- block, but the output will be buffered and displayed only once the +-- action is done. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +withLock :: (TMVar Lock -> STM a) -> IO a +withLock a = atomically $ a (outputLock globalOutputHandle) + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + (outbuf, errbuf) <- atomically $ (,) + <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) + <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) + emitOutputBuffer StdOut outbuf + emitOutputBuffer StdErr errbuf + return locked + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = withLock $ void . takeTMVar + +-- | Use this around any actions that use `outputConcurrent` +-- or `createProcessConcurrent` +-- +-- This is necessary to ensure that buffered concurrent output actually +-- gets displayed before the program exits. +withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a +withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput + +-- | Blocks until any processes started by `createProcessConcurrent` have +-- finished, and any buffered output is displayed. Also blocks while +-- `lockOutput` is is use. +-- +-- `withConcurrentOutput` calls this at the end, so you do not normally +-- need to use this. +flushConcurrentOutput :: IO () +flushConcurrentOutput = do + atomically $ do + r <- takeTMVar (outputThreads globalOutputHandle) + if r <= 0 + then putTMVar (outputThreads globalOutputHandle) r + else retry + -- Take output lock to wait for anything else that might be + -- currently generating output. + lockOutput $ return () + +-- | Values that can be output. +class Outputable v where + toOutput :: v -> T.Text + +instance Outputable T.Text where + toOutput = id + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout. +-- +-- No newline is appended to the value, so if you want a newline, be sure +-- to include it yourself. +-- +-- Uses locking to ensure that the whole output occurs atomically +-- even when other threads are concurrently generating output. +-- +-- When something else is writing to the console at the same time, this does +-- not block. It buffers the value, so it will be displayed once the other +-- writer is done. +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent = outputConcurrent' StdOut + +-- | Like `outputConcurrent`, but displays to stderr. +-- +-- (Does not throw an exception.) +errorConcurrent :: Outputable v => v -> IO () +errorConcurrent = outputConcurrent' StdErr + +outputConcurrent' :: Outputable v => StdHandle -> v -> IO () +outputConcurrent' stdh v = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + T.hPutStr h (toOutput v) + hFlush h + go False = do + oldbuf <- atomically $ takeTMVar bv + newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf + atomically $ putTMVar bv newbuf + h = toHandle stdh + bv = bufferFor stdh + +newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle + +toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) + +-- | Use this to wait for processes started with +-- `createProcessConcurrent` and `createProcessForeground`, and get their +-- exit status. +-- +-- Note that such processes are actually automatically waited for +-- internally, so not calling this explicitly will not result +-- in zombie processes. This behavior differs from `P.waitForProcess` +waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode +waitForProcessConcurrent (ConcurrentProcessHandle h) = + bracket lock unlock checkexit + where + lck = waitForProcessLock globalOutputHandle + lock = atomically $ tryPutTMVar lck () + unlock True = atomically $ takeTMVar lck + unlock False = return () + checkexit locked = maybe (waitsome locked) return + =<< P.getProcessExitCode h + waitsome True = do + let v = processWaiters globalOutputHandle + l <- atomically $ readTMVar v + if null l + -- Avoid waitAny [] which blocks forever + then P.waitForProcess h + else do + -- Wait for any of the running + -- processes to exit. It may or may not + -- be the one corresponding to the + -- ProcessHandle. If it is, + -- getProcessExitCode will succeed. + void $ tryIO $ waitAny l + checkexit True + waitsome False = do + -- Another thread took the lck first. Wait for that thread to + -- wait for one of the running processes to exit. + atomically $ do + putTMVar lck () + takeTMVar lck + checkexit False + +-- Registers an action that waits for a process to exit, +-- adding it to the processWaiters list, and removing it once the action +-- completes. +asyncProcessWaiter :: IO () -> IO () +asyncProcessWaiter waitaction = do + regdone <- newEmptyTMVarIO + waiter <- async $ do + self <- atomically (takeTMVar regdone) + waitaction `finally` unregister self + register waiter regdone + where + v = processWaiters globalOutputHandle + register waiter regdone = atomically $ do + l <- takeTMVar v + putTMVar v (waiter:l) + putTMVar regdone waiter + unregister waiter = atomically $ do + l <- takeTMVar v + putTMVar v (filter (/= waiter) l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- If the process does not output to stdout or stderr, it's run +-- by createProcess entirely as usual. Only processes that can generate +-- output are handled specially: +-- +-- A process is allowed to write to stdout and stderr in the usual +-- way, assuming it can successfully take the output lock. +-- +-- When the output lock is held (ie, by another concurrent process, +-- or because `outputConcurrent` is being called at the same time), +-- the process is instead run with its stdout and stderr +-- redirected to a buffer. The buffered output will be displayed as soon +-- as the output lock becomes free. +-- +-- Currently only available on Unix systems, not Windows. +#ifndef mingw32_HOST_OS +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessConcurrent p + | willOutput (P.std_out p) || willOutput (P.std_err p) = + ifM tryTakeOutputLock + ( fgProcess p + , bgProcess p + ) + | otherwise = do + r@(_, _, _, h) <- P.createProcess p + asyncProcessWaiter $ + void $ tryIO $ P.waitForProcess h + return (toConcurrentProcessHandle r) +#endif + +-- | Wrapper around `System.Process.createProcess` that makes sure a process +-- is run in the foreground, with direct access to stdout and stderr. +-- Useful when eg, running an interactive process. +createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +createProcessForeground p = do + takeOutputLock + fgProcess p + +fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +fgProcess p = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + registerOutputThread + -- Wait for the process to exit and drop the lock. + asyncProcessWaiter $ do + void $ tryIO $ P.waitForProcess h + unregisterOutputThread + dropOutputLock + return (toConcurrentProcessHandle r) + +#ifndef mingw32_HOST_OS +bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +bgProcess p = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = rediroutput (P.std_out p) toouth + , P.std_err = rediroutput (P.std_err p) toerrh + } + registerOutputThread + r@(_, _, _, h) <- P.createProcess p' + `onException` unregisterOutputThread + asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h + outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth + errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] + return (toConcurrentProcessHandle r) + where + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss +#endif + +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + +-- | Buffered output. +data OutputBuffer = OutputBuffer [OutputBufferedActivity] + deriving (Eq) + +data StdHandle = StdOut | StdErr + +toHandle :: StdHandle -> Handle +toHandle StdOut = stdout +toHandle StdErr = stderr + +bufferFor :: StdHandle -> TMVar OutputBuffer +bufferFor StdOut = outputBuffer globalOutputHandle +bufferFor StdErr = errorBuffer globalOutputHandle + +data OutputBufferedActivity + = Output T.Text + | InTempFile + { tempFile :: FilePath + , endsInNewLine :: Bool + } + deriving (Eq) + +data AtEnd = AtEnd + deriving Eq + +data BufSig = BufSig + +setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) +setupOutputBuffer h toh ss fromh = do + hClose toh + buf <- newMVar (OutputBuffer []) + bufsig <- atomically newEmptyTMVar + bufend <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig bufend + return (h, buf, bufsig, bufend) + +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () +outputDrainer ss fromh buf bufsig bufend + | willOutput ss = go + | otherwise = atend + where + go = do + t <- T.hGetChunk fromh + if T.null t + then atend + else do + modifyMVar_ buf $ addOutputBuffer (Output t) + changed + go + atend = do + atomically $ putTMVar bufend AtEnd + hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig BufSig + +registerOutputThread :: IO () +registerOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . succ =<< takeTMVar v + +unregisterOutputThread :: IO () +unregisterOutputThread = do + let v = outputThreads globalOutputHandle + atomically $ putTMVar v . pred =<< takeTMVar v + +-- Wait to lock output, and once we can, display everything +-- that's put into the buffers, until the end. +-- +-- If end is reached before lock is taken, instead add the command's +-- buffers to the global outputBuffer and errorBuffer. +bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () +bufferWriter ts = do + activitysig <- atomically newEmptyTMVar + worker1 <- async $ lockOutput $ + ifM (atomically $ tryPutTMVar activitysig ()) + ( void $ mapConcurrently displaybuf ts + , noop -- buffers already moved to global + ) + worker2 <- async $ void $ globalbuf activitysig worker1 + void $ async $ do + void $ waitCatch worker1 + void $ waitCatch worker2 + unregisterOutputThread + where + displaybuf v@(outh, buf, bufsig, bufend) = do + change <- atomically $ + (Right <$> takeTMVar bufsig) + `orElse` + (Left <$> takeTMVar bufend) + l <- takeMVar buf + putMVar buf (OutputBuffer []) + emitOutputBuffer outh l + case change of + Right BufSig -> displaybuf v + Left AtEnd -> return () + globalbuf activitysig worker1 = do + ok <- atomically $ do + -- signal we're going to handle it + -- (returns false if the displaybuf already did) + ok <- tryPutTMVar activitysig () + -- wait for end of all buffers + when ok $ + mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts + return ok + when ok $ do + -- add all of the command's buffered output to the + -- global output buffer, atomically + bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> + (outh,) <$> takeMVar buf + atomically $ + forM_ bs $ \(outh, b) -> + bufferOutputSTM' outh b + -- worker1 might be blocked waiting for the output + -- lock, and we've already done its job, so cancel it + cancel worker1 + +-- Adds a value to the OutputBuffer. When adding Output to a Handle, +-- it's cheaper to combine it with any already buffered Output to that +-- same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer +addOutputBuffer (Output t) (OutputBuffer buf) + | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + let !endnl = endsNewLine t' + let i = InTempFile + { tempFile = tmp + , endsInNewLine = endnl + } + T.hPutStr h t' + hClose h + return $ OutputBuffer (i : other) + where + !t' = T.concat (mapMaybe getOutput this) <> t + !(this, other) = partition isOutput buf + isOutput v = case v of + Output _ -> True + _ -> False + getOutput v = case v of + Output t'' -> Just t'' + _ -> Nothing +addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) + +-- | Adds a value to the output buffer for later display. +-- +-- Note that buffering large quantities of data this way will keep it +-- resident in memory until it can be displayed. While `outputConcurrent` +-- uses temp files if the buffer gets too big, this STM function cannot do +-- so. +bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () +bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) + +bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () +bufferOutputSTM' h (OutputBuffer newbuf) = do + (OutputBuffer buf) <- takeTMVar bv + putTMVar bv (OutputBuffer (newbuf ++ buf)) + where + bv = bufferFor h + +-- | A STM action that waits for some buffered output to become +-- available, and returns it. +-- +-- The function can select a subset of output when only some is desired; +-- the fst part is returned and the snd is left in the buffer. +-- +-- This will prevent it from being displayed in the usual way, so you'll +-- need to use `emitOutputBuffer` to display it yourself. +outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer) +outputBufferWaiterSTM selector = waitgetbuf StdOut `orElse` waitgetbuf StdErr + where + waitgetbuf h = do + let bv = bufferFor h + (selected, rest) <- selector <$> takeTMVar bv + when (selected == OutputBuffer []) + retry + putTMVar bv rest + return (h, selected) + +waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitAnyBuffer b = (b, OutputBuffer []) + +-- | Use with `outputBufferWaiterSTM` to make it only return buffered +-- output that ends with a newline. Anything buffered without a newline +-- is left in the buffer. +waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) +waitCompleteLines (OutputBuffer l) = + let (selected, rest) = span completeline l + in (OutputBuffer selected, OutputBuffer rest) + where + completeline (v@(InTempFile {})) = endsInNewLine v + completeline (Output b) = endsNewLine b + +endsNewLine :: T.Text -> Bool +endsNewLine t = not (T.null t) && T.last t == '\n' + +-- | Emits the content of the OutputBuffer to the Handle +-- +-- If you use this, you should use `lockOutput` to ensure you're the only +-- thread writing to the console. +emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () +emitOutputBuffer stdh (OutputBuffer l) = + forM_ (reverse l) $ \ba -> case ba of + Output t -> emit t + InTempFile tmp _ -> do + emit =<< T.readFile tmp + void $ tryWhenExists $ removeFile tmp + where + outh = toHandle stdh + emit t = void $ tryIO $ do + T.hPutStr outh t + hFlush outh diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs new file mode 100644 index 00000000..0e00e4fd --- /dev/null +++ b/src/System/Process/Concurrent.hs @@ -0,0 +1,34 @@ +-- | +-- Copyright: 2015 Joey Hess +-- License: BSD-2-clause +-- +-- The functions exported by this module are intended to be drop-in +-- replacements for those from System.Process, when converting a whole +-- program to use System.Console.Concurrent. + +module System.Process.Concurrent where + +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) +import System.Process hiding (createProcess, waitForProcess) +import System.IO +import System.Exit + +-- | Calls `createProcessConcurrent` +-- +-- You should use the waitForProcess in this module on the resulting +-- ProcessHandle. Using System.Process.waitForProcess instead can have +-- mildly unexpected results. +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess p = do + (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p + return (i, o, e, h) + +-- | Calls `waitForProcessConcurrent` +-- +-- You should only use this on a ProcessHandle obtained by calling +-- createProcess from this module. Using this with a ProcessHandle +-- obtained from System.Process.createProcess etc will have extremely +-- unexpected results; it can wait a very long time before returning. +waitForProcess :: ProcessHandle -> IO ExitCode +waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle -- cgit v1.2.3