top of page
Writer's pictureGang Tao

How to Get an Edge with Real-Time Market Data using Timeplus



Timeplus exists to be the fastest way for you to start with streaming data and get actionable insights.


The streaming data market continues to grow globally as more devices are online, at all times. One technology trend we’ve seen amidst this macro-trend is the use of WebSockets and server-sent events (SSE). These two methods are used to stream data directly from server to client, in a continuous manner; as opposed to traditional request/response methods. Previously this blog compared the performance of WebSockets versus server-sent events (SSE). The key takeaway is that they have similar performance, but the usage requirements will help you decide which to use. 


When it comes to actionable insights, few industries place greater value on insights than the capital markets. Instantaneous opportunities in trading happen quickly and highly profitable actions must be taken before the opportunity perishes as the market moves. More and more market data providers are using WebSockets and SSE to stream data for  trading equities, options, forex, cryptocurrencies, etc. Previously this blog discussed streaming analytics patterns like realtime PNL, position-keeping, and TCA. The key takeaway is that you can save time and headache with SQL to handle complex analytics - by not having to write, debug, and maintain custom code.


Today’s blog is the first in a series of blogs explaining how to use Timeplus to extract insights from data streams, a technical deep dive into connectivity and some interesting analytics appearing in the field, and finally an analysis to uncover patterns in the market. 


In today's blog, I am going to show you how to ingest real-time market data, as provided by sources like: 


(BTW, IEX uses SSE because of a straightforward implementation, and we support SSE via our go-based API service - give us a holler if you want to try out IEX + Timeplus.)


 

Connectivity


The vendors above provide real-time market data APIs via WebSocket and follow one of a few patterns:


  • Connect to server: the specific server might vary, based on the asset class

  • Receive connection acknowledgement

  • (Possibly provide authentication key)

  • (Receive authenticated acknowledgement)

  • Send subscription request 

  • Receive stream of data, for all requested financial instruments

  • Unsubscribe or revise subscription list as needed 


The messages back and forth are in JSON format. A very common request is to provide some bridge between these json-formatted, market datastreams and a quantitative research environment like a Jupyter notebook powered by Python and libraries like Pandas, NumPy, etc. Ideally, the “stay-alive” nature of a WebSocket would be preserved and would handle the streaming data ingress regardless of the stat of the research environment. In other words, you’d have a streaming datastore capturing all the data, ready to run any analytics needed to get things ready for Python, then connect to that datastore whenever (from wherever) to do the quantitative analysis against live and historical market data. 


This is where Timeplus Enterprise comes in: you simply setup the data ingestion from your market data provider, create a stream to handle both the live data ingest and historical data store, then use Timeplus’ Python library to connect as needed. Plus you can leverage Timeplus Enterprise to quickly write analytics in a scratchpad, and even visualize the stream and store of live and historical prices. 


So let’s get started!


 

Start a Free Trial


Download Timeplus Enterprise and start your 30-day free trial (no credit card required). See installation options for Linux, MacOS, or Windows.


Once you have it installed, you have everything you need to connect to market data, run SQL queries for streaming analytics, build visualizations, and prepare data for quantitative analysis.


 

Create a WebSocket Data Source 


Next step, we are going to create a WebSocket data source and ingest these market data into a Timeplus stream.


Open the Timeplus Web Console. On the left navigation menu, click Data Collection, then click the WebSocket tile.



Here we are going to use Coinbase’s API and WebSocket server, but feel free to use any market data provider and list of symbols you like.


Input:

  • WebSocket URL - wss://ws-feed.exchange.coinbase.com

  • Messages - {"type": "subscribe","product_ids": ["ETH-USD","BTC-USD","LTC-USD","BCH-USD","XRP-USD"],"channels": ["ticker"]}

  • Chose `Read as` to `Text`




In the preview step, create a new stream called `coinbase_tickers_raw` which only contains one field `raw`, we will ingest the raw json data as a string returned from the WebSocket server. 


In the final step, review the configuration and create the source.



 

Transforming the JSON data


Now if we run `select * from coinbase_tickers_raw` in the console, we can start exploring the details of those market data events.



Here is one of the events:

{
   "type": "ticker",
   "sequence": 70702869627,
   "product_id": "BTC-USD",
   "price": "42831.66",
   "open_24h": "45128.54",
   "volume_24h": "32408.38101719",
   "low_24h": "40625.68",
   "high_24h": "45519.32",
   "volume_30d": "400693.62167705",
   "best_bid": "42826.87",
   "best_bid_size": "0.02200000",
   "best_ask": "42831.66",
   "best_ask_size": "0.18639007",
   "side": "buy",
   "time": "2024-01-03T21:39:15.839968Z",
   "trade_id": 591227018,
   "last_size": "0.00110993"
}

To run a better analysis, we can extract all the fields from the raw event by running the following query.

SELECT
raw:type AS type,
raw:sequence AS sequence,
raw:product_id AS product_id,
cast(raw:price,'decimal(10,2)') AS price,
cast(raw:open_24h,'decimal(10,2)') AS open_24h,
cast(raw:volume_24h,'decimal(20,8)') AS volume_24h,
cast(raw:low_24h,'decimal(10,2)') AS low_24h,
cast(raw:high_24h,'decimal(10,2)') AS high_24h,
cast(raw:volume_30d,'decimal(20,8)') AS volume_30d,
cast(raw:best_bid,'decimal(10,2)') AS best_bid,
cast(raw:best_bid_size,'decimal(20,8)') AS best_bid_size,
cast(raw:best_ask,'decimal(10,2)') AS best_ask,
cast(raw:best_ask_size,'decimal(20,8)') AS best_ask_size,
raw:side AS side,
to_time(raw:time) AS time,
raw:trade_id AS trade_id,
cast(raw:last_size,'decimal(20,8)') AS last_size
FROM
coinbase_tickers_raw

If you would like help crafting these streaming ETL queries, feel free to join our Slack Community and post a question, check out our documentation, or try out our doc bot (click in the chat icon in the bottom right corner).


You might notice the type we are using for the prices is Decimal, not float() like you might initially consider. 


There is a lot to this topic - but as a best practice, for financial numbers, decimal is preferred. A quick justification is apparent when you issue a simple query:

:) SELECT 1 - 0.9
SELECT
  1 - 0.9
Query id: fb6dfead-f428-4d7b-b7c0-75be051072d9
┌───────minus(1, 0.9)─┐
│   0.09999999999999998 │
└─────────────────────┘
1 row in set. Elapsed: 0.101 sec. 

Note, field:type_str is basically another format of cast(col, type_str), here will just run this query to extract all fields to specific types we need.


We create a view called `v_coinbase_ticker` using this query and if we query this view, we get all the real-time price data.



At this point we are looking at raw data, straight from the market data provider. To get prices into a form that a machine learning / AI model could use we will need to downsample and aggregate.


 

Streaming Analytic with OHLC


Downsampling simply takes raw data, which might be bursty (a lot happening at one) or experience periods of inactivity, and creates regular, uniformly spaced data that mathematical models are more accustomed to.  A very common form of downsampling is to aggregate to key values over a time window: Open, High, Low, Close.


Here is a brief explanation of OHLC, which is intended to show the formation of price and its momentum:


  • open, first price in the time window, where opinion about a financial instrument started

  • high, max price in the time window, as far as the bulls could lift it up

  • low, min price in the time window, as far as the bears could swipe it down

  • close, last price in the time window, where the price formation ended


We can calculate OHLC data using aggregation functions earliest, max, min and latest from a tumble window into the live datastream, using a 5 second time window:

SELECT
 window_start, product_id, 
 earliest(price) AS open, 
 max(price) AS high, 
 min(price) AS low,
 latest(price) AS close
FROM
 tumble(v_coinbase_tickers, time, 5s)
GROUP BY
 window_start, product_id


 

Visualize the live OHLC data  


Next, we can visualize the live stream of OHLC query results using the OHLC chart in Timeplus.

SELECT
 window_start AS time, product_id, 
 earliest(price) AS open, 
 max(price) AS high,
 min(price) AS low, 
 latest(price) AS close
FROM
tumble(v_coinbase_tickers, time, 5s)
WHERE
product_id = 'BTC-USD' and time > now() -1m
GROUP BY
window_start, product_id


Note, the OHLC chart will be only available to use if your query result contains time,open, high, low, close fields.



Now you can monitor this 5 second price change in real time.  We will show more sophisticated queries in the follow up blogs.


 

Integration with Python, Pandas


In order to leverage snapshots into the live data that Timeplus supports, you can easily connect to Timeplus Enterprise via the Python API. Here you can follow the examples from https://pypi.org/project/timeplus/


You will need to have the Timeplus Python package installed:

pip install timeplus

Next, you can run the below SQL in Timeplus Enterprise.  


Note: the SQL query below runs against streaming data, live data, aggregated in 1 second time intervals - calculating OHLC data. It’s set to a limit of 5 aggregations, but it will need a few seconds to let live data arrive to fill the time interval. 

import pandas as pd
from timeplus.dbapi import connect 

conn=connect(
  host='[PASTE YOUR HOSTNAME]',
  user='[PASTE YOUR USERNAME]'
  password='[PASTE YOUR PASSWORD]')

cursor = conn.execute("SELECT window_start, product_id,  earliest(price) AS open,  max(price) AS high,  min(price) AS low, latest(price) AS close FROM tumble(v_mgb_cb, time, 1s) GROUP BY window_start, product_id LIMIT 5")

rows = cursor.fetchmany(5)
df = pd.DataFrame(rows)
df['close']=pd.to_numeric(df['close'])
df.groupby('productid').agg({'close':['min','max','mean','std']})

An example result is:

>>> df.groupby('productid').agg({'close':['min','max','mean','std']})                                                                                               close                                  
                min       max          mean       std
productid                                            
BTC-USD    43930.06  43930.96  43930.730000  0.446692
ETH-USD     2254.68   2254.73   2254.713333  0.028868
LTC-USD       65.48     65.48     65.480000  0.000000
>>> 

 

Summary


In today’s blog, I have shown how we can ingest live market data via WebSocket, run streaming analytics using SQL, and monitor the price trend using a live OHLC chart with just a few clicks in the Timeplus Enterprise Web Console. Plus, you can use Timeplus as a streaming datastore for unified analytics against live and historical data - available for further quantitative analysis using Python.


If this was useful, please follow this series as we dive deeper into connectivity and market insights. 


If you are interested in more detail or a custom solution, Timeplus is the best data platform for you. 


 

Ready to try Timeplus Enterprise? Try for free for 30-days. 

Join our Timeplus Community! Connect with other users or get support in our Slack community.


bottom of page