SQL on Kafka


Originally developed at LinkedIn and open sourced in 2011, Kafka is a generic, JVM-based pub-sub service that is becoming the de-facto standard messaging bus upon which organizations are building their real-time and stream-processing infrastructure. I tend to think of Kafka as the stream-processing analog to what HDFS has been to batch processing. Just as many transformative technologies have been built on top of HDFS (such as Hadoop's MapReduce), Kafka is (and will increasingly become) an integral component of stream-processing technology in general.

It is not surprising, then, that a large portion of PipelineDB's users are using Kafka in some way. And without a doubt, the most requested piece of functionality our users have asked us for is the ability to seamlessly consume data from Kafka into PipelineDB streams, making it possible to run continuous SQL queries on Kafka messages in real time as they're produced.

Well it's finally here. Our latest release, PipelineDB 0.8.1, ships with Kafka ingestion support in the form of an extension, and in this post we'll show you how it works. There should be enough detail here for you to follow along and actually run everything we show you on your own machine, but that's certainly not necessary.

Setup

In this post we're going to run continuous SQL queries on data pushed into Kafka from nginx, so you'll want to install a few tools before diving in (if you'd prefer to just read along, go ahead and skip this part):

  • Kafka - It only takes a few seconds to get Kafka up and running by following the short quickstart
  • nginx - A widely used HTTP server that we'll use to ultimately log url request metadata into Kafka
  • kafkacat - A tool that can tail a file (among other things) and write new records into Kafka as messages
  • siege - A tool that can concurrently make HTTP requests to random urls, useful for stress testing and simulating HTTP traffic (apt-get install siege or brew install siege)

The Producer

Kafka's highest-level abstractions are producers, consumers, and topics. Producers write messages to topics, which consumers read from. A topic is exactly what it sounds like: a named feed of messages organized around a particular context. PipelineDB's Kafka support currently focuses on consumption, but we'd still like to show you a reasonably tangible and realistic way to produce and consume messages, so we're going to set a few things up to get started.

First we'll want to start a local Kafka server. From the root directory of your Kafka installation, run the following (each in its own terminal, since both are long-running processes):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
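
Depending on your broker configuration, Kafka will usually auto-create topics on first use. If you'd rather create the logs_topic topic we'll be using explicitly, the quickstart-style command below does so (this assumes ZooKeeper is running on its default port, 2181):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logs_topic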

Our producer will write JSON-serialized nginx logs to Kafka. We'll use a minimal configuration file for our nginx server:

cat <<EOF > nginx.conf
worker_processes 4;
pid $PWD/nginx.pid;
events {}
http {

    log_format json
    '{'
        '"ts": "\$time_iso8601", '
        '"status": "\$status", '
        '"request_method": "\$request_method", '
        '"user_agent": "\$http_user_agent", '
        '"url": "\$request_uri", '
        '"latency": "\$request_time",  '
        '"user": "\$arg_user"'
    '}';

    access_log $PWD/access.log json;
    error_log $PWD/error.log;

    server {
        location ~ ^/ {
            return 200;
        }
    }
}
EOF

As you can see, we've configured a relatively dumb HTTP server. It will return a 200 response for any url request that it receives, and log a JSON record to a file named access.log containing a small amount of information about the request. Let's start serving requests:

nginx -c $PWD/nginx.conf
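
To generate a request or two yourself, you can hit the server with curl (this assumes nginx is listening on its default port and that curl is installed; the url itself is arbitrary, since our server returns 200 for everything):

curl "http://localhost/some_page/some_path?user=42"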

To get a better idea of our message format, here's the output of tailing access.log for a few requests:

{"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", "url": "/page38/path7?user=24746", "latency": "0.001",  "user": "24746"}
{"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", "url": "/page66/path7?user=8846", "latency": "0.001",  "user": "8846"}
{"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", "url": "/page33/path3?user=6006", "latency": "0.001",  "user": "6006"}
{"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", "url": "/page85/path2?user=28043", "latency": "0.000",  "user": "28043"}

Now we just need something that will tail access.log and transform any new lines to Kafka messages. For this, we'll use kafkacat:

tail -f access.log | kafkacat -b localhost:9092 -t logs_topic

Here, -b points kafkacat to our local server (b is for broker), and -t specifies which topic the producer should write the nginx JSON messages to. At this point, visiting http://localhost?user=me with a browser should produce a Kafka message that you can immediately consume:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning
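
kafkacat can read the topic back as well if you prefer; the sketch below uses its consumer mode (-C), starting from the beginning of the topic and exiting once it catches up (-e):

kafkacat -b localhost:9092 -t logs_topic -C -o beginning -e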

But that's not very interesting. Time to put PipelineDB to work!

pipeline_kafka

All of the latest binary distributions of PipelineDB ship with the pipeline_kafka extension, although it must be explicitly loaded before it can be used. To enable pipeline_kafka, open up a psql shell and simply run:

CREATE EXTENSION pipeline_kafka;
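
If you'd like to double-check that the extension is installed, psql's \dx meta-command lists installed extensions:

\dx pipeline_kafka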

pipeline_kafka also needs to know about at least one Kafka server to connect to, so let's make it aware of our local server:

SELECT pipeline_kafka.add_broker('localhost:9092');

The PipelineDB analog to a Kafka topic is a stream, and we'll need to create a stream that maps to a Kafka topic. While PipelineDB supports schema inference for streams, streams that consume from Kafka must be explicitly declared. Since we're going to be reading JSON records, our stream can be very simple:

CREATE STREAM logs_stream (payload json);
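
If you'd like to see what the JSON operators we'll rely on actually return, here's a standalone query you can run in any psql shell (the payload literal is just a made-up example message, not something read from Kafka):

WITH example AS (
    SELECT '{"url": "/page1/path2?user=42", "latency": "0.001", "user": "42"}'::json AS payload
)
SELECT
    payload->>'url' AS url,                            -- ->> extracts a field as text
    payload->>'user' AS user_id,
    (payload->>'latency')::float * 1000 AS latency_ms  -- cast to float and convert to milliseconds
FROM example;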

We'll extract each of the individual fields using standard PostgreSQL JSON operators, so there's no need to predetermine a schema for the payload itself. Now that we have pipeline_kafka installed and a stream we can map to a topic, we just need a continuous view that will actually read from logs_stream. For now, let's use a rudimentary one to verify that everything is working properly:

CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;

Once pipeline_kafka is told to begin consuming messages into a stream, selecting from this continuous view will reflect the total number of messages consumed from the topic up until the moment the SELECT is executed:

SELECT pipeline_kafka.consume_begin('logs_topic', 'logs_stream', format := 'json');
 consume_begin 
---------------
 success
(1 row)

SELECT * FROM message_count;
 count 
--------
  24
(1 row)

SELECT * FROM message_count;
 count
--------
  36
(1 row)

But that's not very interesting. Now that our end-to-end setup is working, we can dive a little deeper with more complex queries.

Continuous Views on Kafka

The following views will run continuous SQL queries that compute various statistics about the nginx log entries being produced. For each url, the continuous view below computes the following over a sliding one-day window:

  • 99th-percentile request latency
  • Number of unique visitors
  • Total visits

/*
 * This function will strip away any query parameters from each url,
 * as we're not interested in them.
 */
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')
    RETURNS text
AS 'textregexreplace_noopt'
LANGUAGE internal;

CREATE CONTINUOUS VIEW url_stats AS
    SELECT
        url,
        percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,
        count(DISTINCT "user") AS uniques,
        count(*) AS total_visits
    FROM
        (SELECT
            url(payload->>'url'),
            payload->>'user' AS "user",
            (payload->>'latency')::float * 1000 AS latency_ms,
            arrival_timestamp
        FROM logs_stream) AS unpacked
    WHERE arrival_timestamp > clock_timestamp() - interval '1 day'
    GROUP BY url;
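
As a quick sanity check of the url() helper, stripping the query string from a made-up url looks like this:

SELECT url('/page1/path2?user=42');
-- returns /page1/path2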

And the next continuous view will compute the following daily user-level metrics:

  • Number of landing pages visited
  • Number of conversions
  • Number of unique urls visited
  • Total visits

CREATE CONTINUOUS VIEW user_stats AS
    SELECT
        day(arrival_timestamp),
        payload->>'user' AS user,
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,
        count(DISTINCT url(payload->>'url')) AS unique_urls,
        count(*) AS total_visits
    FROM logs_stream GROUP BY payload->>'user', day;
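
Once data is flowing, you can query this view like any other relation; for example, to look at the daily rollup for a single (hypothetical) user id:

SELECT day, landings, conversions, unique_urls, total_visits
FROM user_stats
WHERE "user" = '24746';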

The siege tool can be used to simulate a large number of requests to nginx. Here we generate a list of urls that siege will randomly select from, which yields a workload having 1,000 unique urls, randomly visited by 100,000 unique users.

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done

Now run siege:

siege -c32 -b -d0 -f urls.txt

After generating a few million requests, let's run some further analysis by querying each of our continuous views. Note that combine() is PipelineDB's aggregate for merging the partial aggregation states (such as percentiles and unique counts) that a continuous view stores for each group:

-- What are the top-10 most visited urls?
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;
      url      | total_visits 
---------------+--------------
 /page62/path4 |        10182
 /page51/path4 |        10181
 /page24/path5 |        10180
 /page93/path3 |        10180
 /page81/path0 |        10180
 /page2/path5  |        10180
 /page75/path2 |        10179
 /page28/path3 |        10179
 /page40/path2 |        10178
 /page74/path0 |        10176
(10 rows)


-- What is the 99th percentile latency across all urls?
SELECT combine(p99) FROM url_stats;
     combine      
------------------
 6.95410494731137
(1 row)

-- What is the average conversion rate each day for the last month?
SELECT day, avg(conversions / landings) FROM user_stats WHERE day > now() - interval '1 month' GROUP BY day;
          day           |            avg             
------------------------+----------------------------
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000
(1 row)

-- How many unique urls were visited each day for the last week?
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;
          day           | combine 
------------------------+---------
 2015-09-15 00:00:00-07 |  100000
(1 row)

-- Is there a relationship between the number of unique urls visited and the highest conversion rates?
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;
 unique_urls |  conversion_rate  
-------------+-------------------
          41 |  2.67121005785842
          36 |  2.02713894173361
          34 |  2.02034637010851
          31 |  2.01958418072859
          27 |  2.00045348712296
          24 |  1.99714899522942
          19 |  1.99438839453606
          16 |  1.98083502184886
          15 |  1.87983011139079
          14 |  1.84906254929873
(10 rows)

To stop consuming Kafka from logs_topic into logs_stream, run:

SELECT pipeline_kafka.consume_end('logs_topic', 'logs_stream');
 consume_end
-------------
 success
(1 row)

One of the nice things about how pipeline_kafka works is that it continuously saves its position (Kafka calls this an offset) in a given Kafka topic durably in the database. So when you start consuming again, only unread Kafka messages will be read into a stream. This is also important in the event of a system failure or PipelineDB shutdown: pipeline_kafka will simply resume reading from wherever it left off.
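
In recent versions of the extension these saved positions live in ordinary tables under the pipeline_kafka schema, so you should be able to inspect them with a plain query along these lines (treat this as a sketch; the exact table and column names may differ between versions):

SELECT * FROM pipeline_kafka.offsets;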

Conclusion

We hope that being able to run pure SQL queries on Kafka topics will make your job easier and less complex. Because of Kafka's popularity across our user base, PipelineDB's integration with it will continue to be enriched significantly over time. If you'd like to learn more about pipeline_kafka's capabilities that were not discussed here, such as parallelism and other input formats, head over to the docs. And of course, if any of this sounds like something you'd be thrilled to work on full-time, we're always hiring!