High Performance Streaming Data Processing Implementation and Analysis

koders-stream
koders-stream

Introduction

This article is more like a proof of concept of an implementation of microservice development, based on a real world implementation of a specific project build on top of integration of two stock markets, Nasdaq and BIST. The project requires different type of approaches and know-how which can be grouped into two categories, stream data api implementation and data analysis. Both sides have different challenges as they have different requirements.

At the early first kick of the project, the development team needs a flexible and portable environments for their micro services. Different versions of a particular service should be easily deployed to the proper environment by keeping consistency stable. These level of automation should solve problems such as dependency management, service discovery, test automation and lightweight deployments. All these know-how should be shared and understood to almost every member of the working group to provide easy integration to the project.

Data Streaming Project Use Case Analysis

The aim of the project is -simply- developing a low latency data streaming api to the clients in order to provide a wide variety of financial data. Streaming data needs to be consistent and low latency for the clients since It contains transaction messages as well as market data information. The market is highly competitive environment therefore delivering consistent data in a low latency is another asset by the means of quality.

Core part of the API  is responsible for receiving, transforming, logging and propagating the stream data to the clients. As the architecture implies, there are independent I/O operations on top of one single stream input which requires a lock-free concurrent engine. Therefore  It is crucial to point out that providing the consistent stream data and the result of how the market is behaving is important for clients. This is where machine learning part of our project comes in, where one can run any sort of machine learning algorithm to teach the behavior of the market based on the state of orderbook and trading aggressiveness of an instant time which follows up to the possibility to make a prediction to open up a position for the near-real time. One of the main motivation of this part of the work is strongly related with behavioral finance which is a research field which concentrates on to understand characteristics of collective behavior of individuals. So we wanted to introduce an engine consists of different pluggable components to understand that behavior from trading characteristics of individuals, so that we can even identify what is happening through the country behaviorally by just observing market data. We would have hopefully wanted to come to a point where without knowing the semantic meaning of flowing data, we can detect any anomalies which mostly caused by an abnormal news in a related community near-real time.

Business Analysis

On the case we chose to work on, people create transaction by using financial asset either by buying or selling. These transactions is basically function of the time. In the streaming data processing, theoretically any algorithm is behind the present. In today’s computational work, this is an inevitable truth due to the couple of latencies exits in the environment such as; cpu cycles, cpu caches, ram, disk, and the worst one is the network latency. These can be expressed as physical factors which get any better with proper hardware or service combinations. Therefore, finding the feasible product or service (feasible covers the affordable price and manageable service or hardware sets), eliminates all the factors left to the performance of software developed. In the beginning, we planned to develop high performance, portable, modular, scalable, easily configurable microservice oriented software services.

Tools, Libraries and Services Used

From both the infrastructure and software development point of view, there have been many tools, libraries and services are used within the development phase. All these terms are applicable for production cases as well (with the proper specs and configurations for the real world requirements).

  • The core business logic nodes is implemented in Java using the Vertx library. There are a couple of nice features we have got as advantage.
  • Vertx provides a high level of abstraction on running multiple JVMs as a cluster of nodes (on top of Hazelcast). This is a win-win feature both for development ( abstraction and implementation are easily configurable ) and operation side ( any JVM microservice can be scalable and failsafe without thinking about the sticky objects such as distributed stateful messages )
  • Service discovery feature is required both for adding and removing nodes to the cluster and leader election for stream data producer node. Zookeeper provides both leader election and service discovery features. Beside that, It is highly available out of the box, which is a win-win feature again.
  • All JVMs run inside individual Docker containers.  Even more with docker-compose and swarm, we can deploy and test multiple AND/OR multi-tier microservices on any computational platform.
  • Many AWS services are used for IaaS and SaaS such as, EC2, S3, Cloudfront, Lambda Function, ECR, Route 53, VPC, SES. These kind of resource level automation is applied by Terraform.
  • Saltstack is for provisioning the EC2 instances and Docker deployments.
  • Zabbix and Sysdig is for monitoring instances at the least granular level.
  • Jenkins, sits in the middle of the development life cycle by running automated builds/tests and deployments.

Data Center Level Fail Safety

The API needs to be failsafe both in the cluster level and data center level. For the purpose of obtaining the fail safe data centers, we choose to behave them as completely separated environments. Each DC can be interpreted as an abstraction of endpoint, working, being managed, monitored and automated in exactly the same way as others. Therefore, It is like each of the data center is like one giant server providing the same services.

Therefore, the only work to be done is to route the end users to the functional data center in a feasible way. There are many techniques to can be applied like service discovery or application centric approach, however they need more management and have consistency issues. Since the smart DNS resolution protocol provided by AWS Route 53 service provides the active health checks and failsafe query features with almost the lowest pricing model in the market, we have chose to implement Internet traffic on top of that. Maintaining such fail-safety with DNS is a feasible choice for a few options as well :

  • It is cheap, highly responsive ( traffic routes itself to a secondary data center under 10 sec. ) and contains no complex configuration.
  • It sits on top of the architecture by providing abstraction to the clients.
  • Neither application business logic nor configurations provided to the app knows nothing about it.

Active health check simply monitors the endpoints by HTTP request or TCP binding ( In this case they are identified as dc1.hostedzone.net ad dc2.hostedzone.net ) and either a haelthy enpoint, successful HTTP responds or a valid response string, depending on the configuration provided. If the desired state satisfied, It is assumed that the DC is healthy and ready for clients. It is also a matter of configuration to label data centers as primary and secondary. Therefore, the CNAME definition sits on the top ( which is api.hostedzone.net in this case ) gets updated accordingly by pointing a load balancer or a web application firewall. When the data center endpoint tagged as primary dies the failover happens and the new endpoint becomes the new data center. Therefore when the disconnected clients wants to make a new connection, they simply be routed to the secondary data center. In the same behavior, when the primary gets on line, new connection attempts will be handled by the primary again. The visualization as follows :

image01

Auto Scaling on the Cloud

Stock markets always have specific time intervals in each business day. Therefore It is possible to foresee the possible client workload demands for a particular market, which leads to opportunity to scale the system up to a required number of nodes. As the behavior of the business case, It is almost impossible to predict when the transaction density will go beyond a specific point (the point which exceeds our processing limits, we call It overloading point) therefore, the auto-scaling policies are applied based on time windows first. The scaling policies firstly depends on the specific hours of work days, which we know in advance by analyzing the raw logs. Between these specific times, the system serves at maximum level of nodes. Beside that, many of them shut themselves down. (We label the bare minimum instances and we monitor them at the least granular level, therefore we gain to have an idea about what is going through the cluster.) The second policy in auto-scaling policy is a network and CPU utilization combination. At rare cases It bursts, just to be safe, this rule is applied as well.

Every week, we try to analyze the utilization of the whole cluster and shrink the unused nodes one by one. Since we handle the peak points in a daytime, we are good.

Spot Instance Management with Terraform

Data analysis contains algorithmic learning on a relatively big data set. That is mostly more than one month data extracted and logged from the stream. The developed algorithms requires temporary cpu and memory resource power at large scale. The computation is simply started by Docker containers deployed one per EC2 instances. Containers simply pulls the required set of data from S3, processes them, then periodically push the results back to the S3 again. There should be no state stored in the Docker container (that would make the size of the Docker image larger, that would lead to scalability problems) since we could lose any node any given time which leads to inconsistency.

To manage the cluster of EC2 instances, an automated setup does the followings:

  1. A custom shell script gets all the spot instance prices at any given time from various AWS regions ( and including all the availability zones ) and selects a price on the cheapest regions availability zone. The betting price is determined based on a formula which ended up a couple of cents more than average. This way, we can satisfy the durability of the cluster for a few hours.
  2. After the calculation, we know where to deploy our compute intensive instances. The second part of the pipeline passes the zone, price and number of EC2 instance information to the Terraform state files and the Terraform binary spins these instances and provisions them. All of these actions are applied within a Jenkins job and ended up under 5 minutes.

The provision phase, only pulls the specific image from AWS container registry and runs them.

image02

Development Life Cycle

Throughout the development cycle, several libraries, algorithms and setups have been applied to the architecture. It is a requirement to test all these components as granular as possible to make sure they don’t make the whole system fragile and weak. That’s almost the definition of CI/CD pipeline. In order to satisfy that requirement, we have applied a combination of tools & services as follows :

  • All build logic and dependency management is embedded in Gradle & Groovy scripts. Comparing to the maven, Integrating people and the environment to the Gradle is a lot more easier, reading and developing is a lot more consistent. This way, we obtain a runnable artifact created bundled with the specific configuration set (for the suitable environment) all in one container. Code build job runs whenever there is a change on a specific branch.
  • Beside the unit tests (which runs on every build), there is also integration test which requires some time and relatively bigger resources to run. That’s mostly nightly build job (If the data size is reasonable for the integration tests, we run them on every change).
  • On a weekly basis, a load test needs to be applied as to test the changes on the master branch. For the specific case, this is compute intensive operation which requires minimum two VMs (all of which spins up on demand as spot instances and terminates themselves afterwards), one of which runs the application that makes the transformation (that is the mission critical logic in the architecture mosty) and the rest of the VMs pushes the data into it through a TCP socket at burst. The job results some metrics therefore then the team interprets how the changes affected the system.
  • Since we have portable deployment environment on top of Docker containers, developers can create almost any set of microservice setups on their own workspaces ( on their personal laptops, assuming that they have enough memory for the stack ) by using docker-compose. Therefore, a change in the codebase in a specific tier can be checked once by the developer himself, before he pushes the commits to the shared pool.

image04

Microservice Architecture

For the purpose of playing data vendor role, one must listen several data sources, analyze and transform them and then merge & publish these stream as one service, the stream API. Instead of setting up a monolithic stack to process these different and independent sources, we have chosen to implement a set of loosely-coupled transformation services. They run independent of each other and propagate their input stream to the merger cluster after the proper transformation.

This setup gives the flexibility to manage and configure each service component in their own problematic scope. Since they are all JVMs running inside a container, they all support HA and scalability.

Service Discovery

The whole system service discovery is basically based on the configuration files which are required to be given on their launch time. These configuration files includes information such as where to connect, user name/password pairs, application runtime specific configurations and environment specific information like whether to run as cluster or standalone. Since we have divide the architecture into two parts as running binary and the configuration set, we are required to store, generate and pull these configuration from a service.

 

At the early implementation of the design, we have used etcd as a service to store and retrieve the configuration parameters. In order to create a dynamic configuration file, a confd client is configured in the CI system which generates files based on a template file given. This has been a powerful solution for service discovery ( especially If you have requirement to make this generation at runtime ) however, for the set of services we produce, that is not a strict requirement. A static file server is like S3 will be more easy to maintain and manage. Moreover, In order to satisfy HA, we need to set up at least 3 node etcd cluster. In the end, S3 is an easy solution and covers all of our requirements.

image03

 

Leader Election

In order to provide two active-active system to run in active-active manner, there is a leadership mechanism required to satisfy the one publisher at a time. Two or more nodes simple listens and do their transformation stuff inside their JVMs however only the one who has the Zookeeper key simply publish the messages to the network in multicast. Since the Zookeeper cluster consists on three node, we can scale It up to 5-7 or 9 to make It more fault tolerant.

image05

 

SHARE

LEAVE A REPLY