Sunday, June 3, 2012

MySQL to Vertica Replication, Part 1: Enabling Real-Time Analytics with Tungsten

Real-time analytics allow companies to react rapidly to changing business conditions.   Online ad services process click-through data to maximize ad impressions.  Retailers analyze sales patterns to identify micro-trends and move inventory to meet them.  The common theme is speed: moving lots of information without delay from operational systems to fast data warehouses that can feed reports back to users as quickly as possible.

Real-time data publishing is a classic example of a big data replication problem.  In this two-part article I will describe recent work on Tungsten Replicator to move data out of MySQL into Vertica at high speed with minimal load on DBMS servers.  This feature is known as batch loading.  Batch loading enables not only real-time analytics but also any other application that depends on moving data efficiently from MySQL into a data warehouse.

The first article works through the overall solution, starting with the replication problems of real-time analytics and ending with a description of how Tungsten adapts real-time replication to data warehouses.  If you are in a hurry to set up, just skim this article and jump straight to the implementation details in the follow-on article.

Replication Challenges for Real-Time Analytics

To understand some of the difficulties of replicating to a data warehouse, imagine a hosted intrusion detection service that collects access log data from across the web and generates security alerts as well as threat assessments for users.  The architecture for this application follows a pattern that is increasingly common in businesses that have to analyze large quantities of incoming data.  

Access log entries arrive through data feeds, whereupon an application server checks them to look for suspicious activity and commits results into a front-end DBMS tier of sharded MySQL servers.  The front-end tier optimizes for a MySQL sweet spot, namely fast processing of a lot of small transactions.

Next, MySQL data feeds as quickly as possible into a Vertica cluster that generates reports for users.  Vertica is a popular column store with data compression, advanced projections (essentially materialized views) and built-in redundancy.  (For more on Vertica origins and column stores in general, read this.)  The back-end DBMS tier optimizes for a Vertica sweet spot, namely fast parallel load and quick query performance.

There are many challenges in building any system that must scale to high numbers of transactions.  Replicating from MySQL to Vertica is an especially thorny issue.  Here is a short list of problems to overcome.   
  1. Intrusion detection generates a lot of data.  This type of application can generate aggregate peak rates of 100,000 updates per second into the front-end DBMS tier. 
  2. Data warehouses handle normal SQL commands like INSERT, UPDATE or DELETE very inefficiently.  You need to use batch loading methods like the Vertica COPY command rather than submitting individual transactions as they appear in the MySQL binlog.  
  3. Real applications generate not only INSERT but also UPDATE and DELETE operations.   You need to apply these in the correct order during batch loading or the data warehouse will quickly become inconsistent.  
  4. Both DBMS tiers are very busy, and whatever replication technique you use needs to reduce load as much as possible on both sides of the fence.  
Until recently there were two obvious options for moving data between MySQL and Vertica.  
  1. Use an ETL tool like Talend to post batches extracted from MySQL to Vertica.  
  2. Write your own scripts to scrape data out of the binlog, process them with a fast scripting language like Perl, and load the result into Vertica.  
ETL tools put load on MySQL to scan for changes and often require application changes, for example adding timestamps to detect updates.  Home-grown tools, in addition to other limitations, are difficult to maintain and deal poorly with corner cases unless very carefully tested.  Both approaches also add latency to replication, which detracts from the real-time delivery goal.

The summary, then, is that there is no simple way to provide anything like real-time reports to users when large volumes of data are involved.  ETL and home-grown solutions tend to fall down on real-time transfer as well as the extra load they impose on already busy servers.  That's where Tungsten comes in. 

Developing Tungsten Batch Loading for Data Warehouses

Our first crack at replicating to data warehouses applied MySQL transactions to Greenplum using the same approach used for MySQL--connect with a JDBC driver and apply row changes in binlog order as fast as possible.  It was functionally correct but not very usable.  Like many data warehouses, Greenplum processes individual SQL statements around 100 times slower than MySQL.  To populate data at a reasonable speed you need to dump changes to CSV and insert them in batches using gpload, an extremely fast parallel loader for Greenplum.

We did not add gpload support at that time, because it was obviously a major effort and we did not understand the implementation very well.  However, I spent the next couple of months thinking about how to add CSV-based batch loading to Tungsten.  The basic idea was to turn on MySQL row replication on the master and then apply updates to the data warehouse as follows:
  1. Accumulate a large number of transactions as rows in open CSV files.  
  2. Load the files to staging tables. 
  3. Merge the staging table contents into the base tables using SQL.  
When a customer showed up needing fast replication from MySQL into Vertica, we were therefore ready to develop batch loading and dived right in.  It looked like a few weeks of work to get something ready for production deployment, but that estimate turned out to be quite optimistic.  The implementation in fact took a good bit longer because of the complexities of CSV formats used by different DBMS services, problems with timezones, differences in SQL load command semantics, and the fact that when we started out we did not have an easy-to-set-up method to test heterogeneous replication.   Plus we needed to take time to create a proper installation.

That said, most of the work was SMOP, or a simple matter of programming.  After six months we had fast, functional batch loading for Vertica as well as working implementations for MySQL and PostgreSQL.  Batch loading applies MySQL row updates in very large groups to Vertica using CSV files and Vertica COPY commands.  The following diagram shows direct replication using a single pipeline to apply transactions from a Tungsten master replicator to Vertica.

Tungsten replication operates more or less normally up to the point where we apply to Vertica.  This is the job of a new applier class called SimpleBatchApplier.  It implements the CSV loading as follows.

First, as new transactions arrive Tungsten writes them to CSV files named after the Vertica tables to which they apply.  For instance, say we have updates for a table simple_tab in schema test with the following format (slightly truncated from the vsql \d output):

 Schema |   Table    |     Column      |  Type        | Size | 
 test   | simple_tab | id              | int          |    8 | 
 test   | simple_tab | f_data          | varchar(100) |  100 | 

The updates go into file test.simple_tab.  Here is an example of the data in the CSV file.

"64087","I","17","Some data to be inserted","1"
"64088","I","18","Some more data to be inserted","2"

The CSV file includes a Tungsten seqno (global transaction ID),  an operation code (I for insert, D for delete),  and the primary key.  For inserts, we have additional columns containing data.  Deletes just contain nulls for those columns.  The last column is a row number, which allows us to order the rows when they are loaded into Vertica.
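As a rough illustration of that row layout, here is a minimal Python sketch that renders row changes in the same column order.  It is not Tungsten code: the function name stage_rows is hypothetical, and writing a delete's data column as an empty quoted field is an assumption, since the article only says deletes contain nulls.

```python
import csv
import io


def stage_rows(changes, first_row_id=1):
    """Render row changes as CSV text: seqno, opcode, key, data, row number.

    Each change is a tuple (seqno, opcode, key, data) where opcode is
    "I" for insert or "D" for delete.
    """
    out = io.StringIO()
    # Quote every field, as in the sample file shown in the article.
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    for row_id, (seqno, opcode, key, data) in enumerate(changes, start=first_row_id):
        # Assumption: deletes carry no column data; None becomes an empty field.
        payload = data if opcode == "I" else None
        writer.writerow([seqno, opcode, key, payload, row_id])
    return out.getvalue()


if __name__ == "__main__":
    print(stage_rows([
        (64087, "I", 17, "Some data to be inserted"),
        (64088, "I", 18, "Some more data to be inserted"),
        (64089, "D", 17, None),
    ]), end="")
```

The row number in the last column is what later lets the merge step pick the most recent change for each key.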

Tungsten keeps writing transactions until it reaches the block commit maximum (for example 25,000 transactions).  It then closes each CSV file and loads the contents into a staging table that has the base name plus a prefix, here "stage_xxx_."  The staging table format mimics the CSV file columns.  For example, the previous example might have a staging table like the following:

 Schema |        Table         |     Column      |     Type     | Size |
 test   | stage_xxx_simple_tab | tungsten_seqno  | int          |    8 |
 test   | stage_xxx_simple_tab | tungsten_opcode | char(1)      |    1 |
 test   | stage_xxx_simple_tab | id              | int          |    8 |
 test   | stage_xxx_simple_tab | f_data          | varchar(100) |  100 |
 test   | stage_xxx_simple_tab | tungsten_row_id | int          |    8 |
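The accumulate-then-load behavior can be sketched in a few lines of Python.  This is only an illustration under stated assumptions: BatchAccumulator and the load_batch callback are hypothetical names, the callback stands in for closing the CSV file and loading it into the staging table, and numbering rows per table within each batch is a guess about how the row number column is assigned.

```python
from collections import defaultdict


class BatchAccumulator:
    """Buffer row changes per table and flush after enough transactions."""

    def __init__(self, block_commit_max, load_batch):
        self.block_commit_max = block_commit_max
        # Hypothetical callback: load_batch(table_name, rows).
        self.load_batch = load_batch
        self.rows_by_table = defaultdict(list)
        self.pending_transactions = 0

    def add_transaction(self, seqno, changes):
        """changes is an iterable of (table, opcode, key, data) tuples."""
        for table, opcode, key, data in changes:
            rows = self.rows_by_table[table]
            # Assumption: row numbers restart at 1 for each table in a batch.
            rows.append((seqno, opcode, key, data, len(rows) + 1))
        self.pending_transactions += 1
        if self.pending_transactions >= self.block_commit_max:
            self.flush()

    def flush(self):
        """Hand each table's buffered rows to the loader and start a new batch."""
        for table, rows in self.rows_by_table.items():
            self.load_batch(table, rows)
        self.rows_by_table.clear()
        self.pending_transactions = 0


if __name__ == "__main__":
    acc = BatchAccumulator(2, lambda table, rows: print(table, len(rows)))
    acc.add_transaction(64087, [("test.simple_tab", "I", 17, "Some data")])
    acc.add_transaction(64088, [("test.simple_tab", "I", 18, "More data")])
```

Counting transactions rather than rows keeps each transaction whole inside a single batch, which matters because the merge step treats the batch as one ordered set.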

Finally, Tungsten applies the deletes and inserts to table test.simple_tab by executing SQL commands like the following:

DELETE FROM test.simple_tab WHERE id IN
  (SELECT id FROM test.stage_xxx_simple_tab 
     WHERE tungsten_opcode = 'D');
INSERT INTO test.simple_tab(id, f_data)
  SELECT id, f_data 
  FROM test.stage_xxx_simple_tab AS stage_a
  WHERE tungsten_opcode='I' AND tungsten_row_id IN
    (SELECT MAX(tungsten_row_id) 
       FROM test.stage_xxx_simple_tab GROUP BY id);

Simple, right?  The SQL commands are actually generated from templates that specify the SQL to execute when connecting to Vertica, to load a CSV file into the staging table, and to merge changes from the staging table to the base (i.e., real) table.   You can find the template files in directory tungsten-replicator/samples/scripts/batch.  The template file format is documented here.
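To watch the two merge statements work on a small batch, the following sketch runs them against an in-memory SQLite database.  SQLite stands in for Vertica purely for illustration, so the test schema prefix is dropped and rows are staged with plain INSERT statements rather than a COPY of a CSV file.  The helper names and the sample data are made up.

```python
import sqlite3

MERGE_DELETE = """
DELETE FROM simple_tab WHERE id IN
  (SELECT id FROM stage_xxx_simple_tab WHERE tungsten_opcode = 'D')
"""

MERGE_INSERT = """
INSERT INTO simple_tab(id, f_data)
  SELECT id, f_data
  FROM stage_xxx_simple_tab
  WHERE tungsten_opcode = 'I' AND tungsten_row_id IN
    (SELECT MAX(tungsten_row_id) FROM stage_xxx_simple_tab GROUP BY id)
"""


def merge_batch(conn, staged_rows):
    """Stage one batch, then merge it into the base table."""
    cur = conn.cursor()
    cur.execute("DELETE FROM stage_xxx_simple_tab")  # start with an empty staging table
    cur.executemany(
        "INSERT INTO stage_xxx_simple_tab VALUES (?, ?, ?, ?, ?)", staged_rows)
    cur.execute(MERGE_DELETE)
    cur.execute(MERGE_INSERT)
    conn.commit()


def run_demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE simple_tab (id INTEGER PRIMARY KEY, f_data VARCHAR(100))")
    conn.execute(
        "CREATE TABLE stage_xxx_simple_tab (tungsten_seqno INTEGER, "
        "tungsten_opcode CHAR(1), id INTEGER, f_data VARCHAR(100), "
        "tungsten_row_id INTEGER)")
    conn.executemany("INSERT INTO simple_tab VALUES (?, ?)",
                     [(16, "to be deleted"), (17, "old value")])
    merge_batch(conn, [
        (64087, "D", 17, None, 1),           # update of row 17: delete...
        (64087, "I", 17, "new value", 2),    # ...followed by insert
        (64088, "I", 18, "fresh row", 3),    # plain insert
        (64089, "D", 16, None, 4),           # plain delete
        (64090, "I", 19, "short lived", 5),  # inserted, then deleted
        (64091, "D", 19, None, 6),           # within the same batch
    ])
    return conn.execute("SELECT id, f_data FROM simple_tab ORDER BY id").fetchall()


if __name__ == "__main__":
    print(run_demo())
```

Row 19 never reaches the base table: its last staged operation is a delete, so the MAX(tungsten_row_id) filter excludes its earlier insert.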

Tungsten MySQL to Vertica replication is currently in field testing.  The performance on the MySQL side is excellent, as you would expect with asynchronous replication.  On the Vertica side we find that batch loading operates far faster than using JDBC interfaces.  Tungsten has a block commit feature that allows you to commit very large numbers of transactions at once.  Tests show that Tungsten easily commits around 20,000 transactions per block using CSV files.

We added a specialized batch loader class to perform CSV uploads to Vertica from other hosts, which further reduces the load on Vertica servers.   (It still needs a small fix to work with Vertica 5 JDBC but works with Vertica 4.)  Taken together, the new Vertica replication features look as if they will be very successful for implementing real-time analytics.  Reading the binlog on MySQL minimizes master overhead and fetches exactly the rows that have changed within seconds of being committed.  Batch loading on Vertica takes advantage of parallel load, again reducing overhead in the reporting tier.

A New Replication Paradigm:  Set-Based Apply

Batch loading is significant for reasons other than conveniently moving data between MySQL and Vertica.  Batch loading is also the beginning of a new model for replication.  I would like to expand on this briefly as it will likely be a theme in future work on Tungsten.

Up until this time, Tungsten Replicator has followed the principle of rigorously applying transactions to replicas in serial order without any deviations whatsoever.  If you INSERT and then UPDATE a row, it always works because Tungsten applies them to the slave in the same order.  This consistency is one of the reasons for the success of Tungsten overall, as serialization short-cuts usually end up hitting weird corner cases and are also hard to test.  However, the serialized apply model is horribly inefficient on data warehouses, because single SQL statements execute very slowly.

The SQL-based procedure for updating replicas that we saw in the previous section is based on a model that I call set-based apply.  It works by treating the changes in a group of transactions as an ordered set (actually a relation) consisting of insert and delete operations.  The algorithm is easiest to explain with an example.  The following diagram shows how three row operations on table t in the MySQL binlog morph to four changes, of which we actually apply only the last two.

Set-based apply merges the ordered change set to the base table using the following rules:
  1. Delete any rows from the base table where there is a change set DELETE for the primary key and the first operation on that key is not an INSERT.  This deletes any rows that previously existed. 
  2. Apply the last INSERT on each key provided it is not followed by a DELETE.  This inserts any row that was not later deleted. 
This is a form of logical reduction using a combination of staging tables and CSV loading as described in the previous section.  The rules are implemented as SQL queries.  Taken together these two rules apply changes in a way that is identical to applying them straight from the binlog.  Using SQL is not the most efficient approach but is relatively simple to implement and easy for users to understand.
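The two rules can also be written directly in Python and compared with strictly serial apply.  This is a sketch for illustration, not the Tungsten implementation: the function names and the sample change set are made up, an UPDATE is assumed to appear as a delete followed by an insert, and the change set is assumed to be valid (no insert on a key that already exists).

```python
def reduce_change_set(changes):
    """Reduce ordered (opcode, key, data) changes to (deletes, inserts).

    Rule 1: delete keys that have a DELETE and whose first operation
            is not an INSERT (the row existed before this batch).
    Rule 2: insert the last INSERT on each key unless a DELETE follows it.
    """
    first_op = {}
    last_change = {}
    has_delete = set()
    for opcode, key, data in changes:
        first_op.setdefault(key, opcode)
        last_change[key] = (opcode, data)
        if opcode == "D":
            has_delete.add(key)
    deletes = sorted(k for k in has_delete if first_op[k] != "I")
    inserts = {k: data for k, (opcode, data) in last_change.items() if opcode == "I"}
    return deletes, inserts


def apply_set_based(base, changes):
    """Apply the reduced change set: all deletes first, then all inserts."""
    deletes, inserts = reduce_change_set(changes)
    result = {k: v for k, v in base.items() if k not in deletes}
    result.update(inserts)
    return result


def apply_serially(base, changes):
    """Reference behavior: apply every change one at a time in binlog order."""
    result = dict(base)
    for opcode, key, data in changes:
        if opcode == "D":
            result.pop(key, None)
        else:
            result[key] = data
    return result


if __name__ == "__main__":
    base = {1: "a", 2: "b"}
    changes = [
        ("I", 3, "c"), ("D", 3, None), ("I", 3, "c2"),  # insert 3, then update it
        ("D", 1, None), ("I", 1, "a2"),                 # update 1
        ("D", 2, None),                                 # delete 2
        ("I", 4, "d"), ("D", 4, None),                  # insert 4, then delete it
    ]
    print(reduce_change_set(changes))
    print(apply_set_based(base, changes) == apply_serially(base, changes))
```

Eight staged changes collapse to two deletes and two inserts here, which is the kind of reduction that could be moved out of SQL and into the replicator.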

Set-based apply offers interesting capabilities because sets, particularly relations, have powerful mathematical properties.  We can use set theory to reason about and develop optimized handling to solve problems like conflict resolution in multi-master systems.  I will get back to this topic in a future post.

Meanwhile, there are obvious ways to speed up the apply process for data warehouses by performing more of the set reduction in Tungsten and less in SQL.  We can also take advantage of existing Tungsten parallelization capabilities.  I predict that this will offer the same sort of efficiency gains for data warehouse loading as Tungsten parallel apply provides for I/O-bound MySQL slaves.  Log-based replication is simply a very good way of handling real-time loading and there are lots of ways to optimize it provided we follow a sound processing model.


This first article on enabling real-time analytics explained how Tungsten loads data in real time from MySQL to Vertica.  The focus has been allowing users to serve up reports quickly from MySQL-based data, but Tungsten replication obviously applies to many other problems involving data warehouses.

In the next article I will turn from dry theory to practical details.  We will walk through the details of configuring MySQL to Vertica replication, so that you can try setting up real-time data loading yourself.

P.S. If optimized batch loading seems like something you can help solve, Continuent is hiring.  This is just one of a number of cutting-edge problems we are working on.


Jonathan Levin said...

This is indeed an exciting bit of technology. If you are able to convert statements into batched CSV files, you can work with any column-based storage engine / datawarehouse technology. Depending on how clean your data is when it is replicated over, you can have very up to date reports.

Robert Hodges said...

Thanks! You bring up an interesting point about cleaning. The large-scale reporting systems I have seen are designed with analytics in mind, hence are able to use data with little or no transformation. That's a good design principle if you want to move data really quickly into reports.

Alexander Zaitsev said...

Hi Robert,

When we talked live, I somehow did not realize how you technically do the replication to Vertica. The approach is indeed very interesting and universal.

Note, that you can load data to Vertica in multiple parallel streams, if needed. There is also a lot of optimization room configuring Vertica WOS/ROS usage.

I wonder how you deal with COPY rejections? Vertica can reject a copy for a number of reasons, so you need to have some sort of retry logic.

Another thing, though it is probably related to Part 2: why do you really need intermediate CSV files? The stdout/stdin piping is usually faster and easier to manage.

Robert Hodges said...

@Alexander, thank you for the compliment! It's been fun to work on this problem, especially once some of the set theoretic fundamentals became more apparent. It opens up the possibility of some very interesting optimizations.

Parallelization is an interesting topic. Vertica of course can do it well and we have parallel apply on the Tungsten end. In fact, it seems that Tungsten parallelization just works on Vertica if you have multiple schemas to load, because they behave like shards. I did this by accident in some of my testing, but we have not certified it formally. What would be really interesting is table-by-table sharding.

COPY rejections are handled in a very simple fail/stop way. We check row counts on load, though you can turn this off if you want. The replicator just gets an error when CSV rows and update counts do not match. You look at the CSV file (which has the transaction seqno) and either skip it or fix the underlying problem. We could do much better.

Finally, we don't really need CSV files as you point out but this works for everyone. MySQL does not have streaming load, for instance. I'm sure there's more to add here as well.

Anonymous said...

You might not be aware of this, but Vertica's insert performance is very high, providing you disable autocommit and use JDBC's batch API. When rows are inserted using [addBatch* executeBatch] a COPY command is run on the server, so directly loading into Vertica using the standard JDBC (or ODBC) API approaches the performance of COPYing .csv files. For free, with no MYSQL layer needed at all.

Robert Hodges said...

@Anonymous, thank you very much for that tip. We will take a fresh look at it in the future. We currently use the batch API in a limited way, as it has problems with error handling that make it difficult to track back to the exact place an error happened. In particular we like to be able to point to a specific transaction number and SQL statement.

Meanwhile, was the reference to the "MySQL layer" a typo? Just to be clear you don't need MySQL to load Vertica with Tungsten. We are also working on loading data from Oracle. :)

Vladi Feigin said...


Does it support the replication between the two geo distributed Vertica clusters?

Vladi Feigin

Robert Hodges said...

@Vlad. We do not support replication between Vertica clusters yet, as that requires access to the Vertica log, which is not easily accessible. Is this for high availability? If so, I would check with the Vertica team at HP; they are apparently thinking about some feature development in that direction.

p.s., You had some duplicate comments, so I just published the last one.

Vladi Feigin said...

Thanks for the answer!
We need it for the Disaster recovery

Will be glad if you help us with this :-)

