summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoey Hess2015-11-08 14:50:21 -0400
committerJoey Hess2015-11-08 14:50:21 -0400
commitd7e140aeae8a8ea47976ca1f3e29c4d0b00eacee (patch)
tree31aa4bbf775879dddb307f9d1c99ac84287ca909
parentf85b7d1bdc9019fd63c5037094f514a7c7ace8d2 (diff)
parentd50aa85052b1f35021072ea95bc51b5c46c797b0 (diff)
Merge branch 'joeyconfig'
-rw-r--r--propellor.cabal4
-rw-r--r--src/Propellor/CmdLine.hs5
-rw-r--r--src/Propellor/Gpg.hs22
-rw-r--r--src/Propellor/Message.hs2
-rw-r--r--src/Propellor/PrivData.hs6
-rw-r--r--src/Propellor/Property/Chroot.hs2
-rw-r--r--src/Propellor/Property/Docker.hs2
-rw-r--r--src/Propellor/Spin.hs22
-rw-r--r--src/System/Console/Concurrent.hs44
-rw-r--r--src/System/Console/Concurrent/Internal.hs538
-rw-r--r--src/System/Process/Concurrent.hs34
-rw-r--r--src/Utility/ConcurrentOutput.hs348
-rw-r--r--src/Utility/Process/Shim.hs10
13 files changed, 656 insertions, 383 deletions
diff --git a/propellor.cabal b/propellor.cabal
index 6e871d6b..ccba846f 100644
--- a/propellor.cabal
+++ b/propellor.cabal
@@ -161,7 +161,6 @@ Library
Propellor.Shim
Propellor.Property.Chroot.Util
Utility.Applicative
- Utility.ConcurrentOutput
Utility.Data
Utility.DataUnits
Utility.Directory
@@ -185,6 +184,9 @@ Library
Utility.Tmp
Utility.UserInfo
Utility.QuickCheck
+ System.Console.Concurrent
+ System.Console.Concurrent.Internal
+ System.Process.Concurrent
source-repository head
type: git
diff --git a/src/Propellor/CmdLine.hs b/src/Propellor/CmdLine.hs
index 4bca3986..4a4f71fe 100644
--- a/src/Propellor/CmdLine.hs
+++ b/src/Propellor/CmdLine.hs
@@ -120,8 +120,9 @@ defaultMain hostlist = withConcurrentOutput $ do
go False (Spin hs mrelay) = do
commitSpin
forM_ hs $ \hn -> withhost hn $ spin mrelay hn
- go False cmdline@(SimpleRun hn) = buildFirst cmdline $
- go False (Run hn)
+ go False cmdline@(SimpleRun hn) = do
+ forceConsole
+ buildFirst cmdline $ go False (Run hn)
go False (Run hn) = ifM ((==) 0 <$> getRealUserID)
( onlyprocess $ withhost hn mainProperties
, go True (Spin [hn] Nothing)
diff --git a/src/Propellor/Gpg.hs b/src/Propellor/Gpg.hs
index 60b0d52d..960c70d3 100644
--- a/src/Propellor/Gpg.hs
+++ b/src/Propellor/Gpg.hs
@@ -7,6 +7,8 @@ import System.Directory
import Data.Maybe
import Data.List.Utils
import Control.Monad
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
import Propellor.PrivData.Paths
import Propellor.Message
@@ -111,10 +113,7 @@ gitCommitKeyRing action = do
-- Commit explicitly the keyring and privdata files, as other
-- changes may be staged by the user and shouldn't be committed.
tocommit <- filterM doesFileExist [ privDataFile, keyring]
- gitCommit $ (map File tocommit) ++
- [ Param "-m"
- , Param ("propellor " ++ action)
- ]
+ gitCommit (Just ("propellor " ++ action)) (map File tocommit)
-- Adds --gpg-sign if there's a keyring.
gpgSignParams :: [CommandParam] -> IO [CommandParam]
@@ -124,10 +123,17 @@ gpgSignParams ps = ifM (doesFileExist keyring)
)
-- Automatically sign the commit if there'a a keyring.
-gitCommit :: [CommandParam] -> IO Bool
-gitCommit ps = do
- ps' <- gpgSignParams ps
- boolSystem "git" (Param "commit" : ps')
+gitCommit :: Maybe String -> [CommandParam] -> IO Bool
+gitCommit msg ps = do
+ let ps' = Param "commit" : ps ++
+ maybe [] (\m -> [Param "-m", Param m]) msg
+ ps'' <- gpgSignParams ps'
+ if isNothing msg
+ then do
+ (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $
+ proc "git" (toCommand ps'')
+ checkSuccessProcess p
+ else boolSystem "git" ps''
gpgDecrypt :: FilePath -> IO String
gpgDecrypt f = ifM (doesFileExist f)
diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs
index 7df5104a..e964c664 100644
--- a/src/Propellor/Message.hs
+++ b/src/Propellor/Message.hs
@@ -25,9 +25,9 @@ import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import System.Console.Concurrent
import Propellor.Types
-import Utility.ConcurrentOutput
import Utility.PartialPrelude
import Utility.Monad
import Utility.Exception
diff --git a/src/Propellor/PrivData.hs b/src/Propellor/PrivData.hs
index e59f42c3..a1e34abc 100644
--- a/src/Propellor/PrivData.hs
+++ b/src/Propellor/PrivData.hs
@@ -36,6 +36,8 @@ import "mtl" Control.Monad.Reader
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString.Lazy as L
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
import Propellor.Types
import Propellor.Types.PrivData
@@ -54,6 +56,7 @@ import Utility.FileMode
import Utility.Env
import Utility.Table
import Utility.FileSystemEncoding
+import Utility.Process
-- | Allows a Property to access the value of a specific PrivDataField,
-- for use in a specific Context or HostContext.
@@ -192,7 +195,8 @@ editPrivData field context = do
hClose th
maybe noop (\p -> writeFileProtected' f (`L.hPut` privDataByteString p)) v
editor <- getEnvDefault "EDITOR" "vi"
- unlessM (boolSystem editor [File f]) $
+ (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc editor [f]
+ unlessM (checkSuccessProcess p) $
error "Editor failed; aborting."
PrivData <$> readFile f
setPrivDataTo field context v'
diff --git a/src/Propellor/Property/Chroot.hs b/src/Propellor/Property/Chroot.hs
index 0c00e8f4..8d1a2388 100644
--- a/src/Propellor/Property/Chroot.hs
+++ b/src/Propellor/Property/Chroot.hs
@@ -27,11 +27,11 @@ import qualified Propellor.Property.Systemd.Core as Systemd
import qualified Propellor.Property.File as File
import qualified Propellor.Shim as Shim
import Propellor.Property.Mount
-import Utility.ConcurrentOutput
import qualified Data.Map as M
import Data.List.Utils
import System.Posix.Directory
+import System.Console.Concurrent
-- | Specification of a chroot. Normally you'll use `debootstrapped` or
-- `bootstrapped` to construct a Chroot value.
diff --git a/src/Propellor/Property/Docker.hs b/src/Propellor/Property/Docker.hs
index f2dbaaf5..0cc8212b 100644
--- a/src/Propellor/Property/Docker.hs
+++ b/src/Propellor/Property/Docker.hs
@@ -56,7 +56,6 @@ import qualified Propellor.Property.Cmd as Cmd
import qualified Propellor.Shim as Shim
import Utility.Path
import Utility.ThreadScheduler
-import Utility.ConcurrentOutput
import Control.Concurrent.Async hiding (link)
import System.Posix.Directory
@@ -65,6 +64,7 @@ import Prelude hiding (init)
import Data.List hiding (init)
import Data.List.Utils
import qualified Data.Map as M
+import System.Console.Concurrent
installed :: Property NoInfo
installed = Apt.installed ["docker.io"]
diff --git a/src/Propellor/Spin.hs b/src/Propellor/Spin.hs
index 478d1517..ae7e7af5 100644
--- a/src/Propellor/Spin.hs
+++ b/src/Propellor/Spin.hs
@@ -29,12 +29,12 @@ import Propellor.Types.Info
import qualified Propellor.Shim as Shim
import Utility.FileMode
import Utility.SafeCommand
-import Utility.ConcurrentOutput
commitSpin :: IO ()
commitSpin = do
void $ actionMessage "Git commit" $
- gitCommit [Param "--allow-empty", Param "-a", Param "-m", Param spinCommitMessage]
+ gitCommit (Just spinCommitMessage)
+ [Param "--allow-empty", Param "-a"]
-- Push to central origin repo first, if possible.
-- The remote propellor will pull from there, which avoids
-- us needing to send stuff directly to the remote host.
@@ -61,10 +61,9 @@ spin' mprivdata relay target hst = do
updateServer target relay hst
(proc "ssh" $ cacheparams ++ [sshtarget, shellWrap probecmd])
(proc "ssh" $ cacheparams ++ [sshtarget, shellWrap updatecmd])
- getprivdata
+ =<< getprivdata
-- And now we can run it.
- flushConcurrentOutput
unlessM (boolSystem "ssh" (map Param $ cacheparams ++ ["-t", sshtarget, shellWrap runcmd])) $
error "remote propellor failed"
where
@@ -191,16 +190,16 @@ updateServer
-> Host
-> CreateProcess
-> CreateProcess
- -> IO PrivMap
+ -> PrivMap
-> IO ()
-updateServer target relay hst connect haveprecompiled getprivdata =
+updateServer target relay hst connect haveprecompiled privdata =
withIOHandles createProcessSuccess connect go
where
hn = fromMaybe target relay
go (toh, fromh) = do
let loop = go (toh, fromh)
- let restart = updateServer hn relay hst connect haveprecompiled getprivdata
+ let restart = updateServer hn relay hst connect haveprecompiled privdata
let done = return ()
v <- maybe Nothing readish <$> getMarked fromh statusMarker
case v of
@@ -208,7 +207,7 @@ updateServer target relay hst connect haveprecompiled getprivdata =
sendRepoUrl toh
loop
(Just NeedPrivData) -> do
- sendPrivData hn toh =<< getprivdata
+ sendPrivData hn toh privdata
loop
(Just NeedGitClone) -> do
hClose toh
@@ -219,7 +218,7 @@ updateServer target relay hst connect haveprecompiled getprivdata =
hClose toh
hClose fromh
sendPrecompiled hn
- updateServer hn relay hst haveprecompiled (error "loop") getprivdata
+ updateServer hn relay hst haveprecompiled (error "loop") privdata
(Just NeedGitPush) -> do
sendGitUpdate hn fromh toh
hClose fromh
@@ -338,8 +337,9 @@ mergeSpin = do
old_head <- getCurrentGitSha1 branch
old_commit <- findLastNonSpinCommit
rungit "reset" [Param old_commit]
- rungit "commit" [Param "-a", Param "--allow-empty"]
- rungit "merge" =<< gpgSignParams [Param "-s", Param "ours", Param old_head]
+ unlessM (gitCommit Nothing [Param "-a", Param "--allow-empty"]) $
+ error "git commit failed"
+ rungit "merge" =<< gpgSignParams [Param "-s", Param "ours", Param old_head, Param "--no-edit"]
current_commit <- getCurrentGitSha1 branch
rungit "update-ref" [Param branchref, Param current_commit]
rungit "checkout" [Param branch]
diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs
new file mode 100644
index 00000000..12447637
--- /dev/null
+++ b/src/System/Console/Concurrent.hs
@@ -0,0 +1,44 @@
+-- |
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
+-- License: BSD-2-clause
+--
+-- Concurrent output handling.
+--
+-- > import Control.Concurrent.Async
+-- > import System.Console.Concurrent
+-- >
+-- > main = withConcurrentOutput $
+-- > outputConcurrent "washed the car\n"
+-- > `concurrently`
+-- > outputConcurrent "walked the dog\n"
+-- > `concurrently`
+-- > createProcessConcurrent (proc "ls" [])
+
+{-# LANGUAGE CPP #-}
+
+module System.Console.Concurrent (
+ -- * Concurrent output
+ withConcurrentOutput,
+ Outputable(..),
+ outputConcurrent,
+ errorConcurrent,
+ ConcurrentProcessHandle,
+#ifndef mingw32_HOST_OS
+ createProcessConcurrent,
+#endif
+ waitForProcessConcurrent,
+ createProcessForeground,
+ flushConcurrentOutput,
+ lockOutput,
+ -- * Low level access to the output buffer
+ OutputBuffer,
+ StdHandle(..),
+ bufferOutputSTM,
+ outputBufferWaiterSTM,
+ waitAnyBuffer,
+ waitCompleteLines,
+ emitOutputBuffer,
+) where
+
+import System.Console.Concurrent.Internal
+
diff --git a/src/System/Console/Concurrent/Internal.hs b/src/System/Console/Concurrent/Internal.hs
new file mode 100644
index 00000000..f538a7de
--- /dev/null
+++ b/src/System/Console/Concurrent/Internal.hs
@@ -0,0 +1,538 @@
+{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
+{-# LANGUAGE CPP #-}
+
+-- |
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
+-- License: BSD-2-clause
+--
+-- Concurrent output handling, internals.
+--
+-- May change at any time.
+
+module System.Console.Concurrent.Internal where
+
+import System.IO
+#ifndef mingw32_HOST_OS
+import System.Posix.IO
+#endif
+import System.Directory
+import System.Exit
+import Control.Monad
+import Control.Monad.IO.Class (liftIO, MonadIO)
+import Control.Applicative
+import System.IO.Unsafe (unsafePerformIO)
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Concurrent.Async
+import Data.Maybe
+import Data.List
+import Data.Monoid
+import qualified System.Process as P
+import qualified Data.Text as T
+import qualified Data.Text.IO as T
+
+import Utility.Monad
+import Utility.Exception
+
+data OutputHandle = OutputHandle
+ { outputLock :: TMVar Lock
+ , outputBuffer :: TMVar OutputBuffer
+ , errorBuffer :: TMVar OutputBuffer
+ , outputThreads :: TMVar Integer
+ , processWaiters :: TMVar [Async ()]
+ , waitForProcessLock :: TMVar ()
+ }
+
+data Lock = Locked
+
+-- | A shared global variable for the OutputHandle.
+{-# NOINLINE globalOutputHandle #-}
+globalOutputHandle :: OutputHandle
+globalOutputHandle = unsafePerformIO $ OutputHandle
+ <$> newEmptyTMVarIO
+ <*> newTMVarIO (OutputBuffer [])
+ <*> newTMVarIO (OutputBuffer [])
+ <*> newTMVarIO 0
+ <*> newTMVarIO []
+ <*> newEmptyTMVarIO
+
+-- | Holds a lock while performing an action. This allows the action to
+-- perform its own output to the console, without using functions from this
+-- module.
+--
+-- While this is running, other threads that try to lockOutput will block.
+-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not
+-- block, but the output will be buffered and displayed only once the
+-- action is done.
+lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
+lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
+
+-- | Blocks until we have the output lock.
+takeOutputLock :: IO ()
+takeOutputLock = void $ takeOutputLock' True
+
+-- | Tries to take the output lock, without blocking.
+tryTakeOutputLock :: IO Bool
+tryTakeOutputLock = takeOutputLock' False
+
+withLock :: (TMVar Lock -> STM a) -> IO a
+withLock a = atomically $ a (outputLock globalOutputHandle)
+
+takeOutputLock' :: Bool -> IO Bool
+takeOutputLock' block = do
+ locked <- withLock $ \l -> do
+ v <- tryTakeTMVar l
+ case v of
+ Just Locked
+ | block -> retry
+ | otherwise -> do
+ -- Restore value we took.
+ putTMVar l Locked
+ return False
+ Nothing -> do
+ putTMVar l Locked
+ return True
+ when locked $ do
+ (outbuf, errbuf) <- atomically $ (,)
+ <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
+ <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
+ emitOutputBuffer StdOut outbuf
+ emitOutputBuffer StdErr errbuf
+ return locked
+
+-- | Only safe to call after taking the output lock.
+dropOutputLock :: IO ()
+dropOutputLock = withLock $ void . takeTMVar
+
+-- | Use this around any actions that use `outputConcurrent`
+-- or `createProcessConcurrent`
+--
+-- This is necessary to ensure that buffered concurrent output actually
+-- gets displayed before the program exits.
+withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
+withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput
+
+-- | Blocks until any processes started by `createProcessConcurrent` have
+-- finished, and any buffered output is displayed.
+--
+-- `withConcurrentOutput` calls this at the end; you can call it anytime
+-- you want to flush output.
+flushConcurrentOutput :: IO ()
+flushConcurrentOutput = do
+ -- Wait for all outputThreads to finish.
+ let v = outputThreads globalOutputHandle
+ atomically $ do
+ r <- takeTMVar v
+ if r <= 0
+ then putTMVar v r
+ else retry
+ -- Take output lock to ensure that nothing else is currently
+ -- generating output, and flush any buffered output.
+ lockOutput $ return ()
+
+-- | Values that can be output.
+class Outputable v where
+ toOutput :: v -> T.Text
+
+instance Outputable T.Text where
+ toOutput = id
+
+instance Outputable String where
+ toOutput = toOutput . T.pack
+
+-- | Displays a value to stdout.
+--
+-- No newline is appended to the value, so if you want a newline, be sure
+-- to include it yourself.
+--
+-- Uses locking to ensure that the whole output occurs atomically
+-- even when other threads are concurrently generating output.
+--
+-- When something else is writing to the console at the same time, this does
+-- not block. It buffers the value, so it will be displayed once the other
+-- writer is done.
+outputConcurrent :: Outputable v => v -> IO ()
+outputConcurrent = outputConcurrent' StdOut
+
+-- | Like `outputConcurrent`, but displays to stderr.
+--
+-- (Does not throw an exception.)
+errorConcurrent :: Outputable v => v -> IO ()
+errorConcurrent = outputConcurrent' StdErr
+
+outputConcurrent' :: Outputable v => StdHandle -> v -> IO ()
+outputConcurrent' stdh v = bracket setup cleanup go
+ where
+ setup = tryTakeOutputLock
+ cleanup False = return ()
+ cleanup True = dropOutputLock
+ go True = do
+ T.hPutStr h (toOutput v)
+ hFlush h
+ go False = do
+ oldbuf <- atomically $ takeTMVar bv
+ newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
+ atomically $ putTMVar bv newbuf
+ h = toHandle stdh
+ bv = bufferFor stdh
+
+newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle
+
+toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h)
+
+-- | Use this to wait for processes started with
+-- `createProcessConcurrent` and `createProcessForeground`, and get their
+-- exit status.
+--
+-- Note that such processes are actually automatically waited for
+-- internally, so not calling this explicitly will not result
+-- in zombie processes. This behavior differs from `P.waitForProcess`
+waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
+waitForProcessConcurrent (ConcurrentProcessHandle h) =
+ bracket lock unlock checkexit
+ where
+ lck = waitForProcessLock globalOutputHandle
+ lock = atomically $ tryPutTMVar lck ()
+ unlock True = atomically $ takeTMVar lck
+ unlock False = return ()
+ checkexit locked = maybe (waitsome locked) return
+ =<< P.getProcessExitCode h
+ waitsome True = do
+ let v = processWaiters globalOutputHandle
+ l <- atomically $ readTMVar v
+ if null l
+ -- Avoid waitAny [] which blocks forever
+ then P.waitForProcess h
+ else do
+ -- Wait for any of the running
+ -- processes to exit. It may or may not
+ -- be the one corresponding to the
+ -- ProcessHandle. If it is,
+ -- getProcessExitCode will succeed.
+ void $ tryIO $ waitAny l
+ checkexit True
+ waitsome False = do
+ -- Another thread took the lck first. Wait for that thread to
+ -- wait for one of the running processes to exit.
+ atomically $ do
+ putTMVar lck ()
+ takeTMVar lck
+ checkexit False
+
+-- Registers an action that waits for a process to exit,
+-- adding it to the processWaiters list, and removing it once the action
+-- completes.
+asyncProcessWaiter :: IO () -> IO ()
+asyncProcessWaiter waitaction = do
+ regdone <- newEmptyTMVarIO
+ waiter <- async $ do
+ self <- atomically (takeTMVar regdone)
+ waitaction `finally` unregister self
+ register waiter regdone
+ where
+ v = processWaiters globalOutputHandle
+ register waiter regdone = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (waiter:l)
+ putTMVar regdone waiter
+ unregister waiter = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (filter (/= waiter) l)
+
+-- | Wrapper around `System.Process.createProcess` that prevents
+-- multiple processes that are running concurrently from writing
+-- to stdout/stderr at the same time.
+--
+-- If the process does not output to stdout or stderr, it's run
+-- by createProcess entirely as usual. Only processes that can generate
+-- output are handled specially:
+--
+-- A process is allowed to write to stdout and stderr in the usual
+-- way, assuming it can successfully take the output lock.
+--
+-- When the output lock is held (ie, by another concurrent process,
+-- or because `outputConcurrent` is being called at the same time),
+-- the process is instead run with its stdout and stderr
+-- redirected to a buffer. The buffered output will be displayed as soon
+-- as the output lock becomes free.
+--
+-- Currently only available on Unix systems, not Windows.
+#ifndef mingw32_HOST_OS
+createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+createProcessConcurrent p
+ | willOutput (P.std_out p) || willOutput (P.std_err p) =
+ ifM tryTakeOutputLock
+ ( fgProcess p
+ , bgProcess p
+ )
+ | otherwise = do
+ r@(_, _, _, h) <- P.createProcess p
+ asyncProcessWaiter $
+ void $ tryIO $ P.waitForProcess h
+ return (toConcurrentProcessHandle r)
+#endif
+
+-- | Wrapper around `System.Process.createProcess` that makes sure a process
+-- is run in the foreground, with direct access to stdout and stderr.
+-- Useful when eg, running an interactive process.
+createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+createProcessForeground p = do
+ takeOutputLock
+ fgProcess p
+
+fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+fgProcess p = do
+ r@(_, _, _, h) <- P.createProcess p
+ `onException` dropOutputLock
+ -- Wait for the process to exit and drop the lock.
+ asyncProcessWaiter $ do
+ void $ tryIO $ P.waitForProcess h
+ dropOutputLock
+ return (toConcurrentProcessHandle r)
+
+#ifndef mingw32_HOST_OS
+bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+bgProcess p = do
+ (toouth, fromouth) <- pipe
+ (toerrh, fromerrh) <- pipe
+ let p' = p
+ { P.std_out = rediroutput (P.std_out p) toouth
+ , P.std_err = rediroutput (P.std_err p) toerrh
+ }
+ registerOutputThread
+ r@(_, _, _, h) <- P.createProcess p'
+ `onException` unregisterOutputThread
+ asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h
+ outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
+ errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
+ void $ async $ bufferWriter [outbuf, errbuf]
+ return (toConcurrentProcessHandle r)
+ where
+ pipe = do
+ (from, to) <- createPipe
+ (,) <$> fdToHandle to <*> fdToHandle from
+ rediroutput ss h
+ | willOutput ss = P.UseHandle h
+ | otherwise = ss
+#endif
+
+willOutput :: P.StdStream -> Bool
+willOutput P.Inherit = True
+willOutput _ = False
+
+-- | Buffered output.
+data OutputBuffer = OutputBuffer [OutputBufferedActivity]
+ deriving (Eq)
+
+data StdHandle = StdOut | StdErr
+
+toHandle :: StdHandle -> Handle
+toHandle StdOut = stdout
+toHandle StdErr = stderr
+
+bufferFor :: StdHandle -> TMVar OutputBuffer
+bufferFor StdOut = outputBuffer globalOutputHandle
+bufferFor StdErr = errorBuffer globalOutputHandle
+
+data OutputBufferedActivity
+ = Output T.Text
+ | InTempFile
+ { tempFile :: FilePath
+ , endsInNewLine :: Bool
+ }
+ deriving (Eq)
+
+data AtEnd = AtEnd
+ deriving Eq
+
+data BufSig = BufSig
+
+setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
+setupOutputBuffer h toh ss fromh = do
+ hClose toh
+ buf <- newMVar (OutputBuffer [])
+ bufsig <- atomically newEmptyTMVar
+ bufend <- atomically newEmptyTMVar
+ void $ async $ outputDrainer ss fromh buf bufsig bufend
+ return (h, buf, bufsig, bufend)
+
+-- Drain output from the handle, and buffer it.
+outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
+outputDrainer ss fromh buf bufsig bufend
+ | willOutput ss = go
+ | otherwise = atend
+ where
+ go = do
+ t <- T.hGetChunk fromh
+ if T.null t
+ then atend
+ else do
+ modifyMVar_ buf $ addOutputBuffer (Output t)
+ changed
+ go
+ atend = do
+ atomically $ putTMVar bufend AtEnd
+ hClose fromh
+ changed = atomically $ do
+ void $ tryTakeTMVar bufsig
+ putTMVar bufsig BufSig
+
+registerOutputThread :: IO ()
+registerOutputThread = do
+ let v = outputThreads globalOutputHandle
+ atomically $ putTMVar v . succ =<< takeTMVar v
+
+unregisterOutputThread :: IO ()
+unregisterOutputThread = do
+ let v = outputThreads globalOutputHandle
+ atomically $ putTMVar v . pred =<< takeTMVar v
+
+-- Wait to lock output, and once we can, display everything
+-- that's put into the buffers, until the end.
+--
+-- If end is reached before lock is taken, instead add the command's
+-- buffers to the global outputBuffer and errorBuffer.
+bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
+bufferWriter ts = do
+ activitysig <- atomically newEmptyTMVar
+ worker1 <- async $ lockOutput $
+ ifM (atomically $ tryPutTMVar activitysig ())
+ ( void $ mapConcurrently displaybuf ts
+ , noop -- buffers already moved to global
+ )
+ worker2 <- async $ void $ globalbuf activitysig
+ void $ async $ do
+ void $ waitCatch worker1
+ void $ waitCatch worker2
+ unregisterOutputThread
+ where
+ displaybuf v@(outh, buf, bufsig, bufend) = do
+ change <- atomically $
+ (Right <$> takeTMVar bufsig)
+ `orElse`
+ (Left <$> takeTMVar bufend)
+ l <- takeMVar buf
+ putMVar buf (OutputBuffer [])
+ emitOutputBuffer outh l
+ case change of
+ Right BufSig -> displaybuf v
+ Left AtEnd -> return ()
+ globalbuf activitysig = do
+ ok <- atomically $ do
+ -- signal we're going to handle it
+ -- (returns false if the displaybuf already did)
+ ok <- tryPutTMVar activitysig ()
+ -- wait for end of all buffers
+ when ok $
+ mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts
+ return ok
+ when ok $ do
+ -- add all of the command's buffered output to the
+ -- global output buffer, atomically
+ bs <- forM ts $ \(outh, buf, _bufsig, _bufend) ->
+ (outh,) <$> takeMVar buf
+ atomically $
+ forM_ bs $ \(outh, b) ->
+ bufferOutputSTM' outh b
+
+-- Adds a value to the OutputBuffer. When adding Output to a Handle,
+-- it's cheaper to combine it with any already buffered Output to that
+-- same Handle.
+--
+-- When the total buffered Output exceeds 1 mb in size, it's moved out of
+-- memory, to a temp file. This should only happen rarely, but is done to
+-- avoid some verbose process unexpectedly causing excessive memory use.
+addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
+addOutputBuffer (Output t) (OutputBuffer buf)
+ | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other)
+ | otherwise = do
+ tmpdir <- getTemporaryDirectory
+ (tmp, h) <- openTempFile tmpdir "output.tmp"
+ let !endnl = endsNewLine t'
+ let i = InTempFile
+ { tempFile = tmp
+ , endsInNewLine = endnl
+ }
+ T.hPutStr h t'
+ hClose h
+ return $ OutputBuffer (i : other)
+ where
+ !t' = T.concat (mapMaybe getOutput this) <> t
+ !(this, other) = partition isOutput buf
+ isOutput v = case v of
+ Output _ -> True
+ _ -> False
+ getOutput v = case v of
+ Output t'' -> Just t''
+ _ -> Nothing
+addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)
+
+-- | Adds a value to the output buffer for later display.
+--
+-- Note that buffering large quantities of data this way will keep it
+-- resident in memory until it can be displayed. While `outputConcurrent`
+-- uses temp files if the buffer gets too big, this STM function cannot do
+-- so.
+bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
+bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)])
+
+bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
+bufferOutputSTM' h (OutputBuffer newbuf) = do
+ (OutputBuffer buf) <- takeTMVar bv
+ putTMVar bv (OutputBuffer (newbuf ++ buf))
+ where
+ bv = bufferFor h
+
+-- | A STM action that waits for some buffered output to become
+-- available, and returns it.
+--
+-- The function can select a subset of output when only some is desired;
+-- the fst part is returned and the snd is left in the buffer.
+--
+-- This will prevent it from being displayed in the usual way, so you'll
+-- need to use `emitOutputBuffer` to display it yourself.
+outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer)
+outputBufferWaiterSTM selector = waitgetbuf StdOut `orElse` waitgetbuf StdErr
+ where
+ waitgetbuf h = do
+ let bv = bufferFor h
+ (selected, rest) <- selector <$> takeTMVar bv
+ when (selected == OutputBuffer [])
+ retry
+ putTMVar bv rest
+ return (h, selected)
+
+waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
+waitAnyBuffer b = (b, OutputBuffer [])
+
+-- | Use with `outputBufferWaiterSTM` to make it only return buffered
+-- output that ends with a newline. Anything buffered without a newline
+-- is left in the buffer.
+waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
+waitCompleteLines (OutputBuffer l) =
+ let (selected, rest) = span completeline l
+ in (OutputBuffer selected, OutputBuffer rest)
+ where
+ completeline (v@(InTempFile {})) = endsInNewLine v
+ completeline (Output b) = endsNewLine b
+
+endsNewLine :: T.Text -> Bool
+endsNewLine t = not (T.null t) && T.last t == '\n'
+
+-- | Emits the content of the OutputBuffer to the Handle
+--
+-- If you use this, you should use `lockOutput` to ensure you're the only
+-- thread writing to the console.
+emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
+emitOutputBuffer stdh (OutputBuffer l) =
+ forM_ (reverse l) $ \ba -> case ba of
+ Output t -> emit t
+ InTempFile tmp _ -> do
+ emit =<< T.readFile tmp
+ void $ tryWhenExists $ removeFile tmp
+ where
+ outh = toHandle stdh
+ emit t = void $ tryIO $ do
+ T.hPutStr outh t
+ hFlush outh
diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs
new file mode 100644
index 00000000..0e00e4fd
--- /dev/null
+++ b/src/System/Process/Concurrent.hs
@@ -0,0 +1,34 @@
+-- |
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
+-- License: BSD-2-clause
+--
+-- The functions exported by this module are intended to be drop-in
+-- replacements for those from System.Process, when converting a whole
+-- program to use System.Console.Concurrent.
+
+module System.Process.Concurrent where
+
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
+import System.Process hiding (createProcess, waitForProcess)
+import System.IO
+import System.Exit
+
+-- | Calls `createProcessConcurrent`
+--
+-- You should use the waitForProcess in this module on the resulting
+-- ProcessHandle. Using System.Process.waitForProcess instead can have
+-- mildly unexpected results.
+createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
+createProcess p = do
+ (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p
+ return (i, o, e, h)
+
+-- | Calls `waitForProcessConcurrent`
+--
+-- You should only use this on a ProcessHandle obtained by calling
+-- createProcess from this module. Using this with a ProcessHandle
+-- obtained from System.Process.createProcess etc will have extremely
+-- unexpected results; it can wait a very long time before returning.
+waitForProcess :: ProcessHandle -> IO ExitCode
+waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
deleted file mode 100644
index c24744a3..00000000
--- a/src/Utility/ConcurrentOutput.hs
+++ /dev/null
@@ -1,348 +0,0 @@
-{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-}
-{-# OPTIONS_GHC -fno-warn-tabs #-}
-
--- |
--- Copyright: 2013 Joey Hess <id@joeyh.name>
--- License: BSD-2-clause
---
--- Concurrent output handling.
---
--- > import Control.Concurrent.Async
--- > import Control.Concurrent.Output
--- >
--- > main = withConcurrentOutput $
--- > outputConcurrent "washed the car\n"
--- > `concurrently`
--- > outputConcurrent "walked the dog\n"
--- > `concurrently`
--- > createProcessConcurrent (proc "ls" [])
-
-module Utility.ConcurrentOutput (
- withConcurrentOutput,
- flushConcurrentOutput,
- Outputable(..),
- outputConcurrent,
- createProcessConcurrent,
- waitForProcessConcurrent,
- lockOutput,
-) where
-
-import System.IO
-import System.Posix.IO
-import System.Directory
-import System.Exit
-import Control.Monad
-import Control.Monad.IO.Class (liftIO, MonadIO)
-import Control.Applicative
-import System.IO.Unsafe (unsafePerformIO)
-import Control.Concurrent
-import Control.Concurrent.STM
-import Control.Concurrent.Async
-import Data.Maybe
-import Data.List
-import Data.Monoid
-import qualified System.Process as P
-import qualified Data.Set as S
-import qualified Data.ByteString as B
-import qualified Data.Text as T
-import Data.Text.Encoding (encodeUtf8)
-
-import Utility.Monad
-import Utility.Exception
-
-data OutputHandle = OutputHandle
- { outputLock :: TMVar Lock
- , outputBuffer :: TMVar Buffer
- , outputThreads :: TMVar (S.Set (Async ()))
- }
-
-data Lock = Locked
-
--- | A shared global variable for the OutputHandle.
-{-# NOINLINE globalOutputHandle #-}
-globalOutputHandle :: MVar OutputHandle
-globalOutputHandle = unsafePerformIO $
- newMVar =<< OutputHandle
- <$> newEmptyTMVarIO
- <*> newTMVarIO []
- <*> newTMVarIO S.empty
-
--- | Gets the global OutputHandle.
-getOutputHandle :: IO OutputHandle
-getOutputHandle = readMVar globalOutputHandle
-
--- | Holds a lock while performing an action that will display output.
--- While this is running, other threads that try to lockOutput will block,
--- and calls to `outputConcurrent` and `createProcessConcurrent`
--- will result in that concurrent output being buffered and not
--- displayed until the action is done.
-lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
-lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
-
--- | Blocks until we have the output lock.
-takeOutputLock :: IO ()
-takeOutputLock = void $ takeOutputLock' True
-
--- | Tries to take the output lock, without blocking.
-tryTakeOutputLock :: IO Bool
-tryTakeOutputLock = takeOutputLock' False
-
-withLock :: (TMVar Lock -> STM a) -> IO a
-withLock a = do
- lck <- outputLock <$> getOutputHandle
- atomically (a lck)
-
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
- locked <- withLock $ \l -> do
- v <- tryTakeTMVar l
- case v of
- Just Locked
- | block -> retry
- | otherwise -> do
- -- Restore value we took.
- putTMVar l Locked
- return False
- Nothing -> do
- putTMVar l Locked
- return True
- when locked $ do
- bv <- outputBuffer <$> getOutputHandle
- buf <- atomically $ swapTMVar bv []
- emitBuffer stdout buf
- return locked
-
--- | Only safe to call after taking the output lock.
-dropOutputLock :: IO ()
-dropOutputLock = withLock $ void . takeTMVar
-
--- | Use this around any IO actions that use `outputConcurrent`
--- or `createProcessConcurrent`
---
--- This is necessary to ensure that buffered concurrent output actually
--- gets displayed before the program exits.
-withConcurrentOutput :: IO a -> IO a
-withConcurrentOutput a = a `finally` flushConcurrentOutput
-
--- | Blocks until any processes started by `createProcessConcurrent` have
--- finished, and any buffered output is displayed.
-flushConcurrentOutput :: IO ()
-flushConcurrentOutput = do
- -- Wait for all outputThreads to finish.
- v <- outputThreads <$> getOutputHandle
- atomically $ do
- r <- takeTMVar v
- if r == S.empty
- then putTMVar v r
- else retry
- -- Take output lock to ensure that nothing else is currently
- -- generating output, and flush any buffered output.
- lockOutput $ return ()
-
--- | Values that can be output.
-class Outputable v where
- toOutput :: v -> B.ByteString
-
-instance Outputable B.ByteString where
- toOutput = id
-
-instance Outputable T.Text where
- toOutput = encodeUtf8
-
-instance Outputable String where
- toOutput = toOutput . T.pack
-
--- | Displays a value to stdout, and flush output so it's displayed.
---
--- Uses locking to ensure that the whole output occurs atomically
--- even when other threads are concurrently generating output.
---
--- When something else is writing to the console at the same time, this does
--- not block. It buffers the value, so it will be displayed once the other
--- writer is done.
-outputConcurrent :: Outputable v => v -> IO ()
-outputConcurrent v = bracket setup cleanup go
- where
- setup = tryTakeOutputLock
- cleanup False = return ()
- cleanup True = dropOutputLock
- go True = do
- B.hPut stdout (toOutput v)
- hFlush stdout
- go False = do
- bv <- outputBuffer <$> getOutputHandle
- oldbuf <- atomically $ takeTMVar bv
- newbuf <- addBuffer (Output (toOutput v)) oldbuf
- atomically $ putTMVar bv newbuf
-
--- | This must be used to wait for processes started with
--- `createProcessConcurrent`.
---
--- This is necessary because `System.Process.waitForProcess` has a
--- race condition when two threads check the same process. If the race
--- is triggered, one thread will successfully wait, but the other
--- throws a DoesNotExist exception.
-waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
-waitForProcessConcurrent h = do
- v <- tryWhenExists (P.waitForProcess h)
- case v of
- Just r -> return r
- Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
-
--- | Wrapper around `System.Process.createProcess` that prevents
--- multiple processes that are running concurrently from writing
--- to stdout/stderr at the same time.
---
--- If the process does not output to stdout or stderr, it's run
--- by createProcess entirely as usual. Only processes that can generate
--- output are handled specially:
---
--- A process is allowed to write to stdout and stderr in the usual
--- way, assuming it can successfully take the output lock.
---
--- When the output lock is held (by another concurrent process,
--- or because `outputConcurrent` is being called at the same time),
--- the process is instead run with its stdout and stderr
--- redirected to a buffer. The buffered output will be displayed as soon
--- as the output lock becomes free.
-createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
-createProcessConcurrent p
- | willOutput (P.std_out p) || willOutput (P.std_err p) =
- ifM tryTakeOutputLock
- ( firstprocess
- , concurrentprocess
- )
- | otherwise = P.createProcess p
- where
- rediroutput ss h
- | willOutput ss = P.UseHandle h
- | otherwise = ss
-
- firstprocess = do
- r@(_, _, _, h) <- P.createProcess p
- `onException` dropOutputLock
- -- Wait for the process to exit and drop the lock.
- void $ async $ do
- void $ tryIO $ waitForProcessConcurrent h
- dropOutputLock
- return r
-
- concurrentprocess = do
- (toouth, fromouth) <- pipe
- (toerrh, fromerrh) <- pipe
- let p' = p
- { P.std_out = rediroutput (P.std_out p) toouth
- , P.std_err = rediroutput (P.std_err p) toerrh
- }
- r <- P.createProcess p'
- outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth
- errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh
- void $ async $ bufferWriter [outbuf, errbuf]
- return r
-
- pipe = do
- (from, to) <- createPipe
- (,) <$> fdToHandle to <*> fdToHandle from
-
-willOutput :: P.StdStream -> Bool
-willOutput P.Inherit = True
-willOutput _ = False
-
--- Built up with newest seen output first.
-type Buffer = [BufferedActivity]
-
-data BufferedActivity
- = ReachedEnd
- | Output B.ByteString
- | InTempFile FilePath
- deriving (Eq)
-
-setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ())
-setupBuffer h toh ss fromh = do
- hClose toh
- buf <- newMVar []
- bufsig <- atomically newEmptyTMVar
- void $ async $ outputDrainer ss fromh buf bufsig
- return (h, buf, bufsig)
-
--- Drain output from the handle, and buffer it.
-outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO ()
-outputDrainer ss fromh buf bufsig
- | willOutput ss = go
- | otherwise = atend
- where
- go = do
- v <- tryIO $ B.hGetSome fromh 1048576
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (Output b)
- changed
- go
- _ -> atend
- atend = do
- modifyMVar_ buf $ pure . (ReachedEnd :)
- changed
- hClose fromh
- changed = atomically $ do
- void $ tryTakeTMVar bufsig
- putTMVar bufsig ()
-
--- Wait to lock output, and once we can, display everything
--- that's put into the buffers, until the end.
-bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
-bufferWriter ts = do
- worker <- async $ void $ lockOutput $ mapConcurrently go ts
- v <- outputThreads <$> getOutputHandle
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.insert worker s)
- void $ async $ do
- void $ waitCatch worker
- atomically $ do
- s <- takeTMVar v
- putTMVar v (S.delete worker s)
- where
- go v@(outh, buf, bufsig) = do
- void $ atomically $ takeTMVar bufsig
- l <- takeMVar buf
- putMVar buf []
- emitBuffer outh l
- if any (== ReachedEnd) l
- then return ()
- else go v
-
-emitBuffer :: Handle -> Buffer -> IO ()
-emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of
- Output b -> do
- B.hPut outh b
- hFlush outh
- InTempFile tmp -> do
- B.hPut outh =<< B.readFile tmp
- void $ tryWhenExists $ removeFile tmp
- ReachedEnd -> return ()
-
--- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper
--- to combine it with any already buffered Output to that same Handle.
---
--- When the total buffered Output exceeds 1 mb in size, it's moved out of
--- memory, to a temp file. This should only happen rarely, but is done to
--- avoid some verbose process unexpectedly causing excessive memory use.
-addBuffer :: BufferedActivity -> Buffer -> IO Buffer
-addBuffer (Output b) buf
- | B.length b' <= 1048576 = return (Output b' : other)
- | otherwise = do
- tmpdir <- getTemporaryDirectory
- (tmp, h) <- openTempFile tmpdir "output.tmp"
- B.hPut h b'
- hClose h
- return (InTempFile tmp : other)
- where
- !b' = B.concat (mapMaybe getOutput this) <> b
- !(this, other) = partition isOutput buf
- isOutput v = case v of
- Output _ -> True
- _ -> False
- getOutput v = case v of
- Output b'' -> Just b''
- _ -> Nothing
-addBuffer v buf = return (v:buf)
diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs
index 08694d5d..8c9d41d0 100644
--- a/src/Utility/Process/Shim.hs
+++ b/src/Utility/Process/Shim.hs
@@ -1,12 +1,4 @@
module Utility.Process.Shim (module X, createProcess, waitForProcess) where
import System.Process as X hiding (createProcess, waitForProcess)
-import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent)
-import System.IO
-import System.Exit
-
-createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-createProcess = createProcessConcurrent
-
-waitForProcess :: ProcessHandle -> IO ExitCode
-waitForProcess = waitForProcessConcurrent
+import System.Process.Concurrent