Here we give a monitoring server that can aggregate events from multiple processes. This example demonstrates simple inter-process communication using sockets, multi-threaded applications, a daemon that responds to a termination signal, and calculating the proportion of successful events over a sliding window of event data to assess the health of a service.
This program uses two language extensions: numeric underscores and type applications.
{-# LANGUAGE NumericUnderscores, TypeApplications #-}
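To see what the two extensions do in isolation, here is a small standalone sketch. It is not part of the monitoring program, and the name oneTenthSecond is ours. NumericUnderscores only lets us group digits for readability. TypeApplications lets us state explicitly which Foldable for_ should iterate over, which is how the program later writes for_ @[] and for_ @Maybe.

```haskell
{-# LANGUAGE NumericUnderscores, TypeApplications #-}

import Data.Foldable (for_)

-- NumericUnderscores: the underscores are purely visual digit separators.
oneTenthSecond :: Int
oneTenthSecond = 100_000 -- microseconds, the unit threadDelay expects

main :: IO ()
main = do
  print (oneTenthSecond == 100000)
  -- TypeApplications: @Maybe fixes which Foldable for_ iterates over.
  for_ @Maybe (Just "the only element") putStrLn
  -- Nothing has no elements, so this line prints nothing.
  for_ @Maybe Nothing putStrLn
  for_ @[] ["first", "second"] putStrLn
```

Running it prints True, then "the only element", "first", and "second", each on its own line.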
We will use sockets for inter-process communication. This comes from a library called network, although in this case we will only be using sockets to communicate between processes on the same machine.
import qualified Network.Socket as S
import Network.Socket.ByteString (recv, sendAll)
Whenever we open a socket, we always want to close it, even if an exception is thrown while we are using the socket. This import, which provides bracket, comes from the safe-exceptions library.
import Control.Exception.Safe
We send and receive byte strings over a socket.
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8
Modules with “system” in the name generally deal with topics related to a process’s interaction with the operating system, such as how a process reads its environment variables, terminates itself, receives interrupts, or forks other processes. Here we import from the base and signal libraries.
import System.Environment
import System.Exit
import System.IO
import System.Signal
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
We use a Seq to hold the rolling window of events, and rational numbers for the arithmetic regarding what percentage of failures constitutes an emergency.
import qualified Data.Sequence as Seq
import Data.Ratio ((%))
import Control.Monad (forever, when)
import Data.Foldable (asum, for_, find)
import Data.Maybe (mapMaybe)
The scenario – Imagine we have many services that are performing some task repeatedly. This task, whatever it is, may fail sometimes; the success rate isn’t expected to be 100%. But if it gets really bad – say, if over half of the attempts are failing – then we’ll assume that indicates a serious problem that requires attention.
An EventReport represents the result of a single action. Each time a service finishes attempting the task, it will send an event report to a central monitoring server that aggregates all the reports.
data EventReport = Success | Failure
    deriving Eq
From the reports it receives, the monitoring server continually determines the current SystemStatus: an overview of whether, in general, actions tend to be succeeding or failing.
data SystemStatus = Okay | Alarm
    deriving Eq
Two programs in one – Because this example is meant to demonstrate inter-process communication, it is actually two programs; each process will play a different role that depends on what command-line argument it is given.
- The aggregate-reports command runs a monitoring server.
- The send-demo-reports command sends some fixed data to the monitoring server so we can observe how it reacts.
die is a convenient function which prints an error message to standard error and terminates the program with a failure exit code. We use it here to handle the situation where the command-line arguments are not one of these two cases.
main =
  do
    hSetBuffering stdout LineBuffering
    args <- getArgs
    case args of
        ["aggregate-reports"] -> aggregateReportsMain
        ["send-demo-reports"] -> sendDemoReportsMain
        _ -> die "Invalid args"
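Because die works by throwing an ExitFailure exception rather than halting the process on the spot, the dispatch logic can be exercised in-process. This is a sketch under our own assumptions: dispatch is a hypothetical stand-in for main that returns a description instead of starting a server or client, and withArgs (from System.Environment) substitutes the command-line arguments for the duration of one action.

```haskell
import Control.Exception (try)
import System.Environment (getArgs, withArgs)
import System.Exit (ExitCode (..), die)

-- Hypothetical stand-in for main: same dispatch, but it returns a
-- description instead of starting a server or a client.
dispatch :: IO String
dispatch = do
  args <- getArgs
  case args of
    ["aggregate-reports"] -> return "would run the monitoring server"
    ["send-demo-reports"] -> return "would send demo reports"
    _ -> die "Invalid args"

main :: IO ()
main = do
  withArgs ["send-demo-reports"] dispatch >>= putStrLn
  -- die prints to stderr and throws ExitFailure 1, which try can catch.
  result <- try (withArgs ["bogus"] dispatch) :: IO (Either ExitCode String)
  print result
```

Running it prints "would send demo reports", writes "Invalid args" to standard error, and then prints Left (ExitFailure 1).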
aggregate-reports – The monitoring server does three things:
- listen for event reports;
- inspect those reports to determine the system state;
- send notifications of system state changes.
We use queues to pass messages between those three threads.
- The receiveReports thread writes to reportQueue, and the analyzeReports thread reads from it.
- The analyzeReports thread writes to alarmQueue, and the sendAlarms thread reads from it.
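As a minimal illustration of the queue pattern (a sketch, not the program's own code), one thread writes to a TQueue from the stm package while another reads from it. readTQueue blocks until an item is available, and items come out in the order they went in. The name pipeline is ours.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

-- A producer thread writes five numbers; the calling thread reads them back.
pipeline :: IO [Int]
pipeline = do
  queue <- newTQueueIO
  -- Producer: each write is its own atomic transaction.
  _ <- forkIO (forM_ [1 .. 5] (atomically . writeTQueue queue))
  -- Consumer: readTQueue blocks (retries) until an item is available.
  replicateM 5 (atomically (readTQueue queue))

main :: IO ()
main = pipeline >>= print
```

With a single producer, this prints [1,2,3,4,5].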
foldr1 race_ runs a list of actions concurrently. When any of these four threads stops – either by throwing an exception or by reaching its natural conclusion – then the other three will be stopped and the entire program will end.
aggregateReportsMain =
  withServerSocket $ \serverSocket ->
    do
      putStrLn "The monitoring server has started."
      reportQueue <- atomically newTQueue
      alarmQueue <- atomically newTQueue
      foldr1 race_
        [ receiveReports serverSocket reportQueue
        , analyzeReports reportQueue alarmQueue
        , sendAlarms alarmQueue
        , waitForTerminationSignal
        ]
      putStrLn "The monitoring server is stopping."
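foldr1 nests race_ to the right. The sketch below uses strings as stand-ins for the four actions and a hypothetical nestRace in place of race_, purely to print the shape of the expression that foldr1 builds.

```haskell
-- Hypothetical stand-in for race_ that only renders the nesting as text.
nestRace :: String -> String -> String
nestRace x y = "(race_ " ++ x ++ " " ++ y ++ ")"

main :: IO ()
main =
  putStrLn
    (foldr1 nestRace
      ["receiveReports", "analyzeReports", "sendAlarms", "waitForTerminationSignal"])
```

This prints (race_ receiveReports (race_ analyzeReports (race_ sendAlarms waitForTerminationSignal))). Because race_ cancels its other argument as soon as either one finishes, the end of any leaf ends its enclosing race_, and that propagates outward until the whole expression returns.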
Termination – The three threads discussed above all run indefinitely. But all programs need some way to be stopped. The conventional way to ask a daemon to stop is to send it a termination signal (SIGTERM), which is what the kill command sends by default. (In most terminal emulators, ctrl+C sends a related but different signal, the interrupt signal SIGINT.) waitForTerminationSignal is an I/O action that blocks until the process receives a termination signal. Since we included this in the race above, the ending of this thread will result in aborting the other three threads, and then the monitoring server will come to a halt as requested.
waitForTerminationSignal =
  do
    terminate <- atomically (newTVar False)
    installHandler sigTERM $ \_signal ->
        atomically (writeTVar terminate True)
    atomically (readTVar terminate >>= check)
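The TVar-and-check idiom can be tried without any signals. In this sketch, which assumes only the stm package, a forked thread plays the role of the signal handler and sets the flag after half a second. check retries the transaction, blocking the main thread, until the flag is True. The helper name waitForFlag is ours.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Blocks until the flag becomes True: check retries the transaction while
-- the flag is False, and STM re-runs it when the TVar changes.
waitForFlag :: TVar Bool -> IO ()
waitForFlag flag = atomically (readTVar flag >>= check)

main :: IO ()
main = do
  flag <- newTVarIO False
  -- Stand-in for the signal handler: set the flag after half a second.
  _ <- forkIO (threadDelay 500000 >> atomically (writeTVar flag True))
  waitForFlag flag
  putStrLn "Flag was set; shutting down."
```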
Message format – Clients will send event reports to the server. But how? We need to invent some system for encoding event reports as byte strings. We’ll keep it very simple: failure events will be represented as the ASCII character 0, and success events will be the character 1.
encodeReport r =
    case r of
        Failure -> '0'
        Success -> '1'
Here we implement the reverse of encodeReport using find. See also inverting functions for an alternative approach.
decodeReport c =
    find (\r -> encodeReport r == c) [Failure, Success]
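The same search generalizes to any type whose values can be enumerated. The following sketch is our own illustration and not necessarily the approach on the inverting functions page: it adds Enum and Bounded instances to EventReport and defines a hypothetical helper invert, then checks that decoding undoes encoding.

```haskell
import Data.Foldable (find)

data EventReport = Success | Failure
  deriving (Eq, Show, Enum, Bounded)

encodeReport :: EventReport -> Char
encodeReport r =
  case r of
    Failure -> '0'
    Success -> '1'

-- Hypothetical helper: invert a function on a finite, enumerable domain
-- by trying every value from minBound to maxBound.
invert :: (Bounded a, Enum a, Eq b) => (a -> b) -> b -> Maybe a
invert f y = find (\x -> f x == y) [minBound .. maxBound]

decodeReport :: Char -> Maybe EventReport
decodeReport = invert encodeReport

main :: IO ()
main = do
  print (map decodeReport "01x")
  -- Round trip: decoding undoes encoding for every constructor.
  print (all (\r -> decodeReport (encodeReport r) == Just r) [minBound .. maxBound])
```

This prints [Just Failure,Just Success,Nothing] and then True. If two inputs mapped to the same output, find would return whichever comes first in enumeration order, so this yields a true inverse only when the function is injective.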
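Receiving event reports – Sockets are for networking, but here we’ll be using sockets just to communicate between processes on the same machine. For the socket address, we’ll use an abstract Unix domain socket: because the address begins with a null byte, it lives in an abstract namespace rather than as a file in the filesystem, which is a Linux-specific feature.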
serverAddress = S.SockAddrUnix "\0haskell-phrasebook/monitoring"

openSocket = S.socket S.AF_UNIX S.Stream S.defaultProtocol
Whenever we open a socket, we should close it. We use bracket to ensure that this always happens.
Before we can start accepting connections from processes that will send us event reports, we first have to:
- bind the socket to the server address; and
- listen for incoming connections.
withServerSocket action =
    bracket openSocket S.close $ \serverSocket ->
      do
        S.bind serverSocket serverAddress
        S.listen serverSocket S.maxListenQueue
        action serverSocket
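The guarantee that bracket provides can be observed without any sockets. This sketch uses bracket from base’s Control.Exception rather than the safe-exceptions version imported above; both run the release action when the body throws. The acquire, release, and use actions are hypothetical stand-ins for openSocket, S.close, and the server body.

```haskell
import Control.Exception (IOException, bracket, throwIO, try)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Runs bracket around a body that fails, then reports the caught exception
-- and whether the release step ran anyway.
demo :: IO (Either IOException (), Bool)
demo = do
  released <- newIORef False
  result <- try (bracket acquire (release released) use)
  wasReleased <- readIORef released
  return (result, wasReleased)
  where
    -- Hypothetical stand-ins for openSocket, S.close and the server body.
    acquire = putStrLn "acquire" >> return (42 :: Int)
    release ref _ = putStrLn "release" >> writeIORef ref True
    use x = do
      putStrLn ("use " ++ show x)
      throwIO (userError "boom")

main :: IO ()
main = do
  (result, wasReleased) <- demo
  putStrLn ("caught: " ++ either show (const "no exception") result)
  putStrLn ("released: " ++ show wasReleased)
```

The output is acquire, use 42, release, caught: user error (boom), and released: True, in that order.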
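Once we have our server socket set up, we use forever to enter an infinite loop. Each iteration calls accept, which waits for a new incoming connection.
This combination of mask and forkFinally gives us an effect similar to that of bracket; the difference is that the action that we take upon receiving a client connection takes place in a new thread. Because asynchronous exceptions are masked from the moment accept returns until forkFinally has installed its cleanup handler, the client socket cannot leak in between; unmask restores exception delivery only for the code that handles the client.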
receiveReports serverSocket reportQueue =
    forever $
      mask $ \unmask ->
        do
          (clientSocket, _clientAddr) <- S.accept serverSocket
          forkFinally
            (unmask (receiveReports' clientSocket reportQueue))
            (\_ -> S.close clientSocket)
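Here is a socket-free sketch of the same mask and forkFinally pattern, using only base. forkWithCleanup is a hypothetical helper: its cleanup handler stands in for S.close clientSocket, and an MVar lets the caller wait until cleanup has run. The handler receives Right on success and Left on an exception, and runs in both cases.

```haskell
import Control.Concurrent (forkFinally, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (mask, throwIO)

-- Hypothetical helper: fork a worker the way receiveReports does (mask,
-- then forkFinally with an unmasked body) and wait for its cleanup.
forkWithCleanup :: IO () -> IO String
forkWithCleanup work = do
  done <- newEmptyMVar
  _ <- mask $ \unmask ->
    forkFinally (unmask work) $ \outcome ->
      -- In the server, this is where S.close clientSocket runs.
      putMVar done (either (const "cleanup after an exception")
                           (const "cleanup after success")
                           outcome)
  takeMVar done

main :: IO ()
main = do
  forkWithCleanup (putStrLn "handling a well-behaved client") >>= putStrLn
  forkWithCleanup (throwIO (userError "client misbehaved")) >>= putStrLn
```

The worker’s exception is caught by forkFinally, so the second call reports "cleanup after an exception" instead of crashing the thread with an uncaught error.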
receiveReports' is a loop that continually receives report data from the client socket and writes it into the report queue.
receiveReports' clientSocket reportQueue = continue
  where
    continue =
      do
        receivedBytes <- recv clientSocket 1024
        case BS.length receivedBytes of
When recv returns an empty byte string, that indicates that the sender has closed the socket, and this loop comes to an end.
            0 -> return ()
            _ ->
              do
                receiveReports'' receivedBytes reportQueue
                continue
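To see the loop’s termination condition without a network connection, the sketch below replaces recv with a hypothetical fakeRecv that hands out a fixed list of chunks and then returns an empty ByteString, as recv does once the peer has closed the socket. The names receiveLoop and collect are also ours; BS.null is equivalent to checking that BS.length is 0.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC

-- Hypothetical stand-in for recv: returns the queued chunks one at a time,
-- then the empty ByteString that recv returns after the peer closes.
fakeRecv :: IORef [BS.ByteString] -> IO BS.ByteString
fakeRecv ref = do
  chunks <- readIORef ref
  case chunks of
    [] -> return BS.empty
    (c : rest) -> writeIORef ref rest >> return c

-- Same shape as receiveReports': handle each chunk until an empty read.
receiveLoop :: IO BS.ByteString -> (BS.ByteString -> IO ()) -> IO ()
receiveLoop receive handleChunk = continue
  where
    continue = do
      bytes <- receive
      if BS.null bytes
        then return ()
        else handleChunk bytes >> continue

-- Runs the loop over some chunks and returns what the handler saw, in order.
collect :: [String] -> IO [String]
collect chunks = do
  source <- newIORef (map BC.pack chunks)
  seen <- newIORef []
  receiveLoop (fakeRecv source) (\b -> modifyIORef seen (BC.unpack b :))
  reverse <$> readIORef seen

main :: IO ()
main = collect ["11", "0", "101"] >>= print
```

This prints ["11","0","101"] and then returns, because the fourth read is empty.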
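Normally a parsing library would be involved at this point, but our protocol for this example is extremely minimal: each byte of input represents a single event report. Because every report is exactly one byte, it does not matter how the stream socket groups bytes into chunks; a single recv may deliver several reports, but never half of one.
writeTQueue adds an event report to the queue, which will then be picked up by the analyzeReports thread.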
receiveReports'' receivedBytes reportQueue =
    for_ @[] (Data.ByteString.Char8.unpack receivedBytes) $ \c ->
        for_ @Maybe (decodeReport c) $ \r ->
            atomically (writeTQueue reportQueue r)
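Because the socket is a byte stream, one recv may deliver several reports at once. This pure sketch shows what receiveReports'' extracts from such a chunk, including a stray byte that is not a valid report. decodeChunk is a hypothetical name, and decodeReport is written here with pattern matching rather than find; the two definitions agree on every character.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Maybe (mapMaybe)

data EventReport = Success | Failure
  deriving (Eq, Show)

decodeReport :: Char -> Maybe EventReport
decodeReport '0' = Just Failure
decodeReport '1' = Just Success
decodeReport _ = Nothing

-- Pure version of receiveReports'': every byte is one report, and bytes
-- that are not '0' or '1' are dropped, just as the nested for_ skips them.
decodeChunk :: BC.ByteString -> [EventReport]
decodeChunk = mapMaybe decodeReport . BC.unpack

main :: IO ()
main = print (decodeChunk (BC.pack "10\n1"))
```

This prints [Success,Failure,Success]; the newline byte is silently ignored.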
Analysis – The analyzeReports thread keeps a sequence of the ten most recent reports.
- If 80% or more are successes, we declare the system is in good working order.
- If 50% or fewer are successes, we will determine that the system is failing.
- If the success rate is between 50 and 80 percent, or if we do not yet have ten reports, we refrain from making a determination.
The (%) operator constructs a fractional number represented as a ratio, which is not subject to the sort of rounding error that affects types like Float.
reportWindowSize = 10
okayThreshold = 80 % 100
alarmThreshold = 50 % 100

analysis reports
    | Seq.length reports < reportWindowSize = Nothing
    | successRate <= alarmThreshold = Just Alarm
    | successRate >= okayThreshold = Just Okay
    | otherwise = Nothing
  where
    successes = Seq.filter (== Success) reports
    successRate = Seq.length successes % Seq.length reports
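A few worked comparisons make the threshold boundaries concrete. This sketch uses Rational (a ratio of Integers) for the thresholds, whereas in the program the ratios are built from Seq.length, which returns Int; the comparisons behave the same way.

```haskell
import Data.Ratio ((%))

okayThreshold, alarmThreshold :: Rational
okayThreshold = 80 % 100
alarmThreshold = 50 % 100

main :: IO ()
main = do
  -- Exactly 5 successes out of 10 counts as an alarm (<=).
  print (5 % 10 <= alarmThreshold)
  -- Exactly 8 out of 10 counts as okay (>=).
  print (8 % 10 >= okayThreshold)
  -- 7 out of 10 falls between the thresholds: no determination.
  print (7 % 10 > alarmThreshold && 7 % 10 < okayThreshold)
  -- Ratios are exact; the same sum in Double is not.
  print (1 % 10 + 2 % 10 == (3 % 10 :: Rational))
  print (0.1 + 0.2 == (0.3 :: Double))
```

The first four lines print True and the last prints False, which is the rounding problem that ratios avoid.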
This thread runs indefinitely in an infinite loop of the recursively-applied continue function, whose two parameters compose the changing state of the thread:
- status, initially Nothing, is the most recent determination of the system status.
- reports, initially an empty sequence, holds the last ten event reports received.
analyzeReports reportQueue alarmQueue = continue Nothing Seq.empty
  where
    continue status reports =
      do
readTQueue removes a report from the queue (or waits patiently if the queue is empty).
        newReport <- atomically (readTQueue reportQueue)
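We update the collection of recent reports by adding the new report to the front of the sequence with Seq.<| and then truncating the collection down to ten items with Seq.take, naming the result reports'. Since the newest report is at the front, take keeps the ten most recent ones.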
        let reports' = Seq.take reportWindowSize
                                (newReport Seq.<| reports)
The asum function chooses the first Just among the list of Maybes. The new system status, status', is either:
- the result of analyzing the new report data; or
- the previous system status, unchanged.
        let status' = asum [analysis reports', status]
If there is a system status determination and it is different from the previous status, then we write to the alarm queue to indicate that the system has undergone a noteworthy change.
        for_ @Maybe status' $ \s ->
            when (status /= status') $
                atomically (writeTQueue alarmQueue s)
        continue status' reports'
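The loop’s logic can be checked without threads or queues by replaying the demo data through a pure version of one iteration. In this sketch, step and replay are hypothetical names, the thresholds are inlined, and replay reports the position (counting from 1) of each report that causes a status change.

```haskell
import qualified Data.Sequence as Seq
import Data.Sequence (Seq)
import Data.Ratio ((%))
import Data.Foldable (asum)
import Data.Maybe (mapMaybe)

data EventReport = Success | Failure deriving (Eq, Show)
data SystemStatus = Okay | Alarm deriving (Eq, Show)

decodeReport :: Char -> Maybe EventReport
decodeReport '0' = Just Failure
decodeReport '1' = Just Success
decodeReport _ = Nothing

reportWindowSize :: Int
reportWindowSize = 10

-- Same rules as analysis above, with the thresholds inlined.
analysis :: Seq EventReport -> Maybe SystemStatus
analysis reports
  | Seq.length reports < reportWindowSize = Nothing
  | successRate <= 50 % 100 = Just Alarm
  | successRate >= 80 % 100 = Just Okay
  | otherwise = Nothing
  where
    successes = Seq.length (Seq.filter (== Success) reports)
    successRate = successes % Seq.length reports

-- One pass through analyzeReports' loop, minus the queues: the new state,
-- plus the notification (if any) that would go to alarmQueue.
step :: (Maybe SystemStatus, Seq EventReport) -> EventReport
     -> ((Maybe SystemStatus, Seq EventReport), Maybe SystemStatus)
step (status, reports) newReport = ((status', reports'), notification)
  where
    reports' = Seq.take reportWindowSize (newReport Seq.<| reports)
    status' = asum [analysis reports', status]
    notification = if status' /= status then status' else Nothing

-- Feeds reports through step, recording (position, status) per notification.
replay :: [EventReport] -> [(Int, SystemStatus)]
replay = go 1 (Nothing, Seq.empty)
  where
    go _ _ [] = []
    go n state (r : rs) =
      let (state', note) = step state r
          rest = go (n + 1) state' rs
      in maybe rest (\s -> (n, s) : rest) note

demoReports :: [EventReport]
demoReports = mapMaybe decodeReport
  "1111111111111010011000001000000100011101111110111111"

main :: IO ()
main = print (replay demoReports)
```

This prints [(10,Okay),(21,Alarm),(44,Okay)], which lines up with the sample run of the real programs: the first notification follows the tenth report, the alarm follows the 21st, and the recovery follows the 44th.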
Sending alerts – This part of a real system might trigger an email or a text message to whoever is on call. For demonstration purposes, we will merely print a line of text instead.
This is another thread that runs indefinitely using the aptly-named forever function.
sendAlarms alarmQueue =
    forever $
      do
        a <- atomically (readTQueue alarmQueue)
        case a of
            Alarm -> putStrLn "Alarm! System is in a degraded state."
            Okay -> putStrLn "System status is normal."
That is all of the code for the monitoring server! Next we will write a process to connect to the server and send report data to it.
send-demo-reports – This is another multi-thread program, following the same pattern we used in aggregateReportsMain: first we create a queue for the threads to communicate, then we start the threads.
sendDemoReportsMain =
  do
    reportQueue <- atomically newTQueue
    foldr1 race_
        [ generateReports reportQueue
        , sendReports reportQueue
        ]
We’ll define a fixed schedule of event reports for demonstration purposes. It tells the story of a system that is mostly successful for a while, then has an elevated failure rate, then returns to normal.
demoReports = mapMaybe decodeReport
    "1111111111111010011000001000000100011101111110111111"
  -- successes --     -- failures --      -- successes --
The generateReports thread iterates over the list of demo reports, writing each one to the queue. We introduce a tenth of a second (100,000 microsecond) delay after each event to add a little dramatic tension to the demonstration.
generateReports reportQueue =
    for_ demoReports $ \r ->
      do
        atomically (writeTQueue reportQueue r)
        threadDelay 100_000
The first step of creating the client socket is identical to the way we created the server socket above, using the socket function. Again we use bracket to fastidiously ensure that we always close what we’ve opened. But this time, instead of using bind and listen, we connect to the server.
withClientSocket action =
    bracket openSocket S.close $ \clientSocket ->
      do
        S.connect clientSocket serverAddress
        action clientSocket
sendReports reportQueue =
    withClientSocket $ \clientSocket ->
Again we have an infinite loop that reads from a queue.
        forever $
          do
            r <- atomically (readTQueue reportQueue)
For demonstration purposes, we’ll print a message for each event.
            putStrLn (case r of Success -> "1 (success)"
                                Failure -> "0 (failure)")
Each time we get a report from the queue, we convert it to a ByteString and send it through the socket with sendAll.
            sendAll clientSocket (Data.ByteString.Char8.pack [encodeReport r])
First start the report aggregator:
$ runhaskell monitoring.hs aggregate-reports
The monitoring server has started.
Then, in a different terminal, run the program that sends reports to the aggregator.
$ runhaskell monitoring.hs send-demo-reports
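The sending process prints each report as it sends it, while the monitoring server, back in the first terminal, prints a message whenever the system status changes. The output of both terminals is shown interleaved below.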
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
Once the monitoring server has received the first ten reports, it begins its assessment. In our demonstration, they are all successes so far, so the server decides that we are starting off in a happy condition.
1 (success)
1 (success)
1 (success)
System status is normal.
1 (success)
1 (success)
1 (success)
The first failure does not immediately trigger an alarm.
0 (failure)
1 (success)
0 (failure)
0 (failure)
1 (success)
1 (success)
0 (failure)
When five out of the last ten reports are failures, the monitoring server determines that there is a problem.
0 (failure)
Alarm! System is in a degraded state.
0 (failure)
0 (failure)
0 (failure)
1 (success)
0 (failure)
0 (failure)
0 (failure)
0 (failure)
0 (failure)
For a while, the system is still in an Alarm state. During this period, the monitoring server does not send out any further alarms; it only sends notifications for changes to the system status.
0 (failure)
1 (success)
0 (failure)
0 (failure)
0 (failure)
1 (success)
1 (success)
1 (success)
0 (failure)
1 (success)
1 (success)
1 (success)
When eight out of the last ten reports are successes, we infer that the incident is over and things are back to normal.
1 (success)
1 (success)
System status is normal.
1 (success)
0 (failure)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)