Data Cookbook Kitchen

Asynchronous inserts and buffer tables

ClickHouse is known for being able to handle large amounts of data that are inserted in near real time. However, each insertion creates at least one new data part, which leads to frequent writes and requires a lot of background merges. Therefore, larger batches of at least 1000 rows are recommended.

But what if you cannot or don’t want to batch up data in your client application? Well, you can leave the job to ClickHouse by using asynchronous inserts. There is a blog by ClickHouse that discusses asynchronous inserts in great detail. In that mode, ClickHouse maintains an internal buffer for each insert query shape and flushes data to the target table when one of these conditions applies:

  1. the buffer reaches a specified size (async_insert_max_data_size)
  2. a time threshold elapses (async_insert_busy_timeout_(min|max)_ms) or
  3. a maximum number of insert queries accumulate (async_insert_max_query_number).
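
As a quick sketch of what this looks like on the client side (the table name here is just a placeholder; the settings can also be applied per session or per user):

-- Enable asynchronous inserts for this one INSERT; with wait_for_async_insert = 0
-- the statement returns as soon as the data is in the buffer, without waiting for the flush.
INSERT INTO some_table (id, val)
SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (1, 'hello');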

This creates a tradeoff of insert efficiency vs. low latency. But the main issue is that data inserted this way can only be queried after it has been flushed to the table. It would be nice to be able to query the buffer while it is still being filled.

It turns out there is an older and slightly more quirky construct in ClickHouse that does just that: the Buffer table engine!

The pros and cons of Buffer tables are discussed in the blog about asynchronous inserts and also in the documentation. Some disadvantages include:

  • In ClickHouse open source (OSS), a Buffer table needs to be created on each node explicitly. Buffer tables are neither distributed nor replicated; one instance does not know about the others. Any insert goes to a random instance.
  • FINAL and SAMPLE do not work correctly for Buffer tables. These conditions are passed to the destination table but are not used for processing data in the buffer.
  • If the destination table is replicated, some expected characteristics of replicated tables are lost when writing to a Buffer table. The random changes to the order of rows and sizes of data parts cause data deduplication to quit working, which means it is not possible to have a reliable ‘exactly once’ write to replicated tables.
  • Data in the buffer table would be lost in a node crash.
  • To query all the memory buffers in a cluster, you would have to use clusterAllReplicas and … then what? We’ll come to that.

But there is one advantage: By querying the Buffer table directly, you effectively get the UNION of whatever is in the buffer and all the data that has been persisted. This enables, at least in theory, lower latency use cases without hurting performance!

But how do buffer tables behave in ClickHouse Cloud? With the SharedMergeTree table engine, data is virtually replicated across all cluster nodes. How does that change the behavior, and what do we have to do in order to set up and query a Buffer table correctly? The documentation is a bit terse here, so let’s find out!

Naïve approach

First, let’s create ourselves an underlying table and also a Buffer table on top. We configure the Buffer to be flushed every 10 seconds:

DROP DATABASE IF EXISTS db_buf;
CREATE DATABASE db_buf;

CREATE TABLE db_buf.base (
    id Int64,
    val String
)
ENGINE = MergeTree
ORDER BY id;

CREATE TABLE db_buf.buf_base (
    id Int64,
    val String
)
ENGINE = Buffer(
    'db_buf',
    'base',
    1, -- buckets
    10, 10, -- seconds
    1, 100000, -- rows
    1, 100000000 -- bytes
);

A few checks for existence reveal that the Buffer table has been created on all nodes. So the table structure has been replicated automatically.
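
One way to run such a check (a sketch) is to list the tables of our database on every replica via the system tables:

-- The Buffer table should show up on every node; hostName() shows which node each row comes from.
SELECT hostName(), name, engine
FROM clusterAllReplicas(default, system.tables)
WHERE database = 'db_buf';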

Insert some random data:

CREATE TABLE db_buf.gen_str (val String)
ENGINE = GenerateRandom(1, 5);

INSERT INTO db_buf.buf_base
SELECT 1 AS id, val 
FROM db_buf.gen_str
LIMIT 1;

INSERT INTO db_buf.buf_base
SELECT 2 AS id, val 
FROM db_buf.gen_str
LIMIT 1;

INSERT INTO db_buf.buf_base
SELECT 3 AS id, val 
FROM db_buf.gen_str
LIMIT 1;

(If you are using the cloud console, run each command separately so as to get a fresh connection each time.)

Now, quickly run a basic query (the is_flushed column is the MATERIALIZED marker we will add in the Materialized Columns section below; omit it if you are following along with the plain tables defined above):

SELECT hostName(), *, is_flushed FROM clusterAllReplicas(default, db_buf.buf_base);

The result shows that each row exists only on one node:

xxx-0cnzjug-0	2	fSRH	false
xxx-0cnzjug-0	3	fSRH	false
xxx-gwn9owd-0	1	fSRH	false

Wait 10 seconds and run the same query again:

xxx-0cnzjug-0	1	fSRH	true
xxx-0cnzjug-0	2	fSRH	true
xxx-0cnzjug-0	3	fSRH	true
xxx-gwn9owd-0	2	fSRH	true
xxx-gwn9owd-0	3	fSRH	true
xxx-gwn9owd-0	1	fSRH	true
xxx-usfhh7d-0	1	fSRH	true
xxx-usfhh7d-0	2	fSRH	true
xxx-usfhh7d-0	3	fSRH	true

Now there are nine rows: three on each server!

Here’s the problem: the data in the memory buffers exists only on the one node where it happened to be inserted, but the SharedMergeTree underlying table is virtually replicated on every node. So we find ourselves in a conundrum: to get all the rows in the buffer we need to use clusterAllReplicas(); but to get each one of the persisted rows only once, we cannot use clusterAllReplicas().

How to resolve?

If only we could tell which rows are still in the buffer, and which ones have been persisted …

Materialized Columns

MATERIALIZED columns are similar to DEFAULT columns in that they are populated with a value that is computed during insertion. Unlike DEFAULT columns, you cannot override this computation by specifying an explicit value. Also, MATERIALIZED columns do not participate in SELECT *: you can query them, but you have to mention them explicitly in the column list of a query.
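
As a tiny, throwaway illustration of that behavior (the demo table and column names are made up for this example):

CREATE TABLE db_buf.demo (
    id Int64,
    flag Boolean MATERIALIZED True
)
ENGINE = MergeTree
ORDER BY id;

INSERT INTO db_buf.demo VALUES (1); -- flag is computed, not supplied

SELECT * FROM db_buf.demo;       -- returns only: id
SELECT *, flag FROM db_buf.demo; -- returns: id and flag = true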

This offers an elegant solution to our problem: let’s add a MATERIALIZED column to both the underlying table and the Buffer table, but specify different values. Drop the two tables from above and recreate them like this:

CREATE TABLE db_buf.base (
    id Int64,
    val String,
    is_flushed Boolean MATERIALIZED True
)
ENGINE = MergeTree
ORDER BY id;

CREATE TABLE db_buf.buf_base (
    id Int64,
    val String,
    is_flushed Boolean MATERIALIZED False
)
ENGINE = Buffer(
    'db_buf',
    'base',
    1, -- buckets
    10, 10, -- seconds
    1, 100000, -- rows
    1, 100000000 -- bytes
);

Now we can easily see whether a row comes from the buffer or from disk!

Querying

With this, we can find the rows in the buffer easily:

SELECT * FROM clusterAllReplicas(default, db_buf.buf_base) WHERE is_flushed = false;

and to get the entire table, UNION it with the underlying table:

SELECT * FROM clusterAllReplicas(default, db_buf.buf_base) WHERE is_flushed = false 
UNION ALL 
SELECT * FROM db_buf.base;

You can even package this nicely in a view:

CREATE VIEW db_buf.v_base AS
SELECT * FROM clusterAllReplicas(default, db_buf.buf_base) WHERE is_flushed = false 
UNION ALL 
SELECT * FROM db_buf.base;
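
Clients can then read from the view like an ordinary table and get both buffered and persisted rows, each exactly once, for example:

SELECT id, val
FROM db_buf.v_base
ORDER BY id;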

The only issue is that the clusterAllReplicas part will still do a full scan on each node! We know the invariant that is_flushed is True for all the persisted data, but ClickHouse doesn’t know that.

(For this EXPLAIN, I ran the insert statements again with 100 million rows each.)
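
One of those bigger inserts could look roughly like this, reusing the GenerateRandom helper table from above with only the LIMIT changed:

INSERT INTO db_buf.buf_base
SELECT 1 AS id, val
FROM db_buf.gen_str
LIMIT 100000000;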

EXPLAIN indexes = 1
SELECT *
FROM clusterAllReplicas(default, db_buf.buf_base)
WHERE is_flushed = false;
    ┌─explain────────────────────────────────────────────────────────────┐
 1. │ Union                                                              │
 2. │   Union                                                            │
 3. │     Expression ((Project names + Projection))                      │
 4. │       Expression                                                   │
 5. │         ReadFromMergeTree (db_buf.base)                            │
 6. │         Indexes:                                                   │
 7. │           PrimaryKey                                               │
 8. │             Condition: true                                        │
 9. │             Parts: 5/5                                             │
10. │             Granules: 36690/36690                                  │
11. │     Expression ((Project names + Projection))                      │
12. │       Filter (( + Change column names to column identifiers))      │
13. │         ReadFromPreparedSource (Read from buffers of Buffer table) │
14. │   ReadFromRemote (Read from remote replica)                        │
    └────────────────────────────────────────────────────────────────────┘

Can we improve this?

Making it faster

The is_flushed column can take only 2 values, so let’s put a set index on it. It can have a fairly coarse granularity because we know that in the persisted data this column’s value will always be True:

ALTER TABLE db_buf.base ADD INDEX ind_is_flushed is_flushed TYPE Set(2) GRANULARITY 8;
ALTER TABLE db_buf.base MATERIALIZE INDEX ind_is_flushed;

Now run the same EXPLAIN statement again:

EXPLAIN indexes = 1
SELECT *
FROM clusterAllReplicas(default, db_buf.buf_base)
WHERE is_flushed = false;
    ┌─explain────────────────────────────────────────────────────────────┐
 1. │ Union                                                              │
 2. │   Union                                                            │
 3. │     Expression ((Project names + Projection))                      │
 4. │       Expression                                                   │
 5. │         ReadFromMergeTree (db_buf.base)                            │
 6. │         Indexes:                                                   │
 7. │           PrimaryKey                                               │
 8. │             Condition: true                                        │
 9. │             Parts: 5/5                                             │
10. │             Granules: 36690/36690                                  │
11. │           Skip                                                     │
12. │             Name: ind_is_flushed                                   │
13. │             Description: set GRANULARITY 8                         │
14. │             Parts: 0/5                                             │
15. │             Granules: 0/36690                                      │
16. │     Expression ((Project names + Projection))                      │
17. │       Filter (( + Change column names to column identifiers))      │
18. │         ReadFromPreparedSource (Read from buffers of Buffer table) │
19. │   ReadFromRemote (Read from remote replica)                        │
    └────────────────────────────────────────────────────────────────────┘

Note how the Skip index makes ClickHouse bypass the entire table scan!

Further considerations

Some points that were suggested by colleagues:

  • There are no guarantees on data consistency, durability or query performance.
  • What happens when clusterAllReplicas reaches a node where the Buffer table was just flushed, but the new part is not yet in the part set of the table on the node initiating the query? There are probably no transactional guarantees.
  • You will need skip_unavailable_shards for queries to work during rollouts and scaling; without it, the query fails if any node doesn’t respond. Even then, the query depends on the performance and availability of the slowest node (see the sketch after this list).
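
A sketch of how that setting could be attached to the buffer part of the query (whether silently skipping a node is acceptable is a per-use-case decision):

-- Don't fail the whole query if a replica is unreachable; its buffered,
-- not-yet-flushed rows will simply be missing from the result.
SELECT * FROM clusterAllReplicas(default, db_buf.buf_base)
WHERE is_flushed = false
SETTINGS skip_unavailable_shards = 1;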

Conclusion

  • Buffer tables can enable low latency insertion and querying.
  • In ClickHouse Cloud, a Buffer table is created on each node automatically, but the instances are independent.
  • You can work around this with clusterAllReplicas, but then you need to deduplicate the persisted data.
  • MATERIALIZED columns can help distinguish buffered and flushed data.
  • A skip index can help check the invariant faster.
  • And finally: Don’t try this with critical production data.

This image is taken from page 377 of “Praktisches Kochbuch für die gewöhnliche und feinere Küche” by Medical Heritage Library, Inc. and is licensed under CC BY-NC-SA 2.0.