A monitoring server

Here we give a monitoring server that can aggregate events from multiple processes. This example demonstrates simple inter-process communication using sockets, multi-threaded applications, a daemon that responds to an interrupt signal, and calculating the success-to-failure ratio over a sliding window of event data to assess the health of a service.

This program uses two language extensions: numeric underscores and type applications.

{-# LANGUAGE NumericUnderscores, TypeApplications #-}

We will use sockets for inter-process communication. This comes from a library called network, although in this case we will only be using sockets to communicate between processes on the same machine.

import qualified Network.Socket as S
import Network.Socket.ByteString (recv, sendAll)

Whenever we open a socket, we always want to close it. This means we need to catch any I/O exception that may arise while using the socket. This import comes from the safe-exceptions library.

import Control.Exception.Safe

We send and receive byte strings over a socket.

import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8

Modules with “system” in the name generally deal with topics related to a process’s interaction with the operating system, such as how a process reads its environment variables, terminates itself, receives interrupts, or forks other processes. Here we import from the base and signal libraries.

import System.Environment
import System.Exit
import System.IO
import System.Signal

We use async to start threads and stm to create thread-safe mutable variables.

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM

We use a Seq to hold the rolling window of events, and rational numbers for the arithmetic regarding what percentage of failures constitutes an emergency.

import qualified Data.Sequence as Seq
import Data.Ratio ((%))

import Control.Monad (forever, when)
import Data.Foldable (asum, for_, find)
import Data.Maybe (mapMaybe)

The scenario – Imagine we have many services that are performing some task repeatedly. This task, whatever it is, may fail sometimes; the success rate isn’t expected to be 100%. But if it gets really bad – say, if over half of the attempts are failing – then we’ll assume that indicates a serious problem that requires attention.

An EventReport represents the result of a single action. Each time a service finishes attempting the task, it will send an event report to a central monitoring server that aggregates all the reports.

data EventReport = Success | Failure
    deriving Eq

From the reports it receives, the monitoring server continually determines the current SystemStatus: an overview of whether, in general, actions tend to be succeeding or failing.

data SystemStatus = Okay | Alarm
    deriving Eq

Two programs in one – Because this example is meant to demonstrate inter-process communication, it is actually two programs; each process will play a different role that depends on what command-line argument it is given.

  1. The aggregate-reports command runs a monitoring server.
  2. The send-demo-reports command sends some fixed data to the monitoring server so we can observe how it reacts.

die is a convenient function which prints an error message and terminates the program. We use it here to handle the situation where the command-line arguments are not one of these three cases.

main =
  do
    hSetBuffering stdout LineBuffering

    args <- getArgs

    case args of
        ["aggregate-reports"]  -> aggregateReportsMain     -- 1
        ["send-demo-reports"]  -> sendDemoReportsMain      -- 2
        _                      -> die "Invalid args"

aggregate-reports – The monitoring server does three things:

  1. listen for event reports;
  2. inspect those reports to determine the system state;
  3. send notifications of system state changes

We use queues to pass messages between those three threads.

  • The receiveReports thread writes to reportQueue, and the analyzeReports thread reads from it.
  • The analyzeReports thread writes to alarmQueue, and the sendAlarms thread reads from it.

foldr1 race_ runs a list of actions concurrently. When any of these four threads stops – either by throwing an exception or by reaching its natural conclusion – then the other three will be stopped and the entire program will end.

aggregateReportsMain =
    withServerSocket $ \serverSocket ->
      do
        putStrLn "The monitoring server has started."

        reportQueue <- atomically newTQueue
        alarmQueue <- atomically newTQueue

        foldr1 race_
          [ receiveReports serverSocket reportQueue        -- 1
          , analyzeReports reportQueue alarmQueue          -- 2
          , sendAlarms alarmQueue                          -- 3
          , waitForTerminationSignal
          ]

        putStrLn "The monitoring server is stopping."

Termination – The three threads discussed above all run indefinitely. But all programs need some way to be stopped. In most terminal emulators, ctrl+C sends a termination signal to the foreground process. waitForTerminationSignal is an I/O action that blocks until the process receives a termination signal. Since we included this in the race above, the ending of this thread will result in aborting the other three threads, and then the monitoring server will come to a halt as requested.

waitForTerminationSignal =
  do
    terminate <- atomically (newTVar False)
    installHandler sigTERM $ \_signal ->
        atomically (writeTVar terminate True)
    atomically (readTVar terminate >>= check)

Message format – Clients will send event reports to the server. But how? We need to invent some system for encoding revent reports as byte strings. We’ll keep it very simple: failure events will be represented as the ASCII chacter 0, and success events will be the character 1.

encodeReport r =
    case r of
        Failure -> '0'
        Success -> '1'

Here we implement the reverse of encodeReport using find. See also inverting functions for an alternative approach.

decodeReport c =
    find (\r -> encodeReport r == c) [Failure, Success]

Receiving event reports – Sockets are for networking, but here we’ll be using sockets just to communicate between processes on the same machine. For the socket address, we’ll use an abstract Unix domain socket.

serverAddress = S.SockAddrUnix "\0haskell-phrasebook/monitoring"

openSocket = S.socket S.AF_UNIX S.Stream S.defaultProtocol

Whenever we open a socket, we should close it. Use bracket to ensure that this always happens.

Before we can start accepting connections from processes that will send us event reports, first we have to:

  1. Bind the socket to the address
  2. Tell the operating system that we are interested in listening for incoming connections on this socket.
withServerSocket action =
    bracket openSocket S.close $ \serverSocket ->
      do
        S.bind serverSocket serverAddress        -- 1
        S.listen serverSocket S.maxListenQueue   -- 2
        action serverSocket

Once we have our server socket set up, we use forever to enter an infinite loop.

accept awaits a new incoming connection.

This combination of mask and forkFinally gives us an effect similar to that of bracket; the difference is that the action that we take upon receiving a client connection takes place in a new thread.

receiveReports serverSocket reportQueue =
    forever $
      mask $ \unmask ->
        do
          (clientSocket, _clientAddr) <- S.accept serverSocket

          forkFinally
              (unmask (receiveReports' clientSocket reportQueue))
              (\_ -> S.close clientSocket)

receiveReports' is a loop that continually receives report data from the client socket and writes it into the report queue.

receiveReports' clientSocket reportQueue = continue
  where
    continue =
      do
        receivedBytes <- recv clientSocket 1024

        case BS.length receivedBytes of

When recv returns an empty byte string, that indicates that the sender has closed the socket, and this loop comes to an end.

            0 -> return ()
            _ ->
              do
                receiveReports'' receivedBytes reportQueue
                continue

Normally a parsing library would be involved at this point, but our protocol for this example is extremely minimal: each byte of input represents a single event report.

writeTQueue adds an event report to the queue, which will then be picked up by the analyzeReports thread.

receiveReports'' receivedBytes reportQueue =
    for_ @[] (Data.ByteString.Char8.unpack receivedBytes) $ \c ->
        for_ @Maybe (decodeReport c) $ \r ->
            atomically (writeTQueue reportQueue r)

Analysis – The analyzeReports thread will keep a list of the most recent ten reports.

  • If 80% or more are successes, we declare the system is in good working order.
  • If 50% or fewer are successes, we will determine that the system is failing.
  • If the success rate is between 50 and 80 percent, or if we do not yet have ten reports, we refrain from making a determination.

The (%) operator constructs a fractional number represented as a ratio, which is not subject to the sort of rounding error that affects types like Float.

reportWindowSize = 10
okayThreshold = 80 % 100
alarmThreshold = 50 % 100

analysis reports
    | Seq.length reports < reportWindowSize = Nothing
    | successRate <= alarmThreshold         = Just Alarm
    | successRate >= okayThreshold          = Just Okay
    | otherwise                             = Nothing
  where
    successes = Seq.filter (== Success) reports
    successRate = Seq.length successes % Seq.length reports

This thread runs indefinitely in an infinite loop of the recursively-applied continue function, whose two parameters compose the changing state of the thread:

  • status, initially Nothing, is the most recent determination of the system status.
  • reports, initially an empty sequence, holds the last ten event reports received.
analyzeReports reportQueue alarmQueue = continue Nothing Seq.empty
  where
    continue status reports =
      do

readTQueue removes a report from the queue (or waits patiently if the queue is empty).

        newReport <- atomically (readTQueue reportQueue)

We update the collection of recent reports by adding the new report to the list and then truncating the collection down to ten items, naming the result reports'.

        let reports' = Seq.take reportWindowSize
                        (newReport Seq.<| reports)

The asum function chooses the first Just among the list of Maybes. The new system status, status', is either:

  • the result of analyzing the new report data; or
  • the previous system status, unchanged.
        let status' = asum [analysis reports', status]

If there is a system status determination and it is different from the previous status, then we write to the alarm queue to indicate that the system has undergone a noteworthy change.

        for_ @Maybe status' $ \s ->
            when (status /= status') $
                atomically (writeTQueue alarmQueue s)

        continue status' reports'

Sending alerts – This part of a real system might trigger an email or a text message to whoever is on call. For demonstration purposes, we will merely print a line of text instead.

This is another thread that runs indefinitely using the aptly-named forever function.

That is all of the code for the monitoring server! Next we will write a process to connect to the server and send report data to it.

sendAlarms alarmQueue =
  forever $
    do
      a <- atomically (readTQueue alarmQueue)
      case a of
          Alarm -> putStrLn "Alarm! System is in a degraded state."
          Okay -> putStrLn "System status is normal."

send-demo-reports – This is another multi-thread program, following the same pattern we used in aggregateReportsMain: first we create a queue for the threads to communicate, then we start the threads.

sendDemoReportsMain =
  do
    reportQueue <- atomically newTQueue

    foldr1 race_
      [ generateReports reportQueue
      , sendReports reportQueue
      ]

We’ll define a fixed schedule of event reports for demonstration purposes. It tells the story of a system that is mostly successful for a while, then has an elevated failure rate, then returns to normal.

demoReports = mapMaybe decodeReport
    "1111111111111010011000001000000100011101111110111111"
    -- successes --     -- failures --    -- successes --

The generateReport thread iterates over the list of demo reports, writing each one to the queue. We introduce a tenth of a second (100,000 microsecond) delay between each event to add a little dramatic tension to the demonstration.

generateReports reportQueue =
    for_ demoReports $ \r ->
      do
        atomically (writeTQueue reportQueue r)
        threadDelay 100_000

The first step of creating the client socket is identical to the way we created the server socket above, using the socket function. Again we use bracket to fastidiously ensure that we always close what we’ve opened. But this time, instead of using bind and listen, we connect to the server.

withClientSocket action =
    bracket openSocket S.close $ \clientSocket ->
      do
        S.connect clientSocket serverAddress
        action clientSocket
sendReports reportQueue =
    withClientSocket $ \clientSocket ->

Again we have an infinite loop that reads from a queue.

        forever $
          do
            r <- atomically (readTQueue reportQueue)

For demonstration purposes, we’ll print a message for each event.

            putStrLn (case r of Success -> "1 (success)"
                                Failure -> "0 (failure)")

Each time we get a report from the queue, we convert it to a ByteString and send it through the socket with sendAll.

            sendAll clientSocket
                (Data.ByteString.Char8.pack [encodeReport r])

First start the report aggregator:

$ runhaskell monitoring.hs aggregate-reports
The monitoring server has started.

Then, in a different terminal, run the program that sends reports to the aggregator.

$ runhaskell monitoring.hs send-demo-reports

Back in the first terminal, we see the reports begin to come in.

1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)

Once the monitoring server has received the first ten reports, it begins its assessment. In our demonstration, they are all successes so far, so the server decides that we are starting off in a happy condition.

1 (success)
1 (success)
1 (success)
System status is normal.
1 (success)
1 (success)
1 (success)

The first failure does not immediately trigger an alarm.

0 (failure)
1 (success)
0 (failure)
0 (failure)
1 (success)
1 (success)
0 (failure)

When five out of the last ten reports are failures, the monitoring server determines that there is a problem.

0 (failure)
Alarm! System is in a degraded state.
0 (failure)
0 (failure)
0 (failure)
1 (success)
0 (failure)
0 (failure)
0 (failure)
0 (failure)
0 (failure)

For a while, the system is still in an Alarm state. During this period, the monitoring server does not send out any further alarms; it only sends notifications for changes to the system status.

0 (failure)
1 (success)
0 (failure)
0 (failure)
0 (failure)
1 (success)
1 (success)
1 (success)
0 (failure)
1 (success)
1 (success)
1 (success)

When eight out of the last ten reports are successes, we infer that the incident is over and things are back to normal.

1 (success)
1 (success)
System status is normal.
1 (success)
0 (failure)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)
1 (success)