summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoey Hess2020-08-14 15:55:28 -0400
committerJoey Hess2020-08-14 15:55:28 -0400
commit9c08fa24f2e2bf07413b151656c373d873de7298 (patch)
treeb53b9fe2585ab60536640fd6a4dcbc798c355742
parent5eab7dbb71faf7c5a31daf141a26d8207c919e89 (diff)
Revert "merge from concurrent-output"
This reverts commit 162e1d4e82e24f0fe3e2bd3114e4366ddb1062c0. concurrent-output depends on process-1.6.0. Older versions of process have a bug that it tickles. But, propellor does not depend on this version of process.
-rw-r--r--src/System/Console/Concurrent.hs10
-rw-r--r--src/System/Console/Concurrent/Internal.hs187
-rw-r--r--src/System/Process/Concurrent.hs16
3 files changed, 136 insertions, 77 deletions
diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs
index 8ab73c3d..12447637 100644
--- a/src/System/Console/Concurrent.hs
+++ b/src/System/Console/Concurrent.hs
@@ -7,25 +7,29 @@
-- > import Control.Concurrent.Async
-- > import System.Console.Concurrent
-- >
--- > main = withConcurrentOutput $ do
+-- > main = withConcurrentOutput $
-- > outputConcurrent "washed the car\n"
-- > `concurrently`
-- > outputConcurrent "walked the dog\n"
-- > `concurrently`
-- > createProcessConcurrent (proc "ls" [])
+{-# LANGUAGE CPP #-}
+
module System.Console.Concurrent (
-- * Concurrent output
withConcurrentOutput,
Outputable(..),
outputConcurrent,
errorConcurrent,
+ ConcurrentProcessHandle,
+#ifndef mingw32_HOST_OS
createProcessConcurrent,
+#endif
+ waitForProcessConcurrent,
createProcessForeground,
flushConcurrentOutput,
lockOutput,
- ConcurrentProcessHandle,
- waitForProcessConcurrent,
-- * Low level access to the output buffer
OutputBuffer,
StdHandle(..),
diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs
index de4cffaf..ffe6a9e8 100644
--- a/src/System/Console/Concurrent/Internal.hs
+++ b/src/System/Console/Concurrent/Internal.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
+{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -O2 #-}
{- Building this module with -O0 causes streams not to fuse and too much
- memory to be used. -}
@@ -14,6 +15,9 @@
module System.Console.Concurrent.Internal where
import System.IO
+#ifndef mingw32_HOST_OS
+import System.Posix.IO
+#endif
import System.Directory
import System.Exit
import Control.Monad
@@ -28,7 +32,6 @@ import Data.Monoid
import qualified System.Process as P
import qualified Data.Text as T
import qualified Data.Text.IO as T
-import qualified Data.Text.Lazy as L
import Control.Applicative
import Prelude
@@ -40,6 +43,8 @@ data OutputHandle = OutputHandle
, outputBuffer :: TMVar OutputBuffer
, errorBuffer :: TMVar OutputBuffer
, outputThreads :: TMVar Integer
+ , processWaiters :: TMVar [Async ()]
+ , waitForProcessLock :: TMVar ()
}
data Lock = Locked
@@ -52,6 +57,8 @@ globalOutputHandle = unsafePerformIO $ OutputHandle
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO 0
+ <*> newTMVarIO []
+ <*> newEmptyTMVarIO
-- | Holds a lock while performing an action. This allows the action to
-- perform its own output to the console, without using functions from this
@@ -102,8 +109,7 @@ dropOutputLock :: IO ()
dropOutputLock = withLock $ void . takeTMVar
-- | Use this around any actions that use `outputConcurrent`
--- or `createProcessConcurrent`, unless
--- `System.Console.Regions.displayConsoleRegions` is being used.
+-- or `createProcessConcurrent`
--
-- This is necessary to ensure that buffered concurrent output actually
-- gets displayed before the program exits.
@@ -134,30 +140,20 @@ class Outputable v where
instance Outputable T.Text where
toOutput = id
--- | Note that using a lazy Text as an Outputable value
--- will buffer it all in memory.
-instance Outputable L.Text where
- toOutput = toOutput . L.toStrict
-
instance Outputable String where
toOutput = toOutput . T.pack
-- | Displays a value to stdout.
--
--- Uses locking to ensure that the whole output occurs atomically
--- even when other threads are concurrently generating output.
---
-- No newline is appended to the value, so if you want a newline, be sure
-- to include it yourself.
--
+-- Uses locking to ensure that the whole output occurs atomically
+-- even when other threads are concurrently generating output.
+--
-- When something else is writing to the console at the same time, this does
-- not block. It buffers the value, so it will be displayed once the other
-- writer is done.
---
--- When outputConcurrent is used within a call to
--- `System.Console.Regions.displayConsoleRegions`, the output is displayed
--- above the currently open console regions. Only lines ending in a newline
--- are displayed in this case (it uses `waitCompleteLines`).
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent = outputConcurrent' StdOut
@@ -183,13 +179,69 @@ outputConcurrent' stdh v = bracket setup cleanup go
h = toHandle stdh
bv = bufferFor stdh
--- | This alias is provided to avoid breaking backwards compatibility.
-type ConcurrentProcessHandle = P.ProcessHandle
+newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle
+
+toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h)
--- | Same as `P.waitForProcess`; provided to avoid breaking backwards
--- compatibility.
+-- | Use this to wait for processes started with
+-- `createProcessConcurrent` and `createProcessForeground`, and get their
+-- exit status.
+--
+-- Note that such processes are actually automatically waited for
+-- internally, so not calling this explicitly will not result
+-- in zombie processes. This behavior differs from `P.waitForProcess`
waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
-waitForProcessConcurrent = P.waitForProcess
+waitForProcessConcurrent (ConcurrentProcessHandle h) =
+ bracket lock unlock checkexit
+ where
+ lck = waitForProcessLock globalOutputHandle
+ lock = atomically $ tryPutTMVar lck ()
+ unlock True = atomically $ takeTMVar lck
+ unlock False = return ()
+ checkexit locked = maybe (waitsome locked) return
+ =<< P.getProcessExitCode h
+ waitsome True = do
+ let v = processWaiters globalOutputHandle
+ l <- atomically $ readTMVar v
+ if null l
+ -- Avoid waitAny [] which blocks forever
+ then P.waitForProcess h
+ else do
+ -- Wait for any of the running
+ -- processes to exit. It may or may not
+ -- be the one corresponding to the
+ -- ProcessHandle. If it is,
+ -- getProcessExitCode will succeed.
+ void $ tryIO $ waitAny l
+ checkexit True
+ waitsome False = do
+ -- Another thread took the lck first. Wait for that thread to
+ -- wait for one of the running processes to exit.
+ atomically $ do
+ putTMVar lck ()
+ takeTMVar lck
+ checkexit False
+
+-- Registers an action that waits for a process to exit,
+-- adding it to the processWaiters list, and removing it once the action
+-- completes.
+asyncProcessWaiter :: IO () -> IO ()
+asyncProcessWaiter waitaction = do
+ regdone <- newEmptyTMVarIO
+ waiter <- async $ do
+ self <- atomically (takeTMVar regdone)
+ waitaction `finally` unregister self
+ register waiter regdone
+ where
+ v = processWaiters globalOutputHandle
+ register waiter regdone = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (waiter:l)
+ putTMVar regdone waiter
+ unregister waiter = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (filter (/= waiter) l)
-- | Wrapper around `System.Process.createProcess` that prevents
-- multiple processes that are running concurrently from writing
@@ -208,10 +260,9 @@ waitForProcessConcurrent = P.waitForProcess
-- redirected to a buffer. The buffered output will be displayed as soon
-- as the output lock becomes free.
--
--- Note that the the process is waited for by a background thread,
--- so unlike createProcess, neglecting to call waitForProcess will not
--- result in zombie processess.
-createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+-- Currently only available on Unix systems, not Windows.
+#ifndef mingw32_HOST_OS
+createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessConcurrent p
| willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
@@ -220,65 +271,56 @@ createProcessConcurrent p
)
| otherwise = do
r@(_, _, _, h) <- P.createProcess p
- _ <- async $ void $ tryIO $ P.waitForProcess h
- return r
+ asyncProcessWaiter $
+ void $ tryIO $ P.waitForProcess h
+ return (toConcurrentProcessHandle r)
+#endif
-- | Wrapper around `System.Process.createProcess` that makes sure a process
-- is run in the foreground, with direct access to stdout and stderr.
-- Useful when eg, running an interactive process.
---
--- Note that the the process is waited for by a background thread,
--- so unlike createProcess, neglecting to call waitForProcess will not
--- result in zombie processess.
-createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessForeground p = do
takeOutputLock
fgProcess p
-fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
fgProcess p = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
registerOutputThread
-- Wait for the process to exit and drop the lock.
- _ <- async $ do
+ asyncProcessWaiter $ do
void $ tryIO $ P.waitForProcess h
unregisterOutputThread
dropOutputLock
- return r
+ return (toConcurrentProcessHandle r)
-bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+#ifndef mingw32_HOST_OS
+bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
bgProcess p = do
+ (toouth, fromouth) <- pipe
+ (toerrh, fromerrh) <- pipe
let p' = p
- { P.std_out = rediroutput (P.std_out p)
- , P.std_err = rediroutput (P.std_err p)
+ { P.std_out = rediroutput (P.std_out p) toouth
+ , P.std_err = rediroutput (P.std_err p) toerrh
}
registerOutputThread
- (stdin_h, stdout_h, stderr_h, h) <- P.createProcess p'
+ r@(_, _, _, h) <- P.createProcess p'
`onException` unregisterOutputThread
- let r =
- ( stdin_h
- , mungeret (P.std_out p) stdout_h
- , mungeret (P.std_err p) stderr_h
- , h
- )
- -- Wait for the process for symmetry with fgProcess,
- -- which does the same.
- _ <- async $ void $ tryIO $ P.waitForProcess h
- outbuf <- setupOutputBuffer StdOut (mungebuf (P.std_out p) stdout_h)
- errbuf <- setupOutputBuffer StdErr (mungebuf (P.std_err p) stderr_h)
+ asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h
+ outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
+ errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
void $ async $ bufferWriter [outbuf, errbuf]
- return r
+ return (toConcurrentProcessHandle r)
where
- rediroutput ss
- | willOutput ss = P.CreatePipe
+ pipe = do
+ (from, to) <- createPipe
+ (,) <$> fdToHandle to <*> fdToHandle from
+ rediroutput ss h
+ | willOutput ss = P.UseHandle h
| otherwise = ss
- mungebuf ss mh
- | willOutput ss = mh
- | otherwise = Nothing
- mungeret ss mh
- | willOutput ss = Nothing
- | otherwise = mh
+#endif
willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
@@ -311,31 +353,32 @@ data AtEnd = AtEnd
data BufSig = BufSig
-setupOutputBuffer :: StdHandle -> Maybe Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
-setupOutputBuffer h fromh = do
+setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
+setupOutputBuffer h toh ss fromh = do
+ hClose toh
buf <- newMVar (OutputBuffer [])
bufsig <- atomically newEmptyTMVar
bufend <- atomically newEmptyTMVar
- void $ async $ outputDrainer fromh buf bufsig bufend
+ void $ async $ outputDrainer ss fromh buf bufsig bufend
return (h, buf, bufsig, bufend)
-- Drain output from the handle, and buffer it.
-outputDrainer :: Maybe Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
-outputDrainer mfromh buf bufsig bufend = case mfromh of
- Nothing -> atend
- Just fromh -> go fromh
+outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
+outputDrainer ss fromh buf bufsig bufend
+ | willOutput ss = go
+ | otherwise = atend
where
- go fromh = do
+ go = do
t <- T.hGetChunk fromh
if T.null t
- then do
- atend
- hClose fromh
+ then atend
else do
modifyMVar_ buf $ addOutputBuffer (Output t)
changed
- go fromh
- atend = atomically $ putTMVar bufend AtEnd
+ go
+ atend = do
+ atomically $ putTMVar bufend AtEnd
+ hClose fromh
changed = atomically $ do
void $ tryTakeTMVar bufsig
putTMVar bufsig BufSig
diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs
index 346ce2e0..0e00e4fd 100644
--- a/src/System/Process/Concurrent.hs
+++ b/src/System/Process/Concurrent.hs
@@ -9,14 +9,26 @@
module System.Process.Concurrent where
import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
import System.Process hiding (createProcess, waitForProcess)
import System.IO
import System.Exit
-- | Calls `createProcessConcurrent`
+--
+-- You should use the waitForProcess in this module on the resulting
+-- ProcessHandle. Using System.Process.waitForProcess instead can have
+-- mildly unexpected results.
createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-createProcess = createProcessConcurrent
+createProcess p = do
+ (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p
+ return (i, o, e, h)
-- | Calls `waitForProcessConcurrent`
+--
+-- You should only use this on a ProcessHandle obtained by calling
+-- createProcess from this module. Using this with a ProcessHandle
+-- obtained from System.Process.createProcess etc will have extremely
+-- unexpected results; it can wait a very long time before returning.
waitForProcess :: ProcessHandle -> IO ExitCode
-waitForProcess = waitForProcessConcurrent
+waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle