In part one of this series we built a simple distributed counter, specifically a G-Counter, which has some hard limitations, one of them being that it can only be incremented and the other one is that we need to know beforehand how many agents are going to be affecting it. In this part we will extend the counter to solve both problems.
This is the second part of a series of posts on CRDTs and Distributed Consistency. If you have not read the first part I recommend you start there. You can find the first part here.
The GCounter
class we built in the previous post only allowed increments to be performed, now we want to allow increments and decrements. We will need to think a little bit outside the box to achieve this because our implementation depends on the Math.max
function to decide whether an update coming from another agent is newer than the state we currently have.
Let's see the problem with an example, we can assume we remove the increment only restriction by commenting out the code as follows:
class GCounter {
...
public increment(num: number): void {
// if (num < 0) {
// throw new Error('Only positive values');
// }
// Increment my local counter by num
this.state[this.id] += num;
}
...
}
const NUMBER_OF_CLIENTS = 2;
// All clients start
const client1 = new GCounter(0, NUMBER_OF_CLIENTS);
const client2 = new GCounter(1, NUMBER_OF_CLIENTS);
const clients = [client1, client2];
// Each client independently increases or decreases their counter
client1.increment(1);
client2.increment(3);
client2.increment(-2);
client2.increment(-2);
client1.increment(1);
client1.increment(1);
// Before sharing any knowledge they just know about their own count
console.log({
client1: client1.query(),
client2: client2.query(),
});
// { client1: 3, client2: -1 }
// At some point in time they synchronize and share their state
clients.forEach((client) => {
client1.merge(client);
client2.merge(client);
});
// After sharing the knowledge they agree on the count, but it will not be the right count!
console.log({
client1: client1.query(),
client2: client2.query(),
});
// Actual: { client1: 3, client2: 3 }
// Expected: { client1: 2, client2: 2 }
This happened because when merging the states [3, 0]
and [0, -1]
for the client1
and client2
respectively, the max between 0
and -1
is 0
and there is no way to know which value is newer. If we have the restriction of only allowing increments the rule for deciding how to solve a conflict is trivial because we know that whatever value is bigger will be newer.
In order to solve this we need to think on the independent operations of increasing and decreasing the count. How did each client reached their independent count state?
In the case of client1
it reached its state by doing three increments of 1
. For client2
the steps were one increment of 3
and two increments of -2
. If we re-write the steps for client2
we can say it reached the state by doing one increment of 3
and two decrements of 2
. Let's see this in a different way by placing each operation on different arrays:
// Before the re-write
client1 = {
increments: [1, 1, 1],
};
client2 = {
increments: [3, -2, -2],
};
// After the re-write
client1 = {
increments: [1, 1, 1],
decrements: [],
};
client2 = {
increments: [3],
decrements: [2, 2],
};
You might have noticed that now, after we changed the wording of the operations, both the increments and decrements are now positive numbers and they can be both represented as independent counts! And guess what? We already know how to represent positive counts, we do it with a G-Counter
.
So, in order to remove the increment only restriction we simply need to have two counts, one for the increments and one for the decrements and to get the final result we can just subtract them from each other. This is called a PN-Counter
(Positive/Negative-Counter) and the implementation is really simple because we already did most of the work with the G-Counter
:
import GCounter from "./GCounter";
export default class PNCounter {
private positive: GCounter; // Counts of increments
private negative: GCounter; // Counts of decrements
constructor(id: number, size: number) {
this.positive = new GCounter(id, size);
this.negative = new GCounter(id, size);
}
public query(): number {
// The result of the count is the difference of both counts
return this.positive.query() - this.negative.query();
}
public increment(num: number): void {
// Incrementing the positive count
this.positive.increment(num);
}
public decrement(num: number): void {
// Incrementing the negative count
this.negative.increment(num);
}
public merge(counter: PNCounter) {
// Merging is just done independently for each count
this.positive.merge(counter.positive);
this.negative.merge(counter.negative);
}
}
We can see it working with an example, we will use the same we used to derive the idea for the counter:
import PNCounter from "./PNCounter";
const NUMBER_OF_CLIENTS = 2;
// All clients start
const client1 = new PNCounter(0, NUMBER_OF_CLIENTS);
const client2 = new PNCounter(1, NUMBER_OF_CLIENTS);
const clients = [client1, client2];
// Each client independently increases or decreases their counter
client1.increment(1);
client2.increment(3);
client2.decrement(2);
client2.decrement(2);
client1.increment(1);
client1.increment(1);
// Before sharing any knowledge they just know about their own count
console.log({
client1: client1.query(),
client2: client2.query(),
});
// { client1: 3, client2: -1 }
// At some point in time they synchronize and share their state
clients.forEach((client) => {
client1.merge(client);
client2.merge(client);
});
// After sharing the knowledge they agree on the count
console.log({
client1: client1.query(),
client2: client2.query(),
});
// { client1: 2, client2: 2 }
The second restriction that our original counter had is that it would only work if we know how many agents can be affecting the counter beforehand, that is a big restriction for any distributed system that could automatically scale up or down without notice. Let's try to remove that restriction.
If we look at the implementation of the G-Counter
we will notice that we choose our data structure for the counts to be an array, that is a very efficient structure but it also is the one forcing us to use index-based IDs.
class GCounter {
private id: number;
public state: number[];
...
}
This is a problem, imagine you have two new agents "joining the counter" at the same time, if we want to keep them independent, each one of them would add one more place into the array and assign themselves the n+1
id and cause a collision, this breaks the counter.
It now becomes clear that we need to identify the agents with unique IDs and the IDs can't be index-based, we can use whatever we want but we also need to change the state
structure, in this case we will use a Map
to store the id-count pair.
class GCounterWithArbitraryClients {
private id: number | string; // The ID of this client
public state: Map<number | string, number>; // The last known count value for each client
// We don't need the size anymore
constructor(id: number | string, initialValue = 0) {
this.id = id;
this.state = new Map();
// We don't need to initialize all clients, just our own value
this.state.set(id, initialValue);
}
...
}
If we continue adapting the code we will end up with a very similar implementation, but that will instead of assuming we have always values for all clients it will use defaults for unknown clients, see the full code below with comments:
class GCounterWithArbitraryClients {
private id: number | string;
public state: Map<number | string, number>;
constructor(id: number | string, initialValue = 0) {
this.id = id;
this.state = new Map();
this.state.set(id, initialValue);
}
public query(): number {
// This is the same as before, but we go over the values of the Map
return [...this.state.values()].reduce((acc, value) => acc + value, 0);
}
public increment(num: number): void {
if (num < 0) {
throw new Error("Only positive values");
}
// This is the same as before, but we get, increment and set the new value
// using the Map primitives instead of the direct array assignment
this.state.set(this.id, this.state.get(this.id)! + num);
}
public merge(counter: GCounterWithArbitraryClients) {
// We create a set with all the ids known by the local agent plus
// the ids from the remote agent that we are merging
const ids = [...new Set([...this.state.keys(), ...counter.state.keys()])];
// Create tuples with local count and received count,
// but we default to zero so we can properly compare new clients
// zipped = [[local1, remote1], [local2, remote2], ...]
const zipped = ids.map((id) => [this.state.get(id) ?? 0, counter.state.get(id) ?? 0]);
// Create a new map with all the ids and the new counts
this.state = new Map(ids.map((id, index) => [id, Math.max(zipped[index][0], zipped[index][1])]));
}
}
We can test it with an example:
import GCounterWithArbitraryClients from "./GCounterWithArbitraryClients";
// All clients start (at the same time or not, we don't care anymore)
const client1 = new GCounterWithArbitraryClients("client1");
const client2 = new GCounterWithArbitraryClients("client2");
const clients = [client1, client2];
// Each client independently increases their counter
client1.increment(1);
client2.increment(3);
client2.increment(3);
client1.increment(1);
client1.increment(1);
// Before sharing any knowledge they just know about their own count
console.log({
client1: client1.query(),
client2: client2.query(),
});
// { client1: 3, client2: 6 }
// At some point in time they synchronize and share their state
clients.forEach((client) => {
client1.merge(client);
client2.merge(client);
});
// After sharing the knowledge they agree on the count
console.log({
client1: client1.query(),
client2: client2.query(),
});
// { client1: 9, client2: 9 }
And that's it! We just solved both problems, our counter now supports increments and decrements and also can accommodate any number of clients as they join the count. There are some improvements that can be done to the solution in terms of performance, but for the purposes of this post the solution is enough.
We started with a very simple distributed counter and we have evolved it to be a more powerful counter that supports increments, decrements and an arbitrary number of clients. One problem that you might have noticed here is that there is no way of cleaning up old or dead clients. That can be achieved but it imposes many challenges and more advanced concepts as "tombstones" or "remove sets". You can read more details on the Wikipedia page for CRDTs.
Following this same approach you can build CRDTs that are increasingly more complex but at the same time they provide features that can allow you to build truly distributed apps.
In the next part of the series I will build an example that uses a peer-to-peer approach to share the state between agents.
Thanks to Martín Feldman, Martín Fitipaldo and Nicolás Wolman from our team that helped me go through the second part of this post.