In the fast-paced domain of real-time stream processing with stringent performance demands, selecting the right tools is critical. At Ideas2IT's Machine Learning services practice, we frequently encounter such scenarios, prompting us to explore toolsets capable of meeting these rigorous requirements.
This blog post compares solutions built with Golang, Spark, and Python for a demanding stream processing task, focusing on how well each handles complex data processing. We look at their performance, scalability, ease of development, and monitoring, with the aim of showing which tool is best suited to which use case.
By evaluating these popular options, we aim to assist developers and data engineers in making informed decisions when designing high-performance stream processing systems. From infrastructure considerations to implementation intricacies, this analysis offers a comprehensive overview of the strengths and limitations of each solution, enabling readers to choose the most suitable tool for their specific needs.
Adapting to New Challenges and Use Cases
Big data processing has received a lot of attention from developers in the past few years, and there are a number of great open source projects to show for it. Most of these projects expect the problem to be expressed as a Directed Acyclic Graph (DAG), with each vertex being a processing task. This works well for many types of data processing requirements, but we increasingly see use cases where it is not enough. A common scenario is one where a user's recent history, from just a few seconds earlier, affects the prediction for the current row: each row then depends on state written by the rows before it, which does not map cleanly onto independent tasks in a DAG.
We took one such use case and created separate implementations using Python, Spark, and Golang, and compared them for performance, monitoring, ease of development, scalability, and maintainability.
Solving the Puzzle
We receive JSON access logs for many users, and the aim is to predict whether each event is an anomaly. To do this, we need to store some metadata about each user: aggregate information about all the locations the user has logged in from, their most recent cities, recent login times, and so on. Every incoming row needs this data for its processing, and every incoming row also modifies it. Once we have retrieved the user's latest aggregates and updated them with the new row, we run the actual prediction, which is fairly straightforward and has no other dependencies.
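A minimal sketch of such a per-user aggregate, assuming it holds a per-city login count plus the last few cities and login times. `LogEvent`, `UserAggregate`, the field names, and the cap on recent entries are hypothetical; the post does not give the actual schema. The point it illustrates is that every row both reads and writes the same per-user state.

```go
package main

import (
	"fmt"
	"time"
)

// LogEvent is a hypothetical parsed access-log row; the real schema is not given in the post.
type LogEvent struct {
	UserID string
	City   string
	At     time.Time
}

// UserAggregate is the per-user state that every row both reads and updates.
type UserAggregate struct {
	LocationCounts map[string]int // logins per city over the user's history
	RecentCities   []string       // newest last, capped at limit entries
	RecentLogins   []time.Time    // newest last, capped at limit entries
	limit          int
}

// NewUserAggregate returns an empty aggregate that remembers the last `limit` events.
func NewUserAggregate(limit int) *UserAggregate {
	return &UserAggregate{LocationCounts: make(map[string]int), limit: limit}
}

// Apply folds one event into the aggregate. The prediction for the next row
// from this user reads the result, so rows for one user cannot be reordered.
func (a *UserAggregate) Apply(e LogEvent) {
	a.LocationCounts[e.City]++
	a.RecentCities = appendCapped(a.RecentCities, e.City, a.limit)
	a.RecentLogins = appendCapped(a.RecentLogins, e.At, a.limit)
}

// appendCapped appends v and drops the oldest entries so that len(s) <= limit.
func appendCapped[T any](s []T, v T, limit int) []T {
	s = append(s, v)
	if len(s) > limit {
		s = s[len(s)-limit:]
	}
	return s
}

func main() {
	agg := NewUserAggregate(2)
	t0 := time.Date(2024, 1, 1, 9, 0, 0, 0, time.UTC)
	for i, city := range []string{"Chennai", "Chennai", "Mumbai"} {
		agg.Apply(LogEvent{UserID: "u1", City: city, At: t0.Add(time.Duration(i) * time.Second)})
	}
	fmt.Println(agg.LocationCounts["Chennai"], agg.RecentCities) // 2 [Chennai Mumbai]
}
```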
Design Possibilities for User Aggregate Management
- We could run user aggregate retrieval and updates in a single thread. This keeps the user aggregates completely up to date, but it also prevents the program from scaling beyond a single core.
- We could use a Redis cluster or instance to store the user aggregate map, and send each row's update to Redis separately. Because Redis supports atomic increments and GETSET, multiple processes can update the aggregates without data races. But as the rate grows to tens of thousands of messages per second, the aggregate map starts to lag behind the incoming logs by several seconds, because of network delays, all the copying involved, and the Redis cluster's inability to keep up with the volume of atomic update requests.
- The third approach is to group incoming messages into per-user 1-second batches and send those batches to individual workers. Because each batch only contains logs from a single user, a worker needs to retrieve that user's aggregate data just once for the entire batch. The worker can also accumulate an aggregate delta for the batch and send a single update to Redis.
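A minimal sketch of the third approach's batch-level logic, assuming a simplified event with just a user ID and a city: the events in one window are grouped per user, and each user's batch is folded into one delta, so a worker does one fetch and one write per user per window instead of one per event. `Event`, `Delta`, and `countingStore` (a stand-in for Redis that only counts round trips) are hypothetical names.

```go
package main

import "fmt"

// Event is a hypothetical parsed log row.
type Event struct {
	UserID string
	City   string
}

// Delta accumulates what one batch changes in a single user's aggregate.
type Delta struct {
	CityIncrements map[string]int
	Events         int
}

// groupByUser splits one window of events (1 second in the post) into
// per-user batches, keeping arrival order within each user.
func groupByUser(window []Event) map[string][]Event {
	batches := make(map[string][]Event)
	for _, e := range window {
		batches[e.UserID] = append(batches[e.UserID], e)
	}
	return batches
}

// buildDelta folds one user's batch into a single Delta, so the worker sends
// one update for the whole batch instead of one per event.
func buildDelta(batch []Event) Delta {
	d := Delta{CityIncrements: make(map[string]int)}
	for _, e := range batch {
		d.CityIncrements[e.City]++
		d.Events++
	}
	return d
}

// countingStore stands in for Redis and only counts round trips (hypothetical).
type countingStore struct{ fetches, updates int }

func (s *countingStore) Fetch(userID string) { s.fetches++ }

func (s *countingStore) Update(userID string, d Delta) { s.updates++ }

func main() {
	window := []Event{{"u1", "A"}, {"u2", "B"}, {"u1", "A"}, {"u1", "C"}}
	store := &countingStore{}
	for user, batch := range groupByUser(window) {
		store.Fetch(user) // one read per user per window
		// ... a worker would apply each event locally and predict it, in order ...
		store.Update(user, buildDelta(batch)) // one write per user per window
	}
	fmt.Println(store.fetches, store.updates) // 2 2
}
```

With four events from two users, the store sees 2 fetches and 2 updates rather than 4 of each; the saving grows with the number of events each user produces per window.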
Spark, Golang & Python Implementations & Performance Analysis
In this analysis, we look at how the Python, Spark, and Golang implementations performed. Python was quick to develop in but scaled poorly: the first version was single-threaded, and even with multiprocessing it topped out at around 400 logs/sec. Spark distributed logs across workers well, but keeping the user aggregate maps up to date remained a challenge. Golang's concurrency primitives made efficient per-user batching possible on a single machine and extended naturally to a distributed version. Each solution takes a distinct approach to the same real-world problem, illustrating the trade-offs between simplicity, scalability, and performance.
Python
Infrastructure: 1 VM on an on-premises server, with 4 cores at 3 GHz and 8 GB of RAM.
We first designed and built the logic and the model in Python, as a single-threaded application. It stored the user aggregate info in a dictionary of Pandas objects, and it could process around 60 input logs/sec. We then introduced batching and parallelism using the multiprocessing library, but it still could not handle more than 400 logs/sec. Once the model was ready, building this first Python version took us less than a week.
Spark
Infrastructure: 3 VMs on on-premises servers, each with 4 cores at 3 GHz and 8 GB of RAM.
We built two versions using Spark. In the first attempt, logs were distributed widely among workers, and for each incoming log, a worker fetched that user's data from Redis and then updated 6 fields. That is a lot of Redis hits. Keeping the user aggregate maps up to date was the bottleneck, so we used 1 VM for prediction and 2 for user aggregate management. With asynchronous Redis updates, we reached a throughput of 10,000 logs/sec, but the user aggregate map always lagged behind the latest logs by more than 20 seconds.
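A rough count of the Redis traffic in this first design, assuming each of the 6 field updates is sent as a separate command alongside the one fetch per log:

$$
10{,}000\ \tfrac{\text{logs}}{\text{s}} \times (1\ \text{fetch} + 6\ \text{field updates}) = 70{,}000\ \tfrac{\text{Redis operations}}{\text{s}}
$$

This load scales with the number of logs rather than the number of distinct active users, which makes it a natural target for per-user coalescing.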
The throughput of this method was good enough, but the user aggregate lag was far too high. So in the second attempt, instead of sending every single update to Redis, we aggregated each 2-second batch from Spark's StreamingContext and sent only one update to Redis per unique user in the batch. We still used 2 machines for user aggregate management and 1 for predictions. Throughput stayed at 10,000 logs/sec, but the user aggregate map's lag came down to around 5 seconds.
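The Spark job's code isn't shown in the post, so the following is only an illustration of the coalescing step in the second attempt, written in Go rather than against Spark's API: all field increments in one micro-batch are merged into a single update per unique user. `FieldUpdate` and the field names are hypothetical, and the example uses two fields per log instead of six to stay short.

```go
package main

import "fmt"

// FieldUpdate is one increment that a single log row would otherwise send to
// Redis on its own. Field names are hypothetical; the post only says that each
// log touches 6 fields.
type FieldUpdate struct {
	UserID string
	Field  string
	Delta  int64
}

// coalesce merges every increment in one micro-batch into a single update
// per unique user: user -> field -> summed increment.
func coalesce(batch []FieldUpdate) map[string]map[string]int64 {
	out := make(map[string]map[string]int64)
	for _, u := range batch {
		fields, ok := out[u.UserID]
		if !ok {
			fields = make(map[string]int64)
			out[u.UserID] = fields
		}
		fields[u.Field] += u.Delta
	}
	return out
}

func main() {
	var batch []FieldUpdate
	// Four logs in one micro-batch (three from u1, one from u2), two fields each.
	for _, user := range []string{"u1", "u1", "u2", "u1"} {
		batch = append(batch,
			FieldUpdate{user, "logins", 1},
			FieldUpdate{user, "night_logins", 1},
		)
	}
	merged := coalesce(batch)
	fmt.Println(len(batch), "field updates ->", len(merged), "per-user updates") // 8 field updates -> 2 per-user updates
	fmt.Println(merged["u1"]["logins"])                                            // 3
}
```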
Golang
Infrastructure: 1 VM on an on-premises server, with 4 cores at 3 GHz and 8 GB of RAM.
Spark provides a lot of features, but when a problem does not fit neatly into its programming model, you have to do clever things to work around its restrictions, and that usually costs performance.
Golang is a modern programming language designed with concurrency and performance in mind, and it is easy to build concurrent data processing pipelines with just language constructs.
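To illustrate the kind of language construct meant here, a minimal sketch of a goroutine that owns a map and serves requests over channels, similar in spirit to the user manager described next: callers never touch the map directly, so no mutex is needed. `userManager`, `Update`, and `Close` are hypothetical names, and the aggregate is reduced to a single counter.

```go
package main

import (
	"fmt"
	"sync"
)

// request carries a change to one user's aggregate plus a channel for the reply.
// The aggregate is reduced to a single int here to keep the sketch short.
type request struct {
	userID string
	fn     func(current int) int
	reply  chan int
}

// userManager serialises all access to its map through one goroutine,
// so callers never need a mutex.
type userManager struct {
	requests chan request
}

func newUserManager() *userManager {
	m := &userManager{requests: make(chan request)}
	go func() {
		state := make(map[string]int) // owned exclusively by this goroutine
		for req := range m.requests {
			state[req.userID] = req.fn(state[req.userID])
			req.reply <- state[req.userID]
		}
	}()
	return m
}

// Update asks the owner goroutine to apply fn and waits for the new value.
func (m *userManager) Update(userID string, fn func(int) int) int {
	reply := make(chan int, 1)
	m.requests <- request{userID: userID, fn: fn, reply: reply}
	return <-reply
}

// Close stops the owner goroutine; Update must not be called afterwards.
func (m *userManager) Close() { close(m.requests) }

func main() {
	m := newUserManager()
	defer m.Close()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ { // four concurrent "batch processors"
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				m.Update("u1", func(c int) int { return c + 1 })
			}
		}()
	}
	wg.Wait()
	fmt.Println(m.Update("u1", func(c int) int { return c })) // 4000
}
```

Because only one goroutine ever reads or writes `state`, the 4,000 concurrent increments cannot race, and the final count is exact.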
First, we built a single multithreaded application. It has a batchManager that spins off new batchers as messages from new users start to appear, and kills batchers that have not received any messages in a while. It also has a location manager and a user manager, which provide single-threaded access to the maps that store location data and user aggregate data; batch processors send requests to these data manager goroutines and receive data back over channels. This version processes 20,000 logs/sec while keeping the user aggregate map's lag below 1 second at all times, and on an 8-core machine it handles double that with the same lag. It took us a little over a week to build, and it was deployed to production a week later. It has undergone many changes and enhancements since, but it is still the production version.
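The post names a batchManager but doesn't show its code. One possible shape, sketched under stated assumptions: one goroutine per active user collects that user's messages and emits them as a batch every `flushEvery`, and the manager closes batchers whose user has been silent for longer than `idleAfter`. `Msg`, `Batch`, `runBatcher`, `runPipeline`, and both timing parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Msg is a hypothetical parsed log row; Batch is one user's messages from one flush interval.
type Msg struct{ UserID, Body string }
type Batch struct {
	UserID string
	Msgs   []Msg
}

// runBatcher flushes one user's messages every flushEvery, and once more when its input closes.
func runBatcher(userID string, in <-chan Msg, out chan<- Batch, flushEvery time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	var pending []Msg
	flush := func() {
		if len(pending) > 0 {
			out <- Batch{UserID: userID, Msgs: pending}
			pending = nil
		}
	}
	for {
		select {
		case m, ok := <-in:
			if !ok {
				flush()
				return
			}
			pending = append(pending, m)
		case <-ticker.C:
			flush()
		}
	}
}

// batchManager starts a batcher per new user and retires batchers idle longer than idleAfter.
func batchManager(in <-chan Msg, out chan<- Batch, flushEvery, idleAfter time.Duration) {
	type entry struct {
		ch       chan Msg
		lastSeen time.Time
	}
	batchers := make(map[string]*entry)
	var wg sync.WaitGroup
	sweep := time.NewTicker(idleAfter / 2) // idleAfter should be well above flushEvery
	defer sweep.Stop()
	for {
		select {
		case m, ok := <-in:
			if !ok { // input done: stop every batcher, then close the output
				for _, e := range batchers {
					close(e.ch)
				}
				wg.Wait()
				close(out)
				return
			}
			e, exists := batchers[m.UserID]
			if !exists {
				e = &entry{ch: make(chan Msg, 64)}
				batchers[m.UserID] = e
				wg.Add(1)
				go runBatcher(m.UserID, e.ch, out, flushEvery, &wg)
			}
			e.lastSeen = time.Now()
			e.ch <- m
		case now := <-sweep.C:
			for user, e := range batchers {
				if now.Sub(e.lastSeen) > idleAfter {
					close(e.ch) // the batcher flushes what it holds and exits
					delete(batchers, user)
				}
			}
		}
	}
}

// runPipeline pushes the given user IDs through batchManager and counts messages per user.
func runPipeline(userIDs []string) map[string]int {
	in, out := make(chan Msg, len(userIDs)), make(chan Batch)
	for _, u := range userIDs {
		in <- Msg{UserID: u}
	}
	close(in)
	go batchManager(in, out, 100*time.Millisecond, time.Second)
	counts := make(map[string]int)
	for b := range out {
		counts[b.UserID] += len(b.Msgs)
	}
	return counts
}

func main() { fmt.Println(runPipeline([]string{"u1", "u2", "u1", "u1"})) } // map[u1:3 u2:1]
```

Having the manager, rather than the batcher, decide when a batcher is idle keeps all channel closing in one goroutine, which avoids sending on a closed channel. Keeping `idleAfter` well above `flushEvery` means a retired batcher has usually flushed everything before a new one is started for the same user.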
We also have a distributed version, which is simply some distribution code layered on top of the first version. We use HashiCorp's memberlist and raft implementations to create a cluster that can maintain itself.
The leader node elected through Raft grants leases that determine which node processes which user. Any other node that receives that user's data parses the JSON messages into a struct and forwards them to the lease holder, serialized with gob, Go's binary serialization package. The node with the lease pulls the user's aggregate data from Redis, maintains it, and writes a single copy back to Redis as a backup once every 30 seconds, or when the lease expires.
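A minimal sketch of the forwarding step, assuming the lease assignments are available locally as a plain map from user ID to node address (`leaseTable` and `ParsedLog` are hypothetical; in the real system the Raft leader grants the leases). It uses the standard library's `encoding/gob` to serialize a slice of parsed logs and decode it on the receiving side; the network hop is left out.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

// ParsedLog is a hypothetical struct a receiving node fills from the raw JSON.
type ParsedLog struct {
	UserID string
	City   string
	At     time.Time
}

// leaseTable is a hypothetical local copy of the leader's lease assignments:
// user ID -> address of the node that currently owns that user.
type leaseTable map[string]string

// owner reports which node holds the lease for userID, if any.
func (t leaseTable) owner(userID string) (string, bool) {
	node, ok := t[userID]
	return node, ok
}

// encodeLogs serialises parsed logs with gob before forwarding them to the owner.
func encodeLogs(logs []ParsedLog) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(logs); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeLogs is the inverse, run by the node that owns the lease.
func decodeLogs(data []byte) ([]ParsedLog, error) {
	var logs []ParsedLog
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&logs)
	return logs, err
}

func main() {
	leases := leaseTable{"u1": "node-b"}
	node, ok := leases.owner("u1")
	if !ok {
		panic("no lease for u1")
	}
	data, err := encodeLogs([]ParsedLog{{UserID: "u1", City: "Chennai", At: time.Now()}})
	if err != nil {
		panic(err)
	}
	// A real node would write data to a connection to `node`; here we decode it locally.
	logs, err := decodeLogs(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(node, logs[0].UserID, logs[0].City) // node-b u1 Chennai
}
```

A gob stream sends type information once per `Encoder`, so a long-lived connection between two nodes would normally reuse one encoder rather than creating one per message as this sketch does.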
With 3 quad-core VMs receiving data from a Kafka topic with 3 partitions, the clustered setup processes around 50,000 logs/sec, and the user aggregate map's lag still stays at 1 second at most. Building it took one more week of design and development on top of the non-distributed version.
Summary
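The custom Golang solution clearly outperformed Spark: around 50,000 logs/sec with at most 1 second of aggregate lag on 3 quad-core VMs, against Spark's 10,000 logs/sec with around 5 seconds of lag on the same number of VMs. It still took only around 2 weeks to develop, against about 1 week for the Spark solution.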
The Golang program was designed from the ground up. Golang has just enough features to accelerate development, but because it is a full programming language, it leaves a lot of flexibility in exactly how your solution works, unlike Spark, where your design has to fit Spark's model. Golang has green threads (goroutines) with communication and synchronization between them through channels, and its ecosystem provides clustering and leader election through HashiCorp's memberlist and raft libraries, and fast binary serialization and deserialization through gob, protobuf, flatbuffers, etc. Designing distributed systems, or even thinking about them in general, takes some getting used to, but once past that stage, Golang provides the best balance between performance and development time.
We are also building a minimal generic (code generation based) framework in Golang that provides even more glue code for message passing, key-based leases that let a specific machine receive all messages with a specific key, and so on. We will keep writing as we make progress.