यद्यपि सेक्शनिंग का विषय पहले ही उठाया जा
चुका है , मैं इस समस्या को हल करने में अपने अनुभव के बारे में बात करने के लिए वापस लौटना चाहता हूं जो बड़ी मात्रा में डेटा के विश्लेषणात्मक प्रसंस्करण की आवश्यकता के संबंध में उत्पन्न हुई। सेक्शनिंग के अलावा, मैं स्रोत डेटा में बदलाव होने पर, स्वचालित रूप से अपडेट किए गए कुल प्रश्नों के "स्नैपशॉट" के अत्यंत सरलीकृत कार्यान्वयन पर विचार करूंगा।
विकसित प्रणाली के लिए मुख्य आवश्यकताओं में से एक मुफ्त सॉफ्टवेयर का उपयोग था, और इसलिए, पसंद PostgreSQL पर गिर गई। जिस समय मैंने प्रोजेक्ट पर काम करना शुरू किया था, तब मैं PostgreSQL को बहुत सतही रूप से जानता था, लेकिन मैं Oracle डेटाबेस की क्षमताओं से काफी परिचित था। चूँकि यह विश्लेषणात्मक प्रसंस्करण के बारे में था, इसलिए मैं ओरेकल विकल्पों जैसे कि
पार्टिशनिंग और
मटेरियलाइज़्ड व्यूज के अनुरूप होना चाहता था।
PostgreSQL की विशेषताओं से परिचित होने के बाद, यह स्पष्ट हो गया कि इस कार्यक्षमता, एक तरह से या किसी अन्य, को मैन्युअल रूप से लिखना होगा।
बेशक, हम भौतिकवादी दृश्यों के किसी भी पूर्ण कार्यान्वयन के बारे में बात नहीं कर रहे थे, जो
पुनर्लेखन प्रश्नों के लिए प्रदान करता है। मेरी आवश्यकताओं के लिए, स्वचालित रूप से अद्यतन एकल-तालिका नमूने बनाने की क्षमता काफी पर्याप्त थी (निकट भविष्य में तालिकाओं में शामिल होने की संभावना सबसे अधिक जोड़ दी जाएगी)। विभाजन के लिए, मैंने
विरासत में मिली तालिकाओं का उपयोग करके बार-बार वर्णित दृष्टिकोण का उपयोग करने की योजना बनाई है, जिसमें ट्रिगर द्वारा नियंत्रित डेटा प्रविष्टि है। मेरे पास विभाजन को नियंत्रित करने के लिए
नियमों का उपयोग करने का एक विचार था, लेकिन मैंने इसे मना कर दिया, क्योंकि, मेरे मामले में, एकल रिकॉर्ड के साथ डेटा सम्मिलित किया गया था।
मैंने शुरुआत में, मेटाडेटा के भंडारण के लिए तालिकाओं के साथ शुरुआत की:
ps_tables.sqlcreate sequence ps_table_seq; create table ps_table ( id bigint default nextval('ps_table_seq') not null, name varchar(50) not null unique, primary key(id) ); create sequence ps_column_seq; create table ps_column ( id bigint default nextval('ps_column_seq') not null, table_id bigint not null references ps_table(id), name varchar(50) not null, parent_name varchar(50), type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')), unique (table_id, name), primary key(id) ); create table ps_range_partition ( table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), start_value date not null, end_value date not null, primary key(table_id, start_value) ); create table ps_snapshot ( snapshot_id bigint not null references ps_table(id), table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), primary key(snapshot_id) );
यहाँ सब कुछ बहुत स्पष्ट है। केवल उल्लेख के लायक चीजें स्तंभ प्रकार हैं:
टाइप
| विवरण
|
तिथि
| डेटा को विभाजित करने और एकत्र करने के लिए उपयोग की जाने वाली कैलेंडर तिथि वाले स्तंभ (दिनांक और टाइमस्टैम्प पोस्टग्रैसीक्यू प्रकार समर्थित हैं)
|
कुंजी
| डेटा एकत्रीकरण में खंड द्वारा समूह में उपयोग की जाने वाली कुंजी (सभी पोस्टग्रेसीक्यूएल पूर्णांक प्रकार समर्थित हैं)
|
नल
| डेटा एकत्रीकरण में उपयोग की जाने वाली एक कुंजी, संभवतः शून्य से युक्त
|
योग
| मूल्यों का योग
|
मिनट
| न्यूनतम मूल्य
|
अधिकतम
| अधिकतम मूल्य
|
cnt
| गैर-शून्य मानों की गिनती
|
संपूर्ण समाधान का आधार वह फ़ंक्शन था जो स्रोत डेटा वाले तालिका के लिए ट्रिगर फ़ंक्शन के पुनर्निर्माण का कार्य करता है:
ps_trigger_regenerate (bigint) create or replace function ps_trigger_regenerate(in p_table bigint) returns void as $$ declare l_sql text; l_table_name varchar(50); l_date_column varchar(50); l_flag boolean; tabs record; columns record; begin select name into l_table_name from ps_table where id = p_table; l_sql := 'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and not type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) '; end if; if columns.type_name = 'min' then l_sql := l_sql || columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) '; end if; if columns.type_name = 'max' then l_sql := l_sql || columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; ' || 'if not FOUND then ' || 'insert into ' || tabs.table_name || '('; l_flag = FALSE; for columns in select name, type_name from ps_column where table_id = tabs.id loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ') values ('; l_flag = FALSE; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')'; elsif columns.type_name = 'cnt' then l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end'; elsif columns.type_name in ('nullable', 'sum') then l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)'; else l_sql := l_sql || 'NEW.' || columns.parent_name; end if; end loop; l_sql := l_sql || '); ' || 'end if; '; end loop; select name into l_date_column from ps_column where table_id = p_table and type_name = 'date'; for tabs in select to_char(start_value, 'YYYYMMDD') as start_value, to_char(end_value, 'YYYYMMDD') as end_value, type_name from ps_range_partition where table_id = p_table order by start_value desc loop l_sql := l_sql || 'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' || 'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' || 'return null; ' || 'end if; '; end loop; l_sql := l_sql || 'return NEW; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin ' || 'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' || 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('sum', 'cnt') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; '; end loop; l_sql := l_sql || 'return null; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('sum', 'cnt') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; '; end loop; l_sql := l_sql || 'return null; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; end; $$ language plpgsql;
अपने भयानक रूप के बावजूद, यह फ़ंक्शन काफी सरल है। इसका कार्य ट्रिगर के निर्माण में उपयोग किए जाने वाले चार कार्यों (उपलब्ध मेटाडेटा के आधार पर) को बनाना है:
- ps_Table_insert_trigger () - फ़ंक्शन जो डेटा के सम्मिलन को नियंत्रित करता है
- ps_TABLE_update_trigger () - फ़ंक्शन जो डेटा रिफ्रेश को नियंत्रित करता है
- ps_Table_delete_trigger () - फ़ंक्शन जो डेटा हटाने को नियंत्रित करता है
- ps_Table_raise_trigger () - फ़ंक्शन अद्यतन करने और डेटा हटाने पर रोक लगाता है
यहां, TABLE के बजाय, स्रोत डेटा वाली तालिका का नाम प्रतिस्थापित किया जाता है। Ps_Table_insert_trigger () की एक सामान्य परिभाषा इस तरह दिखाई देगी:
create or replace function ps_data_insert_trigger() returns trigger as $$ begin update data_month set sum_field = sum_field + NEW.sum_field , min_field = least(min_field, NEW.min_field) where date_field = date_trunc('month', NEW.date_field) and key_field = NEW.key_field; if not FOUND then insert into data_month(date_field, key_field, sum_field, min_field) values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field); end if; if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and NEW.date_field < to_date('20130201', 'YYYYMMDD') then insert into data_20130101 values (NEW.*); return null; end if; return NEW; end; $$ language plpgsql;
दरअसल, फ़ंक्शन थोड़ा अधिक जटिल दिखता है, क्योंकि अशक्त मानों को एक विशेष तरीके से नियंत्रित किया जाता है। लेकिन, एक उदाहरण के रूप में, उपरोक्त उदाहरण काफी पर्याप्त है। इस कोड का तर्क स्पष्ट है:
- मूल तालिका में डेटा सम्मिलित करते समय, हम एकत्रित data_month दृश्य में काउंटरों को अद्यतन करने का प्रयास करते हैं
- यदि यह विफल हो गया (data_month में रिकॉर्ड नहीं मिला), एक नया रिकॉर्ड जोड़ें
- अगला, जांचें कि क्या प्रत्येक अनुभाग तिथि सीमा में है (उदाहरण में, एक खंड), और यदि सफल है, तो रिकॉर्ड को संबंधित खंड में डालें (चूंकि अनुभाग मुख्य तालिका से विरासत में मिला है, आप सुरक्षित रूप से तारांकन चिह्न का उपयोग कर सकते हैं) और मुख्य तालिका में सम्मिलित होने से रिकॉर्ड को रोकने के लिए अशक्त लौटें
- यदि कोई भी अनुभाग फिट नहीं है, तो नया लौटें, जिससे आप मुख्य तालिका में पेस्ट कर सकते हैं
अंतिम पैराग्राफ इस तथ्य की ओर जाता है कि यदि एक उपयुक्त अनुभाग नहीं मिला है, तो डेटा को मुख्य तालिका में जोड़ा जाता है। व्यवहार में, यह काफी सुविधाजनक है। यहां तक कि अगर हम पहले से एक खंड नहीं बनाते हैं या गलत तारीख के साथ डेटा प्राप्त करते हैं, तो डेटा सम्मिलित करने में सफलता मिलेगी। इसके बाद, आप क्वेरी चलाकर मुख्य तालिका की सामग्री का विश्लेषण कर सकते हैं:
select * from only data
उसके बाद, लापता अनुभाग बनाएं (जैसा कि नीचे दिखाया जाएगा, डेटा स्वचालित रूप से मुख्य तालिका से निर्मित अनुभाग में स्थानांतरित कर दिया जाएगा)। ऐसे मामलों में, आपके सेक्शन में नहीं आने वाले रिकॉर्ड्स की संख्या आमतौर पर बड़ी नहीं होती है और डेटा ट्रांसफर की लागत नगण्य होती है।
अब यह एक दोहन करने के लिए बनी हुई है। चलिए एक नए अनुभाग को बनाने के लिए फ़ंक्शन से शुरू करते हैं:
ps_add_range_partition (varchar, varchar, varchar, date) create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, in p_type varchar, in p_start date) returns void as $$ declare l_sql text; l_end date; l_start_str varchar(10); l_end_str varchar(10); l_table bigint; l_flag boolean; columns record; begin perform 1 from ps_table a, ps_column b where a.id = b.table_id and lower(a.name) = lower(p_table) and b.type_name = 'date' and lower(b.name) <> lower(p_column); if FOUND then raise EXCEPTION 'Conflict DATE columns'; end if; l_end := p_start + ('1 ' || p_type)::INTERVAL; perform 1 from ps_table a, ps_range_partition b where a.id = b.table_id and lower(a.name) = lower(p_table) and (( p_start >= b.start_value and p_start < b.end_value ) or ( b.start_value >= p_start and b.start_value < l_end )); if FOUND then raise EXCEPTION 'Range intervals intersects'; end if; perform 1 from ps_table where lower(name) = lower(p_table); if not FOUND then insert into ps_table(name) values (lower(p_table)); end if; select id into l_table from ps_table where lower(name) = lower(p_table); perform 1 from ps_column where table_id = l_table and type_name = 'date' and lower(name) = lower(p_column); if not FOUND then insert into ps_column(table_id, name, type_name) values (l_table, lower(p_column), 'date'); end if; insert into ps_range_partition(table_id, type_name, start_value, end_value) values (l_table, p_type, p_start, l_end); l_start_str = to_char(p_start, 'YYYYMMDD'); l_end_str = to_char(l_end, 'YYYYMMDD'); l_sql := 'create table ' || p_table || '_' || l_start_str || '(' || 'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' || 'primary key ('; l_flag := FALSE; for columns in select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ')) inherits (' || p_table || ')'; execute l_sql; l_sql := 'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')'; execute l_sql; perform ps_trigger_regenerate(l_table); execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; l_sql := 'insert into ' || p_table || '_' || l_start_str || ' ' || 'select * from ' || p_table || ' where ' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')'; execute l_sql; l_sql := 'delete from only ' || p_table || ' where ' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' || 'after update on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' || 'after delete on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' || 'after update on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' || 'after delete on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; end; $$ language plpgsql;
यहां, इनपुट डेटा की शुद्धता की जांच करने के बाद, हम आवश्यक मेटाडेटा जोड़ते हैं, जिसके बाद, हम एक विरासत तालिका बनाते हैं। फिर, हम ps_trigger_regenerate को कॉल करके ट्रिगर्स के कार्यों को फिर से बनाते हैं, जिसके बाद हम सेक्शनिंग स्थिति में आने वाले डेटा को डायनेमिक क्वेरी के साथ बनाए गए सेक्शन में ट्रांसफर करते हैं और ट्रिगर्स को खुद को फिर से बनाते हैं।
दो बिंदुओं के साथ कठिनाइयाँ उत्पन्न हुईं।
- शुरुआत की तारीख (इनपुट पैरामीटर p_type के आधार पर) एक महीने, दिन या वर्ष को जोड़ने के साथ मुझे थोड़ा सा नुकसान उठाना पड़ा:
l_end := p_start + ('1 ' || p_type)::INTERVAL;
- चूँकि प्राथमिक कुंजी विरासत में नहीं मिली है, इसलिए मुझे स्रोत तालिका की प्राथमिक कुंजी के स्तंभों की सूची प्राप्त करने के लिए सिस्टम कैटलॉग के लिए एक अनुरोध लिखना पड़ा (मुझे अपनी मेटाडेटा में प्राथमिक कुंजी का वर्णन करना भी अनुचित लगा):
select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn
इसके अलावा, यह ध्यान दिया जाना चाहिए कि सूचकांक बनाने से पहले, विभाजन कुंजी (निर्मित विभाजन के लिए) पर, यह जांचना सार्थक होगा कि क्या यह प्राथमिक कुंजी का प्रमुख स्तंभ है (ताकि डुप्लिकेट इंडेक्स बनाने के लिए नहीं)।
किसी अनुभाग को हटाने का कार्य बहुत सरल है और विशेष टिप्पणियों की आवश्यकता नहीं है:
ps_del_range_partition (varchar, date) create or replace function ps_del_range_partition(in p_table varchar, in p_start date) returns void as $$ declare l_sql text; l_start_str varchar(10); l_table bigint; begin select id into l_table from ps_table where lower(name) = lower(p_table); l_start_str = to_char(p_start, 'YYYYMMDD'); delete from ps_range_partition where table_id = l_table and start_value = p_start; perform ps_trigger_regenerate(l_table); l_sql := 'insert into ' || p_table || ' ' || 'select * from ' || p_table || '_' || l_start_str; execute l_sql; perform 1 from ( select 1 from ps_range_partition where table_id = l_table union all select 1 from ps_snapshot where table_id = l_table ) a; if not FOUND then execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; execute 'drop function ps_' || p_table || '_insert_trigger() cascade'; execute 'drop function ps_' || p_table || '_raise_trigger() cascade'; execute 'drop function ps_' || p_table || '_update_trigger() cascade'; execute 'drop function ps_' || p_table || '_delete_trigger() cascade'; delete from ps_column where table_id = l_table; delete from ps_table where id = l_table; end if; perform 1 from ps_range_partition where table_id = l_table; if not FOUND then delete from ps_column where table_id = l_table and type_name = 'date'; end if; execute 'drop table ' || p_table || '_' || l_start_str; end; $$ language plpgsql;
जब कोई खंड हटा दिया जाता है, तो डेटा, निश्चित रूप से खो नहीं जाता है, लेकिन मुख्य तालिका में स्थानांतरित कर दिया जाता है (ट्रिगर पहले से हटा दिए जाते हैं, क्योंकि यह निकला, एकमात्र कीवर्ड सम्मिलित विवरण में काम नहीं करता है)।
यह "लाइव" डेटा स्नैपशॉट प्रबंधित करने के लिए फ़ंक्शंस जोड़ना है:
ps_add_snapshot_column (varchar, varchar, varchar, varchar) create or replace function ps_add_snapshot_column(in p_snapshot varchar, in p_column varchar, in p_parent varchar, in p_type varchar) returns void as $$ declare l_table bigint; begin perform 1 from ps_table where lower(name) = lower(p_snapshot); if not FOUND then insert into ps_table(name) values (lower(p_snapshot)); end if; select id into l_table from ps_table where lower(name) = lower(p_snapshot); insert into ps_column(table_id, name, parent_name, type_name) values (l_table, lower(p_column), lower(p_parent), p_type); end; $$ language plpgsql;
ps_add_snapshot (varchar, varchar, varchar) create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, in p_type varchar) returns void as $$ declare l_sql text; l_table bigint; l_snapshot bigint; l_flag boolean; columns record; begin select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot); perform 1 from ps_column where table_id = l_snapshot and type_name in ('date', 'key'); if not FOUND then raise EXCEPTION 'Key columns not found'; end if; perform 1 from ps_column where table_id = l_snapshot and not type_name in ('date', 'key', 'nullable'); if not FOUND then raise EXCEPTION 'Aggregate columns not found'; end if; perform 1 from ps_table where lower(name) = lower(p_table); if not FOUND then insert into ps_table(name) values (lower(p_table)); end if; select id into l_table from ps_table where lower(name) = lower(p_table); insert into ps_snapshot(table_id, snapshot_id, type_name) values (l_table, l_snapshot, p_type); perform ps_trigger_regenerate(l_table); l_sql := 'create table ' || p_snapshot || ' ('; l_flag := FALSE; for columns in select name, type_name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' date not null'; else l_sql := l_sql || columns.name || ' bigint not null'; end if; end loop; l_sql := l_sql || ', primary key ('; l_flag := FALSE; for columns in select name from ps_column where table_id = l_snapshot and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || '))'; execute l_sql; execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; l_sql := 'insert into ' || p_snapshot || '('; l_flag := FALSE; for columns in select name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ') select '; l_flag := FALSE; for columns in select parent_name as name, type_name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')'; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name; end if; if columns.type_name = 'nullable' then l_sql := l_sql || 'coalesce(' || columns.name || ', 0)'; end if; if columns.type_name = 'sum' then l_sql := l_sql || 'sum(' || columns.name || ')'; end if; if columns.type_name = 'min' then l_sql := l_sql || 'min(' || columns.name || ')'; end if; if columns.type_name = 'max' then l_sql := l_sql || 'max(' || columns.name || ')'; end if; if columns.type_name = 'cnt' then l_sql := l_sql || 'count(' || columns.name || ')'; end if; end loop; l_sql := l_sql || 'from ' || p_table || ' group by '; l_flag := FALSE; for columns in select parent_name as name, type_name from ps_column where table_id = l_snapshot and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')'; else l_sql := l_sql || columns.name; end if; end loop; execute l_sql; end; $$ language plpgsql;
ps_del_snapshot (varchar) create or replace function ps_del_snapshot(in p_snapshot varchar) returns void as $$ declare l_sql text; p_table varchar(50); l_table bigint; l_snapshot bigint; begin select a.table_id, c.name into l_table, p_table from ps_snapshot a, ps_table b, ps_table c where b.id = a.snapshot_id and c.id = a.table_id and lower(b.name) = lower(p_snapshot); select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot); delete from ps_snapshot where snapshot_id = l_snapshot; delete from ps_column where table_id = l_snapshot; delete from ps_table where id = l_snapshot; execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; perform 1 from ( select 1 from ps_range_partition where table_id = l_table union all select 1 from ps_snapshot where table_id = l_table ) a; if not FOUND then execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade'; else perform ps_trigger_regenerate(l_table); l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; end if; execute 'drop table if exists ' || p_snapshot; end; $$ language plpgsql;
यहां भी, मौलिक रूप से नया कुछ भी नहीं है और केवल एक चीज जो मैं नोट करना चाहूंगा, वह यह है कि, मिनट या अधिकतम समुच्चय का उपयोग करने के मामले में, ट्रिगर बनाते समय, ps_Table_raise_trigger () फ़ंक्शन का उपयोग किया जाता है, जो हटाने और तालिका में परिवर्तन को प्रतिबंधित करता है। जो स्नैपशॉट का निर्माण किया। ऐसा इसलिए किया जाता है क्योंकि मैं अपडेट को निष्पादित करते समय और स्रोत तालिका में स्टेटमेंट को हटाते समय इन समुच्चय को अद्यतन करने के लिए पर्याप्त प्रदर्शन कार्यान्वयन के साथ नहीं आ सका।
आइए देखें कि यह सब कैसे काम करता है। एक परीक्षण तालिका बनाएँ:
create sequence test_seq; create table test ( id bigint default nextval('test_seq') not null, event_time timestamp not null, customer_id bigint not null, value bigint not null, primary key(id) );
अब, एक अनुभाग जोड़ने के लिए, यह निम्नलिखित प्रश्न को निष्पादित करने के लिए पर्याप्त है:
select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))
परिणामस्वरूप, विरासत में मिली तालिका test_20130501 बनाई जाएगी, जिसमें मई महीने के सभी रिकॉर्ड अपने आप गिर जाएंगे।किसी अनुभाग को हटाने के लिए, आप निम्नलिखित क्वेरी चला सकते हैं: select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))
स्नैपशॉट बनाना थोड़ा अधिक जटिल है क्योंकि आपको पहले हमारे लिए रुचि के कॉलम निर्धारित करने की आवश्यकता है: select ps_add_snapshot_column('test_month', 'customer_id', 'key') select ps_add_snapshot_column('test_month', 'event_time', 'date') select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum') select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt') select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max') select ps_add_snapshot('test', 'test_month', 'month')
परिणामस्वरूप, निम्नलिखित क्वेरी के आधार पर स्वचालित रूप से अपडेट की गई तालिका बनाई जाएगी: select customer_id, date_trunc('month', event_time), sum(value) as value_sum, count(value) as value_cnt, max(value) as value_max from test group by customer_id, date_trunc('month', event_time)
आप निम्नलिखित क्वेरी चलाकर स्नैपशॉट निकाल सकते हैं: select ps_del_snapshot('test_month')
आज के लिए बस इतना ही। GitHub पर लिपियों को उठाया जा सकता है ।