From 51b397d0415e1efe1df412842ccb76d702140f50 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 27 Oct 2015 23:19:41 -0400 Subject: concurrent version of createProcess Have not yet wired everything up to use this, that currently uses Utility.Process. --- src/Propellor/Message.hs | 213 +++++++++++++++++++++++++++++++++++++++--- src/Propellor/Property/Cmd.hs | 1 + 2 files changed, 201 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 0961a356..afe551cf 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -20,10 +20,12 @@ module Propellor.Message ( enableDebugMode, processChainOutput, messagesDone, + createProcess, ) where import System.Console.ANSI import System.IO +import System.Posix.IO import System.Log.Logger import System.Log.Formatter import System.Log.Handler (setFormatter) @@ -34,26 +36,38 @@ import System.Directory import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import Control.Concurrent.Async +import Data.Maybe +import Data.Char +import Data.List +import Data.Monoid +import qualified Data.ByteString as B import Propellor.Types import Utility.PartialPrelude import Utility.Monad import Utility.Env -import Utility.Process import Utility.Exception +import qualified Utility.Process as P data MessageHandle = MessageHandle { isConsole :: Bool - , outputLock :: MVar () + , outputLock :: MVar () -- ^ empty when locked + , outputLockedBy :: MVar Locker } +data Locker + = GeneralLock + | ProcessLock P.ProcessHandle + -- | A shared global variable for the MessageHandle. {-# NOINLINE globalMessageHandle #-} globalMessageHandle :: MVar MessageHandle -globalMessageHandle = unsafePerformIO $ do - c <- hIsTerminalDevice stdout - o <- newMVar () - newMVar $ MessageHandle c o +globalMessageHandle = unsafePerformIO $ + newMVar =<< MessageHandle + <$> hIsTerminalDevice stdout + <*> newMVar () + <*> newEmptyMVar -- | Gets the global MessageHandle. getMessageHandle :: IO MessageHandle @@ -62,9 +76,71 @@ getMessageHandle = readMVar globalMessageHandle -- | Takes a lock while performing an action. Any other threads -- that try to lockOutput at the same time will block. lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput a = do - lck <- liftIO $ outputLock <$> getMessageHandle - bracket_ (liftIO $ takeMVar lck) (liftIO $ putMVar lck ()) a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + lck <- outputLock <$> getMessageHandle + go =<< tryTakeMVar lck + where + -- lck was full, and we've emptied it, so we hold the lock now. + go (Just ()) = havelock + -- lck is empty, so someone else is holding the lock. + go Nothing = do + lcker <- outputLockedBy <$> getMessageHandle + v' <- tryTakeMVar lcker + case v' of + Just (ProcessLock h) -> + -- if process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + void $ P.waitForProcess h + havelock + else do + putMVar lcker (ProcessLock h) + return False + ) + Just GeneralLock -> do + putMVar lcker GeneralLock + whenblock waitlock + Nothing -> whenblock waitlock + + havelock = do + updateOutputLocker GeneralLock + return True + waitlock = do + -- Wait for current lock holder to relinquish + -- it and take the lock. + lck <- outputLock <$> getMessageHandle + takeMVar lck + havelock + whenblock a = if block then a else return False + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = do + lcker <- outputLockedBy <$> getMessageHandle + lck <- outputLock <$> getMessageHandle + takeMVar lcker + putMVar lck () + +-- | Only safe to call after takeOutputLock; updates the Locker. +updateOutputLocker :: Locker -> IO () +updateOutputLocker l = do + lcker <- outputLockedBy <$> getMessageHandle + void $ tryTakeMVar lcker + putMVar lcker l + modifyMVar_ lcker (const $ return l) -- | Force console output. This can be used when stdout is not directly -- connected to a console, but is eventually going to be displayed at a @@ -89,14 +165,14 @@ actionMessageOn :: (MonadIO m, MonadMask m, ActionResult r) => HostName -> Desc actionMessageOn = actionMessage' . Just actionMessage' :: (MonadIO m, MonadMask m, ActionResult r) => Maybe HostName -> Desc -> m r -> m r -actionMessage' mhn desc a = lockOutput $ do - liftIO $ whenConsole $ do +actionMessage' mhn desc a = do + liftIO $ whenConsole $ lockOutput $ do setTitle $ "propellor: " ++ desc hFlush stdout r <- a - liftIO $ do + liftIO $ lockOutput $ do whenConsole $ setTitle "propellor: running" showhn mhn @@ -151,7 +227,7 @@ checkDebugMode = go =<< getEnv "PROPELLOR_DEBUG" go Nothing = whenM (doesDirectoryExist ".git") $ whenM (elem "1" . lines <$> getgitconfig) enableDebugMode getgitconfig = catchDefaultIO "" $ - readProcess "git" ["config", "propellor.debug"] + P.readProcess "git" ["config", "propellor.debug"] enableDebugMode :: IO () enableDebugMode = do @@ -194,3 +270,114 @@ messagesDone = lockOutput $ do whenConsole $ setTitle "propellor: done" hFlush stdout + +-- | Wrapper around `System.Process.createProcess` that prevents processes +-- that are running concurrently from writing to the stdout/stderr at the +-- same time. +-- +-- The first process run by createProcess is allowed to write to +-- stdout and stderr in the usual way. +-- +-- However, if a second createProcess runs concurrently with the +-- first, any stdout or stderr that would have been displayed by it is +-- instead buffered. The buffered output will be displayed the next time it +-- is safe to do so (ie, after the first process exits). +-- +-- `Propellor.Property.Cmd` has some other useful actions for running +-- commands, which are based on this. +-- +-- Also does debug logging of all commands run. +createProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcess p + | hasoutput (P.std_out p) || hasoutput (P.std_err p) = + ifM tryTakeOutputLock + ( firstprocess + , concurrentprocess + ) + | otherwise = P.createProcess p + where + hasoutput P.Inherit = True + hasoutput _ = False + + firstprocess = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + updateOutputLocker (ProcessLock h) + -- Output lock is still held as we return; the process + -- is running now, and once it exits the output lock will + -- be stale and can then be taken by something else. + return r + + concurrentprocess = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = if hasoutput (P.std_out p) + then P.UseHandle toouth + else P.std_out p + , P.std_err = if hasoutput (P.std_err p) + then P.UseHandle toerrh + else P.std_err p + } + r@(_, _, _, ph) <- P.createProcess p' + hClose toouth + hClose toerrh + buf <- newMVar [] + void $ async $ outputDrainer fromouth stdout buf + void $ async $ outputDrainer fromouth stderr buf + void $ async $ bufferWriter buf + return r + + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + +type Buffer = [(Handle, Maybe B.ByteString)] + +-- Drain output from the handle, and buffer it in memory. +outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () +outputDrainer fromh toh buf = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf (pure . addBuffer (toh, Just b)) + outputDrainer fromh toh buf + _ -> do + modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + hClose fromh + +-- Wait to lock output, and once we can, display everything +-- that's put into buffer, until the end is signaled by Nothing +-- for both stdout and stderr. +bufferWriter buf = lockOutput (go [stdout, stderr]) + where + go [] = return () + go hs = do + l <- takeMVar buf + forM_ l $ \(h, mb) -> do + maybe noop (B.hPut h) mb + hFlush h + let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + putMVar buf [] + go hs' + +-- The buffer can grow up to 1 mb in size, but after that point, +-- it's truncated to avoid propellor using unbounded memory +-- when a process outputs a whole lot of stuff. +bufsz = 1000000 + +addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer +addBuffer v@(_, Nothing) buf = buf ++ [v] +addBuffer (toh, Just b) buf = (toh, Just b') : other + where + (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf + b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b + +-- Truncate a buffer by removing lines from the front until it's +-- small enough. +truncateBuffer :: B.ByteString -> B.ByteString +truncateBuffer b + | B.length b <= bufsz = b + | otherwise = truncateBuffer $ snd $ B.breakByte nl b + where + nl = fromIntegral (ord '\n') diff --git a/src/Propellor/Property/Cmd.hs b/src/Propellor/Property/Cmd.hs index 23816a94..f2c5b33e 100644 --- a/src/Propellor/Property/Cmd.hs +++ b/src/Propellor/Property/Cmd.hs @@ -16,6 +16,7 @@ module Propellor.Property.Cmd ( safeSystemEnv, shellEscape, createProcess, + waitForProcess, ) where import Control.Applicative -- cgit v1.2.3