Apr 24, 2018



Connecting External Indexers and Data Pipelines

In this blog post, we’ll focus on a pattern for feeding changes from FaunaDB’s transactional data into a secondary data processing pipeline. The foundation of FaunaDB’s flexibility is its ability to integrate with the wider data ecosystem. The pattern illustrated here can be used to keep any system up to date with FaunaDB, from full-text indexers to machine learning classifiers.

When building a search index (or other eventually consistent derived data) from a database, you typically want to build it from the database’s current state — not by replaying the database history. And when keeping the index up to date with new database changes, typically you don’t care about all the intermediate states, you just want to update your index to reflect the latest version of each document. Deduplicating the feed can be a big optimization if your objects have multiple updates in quick succession, leading to faster, more responsive indexes.

However, if you’re building something like a real-time fraud detection system, you might want your system to ingest the full feed of intermediate changes. This article is about building a deduplicated feed that brings external processors up to date with the latest changes. If you’re interested in the full feed, including intermediate versions, FaunaDB’s event queries are the answer, and our temporal timeline tutorial is a great place to start. The pattern in the article you’re reading now is designed for full-text search, aggregation, geographic indexes, or any other derived data set that represents an eventually consistent view of the current state of your data.

Fetching New Updates Consistently

Each update in FaunaDB has a timestamp that is available on instance objects under their ts field. This is a synthetic timestamp: an agreed-upon value derived by the FaunaDB cluster that identifies the snapshot containing the object’s update. This logical clock is always increasing (unlike a clock tracking real time) and doesn’t depend on physical time resolution, making it safe to use in this context. As you’ll see, our logic for correctness depends on the fact that FaunaDB’s timestamps are monotonically increasing.

Indexing on this timestamp value makes it easy to sort instances by their snapshot order. If we ingest the instances in this order, we can track a high-water-mark, and use it to safely resume our data feed after a process restart or server failure. What this looks like in practice is first a query to load all objects from the beginning of time, then polling logic to request changes since the high-water mark. Each time we poll we record the new high-water-mark, so that if we’re restarted we know where to pick up from.

One nuance is that without proper use of snapshots, frequently updated items can twinkle just beyond the horizon of our queries. By running all the pagination queries for a complete pass over the set within the same snapshot, we ensure that instances are ingested fairly. Once we’ve paginated to the end of the set in a given snapshot, we can start again with the database’s new current snapshot, from our new high-water-mark. No matter how frequently an instance is updated, it will be found by snapshot pagination.

Assuming we have a class called posts, we can create an index on the ts and ref fields.

q.CreateIndex({
    name: "posts-changes",
    source: q.Class("posts"),
    values: [{ field: ["ts"] }, { field: ["ref"] }]})

Once this index is active, we paginate over the database’s current contents. If the database contains only a few records, you might be able to retrieve everything in one query, but most databases are large enough to need multiple pages of results to fully traverse all the data.

Before we begin pagination, we need to pick a snapshot timestamp to paginate from that we can reuse as we fetch each page of the set. The best way to do this is to ask the database for its representation of now:

q.Time("now")
// => q.Time("2018-04-16T23:38:56.568191Z")

We can store this value in a local variable in our program and use it for each query in the set. The first query would look like:

q.At(q.Time("2018-04-16T23:38:56.568191Z"), 
    q.Map(q.Paginate(q.Match(q.Index("posts-changes"))), 
      function(_ts, ref){return q.Get(ref)}))

This will return the first page of results in that snapshot, and a cursor for the next page of results if there is one, in the after field. Refer to the Paginate function documentation for details. As long as there is a page linked in the after field, we keep fetching pages and processing them.
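The page-draining loop can be sketched as follows. This is a sketch rather than a drop-in implementation: `client` is assumed to be a connected faunadb-js driver instance, `q` its query builder, and `handlePage` your ingest callback.

```javascript
// Drain every page of the index within one snapshot, following the `after`
// cursor until the final page (which carries no `after` field).
async function drainSnapshot(client, q, snapshotTime, handlePage) {
  let cursor; // undefined on the first request
  do {
    const page = await client.query(q.At(snapshotTime,
      q.Map(
        q.Paginate(q.Match(q.Index("posts-changes")),
          cursor ? { after: cursor } : {}),
        function (_ts, ref) { return q.Get(ref); })));
    await handlePage(page.data); // the documents for this page
    cursor = page.after;         // absent once the last page is reached
  } while (cursor);
}
```

Because the snapshot time is fixed for the whole loop, every page reflects the same consistent view of the data.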

Processing the results looks like looping over the documents (returned by q.Get(ref) in the query) and feeding them to your search index, machine learning classifier, or other downstream system. Your life will be easier if this ingest processing is idempotent, because then accidentally processing some documents twice is OK. Most indexers are idempotent. At the end of each page, update your high-water-mark by persisting it to the local filesystem or in FaunaDB.
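Advancing the mark is just a max over the timestamps in the page. A small pure-function sketch (the `doc.ts` shape matches the instance objects returned by q.Get):

```javascript
// After ingesting a page of documents, move the high-water-mark forward to
// the largest snapshot timestamp seen. Never move it backwards: a partial
// retry could otherwise cause documents to be skipped on resume.
function advanceHighWaterMark(currentMark, pageDocs) {
  return pageDocs.reduce((mark, doc) => Math.max(mark, doc.ts), currentMark);
}
```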

Once we have processed all the pages in the index the result set won’t include an after field. The highest ts we’ve seen can become our new high-water-mark. Now we’ve completed a full snapshotted index pass and we’re ready to start again at the new latest snapshot so we can repeat the process. We’ll query again for q.Time("now") to find the new snapshot and start fetching the snapshotted index starting at our high-water-mark. (To do this we make a synthetic after cursor from our high-water-mark and start the process again to fetch the latest updates.)

In JavaScript, making the query with the new snapshot time and the high-water-mark would look something like this. (This illustration leaves out looping over the paginated pages for simplicity, and concerns itself with only the first call. Astute readers will recognize that a full implementation might look recursive as it tracks the high-water-mark.)

client.query(q.Time("now")).then(function(snapshotTime) {
// => q.Time("2018-04-16T23:38:58.658231Z")
    const syntheticCursor = {after : highWaterMark};
    return client.query(q.At(snapshotTime,
        q.Map(q.Paginate(q.Match(q.Index("posts-changes")), 
            syntheticCursor), function(_ts, ref){return q.Get(ref)})))
})

Operating Your Connector

The less frequently you poll for new data, the more deduplication can happen, so depending on your document update patterns, your secondary system might not have to work as hard. But if you poll too infrequently, users can get frustrated waiting for their changes to become searchable. A happy medium is to define the fastest polling interval you are comfortable with (maybe 250ms), and then let your indexer’s natural unit of work, a full ingest pass over a snapshot, throttle you into appropriately sized chunks of work. However, if updates come in faster than your ingester can handle them, you’ll end up with a lengthening queue of work.

An easy way to tell whether you need to allocate more resources to your external indexer is to check whether the ingester’s high-water-mark is falling behind the cluster’s current snapshot. Falling behind a little during traffic spikes is probably OK, but you don’t want to be trending further and further behind over the long term.
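One way to watch for this is to compare the two timestamps directly. A sketch, assuming (as is the case in practice for FaunaDB clusters) that the synthetic timestamps are expressed in microseconds since the Unix epoch:

```javascript
// Rough ingestion lag in seconds, given the cluster's current snapshot
// timestamp and the ingester's high-water-mark, both in microseconds.
// A value that grows steadily over time means the ingester needs more
// resources; brief spikes during bursts of traffic are usually fine.
function lagSeconds(clusterSnapshotTs, highWaterMark) {
  return (clusterSnapshotTs - highWaterMark) / 1e6;
}
```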

If you redefine your external data pipeline, for instance by indexing a new field, you might want to rebuild from scratch. This is as easy as erasing your target system, setting the high-water-mark to zero, and running your scripts again.

Stay tuned for an open-source example that implements the full approach.

If you enjoyed this topic and want to work on systems and challenges just like this, Fauna is hiring!