summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--debian/changelog1
-rw-r--r--debian/control2
-rw-r--r--propellor.cabal6
-rw-r--r--src/Propellor/Bootstrap.hs3
-rw-r--r--src/Utility/ConcurrentOutput.hs173
5 files changed, 104 insertions, 81 deletions
diff --git a/debian/changelog b/debian/changelog
index 6c154e1a..f3522b7c 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -19,6 +19,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium
actions are combined (API change).
* Added Propellor.Property.Concurrent for concurrent properties.
* execProcess and everything built on it is now concurrent output safe.
+ * Propellor now depends on stm.
* Add File.isCopyOf. Thanks, Per Olofsson.
-- Joey Hess <id@joeyh.name> Sat, 24 Oct 2015 15:16:45 -0400
diff --git a/debian/control b/debian/control
index 7f42c916..2956fdaa 100644
--- a/debian/control
+++ b/debian/control
@@ -17,6 +17,7 @@ Build-Depends:
libghc-mtl-dev,
libghc-transformers-dev,
libghc-exceptions-dev (>= 0.6),
+ libghc-stm-dev,
Maintainer: Gergely Nagy <algernon@madhouse-project.org>
Standards-Version: 3.9.6
Vcs-Git: git://git.joeyh.name/propellor
@@ -39,6 +40,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends},
libghc-mtl-dev,
libghc-transformers-dev,
libghc-exceptions-dev (>= 0.6),
+ libghc-stm-dev,
git,
make,
Description: property-based host configuration management in haskell
diff --git a/propellor.cabal b/propellor.cabal
index 20e82407..da43775f 100644
--- a/propellor.cabal
+++ b/propellor.cabal
@@ -39,7 +39,7 @@ Executable propellor
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions (>= 0.6)
+ exceptions (>= 0.6), stm
if (! os(windows))
Build-Depends: unix
@@ -51,7 +51,7 @@ Executable propellor-config
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions
+ exceptions, stm
if (! os(windows))
Build-Depends: unix
@@ -62,7 +62,7 @@ Library
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions
+ exceptions, stm
if (! os(windows))
Build-Depends: unix
diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs
index 6a5d5acb..2318b910 100644
--- a/src/Propellor/Bootstrap.hs
+++ b/src/Propellor/Bootstrap.hs
@@ -65,7 +65,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ "
aptinstall p = "apt-get --no-upgrade --no-install-recommends -y install " ++ p
- -- This is the same build deps listed in debian/control.
+ -- This is the same deps listed in debian/control.
debdeps =
[ "gnupg"
, "ghc"
@@ -81,6 +81,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ "
, "libghc-mtl-dev"
, "libghc-transformers-dev"
, "libghc-exceptions-dev"
+ , "libghc-stm-dev"
, "make"
]
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index c6550b84..5535066f 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE BangPatterns #-}
+
-- | Concurrent output handling.
module Utility.ConcurrentOutput (
@@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
@@ -25,21 +28,23 @@ import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
- { outputLock :: MVar () -- ^ empty when locked
- , outputLockedBy :: MVar Locker
+ { outputLock :: TMVar (Maybe Locker)
}
data Locker
= GeneralLock
- | ProcessLock P.ProcessHandle
+ | ProcessLock P.ProcessHandle String
+
+instance Show Locker where
+ show GeneralLock = "GeneralLock"
+ show (ProcessLock _ cmd) = "ProcessLock " ++ cmd
-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: MVar OutputHandle
globalOutputHandle = unsafePerformIO $
newMVar =<< OutputHandle
- <$> newMVar ()
- <*> newEmptyMVar
+ <$> newTMVarIO Nothing
-- | Gets the global OutputHandle.
getOutputHandle :: IO OutputHandle
@@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
+withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a
+withLock a = do
lck <- outputLock <$> getOutputHandle
- go =<< tryTakeMVar lck
+ atomically (a lck)
+
+-- The lock TMVar is kept full normally, even if only with Nothing,
+-- so if we take it here, that blocks anyone else from trying
+-- to take the lock while we are checking it.
+takeOutputLock' :: Bool -> IO Bool
+takeOutputLock' block = go =<< withLock tryTakeTMVar
where
- -- lck was full, and we've emptied it, so we hold the lock now.
- go (Just ()) = havelock
- -- lck is empty, so someone else is holding the lock.
- go Nothing = do
- lcker <- outputLockedBy <$> getOutputHandle
- v' <- tryTakeMVar lcker
- case v' of
- Just (ProcessLock h) ->
- -- if process has exited, lock is stale
- ifM (isJust <$> P.getProcessExitCode h)
- ( havelock
- , if block
- then do
- void $ P.waitForProcess h
- havelock
- else do
- putMVar lcker (ProcessLock h)
- return False
- )
- Just GeneralLock -> do
- putMVar lcker GeneralLock
- whenblock waitlock
- Nothing -> whenblock waitlock
+ go Nothing = whenblock waitlock
+ -- Something has the lock. It may be stale, so check it.
+ -- We must always be sure to fill the TMVar back with Just or Nothing.
+ go (Just orig) = case orig of
+ Nothing -> havelock
+ (Just (ProcessLock h _)) ->
+ -- when process has exited, lock is stale
+ ifM (isJust <$> P.getProcessExitCode h)
+ ( havelock
+ , if block
+ then do
+ hPutStr stderr "WAITFORPROCESS in lock"
+ hFlush stderr
+ void $ P.waitForProcess h
+ hPutStr stderr "WAITFORPROCESS in lock done"
+ hFlush stderr
+ havelock
+ else do
+ withLock (`putTMVar` orig)
+ return False
+ )
+ (Just GeneralLock) -> do
+ withLock (`putTMVar` orig)
+ whenblock waitlock
havelock = do
- updateOutputLocker GeneralLock
+ withLock (`putTMVar` Just GeneralLock)
return True
- waitlock = do
- -- Wait for current lock holder to relinquish
- -- it and take the lock.
- lck <- outputLock <$> getOutputHandle
- takeMVar lck
- havelock
+
+ -- Wait for current lock holder (if any) to relinquish
+ -- it and take the lock for ourselves.
+ waitlock = withLock $ \l -> do
+ v <- tryTakeTMVar l
+ case v of
+ Just (Just _) -> retry
+ _ -> do
+ putTMVar l (Just GeneralLock)
+ return True
+
whenblock a = if block then a else return False
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
-dropOutputLock = do
- lcker <- outputLockedBy <$> getOutputHandle
- lck <- outputLock <$> getOutputHandle
- void $ takeMVar lcker
- putMVar lck ()
+dropOutputLock = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l Nothing
-- | Only safe to call after takeOutputLock; updates the Locker.
updateOutputLocker :: Locker -> IO ()
-updateOutputLocker l = do
- lcker <- outputLockedBy <$> getOutputHandle
- void $ tryTakeMVar lcker
- putMVar lcker l
- modifyMVar_ lcker (const $ return l)
+updateOutputLocker locker = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l (Just locker)
-- | Use this around any IO actions that use `outputConcurrent`
-- or `createProcessConcurrent`
@@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain
where
-- Just taking the output lock is enough to ensure that anything
-- that was buffering output has had a chance to flush its buffer.
- drain = lockOutput (return ())
+ drain = lockOutput noop
-- | Displays a string to stdout, and flush output so it's displayed.
--
@@ -158,28 +171,25 @@ outputConcurrent s = do
-- as the output lock becomes free.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
- | willoutput (P.std_out p) || willoutput (P.std_err p) =
+ | willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
- ( do
- hPutStrLn stderr "IS NOT CONCURRENT"
- firstprocess
- , do
- hPutStrLn stderr "IS CONCURRENT"
- concurrentprocess
+ ( firstprocess
+ , concurrentprocess
)
| otherwise = P.createProcess p
where
- willoutput P.Inherit = True
- willoutput _ = False
+ rediroutput ss h
+ | willOutput ss = P.UseHandle h
+ | otherwise = ss
- rediroutput str h
- | willoutput str = P.UseHandle h
- | otherwise = str
+ cmd = case P.cmdspec p of
+ P.ShellCommand s -> s
+ P.RawCommand c ps -> unwords (c:ps)
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
- updateOutputLocker (ProcessLock h)
+ updateOutputLocker (ProcessLock h cmd)
-- Output lock is still held as we return; the process
-- is running now, and once it exits the output lock will
-- be stale and can then be taken by something else.
@@ -196,8 +206,8 @@ createProcessConcurrent p
hClose toouth
hClose toerrh
buf <- newMVar []
- void $ async $ outputDrainer fromouth stdout buf
- void $ async $ outputDrainer fromerrh stderr buf
+ void $ async $ outputDrainer (P.std_out p) fromouth stdout buf
+ void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf
void $ async $ bufferWriter buf
return r
@@ -205,6 +215,10 @@ createProcessConcurrent p
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
+willOutput :: P.StdStream -> Bool
+willOutput P.Inherit = True
+willOutput _ = False
+
type Buffer = [(Handle, BufferedActivity)]
data BufferedActivity
@@ -213,17 +227,22 @@ data BufferedActivity
| InTempFile FilePath
deriving (Eq)
--- Drain output from the handle, and buffer it in memory.
-outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
-outputDrainer fromh toh buf = do
- v <- tryIO $ B.hGetSome fromh 1024
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (toh, Output b)
- outputDrainer fromh toh buf
- _ -> do
- modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
- hClose fromh
+-- Drain output from the handle, and buffer it.
+outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO ()
+outputDrainer ss fromh toh buf
+ | willOutput ss = go
+ | otherwise = atend
+ where
+ go = do
+ v <- tryIO $ B.hGetSome fromh 1024
+ case v of
+ Right b | not (B.null b) -> do
+ modifyMVar_ buf $ addBuffer (toh, Output b)
+ go
+ _ -> atend
+ atend = do
+ modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
+ hClose fromh
-- Wait to lock output, and once we can, display everything
-- that's put into buffer, until the end is signaled by Nothing
@@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf
hClose h
return ((toh, InTempFile tmp) : other)
where
- b' = B.concat (mapMaybe getOutput this) <> b
- (this, other) = partition same buf
+ !b' = B.concat (mapMaybe getOutput this) <> b
+ !(this, other) = partition same buf
same v = fst v == toh && case snd v of
Output _ -> True
_ -> False