summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoey Hess2015-10-28 14:46:17 -0400
committerJoey Hess2015-10-28 14:46:17 -0400
commit111ea88d4d7c54e9ab7950962ad22528d54dd959 (patch)
tree69053673ab24be9eb61f0ce87c23d2d84b108ae7
parent68dbfe1b08c9cf1d976ac84ea53817c54fcd3479 (diff)
fix bad MVar use, use STM
I had 2 MVars both involved in the same lock, and it seemed intractable to avoid deadlocks with them. STM makes it easy. At this point, the concurrent process stuff seems to work pretty well, but I'm not 100% sure it's not got some bugs.
-rw-r--r--debian/changelog1
-rw-r--r--debian/control2
-rw-r--r--propellor.cabal6
-rw-r--r--src/Propellor/Bootstrap.hs3
-rw-r--r--src/Utility/ConcurrentOutput.hs173
5 files changed, 104 insertions, 81 deletions
diff --git a/debian/changelog b/debian/changelog
index 6c154e1a..f3522b7c 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -19,6 +19,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium
actions are combined (API change).
* Added Propellor.Property.Concurrent for concurrent properties.
* execProcess and everything built on it is now concurrent output safe.
+ * Propellor now depends on stm.
* Add File.isCopyOf. Thanks, Per Olofsson.
-- Joey Hess <id@joeyh.name> Sat, 24 Oct 2015 15:16:45 -0400
diff --git a/debian/control b/debian/control
index 7f42c916..2956fdaa 100644
--- a/debian/control
+++ b/debian/control
@@ -17,6 +17,7 @@ Build-Depends:
libghc-mtl-dev,
libghc-transformers-dev,
libghc-exceptions-dev (>= 0.6),
+ libghc-stm-dev,
Maintainer: Gergely Nagy <algernon@madhouse-project.org>
Standards-Version: 3.9.6
Vcs-Git: git://git.joeyh.name/propellor
@@ -39,6 +40,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends},
libghc-mtl-dev,
libghc-transformers-dev,
libghc-exceptions-dev (>= 0.6),
+ libghc-stm-dev,
git,
make,
Description: property-based host configuration management in haskell
diff --git a/propellor.cabal b/propellor.cabal
index 20e82407..da43775f 100644
--- a/propellor.cabal
+++ b/propellor.cabal
@@ -39,7 +39,7 @@ Executable propellor
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions (>= 0.6)
+ exceptions (>= 0.6), stm
if (! os(windows))
Build-Depends: unix
@@ -51,7 +51,7 @@ Executable propellor-config
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions
+ exceptions, stm
if (! os(windows))
Build-Depends: unix
@@ -62,7 +62,7 @@ Library
Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5,
IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal,
containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers,
- exceptions
+ exceptions, stm
if (! os(windows))
Build-Depends: unix
diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs
index 6a5d5acb..2318b910 100644
--- a/src/Propellor/Bootstrap.hs
+++ b/src/Propellor/Bootstrap.hs
@@ -65,7 +65,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ "
aptinstall p = "apt-get --no-upgrade --no-install-recommends -y install " ++ p
- -- This is the same build deps listed in debian/control.
+ -- This is the same deps listed in debian/control.
debdeps =
[ "gnupg"
, "ghc"
@@ -81,6 +81,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ "
, "libghc-mtl-dev"
, "libghc-transformers-dev"
, "libghc-exceptions-dev"
+ , "libghc-stm-dev"
, "make"
]
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs
index c6550b84..5535066f 100644
--- a/src/Utility/ConcurrentOutput.hs
+++ b/src/Utility/ConcurrentOutput.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE BangPatterns #-}
+
-- | Concurrent output handling.
module Utility.ConcurrentOutput (
@@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
@@ -25,21 +28,23 @@ import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
- { outputLock :: MVar () -- ^ empty when locked
- , outputLockedBy :: MVar Locker
+ { outputLock :: TMVar (Maybe Locker)
}
data Locker
= GeneralLock
- | ProcessLock P.ProcessHandle
+ | ProcessLock P.ProcessHandle String
+
+instance Show Locker where
+ show GeneralLock = "GeneralLock"
+ show (ProcessLock _ cmd) = "ProcessLock " ++ cmd
-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: MVar OutputHandle
globalOutputHandle = unsafePerformIO $
newMVar =<< OutputHandle
- <$> newMVar ()
- <*> newEmptyMVar
+ <$> newTMVarIO Nothing
-- | Gets the global OutputHandle.
getOutputHandle :: IO OutputHandle
@@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
+withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a
+withLock a = do
lck <- outputLock <$> getOutputHandle
- go =<< tryTakeMVar lck
+ atomically (a lck)
+
+-- The lock TMVar is kept full normally, even if only with Nothing,
+-- so if we take it here, that blocks anyone else from trying
+-- to take the lock while we are checking it.
+takeOutputLock' :: Bool -> IO Bool
+takeOutputLock' block = go =<< withLock tryTakeTMVar
where
- -- lck was full, and we've emptied it, so we hold the lock now.
- go (Just ()) = havelock
- -- lck is empty, so someone else is holding the lock.
- go Nothing = do
- lcker <- outputLockedBy <$> getOutputHandle
- v' <- tryTakeMVar lcker
- case v' of
- Just (ProcessLock h) ->
- -- if process has exited, lock is stale
- ifM (isJust <$> P.getProcessExitCode h)
- ( havelock
- , if block
- then do
- void $ P.waitForProcess h
- havelock
- else do
- putMVar lcker (ProcessLock h)
- return False
- )
- Just GeneralLock -> do
- putMVar lcker GeneralLock
- whenblock waitlock
- Nothing -> whenblock waitlock
+ go Nothing = whenblock waitlock
+ -- Something has the lock. It may be stale, so check it.
+ -- We must always be sure to fill the TMVar back with Just or Nothing.
+ go (Just orig) = case orig of
+ Nothing -> havelock
+ (Just (ProcessLock h _)) ->
+ -- when process has exited, lock is stale
+ ifM (isJust <$> P.getProcessExitCode h)
+ ( havelock
+ , if block
+ then do
+ hPutStr stderr "WAITFORPROCESS in lock"
+ hFlush stderr
+ void $ P.waitForProcess h
+ hPutStr stderr "WAITFORPROCESS in lock done"
+ hFlush stderr
+ havelock
+ else do
+ withLock (`putTMVar` orig)
+ return False
+ )
+ (Just GeneralLock) -> do
+ withLock (`putTMVar` orig)
+ whenblock waitlock
havelock = do
- updateOutputLocker GeneralLock
+ withLock (`putTMVar` Just GeneralLock)
return True
- waitlock = do
- -- Wait for current lock holder to relinquish
- -- it and take the lock.
- lck <- outputLock <$> getOutputHandle
- takeMVar lck
- havelock
+
+ -- Wait for current lock holder (if any) to relinquish
+ -- it and take the lock for ourselves.
+ waitlock = withLock $ \l -> do
+ v <- tryTakeTMVar l
+ case v of
+ Just (Just _) -> retry
+ _ -> do
+ putTMVar l (Just GeneralLock)
+ return True
+
whenblock a = if block then a else return False
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
-dropOutputLock = do
- lcker <- outputLockedBy <$> getOutputHandle
- lck <- outputLock <$> getOutputHandle
- void $ takeMVar lcker
- putMVar lck ()
+dropOutputLock = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l Nothing
-- | Only safe to call after takeOutputLock; updates the Locker.
updateOutputLocker :: Locker -> IO ()
-updateOutputLocker l = do
- lcker <- outputLockedBy <$> getOutputHandle
- void $ tryTakeMVar lcker
- putMVar lcker l
- modifyMVar_ lcker (const $ return l)
+updateOutputLocker locker = withLock $ \l -> do
+ void $ takeTMVar l
+ putTMVar l (Just locker)
-- | Use this around any IO actions that use `outputConcurrent`
-- or `createProcessConcurrent`
@@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain
where
-- Just taking the output lock is enough to ensure that anything
-- that was buffering output has had a chance to flush its buffer.
- drain = lockOutput (return ())
+ drain = lockOutput noop
-- | Displays a string to stdout, and flush output so it's displayed.
--
@@ -158,28 +171,25 @@ outputConcurrent s = do
-- as the output lock becomes free.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
- | willoutput (P.std_out p) || willoutput (P.std_err p) =
+ | willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
- ( do
- hPutStrLn stderr "IS NOT CONCURRENT"
- firstprocess
- , do
- hPutStrLn stderr "IS CONCURRENT"
- concurrentprocess
+ ( firstprocess
+ , concurrentprocess
)
| otherwise = P.createProcess p
where
- willoutput P.Inherit = True
- willoutput _ = False
+ rediroutput ss h
+ | willOutput ss = P.UseHandle h
+ | otherwise = ss
- rediroutput str h
- | willoutput str = P.UseHandle h
- | otherwise = str
+ cmd = case P.cmdspec p of
+ P.ShellCommand s -> s
+ P.RawCommand c ps -> unwords (c:ps)
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
- updateOutputLocker (ProcessLock h)
+ updateOutputLocker (ProcessLock h cmd)
-- Output lock is still held as we return; the process
-- is running now, and once it exits the output lock will
-- be stale and can then be taken by something else.
@@ -196,8 +206,8 @@ createProcessConcurrent p
hClose toouth
hClose toerrh
buf <- newMVar []
- void $ async $ outputDrainer fromouth stdout buf
- void $ async $ outputDrainer fromerrh stderr buf
+ void $ async $ outputDrainer (P.std_out p) fromouth stdout buf
+ void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf
void $ async $ bufferWriter buf
return r
@@ -205,6 +215,10 @@ createProcessConcurrent p
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
+willOutput :: P.StdStream -> Bool
+willOutput P.Inherit = True
+willOutput _ = False
+
type Buffer = [(Handle, BufferedActivity)]
data BufferedActivity
@@ -213,17 +227,22 @@ data BufferedActivity
| InTempFile FilePath
deriving (Eq)
--- Drain output from the handle, and buffer it in memory.
-outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
-outputDrainer fromh toh buf = do
- v <- tryIO $ B.hGetSome fromh 1024
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf $ addBuffer (toh, Output b)
- outputDrainer fromh toh buf
- _ -> do
- modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
- hClose fromh
+-- Drain output from the handle, and buffer it.
+outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO ()
+outputDrainer ss fromh toh buf
+ | willOutput ss = go
+ | otherwise = atend
+ where
+ go = do
+ v <- tryIO $ B.hGetSome fromh 1024
+ case v of
+ Right b | not (B.null b) -> do
+ modifyMVar_ buf $ addBuffer (toh, Output b)
+ go
+ _ -> atend
+ atend = do
+ modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)])
+ hClose fromh
-- Wait to lock output, and once we can, display everything
-- that's put into buffer, until the end is signaled by Nothing
@@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf
hClose h
return ((toh, InTempFile tmp) : other)
where
- b' = B.concat (mapMaybe getOutput this) <> b
- (this, other) = partition same buf
+ !b' = B.concat (mapMaybe getOutput this) <> b
+ !(this, other) = partition same buf
same v = fst v == toh && case snd v of
Output _ -> True
_ -> False