summaryrefslogtreecommitdiff
path: root/src/Propellor/Message.hs
diff options
context:
space:
mode:
authorJoey Hess2015-10-28 10:37:19 -0400
committerJoey Hess2015-10-28 10:37:19 -0400
commitaf68ec950b2480749182d0d6838e96fd02c2c285 (patch)
tree5f3a81661057021c460f44eaedd23872e5e744da /src/Propellor/Message.hs
parent357ffb9fd34ebd36e07dece8e45450dbd2f0e8ec (diff)
split out generic ConcurrentOutput module to Utility
Diffstat (limited to 'src/Propellor/Message.hs')
-rw-r--r--src/Propellor/Message.hs204
1 files changed, 3 insertions, 201 deletions
diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs
index 3792129b..3b06770c 100644
--- a/src/Propellor/Message.hs
+++ b/src/Propellor/Message.hs
@@ -1,5 +1,3 @@
-{-# LANGUAGE PackageImports #-}
-
-- | This module handles all display of output to the console when
-- propellor is ensuring Properties.
--
@@ -22,117 +20,34 @@ module Propellor.Message (
import System.Console.ANSI
import System.IO
-import System.Posix.IO
-import "mtl" Control.Monad.Reader
+import Control.Monad
+import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import Control.Monad.IfElse
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
-import Control.Concurrent.Async
-import Data.Maybe
-import Data.Char
-import Data.List
-import Data.Monoid
-import qualified Data.ByteString as B
-import qualified System.Process as P
import Propellor.Types
+import Utility.ConcurrentOutput
import Utility.PartialPrelude
import Utility.Monad
import Utility.Exception
data MessageHandle = MessageHandle
{ isConsole :: Bool
- , outputLock :: MVar () -- ^ empty when locked
- , outputLockedBy :: MVar Locker
}
-data Locker
- = GeneralLock
- | ProcessLock P.ProcessHandle
-
-- | A shared global variable for the MessageHandle.
{-# NOINLINE globalMessageHandle #-}
globalMessageHandle :: MVar MessageHandle
globalMessageHandle = unsafePerformIO $
newMVar =<< MessageHandle
<$> hIsTerminalDevice stdout
- <*> newMVar ()
- <*> newEmptyMVar
-- | Gets the global MessageHandle.
getMessageHandle :: IO MessageHandle
getMessageHandle = readMVar globalMessageHandle
--- | Takes a lock while performing an action. Any other threads
--- that try to lockOutput at the same time will block.
-lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
-lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
-
--- | Blocks until we have the output lock.
-takeOutputLock :: IO ()
-takeOutputLock = void $ takeOutputLock' True
-
--- | Tries to take the output lock, without blocking.
-tryTakeOutputLock :: IO Bool
-tryTakeOutputLock = takeOutputLock' False
-
-takeOutputLock' :: Bool -> IO Bool
-takeOutputLock' block = do
- lck <- outputLock <$> getMessageHandle
- go =<< tryTakeMVar lck
- where
- -- lck was full, and we've emptied it, so we hold the lock now.
- go (Just ()) = havelock
- -- lck is empty, so someone else is holding the lock.
- go Nothing = do
- lcker <- outputLockedBy <$> getMessageHandle
- v' <- tryTakeMVar lcker
- case v' of
- Just (ProcessLock h) ->
- -- if process has exited, lock is stale
- ifM (isJust <$> P.getProcessExitCode h)
- ( havelock
- , if block
- then do
- void $ P.waitForProcess h
- havelock
- else do
- putMVar lcker (ProcessLock h)
- return False
- )
- Just GeneralLock -> do
- putMVar lcker GeneralLock
- whenblock waitlock
- Nothing -> whenblock waitlock
-
- havelock = do
- updateOutputLocker GeneralLock
- return True
- waitlock = do
- -- Wait for current lock holder to relinquish
- -- it and take the lock.
- lck <- outputLock <$> getMessageHandle
- takeMVar lck
- havelock
- whenblock a = if block then a else return False
-
--- | Only safe to call after taking the output lock.
-dropOutputLock :: IO ()
-dropOutputLock = do
- lcker <- outputLockedBy <$> getMessageHandle
- lck <- outputLock <$> getMessageHandle
- void $ takeMVar lcker
- putMVar lck ()
-
--- | Only safe to call after takeOutputLock; updates the Locker.
-updateOutputLocker :: Locker -> IO ()
-updateOutputLocker l = do
- lcker <- outputLockedBy <$> getMessageHandle
- void $ tryTakeMVar lcker
- putMVar lcker l
- modifyMVar_ lcker (const $ return l)
-
-- | Force console output. This can be used when stdout is not directly
-- connected to a console, but is eventually going to be displayed at a
-- console.
@@ -237,116 +152,3 @@ messagesDone = lockOutput $ do
whenConsole $
setTitle "propellor: done"
hFlush stdout
-
--- | Wrapper around `System.Process.createProcess` that prevents
--- multiple processes that are running concurrently from writing
--- to stdout/stderr at the same time.
---
--- The first process is allowed to write to
--- stdout and stderr in the usual way.
---
--- However, if another process runs concurrently with the
--- first, any stdout or stderr that would have been displayed by it is
--- instead buffered. The buffered output will be displayed the next time it
--- is safe to do so (ie, after the first process exits).
---
--- Also does debug logging of all commands run.
---
--- Unless you manually import System.Process, every part of propellor
--- that runs a process uses this.
-createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
-createProcessConcurrent p
- | hasoutput (P.std_out p) || hasoutput (P.std_err p) =
- ifM tryTakeOutputLock
- ( firstprocess
- , concurrentprocess
- )
- | otherwise = P.createProcess p
- where
- hasoutput P.Inherit = True
- hasoutput _ = False
-
- firstprocess = do
- r@(_, _, _, h) <- P.createProcess p
- `onException` dropOutputLock
- updateOutputLocker (ProcessLock h)
- -- Output lock is still held as we return; the process
- -- is running now, and once it exits the output lock will
- -- be stale and can then be taken by something else.
- return r
-
- concurrentprocess = do
- (toouth, fromouth) <- pipe
- (toerrh, fromerrh) <- pipe
- let p' = p
- { P.std_out = if hasoutput (P.std_out p)
- then P.UseHandle toouth
- else P.std_out p
- , P.std_err = if hasoutput (P.std_err p)
- then P.UseHandle toerrh
- else P.std_err p
- }
- r <- P.createProcess p'
- hClose toouth
- hClose toerrh
- buf <- newMVar []
- void $ async $ outputDrainer fromouth stdout buf
- void $ async $ outputDrainer fromerrh stderr buf
- void $ async $ bufferWriter buf
- return r
-
- pipe = do
- (from, to) <- createPipe
- (,) <$> fdToHandle to <*> fdToHandle from
-
-type Buffer = [(Handle, Maybe B.ByteString)]
-
--- Drain output from the handle, and buffer it in memory.
-outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
-outputDrainer fromh toh buf = do
- v <- tryIO $ B.hGetSome fromh 1024
- case v of
- Right b | not (B.null b) -> do
- modifyMVar_ buf (pure . addBuffer (toh, Just b))
- outputDrainer fromh toh buf
- _ -> do
- modifyMVar_ buf (pure . (++ [(toh, Nothing)]))
- hClose fromh
-
--- Wait to lock output, and once we can, display everything
--- that's put into buffer, until the end is signaled by Nothing
--- for both stdout and stderr.
-bufferWriter :: MVar Buffer -> IO ()
-bufferWriter buf = lockOutput (go [stdout, stderr])
- where
- go [] = return ()
- go hs = do
- l <- takeMVar buf
- forM_ l $ \(h, mb) -> do
- maybe noop (B.hPut h) mb
- hFlush h
- let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs
- putMVar buf []
- go hs'
-
--- The buffer can grow up to 1 mb in size, but after that point,
--- it's truncated to avoid propellor using unbounded memory
--- when a process outputs a whole lot of stuff.
-bufsz :: Int
-bufsz = 1000000
-
-addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer
-addBuffer v@(_, Nothing) buf = buf ++ [v]
-addBuffer (toh, Just b) buf = (toh, Just b') : other
- where
- (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf
- b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b
-
--- Truncate a buffer by removing lines from the front until it's
--- small enough.
-truncateBuffer :: B.ByteString -> B.ByteString
-truncateBuffer b
- | B.length b <= bufsz = b
- | otherwise = truncateBuffer $ snd $ B.breakByte nl b
- where
- nl = fromIntegral (ord '\n')