From af68ec950b2480749182d0d6838e96fd02c2c285 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 10:37:19 -0400 Subject: split out generic ConcurrentOutput module to Utility --- src/Propellor/Message.hs | 204 +----------------------------------- src/Utility/ConcurrentOutput.hs | 224 ++++++++++++++++++++++++++++++++++++++++ src/Utility/Process/Shim.hs | 2 +- 3 files changed, 228 insertions(+), 202 deletions(-) create mode 100644 src/Utility/ConcurrentOutput.hs (limited to 'src') diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 3792129b..3b06770c 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE PackageImports #-} - -- | This module handles all display of output to the console when -- propellor is ensuring Properties. -- @@ -22,117 +20,34 @@ module Propellor.Message ( import System.Console.ANSI import System.IO -import System.Posix.IO -import "mtl" Control.Monad.Reader +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent -import Control.Concurrent.Async -import Data.Maybe -import Data.Char -import Data.List -import Data.Monoid -import qualified Data.ByteString as B -import qualified System.Process as P import Propellor.Types +import Utility.ConcurrentOutput import Utility.PartialPrelude import Utility.Monad import Utility.Exception data MessageHandle = MessageHandle { isConsole :: Bool - , outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker } -data Locker - = GeneralLock - | ProcessLock P.ProcessHandle - -- | A shared global variable for the MessageHandle. {-# NOINLINE globalMessageHandle #-} globalMessageHandle :: MVar MessageHandle globalMessageHandle = unsafePerformIO $ newMVar =<< MessageHandle <$> hIsTerminalDevice stdout - <*> newMVar () - <*> newEmptyMVar -- | Gets the global MessageHandle. getMessageHandle :: IO MessageHandle getMessageHandle = readMVar globalMessageHandle --- | Takes a lock while performing an action. Any other threads --- that try to lockOutput at the same time will block. -lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) - --- | Blocks until we have the output lock. -takeOutputLock :: IO () -takeOutputLock = void $ takeOutputLock' True - --- | Tries to take the output lock, without blocking. -tryTakeOutputLock :: IO Bool -tryTakeOutputLock = takeOutputLock' False - -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do - lck <- outputLock <$> getMessageHandle - go =<< tryTakeMVar lck - where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getMessageHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock - - havelock = do - updateOutputLocker GeneralLock - return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getMessageHandle - takeMVar lck - havelock - whenblock a = if block then a else return False - --- | Only safe to call after taking the output lock. -dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getMessageHandle - lck <- outputLock <$> getMessageHandle - void $ takeMVar lcker - putMVar lck () - --- | Only safe to call after takeOutputLock; updates the Locker. -updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getMessageHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) - -- | Force console output. This can be used when stdout is not directly -- connected to a console, but is eventually going to be displayed at a -- console. @@ -237,116 +152,3 @@ messagesDone = lockOutput $ do whenConsole $ setTitle "propellor: done" hFlush stdout - --- | Wrapper around `System.Process.createProcess` that prevents --- multiple processes that are running concurrently from writing --- to stdout/stderr at the same time. --- --- The first process is allowed to write to --- stdout and stderr in the usual way. --- --- However, if another process runs concurrently with the --- first, any stdout or stderr that would have been displayed by it is --- instead buffered. The buffered output will be displayed the next time it --- is safe to do so (ie, after the first process exits). --- --- Also does debug logging of all commands run. --- --- Unless you manually import System.Process, every part of propellor --- that runs a process uses this. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessConcurrent p - | hasoutput (P.std_out p) || hasoutput (P.std_err p) = - ifM tryTakeOutputLock - ( firstprocess - , concurrentprocess - ) - | otherwise = P.createProcess p - where - hasoutput P.Inherit = True - hasoutput _ = False - - firstprocess = do - r@(_, _, _, h) <- P.createProcess p - `onException` dropOutputLock - updateOutputLocker (ProcessLock h) - -- Output lock is still held as we return; the process - -- is running now, and once it exits the output lock will - -- be stale and can then be taken by something else. - return r - - concurrentprocess = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe - let p' = p - { P.std_out = if hasoutput (P.std_out p) - then P.UseHandle toouth - else P.std_out p - , P.std_err = if hasoutput (P.std_err p) - then P.UseHandle toerrh - else P.std_err p - } - r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf - void $ async $ bufferWriter buf - return r - - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - -type Buffer = [(Handle, Maybe B.ByteString)] - --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf (pure . addBuffer (toh, Just b)) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf (pure . (++ [(toh, Nothing)])) - hClose fromh - --- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) - where - go [] = return () - go hs = do - l <- takeMVar buf - forM_ l $ \(h, mb) -> do - maybe noop (B.hPut h) mb - hFlush h - let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs - putMVar buf [] - go hs' - --- The buffer can grow up to 1 mb in size, but after that point, --- it's truncated to avoid propellor using unbounded memory --- when a process outputs a whole lot of stuff. -bufsz :: Int -bufsz = 1000000 - -addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer -addBuffer v@(_, Nothing) buf = buf ++ [v] -addBuffer (toh, Just b) buf = (toh, Just b') : other - where - (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf - b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b - --- Truncate a buffer by removing lines from the front until it's --- small enough. -truncateBuffer :: B.ByteString -> B.ByteString -truncateBuffer b - | B.length b <= bufsz = b - | otherwise = truncateBuffer $ snd $ B.breakByte nl b - where - nl = fromIntegral (ord '\n') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs new file mode 100644 index 00000000..cf1d166e --- /dev/null +++ b/src/Utility/ConcurrentOutput.hs @@ -0,0 +1,224 @@ +-- | Concurrent output handling. +-- +-- When two threads both try to display a message concurrently, +-- the messages will be displayed sequentially. + +module Utility.ConcurrentOutput ( + lockOutput, + createProcessConcurrent, +) where + +import System.IO +import System.Posix.IO +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.Async +import Data.Maybe +import Data.Char +import Data.List +import Data.Monoid +import qualified Data.ByteString as B +import qualified System.Process as P + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: MVar () -- ^ empty when locked + , outputLockedBy :: MVar Locker + } + +data Locker + = GeneralLock + | ProcessLock P.ProcessHandle + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: MVar OutputHandle +globalOutputHandle = unsafePerformIO $ + newMVar =<< OutputHandle + <$> newMVar () + <*> newEmptyMVar + +-- | Gets the global OutputHandle. +getOutputHandle :: IO OutputHandle +getOutputHandle = readMVar globalOutputHandle + +-- | Holds a lock while performing an action. Any other threads +-- that try to lockOutput at the same time will block. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + lck <- outputLock <$> getOutputHandle + go =<< tryTakeMVar lck + where + -- lck was full, and we've emptied it, so we hold the lock now. + go (Just ()) = havelock + -- lck is empty, so someone else is holding the lock. + go Nothing = do + lcker <- outputLockedBy <$> getOutputHandle + v' <- tryTakeMVar lcker + case v' of + Just (ProcessLock h) -> + -- if process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + void $ P.waitForProcess h + havelock + else do + putMVar lcker (ProcessLock h) + return False + ) + Just GeneralLock -> do + putMVar lcker GeneralLock + whenblock waitlock + Nothing -> whenblock waitlock + + havelock = do + updateOutputLocker GeneralLock + return True + waitlock = do + -- Wait for current lock holder to relinquish + -- it and take the lock. + lck <- outputLock <$> getOutputHandle + takeMVar lck + havelock + whenblock a = if block then a else return False + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = do + lcker <- outputLockedBy <$> getOutputHandle + lck <- outputLock <$> getOutputHandle + void $ takeMVar lcker + putMVar lck () + +-- | Only safe to call after takeOutputLock; updates the Locker. +updateOutputLocker :: Locker -> IO () +updateOutputLocker l = do + lcker <- outputLockedBy <$> getOutputHandle + void $ tryTakeMVar lcker + putMVar lcker l + modifyMVar_ lcker (const $ return l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- The first process is allowed to write to stdout and stderr in the usual way. +-- +-- However, if another process runs concurrently with the +-- first, any stdout or stderr that would have been displayed by it is +-- instead buffered. The buffered output will be displayed the next time it +-- is safe to do so (ie, after the first process exits). +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessConcurrent p + | hasoutput (P.std_out p) || hasoutput (P.std_err p) = + ifM tryTakeOutputLock + ( firstprocess + , concurrentprocess + ) + | otherwise = P.createProcess p + where + hasoutput P.Inherit = True + hasoutput _ = False + + firstprocess = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + updateOutputLocker (ProcessLock h) + -- Output lock is still held as we return; the process + -- is running now, and once it exits the output lock will + -- be stale and can then be taken by something else. + return r + + concurrentprocess = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = if hasoutput (P.std_out p) + then P.UseHandle toouth + else P.std_out p + , P.std_err = if hasoutput (P.std_err p) + then P.UseHandle toerrh + else P.std_err p + } + r <- P.createProcess p' + hClose toouth + hClose toerrh + buf <- newMVar [] + void $ async $ outputDrainer fromouth stdout buf + void $ async $ outputDrainer fromerrh stderr buf + void $ async $ bufferWriter buf + return r + + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + +type Buffer = [(Handle, Maybe B.ByteString)] + +-- Drain output from the handle, and buffer it in memory. +outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () +outputDrainer fromh toh buf = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf (pure . addBuffer (toh, Just b)) + outputDrainer fromh toh buf + _ -> do + modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + hClose fromh + +-- Wait to lock output, and once we can, display everything +-- that's put into buffer, until the end is signaled by Nothing +-- for both stdout and stderr. +bufferWriter :: MVar Buffer -> IO () +bufferWriter buf = lockOutput (go [stdout, stderr]) + where + go [] = return () + go hs = do + l <- takeMVar buf + forM_ l $ \(h, mb) -> do + maybe noop (B.hPut h) mb + hFlush h + let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + putMVar buf [] + go hs' + +-- The buffer can grow up to 1 mb in size, but after that point, +-- it's truncated to avoid propellor using unbounded memory +-- when a process outputs a whole lot of stuff. +bufsz :: Int +bufsz = 1000000 + +addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer +addBuffer v@(_, Nothing) buf = buf ++ [v] +addBuffer (toh, Just b) buf = (toh, Just b') : other + where + (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf + b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b + +-- Truncate a buffer by removing lines from the front until it's +-- small enough. +truncateBuffer :: B.ByteString -> B.ByteString +truncateBuffer b + | B.length b <= bufsz = b + | otherwise = truncateBuffer $ snd $ B.breakByte nl b + where + nl = fromIntegral (ord '\n') diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 0da93bf7..202b7c32 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,7 +1,7 @@ module Utility.Process.Shim (module X, createProcess) where import System.Process as X hiding (createProcess) -import Propellor.Message (createProcessConcurrent) +import Utility.ConcurrentOutput (createProcessConcurrent) import System.IO createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -- cgit v1.2.3