Very much real time by Apache PINOT

source: https://docs.pinot.apache.org/

When we talk about real-time metrics, what runs underneath is usually a batch, or more specifically a series of micro-batches, in which our queries run and return their results.

As the data size grows, the time it takes to process the data grows with it.

The need for real time has now evolved so much that we want to see metrics within milliseconds and act on them right away.

To help us solve this problem, engineers at LinkedIn and Uber built Apache Pinot.

Quick fact

The Apache Software Foundation Announces Apache® Pinot™ as a Top-Level Project.

What is Pinot in a nutshell

Apache Pinot is a distributed datastore that serves real-time data with ultra-low latency and very high throughput.

Features:

  1. Highly Available
  2. Scalable
  3. Cost effective
  4. As fresh as it can be
  5. Millisecond latency

Source: https://github.com/apache/pinot

Apache Pinot is a real-time distributed OLAP datastore, built to deliver scalable real-time analytics with low latency. It can ingest from batch data sources (such as Hadoop HDFS, Amazon S3, Azure ADLS, Google Cloud Storage) as well as stream data sources (such as Apache Kafka).

Pinot was built by engineers at LinkedIn and Uber and is designed to scale up and out with no upper bound. Performance always remains constant based on the size of your cluster and an expected query per second (QPS) threshold.

For getting started guides, deployment recipes, tutorials, and more, please visit our project documentation at https://docs.pinot.apache.org.

We will talk about the architecture in more detail, but at a high level here is how Pinot stores and serves data.

  1. Apache Pinot stores data in a columnar format.
  2. It can ingest data from stream sources such as Kafka and Amazon Kinesis, as well as batch data (for example JSON files) from stores such as S3 and HDFS.
  3. It supports a wide variety of indexes to speed up your read queries.
  4. You can segregate or isolate your tables and queries by tagging servers.
  5. It uses Apache ZooKeeper as the metadata store and Apache Helix for cluster state management.
  6. The cool thing is that we can ingest real-time and offline data into a single table, and Pinot takes care of merging them.
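
To make the columnar idea concrete, here is a minimal Python sketch. It is not Pinot code: the sample rows and the helper name `to_columns` are made up for illustration. It only shows why an aggregation over one field can skip every other field once the data is laid out column by column.

```python
# Illustrative only: plain Python, not Pinot's on-disk format.
rows = [
    {"country": "IN", "device": "mobile", "clicks": 3},
    {"country": "US", "device": "web", "clicks": 5},
    {"country": "IN", "device": "web", "clicks": 2},
]


def to_columns(records):
    """Pivot a list of row dicts into a dict of column lists."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# An aggregation such as SUM(clicks) reads a single column,
# not every field of every row.
total_clicks = sum(columns["clicks"])
print(total_clicks)
```

In a row layout the same sum would have to walk over `country` and `device` as well, which is the cost a columnar store avoids.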

Architecture

INGEST

We can load data from

  1. Online (streaming) sources such as Kafka and Amazon Kinesis
  2. Offline (batch) sources such as ADLS and S3

Note: Offline data needs a slightly different loading mechanism. We can use a framework such as Spark to build segments and push them to the Pinot segment store, or we can use Pinot's native ingestion job.

Core Layer

We have a Controller, which decides how the data is distributed across a bunch of servers: what is stored where and how it is replicated. The cluster state it manages is kept in ZooKeeper.

Presentation

The broker serves user queries: it accepts a query and distributes it across the servers, relying on the cluster state that the controller maintains.

Secret Sauce: Segments

Pinot has the concept of a table, which is a logical abstraction that refers to a collection of related data. Pinot has a distributed architecture and scales horizontally. Pinot expects the size of a table to grow infinitely over time. To achieve this, the data needs to be distributed across multiple nodes. Pinot achieves this by breaking the data into smaller chunks known as segments (similar to shards or partitions in relational databases). Segments can also be seen as time-based partitions.

https://docs.pinot.apache.org/basics/components/segment
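
As a rough illustration of "segments as time-based partitions", the sketch below groups records into one bucket per UTC day. This is an assumption-heavy toy: the `events_<date>` naming, the `ts` column and the one-segment-per-day rule are all hypothetical, and real Pinot segments also carry dictionaries and indexes.

```python
from collections import defaultdict
from datetime import datetime, timezone


def split_into_segments(records, time_column="ts"):
    """Group records into one bucket per UTC day (hypothetical naming)."""
    segments = defaultdict(list)
    for record in records:
        day = datetime.fromtimestamp(
            record[time_column], tz=timezone.utc
        ).strftime("%Y-%m-%d")
        segments[f"events_{day}"].append(record)
    return dict(segments)


records = [
    {"ts": 1_700_000_000, "user": "a"},  # 2023-11-14 UTC
    {"ts": 1_700_003_600, "user": "b"},  # 2023-11-14 UTC
    {"ts": 1_700_090_000, "user": "c"},  # 2023-11-15 UTC
]
segments = split_into_segments(records)
print(sorted(segments))
```

Because each bucket covers a known time range, a query with a time filter can skip whole segments instead of scanning them.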

Below are the different types of indexes it supports.

  1. Forward Index
     • Dictionary-encoded forward index with bit compression
     • Raw value forward index
     • Sorted forward index with run-length encoding
  2. Inverted Index
     • Bitmap inverted index
     • Sorted inverted index
  3. Star-tree Index
  4. Bloom Filter
  5. Range Index
  6. Text Index
  7. Geospatial Index
  8. JSON Index
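
To give a feel for the first two index types, here is a small Python sketch of a dictionary-encoded forward index and an inverted index on top of it. It is a simplification under stated assumptions: the function names are made up, there is no bit compression, and plain Python sets of document IDs stand in for the bitmaps Pinot uses.

```python
def build_dictionary_forward_index(values):
    """Return (sorted dictionary, forward index of dictionary IDs)."""
    dictionary = sorted(set(values))
    dict_id = {value: i for i, value in enumerate(dictionary)}
    forward_index = [dict_id[value] for value in values]
    return dictionary, forward_index


def build_inverted_index(forward_index):
    """Map each dictionary ID to the set of document IDs that contain it."""
    inverted = {}
    for doc_id, value_id in enumerate(forward_index):
        inverted.setdefault(value_id, set()).add(doc_id)
    return inverted


country = ["IN", "US", "IN", "DE", "US"]
dictionary, forward_index = build_dictionary_forward_index(country)
inverted_index = build_inverted_index(forward_index)

# WHERE country = 'US' becomes a dictionary lookup plus one index read,
# instead of a scan over every row.
us_docs = inverted_index[dictionary.index("US")]
print(dictionary, forward_index, sorted(us_docs))
```

The forward index answers "what is the value of document N", while the inverted index answers "which documents have value V", which is why filters benefit from the second one.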

Ingestion Architecture

  1. Brokers accept queries from the clients and forward them to the servers.
  2. Brokers receive the results back from the servers, merge them, and forward the merged result to the client.
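
The two broker steps above are a scatter-gather pattern, which can be sketched in a few lines of Python. Everything here is hypothetical (the server names, the in-memory `SERVER_SEGMENTS` data, the function names); threads stand in for network calls to real servers.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-server data: each server holds some segments of one table.
SERVER_SEGMENTS = {
    "server-1": [[3, 5], [2]],
    "server-2": [[7], [1, 1]],
}


def server_partial_sum(server):
    """Each server aggregates only the segments it hosts."""
    return sum(sum(segment) for segment in SERVER_SEGMENTS[server])


def broker_sum(servers):
    """Scatter the query to every server, then merge the partial results."""
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(server_partial_sum, servers))
    return sum(partials)


result = broker_sum(list(SERVER_SEGMENTS))
print(result)
```

The broker never touches raw rows in this sketch; it only merges the small partial results, which is what keeps the merge step cheap.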

How does the data get into the servers

Pinot stores data as segments. Each segment is data packed in a columnar format, along with its dictionaries and indexes.

In the real-time case, the data is consumed directly from Kafka into the real-time servers and periodically pushed to the segment store.

In the offline case, data at rest is ingested into the segment store by a native job, Spark, or Flink.

Once a segment is ingested, the controller is notified about it and instructs the offline servers where to find that data.

The controller manages all the components of the cluster with the help of Apache Helix, and Apache ZooKeeper is used as the metadata store.

Core Concepts

Servers

A server, or a group of servers, hosts the segments and processes the data, whether it comes from real-time sources or from offline stores such as S3.

Controller

We have a Controller, which decides how the data is distributed across a bunch of servers: what is stored where and how it is replicated. The cluster state it manages is kept in ZooKeeper.

Tenants

A Tenant is a logical component defined as a group of servers with the same tag.

In the table config we define the broker tenant and server tenant names, and queries for that table hit the respective groups of brokers and servers.

Note: we can still share one broker and isolate queries at the server level by defining tenants only for the server groups.
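
Here is roughly what the tenant part of a table config could look like, written as a Python dict so it can be dumped to JSON. Treat it as a sketch: the table name and both tenant tags are hypothetical, and the field layout follows the table config format in the Pinot documentation as I understand it, so check the docs for your version before using it.

```python
import json

table_config = {
    "tableName": "pageviews",           # hypothetical table name
    "tableType": "OFFLINE",
    "tenants": {
        "broker": "sharedBroker",       # hypothetical broker tenant tag
        "server": "analyticsServer",    # hypothetical server tenant tag
    },
    "segmentsConfig": {"replication": "2"},
    "tableIndexConfig": {},
    "metadata": {},
}

print(json.dumps(table_config, indent=2))
```

A second table could reuse `sharedBroker` while pointing at a different server tenant, which is the shared-broker, isolated-servers setup from the note above.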

Offline Segments

  1. We define the schema and the table config, which includes the table name and the server tenant.
  2. This table config gets stored in ZooKeeper.
  3. We start our ingestion job and generate a segment.
  4. The ingestion job pushes the segment to the deep store.
  5. The job notifies the controller about the new segment.
  6. The controller gathers the following information:
     • Table config: the server tag
     • The list of servers that belong to that server tenant
     • Current mapping: the segments that are already assigned, if any
  7. The controller decides where to store the segment and its replicas, picking the servers that hold the fewest segments.
  8. The controller updates the mapping in ZooKeeper with the servers that will store the data. This mapping is called the ideal state.
  9. Helix listens for changes to this state and informs the servers, which then download the segment from the segment store to the local server.
  10. Once a server has loaded the segment, Helix updates another mapping, the external view. This view describes the actual state of the cluster. Helix is responsible for keeping the external view in sync with the ideal state.
  11. Once the external view is updated, the segment is ready to serve queries.
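
The assignment steps above can be sketched as a small "least loaded server wins" routine. This is a toy model, not Pinot's or Helix's actual code: the function name, the tie-breaking by server name and the one-line "convergence" of the external view are all assumptions made for illustration.

```python
def assign_segment(ideal_state, servers, segment, replicas):
    """Place `segment` on the `replicas` servers holding the fewest segments."""
    load = {server: 0 for server in servers}
    for hosts in ideal_state.values():
        for server in hosts:
            load[server] += 1
    # Ties are broken by server name so the result is deterministic.
    chosen = sorted(servers, key=lambda s: (load[s], s))[:replicas]
    ideal_state[segment] = {server: "ONLINE" for server in chosen}
    return chosen


# Servers tagged with the table's server tenant (hypothetical names).
servers = ["server-1", "server-2", "server-3"]
ideal_state = {}

for name in ["seg_0", "seg_1", "seg_2"]:
    assign_segment(ideal_state, servers, name, replicas=2)

# Helix's job, reduced to one line: servers download their segments
# and the external view converges to the ideal state.
external_view = {seg: dict(hosts) for seg, hosts in ideal_state.items()}

for seg, hosts in ideal_state.items():
    print(seg, sorted(hosts))
```

With three segments and two replicas each, every server ends up holding two segments, which is the balancing effect step 7 is after.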

Realtime segments

  1. The broker routes queries to the real-time servers.
  2. Each real-time server answers from the Pinot segments it holds. These servers are also consuming real-time data from streams.
  3. Data is indexed as it is consumed and kept in memory. This is referred to as a consuming segment, and it is also used to serve queries.
  4. Periodically, the consuming segments are converted into completed Pinot segments, which are flushed to the segment store.
  5. The controller coordinates all these activities across the components using Helix and ZooKeeper.
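
A consuming segment can be pictured as an in-memory buffer that stays queryable and gets sealed once it is full. The sketch below assumes a row-count threshold only (a time-based threshold is left out), and the class name, the `max_rows` parameter and the `completed_segments` list standing in for the segment store are all hypothetical.

```python
class ConsumingSegment:
    """In-memory, queryable buffer that is sealed after `max_rows` rows."""

    def __init__(self, max_rows):
        self.max_rows = max_rows
        self.rows = []

    def ingest(self, row):
        """Append a row; return True once the flush threshold is reached."""
        self.rows.append(row)
        return len(self.rows) >= self.max_rows


completed_segments = []  # stands in for the segment store
consuming = ConsumingSegment(max_rows=3)

for value in range(7):
    if consuming.ingest({"value": value}):
        # Seal the buffer as an immutable completed segment, start a new one.
        completed_segments.append(tuple(r["value"] for r in consuming.rows))
        consuming = ConsumingSegment(max_rows=3)


def query_sum():
    """Queries see completed segments and the in-memory consuming segment."""
    completed = sum(sum(segment) for segment in completed_segments)
    return completed + sum(r["value"] for r in consuming.rows)


print(completed_segments, query_sum())
```

The last row is still only in memory when the query runs, yet it is included in the sum, which is the freshness property described in step 3.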

Kafka Realtime stream

  1. We define the table config, which the controller stores in ZooKeeper. It includes the topic to read, the number of topics, and so on.
  2. The controller fetches the list of servers in the given tenant.
  3. The controller fetches the number of partitions from the Kafka stream.
  4. The controller defines the mapping of partitions to servers, including replicas. E.g. partition A: servers 1 and 2, based on the replica count.
  5. The controller also records the offset of each partition from which to start reading, for example the earliest offset.
  6. Helix sees the new entry and instructs the servers to start consuming.
  7. Each server consumes directly from the stream for the partitions it is assigned to.
  8. Periodically, the consuming segments are flushed, based on time or on the number of rows they have ingested.
  9. The server lets the controller know the offset it has reached.
  10. The controller tells the server to flush. The server converts the consuming segment into a completed segment, building its indexes along the way, and stores it in the segment store.
  11. Once the flush is completed, the controller updates the state mapping from “CONSUMING” to “ONLINE”, which means the completed segment is ready to serve queries. Along with that, it stores the offset up to which the data has been stored.
  12. Since only one copy has been written so far, the other replicas are notified and given a chance to catch up. If they succeed, they build their own local copy; if not, the segment is downloaded to the server from the segment store.
  13. The controller makes a new entry in the state table for the same partition, with a start offset that continues from the last committed offset.
  14. Helix repeats the same process.

E.g. a partition is consumed by SERVER 1 and SERVER 2, and its state is CONSUMING.

Once the flush is completed, the state changes to ONLINE and the end offset is written.

Another row is created for the same partition and servers, starting from the completed offset; here in our example it starts from 51.
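
The commit cycle in this example can be sketched as a tiny state table. This is a simplified model with several assumptions: partitions are assigned round-robin, the end offset is treated as inclusive so the next segment starts at end offset + 1, and all names (`assign_partitions`, `commit_segment`, `state_table`) are made up rather than taken from Pinot.

```python
def assign_partitions(partitions, servers, replicas):
    """Round-robin each partition onto `replicas` servers (an assumption)."""
    return {
        p: [servers[(i + r) % len(servers)] for r in range(replicas)]
        for i, p in enumerate(partitions)
    }


def commit_segment(state_table, partition, end_offset):
    """Mark the consuming segment ONLINE and open the next one."""
    current = state_table[partition][-1]
    current["state"] = "ONLINE"
    current["end_offset"] = end_offset
    state_table[partition].append({
        "servers": current["servers"],
        "state": "CONSUMING",
        "start_offset": end_offset + 1,  # assumes an inclusive end offset
        "end_offset": None,
    })


servers = ["server-1", "server-2", "server-3"]
assignment = assign_partitions(["p0", "p1"], servers, replicas=2)
state_table = {
    p: [{"servers": hosts, "state": "CONSUMING",
         "start_offset": 0, "end_offset": None}]
    for p, hosts in assignment.items()
}

# The first segment of p0 is flushed after consuming offsets 0..50.
commit_segment(state_table, "p0", end_offset=50)

for row in state_table["p0"]:
    print(row)
```

After the commit, `p0` has one ONLINE row ending at offset 50 and a fresh CONSUMING row starting at 51 on the same servers, mirroring the example above.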

Sources and References:

Ajith Shetty

BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.

