From 261d008d41e6656ce4ceafb8c0f0630d5795944a Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 27 Oct 2015 23:50:27 -0400 Subject: merge from git-annex --- src/Utility/Process.hs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/Process.hs b/src/Utility/Process.hs index c4882a01..cc113867 100644 --- a/src/Utility/Process.hs +++ b/src/Utility/Process.hs @@ -172,22 +172,21 @@ createBackgroundProcess p a = a =<< createProcess p -- returns a transcript combining its stdout and stderr, and -- whether it succeeded or failed. processTranscript :: String -> [String] -> (Maybe String) -> IO (String, Bool) -processTranscript cmd opts = processTranscript' cmd opts Nothing +processTranscript = processTranscript' id -processTranscript' :: String -> [String] -> Maybe [(String, String)] -> (Maybe String) -> IO (String, Bool) -processTranscript' cmd opts environ input = do +processTranscript' :: (CreateProcess -> CreateProcess) -> String -> [String] -> Maybe String -> IO (String, Bool) +processTranscript' modproc cmd opts input = do #ifndef mingw32_HOST_OS {- This implementation interleves stdout and stderr in exactly the order - the process writes them. -} (readf, writef) <- System.Posix.IO.createPipe readh <- System.Posix.IO.fdToHandle readf writeh <- System.Posix.IO.fdToHandle writef - p@(_, _, _, pid) <- createProcess $ + p@(_, _, _, pid) <- createProcess $ modproc $ (proc cmd opts) { std_in = if isJust input then CreatePipe else Inherit , std_out = UseHandle writeh , std_err = UseHandle writeh - , env = environ } hClose writeh @@ -199,12 +198,11 @@ processTranscript' cmd opts environ input = do return (transcript, ok) #else {- This implementation for Windows puts stderr after stdout. -} - p@(_, _, _, pid) <- createProcess $ + p@(_, _, _, pid) <- createProcess $ modproc $ (proc cmd opts) { std_in = if isJust input then CreatePipe else Inherit , std_out = CreatePipe , std_err = CreatePipe - , env = environ } getout <- mkreader (stdoutHandle p) -- cgit v1.2.3 From 894e2f7980052f1c331ba7780100ae0ad19856cb Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 27 Oct 2015 23:52:02 -0400 Subject: use execProcessConcurrent everywhere Found a reasonable clean way to make Utility.Process use execProcessConcurrent, while still allowing copying updates to it from git-annex. --- propellor.cabal | 2 ++ src/Propellor/Base.hs | 2 ++ src/Propellor/Debug.hs | 36 +++++++++++++++++++++++++++++++ src/Propellor/Message.hs | 49 ++++++++----------------------------------- src/Propellor/Property/Cmd.hs | 2 +- src/Utility/Process.hs | 16 +++++++------- src/Utility/Process/Shim.hs | 8 +++++++ 7 files changed, 66 insertions(+), 49 deletions(-) create mode 100644 src/Propellor/Debug.hs create mode 100644 src/Utility/Process/Shim.hs (limited to 'src/Utility') diff --git a/propellor.cabal b/propellor.cabal index 7a9d2b5d..63fcaaa5 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -135,6 +135,7 @@ Library Propellor.CmdLine Propellor.Info Propellor.Message + Propellor.Debug Propellor.PrivData Propellor.Engine Propellor.Exception @@ -175,6 +176,7 @@ Library Utility.PartialPrelude Utility.PosixFiles Utility.Process + Utility.Process.Shim Utility.SafeCommand Utility.Scheduled Utility.Table diff --git a/src/Propellor/Base.hs b/src/Propellor/Base.hs index 3c13bb7d..2a0f5cbc 100644 --- a/src/Propellor/Base.hs +++ b/src/Propellor/Base.hs @@ -15,6 +15,7 @@ module Propellor.Base ( , module Propellor.Engine , module Propellor.Exception , module Propellor.Message + , module Propellor.Debug , module Propellor.Location , module Propellor.Utilities @@ -39,6 +40,7 @@ import Propellor.Property.Cmd import Propellor.PrivData import Propellor.Types.PrivData import Propellor.Message +import Propellor.Debug import Propellor.Exception import Propellor.Info import Propellor.PropAccum diff --git a/src/Propellor/Debug.hs b/src/Propellor/Debug.hs new file mode 100644 index 00000000..ac4a56cc --- /dev/null +++ b/src/Propellor/Debug.hs @@ -0,0 +1,36 @@ +module Propellor.Debug where + +import Control.Applicative +import Control.Monad.IfElse +import System.IO +import System.Directory +import System.Log.Logger +import System.Log.Formatter +import System.Log.Handler (setFormatter) +import System.Log.Handler.Simple + +import Utility.Monad +import Utility.Env +import Utility.Exception +import Utility.Process + +debug :: [String] -> IO () +debug = debugM "propellor" . unwords + +checkDebugMode :: IO () +checkDebugMode = go =<< getEnv "PROPELLOR_DEBUG" + where + go (Just "1") = enableDebugMode + go (Just _) = noop + go Nothing = whenM (doesDirectoryExist ".git") $ + whenM (elem "1" . lines <$> getgitconfig) enableDebugMode + getgitconfig = catchDefaultIO "" $ + readProcess "git" ["config", "propellor.debug"] + +enableDebugMode :: IO () +enableDebugMode = do + f <- setFormatter + <$> streamHandler stderr DEBUG + <*> pure (simpleLogFormatter "[$time] $msg") + updateGlobalLogger rootLoggerName $ + setLevel DEBUG . setHandlers [f] diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index afe551cf..4be8263e 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -15,24 +15,16 @@ module Propellor.Message ( warningMessage, infoMessage, errorMessage, - debug, - checkDebugMode, - enableDebugMode, processChainOutput, messagesDone, - createProcess, + createProcessConcurrent, ) where import System.Console.ANSI import System.IO import System.Posix.IO -import System.Log.Logger -import System.Log.Formatter -import System.Log.Handler (setFormatter) -import System.Log.Handler.Simple import "mtl" Control.Monad.Reader import Control.Applicative -import System.Directory import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent @@ -42,13 +34,12 @@ import Data.Char import Data.List import Data.Monoid import qualified Data.ByteString as B +import qualified System.Process as P import Propellor.Types import Utility.PartialPrelude import Utility.Monad -import Utility.Env import Utility.Exception -import qualified Utility.Process as P data MessageHandle = MessageHandle { isConsole :: Bool @@ -131,7 +122,7 @@ dropOutputLock :: IO () dropOutputLock = do lcker <- outputLockedBy <$> getMessageHandle lck <- outputLock <$> getMessageHandle - takeMVar lcker + void $ takeMVar lcker putMVar lck () -- | Only safe to call after takeOutputLock; updates the Locker. @@ -216,27 +207,6 @@ colorLine intensity color msg = do putStrLn "" hFlush stdout -debug :: [String] -> IO () -debug = debugM "propellor" . unwords - -checkDebugMode :: IO () -checkDebugMode = go =<< getEnv "PROPELLOR_DEBUG" - where - go (Just "1") = enableDebugMode - go (Just _) = noop - go Nothing = whenM (doesDirectoryExist ".git") $ - whenM (elem "1" . lines <$> getgitconfig) enableDebugMode - getgitconfig = catchDefaultIO "" $ - P.readProcess "git" ["config", "propellor.debug"] - -enableDebugMode :: IO () -enableDebugMode = do - f <- setFormatter - <$> streamHandler stderr DEBUG - <*> pure (simpleLogFormatter "[$time] $msg") - updateGlobalLogger rootLoggerName $ - setLevel DEBUG . setHandlers [f] - -- | Reads and displays each line from the Handle, except for the last line -- which is a Result. processChainOutput :: Handle -> IO Result @@ -244,16 +214,13 @@ processChainOutput h = go Nothing where go lastline = do v <- catchMaybeIO (hGetLine h) - debug ["read from chained propellor: ", show v] case v of Nothing -> case lastline of Nothing -> do - debug ["chained propellor output nothing; assuming it failed"] return FailedChange Just l -> case readish l of Just r -> pure r Nothing -> do - debug ["chained propellor output did not end with a Result; assuming it failed"] lockOutput $ do putStrLn l hFlush stdout @@ -287,8 +254,8 @@ messagesDone = lockOutput $ do -- commands, which are based on this. -- -- Also does debug logging of all commands run. -createProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcess p +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessConcurrent p | hasoutput (P.std_out p) || hasoutput (P.std_err p) = ifM tryTakeOutputLock ( firstprocess @@ -319,12 +286,12 @@ createProcess p then P.UseHandle toerrh else P.std_err p } - r@(_, _, _, ph) <- P.createProcess p' + r <- P.createProcess p' hClose toouth hClose toerrh buf <- newMVar [] void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromouth stderr buf + void $ async $ outputDrainer fromerrh stderr buf void $ async $ bufferWriter buf return r @@ -349,6 +316,7 @@ outputDrainer fromh toh buf = do -- Wait to lock output, and once we can, display everything -- that's put into buffer, until the end is signaled by Nothing -- for both stdout and stderr. +bufferWriter :: MVar Buffer -> IO () bufferWriter buf = lockOutput (go [stdout, stderr]) where go [] = return () @@ -364,6 +332,7 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) -- The buffer can grow up to 1 mb in size, but after that point, -- it's truncated to avoid propellor using unbounded memory -- when a process outputs a whole lot of stuff. +bufsz :: Int bufsz = 1000000 addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer diff --git a/src/Propellor/Property/Cmd.hs b/src/Propellor/Property/Cmd.hs index f2c5b33e..9536f71d 100644 --- a/src/Propellor/Property/Cmd.hs +++ b/src/Propellor/Property/Cmd.hs @@ -27,7 +27,7 @@ import Propellor.Types import Propellor.Property import Utility.SafeCommand import Utility.Env -import Utility.Process (createProcess, CreateProcess) +import Utility.Process (createProcess, CreateProcess, waitForProcess) -- | A property that can be satisfied by running a command. -- diff --git a/src/Utility/Process.hs b/src/Utility/Process.hs index cc113867..c6699961e 100644 --- a/src/Utility/Process.hs +++ b/src/Utility/Process.hs @@ -41,9 +41,12 @@ module Utility.Process ( devNull, ) where -import qualified System.Process -import qualified System.Process as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess) -import System.Process hiding (createProcess, readProcess, waitForProcess) +import qualified Utility.Process.Shim +import qualified Utility.Process.Shim as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess) +import Utility.Process.Shim hiding (createProcess, readProcess, waitForProcess) +import Utility.Misc +import Utility.Exception + import System.Exit import System.IO import System.Log.Logger @@ -58,9 +61,6 @@ import Control.Applicative import Data.Maybe import Prelude -import Utility.Misc -import Utility.Exception - type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a data StdHandle = StdinHandle | StdoutHandle | StderrHandle @@ -372,7 +372,7 @@ startInteractiveProcess cmd args environ = do createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) createProcess p = do debugProcess p - System.Process.createProcess p + Utility.Process.Shim.createProcess p -- | Debugging trace for a CreateProcess. debugProcess :: CreateProcess -> IO () @@ -392,6 +392,6 @@ debugProcess p = debugM "Utility.Process" $ unwords -- | Wrapper around 'System.Process.waitForProcess' that does debug logging. waitForProcess :: ProcessHandle -> IO ExitCode waitForProcess h = do - r <- System.Process.waitForProcess h + r <- Utility.Process.Shim.waitForProcess h debugM "Utility.Process" ("process done " ++ show r) return r diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs new file mode 100644 index 00000000..0da93bf7 --- /dev/null +++ b/src/Utility/Process/Shim.hs @@ -0,0 +1,8 @@ +module Utility.Process.Shim (module X, createProcess) where + +import System.Process as X hiding (createProcess) +import Propellor.Message (createProcessConcurrent) +import System.IO + +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess = createProcessConcurrent -- cgit v1.2.3 From af68ec950b2480749182d0d6838e96fd02c2c285 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 10:37:19 -0400 Subject: split out generic ConcurrentOutput module to Utility --- propellor.cabal | 1 + src/Propellor/Message.hs | 204 +----------------------------------- src/Utility/ConcurrentOutput.hs | 224 ++++++++++++++++++++++++++++++++++++++++ src/Utility/Process/Shim.hs | 2 +- 4 files changed, 229 insertions(+), 202 deletions(-) create mode 100644 src/Utility/ConcurrentOutput.hs (limited to 'src/Utility') diff --git a/propellor.cabal b/propellor.cabal index 63fcaaa5..20e82407 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -161,6 +161,7 @@ Library Propellor.Shim Propellor.Property.Chroot.Util Utility.Applicative + Utility.ConcurrentOutput Utility.Data Utility.DataUnits Utility.Directory diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 3792129b..3b06770c 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE PackageImports #-} - -- | This module handles all display of output to the console when -- propellor is ensuring Properties. -- @@ -22,117 +20,34 @@ module Propellor.Message ( import System.Console.ANSI import System.IO -import System.Posix.IO -import "mtl" Control.Monad.Reader +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent -import Control.Concurrent.Async -import Data.Maybe -import Data.Char -import Data.List -import Data.Monoid -import qualified Data.ByteString as B -import qualified System.Process as P import Propellor.Types +import Utility.ConcurrentOutput import Utility.PartialPrelude import Utility.Monad import Utility.Exception data MessageHandle = MessageHandle { isConsole :: Bool - , outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker } -data Locker - = GeneralLock - | ProcessLock P.ProcessHandle - -- | A shared global variable for the MessageHandle. {-# NOINLINE globalMessageHandle #-} globalMessageHandle :: MVar MessageHandle globalMessageHandle = unsafePerformIO $ newMVar =<< MessageHandle <$> hIsTerminalDevice stdout - <*> newMVar () - <*> newEmptyMVar -- | Gets the global MessageHandle. getMessageHandle :: IO MessageHandle getMessageHandle = readMVar globalMessageHandle --- | Takes a lock while performing an action. Any other threads --- that try to lockOutput at the same time will block. -lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) - --- | Blocks until we have the output lock. -takeOutputLock :: IO () -takeOutputLock = void $ takeOutputLock' True - --- | Tries to take the output lock, without blocking. -tryTakeOutputLock :: IO Bool -tryTakeOutputLock = takeOutputLock' False - -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do - lck <- outputLock <$> getMessageHandle - go =<< tryTakeMVar lck - where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getMessageHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock - - havelock = do - updateOutputLocker GeneralLock - return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getMessageHandle - takeMVar lck - havelock - whenblock a = if block then a else return False - --- | Only safe to call after taking the output lock. -dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getMessageHandle - lck <- outputLock <$> getMessageHandle - void $ takeMVar lcker - putMVar lck () - --- | Only safe to call after takeOutputLock; updates the Locker. -updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getMessageHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) - -- | Force console output. This can be used when stdout is not directly -- connected to a console, but is eventually going to be displayed at a -- console. @@ -237,116 +152,3 @@ messagesDone = lockOutput $ do whenConsole $ setTitle "propellor: done" hFlush stdout - --- | Wrapper around `System.Process.createProcess` that prevents --- multiple processes that are running concurrently from writing --- to stdout/stderr at the same time. --- --- The first process is allowed to write to --- stdout and stderr in the usual way. --- --- However, if another process runs concurrently with the --- first, any stdout or stderr that would have been displayed by it is --- instead buffered. The buffered output will be displayed the next time it --- is safe to do so (ie, after the first process exits). --- --- Also does debug logging of all commands run. --- --- Unless you manually import System.Process, every part of propellor --- that runs a process uses this. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessConcurrent p - | hasoutput (P.std_out p) || hasoutput (P.std_err p) = - ifM tryTakeOutputLock - ( firstprocess - , concurrentprocess - ) - | otherwise = P.createProcess p - where - hasoutput P.Inherit = True - hasoutput _ = False - - firstprocess = do - r@(_, _, _, h) <- P.createProcess p - `onException` dropOutputLock - updateOutputLocker (ProcessLock h) - -- Output lock is still held as we return; the process - -- is running now, and once it exits the output lock will - -- be stale and can then be taken by something else. - return r - - concurrentprocess = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe - let p' = p - { P.std_out = if hasoutput (P.std_out p) - then P.UseHandle toouth - else P.std_out p - , P.std_err = if hasoutput (P.std_err p) - then P.UseHandle toerrh - else P.std_err p - } - r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf - void $ async $ bufferWriter buf - return r - - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - -type Buffer = [(Handle, Maybe B.ByteString)] - --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf (pure . addBuffer (toh, Just b)) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf (pure . (++ [(toh, Nothing)])) - hClose fromh - --- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) - where - go [] = return () - go hs = do - l <- takeMVar buf - forM_ l $ \(h, mb) -> do - maybe noop (B.hPut h) mb - hFlush h - let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs - putMVar buf [] - go hs' - --- The buffer can grow up to 1 mb in size, but after that point, --- it's truncated to avoid propellor using unbounded memory --- when a process outputs a whole lot of stuff. -bufsz :: Int -bufsz = 1000000 - -addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer -addBuffer v@(_, Nothing) buf = buf ++ [v] -addBuffer (toh, Just b) buf = (toh, Just b') : other - where - (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf - b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b - --- Truncate a buffer by removing lines from the front until it's --- small enough. -truncateBuffer :: B.ByteString -> B.ByteString -truncateBuffer b - | B.length b <= bufsz = b - | otherwise = truncateBuffer $ snd $ B.breakByte nl b - where - nl = fromIntegral (ord '\n') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs new file mode 100644 index 00000000..cf1d166e --- /dev/null +++ b/src/Utility/ConcurrentOutput.hs @@ -0,0 +1,224 @@ +-- | Concurrent output handling. +-- +-- When two threads both try to display a message concurrently, +-- the messages will be displayed sequentially. + +module Utility.ConcurrentOutput ( + lockOutput, + createProcessConcurrent, +) where + +import System.IO +import System.Posix.IO +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.Async +import Data.Maybe +import Data.Char +import Data.List +import Data.Monoid +import qualified Data.ByteString as B +import qualified System.Process as P + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: MVar () -- ^ empty when locked + , outputLockedBy :: MVar Locker + } + +data Locker + = GeneralLock + | ProcessLock P.ProcessHandle + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: MVar OutputHandle +globalOutputHandle = unsafePerformIO $ + newMVar =<< OutputHandle + <$> newMVar () + <*> newEmptyMVar + +-- | Gets the global OutputHandle. +getOutputHandle :: IO OutputHandle +getOutputHandle = readMVar globalOutputHandle + +-- | Holds a lock while performing an action. Any other threads +-- that try to lockOutput at the same time will block. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + lck <- outputLock <$> getOutputHandle + go =<< tryTakeMVar lck + where + -- lck was full, and we've emptied it, so we hold the lock now. + go (Just ()) = havelock + -- lck is empty, so someone else is holding the lock. + go Nothing = do + lcker <- outputLockedBy <$> getOutputHandle + v' <- tryTakeMVar lcker + case v' of + Just (ProcessLock h) -> + -- if process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + void $ P.waitForProcess h + havelock + else do + putMVar lcker (ProcessLock h) + return False + ) + Just GeneralLock -> do + putMVar lcker GeneralLock + whenblock waitlock + Nothing -> whenblock waitlock + + havelock = do + updateOutputLocker GeneralLock + return True + waitlock = do + -- Wait for current lock holder to relinquish + -- it and take the lock. + lck <- outputLock <$> getOutputHandle + takeMVar lck + havelock + whenblock a = if block then a else return False + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = do + lcker <- outputLockedBy <$> getOutputHandle + lck <- outputLock <$> getOutputHandle + void $ takeMVar lcker + putMVar lck () + +-- | Only safe to call after takeOutputLock; updates the Locker. +updateOutputLocker :: Locker -> IO () +updateOutputLocker l = do + lcker <- outputLockedBy <$> getOutputHandle + void $ tryTakeMVar lcker + putMVar lcker l + modifyMVar_ lcker (const $ return l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- The first process is allowed to write to stdout and stderr in the usual way. +-- +-- However, if another process runs concurrently with the +-- first, any stdout or stderr that would have been displayed by it is +-- instead buffered. The buffered output will be displayed the next time it +-- is safe to do so (ie, after the first process exits). +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessConcurrent p + | hasoutput (P.std_out p) || hasoutput (P.std_err p) = + ifM tryTakeOutputLock + ( firstprocess + , concurrentprocess + ) + | otherwise = P.createProcess p + where + hasoutput P.Inherit = True + hasoutput _ = False + + firstprocess = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + updateOutputLocker (ProcessLock h) + -- Output lock is still held as we return; the process + -- is running now, and once it exits the output lock will + -- be stale and can then be taken by something else. + return r + + concurrentprocess = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = if hasoutput (P.std_out p) + then P.UseHandle toouth + else P.std_out p + , P.std_err = if hasoutput (P.std_err p) + then P.UseHandle toerrh + else P.std_err p + } + r <- P.createProcess p' + hClose toouth + hClose toerrh + buf <- newMVar [] + void $ async $ outputDrainer fromouth stdout buf + void $ async $ outputDrainer fromerrh stderr buf + void $ async $ bufferWriter buf + return r + + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + +type Buffer = [(Handle, Maybe B.ByteString)] + +-- Drain output from the handle, and buffer it in memory. +outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () +outputDrainer fromh toh buf = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf (pure . addBuffer (toh, Just b)) + outputDrainer fromh toh buf + _ -> do + modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + hClose fromh + +-- Wait to lock output, and once we can, display everything +-- that's put into buffer, until the end is signaled by Nothing +-- for both stdout and stderr. +bufferWriter :: MVar Buffer -> IO () +bufferWriter buf = lockOutput (go [stdout, stderr]) + where + go [] = return () + go hs = do + l <- takeMVar buf + forM_ l $ \(h, mb) -> do + maybe noop (B.hPut h) mb + hFlush h + let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + putMVar buf [] + go hs' + +-- The buffer can grow up to 1 mb in size, but after that point, +-- it's truncated to avoid propellor using unbounded memory +-- when a process outputs a whole lot of stuff. +bufsz :: Int +bufsz = 1000000 + +addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer +addBuffer v@(_, Nothing) buf = buf ++ [v] +addBuffer (toh, Just b) buf = (toh, Just b') : other + where + (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf + b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b + +-- Truncate a buffer by removing lines from the front until it's +-- small enough. +truncateBuffer :: B.ByteString -> B.ByteString +truncateBuffer b + | B.length b <= bufsz = b + | otherwise = truncateBuffer $ snd $ B.breakByte nl b + where + nl = fromIntegral (ord '\n') diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 0da93bf7..202b7c32 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,7 +1,7 @@ module Utility.Process.Shim (module X, createProcess) where import System.Process as X hiding (createProcess) -import Propellor.Message (createProcessConcurrent) +import Utility.ConcurrentOutput (createProcessConcurrent) import System.IO createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -- cgit v1.2.3 From 4e84fa68e3ea2a11e85d09860f2d6440d91e27d1 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 11:04:47 -0400 Subject: don't truncate over-large output; swap to temp files --- src/Utility/ConcurrentOutput.hs | 78 ++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 33 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index cf1d166e..186f881f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,7 +1,4 @@ -- | Concurrent output handling. --- --- When two threads both try to display a message concurrently, --- the messages will be displayed sequentially. module Utility.ConcurrentOutput ( lockOutput, @@ -10,6 +7,7 @@ module Utility.ConcurrentOutput ( import System.IO import System.Posix.IO +import System.Directory import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative @@ -17,7 +15,6 @@ import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent import Control.Concurrent.Async import Data.Maybe -import Data.Char import Data.List import Data.Monoid import qualified Data.ByteString as B @@ -122,7 +119,7 @@ updateOutputLocker l = do -- -- The first process is allowed to write to stdout and stderr in the usual way. -- --- However, if another process runs concurrently with the +-- However, if another process is run concurrently with the -- first, any stdout or stderr that would have been displayed by it is -- instead buffered. The buffered output will be displayed the next time it -- is safe to do so (ie, after the first process exits). @@ -171,7 +168,13 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from -type Buffer = [(Handle, Maybe B.ByteString)] +type Buffer = [(Handle, BufferedActivity)] + +data BufferedActivity + = ReachedEnd + | Output B.ByteString + | InTempFile FilePath + deriving (Eq) -- Drain output from the handle, and buffer it in memory. outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () @@ -179,10 +182,10 @@ outputDrainer fromh toh buf = do v <- tryIO $ B.hGetSome fromh 1024 case v of Right b | not (B.null b) -> do - modifyMVar_ buf (pure . addBuffer (toh, Just b)) + modifyMVar_ buf $ addBuffer (toh, Output b) outputDrainer fromh toh buf _ -> do - modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) hClose fromh -- Wait to lock output, and once we can, display everything @@ -194,31 +197,40 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) go [] = return () go hs = do l <- takeMVar buf - forM_ l $ \(h, mb) -> do - maybe noop (B.hPut h) mb - hFlush h - let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + forM_ l $ \(h, ba) -> case ba of + Output b -> do + B.hPut h b + hFlush h + InTempFile tmp -> do + B.hPut h =<< B.readFile tmp + void $ tryWhenExists $ removeFile tmp + ReachedEnd -> return () + let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs putMVar buf [] go hs' - --- The buffer can grow up to 1 mb in size, but after that point, --- it's truncated to avoid propellor using unbounded memory --- when a process outputs a whole lot of stuff. -bufsz :: Int -bufsz = 1000000 - -addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer -addBuffer v@(_, Nothing) buf = buf ++ [v] -addBuffer (toh, Just b) buf = (toh, Just b') : other + +-- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper +-- to combine it with any already buffered Output to that same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer +addBuffer (toh, Output b) buf + | B.length b' <= 1000000 = return ((toh, Output b') : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + B.hPut h b' + hClose h + return ((toh, InTempFile tmp) : other) where - (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf - b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b - --- Truncate a buffer by removing lines from the front until it's --- small enough. -truncateBuffer :: B.ByteString -> B.ByteString -truncateBuffer b - | B.length b <= bufsz = b - | otherwise = truncateBuffer $ snd $ B.breakByte nl b - where - nl = fromIntegral (ord '\n') + b' = B.concat (mapMaybe getOutput this) <> b + (this, other) = partition same buf + same v = fst v == toh && case snd v of + Output _ -> True + _ -> False + getOutput v = case snd v of + Output b'' -> Just b'' + _ -> Nothing +addBuffer v buf = return (buf ++ [v]) -- cgit v1.2.3 From 9b27331f216bc23ca8b548800652641e4b59e2a8 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 11:09:53 -0400 Subject: improve comment --- src/Utility/ConcurrentOutput.hs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 186f881f..3624ffbf 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -117,12 +117,17 @@ updateOutputLocker l = do -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. -- --- The first process is allowed to write to stdout and stderr in the usual way. +-- If the process does not output to stdout or stderr, it's run +-- by createProcess entirely as usual. Only processes that can generate +-- output are handled specially: -- --- However, if another process is run concurrently with the --- first, any stdout or stderr that would have been displayed by it is --- instead buffered. The buffered output will be displayed the next time it --- is safe to do so (ie, after the first process exits). +-- A process is allowed to write to stdout and stderr in the usual +-- way, assuming it can successfully take the output lock. +-- +-- When the output lock is held (by another process or other caller of +-- `lockOutput`), the process is instead run with its stdout and stderr +-- redirected to a buffer. The buffered output will be displayed as soon +-- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p | hasoutput (P.std_out p) || hasoutput (P.std_err p) = -- cgit v1.2.3 From 6d8a4009f5867fa42d9882fd691ff9af4b9884ed Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 11:32:16 -0400 Subject: propellor spin --- src/Utility/ConcurrentOutput.hs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 3624ffbf..35904cb7 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -132,14 +132,22 @@ createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Ma createProcessConcurrent p | hasoutput (P.std_out p) || hasoutput (P.std_err p) = ifM tryTakeOutputLock - ( firstprocess - , concurrentprocess + ( do + print ("FIRST", pc) + firstprocess + , do + print ("CONCURRENT", pc) + concurrentprocess ) | otherwise = P.createProcess p where hasoutput P.Inherit = True hasoutput _ = False + pc = case P.cmdspec p of + P.ShellCommand s -> s + P.RawCommand c ps -> unwords (c:ps) + firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock -- cgit v1.2.3 From 21a74a3ffea3d48195d76486a56031b317fa23fa Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 11:44:05 -0400 Subject: propellor spin --- src/Propellor/Spin.hs | 5 ++++- src/Utility/ConcurrentOutput.hs | 28 ++++++++++------------------ 2 files changed, 14 insertions(+), 19 deletions(-) (limited to 'src/Utility') diff --git a/src/Propellor/Spin.hs b/src/Propellor/Spin.hs index 0c457705..36859fb7 100644 --- a/src/Propellor/Spin.hs +++ b/src/Propellor/Spin.hs @@ -206,7 +206,10 @@ updateServer target relay hst connect haveprecompiled getprivdata = sendRepoUrl toh loop (Just NeedPrivData) -> do - sendPrivData hn toh =<< getprivdata + print "START GET PRIVDATA" + pd <- getprivdata + print "GOT PRIVDATA" + sendPrivData hn toh pd loop (Just NeedGitClone) -> do hClose toh diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 35904cb7..8a4bdcf2 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -130,23 +130,19 @@ updateOutputLocker l = do -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p - | hasoutput (P.std_out p) || hasoutput (P.std_err p) = + | willoutput (P.std_out p) || willoutput (P.std_err p) = ifM tryTakeOutputLock - ( do - print ("FIRST", pc) - firstprocess - , do - print ("CONCURRENT", pc) - concurrentprocess + ( firstprocess + , concurrentprocess ) | otherwise = P.createProcess p where - hasoutput P.Inherit = True - hasoutput _ = False + willoutput P.Inherit = True + willoutput _ = False - pc = case P.cmdspec p of - P.ShellCommand s -> s - P.RawCommand c ps -> unwords (c:ps) + rediroutput str h + | willoutput str = P.UseHandle h + | otherwise = str firstprocess = do r@(_, _, _, h) <- P.createProcess p @@ -161,12 +157,8 @@ createProcessConcurrent p (toouth, fromouth) <- pipe (toerrh, fromerrh) <- pipe let p' = p - { P.std_out = if hasoutput (P.std_out p) - then P.UseHandle toouth - else P.std_out p - , P.std_err = if hasoutput (P.std_err p) - then P.UseHandle toerrh - else P.std_err p + { P.std_out = rediroutput (P.std_out p) toouth + , P.std_err = rediroutput (P.std_err p) toerrh } r <- P.createProcess p' hClose toouth -- cgit v1.2.3 From 7a83dab6e977f61b3348aaa9f70bd2a288b4b631 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 12:19:49 -0400 Subject: use outputConcurrent interface This interface will fix the current deadlock when a process is running and the thread that ran it wants to output to the console. The locking and buffering is not implemented yet. --- src/Propellor/Message.hs | 91 +++++++++++++++++++---------------------- src/Propellor/Spin.hs | 2 +- src/Utility/ConcurrentOutput.hs | 21 ++++++++-- 3 files changed, 61 insertions(+), 53 deletions(-) (limited to 'src/Utility') diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 3b06770c..6d541b9a 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -20,10 +20,8 @@ module Propellor.Message ( import System.Console.ANSI import System.IO -import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative -import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent @@ -55,10 +53,11 @@ forceConsole :: IO () forceConsole = modifyMVar_ globalMessageHandle $ \mh -> pure (mh { isConsole = True }) --- | Only performs the action when at the console, or when console --- output has been forced. -whenConsole :: IO () -> IO () -whenConsole a = whenM (isConsole <$> getMessageHandle) a +whenConsole :: String -> IO String +whenConsole s = ifM (isConsole <$> getMessageHandle) + ( pure s + , pure "" + ) -- | Shows a message while performing an action, with a colored status -- display. @@ -72,55 +71,54 @@ actionMessageOn = actionMessage' . Just actionMessage' :: (MonadIO m, MonadMask m, ActionResult r) => Maybe HostName -> Desc -> m r -> m r actionMessage' mhn desc a = do - liftIO $ whenConsole $ lockOutput $ do - setTitle $ "propellor: " ++ desc - hFlush stdout + liftIO $ outputConcurrent + =<< whenConsole (setTitleCode $ "propellor: " ++ desc) r <- a - liftIO $ lockOutput $ do - whenConsole $ - setTitle "propellor: running" - showhn mhn - putStr $ desc ++ " ... " - let (msg, intensity, color) = getActionResult r - colorLine intensity color msg - hFlush stdout + liftIO $ outputConcurrent . concat =<< sequence + [ whenConsole $ + setTitleCode "propellor: running" + , showhn mhn + , pure $ desc ++ " ... " + , let (msg, intensity, color) = getActionResult r + in colorLine intensity color msg + ] return r where - showhn Nothing = return () - showhn (Just hn) = do - whenConsole $ - setSGR [SetColor Foreground Dull Cyan] - putStr (hn ++ " ") - whenConsole $ - setSGR [] + showhn Nothing = return "" + showhn (Just hn) = concat <$> sequence + [ whenConsole $ + setSGRCode [SetColor Foreground Dull Cyan] + , pure (hn ++ " ") + , whenConsole $ + setSGRCode [] + ] warningMessage :: MonadIO m => String -> m () -warningMessage s = liftIO $ lockOutput $ - colorLine Vivid Magenta $ "** warning: " ++ s +warningMessage s = liftIO $ + outputConcurrent =<< colorLine Vivid Magenta ("** warning: " ++ s) infoMessage :: MonadIO m => [String] -> m () -infoMessage ls = liftIO $ lockOutput $ - mapM_ putStrLn ls +infoMessage ls = liftIO $ outputConcurrent $ concatMap (++ "\n") ls errorMessage :: MonadIO m => String -> m a -errorMessage s = liftIO $ lockOutput $ do - colorLine Vivid Red $ "** error: " ++ s +errorMessage s = liftIO $ do + outputConcurrent =<< colorLine Vivid Red ("** error: " ++ s) error "Cannot continue!" -colorLine :: ColorIntensity -> Color -> String -> IO () -colorLine intensity color msg = do - whenConsole $ - setSGR [SetColor Foreground intensity color] - putStr msg - whenConsole $ - setSGR [] +colorLine :: ColorIntensity -> Color -> String -> IO String +colorLine intensity color msg = concat <$> sequence + [ whenConsole $ + setSGRCode [SetColor Foreground intensity color] + , pure msg + , whenConsole $ + setSGRCode [] -- Note this comes after the color is reset, so that -- the color set and reset happen in the same line. - putStrLn "" - hFlush stdout + , pure "\n" + ] -- | Reads and displays each line from the Handle, except for the last line -- which is a Result. @@ -136,19 +134,14 @@ processChainOutput h = go Nothing Just l -> case readish l of Just r -> pure r Nothing -> do - lockOutput $ do - putStrLn l - hFlush stdout + outputConcurrent l return FailedChange Just s -> do - lockOutput $ do - maybe noop (\l -> unless (null l) (putStrLn l)) lastline - hFlush stdout + outputConcurrent $ + maybe "" (\l -> if null l then "" else l ++ "\n") lastline go (Just s) -- | Called when all messages about properties have been printed. messagesDone :: IO () -messagesDone = lockOutput $ do - whenConsole $ - setTitle "propellor: done" - hFlush stdout +messagesDone = outputConcurrent + =<< whenConsole (setTitleCode "propellor: done") diff --git a/src/Propellor/Spin.hs b/src/Propellor/Spin.hs index 8a40fc87..0c457705 100644 --- a/src/Propellor/Spin.hs +++ b/src/Propellor/Spin.hs @@ -206,7 +206,7 @@ updateServer target relay hst connect haveprecompiled getprivdata = sendRepoUrl toh loop (Just NeedPrivData) -> do - sendPrivData hn toh pd =<< getprivdata + sendPrivData hn toh =<< getprivdata loop (Just NeedGitClone) -> do hClose toh diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 8a4bdcf2..0e9a59de 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,7 +1,7 @@ -- | Concurrent output handling. module Utility.ConcurrentOutput ( - lockOutput, + outputConcurrent, createProcessConcurrent, ) where @@ -113,6 +113,20 @@ updateOutputLocker l = do putMVar lcker l modifyMVar_ lcker (const $ return l) +-- | Displays a string to stdout, and flush output so it's displayed. +-- +-- Uses locking to ensure that the whole string is output atomically +-- even when other threads are concurrently generating output. +-- +-- When something else is writing to the console at the same time, this does +-- not block. It buffers the string, so it will be displayed once the other +-- writer is done. +outputConcurrent :: String -> IO () +outputConcurrent s = do + putStr s + hFlush stdout + -- TODO + -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. @@ -124,8 +138,9 @@ updateOutputLocker l = do -- A process is allowed to write to stdout and stderr in the usual -- way, assuming it can successfully take the output lock. -- --- When the output lock is held (by another process or other caller of --- `lockOutput`), the process is instead run with its stdout and stderr +-- When the output lock is held (by another concurrent process, +-- or because `outputConcurrent` is being called at the same time), +-- the process is instead run with its stdout and stderr -- redirected to a buffer. The buffered output will be displayed as soon -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -- cgit v1.2.3 From 7f401a17aae36bc1baebadf98a26f1fb2de19731 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 12:26:28 -0400 Subject: propellor spin --- src/Utility/ConcurrentOutput.hs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 0e9a59de..193e757f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -147,8 +147,12 @@ createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Ma createProcessConcurrent p | willoutput (P.std_out p) || willoutput (P.std_err p) = ifM tryTakeOutputLock - ( firstprocess - , concurrentprocess + ( do + print "IS NOT CONCURRENT" + firstprocess + , do + print "IS CONCURRENT" + concurrentprocess ) | otherwise = P.createProcess p where -- cgit v1.2.3 From f79fe8c0b16638c22a1094b5b2d7e4b62810d839 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 12:27:48 -0400 Subject: propellor spin --- src/Utility/ConcurrentOutput.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 193e757f..1ca92d90 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -148,10 +148,10 @@ createProcessConcurrent p | willoutput (P.std_out p) || willoutput (P.std_err p) = ifM tryTakeOutputLock ( do - print "IS NOT CONCURRENT" + hPutStrLn stderr "IS NOT CONCURRENT" firstprocess , do - print "IS CONCURRENT" + hPutStrLn stderr "IS CONCURRENT" concurrentprocess ) | otherwise = P.createProcess p -- cgit v1.2.3 From 68dbfe1b08c9cf1d976ac84ea53817c54fcd3479 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 12:41:15 -0400 Subject: need withConcurrentOutput to flush any buffered concurrent output --- src/Propellor/CmdLine.hs | 2 +- src/Propellor/Message.hs | 1 + src/Utility/ConcurrentOutput.hs | 13 +++++++++++++ src/wrapper.hs | 2 +- 4 files changed, 16 insertions(+), 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Propellor/CmdLine.hs b/src/Propellor/CmdLine.hs index 9f798166..4bca3986 100644 --- a/src/Propellor/CmdLine.hs +++ b/src/Propellor/CmdLine.hs @@ -89,7 +89,7 @@ processCmdLine = go =<< getArgs -- | Runs propellor on hosts, as controlled by command-line options. defaultMain :: [Host] -> IO () -defaultMain hostlist = do +defaultMain hostlist = withConcurrentOutput $ do Shim.cleanEnv checkDebugMode cmdline <- processCmdLine diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 6d541b9a..7439c362 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -16,6 +16,7 @@ module Propellor.Message ( processChainOutput, messagesDone, createProcessConcurrent, + withConcurrentOutput, ) where import System.Console.ANSI diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 1ca92d90..c6550b84 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,6 +1,7 @@ -- | Concurrent output handling. module Utility.ConcurrentOutput ( + withConcurrentOutput, outputConcurrent, createProcessConcurrent, ) where @@ -113,6 +114,18 @@ updateOutputLocker l = do putMVar lcker l modifyMVar_ lcker (const $ return l) +-- | Use this around any IO actions that use `outputConcurrent` +-- or `createProcessConcurrent` +-- +-- This is necessary to ensure that buffered concurrent output actually +-- gets displayed before the program exits. +withConcurrentOutput :: IO a -> IO a +withConcurrentOutput a = a `finally` drain + where + -- Just taking the output lock is enough to ensure that anything + -- that was buffering output has had a chance to flush its buffer. + drain = lockOutput (return ()) + -- | Displays a string to stdout, and flush output so it's displayed. -- -- Uses locking to ensure that the whole string is output atomically diff --git a/src/wrapper.hs b/src/wrapper.hs index e367fe69..0cfe319d 100644 --- a/src/wrapper.hs +++ b/src/wrapper.hs @@ -50,7 +50,7 @@ netrepo :: String netrepo = "https://github.com/joeyh/propellor.git" main :: IO () -main = do +main = withConcurrentOutput $ do args <- getArgs home <- myHomeDir let propellordir = home ".propellor" -- cgit v1.2.3 From 111ea88d4d7c54e9ab7950962ad22528d54dd959 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 14:46:17 -0400 Subject: fix bad MVar use, use STM I had 2 MVars both involved in the same lock, and it seemed intractable to avoid deadlocks with them. STM makes it easy. At this point, the concurrent process stuff seems to work pretty well, but I'm not 100% sure it's not got some bugs. --- debian/changelog | 1 + debian/control | 2 + propellor.cabal | 6 +- src/Propellor/Bootstrap.hs | 3 +- src/Utility/ConcurrentOutput.hs | 173 ++++++++++++++++++++++------------------ 5 files changed, 104 insertions(+), 81 deletions(-) (limited to 'src/Utility') diff --git a/debian/changelog b/debian/changelog index 6c154e1a..f3522b7c 100644 --- a/debian/changelog +++ b/debian/changelog @@ -19,6 +19,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium actions are combined (API change). * Added Propellor.Property.Concurrent for concurrent properties. * execProcess and everything built on it is now concurrent output safe. + * Propellor now depends on stm. * Add File.isCopyOf. Thanks, Per Olofsson. -- Joey Hess Sat, 24 Oct 2015 15:16:45 -0400 diff --git a/debian/control b/debian/control index 7f42c916..2956fdaa 100644 --- a/debian/control +++ b/debian/control @@ -17,6 +17,7 @@ Build-Depends: libghc-mtl-dev, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), + libghc-stm-dev, Maintainer: Gergely Nagy Standards-Version: 3.9.6 Vcs-Git: git://git.joeyh.name/propellor @@ -39,6 +40,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends}, libghc-mtl-dev, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), + libghc-stm-dev, git, make, Description: property-based host configuration management in haskell diff --git a/propellor.cabal b/propellor.cabal index 20e82407..da43775f 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -39,7 +39,7 @@ Executable propellor Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions (>= 0.6) + exceptions (>= 0.6), stm if (! os(windows)) Build-Depends: unix @@ -51,7 +51,7 @@ Executable propellor-config Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions + exceptions, stm if (! os(windows)) Build-Depends: unix @@ -62,7 +62,7 @@ Library Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions + exceptions, stm if (! os(windows)) Build-Depends: unix diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs index 6a5d5acb..2318b910 100644 --- a/src/Propellor/Bootstrap.hs +++ b/src/Propellor/Bootstrap.hs @@ -65,7 +65,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " aptinstall p = "apt-get --no-upgrade --no-install-recommends -y install " ++ p - -- This is the same build deps listed in debian/control. + -- This is the same deps listed in debian/control. debdeps = [ "gnupg" , "ghc" @@ -81,6 +81,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " , "libghc-mtl-dev" , "libghc-transformers-dev" , "libghc-exceptions-dev" + , "libghc-stm-dev" , "make" ] diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index c6550b84..5535066f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + -- | Concurrent output handling. module Utility.ConcurrentOutput ( @@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import Control.Concurrent.STM import Control.Concurrent.Async import Data.Maybe import Data.List @@ -25,21 +28,23 @@ import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle - { outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker + { outputLock :: TMVar (Maybe Locker) } data Locker = GeneralLock - | ProcessLock P.ProcessHandle + | ProcessLock P.ProcessHandle String + +instance Show Locker where + show GeneralLock = "GeneralLock" + show (ProcessLock _ cmd) = "ProcessLock " ++ cmd -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newMVar () - <*> newEmptyMVar + <$> newTMVarIO Nothing -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do +withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock a = do lck <- outputLock <$> getOutputHandle - go =<< tryTakeMVar lck + atomically (a lck) + +-- The lock TMVar is kept full normally, even if only with Nothing, +-- so if we take it here, that blocks anyone else from trying +-- to take the lock while we are checking it. +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = go =<< withLock tryTakeTMVar where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getOutputHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock + go Nothing = whenblock waitlock + -- Something has the lock. It may be stale, so check it. + -- We must always be sure to fill the TMVar back with Just or Nothing. + go (Just orig) = case orig of + Nothing -> havelock + (Just (ProcessLock h _)) -> + -- when process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + hPutStr stderr "WAITFORPROCESS in lock" + hFlush stderr + void $ P.waitForProcess h + hPutStr stderr "WAITFORPROCESS in lock done" + hFlush stderr + havelock + else do + withLock (`putTMVar` orig) + return False + ) + (Just GeneralLock) -> do + withLock (`putTMVar` orig) + whenblock waitlock havelock = do - updateOutputLocker GeneralLock + withLock (`putTMVar` Just GeneralLock) return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getOutputHandle - takeMVar lck - havelock + + -- Wait for current lock holder (if any) to relinquish + -- it and take the lock for ourselves. + waitlock = withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just (Just _) -> retry + _ -> do + putTMVar l (Just GeneralLock) + return True + whenblock a = if block then a else return False -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getOutputHandle - lck <- outputLock <$> getOutputHandle - void $ takeMVar lcker - putMVar lck () +dropOutputLock = withLock $ \l -> do + void $ takeTMVar l + putTMVar l Nothing -- | Only safe to call after takeOutputLock; updates the Locker. updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getOutputHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) +updateOutputLocker locker = withLock $ \l -> do + void $ takeTMVar l + putTMVar l (Just locker) -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain where -- Just taking the output lock is enough to ensure that anything -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput (return ()) + drain = lockOutput noop -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -158,28 +171,25 @@ outputConcurrent s = do -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p - | willoutput (P.std_out p) || willoutput (P.std_err p) = + | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock - ( do - hPutStrLn stderr "IS NOT CONCURRENT" - firstprocess - , do - hPutStrLn stderr "IS CONCURRENT" - concurrentprocess + ( firstprocess + , concurrentprocess ) | otherwise = P.createProcess p where - willoutput P.Inherit = True - willoutput _ = False + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss - rediroutput str h - | willoutput str = P.UseHandle h - | otherwise = str + cmd = case P.cmdspec p of + P.ShellCommand s -> s + P.RawCommand c ps -> unwords (c:ps) firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) + updateOutputLocker (ProcessLock h cmd) -- Output lock is still held as we return; the process -- is running now, and once it exits the output lock will -- be stale and can then be taken by something else. @@ -196,8 +206,8 @@ createProcessConcurrent p hClose toouth hClose toerrh buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf + void $ async $ outputDrainer (P.std_out p) fromouth stdout buf + void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf void $ async $ bufferWriter buf return r @@ -205,6 +215,10 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + type Buffer = [(Handle, BufferedActivity)] data BufferedActivity @@ -213,17 +227,22 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) - hClose fromh +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () +outputDrainer ss fromh toh buf + | willOutput ss = go + | otherwise = atend + where + go = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf $ addBuffer (toh, Output b) + go + _ -> atend + atend = do + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) + hClose fromh -- Wait to lock output, and once we can, display everything -- that's put into buffer, until the end is signaled by Nothing @@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf hClose h return ((toh, InTempFile tmp) : other) where - b' = B.concat (mapMaybe getOutput this) <> b - (this, other) = partition same buf + !b' = B.concat (mapMaybe getOutput this) <> b + !(this, other) = partition same buf same v = fst v == toh && case snd v of Output _ -> True _ -> False -- cgit v1.2.3 From a882ac7eefa405993ba903f19c51134341ba457c Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 14:51:30 -0400 Subject: fix tricky race Race between 2 calls to takeOutputLock'. The first call empties the TMVar, and does some work to check it. Meanwhile, the second call could sneak in, see it was empty, and call waitlock. Since waitlock used tryTakeTMVar, that would not block it, and it would think it had the lock, filling the TMVar. In the meantime, the first call could decide it had to lock and go on to possibly cause trouble. --- src/Utility/ConcurrentOutput.hs | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 5535066f..3c072cf4 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -74,7 +74,7 @@ withLock a = do takeOutputLock' :: Bool -> IO Bool takeOutputLock' block = go =<< withLock tryTakeTMVar where - go Nothing = whenblock waitlock + go Nothing = whenblock waitlockchange -- Something has the lock. It may be stale, so check it. -- We must always be sure to fill the TMVar back with Just or Nothing. go (Just orig) = case orig of @@ -85,11 +85,7 @@ takeOutputLock' block = go =<< withLock tryTakeTMVar ( havelock , if block then do - hPutStr stderr "WAITFORPROCESS in lock" - hFlush stderr void $ P.waitForProcess h - hPutStr stderr "WAITFORPROCESS in lock done" - hFlush stderr havelock else do withLock (`putTMVar` orig) @@ -97,21 +93,16 @@ takeOutputLock' block = go =<< withLock tryTakeTMVar ) (Just GeneralLock) -> do withLock (`putTMVar` orig) - whenblock waitlock + whenblock waitlockchange havelock = do withLock (`putTMVar` Just GeneralLock) return True - -- Wait for current lock holder (if any) to relinquish - -- it and take the lock for ourselves. - waitlock = withLock $ \l -> do - v <- tryTakeTMVar l - case v of - Just (Just _) -> retry - _ -> do - putTMVar l (Just GeneralLock) - return True + -- Wait for the lock to change, and try again. + waitlockchange = do + void $ withLock readTMVar + takeOutputLock' block whenblock a = if block then a else return False -- cgit v1.2.3 From 644ce3f6e8876ee4bbecba6d1bf5b74a612d82e4 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 15:41:26 -0400 Subject: work around waitForProcess race condition https://github.com/haskell/process/issues/46 --- src/Utility/ConcurrentOutput.hs | 18 +++++++++++++++++- src/Utility/Process/Shim.hs | 10 +++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 3c072cf4..0f1cf9d3 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -6,6 +6,7 @@ module Utility.ConcurrentOutput ( withConcurrentOutput, outputConcurrent, createProcessConcurrent, + waitForProcessConcurrent, ) where import System.IO @@ -23,6 +24,7 @@ import Data.List import Data.Monoid import qualified Data.ByteString as B import qualified System.Process as P +import System.Exit import Utility.Monad import Utility.Exception @@ -85,7 +87,7 @@ takeOutputLock' block = go =<< withLock tryTakeTMVar ( havelock , if block then do - void $ P.waitForProcess h + void $ waitForProcessConcurrent h havelock else do withLock (`putTMVar` orig) @@ -206,6 +208,20 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from +-- | This must be used to wait for processes started with +-- `createProcessConcurrent`. +-- +-- This is necessary because `System.Process.waitForProcess` has a +-- race condition when two threads check the same process. If the race +-- is triggered, one thread will successfully wait, but the other +-- throws a DoesNotExist exception. +waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode +waitForProcessConcurrent h = do + v <- tryWhenExists (P.waitForProcess h) + case v of + Just r -> return r + Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h + willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 202b7c32..08694d5d 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,8 +1,12 @@ -module Utility.Process.Shim (module X, createProcess) where +module Utility.Process.Shim (module X, createProcess, waitForProcess) where -import System.Process as X hiding (createProcess) -import Utility.ConcurrentOutput (createProcessConcurrent) +import System.Process as X hiding (createProcess, waitForProcess) +import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent) import System.IO +import System.Exit createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) createProcess = createProcessConcurrent + +waitForProcess :: ProcessHandle -> IO ExitCode +waitForProcess = waitForProcessConcurrent -- cgit v1.2.3 From d61e3866d794635de5875d7292861fb49ad0340a Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 15:47:23 -0400 Subject: fix buffer order Build it up reversed, and reverse when processing. --- src/Utility/ConcurrentOutput.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 0f1cf9d3..5bf973de 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -248,7 +248,7 @@ outputDrainer ss fromh toh buf go _ -> atend atend = do - modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) + modifyMVar_ buf $ pure . ((toh, ReachedEnd) :) hClose fromh -- Wait to lock output, and once we can, display everything @@ -260,7 +260,7 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) go [] = return () go hs = do l <- takeMVar buf - forM_ l $ \(h, ba) -> case ba of + forM_ (reverse l) $ \(h, ba) -> case ba of Output b -> do B.hPut h b hFlush h -- cgit v1.2.3 From 5cde1ed21cc912db0b53846196f920fe52835dbc Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 17:03:17 -0400 Subject: fix memory leak, and optimise when command output is very large --- debian/changelog | 3 ++ propellor.cabal | 6 +-- src/Utility/ConcurrentOutput.hs | 113 +++++++++++++++++++++++----------------- 3 files changed, 72 insertions(+), 50 deletions(-) (limited to 'src/Utility') diff --git a/debian/changelog b/debian/changelog index 8c4715f5..c5538c7f 100644 --- a/debian/changelog +++ b/debian/changelog @@ -21,6 +21,9 @@ propellor (2.13.0) UNRELEASED; urgency=medium * Made the execProcess exported by propellor, and everything built on it, avoid scrambled output when run concurrently. * Propellor now depends on STM. + * The cabal file now builds propellor with -O. While -O0 makes ghc + take less memory while building propellor, it can lead to bad memory + usage at runtime due to eg, disabled stream fusion. * Add File.isCopyOf. Thanks, Per Olofsson. -- Joey Hess Sat, 24 Oct 2015 15:16:45 -0400 diff --git a/propellor.cabal b/propellor.cabal index da43775f..a07109a7 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -34,7 +34,7 @@ Description: Executable propellor Main-Is: wrapper.hs - GHC-Options: -threaded -O0 -Wall -fno-warn-tabs + GHC-Options: -threaded -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, @@ -46,7 +46,7 @@ Executable propellor Executable propellor-config Main-Is: config.hs - GHC-Options: -threaded -O0 -Wall -fno-warn-tabs + GHC-Options: -threaded -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, @@ -57,7 +57,7 @@ Executable propellor-config Build-Depends: unix Library - GHC-Options: -O0 -Wall -fno-warn-tabs + GHC-Options: -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 5bf973de..be1562ac 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,8 +1,11 @@ {-# LANGUAGE BangPatterns #-} +{-# OPTIONS_GHC -fno-warn-tabs #-} -- | Concurrent output handling. module Utility.ConcurrentOutput ( + takeOutputLock, + dropOutputLock, withConcurrentOutput, outputConcurrent, createProcessConcurrent, @@ -146,6 +149,20 @@ outputConcurrent s = do hFlush stdout -- TODO +-- | This must be used to wait for processes started with +-- `createProcessConcurrent`. +-- +-- This is necessary because `System.Process.waitForProcess` has a +-- race condition when two threads check the same process. If the race +-- is triggered, one thread will successfully wait, but the other +-- throws a DoesNotExist exception. +waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode +waitForProcessConcurrent h = do + v <- tryWhenExists (P.waitForProcess h) + case v of + Just r -> return r + Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h + -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. @@ -196,37 +213,20 @@ createProcessConcurrent p , P.std_err = rediroutput (P.std_err p) toerrh } r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer (P.std_out p) fromouth stdout buf - void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf - void $ async $ bufferWriter buf + outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth + errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] return r pipe = do (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from --- | This must be used to wait for processes started with --- `createProcessConcurrent`. --- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h - willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False -type Buffer = [(Handle, BufferedActivity)] +type Buffer = [BufferedActivity] data BufferedActivity = ReachedEnd @@ -234,43 +234,62 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) +instance Show BufferedActivity where + show ReachedEnd = "ReachedEnd" + show (Output b) = "Output " ++ show (B.length b) + show (InTempFile t) = "InTempFile " ++ t + +setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) +setupBuffer h toh ss fromh = do + hClose toh + buf <- newMVar [] + bufsig <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig + return (h, buf, bufsig) + -- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () -outputDrainer ss fromh toh buf +outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO () +outputDrainer ss fromh buf bufsig | willOutput ss = go | otherwise = atend where go = do - v <- tryIO $ B.hGetSome fromh 1024 + v <- tryIO $ B.hGetSome fromh 1048576 case v of Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) + modifyMVar_ buf $ addBuffer (Output b) + changed go _ -> atend atend = do - modifyMVar_ buf $ pure . ((toh, ReachedEnd) :) + modifyMVar_ buf $ pure . (ReachedEnd :) + changed hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig () -- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) +-- that's put into the buffers. +bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () +bufferWriter = void . lockOutput . mapConcurrently go where - go [] = return () - go hs = do + go v@(outh, buf, bufsig) = do + atomically $ takeTMVar bufsig l <- takeMVar buf - forM_ (reverse l) $ \(h, ba) -> case ba of + putMVar buf [] + forM_ (reverse l) $ \ba -> case ba of Output b -> do - B.hPut h b - hFlush h + B.hPut outh b + hFlush outh + return () InTempFile tmp -> do - B.hPut h =<< B.readFile tmp + B.hPut outh =<< B.readFile tmp void $ tryWhenExists $ removeFile tmp ReachedEnd -> return () - let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs - putMVar buf [] - go hs' + if any (== ReachedEnd) l + then return () + else go v -- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper -- to combine it with any already buffered Output to that same Handle. @@ -278,22 +297,22 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) -- When the total buffered Output exceeds 1 mb in size, it's moved out of -- memory, to a temp file. This should only happen rarely, but is done to -- avoid some verbose process unexpectedly causing excessive memory use. -addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer -addBuffer (toh, Output b) buf - | B.length b' <= 1000000 = return ((toh, Output b') : other) +addBuffer :: BufferedActivity -> Buffer -> IO Buffer +addBuffer (Output b) buf + | B.length b' <= 1048576 = return (Output b' : other) | otherwise = do tmpdir <- getTemporaryDirectory (tmp, h) <- openTempFile tmpdir "output.tmp" B.hPut h b' hClose h - return ((toh, InTempFile tmp) : other) + return (InTempFile tmp : other) where !b' = B.concat (mapMaybe getOutput this) <> b - !(this, other) = partition same buf - same v = fst v == toh && case snd v of + !(this, other) = partition isOutput buf + isOutput v = case v of Output _ -> True _ -> False - getOutput v = case snd v of + getOutput v = case v of Output b'' -> Just b'' _ -> Nothing -addBuffer v buf = return (buf ++ [v]) +addBuffer v buf = return (v:buf) -- cgit v1.2.3 From 80e28e5e4e97e6b11b54c9f086601f84c2d27440 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 17:04:17 -0400 Subject: remove debug code --- src/Utility/ConcurrentOutput.hs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index be1562ac..091513d0 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -38,11 +38,7 @@ data OutputHandle = OutputHandle data Locker = GeneralLock - | ProcessLock P.ProcessHandle String - -instance Show Locker where - show GeneralLock = "GeneralLock" - show (ProcessLock _ cmd) = "ProcessLock " ++ cmd + | ProcessLock P.ProcessHandle -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} @@ -84,7 +80,7 @@ takeOutputLock' block = go =<< withLock tryTakeTMVar -- We must always be sure to fill the TMVar back with Just or Nothing. go (Just orig) = case orig of Nothing -> havelock - (Just (ProcessLock h _)) -> + (Just (ProcessLock h)) -> -- when process has exited, lock is stale ifM (isJust <$> P.getProcessExitCode h) ( havelock @@ -192,14 +188,10 @@ createProcessConcurrent p | willOutput ss = P.UseHandle h | otherwise = ss - cmd = case P.cmdspec p of - P.ShellCommand s -> s - P.RawCommand c ps -> unwords (c:ps) - firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h cmd) + updateOutputLocker (ProcessLock h) -- Output lock is still held as we return; the process -- is running now, and once it exits the output lock will -- be stale and can then be taken by something else. @@ -234,11 +226,6 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) -instance Show BufferedActivity where - show ReachedEnd = "ReachedEnd" - show (Output b) = "Output " ++ show (B.length b) - show (InTempFile t) = "InTempFile " ++ t - setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) setupBuffer h toh ss fromh = do hClose toh -- cgit v1.2.3 From 213cfc8f7253d6055883f6b3fb213cb4e0dffdc0 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 19:38:41 -0400 Subject: better lock taking using STM, and wait for concurrent processes writer threads on shutdown --- src/Utility/ConcurrentOutput.hs | 158 +++++++++++++++++++++------------------- 1 file changed, 83 insertions(+), 75 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 091513d0..a8de8ed1 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -15,6 +15,7 @@ module Utility.ConcurrentOutput ( import System.IO import System.Posix.IO import System.Directory +import System.Exit import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative @@ -27,25 +28,28 @@ import Data.List import Data.Monoid import qualified Data.ByteString as B import qualified System.Process as P -import System.Exit +import qualified Data.Set as S import Utility.Monad import Utility.Exception +import Utility.FileSystemEncoding data OutputHandle = OutputHandle - { outputLock :: TMVar (Maybe Locker) + { outputLock :: TMVar Lock + , outputBuffer :: TMVar Buffer + , outputThreads :: TMVar (S.Set (Async ())) } -data Locker - = GeneralLock - | ProcessLock P.ProcessHandle +data Lock = Locked -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newTMVarIO Nothing + <$> newEmptyTMVarIO + <*> newTMVarIO [] + <*> newTMVarIO S.empty -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -64,60 +68,34 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock :: (TMVar Lock -> STM a) -> IO a withLock a = do lck <- outputLock <$> getOutputHandle atomically (a lck) --- The lock TMVar is kept full normally, even if only with Nothing, --- so if we take it here, that blocks anyone else from trying --- to take the lock while we are checking it. takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = go =<< withLock tryTakeTMVar - where - go Nothing = whenblock waitlockchange - -- Something has the lock. It may be stale, so check it. - -- We must always be sure to fill the TMVar back with Just or Nothing. - go (Just orig) = case orig of - Nothing -> havelock - (Just (ProcessLock h)) -> - -- when process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ waitForProcessConcurrent h - havelock - else do - withLock (`putTMVar` orig) - return False - ) - (Just GeneralLock) -> do - withLock (`putTMVar` orig) - whenblock waitlockchange - - havelock = do - withLock (`putTMVar` Just GeneralLock) - return True - - -- Wait for the lock to change, and try again. - waitlockchange = do - void $ withLock readTMVar - takeOutputLock' block - - whenblock a = if block then a else return False +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + bv <- outputBuffer <$> getOutputHandle + buf <- atomically $ swapTMVar bv [] + emitBuffer stdout buf + return locked -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = withLock $ \l -> do - void $ takeTMVar l - putTMVar l Nothing - --- | Only safe to call after takeOutputLock; updates the Locker. -updateOutputLocker :: Locker -> IO () -updateOutputLocker locker = withLock $ \l -> do - void $ takeTMVar l - putTMVar l (Just locker) +dropOutputLock = withLock $ void . takeTMVar -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -127,9 +105,17 @@ updateOutputLocker locker = withLock $ \l -> do withConcurrentOutput :: IO a -> IO a withConcurrentOutput a = a `finally` drain where - -- Just taking the output lock is enough to ensure that anything - -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput noop + -- Wait for all outputThreads to finish. Then, take the output lock + -- to ensure that nothing is currently generating output, and flush + -- any buffered output. + drain = do + v <- outputThreads <$> getOutputHandle + atomically $ do + r <- takeTMVar v + if r == S.empty + then return () + else retry + lockOutput $ return () -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -140,10 +126,19 @@ withConcurrentOutput a = a `finally` drain -- not block. It buffers the string, so it will be displayed once the other -- writer is done. outputConcurrent :: String -> IO () -outputConcurrent s = do - putStr s - hFlush stdout - -- TODO +outputConcurrent s = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + putStr s + hFlush stdout + go False = do + bv <- outputBuffer <$> getOutputHandle + oldbuf <- atomically $ takeTMVar bv + newbuf <- addBuffer (Output (B.pack (decodeW8NUL s))) oldbuf + atomically $ putTMVar bv newbuf -- | This must be used to wait for processes started with -- `createProcessConcurrent`. @@ -191,10 +186,10 @@ createProcessConcurrent p firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) - -- Output lock is still held as we return; the process - -- is running now, and once it exits the output lock will - -- be stale and can then be taken by something else. + -- Wait for the process to exit and drop the lock. + void $ async $ do + void $ tryIO $ waitForProcessConcurrent h + dropOutputLock return r concurrentprocess = do @@ -218,6 +213,7 @@ willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False +-- Built up with newest seen output first. type Buffer = [BufferedActivity] data BufferedActivity @@ -257,27 +253,39 @@ outputDrainer ss fromh buf bufsig putTMVar bufsig () -- Wait to lock output, and once we can, display everything --- that's put into the buffers. +-- that's put into the buffers, until the end. bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () -bufferWriter = void . lockOutput . mapConcurrently go +bufferWriter l = do + worker <- async $ void $ lockOutput $ mapConcurrently go l + v <- outputThreads <$> getOutputHandle + atomically $ do + s <- takeTMVar v + putTMVar v (S.insert worker s) + void $ async $ do + void $ waitCatch worker + atomically $ do + s <- takeTMVar v + putTMVar v (S.delete worker s) where go v@(outh, buf, bufsig) = do - atomically $ takeTMVar bufsig + void $ atomically $ takeTMVar bufsig l <- takeMVar buf putMVar buf [] - forM_ (reverse l) $ \ba -> case ba of - Output b -> do - B.hPut outh b - hFlush outh - return () - InTempFile tmp -> do - B.hPut outh =<< B.readFile tmp - void $ tryWhenExists $ removeFile tmp - ReachedEnd -> return () + emitBuffer outh l if any (== ReachedEnd) l then return () else go v +emitBuffer :: Handle -> Buffer -> IO () +emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of + Output b -> do + B.hPut outh b + hFlush outh + InTempFile tmp -> do + B.hPut outh =<< B.readFile tmp + void $ tryWhenExists $ removeFile tmp + ReachedEnd -> return () + -- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper -- to combine it with any already buffered Output to that same Handle. -- -- cgit v1.2.3 From c85ca96d70f328fb799019a604b7ba82daa0aa33 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 19:39:51 -0400 Subject: remove externals --- src/Utility/ConcurrentOutput.hs | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index a8de8ed1..31871977 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -4,8 +4,6 @@ -- | Concurrent output handling. module Utility.ConcurrentOutput ( - takeOutputLock, - dropOutputLock, withConcurrentOutput, outputConcurrent, createProcessConcurrent, -- cgit v1.2.3 From d44f0b46d78060d36e8171b7278b63b6821a9889 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 19:43:34 -0400 Subject: export lockOutput --- src/Utility/ConcurrentOutput.hs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 31871977..db0bae0a 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -8,6 +8,7 @@ module Utility.ConcurrentOutput ( outputConcurrent, createProcessConcurrent, waitForProcessConcurrent, + lockOutput, ) where import System.IO @@ -53,8 +54,11 @@ globalOutputHandle = unsafePerformIO $ getOutputHandle :: IO OutputHandle getOutputHandle = readMVar globalOutputHandle --- | Holds a lock while performing an action. Any other threads --- that try to lockOutput at the same time will block. +-- | Holds a lock while performing an action that will display output. +-- While this is running, other threads that try to lockOutput will block, +-- and calls to `outputConcurrent` and `createProcessConcurrent` +-- will result in that concurrent output being buffered and not +-- displayed until the action is done. lockOutput :: (MonadIO m, MonadMask m) => m a -> m a lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) @@ -253,8 +257,8 @@ outputDrainer ss fromh buf bufsig -- Wait to lock output, and once we can, display everything -- that's put into the buffers, until the end. bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () -bufferWriter l = do - worker <- async $ void $ lockOutput $ mapConcurrently go l +bufferWriter ts = do + worker <- async $ void $ lockOutput $ mapConcurrently go ts v <- outputThreads <$> getOutputHandle atomically $ do s <- takeTMVar v -- cgit v1.2.3 From 86a115aaa0c216e4c46e57a324b58177c8b78435 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 20:10:11 -0400 Subject: have to flush concurrent output before printing result when chaining --- src/Propellor/Message.hs | 2 +- src/Propellor/Property/Chroot.hs | 1 + src/Propellor/Property/Docker.hs | 2 ++ src/Utility/ConcurrentOutput.hs | 30 +++++++++++++++++------------- 4 files changed, 21 insertions(+), 14 deletions(-) (limited to 'src/Utility') diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 7439c362..7df5104a 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -135,7 +135,7 @@ processChainOutput h = go Nothing Just l -> case readish l of Just r -> pure r Nothing -> do - outputConcurrent l + outputConcurrent (l ++ "\n") return FailedChange Just s -> do outputConcurrent $ diff --git a/src/Propellor/Property/Chroot.hs b/src/Propellor/Property/Chroot.hs index 8b923aab..e72d1bd9 100644 --- a/src/Propellor/Property/Chroot.hs +++ b/src/Propellor/Property/Chroot.hs @@ -213,6 +213,7 @@ chain hostlist (ChrootChain hn loc systemdonly onconsole) = then [Systemd.installed] else map ignoreInfo $ hostProperties h + flushConcurrentOutput putStrLn $ "\n" ++ show r chain _ _ = errorMessage "bad chain command" diff --git a/src/Propellor/Property/Docker.hs b/src/Propellor/Property/Docker.hs index 5f41209a..9082460f 100644 --- a/src/Propellor/Property/Docker.hs +++ b/src/Propellor/Property/Docker.hs @@ -540,6 +540,7 @@ init s = case toContainerId s of warningMessage "Boot provision failed!" void $ async $ job reapzombies job $ do + flushConcurrentOutput void $ tryIO $ ifM (inPath "bash") ( boolSystem "bash" [Param "-l"] , boolSystem "/bin/sh" [] @@ -583,6 +584,7 @@ chain hostlist hn s = case toContainerId s of r <- runPropellor h $ ensureProperties $ map ignoreInfo $ hostProperties h + flushConcurrentOutput putStrLn $ "\n" ++ show r stopContainer :: ContainerId -> IO Bool diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index db0bae0a..3f28068a 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -5,6 +5,7 @@ module Utility.ConcurrentOutput ( withConcurrentOutput, + flushConcurrentOutput, outputConcurrent, createProcessConcurrent, waitForProcessConcurrent, @@ -105,19 +106,22 @@ dropOutputLock = withLock $ void . takeTMVar -- This is necessary to ensure that buffered concurrent output actually -- gets displayed before the program exits. withConcurrentOutput :: IO a -> IO a -withConcurrentOutput a = a `finally` drain - where - -- Wait for all outputThreads to finish. Then, take the output lock - -- to ensure that nothing is currently generating output, and flush - -- any buffered output. - drain = do - v <- outputThreads <$> getOutputHandle - atomically $ do - r <- takeTMVar v - if r == S.empty - then return () - else retry - lockOutput $ return () +withConcurrentOutput a = a `finally` flushConcurrentOutput + +-- | Blocks until any processes started by `createProcessConcurrent` have +-- finished, and any buffered output is displayed. +flushConcurrentOutput :: IO () +flushConcurrentOutput = do + -- Wait for all outputThreads to finish. + v <- outputThreads <$> getOutputHandle + atomically $ do + r <- takeTMVar v + if r == S.empty + then return () + else retry + -- Take output lock to ensure that nothing else is currently + -- generating output, and flush any buffered output. + lockOutput $ return () -- | Displays a string to stdout, and flush output so it's displayed. -- -- cgit v1.2.3 From 6de8582fbae52a389c0d391f768284d6434b1467 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 20:17:15 -0400 Subject: propellor spin --- src/Utility/ConcurrentOutput.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 3f28068a..20c60ba8 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -117,7 +117,7 @@ flushConcurrentOutput = do atomically $ do r <- takeTMVar v if r == S.empty - then return () + then putTMVar v r else retry -- Take output lock to ensure that nothing else is currently -- generating output, and flush any buffered output. -- cgit v1.2.3 From ceee9305dce89a9529b316db6d6a5eabe1ad8adb Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 21:34:09 -0400 Subject: example --- src/Utility/ConcurrentOutput.hs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'src/Utility') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 20c60ba8..94cd4202 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,7 +1,21 @@ {-# LANGUAGE BangPatterns #-} {-# OPTIONS_GHC -fno-warn-tabs #-} --- | Concurrent output handling. +-- | +-- Copyright: 2013 Joey Hess +-- License: BSD-2-clause +-- +-- Concurrent output handling. +-- +-- > import Control.Concurrent.Async +-- > import Control.Concurrent.Output +-- > +-- > main = withConcurrentOutput $ +-- > outputConcurrent "washed the car\n" +-- > `concurrently` +-- > outputConcurrent "walked the dog\n" +-- > `concurrently` +-- > createProcessConcurrent (proc "ls" []) module Utility.ConcurrentOutput ( withConcurrentOutput, -- cgit v1.2.3 From 39fa051833de3178639974fa4fc7c803c5918f0e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 29 Oct 2015 00:38:53 -0400 Subject: generalize what can be output This adds a dependency on Text, but I don't mind propellor depending on it and am somewhat surprised it doesn't already. Using Text also lets this use encodeUtf8 instead of the nasty hack it was using to go from String -> ByteString. --- debian/changelog | 2 +- debian/control | 2 ++ propellor.cabal | 6 +++--- src/Propellor/Bootstrap.hs | 1 + src/Utility/ConcurrentOutput.hs | 35 +++++++++++++++++++++++++---------- 5 files changed, 32 insertions(+), 14 deletions(-) (limited to 'src/Utility') diff --git a/debian/changelog b/debian/changelog index c5538c7f..6f75bce9 100644 --- a/debian/changelog +++ b/debian/changelog @@ -20,7 +20,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium * Added Propellor.Property.Concurrent for concurrent properties. * Made the execProcess exported by propellor, and everything built on it, avoid scrambled output when run concurrently. - * Propellor now depends on STM. + * Propellor now depends on STM and text. * The cabal file now builds propellor with -O. While -O0 makes ghc take less memory while building propellor, it can lead to bad memory usage at runtime due to eg, disabled stream fusion. diff --git a/debian/control b/debian/control index 2956fdaa..97fb3e6d 100644 --- a/debian/control +++ b/debian/control @@ -18,6 +18,7 @@ Build-Depends: libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), libghc-stm-dev, + libghc-text-dev, Maintainer: Gergely Nagy Standards-Version: 3.9.6 Vcs-Git: git://git.joeyh.name/propellor @@ -41,6 +42,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends}, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), libghc-stm-dev, + libghc-text-dev, git, make, Description: property-based host configuration management in haskell diff --git a/propellor.cabal b/propellor.cabal index a07109a7..6e871d6b 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -39,7 +39,7 @@ Executable propellor Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions (>= 0.6), stm + exceptions (>= 0.6), stm, text if (! os(windows)) Build-Depends: unix @@ -51,7 +51,7 @@ Executable propellor-config Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions, stm + exceptions, stm, text if (! os(windows)) Build-Depends: unix @@ -62,7 +62,7 @@ Library Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions, stm + exceptions, stm, text if (! os(windows)) Build-Depends: unix diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs index 2318b910..21772b34 100644 --- a/src/Propellor/Bootstrap.hs +++ b/src/Propellor/Bootstrap.hs @@ -82,6 +82,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " , "libghc-transformers-dev" , "libghc-exceptions-dev" , "libghc-stm-dev" + , "libghc-text-dev" , "make" ] diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 94cd4202..c24744a3 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-tabs #-} -- | @@ -20,6 +20,7 @@ module Utility.ConcurrentOutput ( withConcurrentOutput, flushConcurrentOutput, + Outputable(..), outputConcurrent, createProcessConcurrent, waitForProcessConcurrent, @@ -40,13 +41,14 @@ import Control.Concurrent.Async import Data.Maybe import Data.List import Data.Monoid -import qualified Data.ByteString as B import qualified System.Process as P import qualified Data.Set as S +import qualified Data.ByteString as B +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) import Utility.Monad import Utility.Exception -import Utility.FileSystemEncoding data OutputHandle = OutputHandle { outputLock :: TMVar Lock @@ -137,27 +139,40 @@ flushConcurrentOutput = do -- generating output, and flush any buffered output. lockOutput $ return () --- | Displays a string to stdout, and flush output so it's displayed. +-- | Values that can be output. +class Outputable v where + toOutput :: v -> B.ByteString + +instance Outputable B.ByteString where + toOutput = id + +instance Outputable T.Text where + toOutput = encodeUtf8 + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout, and flush output so it's displayed. -- --- Uses locking to ensure that the whole string is output atomically +-- Uses locking to ensure that the whole output occurs atomically -- even when other threads are concurrently generating output. -- -- When something else is writing to the console at the same time, this does --- not block. It buffers the string, so it will be displayed once the other +-- not block. It buffers the value, so it will be displayed once the other -- writer is done. -outputConcurrent :: String -> IO () -outputConcurrent s = bracket setup cleanup go +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent v = bracket setup cleanup go where setup = tryTakeOutputLock cleanup False = return () cleanup True = dropOutputLock go True = do - putStr s + B.hPut stdout (toOutput v) hFlush stdout go False = do bv <- outputBuffer <$> getOutputHandle oldbuf <- atomically $ takeTMVar bv - newbuf <- addBuffer (Output (B.pack (decodeW8NUL s))) oldbuf + newbuf <- addBuffer (Output (toOutput v)) oldbuf atomically $ putTMVar bv newbuf -- | This must be used to wait for processes started with -- cgit v1.2.3