
PipelineDB 0.9.0 - Continuous Transforms, Streaming Topologies, and PostgreSQL 9.5 Compatibility


PipelineDB 0.9.0 is here. Download it now!

This is a big release that we're very excited to announce today. In addition to various bug fixes and performance improvements, PipelineDB 0.9.0 adds a highly requested new core abstraction called the continuous transform, a couple of new user-requested aggregates, and full compatibility with PostgreSQL 9.5.

Let's take a look at how all of this stuff can make your life easier.

Continuous Transforms

Continuous transforms are a generalized abstraction of the continuous view. Transforms make it possible to define arbitrary non-aggregate SQL queries that continuously run on a combination of input streams and tables. While quite similar to continuous views, the key difference is that instead of continuously updating an underlying table with the query output, transforms simply call an arbitrary function on any rows that the transform query produces. In fact, internally, continuous views are effectively continuous transforms that happen to write to an underlying table.

Since continuous transforms can write their output to another stream, they make it easy and efficient to share work between continuous views, normalize or format raw input data in one place, and ultimately link continuous views and transforms together to achieve purely SQL-defined stream-processing topologies.

Let's walk through a couple of examples of continuous transforms in action.

Deduplicating Stream-table Joins

This first example demonstrates how continuous transforms can eliminate a commonly duplicated computation: joining streaming rows against tables. Imagine an input stream named raw_user_events whose columns are user_id, action_type, and user_agent. Now suppose we have a static relation called users whose columns are user_id and user_name, and that the continuous views consuming this stream are only interested in the user_name associated with a given user_id.

Instead of doing a stream-table join between raw_user_events and users in each continuous view, we can do a single join in a continuous transform and write its output to a new stream named normalized_user_events. Our continuous views then need only read from normalized_user_events, and the potentially expensive join happens exactly once:

CREATE CONTINUOUS TRANSFORM normalize_events AS
    SELECT users.user_name, raw.action_type, raw.user_agent
    FROM raw_user_events raw JOIN users ON raw.user_id = users.user_id
    THEN EXECUTE PROCEDURE pipeline_stream_insert('normalized_user_events');

-- Read the output of the transform
CREATE CONTINUOUS VIEW v0 AS SELECT COUNT(DISTINCT user_name) FROM normalized_user_events;
CREATE CONTINUOUS VIEW v1 AS SELECT user_name, COUNT(*) FROM normalized_user_events
    GROUP BY user_name;
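
To see the whole pipeline in action, you might insert a user and a couple of raw events (the sample data here is hypothetical):

-- Hypothetical sample data: one user and two raw events
INSERT INTO users (user_id, user_name) VALUES (42, 'alice');
INSERT INTO raw_user_events (user_id, action_type, user_agent)
    VALUES (42, 'click', 'Mozilla/5.0'), (42, 'purchase', 'Mozilla/5.0');

-- v1 should now report two events for 'alice'
SELECT * FROM v1;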

Without transforms, each of these continuous views would require an identical join clause, which would obviously become inefficient with many similar views.

The THEN EXECUTE PROCEDURE clause works identically to PostgreSQL's trigger procedures, and pipeline_stream_insert is a built-in function that writes the row it receives to the stream with the given name. Internally, transform procedures have access to the row produced by the transform's query, along with any explicit arguments passed to the procedure.
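
Since transform procedures follow the trigger-function interface, you can also supply your own. Here's a minimal sketch, assuming a hypothetical destination table audit_log and function log_event (neither ships with PipelineDB):

-- Hypothetical destination table for the transform's output
CREATE TABLE audit_log (user_name text, action_type text);

-- Transform procedures are ordinary trigger functions; NEW holds the row
-- produced by the transform's SELECT
CREATE OR REPLACE FUNCTION log_event()
RETURNS trigger AS $$
BEGIN
    INSERT INTO audit_log (user_name, action_type)
        VALUES (NEW.user_name, NEW.action_type);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE CONTINUOUS TRANSFORM log_purchases AS
    SELECT user_name, action_type FROM normalized_user_events
    WHERE action_type = 'purchase'
    THEN EXECUTE PROCEDURE log_event();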

Consolidating Input Data Normalization

Raw data normalization is another operation that is often duplicated across continuous views. In this example, we'll describe how continuous transforms can consolidate data normalization and provide any number of continuous views with a single, normalized stream to read from.

Suppose we have a stream whose events carry a JSON-encoded array of values in a payload column (e.g. [0, 1, 2]), and that we want to process all of these values as a single stream. json_array_elements, which takes a JSON array as its input and outputs a row for each element of the array, will do this for us. But instead of unpacking the array in each of our continuous views, we can use a continuous transform to normalize each event and write its output to a new stream:

CREATE CONTINUOUS TRANSFORM unpack_json AS
    SELECT json_array_elements(payload::json) AS value FROM raw_stream
    THEN EXECUTE PROCEDURE pipeline_stream_insert('normalized_stream');

-- Read the output of the transform
CREATE CONTINUOUS VIEW v0 AS SELECT sum(value) FROM normalized_stream;
CREATE CONTINUOUS VIEW v1 WITH (max_age = '1 hour') AS SELECT avg(value)
    FROM normalized_stream;

Each continuous view reading from normalized_stream will now see a stream of individual values without having to know anything about the original input format.
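
For example, a single event carrying a three-element array fans out into three rows on normalized_stream (the sample payload is hypothetical):

-- One hypothetical event; its three array elements arrive on normalized_stream as three rows
INSERT INTO raw_stream (payload) VALUES ('[7, 8, 9]');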

New Aggregates

PipelineDB 0.9.0 also adds a couple of aggregates that were requested by our users:

json_object_int_sum

This aggregate takes as its input JSON objects having integer values at each of their keys, and sums the values key-wise into a new object. For example:

CREATE CONTINUOUS VIEW v0 AS SELECT json_object_int_sum(payload::json) FROM stream;
INSERT INTO stream (payload) VALUES ('{"k0": 1, "k1": 2}');
INSERT INTO stream (payload) VALUES ('{"k0": 1}');

SELECT * FROM v0;

 json_object_int_sum
----------------------
 { "k0": 2, "k1": 2 }
(1 row)

json_object_int_sum can be very useful for aggregating events with sparse subsets of keys, and is much faster than maintaining a large number of separate sum aggregates.
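
To make that comparison concrete, here is a sketch of the per-key alternative it replaces. The key names have to be enumerated up front, which is exactly what json_object_int_sum avoids:

-- Without json_object_int_sum, every key needs its own sum aggregate;
-- keys missing from an event simply contribute NULL, which sum() ignores
CREATE CONTINUOUS VIEW v1 AS SELECT
    sum((payload::json->>'k0')::integer) AS k0,
    sum((payload::json->>'k1')::integer) AS k1
FROM stream;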

first_values ordered-set aggregate

first_values is an ordered-set aggregate that will maintain an array of the first n values of a stream as if the entire stream were sorted by the given ORDER BY expression:

CREATE CONTINUOUS VIEW v0 AS SELECT first_values(3)
    WITHIN GROUP (ORDER BY x::integer) FROM stream;

INSERT INTO stream (x) SELECT generate_series(1, 100000) AS x;
SELECT * FROM v0;

 first_values
--------------
 {1,2,3}
(1 row)

INSERT INTO stream (x) VALUES (-1);
SELECT * FROM v0;

 first_values
--------------
 {-1,1,2}
(1 row)
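
first_values also composes with GROUP BY, so you could, for example, keep the three lowest latencies observed per user (the stream and column names here are hypothetical):

-- Hypothetical stream: track each user's three lowest latencies seen so far
CREATE CONTINUOUS VIEW lowest_latencies AS
    SELECT user_name::text, first_values(3) WITHIN GROUP (ORDER BY latency::integer)
    FROM request_stream
    GROUP BY user_name;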

PostgreSQL 9.5 Compatibility

Last but not least, PipelineDB 0.9.0 is fully compatible with PostgreSQL 9.5. Of particular relevance to PipelineDB are the addition of grouping sets, JSON improvements, vacuuming improvements, and general increases in system performance.

Thanks!

Finally, we'd like to thank our amazing community of users, who consistently give us invaluable feedback and are the most important aspect of our continual effort to deliver and support a world-class database product. If you're not already using PipelineDB, try it out and let us know what you think :)

Thank you!