Introduction

In search of less latency, while going through the GDAX's API, back when Coinbase Pro was GDAX, I stumbled upon their Websocket API. With the API, you could keep a copy of the order book in memory ready for query instead of pinging the server for information. The library that I found that was fairly high level and easy to use was the Websocket API from Microsoft's cpprestsdk. It wraps websocketpp and gives it better usability. The library I used to parse and create JSON (JavaScript Object Notation) is rapidjson since I have experience with it.
Websocket is a communication protocol that provides two way communication over a TCP connection. In the case of Coinbase Pro's Websocket API you would first subscribe by, right after connecting to it, sending a subscription message containing exchange's currency pairs and their channels to subscribe to. After which, depending on which channel you subscribed to, you would then receive updates. For instance, if you subscribe to the heartbeat channel you will receive a message every second basically letting you know the connection is alive with some basic information. The ticker channel gives you an update every time there is a match between a maker and a taker and provides "real-time price updates". In the level2 channel subscription, you first get a message type snapshot with every price where there is a bid or an ask and the amount that is being bid or asked in that price. Subsequently, you will get updates every time there is a change to the book. With the full-channel you get an update every time there is any change to anything which could take quite a bit of bandwidth. Anyway, we will focus on using the level2 channel.

The Code

cpprest uses a lot of threading when receiving messages meaning that we will have to make a existing container more thread safe. For instance, with the std::vector container there can be more than one concurrent readers, but when it is being written to there can be no other concurrent writers or readers. Read this for more information. To achieve full thread safety we will need to use std::shared_mutex to ensure that when the vector is being written to there are no other concurrent writers or readers. To do this we need to lock a mutex when ever reading from the vector and to check if it is locked but not lock it when reading from it.
Let's create a class to abstract our use of cpprest, rapidjson, and the order book. Let's give our class the ability to look up either the best buy or sell price. Here is the the header giving the skeleton of the class we will create. Be advised that I use some C++17 stuff so some older compilers might not work.

// Websock.h
#ifndef WEBSOCK_H
#define WEBSOCK_H

#include <cpprest/ws_client.h>
#include <vector>
#include <string>
#include <shared_mutex>

class Websock
{
private:
  web::websockets::client::websocket_callback_client client;
  void message_handler(web::websockets::client::websocket_incoming_message msg);
  void send_message(std::string to_send);
  std::string subscribe(bool sub);
  std::vector<double> buy_prices;
  std::vector<double> sell_prices;
  std::shared_mutex buy_mut, sell_mut;
  std::vector<std::string> Channels;
  std::string Product_id;
  std::string Uri;
  bool is_connected;
public:
  double Best_Buy_Price();
  double Best_Sell_Price();
  double MidMarket_Price();
  
  void Connect();
  void Disconnect();
  Websock(std::vector<std::string> channels, std::string product_id, std::string uri);
  ~Websock();
};
#endif // WEBSOCK_H

I'll describe the functions in the class starting from the top.

message_handler

The message_handler function is going to be used to handle the information received from the API. So, it need to have the ability to parse JSON and, consequently, modify the vector containers accordingly.
Let's create the skeleton of the Websock.cpp file by including the header and some of the necessary libraries that are not already included in the header file.

#include "Websock.h"
#include <iostream>
#include <algorithm>
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
using namespace rapidjson;

According to the cpprest documentation on their callback client the set_message_handler function has a lambda function as an input but we are going to have that lambda function call a function, called message_handler, within the class for easier manipulation.

void Websock::message_handler(web::websockets::client::websocket_incoming_message msg)
{
  // TODO
}

To extract the string from the websocket_incoming_message

std::string input = msg.extract_string().get();

Now to parse the input

Document d;
d.Parse(input.c_str());

According to the Coinbase Pro API the level2 channel will receive a snapshot in this format (link)

{
    "type": "snapshot",
    "product_id": "BTC-EUR",
    "bids": [["6500.11", "0.45054140"]],
    "asks": [["6500.15", "0.57753524"]]
}

or for the updates in this format

{
    "type":  "l2update",
    "product_id": "BTC-EUR",
    "changes": [
        ["buy", "6500.09", "0.84702376"],
        ["sell", "6507.00", "1.88933140"],
        ["sell", "6505.54", "1.12386524"],
        ["sell", "6504.38", "0"]
    ]
}

To differentiate between those we will check the "type" field

std::string type = d["type"].GetString();
if (type == "snapshot")
{
  // TODO
}
else if (type == "l2update")
{
  // TODO
}

In the snapshot part of the if statement lets grab the bids and asks arrays.

const Value& bids = d["bids"];
const Value& asks = d["asks"];

I'm not showing the checks and assertions here but you can see them in the Source Code at my Github on the link below. Now, we need to lock access to the vectors as we write to them. This can be done with

std::scoped_lock<std::shared_mutex, std::shared_mutex> lock(buy_mut, sell_mut);

Lets reserve space on the vectors

buy_prices.reserve(bids.Size());
sell_prices.reserve(asks.Size());

Now we iterate through the arrays

for (SizeType i = 0; i < bids.Size(); i++)
{
  assert(bids[i].IsArray());
  buy_prices.push_back(std::stod(bids[i][0].GetString()));
}
for (SizeType i = 0; i < asks.Size(); i++)
{
  assert(asks[i].IsArray());
  sell_prices.push_back(std::stod(asks[i][0].GetString()));
}

For the updates part of the if statement we do the same with the changes array.

const Value& changes = d["changes"];
assert(changes.IsArray());

This time we will lock the mutexes while we iterate through the array, grab the price before checking whether it is a change in the buy or sell, and keep the side in a string.

for (SizeType i = 0; i < changes.Size(); i++)
{
  assert(changes[i].IsArray());
  double price = std::stod(changes[i][1].GetString());
  std::string side = changes[i][0].GetString();
  // TODO
}

In the for statement, check whether it is a buy or sell

if (side == "buy")
{
  std::unique_lock<std::shared_mutex> lock(buy_mut);
  // TODO
}
else
{
  std::unique_lock<std::shared_mutex> lock(sell_mut);
  // TODO
}

According to the API, if the amount returned is 0, then we erase the value from our container.

if (0 == std::stod(changes[i][2].GetString()))
  buy_prices.erase(std::remove(buy_prices.begin(), buy_prices.end(), price), buy_prices.end());
else
  buy_prices.push_back(price);

And for the sell prices

if (0 == std::stod(changes[i][2].GetString()))
  sell_prices.erase(std::remove(sell_prices.begin(), sell_prices.end(), price), sell_prices.end());
else
  sell_prices.push_back(price);

send_message

The send_message function will take in a string which will then be converted to a 'websocket_outgoing_message' type, and sent.

void Websock::send_message(std::string to_send)
{
  // TODO
}

Convert the std::string to websocket_outgoing_message using the 'set_utf8_message' function.

web::websockets::client::websocket_outgoing_message out_msg;
out_msg.set_utf8_message(to_send);

Then just send using the following

client.send(out_msg).wait();

subscribe

The subscribe function's purpose is to create a string containing the JSON required to subscribe and un-subscribe to the API.

{
    "type": "subscribe",
    "product_ids": [
        "ETH-USD",
        "ETH-EUR"
    ],
    "channels": [
        "level2",
        "heartbeat",
        {
            "name": "ticker",
            "product_ids": [
                "ETH-BTC",
                "ETH-USD"
            ]
        }
    ]
}

This is done with rapidjson.

std::string Websock::subscribe(bool sub)
{
  Document d;
  d.SetObject();
  rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
  // TODO
}

If sub is true then create subscribe string and un-subscribe otherwise.

d.AddMember("type", (sub) ? "subscribe" : "unsubscribe", allocator);

We must create an array which contains the currency pairs to subscribe to

Value product_ids(kArrayType);
product_ids.PushBack(Value().SetString(StringRef(Product_id.c_str())), allocator);
d.AddMember("product_ids", product_ids, allocator);

We do the same with the channels

Value channels(kArrayType);
for (std::string& channel : Channels)
  channels.PushBack(Value().SetString(StringRef(channel.c_str())), allocator);
d.AddMember("channels", channels, allocator);

Finally, we now create the json string and return

StringBuffer strbuf;
Writer<StringBuffer> writer(strbuf);
d.Accept(writer);
return strbuf.GetString();

Best_Buy_Price, Best_Sell_Price & MidMarket_Price

As the name implies, in the Best_Buy_Price function we will calculate the buy price closest to the MidMarket price (link). So, we need to search the buy vector for it's largest element and this can be easily done the standard library. Also, when locking use the shared lock to allow for multiple reading threads.

double Websock::Best_Buy_Price()
{
  std::shared_lock<std::shared_mutex> lock(buy_mut);
  auto biggest = std::max_element(std::begin(buy_prices), std::end(buy_prices));
  return *biggest;
}

The opposite is true for the sell price.

double Websock::Best_Sell_Price()
{
  std::shared_lock<std::shared_mutex> lock(sell_mut);
  auto smallest = std::min_element(std::begin(sell_prices), std::end(sell_prices));
  return *smallest;
}

The MidMarket_Price function simply calls the two other functions and averages the results.

double Websock::MidMarket_Price()
{
  return (Best_Buy_Price() + Best_Sell_Price()) / 2;
}

Connect & Disconnect

For the Connect function we will set the message handler, connect, and send the subscribe message, but due to differences with the library in the Windows Operating system we must to some conversion if it is compiled in Windows.

void Websock::Connect()
{
  client.set_message_handler([this](web::websockets::client::websocket_incoming_message msg){message_handler(msg);});
  // TODO
}

In Windows, the library takes in a unicode string type which we must convert our string type to. To check if it is compiled in windows we will use preprocessor directives.

#ifdef _WIN32
  client.connect(web::uri(utility::conversions::to_string_t(Uri))).wait();
#else
  client.connect(Uri).wait();
#endif

Create and send the subscriptions message. Also, set the is_connected to true to indicate that it is connected.

send_message(subscribe(true));
is_connected = true;

For the Disconnect function we will create and send the unsubscribe message and close the client.

void Websock::Disconnect()
{
  send_message(subscribe(false));
  client.close().wait();
  is_connected = false;
}

Constructor & Destructor

In the constructor we will only set the input elements to the class variables, and, in the destructor, we will check if the client is connected and disconnect if it is connected.

Websock::Websock(std::vector<std::string> channels, std::string product_id, std::string uri)
{ Channels = channels; Product_id = product_id; Uri = uri; }
Websock::~Websock()
{
  if (is_connected)
    Disconnect();
}

Trying it out

Lets create a main function that will create the Websock class connect, wait a bit, ask for the buy, sell, and midmarket price, and disconnect. So, for the includes we need.

#include "Websock.h"
#include <iostream>
#include <chrono>
#include <thread>

And for the main function

int main()
{
  std::vector<std::string> channels = {"heartbeat", "level2"};
  std::string product_id = "BTC-USD";
  std::string uri = "wss://ws-feed.pro.coinbase.com";
  Websock sock(channels, product_id, uri);
  sock.Connect();
  std::this_thread::sleep_for(std::chrono::seconds(3));
  std::cout << "Buy: " << sock.Best_Buy_Price() << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(3));
  std::cout << "Sell: " << sock.Best_Sell_Price() << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(3));
  std::cout << "MidMarket: " << sock.MidMarket_Price() << std::endl;
  sock.Disconnect();
  return 0;
}

Here is the output

ubrwlf@ubrpc:~/cb_websock/build$ ./cb_websock
Buy: 8168.63
Sell: 8168.64
MidMarket: 8168.64

As the price of bitcoin moves so will these numbers.
Source Code