rrs-commit: r29 - trunk

decibel at decibel.org decibel at decibel.org
Thu Mar 3 23:35:32 GMT 2005


Author: decibel
Date: Thu Mar  3 23:35:31 2005
New Revision: 29

Modified:
   trunk/rrs.sql
   trunk/rrs_functions.sql
Log:
Code to dynamically throttle back bucket creation to hit a target runtime

Modified: trunk/rrs.sql
==============================================================================
--- trunk/rrs.sql	(original)
+++ trunk/rrs.sql	Thu Mar  3 23:35:31 2005
@@ -10,6 +10,16 @@
 
 SET search_path = rrs, pg_catalog;
 
+CREATE TABLE setting (   -- name/value pairs; values are read back through rrs.setting_get()
+    setting_name text NOT NULL CONSTRAINT setting__name PRIMARY KEY   -- lookup key
+    , setting text NOT NULL   -- stored as text; callers cast it to the type they need
+) WITHOUT OIDS;
+
+COPY setting ( setting_name, setting ) FROM stdin;
+history length	10
+history_data_interval length	10
+\.
+
 CREATE TABLE rrs (
     rrs_id integer NOT NULL CONSTRAINT rrs_rrs__rrs_id PRIMARY KEY
     , keep_buckets integer NOT NULL
@@ -51,6 +61,14 @@
 ) WITHOUT OIDS;
 
 
+CREATE TABLE history (   -- one row per start_time logged by log_time()
+    start_time timestamp with time zone CONSTRAINT history__start PRIMARY KEY
+    , end_time timestamp with time zone NOT NULL
+) WITHOUT OIDS;
+
+CREATE TABLE history_data_interval (   -- adds data_interval to the columns inherited from history
+    data_interval interval NOT NULL
+) INHERITS ( history ) WITHOUT OIDS;
 
 COPY rrs (rrs_id, keep_buckets, parent, parent_buckets, time_per_bucket, rrs_name) FROM stdin;
 1	60	\N	\N	00:01:00	last hour

Modified: trunk/rrs_functions.sql
==============================================================================
--- trunk/rrs_functions.sql	(original)
+++ trunk/rrs_functions.sql	Thu Mar  3 23:35:31 2005
@@ -10,11 +10,6 @@
 
 SET search_path = rrs, pg_catalog;
 
---
--- TOC entry 16 (OID 591228955)
--- Name: update(); Type: FUNCTION; Schema: rrs; Owner: pgsql
---
-
 CREATE OR REPLACE FUNCTION "update"() RETURNS integer
     AS '
 DECLARE
@@ -26,6 +21,7 @@
     v_source rrs.source%ROWTYPE;
     v_sql text;
 
+    v_start_time rrs.history.start_time%TYPE;
     v_my_oid oid;
 BEGIN
     -- Figure out our OID and try to aquire a lock
@@ -38,8 +34,11 @@
         RETURN -1;
     END IF;
 
+    -- remember when we started for later
+    v_start_time := current_timestamp;
+
     -- make sure all the buckets are up to date
-    v_total_rows := rrs.update_buckets();
+    v_total_rows := rrs.update_buckets( v_start_time );
 
     -- Run through each source, updating each RRD for each source
     FOR v_source IN SELECT * FROM rrs.source
@@ -223,9 +222,11 @@
 -- Name: update_buckets(); Type: FUNCTION; Schema: rrs; Owner: pgsql
 --
 
-CREATE OR REPLACE FUNCTION update_buckets() RETURNS integer
+CREATE OR REPLACE FUNCTION update_buckets( history_bucket.start_time%TYPE ) RETURNS integer
     AS '
 DECLARE
+    p_start_time ALIAS FOR $1;
+
     v_delete_end_time TIMESTAMP WITH TIME ZONE;
     v_first_end_time TIMESTAMP WITH TIME ZONE;
     v_last_end_time TIMESTAMP WITH TIME ZONE;
@@ -312,9 +313,17 @@
                 RAISE LOG ''update_buckets no data found in sources, skipping to next RRD'';
             ELSE
                 v_first_end_time := rrs.interval_time( v_first_end_time, v_rrs.time_per_bucket) + v_rrs.time_per_bucket;
+
+                -- Figure out our end time. This is an expensive routine, so don''t do it if we''re close to caught up
+                IF v_first_end_time > current_timestamp - rrs.setting_get( ''target length'' )::interval * 3 THEN
+                    v_last_end_time := current_timestamp;
+                ELSE
+                    v_last_end_time := calculate_end_time( v_first_end_time, p_start_time );
+                END IF;
+
                 --debug.f(''update_buckets new first_end_time is %'', v_first_end_time);
                 RAISE LOG ''update_buckets new first_end_time is %'', v_first_end_time;
-                v_rows :=  rrs.add_buckets(v_rrs.rrs_id, v_rrs.time_per_bucket, v_first_end_time, NULL::timestamptz);
+                v_rows :=  rrs.add_buckets(v_rrs.rrs_id, v_rrs.time_per_bucket, v_first_end_time, v_last_end_time);
                 v_buckets_added = v_buckets_added + v_rows;
                 RAISE INFO ''update_buckets: % buckets added'', v_rows;
             END IF;
@@ -346,13 +355,13 @@
             END IF;
 
             IF v_first_end_time IS NULL THEN
-                -- If there's no data for our parent, we don't want to create any buckets, because we'd end up with missing
+                -- If there''s no data for our parent, we don''t want to create any buckets, because we''d end up with missing
                 -- data.
                 --debug.f(''update_buckets no data available for rrs_id %, skipping to next RRD'', v_rrs.rrs_id);
                 RAISE LOG ''update_buckets no data available for rrs_id %, skipping to next RRD'', v_rrs.rrs_id;
             ELSE
                 -- If we have our first end time, figure out what the last end time will be. This is just the last end time
-                -- of our parent (and no more, because we don't want to miss any data).
+                -- of our parent (and no more, because we don''t want to miss any data).
                 SELECT max(end_time)
                     INTO v_last_end_time
                     FROM rrs.bucket
@@ -370,10 +379,48 @@
 '
     LANGUAGE plpgsql;
 
---
--- TOC entry 19 (OID 591228977)
--- Name: interval_time(timestamp with time zone, interval); Type: FUNCTION; Schema: rrs; Owner: pgsql
---
+CREATE OR REPLACE FUNCTION calculate_end_time ( rrs.bucket.end_time%TYPE, rrs.history.start_time%TYPE ) RETURNS timestamp with time zone AS '
+DECLARE
+    p_first_end_time ALIAS FOR $1;  -- end time of the first bucket to be created
+    p_start_time ALIAS FOR $2;      -- start time of the current run; keys the history row written below
+
+    v_data_interval interval;       -- estimated span of data that can be processed in the desired run time
+    v_last_end_time rrs.bucket.end_time%TYPE;
+BEGIN
+    /*
+    To calculate the end time we build a linear estimation (http://mathworld.wolfram.com/LeastSquaresFitting.html).
+    This gives us a straight line y = a + b * x, where x is run time and y is data interval, both in seconds:
+        b = ( n * sum(xy) - sum(x) * sum(y) ) / ( n * sum(x^2) - sum(x)^2 )
+        a = ( sum(y) - b * sum(x) ) / n
+    The data interval to process is y at x = the "desired run time" setting. With fewer than two distinct run
+    times in the history the slope is undefined; the estimate is then NULL and no limit is applied.
+    */
+    SELECT INTO v_data_interval
+            interval ''1 second'' * ( ( sum_y - b * sum_x ) / n + b * extract( epoch FROM rrs.setting_get(''desired run time'')::interval ) )
+        FROM (
+                SELECT n, sum_x, sum_y, ( n * sum_xy - sum_x * sum_y ) / nullif( n * sum_xx - sum_x * sum_x, 0 ) AS b
+                    FROM (
+                            SELECT count(*) AS n, sum(x) AS sum_x, sum(y) AS sum_y, sum(x * y) AS sum_xy, sum(x * x) AS sum_xx
+                                FROM (
+                                        SELECT extract( epoch FROM data_interval ) AS y, extract( epoch FROM end_time - start_time ) AS x
+                                            FROM rrs.history_bucket  -- NOTE(review): rrs.sql in this revision creates history_data_interval; confirm the table name
+                                    ) raw
+                        ) sums
+            ) sums_b
+    ;
+    -- Never go beyond current_timestamp. min() is an aggregate, so compare explicitly; NULL means "no estimate".
+    v_last_end_time := p_first_end_time + v_data_interval;
+    IF v_last_end_time IS NULL OR v_last_end_time > current_timestamp THEN
+        v_last_end_time := current_timestamp;
+    END IF;
+    -- Log how much time we will actually be processing; end_time is a placeholder that log_time() updates later
+    INSERT INTO rrs.history_bucket( start_time, end_time, data_interval )
+        VALUES( p_start_time, p_start_time, v_last_end_time - p_first_end_time )
+    ;
+
+    RETURN v_last_end_time;
+END;
+' LANGUAGE plpgsql;
 
 CREATE OR REPLACE FUNCTION interval_time(timestamp with time zone, interval) RETURNS timestamp with time zone
     AS '
@@ -413,7 +460,7 @@
         v_current_end_time := rrs.interval_time( p_first_end_time, p_time_per_bucket );
 
         -- Figure out what the most recent bucket we can create is
-        v_max_end_time := rrs.interval_time( coalesce( p_last_end_time, current_timestamp ), p_time_per_bucket ); 
+        v_max_end_time := rrs.interval_time( p_last_end_time, p_time_per_bucket ); 
         /*
         debug.f(''add_buckets: adding buckets for rrs_id % between % and %''
                     , p_rrs_id
@@ -440,6 +487,120 @@
 END;
 '
     LANGUAGE plpgsql;
+
+
+
+CREATE OR REPLACE FUNCTION log_time( timestamptz ) RETURNS interval AS '
+DECLARE
+    p_start_time ALIAS FOR $1;  -- start time of the run being logged
+
+    v_end_time rrs.history.end_time%TYPE;
+    v_min_keep_time rrs.history.start_time%TYPE;
+    v_history_length integer;   -- how many of the most recent rows survive trimming
+BEGIN
+    -- Records the run that began at p_start_time as ending now and returns how long it took.
+    v_end_time := current_timestamp;
+    -- setting_get() returns text, so cast it once for use in the LIMIT clauses below
+    v_history_length := rrs.setting_get( ''history length'' )::integer;
+
+    -- Trim the history table. ORDER BY and LIMIT share one subquery so the limit applies to the sorted rows.
+    SELECT INTO v_min_keep_time
+            min( start_time )
+        FROM (
+                SELECT start_time
+                    FROM rrs.history
+                    ORDER BY start_time DESC
+                    LIMIT v_history_length
+            ) newest
+    ;
+    -- An aggregate query always yields a row, so FOUND says nothing here; test the value instead
+    IF v_min_keep_time IS NOT NULL THEN
+        DELETE FROM rrs.history WHERE start_time < v_min_keep_time;
+    END IF;
+
+    INSERT INTO rrs.history( start_time, end_time )
+        VALUES( p_start_time, v_end_time )
+    ;
+
+    -- A history_bucket row for this start time exists only if calculate_end_time() logged one;
+    -- otherwise there is nothing more to do
+    IF EXISTS (SELECT * FROM rrs.history_bucket WHERE start_time = p_start_time) THEN
+        -- Trim the history_bucket table the same way
+        SELECT INTO v_min_keep_time
+                min( start_time )
+            FROM (
+                    SELECT start_time
+                        FROM rrs.history_bucket
+                        ORDER BY start_time DESC
+                        LIMIT v_history_length
+                ) newest
+        ;
+        IF v_min_keep_time IS NOT NULL THEN
+            DELETE FROM rrs.history_bucket WHERE start_time < v_min_keep_time;
+        END IF;
+
+        -- calculate_end_time() stored start_time as a placeholder end_time; record the real one
+        UPDATE rrs.history_bucket
+            SET end_time = v_end_time
+            WHERE start_time = p_start_time
+        ;
+    END IF;
+
+    RETURN v_end_time - p_start_time;
+END;
+'
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+;
+
+
+CREATE OR REPLACE FUNCTION setting_get(text) RETURNS text AS '
+    -- Value stored for the setting named by $1
+    SELECT s.setting
+        FROM rrs.setting s
+        WHERE s.setting_name = $1;
+' LANGUAGE sql SECURITY DEFINER;
+
+CREATE OR REPLACE FUNCTION setting_set(text, text) RETURNS text AS '
+DECLARE
+    p_setting_name ALIAS FOR $1;
+    p_setting ALIAS FOR $2;
+
+    v_previous rrs.setting.setting%TYPE;
+    v_exists boolean;
+BEGIN
+    -- Stores p_setting under p_setting_name, or removes the setting when p_setting is NULL.
+    -- Returns the value held before the call (NULL if there was none).
+
+    -- Lock the current row, if any, and remember its value
+    SELECT INTO v_previous
+            setting
+        FROM rrs.setting
+        WHERE setting_name = p_setting_name
+        FOR UPDATE
+    ;
+    -- Capture FOUND right away: the statements below would overwrite it
+    v_exists := FOUND;
+
+    IF v_exists AND p_setting IS NULL THEN
+        DELETE FROM rrs.setting
+            WHERE setting_name = p_setting_name
+        ;
+    ELSIF v_exists THEN
+        UPDATE rrs.setting
+            SET setting = p_setting
+            WHERE setting_name = p_setting_name
+        ;
+    ELSIF p_setting IS NOT NULL THEN
+        INSERT INTO rrs.setting( setting_name, setting )
+            VALUES( p_setting_name, p_setting )
+        ;
+        v_previous := NULL;
+    END IF;
+
+    RETURN v_previous;
+END;
+' LANGUAGE plpgsql SECURITY DEFINER;
+
 
 GRANT EXECUTE ON FUNCTION update() TO public;
 


More information about the rrs-commit mailing list