
Continuous SQL Triggers


EDIT - Since writing this post, continuous triggers have been replaced by continuous transforms and output streams

Since releasing PipelineDB as open source in July last year, one of the primary use cases we've seen it deployed for has been building real-time dashboards. Real-time dashboards are powerful tools that give you instant insight into the metrics you care about most. Once you have a real-time view of those metrics, the natural next step is to take action based on some specific condition or event. In SQL land, the way to react to events happening in your database is by using triggers.

In this post, I'll give a brief overview of what triggers are and then showcase how we've extended them to work with continuous SQL queries, which we call continuous views. I'll then work through an example of how continuous triggers can be used with PipelineDB to easily build a real-time alerting system for monitoring ad campaign spend: whenever a customer overspends their daily budget, an alert email will be sent.

Configuration

Before we get started, we need to ensure that PipelineDB is configured correctly. The configuration options that need to be set in pipelinedb.conf to enable continuous triggers are shown below:

continuous_triggers_enabled = on
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1

For more information, refer to the docs.
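
Note that these settings only take effect after the server is restarted. Assuming a default install, something along the following lines should do it (adjust the binary path and data directory to match your setup):

# restart PipelineDB so the new configuration is picked up
pipeline-ctl -D <data directory> restart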

Triggers on Continuous Views

Triggers are stored procedures that are automatically executed when an event takes place on a table, e.g. an insert, update, or delete. Along with the event, you can also specify a condition that the event must satisfy for the trigger to fire. With PipelineDB 0.9.1, we're bringing triggers to continuous views! Let's start with a simple example:

-- A stream for click events
CREATE STREAM click_stream (
  ip text,
  button text
);

-- A continuous view that aggregates the number of times the sign up
-- button is clicked for each minute from an IP address
CREATE CONTINUOUS VIEW signup_clicks AS
  SELECT
    minute(arrival_timestamp),
    ip,
    count(*)
  FROM click_stream
  WHERE button = 'sign-up'
  GROUP BY minute(arrival_timestamp), ip;

-- A simple trigger to detect abuse from an IP address
CREATE TRIGGER abusive_ip
  AFTER UPDATE ON signup_clicks FOR EACH ROW
  WHEN (NEW.count > 100 AND OLD.count <= 100)
  EXECUTE PROCEDURE notify_abuse();

In this example, the trigger function notify_abuse() will be executed every time we detect that the sign up button has been clicked over 100 times in a minute from the same IP address. One thing to note here is that even though signup_clicks.count would only ever be incremented by 1 per INSERT on a regular table, it can jump by more than 1 when a continuous view is updated. This is because PipelineDB batches multiple incoming events into a single update for performance. Therefore, when checking that a threshold has been exceeded, we check that it was crossed between the OLD and NEW values rather than testing for an exact value.
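
The post doesn't define notify_abuse(). A minimal sketch, assuming we simply want to publish the event on a NOTIFY channel that some external listener subscribes to (the channel name abuse_alerts is arbitrary), might look like this:

-- A hypothetical trigger function that publishes abuse events on a NOTIFY channel
CREATE FUNCTION notify_abuse ()
RETURNS trigger AS
$$
BEGIN
  PERFORM pg_notify('abuse_alerts',
    'IP ' || NEW.ip || ' clicked sign-up ' || NEW.count::text || ' times in a minute');
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;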

Tracking Campaign Spends

Now that I've explained what triggers are, let's walk through an example of how you could use PipelineDB's new continuous trigger functionality to get real-time email alerts for critical events. Before we get started, make sure you have PipelineDB 0.9.1 installed. You can download it here.

Suppose you work at an ad tech company. One of the mission critical tasks you have is to ensure your customers don't spend more money than the budget they've allocated. You probably have many guards in place to prevent something like that from happening, but as with any software system, bugs are inevitable. I'm going to show you how you can use PipelineDB to build a real-time alerting utility in case the guards fail and a customer exceeds their budget.

We'll start off by creating a table which stores each customer's daily budget.

CREATE TABLE customer_daily_budget (
  customer_id integer,
  day         date,
  budget      bigint,
  PRIMARY KEY (customer_id, day)
);

Let's also create a dummy customer with ID 1234 and populate their daily budgets for the month of March 2016. For simplicity, we're going to choose 10000 as their budget for all days.

INSERT INTO customer_daily_budget
  SELECT 1234, date '2016-03-01' + x, 10000 FROM generate_series(0, 30) AS x;
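
Since generate_series(0, 30) produces 31 offsets, this should create one budget row for each of the 31 days in March. A quick check:

SELECT count(*) FROM customer_daily_budget WHERE customer_id = 1234;
-- 31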

Next, we'll declare a stream which captures all the ad impressions purchased on our platform.

CREATE STREAM ad_impressions_purchased (
  customer_id   integer,
  campaign_id   integer,
  purchase_time timestamp,
  amount_spent  integer
);

Now we need to create a continuous view that keeps track of the amount of money each customer has spent on ad impressions per day.

CREATE CONTINUOUS VIEW customer_daily_spend AS
  SELECT
    customer_id,
    date(purchase_time) AS day,
    sum(amount_spent) AS total_spent
  FROM ad_impressions_purchased
  GROUP BY customer_id, date(purchase_time);

This is all we need to keep track of how much money each customer is spending per day. Let's pipe some mock data into our ad_impressions_purchased stream to see if everything is working correctly. You can download the impressions.sql file here.

cat impressions.sql | psql -h localhost -p 5432 -d pipeline
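
If you'd rather not download the file, you can also write events to the stream directly with ordinary INSERT statements. The rows below are made up and only meant to exercise the pipeline (the third one pushes 2016-03-06 past the 10000 budget):

INSERT INTO ad_impressions_purchased (customer_id, campaign_id, purchase_time, amount_spent)
  VALUES
    (1234, 1, '2016-03-06 09:15:00', 4000),
    (1234, 1, '2016-03-06 11:30:00', 3500),
    (1234, 2, '2016-03-06 17:45:00', 3000);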

To see which days the customer exceeded their quota, we can run the following query.

SELECT cds.day
  FROM customer_daily_spend AS cds JOIN customer_daily_budget AS cdb
    ON cds.customer_id = cdb.customer_id AND cds.day = cdb.day
  WHERE cds.total_spent > cdb.budget
  ORDER BY cds.day;

    day
------------
 2016-03-06
 2016-03-07
 2016-03-24
 2016-03-31
(4 rows)

Alerting When Things Go Wrong

Now that everything is set up, let's create a trigger that will immediately notify us via email when a customer exceeds their daily budget. To accomplish this we will need two more things. First, we need a way to fetch the budget for a customer for a specific day inside a trigger condition. We can create a simple SQL function for that.

CREATE FUNCTION get_customer_budget (integer, date)
  RETURNS bigint
  AS 'SELECT budget FROM customer_daily_budget
        WHERE customer_id = $1 AND day = $2'
  LANGUAGE sql
  IMMUTABLE;
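
A quick sanity check against the budgets we inserted earlier:

SELECT get_customer_budget(1234, '2016-03-06');
-- should return the 10000 we assigned to every day in March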

Second, we need a trigger function that sends us an email when the trigger is fired. The function takes an email address as its only argument. When executed, it will email the customer ID and the day the budget was exceeded to the provided email address. We're going to use Mailgun's HTTP API to send emails (the API key we'll use is from their free sandbox which only allows 10k emails/month, so please don't abuse it). @pramsey has written a simple extension that lets us make HTTP requests from PostgreSQL (thanks man!). We'll start by building the extension from source.

git clone https://github.com/pramsey/pgsql-http.git
cd pgsql-http
make
sudo make install

Next we need to explicitly load the extension into PipelineDB.

CREATE EXTENSION http;
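
To verify the extension is installed correctly, you can issue a request against any public URL and check the status code (the URL below is just an example):

SELECT status FROM http_get('http://httpbin.org/get');
-- should return 200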

We can now use the functionality provided by this extension to issue an HTTP request to Mailgun's API for sending an alert email.

CREATE FUNCTION send_alert_email ()
RETURNS trigger AS
$$
BEGIN
  SET http.timeout_msec TO 5000;
  RAISE WARNING 'Customer % has exceeded their daily budget for %', NEW.customer_id, NEW.day::text;
  PERFORM http(
    ('POST',
     'https://api.mailgun.net/v3/sandbox2d4d7025bde84eedb1b6fb68739520b4.mailgun.org/messages',
     ARRAY[ ('Authorization', 'Basic YXBpOmtleS02NjQ5YTlmNjkyMGU0ZjBhODZjNDNiNzA4MTYxNDEwYg==')::http_header ],
     'application/x-www-form-urlencoded',
     'from=' || urlencode('PipelineDB <info@pipelinedb.com>') ||
     '&to=' || urlencode(TG_ARGV[0]) ||
     '&subject=' || urlencode('ALERT: Customer budget exceeded!') ||
     '&text=' || 'Customer ' || NEW.customer_id::text || ' has exceeded their daily budget for ' ||
             NEW.day::text)::http_request);
  RETURN NEW;
END;
$$
LANGUAGE plpgsql
VOLATILE;

That's it! We have everything we need to create the required trigger on the customer_daily_spend continuous view.

-- We need to create two triggers, one for the insert case and one for the update case.

CREATE TRIGGER budget_exceeded_on_insert
  AFTER INSERT ON customer_daily_spend FOR EACH ROW
  WHEN (NEW.total_spent > get_customer_budget(NEW.customer_id, NEW.day))
  EXECUTE PROCEDURE send_alert_email('you@email.com');

CREATE TRIGGER budget_exceeded_on_update
  AFTER UPDATE ON customer_daily_spend FOR EACH ROW
  WHEN (OLD.total_spent <= get_customer_budget(NEW.customer_id, NEW.day) AND
        NEW.total_spent > get_customer_budget(NEW.customer_id, NEW.day))
  EXECUTE PROCEDURE send_alert_email('you@email.com');

Let's truncate the continuous view and re-run the data through the ad_impressions_purchased stream to see if we receive alerts for the days the budget was exceeded.

psql -h localhost -p 5432 -d pipeline \
  -c 'TRUNCATE CONTINUOUS VIEW customer_daily_spend'
cat impressions.sql | psql -h localhost -p 5432 -d pipeline

Buzz buzz...

It works! Check your spam folder if you didn't receive an email. There is also a slight chance that the request failed or that the email got queued up for later delivery by Mailgun. Check your server logs; you should see a WARNING logged for each day the budget was exceeded, which tells us that the trigger function fired correctly.

Final Thoughts

Being able to get real-time notifications on data that's being continuously distilled is a powerful abstraction. I showed you a simple example of how you can detect anomalies in campaign spending using continuous triggers. You can also use continuous triggers for other mission critical tasks such as alerting on DDoS attacks, real-time fraud detection in finance, monitoring the health and latency requirements of web services, and so on. If you'd like to learn more about continuous triggers, check out their documentation.

For organizations interested in using continuous triggers in environments that require horizontal scaling and high availability, please check out our commercial extension, PipelineDB Enterprise, and contact us to get started with a trial today.

If any of this sounds interesting to do for a living, we're hiring!