Here we give a monitoring server that can aggregate events from multiple processes. This example demonstrates simple inter-process communication using sockets, multi-threaded applications, a daemon that responds to an interrupt signal, spawning new processes, and calculating the success-to-failure ratio over a sliding window of event data to assess the health of a service.
We will use sockets for inter-process communication. This comes from a library called network, although in this case we will only be using sockets to communicate between processes on the same machine.
Whenever we open a socket, we always want to close it. This means we need to catch any I/O exception that may arise while using the socket. This import comes from the safe-exceptions library.
We send and receive byte strings over a socket.
Modules with “system” in the name generally deal with topics related to a process’s interaction with the operating system, such as how a process reads its environment variables, terminates itself, receives interrupts, or forks other processes. Here we import from the base, signal, and process libraries.
The scenario – Imagine we have many services that are performing some task repeatedly. This task, whatever it is, may fail sometimes; the success rate isn’t expected to be 100%. But if it gets really bad – say, if over half of the attempts are failing – then we’ll assume that indicates a serious problem that requires attention.
EventReport represents the result of a single action. Each time a service finishes attempting the task, it will send an event report to a central monitoring server that aggregates all the reports.
From the reports it receives, the monitoring server continually determines the current SystemStatus: an overview of whether, in general, actions tend to be succeeding or failing.
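A minimal sketch of these two types, using the constructor names (Success, Failure, Okay, Alarm) that appear later in this example; the deriving clauses are our addition for convenience:

```haskell
-- The result of one attempted action.
data EventReport = Success | Failure
    deriving (Eq, Show)

-- The server's overall judgment of system health.
data SystemStatus = Okay | Alarm
    deriving (Eq, Show)
```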
Three programs in one – Because this example is meant to demonstrate inter-process communication, it is actually three programs; each process will play a different role that depends on what command-line argument it is given.
- The aggregate-reports command runs a monitoring server.
- The send-demo-reports command sends some fixed data to the monitoring server so we can observe how it reacts.
- The full-demonstration command launches each of the other two.
die is a convenient function which prints an error message and terminates the program. We use it here to handle the situation where the command-line arguments are not one of these three cases.
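The dispatch might look something like this; it is a sketch, not the author's exact code, and the three `...Main` actions are placeholder stand-ins for the real entry points described below:

```haskell
import System.Environment (getArgs)
import System.Exit (die)

-- The three commands this program understands.
commands :: [String]
commands = ["aggregate-reports", "send-demo-reports", "full-demonstration"]

-- Placeholder entry points; the real definitions follow in the text.
aggregateReportsMain, sendDemoReportsMain, fullDemonstrationMain :: IO ()
aggregateReportsMain  = putStrLn "aggregate-reports"
sendDemoReportsMain   = putStrLn "send-demo-reports"
fullDemonstrationMain = putStrLn "full-demonstration"

main :: IO ()
main = do
    args <- getArgs
    case args of
        ["aggregate-reports"]  -> aggregateReportsMain
        ["send-demo-reports"]  -> sendDemoReportsMain
        ["full-demonstration"] -> fullDemonstrationMain
        _ -> die ("usage: one of " ++ show commands)  -- print error and exit
```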
aggregate-reports – The monitoring server does three things:
- listen for event reports;
- inspect those reports to determine the system state;
- send notifications of system state changes.
We use queues to pass messages between those three threads.
The receiveReports thread writes to reportQueue, and the analyzeReports thread reads from it. The analyzeReports thread writes to alarmQueue, and the sendAlarms thread reads from it.
foldr1 race_ runs a list of actions concurrently. When any of these four threads stops – either by throwing an exception or by reaching its natural conclusion – then the other three will be stopped and the entire program will end.
Termination – The three threads discussed above all run indefinitely. But all programs need some way to be stopped. In most terminal emulators, ctrl+C sends a termination signal to the foreground process.
waitForTerminationSignal is an I/O action that blocks until the process receives a termination signal. Since we included this in the race above, the ending of this thread will result in aborting the other three threads, and then the monitoring server will come to a halt as requested.
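Putting these pieces together, the server's main action might be sketched as follows. The three thread bodies here are stubs so the skeleton stands alone; the waitForTerminationSignal wiring uses installHandler from System.Posix.Signals and assumes a POSIX system:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race_)
import Control.Concurrent.STM
import Control.Monad (forever)
import System.Posix.Signals (Handler (Catch), installHandler, sigTERM)

-- Stub thread bodies; the real definitions are discussed in the text.
receiveReports :: TQueue Char -> IO ()
receiveReports _ = forever (threadDelay 1000000)

analyzeReports :: TQueue Char -> TQueue Char -> IO ()
analyzeReports _ _ = forever (threadDelay 1000000)

sendAlarms :: TQueue Char -> IO ()
sendAlarms _ = forever (threadDelay 1000000)

-- Block until the process receives a termination signal.
waitForTerminationSignal :: IO ()
waitForTerminationSignal = do
    terminate <- newTVarIO False
    _ <- installHandler sigTERM
            (Catch (atomically (writeTVar terminate True))) Nothing
    atomically (readTVar terminate >>= check)  -- retry until the TVar is True

aggregateReportsMain :: IO ()
aggregateReportsMain = do
    reportQueue <- newTQueueIO
    alarmQueue  <- newTQueueIO
    -- When any one of the four actions stops, the rest are cancelled.
    foldr1 race_
        [ receiveReports reportQueue
        , analyzeReports reportQueue alarmQueue
        , sendAlarms alarmQueue
        , waitForTerminationSignal
        ]
```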
Message format – Clients will send event reports to the server. But how? We need to invent some scheme for encoding event reports as byte strings. We’ll keep it very simple: failure events will be represented as the ASCII character 0, and success events will be the character 1.
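In code, that encoding is one byte per report; a small pure sketch with hypothetical helper names (encodeReport, decodeReport):

```haskell
import Data.Word (Word8)

data EventReport = Success | Failure
    deriving (Eq, Show)

-- One byte per report: '0' (ASCII 48) for failure, '1' (ASCII 49) for success.
encodeReport :: EventReport -> Word8
encodeReport Failure = 48
encodeReport Success = 49

-- Any byte other than '0' or '1' is not a valid report.
decodeReport :: Word8 -> Maybe EventReport
decodeReport 48 = Just Failure
decodeReport 49 = Just Success
decodeReport _  = Nothing
```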
Receiving event reports – Sockets are for networking, but here we’ll be using sockets just to communicate between processes on the same machine. For the socket address, we’ll use an abstract Unix domain socket.
Before we can start accepting connections from processes that will send us event reports, first we have to create the socket, bind it to the socket address, and begin listening for connections.
Once we have our server socket set up, we use forever to enter an infinite loop.
accept awaits a new incoming connection.
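The server-side setup can be sketched like this; the abstract socket name here is hypothetical, and bracket ensures that each socket we open is eventually closed:

```haskell
import Control.Exception.Safe (bracket)
import Control.Monad (forever)
import Network.Socket

-- An abstract Unix domain socket: the leading NUL byte keeps it out of
-- the filesystem. The name itself is our invention for this sketch.
serverAddress :: SockAddr
serverAddress = SockAddrUnix "\0demo-monitoring"

serverLoop :: (Socket -> IO ()) -> IO ()
serverLoop handleClient =
    bracket (socket AF_UNIX Stream defaultProtocol) close $ \s -> do
        bind s serverAddress     -- claim the address
        listen s maxListenQueue  -- start accepting connections
        forever $ do
            (client, _) <- accept s  -- wait for the next sender
            bracket (pure client) close handleClient
```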
receiveReports' is a loop that continually receives report data from the client socket and writes it into the report queue.
When recv returns an empty byte string, that indicates that the sender has closed the socket, and this loop comes to an end.
Normally a parsing library would be involved at this point, but our protocol for this example is extremely minimal: each byte of input represents a single event report.
writeTQueue adds an event report to the queue, which will then be picked up by the analyzeReports thread.
Analysis – The analyzeReports thread will keep a list of the most recent ten reports.
- If 80% or more are successes, we declare the system is in good working order.
- If 50% or fewer are successes, we will determine that the system is failing.
- If the success rate is between 50 and 80 percent, or if we do not yet have ten reports, we refrain from making a determination.
```haskell
reportWindowSize = 10

okayThreshold = 80 % 100

alarmThreshold = 50 % 100

analysis reports
    | Seq.length reports < reportWindowSize = Nothing
    | successRate <= alarmThreshold = Just Alarm
    | successRate >= okayThreshold = Just Okay
    | otherwise = Nothing
  where
    successes = Seq.filter (== Success) reports
    successRate = Seq.length successes % Seq.length reports
```
This thread runs indefinitely in an infinite loop of the recursively applied continue function, whose two parameters compose the changing state of the thread:
- status, initially Nothing, is the most recent determination of the system status.
- reports, initially an empty sequence, holds the last ten event reports received.
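Keeping only the last ten reports can be done with Data.Sequence; here is a small pure sketch (the helper name addToWindow is ours):

```haskell
import Data.Sequence (Seq, (<|))
import qualified Data.Sequence as Seq

reportWindowSize :: Int
reportWindowSize = 10

-- Prepend the newest report, then truncate to the window size.
addToWindow :: a -> Seq a -> Seq a
addToWindow newReport reports =
    Seq.take reportWindowSize (newReport <| reports)
```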
readTQueue removes a report from the queue (or waits patiently if the queue is empty).
The asum function chooses the first Just among the list of Maybes. The new system status, status', is either:
- the result of analyzing the new report data; or
- the previous system status, unchanged.
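For Maybe values, asum simply returns the first Just it finds:

```haskell
import Data.Foldable (asum)

-- The first Just in the list wins; Nothings are skipped.
firstJust :: Maybe Int
firstJust = asum [Nothing, Just 1, Just 2]  -- Just 1
```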
If there is a system status determination and it is different from the previous status, then we write to the alarm queue to indicate that the system has undergone a noteworthy change.
Sending alerts – This part of a real system might trigger an email or a text message to whoever is on call. For demonstration purposes, we will merely print a line of text instead.
This is another thread that runs indefinitely using the aptly-named forever function.
That is all of the code for the monitoring server! Next we will write a process to connect to the server and send report data to it.
send-demo-reports – This is another multi-thread program, following the same pattern we used in aggregateReportsMain: first we create a queue for the threads to communicate, then we start the threads.
We’ll define a fixed schedule of event reports for demonstration purposes. It tells the story of a system that is mostly successful for a while, then has an elevated failure rate, then returns to normal.
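One possible schedule telling that story; the exact data here is our invention, not the original's:

```haskell
data EventReport = Success | Failure
    deriving (Eq, Show)

-- A made-up schedule: healthy, then an elevated failure rate, then recovery.
demoReports :: [EventReport]
demoReports =
       replicate 12 Success                        -- all is well
    ++ take 12 (cycle [Failure, Failure, Success]) -- mostly failing
    ++ replicate 12 Success                        -- back to normal
```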
The generateReport thread iterates over the list of demo reports, writing each one to the queue. We introduce a tenth-of-a-second (100,000 microseconds) delay between events to add a little dramatic tension to the demonstration.
The first step of creating the client socket is identical to the way we created the server socket above, using the socket function. Again we use bracket to fastidiously ensure that we always close what we’ve opened. But this time, instead of using listen, we connect to the server.
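The client-side setup, sketched; the abstract socket name is hypothetical and must match whatever name the server bound:

```haskell
import Control.Exception.Safe (bracket)
import qualified Data.ByteString as BS
import Network.Socket
import Network.Socket.ByteString (sendAll)

-- Open a connection to the server, run an action, and always close.
withServerConnection :: (Socket -> IO a) -> IO a
withServerConnection action =
    bracket (socket AF_UNIX Stream defaultProtocol) close $ \s -> do
        connect s (SockAddrUnix "\0demo-monitoring")  -- name assumed
        action s

-- Example: send a single success byte ('1' is ASCII 49).
sendOneSuccess :: IO ()
sendOneSuccess = withServerConnection (\s -> sendAll s (BS.pack [49]))
```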
Again we have an infinite loop that reads from a queue.
Each time we get a report from the queue, we convert it to a ByteString and send it through the socket with sendAll.
For demonstration purposes, we’ll also print a message for each event.
Full demonstration – To tie everything together and show these two programs running concurrently, this is a third program whose job is just to run the other two. We could have written this as a shell script, but doing it in Haskell is just as convenient.
spawnCommand runs a shell command, starting a new background process; the full-demonstration process continues on without waiting for the newly-spawned aggregate-reports process to complete.
We momentarily pause the demo here so that the aggregate-reports process has time to set up the server socket that the next process will connect to. This doesn’t take long; one second (one million microseconds) will be plenty long enough.
callCommand starts a new foreground process. The full-demonstration process will stop here and wait for the send-demo-reports process to complete.
Once the send-demo-reports process is complete, the demonstration is over. We finish by stopping the monitoring server and waiting for it to terminate. terminateProcess sends the process a termination signal, resulting in the execution of the waitForTerminationSignal action that we wrote above.
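The sequence just described might be sketched like this; the shell command strings are placeholders for however the two programs are actually invoked:

```haskell
import Control.Concurrent (threadDelay)
import System.Process
    (callCommand, spawnCommand, terminateProcess, waitForProcess)

fullDemonstrationMain :: IO ()
fullDemonstrationMain = do
    server <- spawnCommand "demo aggregate-reports"  -- background process
    threadDelay 1000000  -- one second for the server socket to appear
    callCommand "demo send-demo-reports"  -- foreground; blocks until done
    terminateProcess server  -- send the termination signal
    _ <- waitForProcess server  -- wait for the server to shut down
    pure ()
```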
Once the monitoring server has received the first ten reports, it begins its assessment. In our demonstration, they are all successes so far, so the server decides that we are starting off in a happy condition.
The first failure does not immediately trigger an alarm.
When five out of the last ten reports are failures, the monitoring server determines that there is a problem.
For a while, the system is still in an Alarm state. During this period, the monitoring server does not send out any further alarms; it only sends notifications for changes to the system status.
When eight out of the last ten reports are successes, we infer that the incident is over and things are back to normal.
- The send-demo-reports process completes first.
- Then the full-demonstration process terminates the aggregate-reports process.
- Finally, the full-demonstration process completes.