Preface
The idea here is to build a working architecture from first principles. I'm still learning how to build applications without the obvious bottlenecks, and this is an attempt at that. There are benefits and tradeoffs to what I've built, and I've tried to discuss them below.
Let's first lay out what we want to implement. There are three parts of a dating app system I want to build:
Feed generation at scale: when the user opens the app, right after sign-up or at any point thereafter, they see a feed of people they can swipe left or right on.
Handling large amounts of swipe data: since swiping is such a simple operation, the amount of swipe data to handle is very large.
In-house chat system: after matching with another user, the two users should be able to send messages to each other like any other real-time chat system.
I’ll start with the fundamentals: data storage.
We typically have 4 types of data to store here:
user data
swipe data
matches data
chat messages
It's important to look at items 2 and 4 (swipes and chat messages) in particular, since they largely determine which storage engine we use, simply because of the volume of data involved. Swipes are a very simple operation, and chat messages can in theory number in the thousands per match; with 100,000 matches across the lifecycle of the platform, we'd be storing on the order of 100 million messages.
This clearly points towards a write-heavy database, and the first choice that comes to mind is a NoSQL database like Cassandra. Cassandra is a wide-column NoSQL database that is optimized for heavy writes.
Unlike SQL databases, which use tables and relationships between them (like JOINs), Cassandra uses a more flexible data model. It organizes data into denormalized tables, meaning data is duplicated so that queries can be executed quickly.
Cassandra is optimized for very high write throughput because writes are appended to a commit log and written to a memtable (an in-memory structure) before being flushed to disk. This keeps the write path to sequential appends and avoids random disk I/O during writes.
Here’s a basic data model we could start with:
CREATE TABLE users (
id UUID PRIMARY KEY,
first_name TEXT,
last_name TEXT,
bio TEXT,
latitude DECIMAL,
longitude DECIMAL,
updated_at TIMESTAMP
);
CREATE TABLE user_matches (
user_id UUID,
match_id UUID,
other_user_id UUID,
first_name TEXT,
last_name TEXT,
bio TEXT,
latitude DECIMAL,
longitude DECIMAL,
updated_at TIMESTAMP,
PRIMARY KEY (user_id, match_id)
);
CREATE TABLE user_swipes (
user_id UUID,
swiped_on_user_id UUID,
swipe_id UUID,
action TEXT, -- for example 'left' or 'right'
timestamp TIMESTAMP,
PRIMARY KEY (user_id, swipe_id)
);
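To make the read and write paths against this model concrete, here is a minimal sketch (my own illustration using the gocql driver, not code from the repository) of recording a swipe and reading a user's denormalized matches partition:
package store

import (
	"time"

	"github.com/gocql/gocql"
)

// RecordSwipe inserts a swipe row into the user_swipes table.
func RecordSwipe(session *gocql.Session, userID, swipedOnUserID gocql.UUID, action string) error {
	return session.Query(
		`INSERT INTO user_swipes (user_id, swiped_on_user_id, swipe_id, action, timestamp)
		 VALUES (?, ?, ?, ?, ?)`,
		userID, swipedOnUserID, gocql.TimeUUID(), action, time.Now(),
	).Exec()
}

// MatchesForUser reads a user's denormalized user_matches partition, so a
// single query returns everything needed to render their matches.
func MatchesForUser(session *gocql.Session, userID gocql.UUID) ([]string, error) {
	iter := session.Query(
		`SELECT first_name, last_name FROM user_matches WHERE user_id = ?`, userID,
	).Iter()
	var names []string
	var first, last string
	for iter.Scan(&first, &last) {
		names = append(names, first+" "+last)
	}
	return names, iter.Close()
}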
(In my codebase I initially used SQL for simplicity, before realizing Cassandra is potentially the better choice. The implementation in my code is therefore currently in SQL, but would ideally move to Cassandra; all the constructs I've used below for this system work with either database.)
Feed Generation
Now that we've picked our data store and a basic data model, let's look at how we would generate a user's feed. In theory, a very basic implementation would be an API request that fetches a user's feed from Cassandra based on geolocation data. There are a couple of issues with this:
Latency: depending solely on Cassandra for these feed reads is far from optimal, since Cassandra is optimized for writes, not reads.
Geolocation-based queries: Cassandra has no built-in support for geospatial data, which means we would have to rely on third-party plugins to achieve the same.
Therefore, this calls for something in our system that allows optimized read queries for a user's feed, and that also supports geospatial queries.
This is where we introduce Elasticsearch.
Elasticsearch
Elasticsearch is a distributed storage and search engine optimized for reads and complex queries, including geospatial queries. In this system, we store user data in it so we can run geospatial queries effectively when building a user's feed.
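To make this concrete, here is a rough sketch of the kind of geo_distance query we would run, using the official go-elasticsearch client. The users index name and the location_user geo_point field are assumptions of mine that line up with the CDC transform shown later, not code from the repository.
package feed

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// searchNearbyUsers fetches users within distanceKm of (lat, lon) from the
// "users" index, assuming location_user is mapped as a geo_point.
func searchNearbyUsers(es *elasticsearch.Client, lat, lon float64, distanceKm int) (*esapi.Response, error) {
	query := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"filter": map[string]interface{}{
					"geo_distance": map[string]interface{}{
						"distance":      fmt.Sprintf("%dkm", distanceKm),
						"location_user": map[string]float64{"lat": lat, "lon": lon},
					},
				},
			},
		},
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}
	// The caller is responsible for reading and closing the response body.
	return es.Search(
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex("users"),
		es.Search.WithBody(&buf),
	)
}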
But we already have a storage engine in Cassandra, so how do we sync data between the two without introducing too many inconsistencies?
Change data capture (CDC)
Change data capture (CDC) is the ability to capture changes in a data store (inserts, updates, deletions, and so on) and use those captured changes (which can be represented as JSON) to apply the same changes to a second storage engine, which in our case is Elasticsearch.
Simply put, our system uses CDC to sync user-data changes from Cassandra to Elasticsearch.
How CDC works here:
We use Debezium, an open-source CDC implementation, to capture data changes in Cassandra and publish them to a Kafka topic. A consumer listening to this topic receives each captured change and converts it into a JSON blob that can be ingested into Elasticsearch. This way, we avoid suboptimal practices like polling the database to sync changes into Elasticsearch.
This is how the code looks for our Kafka topic listener, followed by ingestion into Elasticsearch:
// Consume CDC events from the Kafka topic and index them into Elasticsearch.
// `c` is the Kafka consumer, `bi` the Elasticsearch bulk indexer, `index` the target index.
for {
	msg, err := c.ReadMessage(-1)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		continue
	}
	fmt.Printf("Received Message: %s\n", string(msg.Value))

	var kafkaMessage map[string]interface{}
	if err := json.Unmarshal(msg.Value, &kafkaMessage); err != nil {
		log.Printf("Error unmarshaling message: %v\n", err)
		continue // skip malformed messages instead of crashing the consumer
	}

	// Debezium wraps the change event in a "payload" envelope;
	// the "after" field holds the row state after the change.
	payload, ok := kafkaMessage["payload"].(map[string]interface{})
	if !ok {
		log.Println("Invalid payload format; skipping")
		continue
	}
	after, ok := payload["after"].(map[string]interface{})
	if !ok {
		log.Println("No 'after' field in payload; skipping")
		continue
	}

	esData, err := cdc.TransformCreateOperationForES(after) // code for this in the next code block below
	if err != nil {
		log.Printf("Error transforming data: %v\n", err)
		continue
	}

	// ES ingestion via the bulk indexer
	err = bi.Add(context.Background(), esutil.BulkIndexerItem{
		Action: "index",
		Body:   bytes.NewReader(esData),
		OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
		},
		OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			if err != nil {
				fmt.Printf("Error adding document: %v\n", err)
			} else {
				fmt.Printf("Elasticsearch error: %s\n", res.Error.Reason)
			}
		},
		Index: index,
	})
	if err != nil {
		panic(err)
	}
}
// TransformCreateOperationForES converts a Debezium "after" record into a JSON
// document shaped for our Elasticsearch users index (with a geo_point field).
func TransformCreateOperationForES(record map[string]interface{}) ([]byte, error) {
	v, err := json.Marshal(record)
	if err != nil {
		return nil, err
	}

	// The DECIMAL latitude/longitude columns arrive base64-encoded in the CDC
	// payload, so they are decoded here.
	lat := gjson.GetBytes(v, "latitude")
	long := gjson.GetBytes(v, "longitude")
	decodedLatitude, err := base64.StdEncoding.DecodeString(lat.String())
	if err != nil {
		return nil, err
	}
	decodedLongitude, err := base64.StdEncoding.DecodeString(long.String())
	if err != nil {
		return nil, err
	}
	fmt.Println(decodedLatitude)
	fmt.Println(decodedLongitude)

	// For ease, the decoded values are ignored for now and random coordinates
	// are generated instead.
	rand.Seed(time.Now().UnixNano())
	latitude := rand.Float64()*180 - 90
	longitude := rand.Float64()*360 - 180

	esData := map[string]interface{}{
		"id":         record["id"],
		"first_name": record["first_name"],
		"last_name":  record["last_name"],
		"bio":        record["bio"],
		"location_user": map[string]interface{}{
			"lat": latitude,  // randomized latitude value for ease
			"lon": longitude, // randomized longitude value for ease
		},
		"updated_at": record["updated_at"],
	}

	data, err := json.Marshal(esData)
	if err != nil {
		return nil, err
	}
	return data, nil
}
So now we know that user data first goes to Cassandra, and through CDC reaches Elasticsearch, which lets us run geospatial queries to find people near the user who wants to see a feed.
That's where we're hit with a new problem. Let's say I queried the user's feed and received a bunch of users to show based on geolocation data (with the assumption that somewhere within this system a recommender system would also run). How do we guarantee that across all feed requests a user never sees a repeat user? Where do we store the users who have already been shown in the feed for a specific user? A naive implementation could be storing this somewhere in Cassandra, but would we then query the database for every recommended user before the getFeed() request resolves? That's heavily suboptimal. And for a long-time user, the number of seen users over the lifecycle of the app would be enormous, so even storing this in an in-memory store doesn't make all that much sense. This is where the concept of a bloom filter comes in.
Bloom Filters
Bloom filters are a probabilistic data structure that helps determine whether a certain record has been seen before, with false positives but no false negatives.
Deep dive into bloom filters:
What follows is me thinking out loud about how I understood bloom filters. The problem we're trying to solve is avoiding repeat users showing up in a feed.
Let's say you wanted to store all the users who have been shown in the feed of every user. Simply put, it could be a map where the key is a user's ID and the value is an array of IDs of users who have been seen. In such a system, once we've fetched users from Elasticsearch based on distance, we could run a for loop to check whether each fetched user's ID is present in that user's array in the map.
This would be suboptimal, because the time complexity of such a solution would be O(m × n), where m is the number of users recommended and n is the number of elements in the user's array in the map, and n could be tremendously large. How could we solve this? One optimization would be storing the seen user IDs in a tree instead of an array, which makes each lookup a log n operation, bringing the total to O(m log n). But what if I want something even better, a complexity that nears O(m)? That's where a bloom filter comes in.
A bloom filter doesn't actually store the data; it stores the existence of that data. To elaborate, a bloom filter is essentially an array of bits with a fixed length that doesn't depend on how much data there is. Each time you want to mark a new user as seen in the feed of, say, a person named Aneesh, you pass that user's ID into a hash function and mod the result by the fixed length of the array, which gives you an index into the bit array, and in Aneesh's bit array you flip that bit from 0 to 1. The next time the same user is recommended, we pass their record through the hash function and mod it by the length of the array just like before, which gives the same index, and because that bit is 1, we drop the record and don't include it in the user's feed.
This creates a problem: since the size of the array is predetermined, two different user IDs can hash and mod to the same index. Let's say user ID 1, when hashed and modded, gave me index 2; I mark that bit as 1 and send user ID 1 to the feed. Much later, user ID 2, when hashed and modded, also gives index 2, which is already 1, even though user ID 2 was never shown in the feed. This is called a false positive, where a user who wasn't shown is treated as shown by the bloom filter, and that's a tradeoff we accept.
Because we strictly want a system that doesn't show repeat users, we're okay with occasionally not showing someone, since the assumption is that showing repeats would be the worse user experience.
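To get a feel for that tradeoff: with a single hash function (as in my implementation below), the expected false positive rate after inserting n items into a filter with m slots is roughly 1 - e^(-n/m). A tiny sketch of that estimate:
package main

import (
	"fmt"
	"math"
)

// falsePositiveRate estimates the false positive probability of a bloom filter
// that uses one hash function, after inserting n items into m slots.
func falsePositiveRate(n, m float64) float64 {
	return 1 - math.Exp(-n/m)
}

func main() {
	// With 1024 slots (the size used in the code below) and 200 seen users,
	// roughly 18% of unseen users would be wrongly filtered out as "already seen".
	fmt.Printf("%.3f\n", falsePositiveRate(200, 1024))
}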
Bloom Filter Code - (Implemented myself, can also use Redis)
Redis also has a bloom filter implementation, but to understand this better, I ended up implementing my own in-memory version.
// BloomFilter is a fixed-size bit array; size is the number of usable bit slots.
type BloomFilter struct {
	filter []byte
	size   int
}

type userToFilterMap struct {
	m map[string]*BloomFilter
}

// BloomFilterPerUser keeps one bloom filter per user, keyed by user ID.
// Note: the map is not protected by a mutex, so this is not safe for
// concurrent use as written.
type BloomFilterPerUser struct {
	bfMap *userToFilterMap
}

var globalBloomFilter BloomFilterPerUser

func InitializeGlobalBloomFilter() (*BloomFilterPerUser, error) {
	bfMap := &userToFilterMap{
		m: make(map[string]*BloomFilter),
	}
	bf := &BloomFilterPerUser{
		bfMap: bfMap,
	}
	globalBloomFilter = *bf
	return bf, nil
}

// NewBloomFilterForUser creates a filter of the given size for a user if one
// doesn't already exist.
func NewBloomFilterForUser(size int, userID string) (*BloomFilterPerUser, error) {
	if _, exists := globalBloomFilter.bfMap.m[userID]; exists {
		return nil, nil
	}
	globalBloomFilter.bfMap.m[userID] = &BloomFilter{
		filter: make([]byte, size),
		size:   size,
	}
	return &globalBloomFilter, nil
}

// hashValueAndModBySize maps a key to a slot index using a single murmur3 hash.
func hashValueAndModBySize(key string, size int) int {
	hasher := murmur3.New32()
	_, _ = hasher.Write([]byte(key))
	hash := hasher.Sum32()
	return int(hash) % size
}

// AddToBloomFilterForUser sets the bit for key in the user's filter.
func (bfpu *BloomFilterPerUser) AddToBloomFilterForUser(key string, userID string) error {
	bf, exists := bfpu.bfMap.m[userID]
	if !exists {
		return fmt.Errorf("BloomFilter not found for user: %s", userID)
	}
	idx := hashValueAndModBySize(key, bf.size)
	byteIdx := idx / 8
	bitIdx := idx % 8
	bf.filter[byteIdx] |= 1 << bitIdx
	return nil
}

// MembershipCheck reports whether key has (probably) been seen for userID.
// If it hasn't, the key is added to the filter as a side effect, so a feed
// item is marked as seen the moment it passes the check.
func (bfpu *BloomFilterPerUser) MembershipCheck(key string, userID string) (bool, error) {
	bf, exists := bfpu.bfMap.m[userID]
	if !exists {
		// Lazily create a filter for first-time users.
		if _, err := NewBloomFilterForUser(1024, userID); err != nil {
			return false, fmt.Errorf("failed to create bloom filter for user: %w", err)
		}
		bf, exists = bfpu.bfMap.m[userID]
		if !exists {
			return false, fmt.Errorf("error creating bloom filter for user: %s", userID)
		}
	}
	idx := hashValueAndModBySize(key, bf.size)
	byteIdx := idx / 8
	bitIdx := idx % 8
	if bf.filter[byteIdx]&(1<<bitIdx) != 0 {
		return true, nil
	}
	if err := bfpu.AddToBloomFilterForUser(key, userID); err != nil {
		return false, err
	}
	return false, nil
}
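A quick usage sketch of the API above (MembershipCheck lazily creates a 1024-slot filter if one doesn't exist yet; assumes fmt is imported):
bfpu, _ := InitializeGlobalBloomFilter()

seen, _ := bfpu.MembershipCheck("user-42", "aneesh") // false: first time, and now marked as seen
seen, _ = bfpu.MembershipCheck("user-42", "aneesh")  // true: would be filtered out of the next feed
fmt.Println(seen)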
So currently this is where our system stands:
We add user data into Cassandra normally via queries
CDC with Kafka moves that data into Elasticsearch
User feed request:
We query Elasticsearch for user data using geospatial queries
After fetching data from Elasticsearch, we send each record through the bloom filter to check whether the user has already been seen in a previous feed, which gives us false positives but no false negatives.
Caching feed data - a further optimisation
Currently, every feed request would query Elasticsearch, pass each record through the bloom filter, and then return the feed to the user. We can avoid doing this full flow each time by using a cache: Redis. Redis is an in-memory distributed data store that allows low-latency retrievals and scales horizontally, which is why it's commonly used as a cache.
In our system, once records have passed through the bloom filter, we split the filtered results: a portion is returned to the user immediately and the rest goes into the cache (60% cached, 40% returned in the code below), so the next time the user requests a fresh feed we can serve it straight from cache. On each cache retrieval we can asynchronously refill the cache by going through the Elasticsearch → bloom filter flow again. If the user requests a feed in the middle of this asynchronous refill, we can just have them query Elasticsearch; most requests would still be satisfied by Redis, and since Elasticsearch is read-optimized, it isn't the end of the world to hit it for the occasional request.
Demo / Code - Final User Feed System
Initial API Request to fetch feed:
var requestBody FRequestBody
err := json.NewDecoder(r.Body).Decode(&requestBody)
if err != nil {
	http.Error(w, err.Error(), http.StatusBadRequest)
	return
}

// Each user has one cached feed entry in Redis.
feedKey := fmt.Sprintf("%s-%s:feed", requestBody.FirstName, requestBody.LastName)
val, err := a.cache.R.Get(feedKey).Result()
if err != nil {
	if err == redis.Nil {
		// Cache miss: query Elasticsearch with a geospatial filter.
		hits, err := a.es.RetrieveUserFilteredData("users",
			requestBody.Latitude,
			requestBody.Longitude,
			requestBody.DesiredDistance)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// Drop anyone the bloom filter says has already been shown, plus the requester themselves.
		var filteredResults []es.User
		for _, hit := range hits {
			isMember, err := a.bfpu.MembershipCheck(fmt.Sprintf("%s-%s", hit.Source.FirstName, hit.Source.LastName), fmt.Sprintf("%s-%s", requestBody.FirstName, requestBody.LastName))
			if err != nil {
				http.Error(w, "Error with membership checks in bloom filter", http.StatusInternalServerError)
				return
			}
			if !isMember && (fmt.Sprintf("%s-%s", hit.Source.FirstName, hit.Source.LastName) != fmt.Sprintf("%s-%s", requestBody.FirstName, requestBody.LastName)) {
				filteredResults = append(filteredResults, hit.Source)
			}
		}

		// Split results: 60% for cache, 40% for immediate return
		splitIndex := int(float64(len(filteredResults)) * 0.6)
		cacheResults := filteredResults[:splitIndex]
		immediateResults := filteredResults[splitIndex:]

		// Cache 60% of the results with a one-hour TTL
		cacheData, err := json.Marshal(cacheResults)
		if err != nil {
			http.Error(w, "Error marshaling data for cache", http.StatusInternalServerError)
			return
		}
		err = a.cache.R.Set(feedKey, cacheData, time.Hour).Err()
		if err != nil {
			log.Printf("error setting cache: %v", err)
		}

		// Marshal before writing the header so a failure can still return a 500.
		response, err := json.Marshal(immediateResults)
		if err != nil {
			http.Error(w, "Error processing response", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(response)
		return
	}
	http.Error(w, err.Error(), http.StatusInternalServerError)
	return
}

// Cache hit - parse and return cached data (the 60% of original results stored earlier)
var retrievedData []map[string]interface{}
err = json.Unmarshal([]byte(val), &retrievedData)
if err != nil {
	log.Printf("Error deserializing cached data: %v", err)
	http.Error(w, "Error processing cached data", http.StatusInternalServerError)
	return
}
response, err := json.Marshal(retrievedData)
if err != nil {
	http.Error(w, "Error processing response", http.StatusInternalServerError)
	return
}

// The cached feed is single-use: delete it so the next request triggers a refresh.
_, err = a.cache.R.Del(feedKey).Result()
if err != nil {
	log.Printf("error deleting cache: %v", err)
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(response)
Initial Request:
Total Users retrieved: 2
Cached: 1
Returned: 1
Data fetched from Elasticsearch since the cache was initially empty
Total Request Time: 689 ms
Cache Before this request:
Cache after this request:
Next Feed Request:
Request resolved from cache
Total Request Time: 4ms (because retrieved from in-memory redis cache)
Cache Post this Request:
Now our cache has been exhausted: all possible users within the geolocation distance retrieved from Elasticsearch have been shown at least once and are recorded in our bloom filter.
Next Feed Request:
This time we retrieve no results, despite querying Elasticsearch, because every user within our geolocation distance has already appeared in the feed at least once and is recorded in the in-memory bloom filter we created.
How Big Tech possibly handles Geolocation based queries
This is mostly based on things I've read about one approach bigger companies take to geospatial queries.
For this, think of the world map as a bunch of boxes, with servers responsible for each box. Some boxes are bigger than others, and how big they are depends on factors such as the active user count in the area, the required precision for location-based queries, and more. For example, an area like San Francisco would need smaller boxes for greater precision, while an area like Oregon could have a bigger box since its active user count would be much smaller.
These boxes and their sizes can be determined using the S2 Geometry Library, whose RegionCoverer approximates such regions as cells on the 2D world map.
Since each box maps to a server (or a suite of servers, depending on traffic), a request originating from a location within a particular box is sent in parallel to all of the servers for that box, and users are recommended from there. After this recommendation step, we could still use a centralized bloom filter like the one we have to filter out users that shouldn't be shown in the feed.
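For a rough idea of how the cell computation could look, here is a sketch using the Go port of the S2 library (github.com/golang/geo). The radius, levels, and cell counts are arbitrary values I picked for illustration:
package geo

import (
	"github.com/golang/geo/s1"
	"github.com/golang/geo/s2"
)

// coveringCellsAround approximates a circle of radiusKm around a point as a
// set of S2 cells; each cell token could map to the server(s) covering it.
func coveringCellsAround(lat, lon, radiusKm float64) []string {
	const earthRadiusKm = 6371.0
	center := s2.PointFromLatLng(s2.LatLngFromDegrees(lat, lon))
	region := s2.CapFromCenterAngle(center, s1.Angle(radiusKm/earthRadiusKm))

	// MinLevel/MaxLevel/MaxCells control how fine-grained the boxes are.
	rc := &s2.RegionCoverer{MinLevel: 8, MaxLevel: 12, MaxCells: 16}
	tokens := make([]string, 0, 16)
	for _, cellID := range rc.Covering(region) {
		tokens = append(tokens, cellID.ToToken())
	}
	return tokens
}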
Handling Swipes
A swipe is a very simple operation, either left or right. Its simplicity is what makes it such a frequent operation performed by every user on the app, creating loads and loads of data that needs appropriate handling to trigger match notifications, along with consistency issues like two users swiping at the same timestamp.
Naive CassandraDB based Approach
One approach could be that for every swipe, we query the database to store the swipe data and check whether the user we swiped on has already swiped right on us, and if both swiped right, send a match notification. This can get tricky, since we also need to maintain consistency for swipes: two users could swipe right on each other at the same time, and if consistency isn't maintained we lose the match notification, which is not acceptable within the scope of what we're building. Also, since a swipe is such a simple operation, we could in theory be hitting the database with 100,000 requests per second from people swiping at the same time, which would be far from optimal.
Redis with Lua Scripts for Atomicity & Consistency
A potential solution is to use an in-memory data store like Redis to handle swipes. Since swipes are such a simple, frequent operation, we mainly need to solve for consistency and latency, and Redis being in-memory and single-threaded helps with both.
In terms of implementation, we use a Lua script to update the swipe direction from user A to user B (Redis key: swipe_userA:userB), where userA and userB are sorted in ascending order so that regardless of who swiped on whom, the key remains the same and only the values change. Within the same script we record the swipe of the user who swiped and check the status of the other user's swipe back.
Lua scripts are the enabler here: a script runs atomically on Redis, which is single-threaded, so the swipe update and the check happen as one unit. Even if user B's request lands on another application server, and both directions (A to B and B to A) touch the same shared key, the single-threaded execution keeps the checks consistent.
Code to handle swipes with Redis
key := getKey(requestBody.UserId1, requestBody.UserId2)

// Atomically record this user's swipe and read back the other user's swipe
// on the shared hash key, in a single Lua script.
luaScript := `
redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
return redis.call('HGET', KEYS[1], ARGV[3])
`
swipeField := getSwipeField(requestBody.UserId1)
otherField := getSwipeField(requestBody.UserId2)

result, err := a.cache.R.Eval(luaScript, []string{key}, swipeField, requestBody.SwipeDirection, otherField).Result()
if err != nil {
	log.Printf("error executing redis lua script: %v", err)
	http.Error(w, "internal server error", http.StatusInternalServerError)
	return
}

// Both users swiped right on each other: create a match.
if requestBody.SwipeDirection == "right" && result == "right" {
	err := a.createMatchHandler(requestBody.UserId1, requestBody.UserId2)
	if err != nil {
		log.Printf("error creating match: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}
}
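The getKey and getSwipeField helpers aren't shown above; here is a minimal sketch of what they could look like (my assumption, matching the sorted-key scheme described earlier; assumes fmt and sort are imported):
// Hypothetical helpers matching the sorted-key scheme described above.
func getKey(userID1, userID2 string) string {
	ids := []string{userID1, userID2}
	sort.Strings(ids) // sorted so the key is identical regardless of who swiped
	return fmt.Sprintf("swipe_%s:%s", ids[0], ids[1])
}

func getSwipeField(userID string) string {
	// one hash field per swiper, holding "left" or "right"
	return "swipe:" + userID
}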
Building an in-house chat system
The requirement is simple: build an in-house chat system that can scale to 100,000 users chatting on the platform in parallel.
The Protocol
The fundamental building block for real-time communication here is WebSockets. A WebSocket connection starts as an HTTP request that is upgraded to a persistent, bidirectional connection between client and server.
Many chat systems we rely on today use WebSockets or similar persistent connections for real-time messaging, but using WebSockets at scale is a challenge to solve for.
The Naive Architecture
Let’s start with a naive implementation of a chat application.
Let's say we have a server that accepts WebSocket connections. Each WebSocket connection from a user is associated with a unique WebSocket object, which is used to send messages back to that user. Since WebSocket connections are persistent, I need to keep these objects in memory per user so that the server can retrieve them whenever it needs to push a message to a specific user. A single server could maybe hold about 10,000 connections, enough to push messages to 10,000 different users, but it would not scale beyond that. Instinctively, we would want to add another server that accepts WebSocket connections, but that creates a problem to solve.
If user X matched with user Y and they start texting each other, say X's WebSocket connection lands on server 1 and Y's lands on server 2, with each WebSocket object stored on its respective server. When X wants to send a message to Y, X sends the request to server 1, but server 1 does not hold the WebSocket object for Y; that lives on server 2, and we need it to deliver the message. How does that communication take place?
The desired implementation - Pub Sub (Redis / Google Cloud)
The idea behind a pub/sub (publisher/subscriber) model is that every time we have a match, both users send a joinMessageRoom() request, and each of their WebSocket objects is stored in memory in a data structure on whichever server they connected to. That server also takes the shared messageRoomId for the room the matched users share, and subscribes to messages for that room via the pub/sub service.
Whenever user X or Y sends a message, the message is published to that room ID; every server subscribed to the room receives it, iterates over its in-memory data structure of WebSocket objects for users in that room, and delivers the message over those connections. This is highly scalable, since most of the load that previously sat on the backend servers has been removed. To scale even further, we can shard the pub/sub service itself to reduce the load on each instance, and if there are too many WebSocket connections, we can simply increase the number of backend servers that handle them. A rough sketch of this relay follows the diagrams below.
Stage 1 Diagram:
Stage 2 Diagram:
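To make the pub/sub flow concrete, here is a hedged sketch of such a relay using go-redis pub/sub and gorilla/websocket. The Relay type and its method names are my own illustration, not code from the repository:
package chat

import (
	"context"
	"sync"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
)

// Relay keeps the WebSocket connections of users connected to this server,
// grouped by room, and uses Redis pub/sub to fan messages out across servers.
type Relay struct {
	rdb   *redis.Client
	mu    sync.Mutex
	rooms map[string][]*websocket.Conn // roomID -> connections local to this server
}

func NewRelay(rdb *redis.Client) *Relay {
	return &Relay{rdb: rdb, rooms: make(map[string][]*websocket.Conn)}
}

// JoinRoom stores the local connection and subscribes to the room's channel
// the first time someone on this server joins it.
func (r *Relay) JoinRoom(ctx context.Context, roomID string, conn *websocket.Conn) {
	r.mu.Lock()
	first := len(r.rooms[roomID]) == 0
	r.rooms[roomID] = append(r.rooms[roomID], conn)
	r.mu.Unlock()
	if first {
		go r.listen(ctx, roomID)
	}
}

// Publish sends a message to everyone in the room, no matter which server
// holds their WebSocket connection.
func (r *Relay) Publish(ctx context.Context, roomID, message string) error {
	return r.rdb.Publish(ctx, "room:"+roomID, message).Err()
}

// listen receives room messages from Redis and writes them to the local connections.
func (r *Relay) listen(ctx context.Context, roomID string) {
	sub := r.rdb.Subscribe(ctx, "room:"+roomID)
	defer sub.Close()
	for msg := range sub.Channel() {
		r.mu.Lock()
		conns := append([]*websocket.Conn(nil), r.rooms[roomID]...)
		r.mu.Unlock()
		for _, c := range conns {
			_ = c.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
		}
	}
}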
Tradeoffs / Considerations to make:
No system is perfect, and this one definitely isn’t (for several reasons I may not even understand, yet), but here are a few things to consider:
False Positivity Rate in Bloom Filters:
Since our bloom filter sizes are predetermined, there will be times when the false positive rate crosses a threshold at which the filter hurts the system more than it helps, since the whole point of the bloom filter is to avoid showing repeat users without hiding too many new ones. To tackle this, we can asynchronously resize a user's bloom filter if its false positive rate crosses a threshold, say 85%, for instance. This can be done as follows (a producer-side sketch follows):
For every user that the filter deems already seen, we push the user ID to a Kafka topic and asynchronously check against Cassandra whether that user has actually appeared in the feed. If they haven't, we update the false positive rate of that user's bloom filter, and if the rate crosses 85% at any point, we resize the bloom filter by rebuilding it at a larger size.
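As a sketch of the producer side of that flow (my own illustration using confluent-kafka-go, which the CDC consumer above appears to use; the topic name and struct are placeholders):
package feedback

import (
	"encoding/json"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// suspectedFalsePositive is published whenever the bloom filter claims a
// candidate was already shown, so a worker can verify it against Cassandra.
type suspectedFalsePositive struct {
	ViewerID    string `json:"viewer_id"`
	CandidateID string `json:"candidate_id"`
}

func reportSuspectedFalsePositive(p *kafka.Producer, viewerID, candidateID string) error {
	topic := "bloom-filter-false-positives" // hypothetical topic name
	payload, err := json.Marshal(suspectedFalsePositive{ViewerID: viewerID, CandidateID: candidateID})
	if err != nil {
		return err
	}
	// Fire-and-forget produce; delivery reports go to the producer's Events() channel.
	return p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          payload,
	}, nil)
}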
Dead Letter Queues (DLQs):
We rely heavily on queue systems for our system to stay consistent, whether for resizing bloom filters to reduce false positives or for CDC (change data capture). If our Kafka cluster were to go down, much of the system would become dysfunctional. And for problems within the queue pipeline itself, for instance messages that fail repeatedly due to unprocessable data, schema mismatches, or transient issues, DLQs act as a safeguard.
Recommender System:
Our current system did not consider the complexities a recommender system for users would bring, and that's partly down to me wanting to focus on building out the core backend engineering parts, but it's definitely something to keep in the back of the mind.
Cache TTL:
In the current system, a user could request a feed on day X, receive a feed, and have 60% of the results returned from Elasticsearch stored in cache. In theory, the user could come back on day Y (where Y - X > 100 days) and their cache would still exist, but the user data in it may be outdated (location changes, bio changes, etc.). This creates inconsistency, and can be solved with TTLs on these cache entries (the handler above already sets a one-hour TTL) and periodic cache resyncs for each user.