summaryrefslogtreecommitdiff
path: root/src/Propellor/Message.hs
diff options
context:
space:
mode:
authorJoey Hess2015-10-27 23:19:41 -0400
committerJoey Hess2015-10-27 23:24:26 -0400
commit51b397d0415e1efe1df412842ccb76d702140f50 (patch)
tree567bec1fcc50ce4f6cd8a7d79405922969cec169 /src/Propellor/Message.hs
parent20b04d366b2cff90c39d06fd424ae3e8b67e49f6 (diff)
concurrent version of createProcess
Have not yet wired everything up to use this, that currently uses Utility.Process.
Diffstat (limited to 'src/Propellor/Message.hs')
-rw-r--r--src/Propellor/Message.hs213
1 files changed, 200 insertions, 13 deletions
diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs
index 0961a356..afe551cf 100644
--- a/src/Propellor/Message.hs
+++ b/src/Propellor/Message.hs
@@ -20,10 +20,12 @@ module Propellor.Message (
enableDebugMode,
processChainOutput,
messagesDone,
+ createProcess,
) where
import System.Console.ANSI
import System.IO
+import System.Posix.IO
import System.Log.Logger
import System.Log.Formatter
import System.Log.Handler (setFormatter)
@@ -34,26 +36,38 @@ import System.Directory
import Control.Monad.IfElse
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
+import Control.Concurrent.Async
+import Data.Maybe
+import Data.Char
+import Data.List
+import Data.Monoid
+import qualified Data.ByteString as B
import Propellor.Types
import Utility.PartialPrelude
import Utility.Monad
import Utility.Env
-import Utility.Process
import Utility.Exception
+import qualified Utility.Process as P
data MessageHandle = MessageHandle
{ isConsole :: Bool
- , outputLock :: MVar ()
+ , outputLock :: MVar () -- ^ empty when locked
+ , outputLockedBy :: MVar Locker
}
+data Locker
+ = GeneralLock
+ | ProcessLock P.ProcessHandle
+
-- | A shared global variable for the MessageHandle.
{-# NOINLINE globalMessageHandle #-}
globalMessageHandle :: MVar MessageHandle
-globalMessageHandle = unsafePerformIO $ do
- c <- hIsTerminalDevice stdout
- o <- newMVar ()
- newMVar $ MessageHandle c o
+globalMessageHandle = unsafePerformIO $
+ newMVar =<< MessageHandle
+ <$> hIsTerminalDevice stdout
+ <*> newMVar ()
+ <*> newEmptyMVar
-- | Gets the global MessageHandle.
getMessageHandle :: IO MessageHandle
@@ -62,9 +76,71 @@ getMessageHandle = readMVar globalMessageHandle
-- | Takes a lock while performing an action. Any other threads
-- that try to lockOutput at the same time will block.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
-lockOutput a = do
- lck <- liftIO $ outputLock <$> getMessageHandle
- bracket_ (liftIO $ takeMVar lck) (liftIO $ putMVar lck ()) a
+lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
+
+-- | Blocks until we have the output lock.
+takeOutputLock :: IO ()
+takeOutputLock = void $ takeOutputLock' True
+
+-- | Tries to take the output lock, without blocking.
+tryTakeOutputLock :: IO Bool
+tryTakeOutputLock = takeOutputLock' False
+
+takeOutputLock' :: Bool -> IO Bool
+takeOutputLock' block = do
+ lck <- outputLock <$> getMessageHandle
+ go =<< tryTakeMVar lck
+ where
+ -- lck was full, and we've emptied it, so we hold the lock now.
+ go (Just ()) = havelock
+ -- lck is empty, so someone else is holding the lock.
+ go Nothing = do
+ lcker <- outputLockedBy <$> getMessageHandle
+ v' <- tryTakeMVar lcker
+ case v' of
+ Just (ProcessLock h) ->
+ -- if process has exited, lock is stale
+ ifM (isJust <$> P.getProcessExitCode h)
+ ( havelock
+ , if block
+ then do
+ void $ P.waitForProcess h
+ havelock
+ else do
+ putMVar lcker (ProcessLock h)
+ return False
+ )
+ Just GeneralLock -> do
+ putMVar lcker GeneralLock
+ whenblock waitlock
+ Nothing -> whenblock waitlock
+
+ havelock = do
+ updateOutputLocker GeneralLock
+ return True
+ waitlock = do
+ -- Wait for current lock holder to relinquish
+ -- it and take the lock.
+ lck <- outputLock <$> getMessageHandle
+ takeMVar lck
+ havelock
+ whenblock a = if block then a else return False
+
+-- | Only safe to call after taking the output lock.
+dropOutputLock :: IO ()
+dropOutputLock = do
+ lcker <- outputLockedBy <$> getMessageHandle
+ lck <- outputLock <$> getMessageHandle
+ takeMVar lcker
+ putMVar lck ()
+
+-- | Only safe to call after takeOutputLock; updates the Locker.
+updateOutputLocker :: Locker -> IO ()
+updateOutputLocker l = do
+ lcker <- outputLockedBy <$> getMessageHandle
+ void $ tryTakeMVar lcker
+ putMVar lcker l
+ modifyMVar_ lcker (const $ return l)
-- | Force console output. This can be used when stdout is not directly
-- connected to a console, but is eventually going to be displayed at a
@@ -89,14 +165,14 @@ actionMessageOn :: (MonadIO m, MonadMask m, ActionResult r) => HostName -> Desc
actionMessageOn = actionMessage' . Just
actionMessage' :: (MonadIO m, MonadMask m, ActionResult r) => Maybe HostName -> Desc -> m r -> m r
-actionMessage' mhn desc a = lockOutput $ do
- liftIO $ whenConsole $ do
+actionMessage' mhn desc a = do
+ liftIO $ whenConsole $ lockOutput $ do
setTitle $ "propellor: " ++ desc
hFlush stdout
r <- a
- liftIO $ do
+ liftIO $ lockOutput $ do
whenConsole $
setTitle "propellor: running"
showhn mhn
@@ -151,7 +227,7 @@ checkDebugMode = go =<< getEnv "PROPELLOR_DEBUG"
go Nothing = whenM (doesDirectoryExist ".git") $
whenM (elem "1" . lines <$> getgitconfig) enableDebugMode
getgitconfig = catchDefaultIO "" $
- readProcess "git" ["config", "propellor.debug"]
+ P.readProcess "git" ["config", "propellor.debug"]
enableDebugMode :: IO ()
enableDebugMode = do
@@ -194,3 +270,114 @@ messagesDone = lockOutput $ do
whenConsole $
setTitle "propellor: done"
hFlush stdout
+
+-- | Wrapper around `System.Process.createProcess` that prevents processes
+-- that are running concurrently from writing to the stdout/stderr at the
+-- same time.
+--
+-- The first process run by createProcess is allowed to write to
+-- stdout and stderr in the usual way.
+--
+-- However, if a second createProcess runs concurrently with the
+-- first, any stdout or stderr that would have been displayed by it is
+-- instead buffered. The buffered output will be displayed the next time it
+-- is safe to do so (ie, after the first process exits).
+--
+-- `Propellor.Property.Cmd` has some other useful actions for running
+-- commands, which are based on this.
+--
+-- Also does debug logging of all commands run.
+createProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
+createProcess p
+ | hasoutput (P.std_out p) || hasoutput (P.std_err p) =
+ ifM tryTakeOutputLock
+ ( firstprocess
+ , concurrentprocess
+ )
+ | otherwise = P.createProcess p
+ where
+ hasoutput P.Inherit = True
+ hasoutput _ = False
+
+ firstprocess = do
+ r@(_, _, _, h) <- P.createProcess p
+ `onException` dropOutputLock
+ updateOutputLocker (ProcessLock h)
+ -- Output lock is still held as we return; the process
+ -- is running now, and once it exits the output lock will
+ -- be stale and can then be taken by something else.
+ return r
+
+ concurrentprocess = do
+ (toouth, fromouth) <- pipe
+ (toerrh, fromerrh) <- pipe
+ let p' = p
+ { P.std_out = if hasoutput (P.std_out p)
+ then P.UseHandle toouth
+ else P.std_out p
+ , P.std_err = if hasoutput (P.std_err p)
+ then P.UseHandle toerrh
+ else P.std_err p
+ }
+ r@(_, _, _, ph) <- P.createProcess p'
+ hClose toouth
+ hClose toerrh
+ buf <- newMVar []
+ void $ async $ outputDrainer fromouth stdout buf
+ void $ async $ outputDrainer fromouth stderr buf
+ void $ async $ bufferWriter buf
+ return r
+
+ pipe = do
+ (from, to) <- createPipe
+ (,) <$> fdToHandle to <*> fdToHandle from
+
+type Buffer = [(Handle, Maybe B.ByteString)]
+
+-- Drain output from the handle, and buffer it in memory.
+outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
+outputDrainer fromh toh buf = do
+ v <- tryIO $ B.hGetSome fromh 1024
+ case v of
+ Right b | not (B.null b) -> do
+ modifyMVar_ buf (pure . addBuffer (toh, Just b))
+ outputDrainer fromh toh buf
+ _ -> do
+ modifyMVar_ buf (pure . (++ [(toh, Nothing)]))
+ hClose fromh
+
+-- Wait to lock output, and once we can, display everything
+-- that's put into buffer, until the end is signaled by Nothing
+-- for both stdout and stderr.
+bufferWriter buf = lockOutput (go [stdout, stderr])
+ where
+ go [] = return ()
+ go hs = do
+ l <- takeMVar buf
+ forM_ l $ \(h, mb) -> do
+ maybe noop (B.hPut h) mb
+ hFlush h
+ let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs
+ putMVar buf []
+ go hs'
+
+-- The buffer can grow up to 1 mb in size, but after that point,
+-- it's truncated to avoid propellor using unbounded memory
+-- when a process outputs a whole lot of stuff.
+bufsz = 1000000
+
+addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer
+addBuffer v@(_, Nothing) buf = buf ++ [v]
+addBuffer (toh, Just b) buf = (toh, Just b') : other
+ where
+ (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf
+ b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b
+
+-- Truncate a buffer by removing lines from the front until it's
+-- small enough.
+truncateBuffer :: B.ByteString -> B.ByteString
+truncateBuffer b
+ | B.length b <= bufsz = b
+ | otherwise = truncateBuffer $ snd $ B.breakByte nl b
+ where
+ nl = fromIntegral (ord '\n')