Developing a Fast and Big Data Acquisition System with Near Real-Time Analytics – Part 1
Introduction
This series of how-tos/case studies is the result of our team's work over the past 2+ years developing a Predictive Analytics Platform for a global truck OEM. The platform was to be integrated with their live OBU data, Warranty, Research & Design, Customer Support, CRM and DMS systems, among other things.
In this journey, we attempted to solve the problem in incremental steps. We are currently working on the predictive analytics and learning workflows, so I believe it's time to pen down the experience of building the other three incremental solutions:
- First Baby Step – Fast Data Capture and Conventional Analytics
  - Kafka, Redis, PostgreSQL
- Next Logical Step – Big Data Capture, Warehousing and Conventional Analytics
  - Kafka, Storm/Spark, Hadoop/Hive, ZooKeeper
- The Bulls Eye – Real-Time Analytics on Big Data
  - Same as above, with Solr and Zeppelin
- The Holy Grail – Predictive Analytics
  - Same as above, with MLlib on Spark
Now, in this post I will write about "The First Baby Step", which involves fast acquisition of data, real-time analytics and long-term data archival.
The disparate data sets and sources posed significant complexity, not to mention the myriad polling frequencies, sync models and EOD jobs. It goes without saying that the OEM had a significant investment in SAP infrastructure. We studied multiple architecture models (some are available in this Reference Architecture Model from Hortonworks and SAP).
The following are the considerations from the data perspective:
- Fast Data – Real-time telematics data from the OBU.
- Big Data – Diagnostics data from each truck, with 40+ parameters and an initial pilot of 7,500 trucks.
- Structured Data – Data from the Dealer Management System and the Customer Relationship Management System.
- Transactional Data – Data from the Warranty Management and Customer Support systems.
Fast Data: Our primary challenge for the first phase of design/development was scaling the data acquisition system to collect data from thousands of nodes, each of which sent 40 sensor readings, polled once per second and transmitted once every 6 seconds, while maintaining the ability to query the data in real time for event detection. While each data record was only ~300 KB, our expected maximum sensor load indicated a collection rate of about 27 million records, or 22.5 GB, per hour. However, our primary issue was not data size, but data rate: a large number of inserts had to happen each second, and we could not buffer inserts into batches or transactions without introducing a delay into the real-time data stream.
When designing network applications, one must consider the two canonical I/O bottlenecks: network I/O and filesystem I/O. For our use case, we had little influence over network I/O speeds. We had no control over where our truck sensors would be at any given time, or over the bandwidth or network infrastructure of those locations (our OBUs communicated using GPRS on the GSM network). With network latency as a known variable, we focused on the bottleneck we could control: filesystem I/O. For the immediate collection problem, this meant evaluating databases into which the data could be inserted as it was collected. While we initially attempted to collect the data in a relational database (PostgreSQL), we soon discovered that although PostgreSQL could potentially handle the number of inserts per second, it was unable to respond to read queries at the same time. Simply put, we could not read data while we were collecting it, preventing us from doing any real-time analysis (or any analysis at all, for that matter, unless we stopped data collection).
The easiest way to avoid slowdowns due to disk operations is to avoid the disk altogether. We did this by leveraging Redis, an open-source, in-memory NoSQL datastore. Redis keeps all data in RAM (and, in hybrid deployments, on flash storage such as SSDs), allowing lightning-fast reads and writes. With Redis, we were easily able to insert all of our collected data as it was transmitted from the sensor nodes, and query it simultaneously for event detection and analytics. In fact, we were also able to leverage the Pub/Sub functionality of the same Redis server to publish notifications of detected events to event-driven workers, without any performance issues.
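To make the ingestion path concrete, here is a minimal sketch of a Kafka consumer feeding Redis, assuming kafka-python and redis-py. The topic name, broker address, key layout and field names are illustrative assumptions, not our production schema.

```python
import json

import redis
from kafka import KafkaConsumer

# Illustrative only: topic, broker, key layout and field names are assumptions.
consumer = KafkaConsumer(
    "obu-telemetry",
    bootstrap_servers=["localhost:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

for msg in consumer:
    reading = msg.value  # e.g. {"truck_id": "T1042", "ts": 1489731000, "coolant_temp": 88, ...}
    # One sorted set per truck, scored by the reading's timestamp, so the
    # stream can be queried while it is still being written (see below).
    r.zadd(f"telemetry:{reading['truck_id']}", {json.dumps(reading): reading["ts"]})
```

Because the write is a single ZADD per reading, the consumer keeps up with the incoming stream without any batching, which is exactly what the real-time requirement demanded.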
In addition to speed, Redis offers advanced data structures, including Lists, Sets, Hashes, Geospatial indexes and Sorted Sets, rather than only the somewhat limiting key/value pairs typical of many NoSQL stores.
Sorted Sets proved to be an excellent data structure for modelling time-series data: by setting the score of each datapoint to its timestamp, the series stays ordered automatically, even when data is inserted out of order, and can be queried by timestamp, timestamp range, or "most recent N" records (which is simply the last N members of the set).
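By way of illustration, a couple of query helpers against that layout with redis-py; the `telemetry:<truck_id>` key pattern is the same assumption used in the ingestion sketch above.

```python
import json

import redis

r = redis.Redis(decode_responses=True)

def readings_between(truck_id, start_ts, end_ts):
    """All datapoints in a timestamp range, returned already ordered by time."""
    members = r.zrangebyscore(f"telemetry:{truck_id}", start_ts, end_ts)
    return [json.loads(m) for m in members]

def latest_readings(truck_id, n=10):
    """The most recent n datapoints, i.e. the last n members of the sorted set."""
    return [json.loads(m) for m in r.zrange(f"telemetry:{truck_id}", -n, -1)]
```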
Our use case also required us to archive data for a period of time, enabling business users to run historical analytics alongside data from the real-time source.
Enter data temperatures:
- Hot Data – Data that is frequently accessed and is currently being polled/gathered.
- Warm Data – Data that is no longer being polled but is still frequently used.
- Cold Data – Data that is in warehouse mode, but can still be accessed for BI or analytics jobs with a bit of I/O overhead.
Since Redis keeps all data in RAM (the hot area), our Redis datastore could only hold as much data as the server had available RAM. Our data, inserted at a rate of 27 GB/hour, quickly outgrew this limitation. To scale the solution and archive our data for future analysis, we set up an automated migration script to push the oldest data in our Redis datastore to a PostgreSQL database, which scales far better for storage. As explained above, since the time series was modelled with Sorted Sets scored by timestamp, the extract for the load operation was simple enough.
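The migration itself can be as simple as the sketch below, assuming psycopg2 and the sorted-set layout from the earlier sketches; the table name, DSN and retention window are placeholders, not our actual configuration.

```python
import time

import psycopg2
import redis

r = redis.Redis(decode_responses=True)
pg = psycopg2.connect("dbname=telematics user=etl")  # placeholder DSN

RETENTION_SECONDS = 6 * 3600  # keep the last 6 hours hot in Redis (illustrative)

def migrate_truck(truck_id):
    key = f"telemetry:{truck_id}"
    cutoff = time.time() - RETENTION_SECONDS

    # Pull everything older than the cutoff, oldest first, with its timestamp score.
    stale = r.zrangebyscore(key, "-inf", cutoff, withscores=True)
    if not stale:
        return

    with pg, pg.cursor() as cur:
        # Illustrative schema: payload stored as raw JSON text.
        cur.executemany(
            "INSERT INTO telemetry_archive (truck_id, ts, payload) VALUES (%s, %s, %s)",
            [(truck_id, ts, member) for member, ts in stale],
        )

    # Only trim Redis after the rows are safely committed to PostgreSQL.
    r.zremrangebyscore(key, "-inf", cutoff)
```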
The other consideration to keep an eye on is that "available RAM": the amount of data being queried, the CPU cycles consumed and the RAM used for processing all determine how much memory is left for the datastore. Be reminded that if the datastore is filled to the brim, your processing jobs will spill over to disk I/O, which is very bad.
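A small watchdog along these lines can keep an eye on that headroom and trigger the archival job early; the 75% threshold and the `active_trucks` list are arbitrary illustrations.

```python
import redis

r = redis.Redis()

def redis_memory_pressure(threshold=0.75):
    """Return True when Redis has used more than `threshold` of its maxmemory."""
    info = r.info("memory")
    maxmemory = info.get("maxmemory", 0)
    if not maxmemory:  # maxmemory=0 means "unbounded"; no useful signal
        return False
    return info["used_memory"] / maxmemory > threshold

# e.g. run the archival job early when memory is tight:
# if redis_memory_pressure():
#     for truck_id in active_trucks:   # hypothetical list of truck ids
#         migrate_truck(truck_id)
```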
We wrote a REST API as an interface to our two datastores, giving client applications a unified query interface without having to worry about which datastore a particular piece of data resided in. This web-service layer defined the standards for the time, range and parameters of a query.
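A sketch of what such an endpoint might look like with Flask; the route, parameter names and the cut-over logic between the stores are illustrative, `readings_between` is the Redis helper sketched earlier, and `archive_readings_between` stands in for a hypothetical PostgreSQL query helper.

```python
import time

from flask import Flask, jsonify, request

app = Flask(__name__)

# How far back the hot store (Redis) is expected to hold data, mirroring the
# retention window used by the migration job above (illustrative value).
HOT_WINDOW_SECONDS = 6 * 3600

@app.route("/telemetry/<truck_id>")
def telemetry(truck_id):
    start = float(request.args["start"])
    end = float(request.args.get("end", time.time()))

    if start >= time.time() - HOT_WINDOW_SECONDS:
        data = readings_between(truck_id, start, end)           # Redis (hot)
    else:
        data = archive_readings_between(truck_id, start, end)   # hypothetical PostgreSQL helper
    return jsonify(data)
```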
With the above architecture in place, automated event detection and real-time notifications were feasible, again through the use of Redis. Since Redis also offers Pub/Sub functionality, we were able to monitor incoming data in Redis using a small service and push noteworthy events to a notification channel on the same Redis server, from which subscribed SMTP workers could send out notifications in real time. The same events can be channelled to an MQ/ESB or any asynchronous mechanism to initiate actions or reactions.
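Both halves of that flow take only a few lines with redis-py; the channel name, the coolant-temperature threshold and the mail addresses below are assumptions for illustration.

```python
import json
import smtplib
from email.message import EmailMessage

import redis

r = redis.Redis(decode_responses=True)

# --- monitoring side: publish noteworthy events as readings arrive ---
def check_reading(reading, coolant_limit=110):
    """Publish an event when a reading crosses an (illustrative) threshold."""
    if reading.get("coolant_temp", 0) > coolant_limit:
        r.publish("events:alerts", json.dumps(reading))

# --- worker side: subscribe to the channel and send notifications in real time ---
def run_smtp_worker():
    pubsub = r.pubsub()
    pubsub.subscribe("events:alerts")
    for message in pubsub.listen():
        if message["type"] != "message":
            continue
        event = json.loads(message["data"])
        mail = EmailMessage()
        mail["Subject"] = f"Alert for truck {event.get('truck_id')}"
        mail["From"] = "alerts@example.com"      # placeholder addresses
        mail["To"] = "fleet-ops@example.com"
        mail.set_content(json.dumps(event, indent=2))
        with smtplib.SMTP("localhost") as smtp:
            smtp.send_message(mail)
```

In the ingestion loop sketched earlier, `check_reading(reading)` could be called right after the ZADD, keeping detection on the same hot path as collection.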
Our experience shows Kafka and Redis to be powerful tools for Big Data applications, specifically for high-throughput data collection. The benefits of Kafka as a collection mechanism, coupled with in-memory data storage using Redis and data migration to a deep analytics platform, such as a relational database or even Hadoop's HDFS, yield a powerful and versatile architecture suitable for many Big Data applications.
After we implemented HDFS and Spark in Phases 2–3 of this roadmap, we of course configured Redis in the same role. I hope I have covered enough of the first step in our Big Data journey; I will write an article each week on the other three phases we have implemented successfully.