Mar 30, 2011

Slouching towards Multi-Master Conflict Resolution

This title is not nearly as snappy as Yeats' line from "The Second Coming," but it will have to do.  Conflict resolution has been a holy grail for us on Tungsten--we had people asking for it when MySQL was still MySQL AB, lo these many years ago.   Well, it's finally starting to happen after about a year of work.  

Let's start with a simple multi-master configuration.  To replicate bi-directionally between two masters, you typically run a Tungsten Replicator on each master, each with two replication services.  The first is a local service that reads that master's log.  The second is a remote service that acts as a slave of the other master.  It looks like the following picture:


One of the big problems in multi-master replication is to avoid loops.  Let's say you update something in the SJC master.  The transaction replicates to NYC and appears in the log.  Then it wants to replicate back to SJC and appear in the log there.  If we don't do anything, the poor transaction will loop forever.

Tungsten solves this problem using a filter named BidiRemoteSlaveFilter that runs on slave pipelines.  It has a simple job, which is to identify where transactions originated and keep local transactions that are returning from the other master from being applied again.  We use a variety of tricks to "tag" SQL in a way that allows us to deduce the origin--the most common are comments added to SQL statements or specially formatted row updates that we add to user transactions.  As long as you set things up properly and don't break some simple rules, you can replicate bi-directionally between two or more masters.
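To make the idea concrete, here is a minimal sketch of the kind of check such a filter performs.  This is my illustration, not the actual BidiRemoteSlaveFilter source; it assumes each replicated event carries metadata recording the service that originally logged it (the "tag" described above).

import java.util.Map;

// Illustrative sketch only -- not the real BidiRemoteSlaveFilter implementation.
public class LoopSuppressionFilter {
    private final String localServiceName;   // e.g. "sjc"

    public LoopSuppressionFilter(String localServiceName) {
        this.localServiceName = localServiceName;
    }

    // Returns false for events that originated locally and have looped back
    // through the other master; such events are discarded instead of applied.
    public boolean shouldApply(Map<String, String> eventMetadata) {
        String origin = eventMetadata.get("origin-service");  // hypothetical metadata key
        return !localServiceName.equals(origin);
    }
}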

This brings us to conflict resolution.  Conflict resolution prevents incompatible transactions from colliding with each other.  The BidiRemoteSlaveFilter does a simple form of conflict resolution by preventing transactions from the local service from looping back and being applied again.  However, it's a very short hop to filters that address application conflicts.

Here's a simple example that is next on my list to implement.  It is not unusual to split customers in multi-tenant applications across sites so that they have active updates on only one site and backup copies on the others.  You could imagine a filter that works off a simple file like the following:
[sjc]
   sharks
   athletics
[nyc]
   mets
   yankees
This tells the filter that transactions with shard IDs sharks and athletics are allowed on the sjc master from local applications.  However, if the nyc master updates these, we will reject the updates when the nyc remote service tries to apply them and generate an error message in the log, or perhaps put them in a special log file for later inspection.  You could even generate a dummy update on the local master that would result in the sjc data being sent back over to correct the nyc DBMS information via replication.
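Here is a rough sketch of what the core of such a filter might look like.  The class and method names are hypothetical; the point is simply that the file above maps each site to the shards it owns, and a remote update to a locally owned shard is a conflict.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;

// Hypothetical sketch of a system-of-record conflict check driven by the file above.
public class ShardOwnershipFilter {
    private final Map<String, Set<String>> shardsBySite = new HashMap<>();

    public ShardOwnershipFilter(Path ownershipFile) throws IOException {
        String site = null;
        for (String line : Files.readAllLines(ownershipFile)) {
            line = line.trim();
            if (line.isEmpty()) continue;
            if (line.startsWith("[") && line.endsWith("]")) {
                site = line.substring(1, line.length() - 1);   // e.g. "sjc"
                shardsBySite.put(site, new HashSet<>());
            } else if (site != null) {
                shardsBySite.get(site).add(line);              // e.g. "sharks"
            }
        }
    }

    // A remote update conflicts if its shard is owned by this (local) site.
    public boolean isConflict(String localSite, String shardId) {
        return shardsBySite.getOrDefault(localSite, Collections.emptySet()).contains(shardId);
    }
}

A slave pipeline on sjc would call isConflict("sjc", shardId) for each transaction arriving from nyc and log or divert the ones that return true.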

What we have just done is implemented conflict resolution for a system-of-record approach to multi-site data management.   There are many types of conflicts as well as ways to manage them.  Tungsten Replicator filters have a lot of potential to implement other schemes as well.  Filters are pluggable, so there is a convenient escape hatch if you need to do specialized rules of your own.  Meanwhile, there is plenty of scope for Tungsten development to provide useful conflict resolution mechanisms.

The Yeats poem I referred to at the beginning of the article is one of my all-time favorites.  Here are the last two lines:
And what rough beast, its hour come round at last,
Slouches towards Bethlehem to be born?
This could certainly describe a lot of software projects.   However, Tungsten is not like that at all.  We Tungsten engineers wear white lab jackets with pocket protectors and have shiny clipboards to take notes on our breakthroughs.  We rarely slouch.

P.S. Speaking of Tungsten engineers, my colleague Giuseppe Maxia and I will be doing a webinar on multi-master replication on Thursday March 31st.  It will be mostly technical with only a small amount of marketing fluff.  As usual Giuseppe has cooked up a cool demo.  Sign up at www.continuent.com if you would like to find out more.

Mar 22, 2011

Parallel Replication Using Shards Is the Only Workable Approach for SQL

There have been a couple of recent blog articles (here and here) asking for parallel replication based on something other than schemas.  These articles both focus on the problem of parallelizing updates within a single MySQL schema.  I read these with great interest, not least because they both mentioned Tungsten (thanks!) and also found that our schema-based parallelization approach is too limited.  It is therefore worth a short article explaining exactly what the Tungsten approach is and why we chose it.

First of all, Tungsten does not exactly use schema-based parallel replication.  Tungsten is actually based on what I call the serialized shard model of replication.  We assign global transaction IDs to all transactions, which means that for any particular set of transactions we can always figure out the correct serialization and apply them in the right order.  This is true even if the transactions travel across independent replication paths or if we have master failover.

Second, we assign a shard ID to all transactions.  Shards are streams of transactions that execute correctly when applied by themselves in serial order.  Shards are typically independent of each other, which means transactions in different shards can execute in parallel without deadlocking or corrupting data.  This is the case when each shard contains data for a single customer in a multi-tenant application.  We also have a notion of "critical shards," which are shards that contain global data, such as shared currency rate tables.  Updates in critical shards cause full serialization across all shards.
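As a rough illustration of the model (the names here are mine, not Tungsten's), each transaction carries a global sequence number and a shard ID; within a shard the sequence numbers give the serial apply order, and two transactions may run in parallel only when they belong to different, non-critical shards.

import java.util.Set;

// Illustrative model only: a transaction tagged with a global ID and a shard ID.
// The seqno gives the serial apply order within each shard.
record Txn(long seqno, String shardId) {}

class ShardScheduler {
    private final Set<String> criticalShards;   // e.g. shards holding shared currency rate tables

    ShardScheduler(Set<String> criticalShards) {
        this.criticalShards = criticalShards;
    }

    // Transactions may apply in parallel only if they sit in different,
    // non-critical shards; a critical shard forces full serialization.
    boolean canRunInParallel(Txn a, Txn b) {
        if (criticalShards.contains(a.shardId()) || criticalShards.contains(b.shardId())) {
            return false;
        }
        return !a.shardId().equals(b.shardId());
    }
}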

You can define shards in a variety of ways, but as a practical matter identifying durable shards inside individual MySQL schemas is hard for most applications, especially if there are constraints between tables or you have large transactions.   Many SQL applications tend to make most of their updates to a small number of very large tables, which makes finding stable dividing lines even more difficult.  Schemas are therefore a natural unit of sharding, and Tungsten uses these by default.

Schema-based sharding seems pretty limiting, but for current SQL databases it is really the only approach that works.  Here are some important reasons that give you a flavor of the issues.

* Restart.  To handle failures you need to mark the exact restart point on each replication apply thread or you will either repeat or miss transactions.  This requires precise and repeatable serialization on each thread, which you get with the serialized shard model.

* Deadlocks.  If there are conflicts between updates you will quickly hit deadlocks.  This is especially true because one of the biggest single thread replication optimizations is block commit, where you commit dozens of successive transactions at once--it can raise throughput by 100% in some cases.  (There is a sketch of block commit after this list.)  Deadlocks, on the other hand, can reduce effective throughput to zero in pathological cases.  Shard-based execution avoids deadlocks.

* Ordering.  SQL gives you a lot of ways to shoot yourself in the foot through bad transaction ordering.  You can't write to a table before creating it.  You can't delete a row before it is inserted.  Violating these rules does not just lead to invalid data but also causes errors that stop replication.  The workarounds are either unreliable and slow (conflict resolution) or impractical for most applications (make everything an insert).  To avoid this you need to observe serialization very carefully.

* Throughput.  SQL transactions in real systems vary tremendously in duration, which tends to result in individual long transactions blocking simpler parallelization schemes that use in-memory distribution of updates.  In the Tungsten model we can solve this by letting shard progress vary (by hours potentially), something that is only possible with a well-defined serialization model that deals with dependencies between parallel update streams.  I don't know of another approach that deals with this problem.
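To illustrate the block commit optimization mentioned in the deadlocks point above, here is a minimal JDBC-flavored sketch.  It is not Tungsten's actual applier code, and the block size is an arbitrary assumption; the idea is simply that the applier commits once per block of successive transactions instead of once per transaction.

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

// Sketch of block commit: apply many successive transactions, commit once per block.
class BlockCommitApplier {
    private final Connection conn;
    private final int blockSize;               // e.g. 50 transactions per commit

    BlockCommitApplier(Connection conn, int blockSize) throws SQLException {
        this.conn = conn;
        this.blockSize = blockSize;
        conn.setAutoCommit(false);
    }

    // Each inner list holds one replicated transaction's SQL statements.
    void apply(List<List<String>> transactions) throws SQLException {
        int pending = 0;
        try (Statement stmt = conn.createStatement()) {
            for (List<String> txn : transactions) {
                for (String sql : txn) {
                    stmt.execute(sql);
                }
                if (++pending >= blockSize) {   // amortize the commit cost over the block
                    conn.commit();
                    pending = 0;
                }
            }
            if (pending > 0) {
                conn.commit();                  // commit any trailing partial block
            }
        }
    }
}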

If you mess up the solution to any of the foregoing problems, chances are good you will irreparably corrupt data, which leads to replication going completely off the rails.  Then you reprovision your slave(s).  The databases that most need parallel replication are very large, so this is a multi-hour or even multi-day process.  It makes for unpleasant calls with customers when you tell them they need to do this.

I don't spend a lot of time worrying that Tungsten parallel replication is not well suited to the single big schema problem.  So far, the only ways I can think of making it work scalably require major changes to the DBMS or the applications that use it.  In many cases your least costly alternative may be to use SSDs to boost slave I/O performance.

My concerns about Tungsten's model lie in a different area.  The serialized shard model is theoretically sound--it has essentially the same semantics as causally dependent messaging in distributed systems.  However, if we fail to identify shards correctly (and don't know we failed) we will have crashes and corrupt data.  I want Tungsten either to work properly or to tell users it won't work and degrade gracefully to full serialization.  If we can't do one of those two things for every conceivable sequence of transactions, that's a serious problem.

So, to get back to my original point, serialized shards are the best model for parallel replication in SQL databases as we find them today.  I suspect if you look at some of the other incipient designs for parallel replication on MySQL you will find that they follow this model in the end, if not at first.  In fact, I would think the next step is to add MySQL features that make sharded replication more effective.  The Drizzle team seems to be thinking along these lines already.

Mar 20, 2011

Tuning Tungsten Parallel Replication Performance

Last month my colleague Giuseppe Maxia described how to operate Tungsten parallel replication. Since then we have been doing a good bit of benchmarking on both synthetic as well as real production loads. In this article I would like to follow up with some tips about how you can goose up parallel replication performance.  These apply to Tungsten Replicator 2.0.1, which you can find here.

The first way to get good performance with Tungsten is to have the right workload. As explained in an earlier article on this blog, Tungsten parallel replication works by replicating independent databases (aka shards) in parallel.  Here is a picture that summarizes what is going on.


If you have a lot of schemas, if the updates are distributed evenly across schemas, and if you don't have many dependencies between schemas that require full serialization, parallel replication can speed things up significantly for I/O-bound workloads.  For example, Tungsten runs three times faster than MySQL native replication on large datasets when the slave is catching up to the master following mysqld restart. 

Catch-up is a famous slave lag case and one where Tungsten can be quite helpful.  (I think we will be faster in the future, but this is a good start.)  Nevertheless, there's a chance you'll need to do a bit of tuning to see such benefits yourself.

Tungsten currently uses a structure called a parallel queue to enable parallelization.  The parallel queue typically sits at the end of a replicator pipeline in front of the parallel apply threads, as shown in the following handy diagram.

One key to getting decent parallel replication performance is to watch the parallel queue in operation.  In Tungsten Replicator 2.0.1 we introduced a new status command, trepctl status -name stores, that goes a long way to help diagnose how well parallel replication is performing.   Here's a typical example using a six-channel queue store.

$ trepctl status -name stores
Processing status command (stores)...
NAME                VALUE
----                -----
criticalPartition : -1
discardCount      : 0
eventCount        : 3217
maxSize           : 1000
name              : parallel-queue
queues            : 6
serializationCount: 1
serialized        : false
stopRequested     : false
store.queueSize.0 : 0
store.queueSize.1 : 480
store.queueSize.2 : 310
store.queueSize.3 : 1000
store.queueSize.4 : 739
store.queueSize.5 : 407
storeClass        : com.continuent.tungsten.enterprise.replicator.store.ParallelQueueStore
storeSize         : 2936
syncEnabled       : true
syncInterval      : 100
Finished status command (stores)...

The two most important things to look at are the distribution of transactions across queues and serialization.  Let's start with transaction distribution.  In this particular example we were running a parallel queue with 6 channels but only 5 databases.  The distribution therefore looks pretty good.  One queue is empty, but the others have a fairly even distribution of transactions.

Notice that one queue has exactly 1000 transactions.  In Tungsten Replicator 2.0.1 the parallel queue has a maximum size parameter (maxSize), which is set to 1000 for this example run.  Once an individual queue hits the maxSize limit, the entire parallel queue blocks.  It is not uncommon to see one queue blocking in this way if the replicator is catching up, which is exactly what is happening here.  In fact, if the queues are all empty it is possible Tungsten is somehow not supplying transactions to the queue fast enough.  That is not a problem here.

Bad workloads, on the other hand, tend to have a lot of transactions in one or two queues and few or none in all the rest. The following is an example of a possibly bad distribution.

$ trepctl status -name stores
Processing status command (stores)...
NAME                VALUE
----                -----
...
store.queueSize.0 : 0
store.queueSize.1 : 4
store.queueSize.2 : 3
store.queueSize.3 : 972
store.queueSize.4 : 0
store.queueSize.5 : 15
...
Finished status command (stores)...

If you see such skewed distributions persistently, you may want to try to adjust the queue partitioning using the shard.list file. The default parallel queue partitioning algorithm hashes shards into channels. This does not always give optimal performance if your shards mostly happen to hash into the same channel.  The other possibility is that the workload is just badly distributed across databases.
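For reference, here is a minimal sketch of the idea behind hash partitioning.  This is my illustration of the concept rather than the actual ParallelQueueStore code: the shard ID is hashed and reduced modulo the number of channels, so unlucky shard names can all land in the same channel.

// Illustration of hash-based shard-to-channel assignment.
class ShardHashPartitioner {
    private final int channels;

    ShardHashPartitioner(int channels) {
        this.channels = channels;
    }

    int channelFor(String shardId) {
        // Math.abs guards against negative hash codes; collisions put
        // different shards in the same channel, which skews the queue sizes.
        return Math.abs(shardId.hashCode() % channels);
    }
}

With six channels, channelFor("db1") and channelFor("db2") may or may not collide; explicit assignments in shard.list let you pin shards to specific channels when the hash does not spread them well.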

You can decide whether the workload or partitioning is at fault using the trepctl status -name shards command.  Here's an example.

$ ./trepctl status -name shards
Processing status command (shards)...
NAME                VALUE
----                -----
appliedLastEventId: 000007:0000000000000384;20
appliedLastSeqno  : 1471201
appliedLatency    : 0.0
eventCount        : 6
shardId           : #UNKNOWN
stage             : d-pq-to-dbms
NAME                VALUE
----                -----
appliedLastEventId: 000005:0000000326365895;41
appliedLastSeqno  : 1470999
appliedLatency    : 0.0
eventCount        : 311605
shardId           : db1
stage             : d-pq-to-dbms
NAME                VALUE
----                -----
appliedLastEventId: 000005:0000000326512277;95
appliedLastSeqno  : 1471200
appliedLatency    : 0.0
eventCount        : 298522
shardId           : db2
stage             : d-pq-to-dbms
...

This shows that the distribution of transactions between the db1 and db2 databases is pretty even.  If you have many databases with roughly even values in the eventCount parameter, the workload is well suited for parallelization.  In that case you may want to assign shards explicitly in the shard.list file if you don't like the distribution in the parallel queue.

Meanwhile, the previous example shows another potential problem.  We also see counts for #UNKNOWN, which is a special shard ID that means "I could not tell what schema this is."  #UNKNOWN transactions can occur if Tungsten cannot parse a SQL statement properly or there is a transaction that updates multiple schemas.  In either case, Tungsten serializes the parallel queue.

However it occurs, serialization is a performance killer because it means we have to block the parallel queue until all parallel transactions complete, execute one or more transactions serially, and then reopen the parallel queue.  You can see how often this is happening from the serializationCount value on trepctl status -name stores.  For many workloads, a serializationCount value that is more than a few percent of the number in eventCount means the entire transaction stream is effectively serialized.  In the first stores output above, serializationCount is 1 against an eventCount of 3217--well under one percent--so serialization is not a problem there.

If the serialization is occurring due to #UNKNOWN shards, you may be able to improve things using a new value in the replicator service properties file that was added in version 2.0.1. It controls whether we assign the shard ID using the default schema even if Tungsten cannot tell from the SQL command what you are doing.

# Policy for shard assignment based on default database.  If 'stringent', use
# default database only if SQL is recognized.  For 'relaxed' always use the
# default database if it is available.
replicator.shard.default.db=relaxed
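
The logic behind that setting works out to something like the following.  This is my paraphrase of the behavior rather than the replicator's source; the names are made up for illustration.

// Rough illustration of stringent vs. relaxed shard assignment.
enum ShardPolicy { STRINGENT, RELAXED }

class ShardAssigner {
    static final String UNKNOWN_SHARD = "#UNKNOWN";

    static String assignShard(String parsedSchema, String defaultDb, ShardPolicy policy) {
        if (parsedSchema != null) {
            return parsedSchema;                  // SQL was recognized; use its schema
        }
        if (policy == ShardPolicy.RELAXED && defaultDb != null) {
            return defaultDb;                     // fall back to the session's default database
        }
        return UNKNOWN_SHARD;                     // stringent policy: this will serialize the queue
    }
}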

Setting the parameter to relaxed can help quite a bit if the problem is due to unusual SQL that confuses the parser.  On one workload we were able to reduce the serializationCount from about 10% of transactions to 0% in this way.  We then saw the expected speed-up from parallel replication.

Mar 7, 2011

Understanding Tungsten Replication Services

If you follow Giuseppe Maxia's Datacharmer blog you have seen several recent articles on Tungsten Replicator.  Giuseppe and I work closely together on replication at Continuent, and I have promised a set of articles about replication internals to match the practical introduction that Giuseppe has provided.  In this first article I will describe replication services, which are message processing flows that run in the Tungsten Replicator.

Unlike many replication engines, Tungsten Replicator can run multiple replication services concurrently.  There is a central management interface that allows you to start new replication services without disturbing services that are already running.  Each replication service also has its own management interface so that you can put the loaded service online, offline, etc. without disturbing other replication work.  As Tungsten is written in Java, the management interfaces are based on JMX, a standard administrative interface for Java applications.

Here is a simple diagram that shows a Tungsten Replicator with two replication services named fra and nyc that replicate from separate DBMS masters in Frankfurt and NYC into a single slave in San Francisco.   You can immediately see the power of replication services--a single Tungsten Replicator process can simultaneously replicate between several locations.  Replication services are an important building block for the type of complex setups that Giuseppe Maxia discussed in his blog article on Replication Topologies.


Users who are handy with Java can write their own programs to manipulate the JMX interfaces directly.  If not, there is the trepctl utility, which is supplied with the Tungsten Replicator and works off the command line. 
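For those who do want to go the JMX route, the plumbing is standard javax.management code along the lines of the sketch below.  The connection URL, port, MBean object name, and operation name are placeholders I made up for illustration; check your own replicator configuration for the real values.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch of driving a replication service over JMX; names and port are placeholders.
public class ReplicatorJmxClient {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:10000/jmxrmi");   // placeholder port
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Placeholder object name -- the real name depends on how the service registers itself.
            ObjectName service = new ObjectName(
                    "com.continuent.tungsten.replicator:type=ReplicationService,name=fra");
            mbs.invoke(service, "online", new Object[0], new String[0]);   // assumed operation name
        }
    }
}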

If the Tungsten Replicator architecture reminds you of a Java application server, you are absolutely right.  Java VMs have a relatively large resource footprint compared to ordinary C programs, so it is typically more efficient to put multiple applications in a single VM rather than running a lot of individual Java processes.  Tungsten replication services follow the same design pattern, except that instead of serving web pages they replicate database transactions.  

Let's now look a little more deeply at how Tungsten Replicator organizes replication services.  Each replication service runs a single pipeline, which is Tungsten parlance for a configurable message flow.  (For more on pipelines, read here.)  When the service starts, it loads an instance of a Java class called OpenReplicatorManager that handles the state machine for replication (online, offline, etc.) and provides the management interfaces for the service.  The OpenReplicatorManager instance in turn depends on a number of external resources from the file system and DBMS.

Here is another diagram showing how Tungsten Replicator organizes all of the various parts for services.  Services need a configuration file for the pipeline, as well as various bits of disk space to store transaction logs and replication metadata.  The big challenge is to ensure things do not accidentally collide.


This layout seems a bit complex at first but is reasonably simple once you get used to it.  Let's start with service configuration using our fra service as an example. 

Service configuration files are stored in the tungsten-replicator/conf directory.   There are up to two files for each service.  The static-fra.properties file defines all properties of the service, including the pipeline organization as well as settings like the replication role or master address that may change during operation.  The dynamic-fra.properties file contains overrides to selected properties.  For instance, if you switch the replication role from slave to master as part of a failover operation, the new role goes in the dynamic-fra.properties file.  Tungsten Replicator reads the static file first, then applies the overrides when it starts the service.

Next, we have the Tungsten transaction logs, also known as the Transaction History Log (THL).  This is a list of all transactions to be replicated along with metadata like global transaction IDs and shard IDs.  THL files for each service are normally stored in the logs directory at the same level as the tungsten release directory itself.  There is a separate directory for each service, for example logs/fra.

Next we have Tungsten relay logs.  These are binlogs downloaded from a MySQL master DBMS, from which the replication service creates the Tungsten transaction logs.  Not every replication service uses these.  They are required when the MySQL master is on another host, or when the binlogs are on an NFS-mounted file system, which Tungsten does not parse very efficiently yet.  Tungsten relay logs follow the same pattern as the THL--everything is stored under relay-logs with a separate subdirectory for each service, for example relay-logs/fra.

Finally, there is metadata in the DBMS itself.  Each replication service has a database that it uses to store restart points for replication (table trep_commit_seqno) as well as heartbeats and consistency checks (tables heartbeat and consistency, respectively).   The name of this database is tungsten_servicename, as in tungsten_fra.
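If you want to peek at the restart metadata yourself, a simple query against the service's catalog database is enough.  Here is a JDBC sketch; the connection URL and credentials are placeholders, and it prints the columns generically rather than assuming the exact layout of trep_commit_seqno.

import java.sql.*;

// Sketch: dump the restart metadata for the "fra" service.  Connection details are placeholders.
public class ShowRestartPoint {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/tungsten_fra", "tungsten", "secret");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM trep_commit_seqno")) {
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    System.out.println(meta.getColumnName(i) + " = " + rs.getString(i));
                }
            }
        }
    }
}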

Setting up services is difficult to do manually, so Tungsten Replicator 2.0 has a program named configure-service that defines new replication services and removes old ones by deleting all traces including the database catalogs. You can find out all about installation and starting services by looking at the Tungsten Replicator 2.0 Installation and Configuration Guide, which is located here.

Services have been part of Tungsten Replicator for a while, but we have only recently begun to talk about them more widely as part of the release of Tungsten Replicator 2.0.0 in February 2011, especially as we start to do more work with multi-master topologies.  One of the comments we get is that replication services make Tungsten seem complicated and therefore harder to use, especially compared with MySQL replication, which is relatively easy to set up.  That's a fair criticism.  Tungsten Replicator is really a very configurable toolkit for replication and does far more than MySQL replication or just about any other open source replicator for that matter.   Like most toolkits, the trade-off for power is complexity.

We are therefore working on automating as much of the configuration as possible, so that you can set up even relatively complex topologies with just a couple of commands.  You'll see more of this as we make additional replicator releases (version 2.0.1 will be out shortly) and push features fully into open source.   Meanwhile, if you have comments on Tungsten 2.0.0 please feel free to post them back to us.