How we implemented serverless full-text search in Rust

Benjamin Rabier, Marc Mettke

Search @ Grafbase

Grafbase is a unified data layer for databases and APIs that comes with edge caching, authentication, analytics and more. Currently, we offer a serverless database powered by DynamoDB that we extended with full-text search capabilities earlier this year. In this post, we'll discuss its architecture.

Our main objective was to provide a fast, serverless solution for full-text search and filtering on top of DynamoDB. We considered using existing projects or vendors, but at the time they were either not serverless or too expensive compared to our current offering. We decided to build on top of the Tantivy library, created by Paul Masurel, a mature Rust equivalent of the famous Apache Lucene library used by Elasticsearch.

Before going further, let’s take a look at Tantivy’s core data structure, the inverted index. It maps content to the document IDs, as presented in the following example with the title field:

Inverted Index
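
As a rough illustration of the idea (a minimal sketch, not Tantivy's actual data structure), an inverted index over a title field can be modeled as a map from terms to sorted document IDs. The `InvertedIndex` type and `tokenize` helper are hypothetical names introduced for this example:

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Lowercases the text and splits on anything that is not alphanumeric,
/// which also drops punctuation.
fn tokenize(text: &str) -> Vec<String> {
    text.split(|c: char| !c.is_alphanumeric())
        .filter(|t| !t.is_empty())
        .map(|t| t.to_lowercase())
        .collect()
}

/// Maps each term to the sorted set of document IDs containing it.
#[derive(Default)]
struct InvertedIndex {
    postings: BTreeMap<String, BTreeSet<u32>>,
}

impl InvertedIndex {
    /// Indexes the title of one document under every term it contains.
    fn add_document(&mut self, doc_id: u32, title: &str) {
        for term in tokenize(title) {
            self.postings.entry(term).or_default().insert(doc_id);
        }
    }

    /// Returns the IDs of the documents containing the exact term.
    fn lookup(&self, term: &str) -> Vec<u32> {
        self.postings
            .get(&term.to_lowercase())
            .map(|ids| ids.iter().copied().collect())
            .unwrap_or_default()
    }
}
```

Looking up a term is then a single map access, independent of how many documents do not contain it.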

To achieve good full-text search performance, building an inverted index is only the first step. Defining good terms and how to match them is the next one. Here we only converted words to lowercase and removed the punctuation, but there is a lot more to do, such as Unicode normalization or stemming. To tolerate typos, fuzzy matching can be applied to the terms during query execution: for a given search term, all terms within a given distance (usually Levenshtein distance) will have their associated documents added to the result set. For example, the Levenshtein distance between alic and alice is 1. In Tantivy, the terms and associated IDs are stored in the .term and .idx files, respectively.
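
To make the fuzzy matching step concrete, here is a minimal sketch of the Levenshtein distance and a brute-force scan over indexed terms. The function names are hypothetical, and the linear scan is only for illustration; search engines generally use more efficient structures to find nearby terms.

```rust
/// Classic dynamic-programming Levenshtein distance over Unicode scalar values.
fn levenshtein(a: &str, b: &str) -> usize {
    let a: Vec<char> = a.chars().collect();
    let b: Vec<char> = b.chars().collect();
    // prev[j] is the distance between the first i chars of `a`
    // and the first j chars of `b`.
    let mut prev: Vec<usize> = (0..=b.len()).collect();
    for (i, ca) in a.iter().enumerate() {
        let mut curr = vec![i + 1];
        for (j, cb) in b.iter().enumerate() {
            let substitution = prev[j] + usize::from(ca != cb);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr.push(substitution.min(deletion).min(insertion));
        }
        prev = curr;
    }
    prev[b.len()]
}

/// Returns the indexed terms within `max_distance` of the query term.
fn fuzzy_terms<'a>(
    indexed_terms: &[&'a str],
    query: &str,
    max_distance: usize,
) -> Vec<&'a str> {
    indexed_terms
        .iter()
        .copied()
        .filter(|term| levenshtein(term, query) <= max_distance)
        .collect()
}
```

Every term returned by `fuzzy_terms` would then contribute its document IDs to the result set.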

Once we retrieve a list of matching documents, the next step is to determine their relevance to the search query. The most common strategy is to rank by how frequently the query terms appear in all documents and fields, with a ranking function like BM25. With it, a match counts for more when the term appears in a short field such as the title than in a large text. A document with multiple occurrences of a term is also ranked higher than one with a single occurrence. To compute this rank, Tantivy stores the number of terms in each document field in .fieldnorm and the number of occurrences of each term in the .idx itself. In addition, there are several other files with more specific purposes. For example, the .store file can store a subset of the document fields in compressed blocks. All of these files make up a segment in Tantivy’s terminology, and a Tantivy index is a collection of segments.

ffc78956f07c4c2c8ee259f6c41e8544.fieldnorm
ffc78956f07c4c2c8ee259f6c41e8544.idx
ffc78956f07c4c2c8ee259f6c41e8544.store
ffc78956f07c4c2c8ee259f6c41e8544.term
ffe96eb669c24edca98a7b4444d0fdb7.fieldnorm
ffe96eb669c24edca98a7b4444d0fdb7.idx
ffe96eb669c24edca98a7b4444d0fdb7.store
ffe96eb669c24edca98a7b4444d0fdb7.term
...
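
To make the ranking step described earlier more concrete, here is a minimal sketch of a BM25-style score for a single term in a single field. The constants are commonly used defaults and are an assumption here, as are the function names. The inputs correspond to the kind of statistics mentioned above: term occurrences and field lengths.

```rust
/// Commonly used BM25 parameters (assumed, not taken from this post).
const K1: f64 = 1.2;
const B: f64 = 0.75;

/// Inverse document frequency: rarer terms weigh more.
fn idf(total_docs: u64, docs_with_term: u64) -> f64 {
    let n = total_docs as f64;
    let df = docs_with_term as f64;
    (1.0 + (n - df + 0.5) / (df + 0.5)).ln()
}

/// BM25 contribution of one term for one document field.
fn bm25_term_score(
    term_freq: u32,
    field_len: u32,
    avg_field_len: f64,
    total_docs: u64,
    docs_with_term: u64,
) -> f64 {
    let tf = term_freq as f64;
    // Longer-than-average fields are penalized, shorter ones boosted.
    let length_norm = 1.0 - B + B * (field_len as f64 / avg_field_len);
    idf(total_docs, docs_with_term) * (tf * (K1 + 1.0)) / (tf + K1 * length_norm)
}
```

With these definitions, a single occurrence in a 3-term title scores higher than a single occurrence in a 200-term description, and the score grows (with diminishing returns) as the term frequency increases.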

Each segment in Tantivy is immutable, so a new one is created with every update. Eventually, they are merged in the background to limit their number. This approach allows for concurrent readers while writing and merging the segments. However, it is not the most efficient when dealing with frequent and small updates, which is similar to what our database will provide! For more information, Paul Masurel's blog provides a good introduction.

Our first task was to test the performance characteristics of Tantivy with a multitude of segments. First we took a dataset from Kaggle in order to work on data that is similar to a real-world use case.

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f

Skipping deserialization logic, we map it to:

pub struct Record {
    pub event_time: chrono::DateTime<chrono::Utc>,
    pub event_type: EventType,
    pub product_id: u32,
    pub category_id: u64,
    pub category_code: String,
    pub brand: String,
    pub price: f32,
    pub user_id: u32,
    pub user_session: uuid::Uuid,
}

pub enum EventType {
    View,
    Cart,
    RemoveFromCart,
    Purchase,
}

First, we’re creating the schema with all indexed fields:

use tantivy::schema::*;

let mut schema = Schema::builder();

// TEXT -> full-text search fields
let category_code = schema.add_text_field("category_code", TEXT);
let brand = schema.add_text_field("brand", TEXT);

// INDEXED -> field can be filtered on
let event_time = schema.add_date_field("event_time", INDEXED);
let event_type = schema.add_u64_field("event_type", INDEXED);
let product_id = schema.add_u64_field("product_id", INDEXED);
let category_id = schema.add_u64_field("category_id", INDEXED);
let price = schema.add_f64_field("price", INDEXED);
let user_id = schema.add_u64_field("user_id", INDEXED);

All of our documents inside DynamoDB use a ULID as their ID, so we’re going to use the user_session as its analog. We only store it in the .store file, compressed with Zstd. We do not need to index it, since DynamoDB already provides the capability to retrieve a document by its ID.

// STORED -> field stored in .store
let user_session = schema.add_bytes_field("user_session", STORED);

use tantivy::{store::*, *};
let index = Index::builder()
    .schema(schema.build())
    .settings(IndexSettings {
        docstore_compression: Compressor::Zstd(ZstdCompressor::default()),
        ..Default::default()
    })
    .create_in_dir(index_path)?;

To ensure proper testing, we’re making sure to have full control over the actual number of segments we end up with:

// Using a single thread and a huge memory arena to control the
// number of segments. In real cases, the memory arena is only a
// few dozen MiB.
const NUM_THREADS: usize = 1;
const OVERALL_MEMORY_ARENA_IN_BYTES: usize = 1 << 30; // 1 GiB
let mut writer = index.writer_with_num_threads(
    NUM_THREADS,
    OVERALL_MEMORY_ARENA_IN_BYTES,
)?;

// Prevent any segment merge, again to control the number of segments.
writer.set_merge_policy(Box::new(tantivy::merge_policy::NoMergePolicy));

// `chunks` used below comes from the itertools crate.
use itertools::Itertools;

// CSV reading logic
let mut csv_reader = csv::ReaderBuilder::new()
    .has_headers(true)
    .from_path(csv_path)?;

// Here we're forcing a certain number of segments. In production code, we try to write
// as few as possible. Tantivy will automatically create a new segment when its memory arena
// is exhausted, so chunking isn't needed.
const TOTAL_NUMBER_OF_DOCUMENTS: usize = 10_000_000; // hardcoded
let number_of_segments = 1; // parameter
let documents_per_segment = TOTAL_NUMBER_OF_DOCUMENTS / number_of_segments;
for chunk in &csv_reader
    .deserialize()
    .into_iter()
    .chunks(documents_per_segment)
{
    for line in chunk {
        // Skipping over the deserialization details.
        let record: Record = line?;

        // Indexing a Tantivy document
        writer.add_document(tantivy::doc!(
            category_code => record.category_code.clone(),
            brand => record.brand.clone(),
            user_session => Vec::from(record.user_session.to_bytes_le()),
            event_time => record.event_datetime(),
            event_type => record.event_type.variant_id(),
            product_id => record.product_id as u64,
            category_id => record.category_id,
            price => record.price as f64,
            user_id => record.user_id as u64
        ))?;
    }
    // Committing, creating a new segment.
    writer.commit()?;
}

Finally, for the search query we used the following:

let searcher = index.reader()?.searcher();

let query_parser = tantivy::query::QueryParser::for_index(
    &index,
    // Searching on all fields
    index
        .schema()
        .fields()
        .map(|(field, _)| field)
        .collect::<Vec<_>>(),
);
let query = query_parser.parse_query(r#"brand:"creed""#)?;

searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?;

We benchmarked it against the first 10 million rows (~1.3GB of uncompressed CSV) and ran the code on recent hardware (M2 Max, SSD). Here are the results:

Segments  Indexing  Search
1         9 s       1 ms
10        12 s      2 ms
20        13 s      3 ms
50        15 s      6 ms
100       18 s      10 ms
200       25 s      20 ms
500       45 s      50 ms
1000      78 s      90 ms
2000      149 s     170 ms

Tantivy is impressive: indexing takes roughly 1 microsecond per document. However, it performs poorly with small batches. Taken to the extreme, with single-document updates, each commit takes 70ms. Unexpectedly, search queries are also significantly impacted by the number of segments, and this will only worsen once I/O goes over the network. Now we have a baseline to compare against.
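
One way to relate these figures to the benchmark results is sketched below. It assumes that the single-segment run has negligible commit overhead, so that the extra indexing time of the 2000-segment run is spread evenly over its commits. This is a reading of the results table rather than a description of how the numbers were measured, and the function names are hypothetical.

```rust
/// Average indexing cost per document, in microseconds.
fn micros_per_document(indexing_secs: f64, documents: u64) -> f64 {
    indexing_secs * 1_000_000.0 / documents as f64
}

/// Extra time attributable to each commit, in milliseconds, relative to a
/// baseline run assumed to have negligible commit overhead.
fn commit_overhead_millis(indexing_secs: f64, baseline_secs: f64, segments: u64) -> f64 {
    (indexing_secs - baseline_secs) * 1_000.0 / segments as f64
}
```

With the values from the results, `micros_per_document(9.0, 10_000_000)` gives 0.9 microseconds, and `commit_overhead_millis(149.0, 9.0, 2000)` gives 70 milliseconds.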

The Grafbase platform runs on AWS and Cloudflare infrastructure, so we considered both options. Initially, we thought of having a Cloudflare WebAssembly (Wasm) worker on the edge, like our GraphQL gateway, and storing the data in R2. A project called tantivy-wasm actually made it work in the browser. However, Tantivy is a synchronous library, so it relies on synchronous HTTP calls, which Cloudflare Workers do not support.

Quickwit, an Elasticsearch-like product built on top of Tantivy, uses S3 and pre-fetches data asynchronously. But this relies on a good understanding of the segment structure and is still a work in progress. Relying on it seemed perilous for us at our stage. Additionally, it wasn’t clear whether all the data was pre-fetched or whether it was a best-effort warm-up to reduce latency. In a Cloudflare Worker we wouldn’t be able to make a synchronous call in case we missed something. Therefore, we decided against using Cloudflare Workers for the time being.

On AWS, data can be stored either in S3 or EFS. EFS is a managed NFS (disk mounted through the network), so it should be faster but more expensive than S3. To test them, we executed a search query on a pre-generated index. To minimize the number of S3 calls, we developed a generic strategy inspired by tantivy-wasm:

  1. Load the metadata and the list of files concurrently.
  2. Load all file footers, where Tantivy keeps metadata, concurrently. This does rely on an assumption about how Tantivy stores data, but storing metadata in footers is common and unlikely to change.
  3. Read files lazily and cache them in 16KB chunks.
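
The third step can be sketched as follows. This is a simplified, synchronous model under stated assumptions: `RangeSource` is a hypothetical stand-in for a ranged read against remote storage, and `CountingSource` is an in-memory source that exists only to show how many fetches the cache avoids.

```rust
use std::collections::HashMap;

const CHUNK_SIZE: usize = 16 * 1024;

/// Hypothetical stand-in for a remote file supporting ranged reads.
trait RangeSource {
    fn len(&self) -> usize;
    fn fetch(&mut self, start: usize, end: usize) -> Vec<u8>;
}

/// Reads byte ranges lazily, fetching and caching whole 16KB chunks.
struct ChunkedReader<S: RangeSource> {
    source: S,
    cache: HashMap<usize, Vec<u8>>,
}

impl<S: RangeSource> ChunkedReader<S> {
    fn new(source: S) -> Self {
        Self { source, cache: HashMap::new() }
    }

    /// Returns bytes `start..end`, clamped to the file length.
    fn read(&mut self, start: usize, end: usize) -> Vec<u8> {
        let end = end.min(self.source.len());
        let mut out = Vec::with_capacity(end.saturating_sub(start));
        if start >= end {
            return out;
        }
        for chunk_idx in start / CHUNK_SIZE..=(end - 1) / CHUNK_SIZE {
            let chunk_start = chunk_idx * CHUNK_SIZE;
            // Only go to the source when the chunk is not cached yet.
            if !self.cache.contains_key(&chunk_idx) {
                let chunk_end = (chunk_start + CHUNK_SIZE).min(self.source.len());
                let bytes = self.source.fetch(chunk_start, chunk_end);
                self.cache.insert(chunk_idx, bytes);
            }
            let chunk = &self.cache[&chunk_idx];
            let from = start.max(chunk_start) - chunk_start;
            let to = end.min(chunk_start + chunk.len()) - chunk_start;
            out.extend_from_slice(&chunk[from..to]);
        }
        out
    }
}

/// In-memory source that counts fetches, used only for illustration.
struct CountingSource {
    data: Vec<u8>,
    fetches: usize,
}

impl RangeSource for CountingSource {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn fetch(&mut self, start: usize, end: usize) -> Vec<u8> {
        self.fetches += 1;
        self.data[start..end].to_vec()
    }
}
```

Two small reads that fall within the same 16KB chunk cost a single fetch, which is what keeps the number of remote calls low.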

We also attempted to combine the first and second steps by making a single S3 call that fetches a warm-up file containing all the necessary data. With a 1 million document index, here are the results measured over a 30-second period, with network latency subtracted:

Storage                 Median   P95
EFS                     25 ms    33 ms
S3                      540 ms   697 ms
S3 (with warmup file)   462 ms   534 ms

Despite some optimizations, S3 remained rather slow. While EFS is considerably more expensive, costing roughly 10 times more, it is in the same ballpark as DynamoDB storage, which we're already using to store the actual documents. Since this system isn't designed to store huge amounts of infrequently accessed data, such as logs, but rather live data like products and users coming from OLTP workloads, we decided to start with EFS. We can easily revisit this choice later on if necessary.

We actually didn’t test a possible optimization: storing the document IDs inside Tantivy fast fields instead of the .store file. The former stores everything in an array without compression, so we would not need to read and decompress as much data. However, it is not clear how much more efficient this approach would have been, as the .store blocks are only 16KB in size and cached. Additionally, due to a misunderstanding of how the .store works, we did not test parallelising the retrieval of document IDs, which would likely have a greater impact. So, there is still room for improvement!

Now that we've decided to use Tantivy with Lambda functions and EFS, we need to index the documents from DynamoDB. We'll rely only on the open-to-close consistency provided by EFS, meaning that when a file is closed, all subsequent opens will see the latest changes. As Tantivy index files are immutable and metadata is updated atomically (creating an intermediate file first and replacing the old one after), this is sufficient. We just need to do the same for our own metadata. With this approach, reads will be consistent.
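
The write-then-replace pattern for metadata can be sketched with the standard library alone. This is a minimal illustration, assuming that renaming a file over another on the same filesystem is atomic; `write_atomically` and the `.tmp` suffix are hypothetical choices for this example.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to a temporary sibling file, closes it, then renames it
/// over `path` so that readers see either the old or the new version.
fn write_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    let tmp_path = path.with_extension("tmp");
    {
        let mut tmp = File::create(&tmp_path)?;
        tmp.write_all(contents)?;
        tmp.sync_all()?;
    } // The temporary file is closed here, before the rename.
    fs::rename(&tmp_path, path)
}
```

Because the file is fully written and closed before it becomes visible under its final name, a reader relying on open-to-close consistency never observes a half-written metadata file.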

To ensure only one concurrent execution and proper ordering of writes, we can use an SQS FIFO queue. Within a given message group, all messages will be processed in the order in which they were sent. With DynamoDB streams guaranteeing the ordering of updates to an individual document, using the DynamoDB table name as the message group ID ensures that the Tantivy index will be updated sequentially and in the correct order.

Write architecture
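
The ordering guarantee can be illustrated with a toy in-memory model of a FIFO queue with message groups. This is not the SQS API; `FifoQueue` and its methods are hypothetical and only capture the rule that a group delivers its next message once the previous one has been processed.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy model of a FIFO queue with message groups (not the SQS API).
#[derive(Default)]
struct FifoQueue {
    groups: HashMap<String, VecDeque<String>>,
    in_flight: HashMap<String, String>,
}

impl FifoQueue {
    /// Appends a message to the end of its group.
    fn send(&mut self, group_id: &str, body: &str) {
        self.groups
            .entry(group_id.to_string())
            .or_default()
            .push_back(body.to_string());
    }

    /// Returns the next message of the group, unless one is already in flight.
    fn receive(&mut self, group_id: &str) -> Option<String> {
        if self.in_flight.contains_key(group_id) {
            return None;
        }
        let message = self.groups.get_mut(group_id)?.pop_front()?;
        self.in_flight.insert(group_id.to_string(), message.clone());
        Some(message)
    }

    /// Marks the in-flight message as processed, unblocking the group.
    fn delete(&mut self, group_id: &str) {
        self.in_flight.remove(group_id);
    }
}
```

With the table name as the group ID, two updates to the same table can never be indexed concurrently in this model, while different tables proceed independently.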

Search functionality will not be exposed directly; instead, it will be behind the GraphQL gateway that is deployed to a Cloudflare Worker. The actual documents are stored in DynamoDB. Therefore, to leverage our existing system, search will only return the IDs of the documents and not store them, as was the case in our previous test cases.

Read architecture

There is actually one last interaction that we need to be careful of: file deletions. When the index Lambda deletes index files for garbage collection, it can break a running query Lambda. So we need to add a delay, allowing any ongoing query Lambda to finish its work. The ideal solution is to use another queue. For now, we’re adding a small delay inside the index Lambda itself.
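
One possible shape for such a delay is a list of files scheduled for deletion, each removed only after a grace period has passed. The sketch below is an illustration under assumptions: `PendingDeletions` is a hypothetical type, and the grace period would have to exceed the longest expected query duration.

```rust
use std::time::{Duration, Instant};

/// Files scheduled for deletion, kept until a grace period has elapsed.
struct PendingDeletions {
    grace_period: Duration,
    pending: Vec<(String, Instant)>,
}

impl PendingDeletions {
    fn new(grace_period: Duration) -> Self {
        Self { grace_period, pending: Vec::new() }
    }

    /// Records that `file` became obsolete at `now`.
    fn schedule(&mut self, file: &str, now: Instant) {
        self.pending.push((file.to_string(), now));
    }

    /// Removes and returns the files whose grace period has expired at `now`.
    fn take_expired(&mut self, now: Instant) -> Vec<String> {
        let grace = self.grace_period;
        let (expired, remaining): (Vec<_>, Vec<_>) = std::mem::take(&mut self.pending)
            .into_iter()
            .partition(|(_, scheduled_at)| now.duration_since(*scheduled_at) >= grace);
        self.pending = remaining;
        expired.into_iter().map(|(file, _)| file).collect()
    }
}
```

Only the files returned by `take_expired` would actually be deleted from disk, so a query that started before a file became obsolete still has time to finish reading it.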

With this setup we actually had two surprises:

  • When setting up DynamoDB Streams with Lambda triggers to execute the stream lambda for every change, we initially used the LATEST starting position. As the name suggests, this starts the stream at the most recent record. Since we already had existing DynamoDB tables with streaming enabled, it seemed to be the most appropriate choice. However, we noticed that updates were not regularly propagated in end-to-end tests with new DynamoDB tables.

    Understanding how the DynamoDB Streams API works helped us realize our mistake. Once the stream is activated, DynamoDB stores changes within shards that can be iterated over from a given position. The position we initially provided for the event source mapping was used for the first shard in the stream. As we used LATEST, it would skip any change already in that first shard, and depending on what that initial shard contained, we would miss changes. Changing it to TRIM_HORIZON (starting at the earliest record) fixed that problem.

  • The second issue was due to a misconfigured, too-low retention period on the SQS FIFO queue. This caused two index Lambda functions to sometimes execute in parallel. The retention period determines how long SQS will wait before evicting an item, regardless of whether it is currently being processed by a downstream service or not. The FIFO delivery logic only ensures that items are treated in order by making the next message available for a particular message group ID only if no other message of that group is currently being processed.

    Therefore, when the index Lambda couldn't keep up with the throughput, it processed older and older messages until it took one too close to the retention limit. This message was then evicted from the queue before the index lambda finished processing it, and a concurrent index lambda was started with the next message. Increasing the retention period solved the issue. We also intend to improve the throughput.
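
The first issue can be illustrated with a toy model of a single shard. This is not the DynamoDB Streams API: `records_seen` is a hypothetical function, and `already_written` stands for the number of records present in the shard when the consumer attaches.

```rust
/// Starting positions mirroring the two options named above.
#[derive(Clone, Copy)]
enum StartingPosition {
    Latest,
    TrimHorizon,
}

/// Toy model: the records a consumer reads from the first shard, given how
/// many records were already written when it attached.
fn records_seen<'a>(
    shard: &[&'a str],
    already_written: usize,
    position: StartingPosition,
) -> Vec<&'a str> {
    let start = match position {
        // Skips everything that is already in the shard.
        StartingPosition::Latest => already_written.min(shard.len()),
        // Starts at the earliest record still available.
        StartingPosition::TrimHorizon => 0,
    };
    shard[start..].to_vec()
}
```

If two documents are created before the consumer attaches, the `Latest` position only yields later changes, whereas `TrimHorizon` yields all of them.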

To test all of this, let’s create a Grafbase project with an appropriate schema:

type Record @model @search {
  eventTime: DateTime!
  eventType: EventType!
  productId: Int!
  categoryId: Int!
  categoryCode: String!
  brand: String!
  price: Float!
  userId: Int!
}

enum EventType {
  VIEW
  CART
  REMOVE_FROM_CART
  PURCHASE
}

We pushed 10,000 documents with several batch create operations:

mutation CreateRecords {
  recordCreateMany(
    input: [
      {
        input: {
          eventTime: "2019-11-01T00:00:01.000Z"
          eventType: "VIEW"
          productId: 17302664
          categoryId: 2053013553853497600
          categoryCode: ""
          brand: "creed"
          price: 28.309999465942383
          userId: 561587266
        }
      }
    ]
  ) {
    recordCollection {
      id
    }
  }
}

And then executed a simple search query:

query RecordSearch {
  recordSearch(first: 10, filter: { brand: { eq: "creed" } }) {
    edges {
      node {
        id
        eventTime
        eventType
        productId
        categoryId
        categoryCode
        brand
        price
        userId
      }
    }
  }
}

Pushing the 10,000 documents took only a few seconds. However, it took several minutes for all of them to become available in search, because the FIFO queue delivers messages in batches of at most 10. Search queries themselves have a median response time of 250ms and a p95 of 400ms.

There is a lot of room for optimization and lots of features are missing, but we achieved our main goal. Now we need to iterate on it and write the next part of this blog post! Finally, we also want to mention a similar project, Pathery. While we didn't re-use it because of multiple small differences, it did serve as a point of reference for us.
