summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--propellor.cabal4
-rw-r--r--src/Propellor/Gpg.hs5
-rw-r--r--src/Propellor/Message.hs2
-rw-r--r--src/Propellor/PrivData.hs5
-rw-r--r--src/Propellor/Property/Chroot.hs2
-rw-r--r--src/Propellor/Property/Docker.hs2
-rw-r--r--src/System/Console/Concurrent.hs39
-rw-r--r--src/System/Console/Concurrent/Internal.hs (renamed from src/Utility/ConcurrentOutput.hs)178
-rw-r--r--src/System/Process/Concurrent.hs34
-rw-r--r--src/Utility/Process/Shim.hs10
10 files changed, 173 insertions, 108 deletions
diff --git a/propellor.cabal b/propellor.cabal
index 6e871d6b..ccba846f 100644
--- a/propellor.cabal
+++ b/propellor.cabal
@@ -161,7 +161,6 @@ Library
Propellor.Shim
Propellor.Property.Chroot.Util
Utility.Applicative
- Utility.ConcurrentOutput
Utility.Data
Utility.DataUnits
Utility.Directory
@@ -185,6 +184,9 @@ Library
Utility.Tmp
Utility.UserInfo
Utility.QuickCheck
+ System.Console.Concurrent
+ System.Console.Concurrent.Internal
+ System.Process.Concurrent
source-repository head
type: git
diff --git a/src/Propellor/Gpg.hs b/src/Propellor/Gpg.hs
index 9c58a5d1..960c70d3 100644
--- a/src/Propellor/Gpg.hs
+++ b/src/Propellor/Gpg.hs
@@ -7,6 +7,8 @@ import System.Directory
import Data.Maybe
import Data.List.Utils
import Control.Monad
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
import Propellor.PrivData.Paths
import Propellor.Message
@@ -16,7 +18,6 @@ import Utility.Monad
import Utility.Misc
import Utility.Tmp
import Utility.FileSystemEncoding
-import Utility.ConcurrentOutput
type KeyId = String
@@ -129,7 +130,7 @@ gitCommit msg ps = do
ps'' <- gpgSignParams ps'
if isNothing msg
then do
- (_, _, _, p) <- createProcessForeground $
+ (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $
proc "git" (toCommand ps'')
checkSuccessProcess p
else boolSystem "git" ps''
diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs
index 7df5104a..e964c664 100644
--- a/src/Propellor/Message.hs
+++ b/src/Propellor/Message.hs
@@ -25,9 +25,9 @@ import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import System.Console.Concurrent
import Propellor.Types
-import Utility.ConcurrentOutput
import Utility.PartialPrelude
import Utility.Monad
import Utility.Exception
diff --git a/src/Propellor/PrivData.hs b/src/Propellor/PrivData.hs
index 6b77f782..a1e34abc 100644
--- a/src/Propellor/PrivData.hs
+++ b/src/Propellor/PrivData.hs
@@ -36,6 +36,8 @@ import "mtl" Control.Monad.Reader
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString.Lazy as L
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
import Propellor.Types
import Propellor.Types.PrivData
@@ -54,7 +56,6 @@ import Utility.FileMode
import Utility.Env
import Utility.Table
import Utility.FileSystemEncoding
-import Utility.ConcurrentOutput
import Utility.Process
-- | Allows a Property to access the value of a specific PrivDataField,
@@ -194,7 +195,7 @@ editPrivData field context = do
hClose th
maybe noop (\p -> writeFileProtected' f (`L.hPut` privDataByteString p)) v
editor <- getEnvDefault "EDITOR" "vi"
- (_, _, _, p) <- createProcessForeground $ proc editor [f]
+ (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc editor [f]
unlessM (checkSuccessProcess p) $
error "Editor failed; aborting."
PrivData <$> readFile f
diff --git a/src/Propellor/Property/Chroot.hs b/src/Propellor/Property/Chroot.hs
index 0c00e8f4..8d1a2388 100644
--- a/src/Propellor/Property/Chroot.hs
+++ b/src/Propellor/Property/Chroot.hs
@@ -27,11 +27,11 @@ import qualified Propellor.Property.Systemd.Core as Systemd
import qualified Propellor.Property.File as File
import qualified Propellor.Shim as Shim
import Propellor.Property.Mount
-import Utility.ConcurrentOutput
import qualified Data.Map as M
import Data.List.Utils
import System.Posix.Directory
+import System.Console.Concurrent
-- | Specification of a chroot. Normally you'll use `debootstrapped` or
-- `bootstrapped` to construct a Chroot value.
diff --git a/src/Propellor/Property/Docker.hs b/src/Propellor/Property/Docker.hs
index f2dbaaf5..0cc8212b 100644
--- a/src/Propellor/Property/Docker.hs
+++ b/src/Propellor/Property/Docker.hs
@@ -56,7 +56,6 @@ import qualified Propellor.Property.Cmd as Cmd
import qualified Propellor.Shim as Shim
import Utility.Path
import Utility.ThreadScheduler
-import Utility.ConcurrentOutput
import Control.Concurrent.Async hiding (link)
import System.Posix.Directory
@@ -65,6 +64,7 @@ import Prelude hiding (init)
import Data.List hiding (init)
import Data.List.Utils
import qualified Data.Map as M
+import System.Console.Concurrent
installed :: Property NoInfo
installed = Apt.installed ["docker.io"]
diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs
new file mode 100644
index 00000000..efbfaa15
--- /dev/null
+++ b/src/System/Console/Concurrent.hs
@@ -0,0 +1,39 @@
+-- |
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
+-- License: BSD-2-clause
+--
+-- Concurrent output handling.
+--
+-- > import Control.Concurrent.Async
+-- > import System.Console.Concurrent
+-- >
+-- > main = withConcurrentOutput $
+-- > outputConcurrent "washed the car\n"
+-- > `concurrently`
+-- > outputConcurrent "walked the dog\n"
+-- > `concurrently`
+-- > createProcessConcurrent (proc "ls" [])
+
+module System.Console.Concurrent (
+ -- * Concurrent output
+ withConcurrentOutput,
+ Outputable(..),
+ outputConcurrent,
+ ConcurrentProcessHandle,
+ createProcessConcurrent,
+ waitForProcessConcurrent,
+ createProcessForeground,
+ flushConcurrentOutput,
+ lockOutput,
+ -- * Low level access to the output buffer
+ OutputBuffer,
+ StdHandle(..),
+ bufferOutputSTM,
+ outputBufferWaiterSTM,
+ waitAnyBuffer,
+ waitCompleteLines,
+ emitOutputBuffer,
+) where
+
+import System.Console.Concurrent.Internal
+
diff --git a/src/Utility/ConcurrentOutput.hs b/src/System/Console/Concurrent/Internal.hs
index ca1ae7c5..caef9833 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/System/Console/Concurrent/Internal.hs
@@ -1,40 +1,14 @@
{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
-- |
--- Copyright: 2013 Joey Hess <id@joeyh.name>
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
-- License: BSD-2-clause
--
--- Concurrent output handling.
+-- Concurrent output handling, internals.
--
--- > import Control.Concurrent.Async
--- > import System.Console.Concurrent
--- >
--- > main = withConcurrentOutput $
--- > outputConcurrent "washed the car\n"
--- > `concurrently`
--- > outputConcurrent "walked the dog\n"
--- > `concurrently`
--- > createProcessConcurrent (proc "ls" [])
-
-module Utility.ConcurrentOutput (
- -- * Concurrent output
- withConcurrentOutput,
- Outputable(..),
- outputConcurrent,
- createProcessConcurrent,
- waitForProcessConcurrent,
- createProcessForeground,
- flushConcurrentOutput,
- lockOutput,
- -- * Low level access to the output buffer
- OutputBuffer,
- StdHandle(..),
- bufferOutputSTM,
- outputBufferWaiterSTM,
- waitAnyBuffer,
- waitCompleteLines,
- emitOutputBuffer,
-) where
+-- May change at any time.
+
+module System.Console.Concurrent.Internal where
import System.IO
import System.Posix.IO
@@ -62,6 +36,8 @@ data OutputHandle = OutputHandle
, outputBuffer :: TMVar OutputBuffer
, errorBuffer :: TMVar OutputBuffer
, outputThreads :: TMVar Integer
+ , processWaiters :: TMVar [Async ()]
+ , waitForProcessLock :: TMVar ()
}
data Lock = Locked
@@ -74,6 +50,8 @@ globalOutputHandle = unsafePerformIO $ OutputHandle
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO 0
+ <*> newTMVarIO []
+ <*> newEmptyTMVarIO
-- | Holds a lock while performing an action. This allows the action to
-- perform its own output to the console, without using functions from this
@@ -185,20 +163,69 @@ outputConcurrent v = bracket setup cleanup go
newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
atomically $ putTMVar bv newbuf
--- | This must be used to wait for processes started with
--- `createProcessConcurrent` and `createProcessForeground`. It may also be
--- used to wait for processes started by `System.Process.createProcess`.
+newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle
+
+toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
+toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h)
+
+-- | Use this to wait for processes started with
+-- `createProcessConcurrent` and `createProcessForeground`, and get their
+-- exit status.
--
--- This is necessary because `System.Process.waitForProcess` has a
--- race condition when two threads check the same process. If the race
--- is triggered, one thread will successfully wait, but the other
--- throws a DoesNotExist exception.
-waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
-waitForProcessConcurrent h = do
- v <- tryWhenExists (P.waitForProcess h)
- case v of
- Just r -> return r
- Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
+-- Note that such processes are actually automatically waited for
+-- internally, so not calling this exiplictly will not result
+-- in zombie processes. This behavior differs from `P.waitForProcess`
+waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
+waitForProcessConcurrent (ConcurrentProcessHandle h) = checkexit
+ where
+ checkexit = maybe waitsome return =<< P.getProcessExitCode h
+ waitsome = maybe checkexit return =<< bracket lock unlock go
+ lck = waitForProcessLock globalOutputHandle
+ lock = atomically $ tryPutTMVar lck ()
+ unlock True = atomically $ takeTMVar lck
+ unlock False = return ()
+ go True = do
+ let v = processWaiters globalOutputHandle
+ l <- atomically $ readTMVar v
+ if null l
+ -- Avoid waitAny [] which blocks forever;
+ then Just <$> P.waitForProcess h
+ else do
+ -- Wait for any of the running
+ -- processes to exit. It may or may not
+ -- be the one corresponding to the
+ -- ProcessHandle. If it is,
+ -- getProcessExitCode will succeed.
+ void $ tryIO $ waitAny l
+ hFlush stdout
+ return Nothing
+ go False = do
+ -- Another thread took the lck first. Wait for that thread to
+ -- wait for one of the running processes to exit.
+ atomically $ do
+ putTMVar lck ()
+ takeTMVar lck
+ return Nothing
+
+-- Registers an action that waits for a process to exit,
+-- adding it to the processWaiters list, and removing it once the action
+-- completes.
+asyncProcessWaiter :: IO () -> IO ()
+asyncProcessWaiter waitaction = do
+ regdone <- newEmptyTMVarIO
+ waiter <- async $ do
+ self <- atomically (takeTMVar regdone)
+ waitaction `finally` unregister self
+ register waiter regdone
+ where
+ v = processWaiters globalOutputHandle
+ register waiter regdone = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (waiter:l)
+ putTMVar regdone waiter
+ unregister waiter = atomically $ do
+ l <- takeTMVar v
+ putTMVar v (filter (/= waiter) l)
-- | Wrapper around `System.Process.createProcess` that prevents
-- multiple processes that are running concurrently from writing
@@ -215,71 +242,39 @@ waitForProcessConcurrent h = do
-- or because `outputConcurrent` is being called at the same time),
-- the process is instead run with its stdout and stderr
-- redirected to a buffer. The buffered output will be displayed as soon
--- as the output lock becomes free, or after the command has finished.
-createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+-- as the output lock becomes free.
+createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessConcurrent p
| willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
( fgProcess p
, bgProcess p
)
- | otherwise = P.createProcess p
+ | otherwise = do
+ r@(_, _, _, h) <- P.createProcess p
+ asyncProcessWaiter $ do
+ void $ P.waitForProcess h
+ return (toConcurrentProcessHandle r)
-- | Wrapper around `System.Process.createProcess` that makes sure a process
-- is run in the foreground, with direct access to stdout and stderr.
-- Useful when eg, running an interactive process.
---
--- If another process is already running in the foreground, this will block
--- until it finishes. Background processes may continue to run while
--- this process is running, and their output will be buffered until it
--- exits.
---
--- The obvious reason you might need to use this is in an example like this:
---
--- > main = withConcurrentOutput $
--- > createProcessConcurrent (proc "ls" [])
--- > `concurrently` createProcessForeground (proc "vim" [])
---
--- Since vim is an interactive program, it needs to run in the foreground.
--- If it were started by `createProcessConcurrent`, it would sometimes
--- run in the background.
---
--- Also, there is actually a race condition when calling
--- `createProcessConcurrent` sequentially like this:
---
--- > main = withConcurrentOutput $ do
--- > (Nothing, Nothing, Nothing, h) <- createProcessConcurrent (proc "ls" [])
--- > waitForProcessConcurrent h
--- > createProcessConcurrent (proc "vim" [])
---
--- Here vim runs about 50% of the time as a background process! Why is
--- it not always run in the foreground? The reason is that the previous
--- process was run in the foreground, and still holds the output lock.
--- `waitForProcessConcurrent` waits for that process, but does not clear
--- the output lock immediately. By the time the output lock does clear,
--- the vim process may have already started up, in the background.
---
--- It would be nice to fix that race, but it can't be fixed without
--- an Eq instance for `ProcessHandle`. In any case, when you're using
--- this module, you're typically actually doing concurrent things,
--- not sequential as in the example above, and so even if the race were
--- fixed, you'd still want to use `createProcessForeground` to run vim.
-createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessForeground p = do
takeOutputLock
fgProcess p
-fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
fgProcess p = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
-- Wait for the process to exit and drop the lock.
- void $ async $ do
- void $ tryIO $ waitForProcessConcurrent h
+ asyncProcessWaiter $ do
+ void $ P.waitForProcess h
dropOutputLock
- return r
+ return (toConcurrentProcessHandle r)
-bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
bgProcess p = do
(toouth, fromouth) <- pipe
(toerrh, fromerrh) <- pipe
@@ -288,12 +283,13 @@ bgProcess p = do
, P.std_err = rediroutput (P.std_err p) toerrh
}
registerOutputThread
- r <- P.createProcess p'
+ r@(_, _, _, h) <- P.createProcess p'
`onException` unregisterOutputThread
+ asyncProcessWaiter $ void $ P.waitForProcess h
outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
void $ async $ bufferWriter [outbuf, errbuf]
- return r
+ return (toConcurrentProcessHandle r)
where
pipe = do
(from, to) <- createPipe
diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs
new file mode 100644
index 00000000..0e00e4fd
--- /dev/null
+++ b/src/System/Process/Concurrent.hs
@@ -0,0 +1,34 @@
+-- |
+-- Copyright: 2015 Joey Hess <id@joeyh.name>
+-- License: BSD-2-clause
+--
+-- The functions exported by this module are intended to be drop-in
+-- replacements for those from System.Process, when converting a whole
+-- program to use System.Console.Concurrent.
+
+module System.Process.Concurrent where
+
+import System.Console.Concurrent
+import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..))
+import System.Process hiding (createProcess, waitForProcess)
+import System.IO
+import System.Exit
+
+-- | Calls `createProcessConcurrent`
+--
+-- You should use the waitForProcess in this module on the resulting
+-- ProcessHandle. Using System.Process.waitForProcess instead can have
+-- mildly unexpected results.
+createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
+createProcess p = do
+ (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p
+ return (i, o, e, h)
+
+-- | Calls `waitForProcessConcurrent`
+--
+-- You should only use this on a ProcessHandle obtained by calling
+-- createProcess from this module. Using this with a ProcessHandle
+-- obtained from System.Process.createProcess etc will have extremely
+-- unexpected results; it can wait a very long time before returning.
+waitForProcess :: ProcessHandle -> IO ExitCode
+waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle
diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs
index 08694d5d..8c9d41d0 100644
--- a/src/Utility/Process/Shim.hs
+++ b/src/Utility/Process/Shim.hs
@@ -1,12 +1,4 @@
module Utility.Process.Shim (module X, createProcess, waitForProcess) where
import System.Process as X hiding (createProcess, waitForProcess)
-import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent)
-import System.IO
-import System.Exit
-
-createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-createProcess = createProcessConcurrent
-
-waitForProcess :: ProcessHandle -> IO ExitCode
-waitForProcess = waitForProcessConcurrent
+import System.Process.Concurrent