From 162e1d4e82e24f0fe3e2bd3114e4366ddb1062c0 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 10 Jun 2020 17:07:43 -0400 Subject: merge from concurrent-output This includes e6d4139e15b3a52f4a60178bb7d15ba96f191340 which I hope fixes the reversion that has been plaguing propellor when trying to use more recent versions of concurrent-output. Embedding it will let me test, and also it will be years until that fix is widely enough available to depend on it. --- src/System/Console/Concurrent.hs | 10 +- src/System/Console/Concurrent/Internal.hs | 187 ++++++++++++------------------ src/System/Process/Concurrent.hs | 16 +-- 3 files changed, 77 insertions(+), 136 deletions(-) diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs index 12447637..8ab73c3d 100644 --- a/src/System/Console/Concurrent.hs +++ b/src/System/Console/Concurrent.hs @@ -7,29 +7,25 @@ -- > import Control.Concurrent.Async -- > import System.Console.Concurrent -- > --- > main = withConcurrentOutput $ +-- > main = withConcurrentOutput $ do -- > outputConcurrent "washed the car\n" -- > `concurrently` -- > outputConcurrent "walked the dog\n" -- > `concurrently` -- > createProcessConcurrent (proc "ls" []) -{-# LANGUAGE CPP #-} - module System.Console.Concurrent ( -- * Concurrent output withConcurrentOutput, Outputable(..), outputConcurrent, errorConcurrent, - ConcurrentProcessHandle, -#ifndef mingw32_HOST_OS createProcessConcurrent, -#endif - waitForProcessConcurrent, createProcessForeground, flushConcurrentOutput, lockOutput, + ConcurrentProcessHandle, + waitForProcessConcurrent, -- * Low level access to the output buffer OutputBuffer, StdHandle(..), diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs index ffe6a9e8..de4cffaf 100644 --- a/src/System/Console/Concurrent/Internal.hs +++ b/src/System/Console/Concurrent/Internal.hs @@ -1,5 +1,4 @@ {-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} -{-# LANGUAGE CPP #-} {-# OPTIONS_GHC -O2 #-} {- Building this module with -O0 causes streams not to fuse and too much - memory to be used. -} @@ -15,9 +14,6 @@ module System.Console.Concurrent.Internal where import System.IO -#ifndef mingw32_HOST_OS -import System.Posix.IO -#endif import System.Directory import System.Exit import Control.Monad @@ -32,6 +28,7 @@ import Data.Monoid import qualified System.Process as P import qualified Data.Text as T import qualified Data.Text.IO as T +import qualified Data.Text.Lazy as L import Control.Applicative import Prelude @@ -43,8 +40,6 @@ data OutputHandle = OutputHandle , outputBuffer :: TMVar OutputBuffer , errorBuffer :: TMVar OutputBuffer , outputThreads :: TMVar Integer - , processWaiters :: TMVar [Async ()] - , waitForProcessLock :: TMVar () } data Lock = Locked @@ -57,8 +52,6 @@ globalOutputHandle = unsafePerformIO $ OutputHandle <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO 0 - <*> newTMVarIO [] - <*> newEmptyTMVarIO -- | Holds a lock while performing an action. This allows the action to -- perform its own output to the console, without using functions from this @@ -109,7 +102,8 @@ dropOutputLock :: IO () dropOutputLock = withLock $ void . takeTMVar -- | Use this around any actions that use `outputConcurrent` --- or `createProcessConcurrent` +-- or `createProcessConcurrent`, unless +-- `System.Console.Regions.displayConsoleRegions` is being used. -- -- This is necessary to ensure that buffered concurrent output actually -- gets displayed before the program exits. @@ -140,20 +134,30 @@ class Outputable v where instance Outputable T.Text where toOutput = id +-- | Note that using a lazy Text as an Outputable value +-- will buffer it all in memory. +instance Outputable L.Text where + toOutput = toOutput . L.toStrict + instance Outputable String where toOutput = toOutput . T.pack -- | Displays a value to stdout. -- --- No newline is appended to the value, so if you want a newline, be sure --- to include it yourself. --- -- Uses locking to ensure that the whole output occurs atomically -- even when other threads are concurrently generating output. -- +-- No newline is appended to the value, so if you want a newline, be sure +-- to include it yourself. +-- -- When something else is writing to the console at the same time, this does -- not block. It buffers the value, so it will be displayed once the other -- writer is done. +-- +-- When outputConcurrent is used within a call to +-- `System.Console.Regions.displayConsoleRegions`, the output is displayed +-- above the currently open console regions. Only lines ending in a newline +-- are displayed in this case (it uses `waitCompleteLines`). outputConcurrent :: Outputable v => v -> IO () outputConcurrent = outputConcurrent' StdOut @@ -179,69 +183,13 @@ outputConcurrent' stdh v = bracket setup cleanup go h = toHandle stdh bv = bufferFor stdh -newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle - -toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) -toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) +-- | This alias is provided to avoid breaking backwards compatibility. +type ConcurrentProcessHandle = P.ProcessHandle --- | Use this to wait for processes started with --- `createProcessConcurrent` and `createProcessForeground`, and get their --- exit status. --- --- Note that such processes are actually automatically waited for --- internally, so not calling this explicitly will not result --- in zombie processes. This behavior differs from `P.waitForProcess` +-- | Same as `P.waitForProcess`; provided to avoid breaking backwards +-- compatibility. waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode -waitForProcessConcurrent (ConcurrentProcessHandle h) = - bracket lock unlock checkexit - where - lck = waitForProcessLock globalOutputHandle - lock = atomically $ tryPutTMVar lck () - unlock True = atomically $ takeTMVar lck - unlock False = return () - checkexit locked = maybe (waitsome locked) return - =<< P.getProcessExitCode h - waitsome True = do - let v = processWaiters globalOutputHandle - l <- atomically $ readTMVar v - if null l - -- Avoid waitAny [] which blocks forever - then P.waitForProcess h - else do - -- Wait for any of the running - -- processes to exit. It may or may not - -- be the one corresponding to the - -- ProcessHandle. If it is, - -- getProcessExitCode will succeed. - void $ tryIO $ waitAny l - checkexit True - waitsome False = do - -- Another thread took the lck first. Wait for that thread to - -- wait for one of the running processes to exit. - atomically $ do - putTMVar lck () - takeTMVar lck - checkexit False - --- Registers an action that waits for a process to exit, --- adding it to the processWaiters list, and removing it once the action --- completes. -asyncProcessWaiter :: IO () -> IO () -asyncProcessWaiter waitaction = do - regdone <- newEmptyTMVarIO - waiter <- async $ do - self <- atomically (takeTMVar regdone) - waitaction `finally` unregister self - register waiter regdone - where - v = processWaiters globalOutputHandle - register waiter regdone = atomically $ do - l <- takeTMVar v - putTMVar v (waiter:l) - putTMVar regdone waiter - unregister waiter = atomically $ do - l <- takeTMVar v - putTMVar v (filter (/= waiter) l) +waitForProcessConcurrent = P.waitForProcess -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing @@ -260,9 +208,10 @@ asyncProcessWaiter waitaction = do -- redirected to a buffer. The buffered output will be displayed as soon -- as the output lock becomes free. -- --- Currently only available on Unix systems, not Windows. -#ifndef mingw32_HOST_OS -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +-- Note that the the process is waited for by a background thread, +-- so unlike createProcess, neglecting to call waitForProcess will not +-- result in zombie processess. +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock @@ -271,56 +220,65 @@ createProcessConcurrent p ) | otherwise = do r@(_, _, _, h) <- P.createProcess p - asyncProcessWaiter $ - void $ tryIO $ P.waitForProcess h - return (toConcurrentProcessHandle r) -#endif + _ <- async $ void $ tryIO $ P.waitForProcess h + return r -- | Wrapper around `System.Process.createProcess` that makes sure a process -- is run in the foreground, with direct access to stdout and stderr. -- Useful when eg, running an interactive process. -createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +-- +-- Note that the the process is waited for by a background thread, +-- so unlike createProcess, neglecting to call waitForProcess will not +-- result in zombie processess. +createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessForeground p = do takeOutputLock fgProcess p -fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) fgProcess p = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock registerOutputThread -- Wait for the process to exit and drop the lock. - asyncProcessWaiter $ do + _ <- async $ do void $ tryIO $ P.waitForProcess h unregisterOutputThread dropOutputLock - return (toConcurrentProcessHandle r) + return r -#ifndef mingw32_HOST_OS -bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) bgProcess p = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe let p' = p - { P.std_out = rediroutput (P.std_out p) toouth - , P.std_err = rediroutput (P.std_err p) toerrh + { P.std_out = rediroutput (P.std_out p) + , P.std_err = rediroutput (P.std_err p) } registerOutputThread - r@(_, _, _, h) <- P.createProcess p' + (stdin_h, stdout_h, stderr_h, h) <- P.createProcess p' `onException` unregisterOutputThread - asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h - outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth - errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh + let r = + ( stdin_h + , mungeret (P.std_out p) stdout_h + , mungeret (P.std_err p) stderr_h + , h + ) + -- Wait for the process for symmetry with fgProcess, + -- which does the same. + _ <- async $ void $ tryIO $ P.waitForProcess h + outbuf <- setupOutputBuffer StdOut (mungebuf (P.std_out p) stdout_h) + errbuf <- setupOutputBuffer StdErr (mungebuf (P.std_err p) stderr_h) void $ async $ bufferWriter [outbuf, errbuf] - return (toConcurrentProcessHandle r) + return r where - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - rediroutput ss h - | willOutput ss = P.UseHandle h + rediroutput ss + | willOutput ss = P.CreatePipe | otherwise = ss -#endif + mungebuf ss mh + | willOutput ss = mh + | otherwise = Nothing + mungeret ss mh + | willOutput ss = Nothing + | otherwise = mh willOutput :: P.StdStream -> Bool willOutput P.Inherit = True @@ -353,32 +311,31 @@ data AtEnd = AtEnd data BufSig = BufSig -setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) -setupOutputBuffer h toh ss fromh = do - hClose toh +setupOutputBuffer :: StdHandle -> Maybe Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) +setupOutputBuffer h fromh = do buf <- newMVar (OutputBuffer []) bufsig <- atomically newEmptyTMVar bufend <- atomically newEmptyTMVar - void $ async $ outputDrainer ss fromh buf bufsig bufend + void $ async $ outputDrainer fromh buf bufsig bufend return (h, buf, bufsig, bufend) -- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () -outputDrainer ss fromh buf bufsig bufend - | willOutput ss = go - | otherwise = atend +outputDrainer :: Maybe Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () +outputDrainer mfromh buf bufsig bufend = case mfromh of + Nothing -> atend + Just fromh -> go fromh where - go = do + go fromh = do t <- T.hGetChunk fromh if T.null t - then atend + then do + atend + hClose fromh else do modifyMVar_ buf $ addOutputBuffer (Output t) changed - go - atend = do - atomically $ putTMVar bufend AtEnd - hClose fromh + go fromh + atend = atomically $ putTMVar bufend AtEnd changed = atomically $ do void $ tryTakeTMVar bufsig putTMVar bufsig BufSig diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs index 0e00e4fd..346ce2e0 100644 --- a/src/System/Process/Concurrent.hs +++ b/src/System/Process/Concurrent.hs @@ -9,26 +9,14 @@ module System.Process.Concurrent where import System.Console.Concurrent -import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) import System.Process hiding (createProcess, waitForProcess) import System.IO import System.Exit -- | Calls `createProcessConcurrent` --- --- You should use the waitForProcess in this module on the resulting --- ProcessHandle. Using System.Process.waitForProcess instead can have --- mildly unexpected results. createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -createProcess p = do - (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p - return (i, o, e, h) +createProcess = createProcessConcurrent -- | Calls `waitForProcessConcurrent` --- --- You should only use this on a ProcessHandle obtained by calling --- createProcess from this module. Using this with a ProcessHandle --- obtained from System.Process.createProcess etc will have extremely --- unexpected results; it can wait a very long time before returning. waitForProcess :: ProcessHandle -> IO ExitCode -waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle +waitForProcess = waitForProcessConcurrent -- cgit v1.2.3