My Profile Photo

Welcome


This is the little corner of Dinesh, where you can find his thoughts, work and anything else he wants to share.


Streaming json for fun and profit

Kids… This is the story of how we processed large json files that did NOT fit in our laptop’s RAM.


Recently, we came across a client, who needed to run various queries on information burried in large json files (20+ GB per file). The structure of the json is somewhat similar to: in-network-rates. Each of these json files are accessible via. an HTTP server for download.

These json files are so large that they did not fit in my laptop’s ram. On that particular day, I did not even have enough hard drive space to download all that data. SSDs!

There are 3 basic types of queries the client is interested in:

  • Is the data valid? Is the json itself valid? Are all the employer ids valid?
  • Summarize the data. Eg. Find the highest negotiated_rate in a provider_group.
  • Given a criteria, Fetch more details of the data. Eg. Fetch the information specific to a particular employer using the ein value

Upon closer inspection, the json’s schema seems to be like:

{
  "key": "value",
  "key2": "value",
  "big_array": [
    "item1",
    "item2",
    "item3",
    "item4"
     ....
  ]
}

Most of the size of the json file comes from the big_array. It has a LOT of items. Each individual item is just a few megabytes though.

If we do this properly, we don’t need to parse the whole json. Just the items the current query needs:

  • To check if the json is valid or not, we don’t need to parse the entire json. We just need to make sure each of the json token is what we expect it to be. i.e a value always comes after a key, if an array starts, it must end correctly - you get the idea. The available json tokenizers are good enough for this.

  • To summarize the data, we have to parse a subset of the json. We wanted an API that let us retrieve a value from the json path that we are interested in (in this case: /big_array/item/).

  • To fetch more details of the data, we want to index the json such that - given an item, we need to get the byte range the data in the original json file that has it. Then we can just send http range requests to the server to send us only the part that we are interested in.

In a nut shell, the main queries boil down to the task: Given a path in json, fetch the byte range and the item corresponding to that path.

There are a lot of json tokenizers out there already, but none of them seem to provide us with the byte range of the tokens. I quickly patched rapidjson to give us the offset of each token in the input stream. You can find the patch here

With that in place, We now need an API to incrementally process a stream of json data. I was able to quickly whip up a such a thing on top of my patched rapidjson:

#include <fstream>
#include <iostream>
#include "streamer.h"
#include "utils.h"

int main(int argc, char* argv[]) {
  if (argc != 2) {
    std::cerr << "Usage: " << argv[0] << " /path/to/item/ < /path/to/json\n";
    std::cerr << "Note: Use empty path components to represent array elements\n";
    std::cerr << R"(eg. item//key/ on  { "item": [ {"key": "value1"}, {"key": "value2"} ] } fetches value1 and value2)" << std::endl;
    return -1;
  }

  rapidjson::IStreamWrapper isw(std::cin);
  JsonStreamer stream(
    [](rapidjson::SizeType offset,
        rapidjson::SizeType length,
        const std::string& key,
        const RapidJsonValue& value) -> bool {

      std::cout << "Offset: " << offset << std::endl;
      std::cout << "Length: " << length << std::endl;
      std::cout << value << std::endl;

      // Keep processing the json
      return true;
    },
    Utils::split(argv[1], '/'));

  stream.Consume(isw);
  return 0;
}

The callback based API isn’t my first choice for this task, but given the rapidjson API this was the simplest API I could come up with that day. This let us create little command line tools that let us use our measly laptops to process the data:

  • Summarize streaming json:
$ curl https://path/to/big_json.json | ./json-summarizer
# 4 hours later...
# nice little summary of the data we are interested in
  • Index streaming json and use http range requests for fetching only the parts we are interested in:
$ curl https://path/to/big_json.json | ./json-indexer > index.db
$ ./json-fetcher --db index.db --url http://path/to/big_json.json "1111111111"
{
  "provider_groups": [
    {
      "npi": [1111111111, 2222222222, 3333333333, 4444444444, 5555555555],
      "tin": {
        "type": "ein",
        "value": "11-1111111"
      }
    },
    {
      "npi": [1111111111, 2222222222, 3333333333, 4444444444, 5555555555],
      "tin": {
        "type": "ein",
        "value": "22-2222222"
      }
    }
  ],
  "negotiated_prices": [
    {
      "negotiated_type": "negotiated",
      "negotiated_rate": 20000,
      "expiration_date": "2022-01-01",
      "service_code": ["05", "06", "07"],
      "billing_class": "professional"
    }
  ]
}

You can find the code for some of these experiments here: json-buffet Todo: Use compression + streaming json to save up on cloud storage costs