Druid Deprecation and ClickHouse Adoption at Lyft


Written by Ritesh Varyani and Jeana Choi at Lyft.

Introduction

At Lyft, we have used systems like ClickHouse and Apache Druid for near real-time and sub-second analytics. Sub-second query systems allow for near real-time data exploration and low-latency, high-throughput queries, which are particularly well-suited for handling time-series data. For our customers, this means faster analytics and decision making on near real-time data. This is crucial for use cases like market signaling and forecasting, which benefit from, and depend upon, the most up-to-date information. Overall, these systems allow us to make timely decisions for the business with accurate predictions.

In this blog post, we explain how Druid has been used at Lyft and what led us to adopt ClickHouse as our sub-second analytics system.

Druid at Lyft

Apache Druid is an in-memory, columnar, distributed, open-source data store designed for sub-second queries on real-time and historical data. Druid enables low-latency (real-time) data ingestion, flexible data exploration, and fast data aggregation, resulting in sub-second query latencies.

At Lyft, we started using Druid around six years ago as our first interactive query engine in the sub-second space. Our initial use for Druid was for near real-time geospatial querying and high performance on high-cardinality data sets. It also allowed us to optimize for handling time-series data and event data at scale.

Druid leverages the concept of segments, a unit of storage that allows for parallel querying and columnar storage, complemented with efficient compression and data retrieval.

Druid Architecture at Lyft

(Figure: Druid architecture at Lyft)

As a Kubernetes shop, we ran all of the Druid processes shown above in stateful and stateless Kubernetes pods within the service namespaces. External dependencies for Druid were managed by our persistence teams, and Amazon S3 was used for deep storage of our segments.

Druid Performance

For Druid query performance, we primarily focused on two types of optimization: rollup and compaction.

1. At Lyft, we used rollup as a data preprocessing technique that aggregates data and reduces its granularity before it is stored in segments. Pre-aggregating data at ingestion time helped optimize our query performance and reduce our storage costs (see the sketch after this list).

(Figure: an example of how we use Druid rollup at Lyft)

2. Compaction uses batch processing to combine many small, fragmented segments into fewer optimized segments, improving the query performance further.
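Conceptually, rollup behaves like a GROUP BY applied at ingestion time. The SQL below is only an illustrative sketch of what a rolled-up segment contains, using hypothetical column names (region, fare_usd); it is not our actual Druid configuration.

-- Illustration only: raw ride events are pre-aggregated to one row per
-- minute and region, so queries scan far fewer rows.
SELECT
    date_trunc('minute', occurred_at) AS occurred_at_minute, -- query granularity
    region,                                                  -- dimension
    count(*)      AS ride_count,                             -- metric
    sum(fare_usd) AS total_fare_usd                          -- metric
FROM raw_ride_events
GROUP BY occurred_at_minute, region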

Druid Data Ingestion

(Figure: our pipelines for the two methods of ingesting data into Druid—batch ingestion and real-time ingestion)

Real-time Ingestion

Events from our real-time analytics pipeline were configured to be sent into our internal Flink application, streamed to Kafka, and written into Druid. This was our main form of ingestion.

Batch Ingestion

Our batch ingestion pipeline served as a backup ingestion method for backfills and maintenance tasks, specifically:

  • Backfills covering for any data loss or downtime in real-time ingestion
  • Maintenance tasks such as compaction of segments

Configuring Ingestion for Druid

To onboard a use case to Druid, our customers had to ensure their events were streamed to our Flink application with the correct transformations (if needed) and define the proto files to serialize and deserialize the data to and from Kafka, preparing it for ingestion into Druid. Then, they needed to define an ingestion specification, which tells Druid how to process the data being ingested.

A Druid ingestion specification contains the following:

  • Datasource schema: name, query / segment granularity, timestamp, dimensions, metrics, etc.
  • ioConfig: Kafka server info, topic names, task counts, etc.
  • tuningConfig: tuning parameters such as persist periods and row limits per segment and in memory (see the example below)
druid: {
  ioConfig: {
    taskCount: 3
    replicas: 2
    taskDuration: PT15M
    completionTimeout: PT60M
    earlyMessageRejectionPeriod: PT2H
  }
  tuningConfig: {
    intermediatePersistPeriod: PT5M
    maxRowsPerSegment: 15000000
    maxRowsInMemory: 500000
    resetOffsetAutomatically: true
  }
}
kafka {
  partitions: 3
}

Druid Use Cases at Lyft

Druid powered crucial use cases for Lyft, including pricing models and campaigns created and analyzed by our campaigns and communications team. In addition, our experimentation team utilized the datastore to analyze recent experimentation data.

Managing Druid and Issues We Faced

At Lyft, our primary Druid customers were data engineers and software engineers. One of the main barriers we faced for onboarding new customers was the steep ramp-up for writing and maintaining their ingestion specifications, as well as understanding the different tuning mechanisms for each use case.

This involved:

  • Ensuring the correct format for data being ingested
  • Fixing downstream ingestion pipelines for use cases with any upstream changes
  • Creating specification files for users well-versed in Python/SQL semantics, but not necessarily in Druid-specific technologies

These blockers made it difficult to find more customers and increase adoption. Over time, Druid was under-invested in due to its limited ROI, despite the existing and active business-critical use cases for spend tracking, short-term forecasting, and signaling.

Around this time, Lyft began to run a lot leaner through various cost-cutting initiatives, and Druid presented a steep learning curve for new platform engineers. This lean phase meant we hyper-prioritized and therefore could not quickly upgrade Druid or keep up with its latest performance fixes.

In addition, when Lyft moved to Kubernetes Compute about three years ago, an extra layer of complexity was introduced, increasing the effort required for the team since multiple Druid clusters were running with many more components in the updated system.

With these factors in mind, we wanted to take a step back and reevaluate our choices to consider if other technologies could be a net positive for us.

ClickHouse at Lyft

What is ClickHouse?

ClickHouse is an open-source, column-oriented database for online analytical processing. One of ClickHouse’s standout traits is its high performance, which comes from a combination of column-based data storage and processing, data compression, and indexing.

Initial Use Case

In 2020, while the data platform team was managing Druid, the marketplace team considered a new set of requirements:

  1. Data produced is immediately available for querying in near real-time
  2. Latencies are sub-second for business dashboarding
  3. Ingestion for quick slice and dice of datasets. (For example: How many rides in the last 2 hours in the SF region?)
  4. Nested data support
  5. Support for both real-time and batch ingestion
  6. Native data deduplication at destination

While the latest version of Druid would provide us with some of these features, such as nested joins (v0.18), other requirements such as deduplication at destination would not be well satisfied. Using our existing stack, we considered performing deduplication at the streaming layer instead of at the destination.

However, two main reasons prevented us from pursuing this idea:

  1. We would need to perform deduplication at the destination storage layer anyway, in order to deduplicate data across both stream and batch loads.
  2. Streaming solutions require setting up a mutability window per entity (e.g. 24 hours per ride). This was a hard requirement from the business side due to possible scenarios of updating a past transactional entity already written to storage, coupled with the need for the entity to be queryable as soon as possible (at the end of a ride, for example, if not earlier).

The conclusion was that we should be able to overwrite records as needed using the dedupe facility of the destination store. With this top of mind in 2020, ClickHouse emerged as an option that satisfied the above requirements out of the box and was adopted by the marketplace team.
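In ClickHouse, this record-overwrite behavior maps naturally onto the ReplacingMergeTree engine family. The following is a minimal sketch, assuming a hypothetical rides table keyed by ride_id and versioned by updated_at; our actual table definitions differ.

-- Minimal sketch of destination-side deduplication: later inserts for the
-- same ride_id supersede earlier ones.
CREATE TABLE rides
(
    ride_id    String,
    region     String,
    fare_usd   Float64,
    updated_at DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY ride_id;

-- A ride written at drop-off can be overwritten later (e.g. a fare adjustment).
INSERT INTO rides VALUES ('r1', 'SFO', 23.50, now());
INSERT INTO rides VALUES ('r1', 'SFO', 25.00, now());

-- FINAL collapses duplicates at read time; background merges do so eventually.
SELECT * FROM rides FINAL WHERE ride_id = 'r1';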

Decision: ClickHouse or Druid?

ClickHouse gained momentum with our marketplace use cases, leading us to a series of questions — should we expand to other use cases? Can ClickHouse support our Druid use cases? Should we continue to run both systems at Lyft or consolidate into one?

After a careful and deep analysis of cost, infrastructure management, and overall feature benefits, we decided to expand on ClickHouse and sunset Druid, migrating existing Druid use cases to ClickHouse. The following points expand on some benefits we saw with ClickHouse over Druid:

  1. Simplified infrastructure management—with Lyft’s pivot towards leaner teams and architecture, there was a preference for converging on a system with less management and maintenance requirements. Druid, due to its modular design, turned out to be a more complex system to maintain.
  2. Reduced learning curve—our users are well versed in Python and SQL semantics, as opposed to Java, etc. With more familiar language and tooling, onboarding their use cases takes less time. For example, defining the sorting key, engine type, and TTL in a table definition is a shorter learning curve than writing a Druid ingestion specification.
  3. Data deduplication—covered in the “Initial Use Case” section above.
  4. Cost—for Lyft, as a Kubernetes Compute company, running ClickHouse over Druid would reduce our compute footprint by a large margin. Running much leaner with better TTL definitions at 1/8th of the cost was a big plus.
  5. Specialized engines—Replicated*, Replacing* and Kafka Engine types in ClickHouse allow Lyft to natively manage Kafka pull-based ingestion and also maintain high availability (HA) due to replication across nodes.

Benchmarking, Performance, and Migration

We created a benchmarking test suite, and while still serving queries off of Druid, provisioned our real-time & batch ingestion in ClickHouse, and ran tests in a controlled environment comparing query performance between ClickHouse and Druid.

We involved our stakeholder users in these tests and took queries running in our Druid production system, dynamically transpiled them to ClickHouse syntax, and fired both queries against Druid and ClickHouse respectively. We compared the query latencies and identified bottlenecks in ClickHouse.

For a couple of our experimentation use cases in ClickHouse, we observed unreliable (spiky, higher) latencies because the queries did not align with the tables’ sorting keys, resulting in full table scans. Since the shape of the query was consistent and the data queried could be pre-aggregated, adding projections in ClickHouse helped.

Projections are essentially precomputed views of your data, built only over the columns needed; rather than running a full table scan, ClickHouse just scans the projection. In addition to improving query performance for those experimentation use cases, projections helped reduce our I/O as well.

We measured correctness (by row counts returned and the diff of exact table results) and latency, and used a tiered migration serving 1%, 5%, 10%, 20%, 50%, and then 100% of traffic from ClickHouse. We eventually also realized latency gains, which are discussed in later sections.

Overall, our migration experience went smoothly. Use cases for campaigns, experimentation, forecasting, and spend tracking underwent this migration process. We communicated with customer teams and ensured correct read query translations from Druid to ClickHouse, as well as running the above benchmarking tests and performance analysis. Throughout the process, we worked on some query optimizations (see the “ClickHouse Query Optimizations” section below) for certain scenarios where the latency was higher than desired.

ClickHouse Architecture at Lyft

(Figure: current ClickHouse architecture at Lyft)

We use Altinity’s Kubernetes Operator to deploy the ClickHouse clusters. Currently, we run ClickHouse version 21.7 and have plans to upgrade to the latest version. The storage we use is co-located EBS volumes in our stateful Kubernetes cluster. We run our clusters in HA mode on general-purpose AWS M5-type compute instances, with our database objects replicated across nodes. We do not currently use sharding on our clusters, but there are plans to optimize cluster performance as we scale further.

ClickHouse data ingestion is discussed in detail below. ClickHouse read querying is served through our internal Proxy with ACLs and visualization tools such as Mode.

ClickHouse Data Ingestion

For our ClickHouse infrastructure, we handle ingestion through three separate pipelines.

  1. Kafka → ClickHouse: this is primarily used by our services which rely on a pub-sub model. ClickHouse is one of the subscribers to this data. There is native support in ClickHouse for the KafkaTableEngine, which uses a pull-based mechanism to read from Kafka cluster topics (see the sketch after this list). We ingest up to 2 billion records per day into ClickHouse from Kafka-based ingestion.
  2. Kinesis → Flink → ClickHouse: this ingestion scheme populates our events data in ClickHouse. Events teams, who need their data in ClickHouse for analytics, onboard through this ingestion mechanism. Lyft generates about 600 million rows per day from Flink ingestion alone, in ClickHouse.
  3. Trino → Cron → ClickHouse: we also support batch ingestion from our offline systems through Trino. This is primarily used to export derived datasets for quick slice and dice when determining marketplace health.
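As a rough sketch of the Kafka-based path (item 1), the usual ClickHouse pattern is a Kafka-engine table that consumes the topic, plus a materialized view that moves rows into a MergeTree table for querying. The table names, topic, broker address, and columns below are hypothetical, not our production definitions.

-- Hypothetical sketch of pull-based Kafka ingestion via the Kafka table engine.
CREATE TABLE ride_events_queue
(
    ride_id     String,
    region      String,
    occurred_at DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'ride_events',
         kafka_group_name = 'clickhouse_ride_events',
         kafka_format = 'JSONEachRow';

-- Destination table that actually stores the data.
CREATE TABLE ride_events
(
    ride_id     String,
    region      String,
    occurred_at DateTime
)
ENGINE = MergeTree
ORDER BY (occurred_at, region);

-- Materialized view that continuously pulls from the queue into storage.
CREATE MATERIALIZED VIEW ride_events_mv TO ride_events AS
SELECT ride_id, region, occurred_at FROM ride_events_queue;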

Moving forward, we will explore merging our real-time pipelines to ingest everything through Kafka for simplified architecture and costs.

ClickHouse Query Optimizations

Sorting Key

The sorting key helps determine how data is physically organized on disk and speeds up query execution times. When data is sorted on particular column(s) frequently used in queries, the database can skip large portions of irrelevant data during query execution. For our time-series based datasets, many of the tables are sorted on event_time (or occurred_at time). This also helps with time-based range queries in the system. Along with reduced I/O due to sequential access of such data, sorting keys strongly help with query performance.

The right sorting key usually follows directly from the types of queries that will run on those datasets.
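As an illustrative sketch (hypothetical table and column names, not our production schema), a time-series table sorted on event_time lets ClickHouse prune most of the data for a time-range query; a TTL of the kind mentioned earlier can be declared in the same definition.

-- Hypothetical time-series table sorted by time, with a retention TTL.
CREATE TABLE marketplace_events
(
    event_time DateTime,
    region     String,
    metric     Float64
)
ENGINE = MergeTree
ORDER BY (event_time, region)
TTL event_time + INTERVAL 90 DAY;

-- Range queries on the sorting key only touch the relevant parts of the table.
SELECT region, count(*)
FROM marketplace_events
WHERE event_time >= now() - INTERVAL 2 HOUR
GROUP BY region;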

Skip Indices

When querying data with filters on columns that are not part of the sorting key, we risk a full scan of each column in order to apply the WHERE clause. To evaluate these non-indexed queries efficiently, we make use of skip indices, where ClickHouse uses an index file to understand which blocks of data can be skipped. At Lyft, we have primarily used minmax indices to increase our query performance.

ALTER TABLE <database>.<table> ON CLUSTER <cluster> ADD INDEX logged_at_idx (logged_at) TYPE minmax GRANULARITY 8192

We then materialize this example index on already existing data. This allows ClickHouse to skip unnecessary data blocks and minimize I/O operations when running the queries, lowering our end-to-end latencies.
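A sketch of that materialization step, reusing the placeholders from the statement above:

-- Builds the minmax index for parts that already exist on disk.
ALTER TABLE <database>.<table> ON CLUSTER <cluster> MATERIALIZE INDEX logged_at_idx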

Projections

One of our customer requirements for the migration was to maintain, or lower, the latency. While most of the transpiled ClickHouse queries ran with a faster execution time, our health queries that regularly polled the latest experiment timestamps were much slower.

We utilized the power of ClickHouse projections, specifically creating and materializing the projection SELECT max(occurred_at) to pre-compute and store the latest timestamp. With this projection, only a couple thousand rows are scanned rather than the entire table, speeding up our health checks from ~20 seconds to sub-second.
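A minimal sketch of what this looks like, assuming a hypothetical experiments table (the actual table and projection names differ):

-- Hypothetical projection that precomputes the latest experiment timestamp.
ALTER TABLE experiments ADD PROJECTION latest_occurred_at
(
    SELECT max(occurred_at)
);

-- Materialize the projection for data that already exists in the table.
ALTER TABLE experiments MATERIALIZE PROJECTION latest_occurred_at;

-- The health-check query is now served from the projection instead of a full scan.
SELECT max(occurred_at) FROM experiments;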

ClickHouse Use Cases and Scale

Today, we serve the following use cases on ClickHouse:

  • Market health
  • Policy reporting for bikes & scooters
  • Spend tracking
  • Forecasting and market signaling
  • Experimentation
  • Campaigns

At Lyft, we ingest tens of millions of rows and execute millions of read queries in ClickHouse daily with volume continuing to increase. On a monthly basis, this means reading and writing more than 25TB of data.

Issues Managing ClickHouse

While our migration process went smoothly, some pain points arose with our updated system:

  • Query caching performance—we have sometimes seen highly variable latencies, making it hard to promise SLAs for certain workloads to customers. Using the query cache with an appropriate cache size and TTL helps. Initially, while the cache is being hydrated, query performance can vary, but the latency spikes are very short-lived.
  • Kafka issues with MSK integration—we use the Kafka Table Engine extensively, a pull-based, natively supported ingestion mechanism in ClickHouse that relies on librdkafka (a C library maintained by Confluent) to read from Kafka. To ingest from our in-house Kafka, we authenticate with the SASL SCRAM-SHA-256 mechanism. However, while trying to ingest from Amazon Managed Streaming for Kafka (AWS MSK) for a new use case, we found that AWS’s SASL mechanism, AWS_MSK_IAM, is not supported in librdkafka. The solution here is to try Kafka Connect / MSK Connect, which we will tackle once we upgrade ClickHouse.
  • Ingestion pipeline resiliency—our Flink ingestion into ClickHouse is a push-based model, and when ZooKeeper is doing conflict resolution, it can mark a table as read-only, causing failures in the push model. We will explore a more resilient approach that places Kafka between Flink and ClickHouse: writes are streamed to and stored in Kafka, and Kafka Connect then batch-writes them into ClickHouse.

What’s next for ClickHouse at Lyft?

There are three main areas for expansion when it comes to ClickHouse at Lyft:

  1. Stabilize batch ingestion architecture with streaming Kinesis ingestion through Kafka—we are working to move our batch ingestion onto a more resilient orchestration platform, Apache Airflow, which is widely used at Lyft.
  2. Move Flink SQL to ClickHouse—certain Flink transformations can be done directly at the destination in ClickHouse. We plan to leverage this for multiple new use cases in ClickHouse.
  3. Retire ZooKeeper—we are currently using Apache ZooKeeper for state management in ClickHouse. We will soon be upgrading ClickHouse and exploring ClickHouse Keeper to reduce external component dependencies.

Acknowledgements

A huge thank you to our stakeholders in Marketplace (Andrew Chan et al.) and Data (Sarthak Sharma, Akash Katipally et al.) for their support in enabling a transparent migration, and to our internal ClickHouse customers for their cooperation while we worked to streamline our sub-second infrastructure.

ClickHouse has been successful as a product for our use cases at Lyft and we are excited for what’s next for us!

If you’re interested in working on problems like this in Data, visit Lyft’s careers page to see our openings.
