June 30, 2022

Qbeast

A few days ago I ran into Qbeast, an open-source project built on top of Delta Lake, and I needed to dig into it.

This introductory post explains it quite well: https://qbeast.io/qbeast-format-enhanced-data-lakehouse/

The project is quite good, and since everything is documented it seems helpful if you ever need to write your own custom data source. And, as I’m in love with note-taking, I want to dig into the following three topics:

  1. Explaining how the format works (including optimizations)
  2. Describing how the sampling push is implemented
  3. Understanding the table tolerance

1. Qbeast format

This would be better explained with diagrams. Remember Delta Lake? We had a _delta_log folder with files pointing to files. Qbeast extends this delta log and adds some new properties.

Delta Log & Qbeast:

[Diagram: the _delta_log folder contains transaction files (file_0001.json, file_0002.json) whose add/remove entries point to the underlying .parquet data files.]

And here is a sample of the metadata Qbeast adds:

    "tags": {
      "state": "FLOODED",
      "cube": "gw",
      "revision": "1",
      "elementCount": "10836",
      "minWeight": "-1253864150",
      "maxWeight": "1254740128"
    }

Within the tags we find “cubes”, which are the nodes of Qbeast’s tree: Qbeast indexes the data in n dimensions, and that is why the nodes are called cubes. Note how the minWeight/maxWeight tags above delimit the range of (hashed) weights stored in each file.

[Diagram: the cube tree, where each node’s id (0, 00, 01, 02, ...) encodes its path from the root.]
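As a mental model (my own sketch, not Qbeast’s actual classes), each split divides every dimension in two, so a cube id encodes the path from the root:

// Hypothetical sketch of the cube tree idea, not Qbeast's real API.
// Each cube is a node of a tree over an n-dimensional space; its id
// encodes the path of child indices from the root (e.g. "", "0", "02").
case class Cube(id: String, dimensions: Int) {
  // Each split halves every dimension, so a node has 2^n children.
  def children: Seq[Cube] =
    (0 until (1 << dimensions)).map(i => Cube(id + i.toString, dimensions))

  def depth: Int = id.length
}

val root = Cube("", dimensions = 2)
root.children.map(_.id) // Seq("0", "1", "2", "3")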

Qbeast provides an analyze/optimize operation similar to the Delta Lake optimize command. But the latter performs a Z-order within each level, while here the cubes are replicated and maintained together with the index.
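If I read the README correctly, both operations are exposed through the QbeastTable API (treat the exact calls as my reading of the docs, and check your version):

// How the docs show triggering the analyze/optimize cycle;
// the path is a placeholder for an existing Qbeast table.
import io.qbeast.spark.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast-table")
qbeastTable.analyze()  // marks the cubes to replicate
qbeastTable.optimize() // replicates them and writes the new blocks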

It took me a bit to understand how it works:

  1. We identify the cubes from the last revision of the table. With that, we also get the replicated set of cubes. (https://github.com/Qbeast-io/qbeast-spark/blob/bb080839e03d5cebd75dceefa2e0f5c0e824e06c/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala)

  2. We can now compute the “cubes to optimize”: the newly added (announced) cubes minus those that are already in a REPLICATED state (see the sketch after this list).

  3. With the proper cubes selected, we optimize the index and then use the writer to generate the proper blocks. There is a low-level writer called BlockWriter: https://github.com/Qbeast-io/qbeast-spark/blob/b89adac2b53ed08ebffb7868d50043dde7071ee3/src/main/scala/io/qbeast/spark/index/writer/BlockWriter.scala
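As promised above, a minimal sketch of step 2 (the names are mine; the real logic lives in DeltaQbeastSnapshot):

// Hypothetical sketch of step 2: the cubes to optimize are the set
// difference between the cubes announced for replication and those
// already replicated. Names are illustrative, not Qbeast's own.
def cubesToOptimize(announcedSet: Set[String], replicatedSet: Set[String]): Set[String] =
  announcedSet -- replicatedSet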

All of this is stored as another delta log transaction. The reading and writing protocols are quite interesting too.

2. Sampling push

To push the sampling down, you need to hook into Spark’s optimizer rules, and that is done by adding a session extension:

class QbeastSparkSessionExtension extends DeltaSparkSessionExtension {

  override def apply(extensions: SparkSessionExtensions): Unit = {

    // Register all the Delta Lake extensions first...
    super.apply(extensions)

    // ...then the rule that rewrites Sample nodes into Qbeast filters
    extensions.injectOptimizerRule { session =>
      new SampleRule(session)
    }

    // ...and the rule that swaps in Qbeast's own file index
    extensions.injectOptimizerRule { session =>
      new ReplaceFileIndex(session)
    }
  }

}
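To actually pick this up, Spark has to be told about the extension at session-creation time; as far as I can tell from the docs, the class to register is io.qbeast.spark.internal.QbeastSparkSessionExtension:

// Enabling the Qbeast extension when building the session
// (class name taken from the Qbeast docs; double-check your version).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .getOrCreate()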

Within that “SampleRule” we find a transformation of the logical plan:

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan transformDown { case s @ Sample(_, _, false, _, child) =>
      // Only rewrite a Sample that sits directly on top of a Qbeast
      // relation, possibly behind a Project and/or a Filter
      child match {
        case QbeastRelation(l, q) => transformSampleToFilter(s, l, q)

        case Project(_, Filter(_, QbeastRelation(l, q))) =>
          transformSampleToFilter(s, l, q)

        case Filter(_, QbeastRelation(l, q)) =>
          transformSampleToFilter(s, l, q)

        case Project(_, QbeastRelation(l, q)) =>
          transformSampleToFilter(s, l, q)

        // Anything else is left untouched
        case _ => s
      }
    }
  }

That transformSampleToFilter operation takes, for each revision of the table, the murmur hash of the indexed columns and rewrites the sample as a filter on it, which lets the reader answer the query from the metadata statistics instead of scanning every file.
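My mental model of that transformation (a sketch under my own assumptions, not the real SampleRule code): the sampling fraction is projected onto the weight space, so the Sample node becomes a plain Filter that the minWeight/maxWeight statistics can answer:

// Hypothetical sketch: map a sampling fraction onto the Int weight space.
// Weights like the minWeight/maxWeight tags above live in
// [Int.MinValue, Int.MaxValue]; a fraction f keeps the records whose
// weight falls below Int.MinValue + f * (2^32 - 1).
def fractionToMaxWeight(fraction: Double): Int = {
  val range = Int.MaxValue.toLong - Int.MinValue.toLong // 2^32 - 1
  (Int.MinValue.toLong + (fraction * range).toLong).toInt
}

// A 10% sample then becomes, conceptually:
//   Filter(qbeastHash(cols) <= fractionToMaxWeight(0.1), child)
// and any file whose minWeight is above that threshold is skipped.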

(It reminds me a lot of the new data skipping https://github.com/delta-io/delta/blob/3f3be4663f263b465f0e26bf822bac17b09e7a6d/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala)

3. Table tolerance

Sadly the tolerance implementation has not been published yet: https://github.com/Qbeast-io/qbeast-spark/blob/15667c27bb2cc6d76cecd680d61e22fa8f571d49/src/main/scala/io/qbeast/spark/implicits/package.scala#L46
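The linked file only exposes the method signature, so this is pure guesswork on my side about how it will be used once the implementation lands:

// Hypothetical usage, inferred only from the implicit's signature in
// io.qbeast.spark.implicits; the body is not implemented yet.
import io.qbeast.spark.implicits._
import org.apache.spark.sql.functions.avg

// df is any DataFrame backed by a Qbeast table
df.agg(avg("price")).tolerance(0.1) // ask for a result within 10% error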

I’m super grateful to be able to dig into open-source code; I think the team behind Qbeast did a pretty good job, and I hope I can contribute in the near future. By the way, here is a showcase video: https://www.youtube.com/watch?v=Rxi-bHVOybs
