MySQL Read Replicas and Read-After-Write Semantics
Hunter Fernandes
Software Engineer
Eventually, your MySQL database will grow to the point where you need to add read replicas to handle the load. This is a good problem to have!
But it introduces a new problem: how do you route traffic to the replicas? Naively, for read-only queries, you could just round-robin between the replicas. But read replicas have replication lag, so you can’t guarantee up-to-date data. There is a chance that a read replica will return stale data.
This will introduce subtle bugs in your application when you move from only a primary to a primary with read replicas. Suddenly, your users might cause a write and immediately read the data back, only to find that the write hasn’t propagated to the replica yet. This can be very subtle and hard to debug. The bug will often manifest itself in production and under heavy load when the replication lag is at its worst.
Read-after-write Consistency
This violates, in my opinion, the most important property of any system: read-after-write consistency. When your users write data to your service, they expect to be able to read that data back immediately. The alternative is eventual consistency, where you might read stale data for a short period of time. Eventual consistency is much easier to scale because it requires less coordination.
Not all services require read-after-write consistency. But if you start with a single primary MySQL database, you implicitly promise this property to your users. When you add read replicas, you need to maintain this property.
If you are designing a new service, you can choose to relax this property, but you need to be explicit about it in your service’s documentation. For example, S3 did not provide read-after-write consistency for new objects at launch. Their documentation was clear about this limitation, and users could design their applications around it. Annoying but not surprising is fine. But surprising is not fine. It’s worth noting that in 2020, AWS introduced strong read-after-write consistency for new objects in S3.
Read-after-write consistency is a fundamental property of any system. It’s so crucial that some eventually-consistent database vendors even add special opt-in operations to provide it. For example, by default, DynamoDB is eventually consistent. Calling PutItem
followed by a plain GetItem
might not return the data you just wrote if the read routes to a storage partition that isn’t up to date. However, you can issue GetItem
with ConsistentRead=true
for read-after-write consistency. This costs twice as much as an eventually consistent read. It’s a lot more work to deliver read-after-write consistency in an eventually consistent system.
Applying Read-after-write Consistency to MySQL
Imagine that we are finally adding read replicas to our MySQL database. We have a primary that must handle all writes, and we have four read replicas that should handle all reads. Let’s give the replicas some replication lag to make this interesting.
- Primary: no lag
- Replica 1: 10ms lag
- Replica 2: 20ms lag
- Replica 3: 30ms lag
- Replica 4: 40ms lag
To preserve read-after-write consistency for our service, we need to think at a higher level than database queries. Users don’t care what’s going on under the hood of your service. They just want it to work. We are going to think at the API call level. There are two types of API calls that our service will receive:
- Reads
- Writes
The naive thought is that we can just route read requests to a read replica. Then, we can route write requests to the primary.
This has the benefit of keeping our code simple, too! At the beginning of each request, we check if it’s a read or a write (essentially GET or POST). Then, we can choose the correct database connection appropriately. No other code needs to change.
Naive Round-robin
Our first attempt at routing reads to the replicas might be to round-robin between them. When we get a read request, we’ll randomly choose a replica and send the query to it.
Unfortunately, this doesn’t work. We are susceptible to violating read-after-write consistency if our user writes to the primary and then reads from a replica that hasn’t caught up yet.
Getting Smart
We need to be smarter about routing reads to the replicas.
The key insight is to keep track of the last time a user has performed a write, and then route their subsequent reads to a replica no more out of date than the elapsed time. If the user makes a write in our example and then performs a read 25ms later, we must rule out Replicas 3 and 4 because they are >15ms behind the primary. Our fallback is to route the read to the primary if no replica is up to date enough.
Once you see this trick, everything else is implementation details and optimizations. It mostly just involves a lot of bookkeeping. Let’s implement it!
Determining Replica Lag
I use Amazon Aurora for MySQL, so my examples will be specific to that. Amazon exposes replication lag data in two places:
- The AWS RDS/Cloudwatch API.
- The
information_schema.replica_host_status
table.
AWS RDS publishes the lag of each replica in the cluster to AWS Cloudwatch Metrics in the AuroraReplicaLag
metric. We only get a 60-second resolution with this method, so that sucks. It’s a dealbreaker. Querying Cloudwatch is not suitable for our use case. It’s meant for monitoring and alerting, not for real-time routing decisions.
The other method is querying the information_schema.replica_host_status
table. Any database user can query this table. One caveat is that the table might be out of date on replicas (this is humorous — it’s the exact problem we’re trying to solve). The solution is to query the primary for the most up-to-date information. It’s a read against an information schema table, so it’s not a big deal.
mysql> select SERVER_ID, SESSION_ID, REPLICA_LAG_IN_MILLISECONDS AS RLAG
FROM information_schema.replica_host_status;
+----------------------------+--------------------------------------+------+
| SERVER_ID | SESSION_ID | RLAG |
+----------------------------+--------------------------------------+------+
| huntercluster1-primary | MASTER_SESSION_ID | 0 |
| huntercluster1-reader1 | 11111111-1111-1111-1111-111111111111 | 10 |
| huntercluster1-reader2 | 22222222-2222-2222-2222-222222222222 | 20 |
| huntercluster1-reader3 | 33333333-3333-3333-3333-333333333333 | 30 |
| huntercluster1-reader4 | 44444444-4444-4444-4444-444444444444 | 40 |
+----------------------------+--------------------------------------+------+
5 rows in set (0.01 sec)
This query shows us our cluster topology (the primary and the four read replicas) and replication lag.
The SERVER_ID
is the name of the database instance in the AWS console. We can convert this to a hostname by appending the cluster id and the region.
If our general cluster endpoint is huntercluster1.cluster-abcdefghi1234.us-west-2.rds.amazonaws.com
, then we can convert huntercluster1-reader1
to huntercluster1-reader1.abcdefghi1234.us-west-2.rds.amazonaws.com
. Resolving this hostname will give us the IP address of the read replica. These IP addresses are stable-ish and don’t change often. But that’s an optimization for later.
Let’s implement a cute little Python class that gives us the best connection given a lag constraint.
import os
import re
from datetime import timedelta
from typing import Optional
import pymysql
class ConnectionMaker:
_WRITER_SESS_ID = "MASTER_SESSION_ID"
_LAG_QUERY = """
SELECT SERVER_ID, SESSION_ID, REPLICA_LAG_IN_MILLISECONDS AS RLAG
FROM information_schema.replica_host_status;
"""
def __init__(self, cluster_endpoint: str, user: str, password: str, dbname: str):
self.cluster_endpoint, self.user, self.password, self.dbname = cluster_endpoint, user, password, dbname
self.cluster_suffix = re.search(r'\.cluster-(.+)$', cluster_endpoint).group(1)
self._connection_cache = {}
def writer(self) -> pymysql.Connection:
"""Get a connection to the writer"""
return self._conn(self.cluster_endpoint)
def reader(self, max_lag: Optional[timedelta] = None) -> pymysql.Connection:
"""Get a connection to a reader with no more than the given replication lag"""
writer = self.writer()
with writer.cursor() as cursor:
cursor.execute(self._LAG_QUERY)
replica_lags = cursor.fetchall()
server_id, session_id = self._pick_node(replica_lags, max_lag)
if session_id == self._WRITER_SESS_ID:
hostname = self.cluster_endpoint # reuse writer conn
else:
hostname = "{}.{}".format(server_id, self.cluster_suffix)
return self._conn(hostname)
def _pick_node(self, replica_lags, max_lag: Optional[timedelta]) -> Tuple[str, str]: # (server_id, session_id)
"""Given a list of replicas, use our magic algorithm to pick the best one"""
# Our magic algorithm is just MAX(rlag) AND rlag < max_lag
for replica in sorted(replica_lags, key=lambda x: -x['RLAG']):
# If max_lag is None, we don't care about the lag
# if RLAG is 0, then we are on the last available node (the writer) and have to take it
if max_lag is None or replica['RLAG'] < max_lag.total_seconds() * 1000 or replica['RLAG'] == 0:
return replica['SERVER_ID'], replica['SESSION_ID']
def _conn(self, hostname) -> pymysql.Connection:
if hostname not in self._connection_cache:
self._connection_cache[hostname] = pymysql.connect(
host=hostname, user=self.user, password=self.password,
db=self.dbname, cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
return self._connection_cache[hostname]
We can use this class to connect to the correct replica (or primary!). There are a few things to note here:
I omitted error handling and connection disposal for brevity. You should add these to your production code.
We cache connections. Creating a new connection is expensive — typically between 10-40ms.
We reuse the writer for reads if we can’t find a replica that meets our lag constraint. The writer is the reader of last resort!
We reuse the writer connection instead of creating a new one since the cluster endpoint hostname differs from the node hostname. They resolve to the same IP address so we can reuse the connection and save an additional connection to the primary.
In practice, this optimization has a subtle bug in the case of a primary failover. During failover, a new node will take over the primary role. Our cached connection to the cluster endpoint will be stale and pointing to the old primary. Our existing “writer” connection is now connected to a read replica. If we write to the connection, we’ll get an error. Oops.
The fix for this is to keep track of the primary server ID and session ID and refresh the connection if they change. But I am going to omit this for brevity.
Let’s set up a little table to test this out.
CREATE TABLE widgets (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL UNIQUE
);
Let’s use our ConnectionMaker
incorrectly first to see why we need it. We will INSERT a row into the primary, wait 15ms, and then SELECT from a replica. 15ms is long enough for some replicas to be caught up, but not all of them. We fail to specify a lag constraint, so we will route to the oldest replica. It won’t have the data we just wrote.
def do_query(conn: pymysql.Connection, query: str) -> list[dict]:
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchall()
print("Executed query:", query)
print("\tResult:", result)
return result
def sleep_ms(ms: int):
print("Sleeping for", ms, "ms")
time.sleep(ms / 1000)
return timedelta(milliseconds=ms)
def main():
connector = ConnectionMaker(
cluster_endpoint=os.environ.get('DB_HOST'),
user=os.environ.get('DB_USER'),
password=os.environ.get('DB_PASSWORD'),
dbname=os.environ.get('DB_NAME'),
)
# Fill caches
connector.writer()
connector.reader()
print("starting test")
do_query(connector.writer(), "INSERT INTO widgets (name) VALUES ('foo');")
sleep_ms(15)
do_query(connector.reader(), "SELECT * FROM widgets;")
if __name__ == '__main__':
main()
We get the following output:
starting test
Executed query: INSERT INTO widgets (name) VALUES ('foo');
Result: []
Sleeping for 15 ms
Chose: {'SERVER_ID': 'huntercluster1-reader4', 'SESSION_ID': '44444444-4444-4444-4444-444444444444', 'RLAG': 40.0}
Executed query: SELECT * FROM widgets;
Result: ()
No surprise there! We read from the 40ms-lagged replica and it did not have the data we had just written. We demonstrated that our service is not read-after-write consistent.
Let’s fix it by correctly setting the lag constraint to 15ms on our .reader(...)
call and try again.
def main():
...
print("starting test")
do_query(connector.writer(), "INSERT INTO widgets (name) VALUES ('foo');")
dur = sleep_ms(15)
do_query(connector.reader(dur), "SELECT * FROM widgets;")
This time, we get the following output:
starting test
Executed query: INSERT INTO widgets (name) VALUES ('foo');
Result: []
Sleeping for 15 ms
Chose: {'SERVER_ID': 'huntercluster1-reader1', 'SESSION_ID': '11111111-1111-1111-1111-111111111111', 'RLAG': 10.0}
Executed query: SELECT * FROM widgets;
Result: [{'id': 1, 'name': 'foo'}]
Since we specified a lag constraint of 15ms, we routed the read to the replica that was 10ms behind the primary. And we got the data we just wrote back!
Our first building block is ConnectionMaker
, a primitive to route reads to the correct replica given a lag constraint. We could consume this as a library and plug it into our service.
Tracking Last Write Time
So far we have implemented only half of the solution: we can route reads to a replica given a lag constraint. But where does the lag constraint come from?
As I mentioned earlier, we need to keep track of the last time a user has performed a write. Since we’re in a distributed system, we must store this information in a shared location. This information needs to be accessed fast, frequently, and it is ephemeral. I would reach for Redis.
Lastly, instead of tracking User IDs, I will track session IDs. This is because we will route reads based on the session, not the user. I assume that a user can have multiple sessions at a time. A session represents a single “thread” of requests from a user — perhaps from one device or login session. We are not looking to ensure disparate callers agree on the same data. We only want to make sure that a single caller sees a consistent view of the data.
Let’s implement a simple SessionTracker
class —
import os
from datetime import datetime, timedelta
from typing import Optional
from redis import StrictRedis
class SessionTracker:
def __init__(self):
self.redis = StrictRedis(host=os.environ["REDIS_HOST"], port=6379, db=0)
def mark_write(self, session_id: str):
self.redis.set(f"/writes/{session_id}", datetime.now().isoformat(), ex=300)
def last_write_time(self, session_id: str) -> Optional[datetime]:
if (dt_str := self.redis.get(f"/writes/{session_id}")) is None
return None
return datetime.fromisoformat(dt_str.decode())
def since_last_write(self, session_id: str) -> Optional[timedelta]:
if (last_write := self.last_write_time(session_id)) is None:
return None
return datetime.now() - last_write
I want to point out that we are using a Redis key with a TTL to store the last write time. This is because we don’t care if the data is lost after a while. We assume that the database replicas will eventually catch up within 300 seconds. This is overkill — if your replication lag is 5 minutes, you have bigger problems.
Using this is simple — when a user makes a write, we’ll call mark_write
with their session ID. When a user makes a read, we’ll call since_last_write
with their session ID to get the lag constraint.
Demo Flask App
Now we have both ConnectionMaker
and SessionTracker
— the two halves of our solution. Let’s put them together in a simple Flask app.
from flask import Flask, request
app = Flask(__name__)
connector = ConnectionMaker(
cluster_endpoint=os.environ.get('DB_HOST'),
user=os.environ.get('DB_USER'),
password=os.environ.get('DB_PASSWORD'),
dbname=os.environ.get('DB_NAME'),
)
session_tracker = SessionTracker()
@app.route("/widgets", methods=["POST"])
def create_widget():
session_id = request.headers["X-Session-ID"]
with connector.writer().cursor() as cursor:
cursor.execute(
"INSERT INTO widgets (name) VALUES (%s);",
(request.json["name"],),
)
session_tracker.mark_write(session_id)
return {"id": cursor.lastrowid}, 201
@app.route("/widgets/<int:widget_id>", methods=["GET"])
def get_widget(widget_id: int):
session_id = request.headers["X-Session-ID"]
max_lag = session_tracker.since_last_write(session_id)
with connector.reader(max_lag).cursor() as cursor:
cursor.execute("SELECT * FROM widgets WHERE id = %s LIMIT 1;", (widget_id,))
widget = cursor.fetchone()
if not widget:
return {"error": "Not Found"}, 404
return widget, 200
We have two routes:
POST /widgets
to create a widget. This is a write operation.GET /widgets/<id>
to get a widget by ID. This is a read operation.
Here, I get the session ID from the X-Session-ID
header. But usually, you would get this from a cookie or JWT token — something trusted.
Without this, we fear that a user would write a widget, immediately read it back, and get stale data. But with our SessionTracker
and ConnectionMaker
, we ensure that the read will route to a replica that is up to date.
Refinement
This is a simple solution that works well. But we can make some improvements to it for a production system.
Smarter Routing
The core part of our “smart” routing is in _pick_node()
. As written, the code is simple — pick the highest lag replica that is less than the max lag constraint. In practice, our naive algorithm sends an undue amount of traffic to the replica with the highest and lowest lag. The replicas in between are underutilized.
You can get very fancy with your implementation of _pick_node()
. The one we have in production considers CPU time, zonal topology, and other factors. Weighting CPU usage works very well. It allows us to smooth out the load on the replicas.
Factoring in zonal topology is another optimization. Cross-zone networking is both slower and more expensive than intra-zone networking. We prefer to route to a replica in the same zone as the primary when two replicas tie.
In the end, these enhancements are all heuristics. Use what works for you.
Acquiring Query Topology Off the Hot Path
We are querying the information_schema.replica_host_status
table for topology and replication latency on every API call. That is a very hot path. We’ll spend a lot of time talking to the primary, and the whole point of this exercise is to offload reads from the primary.
We can’t reliably acquire this information from the replicas because they might be out of date (again, the problem we’re trying to solve).
Our solution for this is to have a background process that queries the primary for this information and writes it to Redis. We have a large number of gunicorn processes handling API requests. We have only one process that polls topology and writes to Redis in a loop. Then, we query Redis instead of querying the primary during each API call.
This way, even as our API scales horizontally, we have only constant additional load on the primary. We shift the read load to the very cheap and very fast Redis. We have a similar replication topology for Redis, so we can read from a zone-local replica. Redis replication lag is consistently under 5ms.
We can optimize this further — instead of our API call making two Redis queries (one to get the topology and one to get the last write time), we can fetch them both in one request with a pipeline.
Here’s our final architecture with both of these processes. The hot call path and the topology update loop are separate, but share data through Redis.
Cluster Promotion
In my opinion, the most important feature of AWS Aurora is Blue/Green Deployments. It’s a killer feature, reducing the maintenance nightmare of a MySQL cluster upgrade into a non-event controlled through a few API calls.
Unfortunately, our solution is not blue/green aware. It will break when the green cluster becomes the blue cluster. AWS changes the SERVER_ID
and suffixes the new nodes with -green-xxx
. But these DNS names will not resolve! Oops, that’s an outage!
We can detect and account for this in our code through more bookkeeping. In fact, we can turn this initial disadvantage into an advantage.
Normally, during a blue/green deployment switchover, we would have to wait for the DNS cluster endpoint to update and then for our connection caches to clear. This takes about 60 seconds due to the DNS TTL. On top of that, our old connections to readers are invalid but our code doesn’t know it. The old read replicas still serve read-only traffic when the blue cluster becomes unused. Our application doesn’t know that it’s connected to a node that will never have new data. Oops.
We can proactively detect this situation by noticing that all nodes in the cluster have gone read-only. This can prompt our topology update loop to skip the DNS cache, query the authoritative DNS server, and fetch the new topology from the new primary. The code in our API processes can detect new hostnames (and lack of existing hostnames) in the topology and invalidate the connection cache.
We have turned a catastrophic flaw into a feature, reducing downtime during a blue/green deployment switchover to single-digit seconds.
Conclusion
Tracking the last write time for a user and routing reads based on that information is a simple way to maintain read-after-write consistency in a MySQL cluster with read replicas. The nicest part is that you can bundle up all the “smarts” into a single class that you can plug into your service.
This method has served us well in production with only a few minor architectural tweaks. I hope this can help guide you in your own MySQL scaling journey!