Log processing at scale - ELK cluster at 25,000 events per second.
At Viki, we run a number of microservices that process thousands of requests per second across various geographical regions. A microservice architecture helps us break down the complexity of building a large distributed system, but it also introduces complexity when debugging an issue.
Our first point of exposure to any issue is an alert raised from one of the monitoring services we use (currently we are using SignalFx). The alert gives us the information about the nature of the issue (high latency, increased 5xx errors etc.) and the service components which are under distress.
In most cases, the alert gives us enough of an idea about which part of the system needs attention. But having to SSH into tens of servers to look at service logs would be a nightmare. One of our most helpful debugging tools is our ELK stack - Elasticsearch, Logstash and Kibana.
As you can imagine, logging thousands of requests per second at every level of our stack means a multifold increase in the number of events we log. We learnt a few things while building a setup to handle this scale.
- Elasticsearch is a well known search server that is built on top of Apache Lucene - a full text indexing and searching engine. It provides a RESTful web interface, along with the ability to handle a large number of indexing and search requests.
- Logstash is a log aggregator and it can perform filter actions (mutate, add field, remove field etc.) on log events. It is a 3 part pipeline - Input > Filter > Output. We will discuss this pipeline in detail in the later part of this post.
- Kibana is a frontend to Elasticsearch which provides graphing and visualization capability on Elasticsearch data. It even supports Geolocation data visualizations.
We receive logs from the following sources:
- Access logs at both load balancer level and application level. In a distributed system it is important to monitor requests at all levels to be able to effectively debug a situation. When a request arrives at our Load Balancer it gets uniquely tagged and the same tag is present for that request across all services.
- Application logs - Applications write debug information (including crash events) to STDERR or a separate file.
Logstash is composed of an Input, Filter and Output chain. A single logstash process can have multiple instances of each piece in the chain - for example, multiple file inputs and multiple logstash-forwarder inputs.
Each input translates to one input thread. You can set the number of filter worker threads in the configuration, but output runs on a single thread unless the plugin supports multiple worker threads. When we started with logstash, the elasticsearch output plugin only had a single output thread; more recently, some output plugins have gained support for multiple threads. In our experiments, a single logstash process with a single output thread can write up to 1,000 events per second to elasticsearch. With multiple output threads, we could get close to 10x that number, depending on the horsepower of the machine.
This number will vary with the size of your log lines, the number of filter steps, and the horsepower of your CPU and network. Read the official Elasticsearch documentation on the Logstash pipeline for a better understanding.
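As a rough sketch, a pipeline with multiple output workers might look like the config below. The hosts, paths and the filter step are illustrative, and exact option names depend on your logstash version (this is 1.x-era syntax):

```conf
# Sketch of a logstash pipeline config: lumberjack in, one filter step, elasticsearch out.
input {
  lumberjack {
    port            => 5043
    ssl_certificate => "/etc/logstash/logstash.crt"   # illustrative paths
    ssl_key         => "/etc/logstash/logstash.key"
  }
}

filter {
  # Example filter action: tag every event with the emitting environment.
  mutate {
    add_field => { "environment" => "production" }
  }
}

output {
  elasticsearch {
    host    => "es.internal.example"   # illustrative host
    workers => 4                       # multiple output threads, where the plugin supports it
  }
}
```

Filter worker threads are set separately, on the logstash command line (the `-w` flag in 1.x-era versions).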
Keeping this in mind, we have a bunch of logstash processes which we use collectively as a pool to throw events to elasticsearch. This provides redundancy as well as horizontal scalability as we can simply add more machines to scale our logstash cluster.
We currently have 4 servers for logstash, each with 8 cores and 16 GB RAM. Each machine runs a pool of logstash processes, with some processes dedicated to events received from services and some dedicated to events received from clients over HTTP.
An HAProxy in front of the logstash processes load balances the incoming logs from the various applications and clients. For lumberjack input (from logstash-forwarder), HAProxy acts as a TCP proxy, with a simple TCP connect as the healthcheck. For HTTP client logs, we set a type on the log events based on the URL. For example, logger.viki.io/mobile gets a log type of mobile_logs.
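A sketch of such an HAProxy setup, with illustrative names, addresses and ports:

```conf
# TCP passthrough for lumberjack traffic; the default healthcheck is a plain TCP connect.
listen lumberjack
    bind *:5043
    mode tcp
    balance roundrobin
    server logstash-1 10.0.0.11:5043 check
    server logstash-2 10.0.0.12:5043 check

# HTTP client logs: route by URL path; the logstash listeners behind each
# backend stamp the corresponding type (e.g. mobile_logs) on the events.
frontend client_logs
    bind *:80
    mode http
    acl is_mobile path_beg /mobile
    use_backend logstash_mobile if is_mobile
    default_backend logstash_web

backend logstash_mobile
    mode http
    server logstash-1 10.0.0.11:8080 check

backend logstash_web
    mode http
    server logstash-2 10.0.0.12:8081 check
```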
Logstash-forwarder on the application side forwards logs from a file to logstash. It ships logs as they are written to the file, keeps a record of the position up to which it has read the file, and works well with logrotate. Before logstash-forwarder became popular, redis and rabbitmq were commonly used to buffer log events before sending them to logstash, to avoid losing any logs.
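For reference, a minimal logstash-forwarder config looks like this (the hostname, certificate path and file paths are illustrative):

```json
{
  "network": {
    "servers": [ "logger.internal.example:5043" ],
    "ssl ca": "/etc/pki/logstash-forwarder.crt",
    "timeout": 15
  },
  "files": [
    {
      "paths": [ "/var/log/app/access.log" ],
      "fields": { "type": "access_logs" }
    }
  ]
}
```

The `fields` block is how a `type` gets attached to every event shipped from that file.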
Docker and Logs
We run multiple docker containers per machine and volume mount the /var/log directory inside each container to /var/log/docker/<container_name> on the host. We run a logstash-forwarder docker container on each host which reads the logs from the /var/log/docker directory on the host and forwards the logs of all containers to logstash. To add more sources of logs, we update the configuration for our logstash-forwarder container in Consul, and the logstash-forwarder process reloads itself using consul-template (essentially, a watch on a key in Consul with a callback whenever the key changes).
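The volume layout can be sketched in docker-compose terms (image and service names are hypothetical):

```yaml
# Each service container mounts its /var/log under /var/log/docker/<name> on
# the host; a single logstash-forwarder container reads them all, read-only.
services:
  api:
    image: example/api                    # hypothetical image
    volumes:
      - /var/log/docker/api:/var/log
  logstash-forwarder:
    image: example/logstash-forwarder     # hypothetical image
    volumes:
      - /var/log/docker:/var/log/docker:ro
```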
TL;DR: Based on our learnings so far, there are three golden rules for maintaining an Elasticsearch cluster:
- Experiment in a staging environment with different hardware configurations, with indexing and search volumes similar to production. No one size fits all.
- If you are running out of memory, and you have to clear your field cache, then you are doing something wrong. Your cluster needs more horsepower.
- Practice cluster restarts. Faster recovery means happier devs.
For our elasticsearch cluster, we run 20 nodes with the following specs:
- 12 cores, i7-3930k
- 64 GB RAM
- 2 x 3 TB hard drives configured as RAID 0.
Shards and Replicas
Each logstash index has 20 shards (one shard per machine) and 4 copies of the data (including the primary). This distributes the query load equally across all the machines, and we can restart 3 machines at a time (or handle the failure of 3 machines at a time). We keep 20 days of data as open indexes and close the indexes for anything older. If we need to query older logs, opening 20 more days of older indexes takes around 4-6 hours, which is an acceptable SLA for those who wish to query older data. Please read the hardware requirements guide on the Elasticsearch website, as it details how to choose your hardware based on your needs. TL;DR - Experiment with different hardware specs and log volumes.
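In index-template terms, four copies of the data including the primary corresponds to `number_of_replicas: 3`. A sketch (the template name and pattern are illustrative):

```json
{
  "template": "logstash-*",
  "settings": {
    "number_of_shards": 20,
    "number_of_replicas": 3
  }
}
```

PUT to `_template/logstash`, this makes every daily logstash index pick up these settings automatically.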
On each elasticsearch machine, we allocate 30 GB of heap to the JVM process, which is ~50% of the total memory on the machine. This is the recommended amount: Apache Lucene is designed to leverage the operating system's filesystem cache, so the rest of the machine's memory is put to good use by Lucene for caching. Setting the heap much above 30 GB can also be harmful, because beyond roughly 32 GB the JVM can no longer use compressed object pointers, and the larger heap is partly wasted on pointer overhead. Even if you have 64 GB of RAM or more, ~30 GB of heap is ideal.
Also ensure that none of your elasticsearch nodes is swapping. It's best to disable swap entirely and to set bootstrap.mlockall: true in the configuration, which prevents the JVM's memory from being swapped out by the OS. Read the heap-sizing documentation for more details.
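The relevant settings, as a sketch (the environment file path varies by distro and Elasticsearch version):

```conf
# /etc/default/elasticsearch - ~50% of RAM, below the compressed-oops cutoff
ES_HEAP_SIZE=30g

# elasticsearch.yml - lock the heap in RAM so the OS cannot swap it out
bootstrap.mlockall: true
```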
Field Data and Doc Values
Field data is data that elasticsearch keeps in memory to speed up searches, sorting and aggregations over an index. Field data objects make these operations faster, but if their usage is not monitored or capped, they can lead to OutOfMemory issues, essentially killing your cluster. If you see field data filling up fast and causing memory issues, you need to rethink the memory requirements of your cluster.
Field data is particularly useful for sorting (a very common use case in log visualization). When building field data, Elasticsearch loads the values from every document in the index, not just the documents that matched a particular query. This consumes a lot of memory, so you need to set proper limits on field data, including circuit breakers for queries that might otherwise drive your Elasticsearch to OOM. You can read about how to limit memory usage on the Elasticsearch website. An alternative to field data is doc values: the on-disk equivalent of the field data store, which trades a little speed for far less heap pressure.
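Field data limits can be capped in elasticsearch.yml (e.g. `indices.fielddata.cache.size` and the `indices.breaker.fielddata.limit` circuit breaker), and doc values can be enabled per field in the mapping. A 1.x-era mapping sketch, where the type and field names are illustrative (doc values on a string field require it to be `not_analyzed`):

```json
{
  "mappings": {
    "logs": {
      "properties": {
        "status": {
          "type": "string",
          "index": "not_analyzed",
          "doc_values": true
        }
      }
    }
  }
}
```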
To monitor our elasticsearch cluster, we send metrics to SignalFx using a collectd plugin. We also find Elastic's Marvel to be a very friendly tool for monitoring indexing rates and machine health. We use the KOPF plugin for visualizing shard allocation and applying transient/persistent settings during cluster maintenance. Curator is an additional swiss-army knife that sits in our cronjobs to close indices older than 20 days and delete indices older than 90 days.
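Our curator jobs amount to something like the following crontab entries (curator 3.x-style flags; the CLI changed across major versions, so treat this as a sketch):

```conf
# Close indices older than 20 days, delete those older than 90 (times illustrative).
0 1 * * * curator close indices --older-than 20 --time-unit days --timestring '%Y.%m.%d'
0 2 * * * curator delete indices --older-than 90 --time-unit days --timestring '%Y.%m.%d'
```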