Since releasing PipelineDB as open source in July last year, one of the primary use cases we've seen it deployed for has been building real-time dashboards. Real-time dashboards are powerful tools that give you instant insight into the metrics you care about most. Once you have a real-time view of those metrics, the natural next step is to take an action based on some specific condition or event. In SQL land, the way to react to events happening in your database is by using triggers.
In this post, I'll give a brief overview of what triggers are and then showcase how we have extended triggers to work with continuous SQL queries, which we call continuous views. I will then work through an example of how continuous triggers can be used with PipelineDB to easily build a real-time alerting system for monitoring ad campaign spend. Whenever a customer overspends their daily budget, an alert email will be sent.
Before we get started, we need to ensure that PipelineDB is configured correctly. The configuration options that need to be set in pipelinedb.conf to enable continuous triggers are shown below:
continuous_triggers_enabled = on
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
For more information, refer to the docs.
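As a quick sanity check, the settings can be read back from a psql session. This is a minimal sketch that assumes the options above are exposed as ordinary server parameters, which is how PostgreSQL treats entries in its configuration file. Note that in PostgreSQL, changes to wal_level, max_wal_senders and max_replication_slots only take effect after a server restart.

```sql
-- Run from psql after restarting the server.
-- Each statement prints the value currently in effect.
SHOW continuous_triggers_enabled;
SHOW wal_level;
SHOW max_wal_senders;
SHOW max_replication_slots;
```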
Triggers on Continuous Views
Triggers are stored procedures that are automatically executed when an event takes place on a table e.g., insert, update or delete. Along with the event, you can also specify a condition that the event must satisfy for the trigger to fire. With PipelineDB 0.9.1, we're bringing triggers to continuous views! Let's start with a simple example:
-- A stream for click events
CREATE STREAM click_stream (
  ip text,
  button text
);

-- A continuous view that aggregates the number of times the sign up
-- button is clicked for each minute from an IP address
CREATE CONTINUOUS VIEW signup_clicks AS
  SELECT minute(arrival_timestamp), ip, count(*)
  FROM click_stream
  WHERE button = 'sign-up'
  GROUP BY minute(arrival_timestamp), ip;

-- A simple trigger to detect abuse from an IP address
CREATE TRIGGER abusive_ip
  AFTER UPDATE ON signup_clicks
  FOR EACH ROW
  WHEN (NEW.count > 100 AND OLD.count <= 100)
  EXECUTE PROCEDURE notify_abuse();
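The trigger above calls notify_abuse(), which this post does not define, and the function has to exist before CREATE TRIGGER is run. The following is only a hypothetical stand-in that logs a warning; a real handler would do whatever your abuse response requires.

```sql
-- Hypothetical placeholder for notify_abuse(); not part of PipelineDB.
-- It only writes a WARNING to the server log for the offending row.
CREATE FUNCTION notify_abuse()
RETURNS trigger AS
$$
BEGIN
  RAISE WARNING 'Possible abuse: IP % has % sign-up clicks in one minute',
    NEW.ip, NEW.count;
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;
```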
In this example, the trigger function notify_abuse() will be executed every time we detect that the sign up button has been clicked more than 100 times in a minute from the same IP address. One thing to note here is that even though signup_clicks.count would only ever be incremented by 1 for each INSERT in a regular table query, it might jump by more when continuous views are updated. This is because PipelineDB batches multiple tuple insertions for performance's sake. Therefore, when checking that a threshold is exceeded, we always check for ranges in the WHEN clause.
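To make the batching point concrete, here is a small sketch in plain SQL that evaluates both styles of condition against a few made-up (old, new) count pairs. The pairs are illustrative only and do not come from a real continuous view.

```sql
-- Compare an equality check with the range check used in abusive_ip.
-- Each row stands for one update of a single view row: count before, count after.
SELECT old_count,
       new_count,
       (new_count = 101)                      AS equality_fires,
       (new_count > 100 AND old_count <= 100) AS range_fires
FROM (VALUES
        (99, 100),   -- still under the threshold
        (100, 101),  -- crosses the threshold one click at a time
        (98, 105),   -- a batch jumps straight past 101
        (105, 110)   -- already over the threshold
     ) AS t(old_count, new_count);
```

Only the range check fires for the (98, 105) row, and it stays quiet for (105, 110), so the alert is raised exactly once per crossing regardless of how the inserts were batched.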
Tracking Campaign Spends
Now that I've explained what triggers are, let's walk through an example of how you could use PipelineDB's new continuous trigger functionality to get real-time email alerts for critical events. Before we get started, make sure you have PipelineDB 0.9.1 installed. You can download it here.
Suppose you work at an Ad Tech company. One of your mission-critical tasks is to ensure your customers don't spend more money than the budget they've allocated. You probably have many guards installed to prevent something like that from happening, but as with any software system, bugs are inevitable. I'm going to show you how you can use PipelineDB to build a real-time alerting utility in case the guards fail and a customer exceeds their budget.
We'll start off by creating a table which stores each customer's daily budget.
CREATE TABLE customer_daily_budget (
  customer_id integer,
  day date,
  budget bigint,
  PRIMARY KEY (customer_id, day)
);
Let's also create a dummy customer with ID 1234 and populate their daily budgets for the month of March 2016. For simplicity, we're going to choose 10000 as their budget for all days.
INSERT INTO customer_daily_budget
  SELECT 1234, date '2016-03-01' + x, 10000
  FROM generate_series(0, 30) AS x;
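A quick way to confirm the insert did what we wanted is to check the row count and date range. This is just a sanity-check sketch against the table defined above; since generate_series(0, 30) yields 31 offsets, it should report 31 rows spanning 2016-03-01 to 2016-03-31.

```sql
-- One budget row per day of March 2016 for the dummy customer
SELECT count(*) AS days,
       min(day) AS first_day,
       max(day) AS last_day
FROM customer_daily_budget
WHERE customer_id = 1234;
```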
Next, we'll declare a stream which captures all the ad impressions purchased on our platform.
CREATE STREAM ad_impressions_purchased (
  customer_id integer,
  campaign_id integer,
  purchase_time timestamp,
  amount_spent integer
);
Now we need to create a continuous view that keeps track of the amount of money each customer has spent on ad impressions per day.
CREATE CONTINUOUS VIEW customer_daily_spend AS
  SELECT
    customer_id,
    date(purchase_time) AS day,
    sum(amount_spent) AS total_spent
  FROM ad_impressions_purchased
  GROUP BY customer_id, date(purchase_time);
This is all we need to keep track of how much money each customer is spending per day. Let's pipe some mock data into our ad_impressions_purchased stream to see if everything is working correctly. You can download the impressions.sql file here.
cat impressions.sql | psql -h localhost -p 5432 -d pipeline
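If you'd rather poke at the stream by hand, events can also be written with a regular INSERT. The rows below are invented for illustration and use a hypothetical customer ID 9999, so they don't affect the totals for customer 1234; they are not taken from impressions.sql.

```sql
-- Made-up events for a hypothetical customer 9999
INSERT INTO ad_impressions_purchased
  (customer_id, campaign_id, purchase_time, amount_spent)
VALUES
  (9999, 1, '2016-03-01 09:30:00', 250),
  (9999, 1, '2016-03-01 17:45:00', 400),
  (9999, 2, '2016-03-02 08:10:00', 125);

-- Once the continuous view has processed the events, it should hold
-- one row per day: 650 for 2016-03-01 and 125 for 2016-03-02.
SELECT day, total_spent
FROM customer_daily_spend
WHERE customer_id = 9999
ORDER BY day;
```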
To see which days the customer exceeded their quota, we can run the following query.
SELECT cds.day
FROM customer_daily_spend AS cds
JOIN customer_daily_budget AS cdb
  ON cds.customer_id = cdb.customer_id AND cds.day = cdb.day
WHERE cds.total_spent > cdb.budget
ORDER BY cds.day;

    day
------------
 2016-03-06
 2016-03-07
 2016-03-24
 2016-03-31
(4 rows)
Alerting When Things Go Wrong
Now that everything is set up, let's create a trigger that will immediately notify us via email when a customer exceeds their daily budget. To accomplish this we will need two more things. First, we need a way to fetch the budget for a customer for a specific day inside a trigger condition. We can create a simple SQL function for that.
CREATE FUNCTION get_customer_budget (integer, date)
RETURNS bigint AS
  'SELECT budget
   FROM customer_daily_budget
   WHERE customer_id = $1 AND day = $2'
LANGUAGE sql IMMUTABLE;
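It's worth trying the function out before relying on it in a trigger condition. The sketch below assumes the March 2016 budgets inserted earlier. The second and third queries show the case where no budget row exists: the function returns NULL, a comparison against NULL is itself NULL, and a trigger only fires when its WHEN condition is true, so no alert would be raised for days without a budget.

```sql
-- Budget exists: returns 10000
SELECT get_customer_budget(1234, date '2016-03-06') AS budget;

-- No budget row for this day: returns NULL
SELECT get_customer_budget(1234, date '2016-04-01') AS budget;

-- Comparing any spend against a missing budget yields NULL, not true
SELECT 15000 > get_customer_budget(1234, date '2016-04-01') AS would_fire;
```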
Second, we need a trigger function that sends us an email when the trigger is fired. The function takes an email address as its only argument. When executed, it will email the customer ID and the day the budget was exceeded to the provided email address. We're going to use Mailgun's HTTP API to send emails (the API key we'll use is from their free sandbox, which only allows 10k emails/month, so please don't abuse it). @pramsey has written a simple extension that lets us make HTTP requests from PostgreSQL (thanks man!). We'll start by building the extension from source.
git clone https://github.com/pramsey/pgsql-http.git
cd pgsql-http
make
sudo make install
Next we need to explicitly load the extension into PipelineDB.
CREATE EXTENSION http;
We can now use the functionality provided by this extension to issue an HTTP request to Mailgun's API for sending an alert email.
CREATE FUNCTION send_alert_email ()
RETURNS trigger AS
$$
BEGIN
  SET http.timeout_msec TO 5000;

  RAISE WARNING 'Customer % has exceeded their daily budget for %',
    NEW.customer_id, NEW.day::text;

  PERFORM http(
    ('POST',
     'https://api.mailgun.net/v3/sandbox2d4d7025bde84eedb1b6fb68739520b4.mailgun.org/messages',
     ARRAY[
       -- HTTP Basic credentials are sent in the Authorization header
       ('Authorization', 'Basic YXBpOmtleS02NjQ5YTlmNjkyMGU0ZjBhODZjNDNiNzA4MTYxNDEwYg==')::http_header
     ],
     'application/x-www-form-urlencoded',
     'from=' || urlencode('PipelineDB <firstname.lastname@example.org>') ||
     -- TG_ARGV is an array; the recipient is the trigger's first argument
     '&to=' || urlencode(TG_ARGV[0]) ||
     '&subject=' || urlencode('ALERT: Customer budget exceeded!') ||
     -- the message body contains spaces, so it is URL-encoded as well
     '&text=' || urlencode('Customer ' || NEW.customer_id::text ||
                           ' has exceeded their daily budget for ' || NEW.day::text)
    )::http_request);

  RETURN NEW;
END;
$$
LANGUAGE plpgsql VOLATILE;
That's it! We have everything we need to create the required triggers on the customer_daily_spend continuous view.
-- We need to create two triggers, one for the insert case and one for the update case.
CREATE TRIGGER budget_exceeded_on_insert
  AFTER INSERT ON customer_daily_spend
  FOR EACH ROW
  WHEN (NEW.total_spent > get_customer_budget(NEW.customer_id, NEW.day))
  EXECUTE PROCEDURE send_alert_email('email@example.com');

CREATE TRIGGER budget_exceeded_on_update
  AFTER UPDATE ON customer_daily_spend
  FOR EACH ROW
  WHEN (OLD.total_spent <= get_customer_budget(NEW.customer_id, NEW.day) AND
        NEW.total_spent > get_customer_budget(NEW.customer_id, NEW.day))
  EXECUTE PROCEDURE send_alert_email('firstname.lastname@example.org');
Let's truncate the continuous view and re-run the data through the ad_impressions_purchased stream to see if we receive alerts for the days the budget was exceeded.
psql -h localhost -p 5432 -d pipeline \
  -c 'TRUNCATE CONTINUOUS VIEW customer_daily_spend'
cat impressions.sql | psql -h localhost -p 5432 -d pipeline
It works! Check your spam folder if you didn't receive an email. There is also a slight chance that the request either failed or got queued up for later delivery by Mailgun. If you check your server logs, you should see a WARNING logged for each day, which tells us that the trigger function was correctly fired.
Being able to get real-time notifications on data that's being continuously distilled is a powerful abstraction. I showed you a simple example of how you can detect anomalies in campaign spending using continuous triggers. You can also use continuous triggers for other mission-critical tasks such as alerting on DDoS attacks in networking, real-time fraud detection in finance, and monitoring the health and latency of web services. If you'd like to learn more about continuous triggers, check out the documentation.
For organizations interested in using continuous triggers in environments that require horizontal scaling and high availability, please check out our commercial extension, PipelineDB Enterprise, and contact us to get started with a trial today.
If any of this sounds interesting to do for a living, we're hiring!