ChkDB |
Open Source Rule-Based Data Checking |
|
Home Contact Download Releases Open Source License Project Contributing The PL/SQL Using ChkDB Installation Read-Me Rule-Sets ChkDB Tables E-R Diagram
Tiny Sample External Links Data Quality and Data Profiling (a glossary) |
The PL/SQL Package: If you want to get the ChkDB software, you should go to the Download page. If you just want to look, this is the PL/SQL package that is the heart of the ChkDB software (without the license section)...
-- #################################################################
-- Create (or replace) the PL/SQL package CHKDB_CHECK_DB
--
-- This package provides procedures for doing rules-based
-- data checking. Procedures are provided to:
-- - run checks on one or more tables in a domain
-- - remove one or more domains from the database
--
-- See copyright notice and license at end of this file.
--
-- =========================================================
-- Using Procedure: APPLY_RULES (domain_name, table_spec)
--
-- What the procedure does:
-- Apply active rules (listed in CHKDB_RULE)
-- in active rule-sets (listed in CHKDB_RULESET)
-- to rows in active tables (listed in CHKDB_TABLE)
-- that have names like 'table_spec'
-- in the 'domain_name' domain (listed in CHKDB_DOMAIN).
-- ('table_spec' may include the wild-card character '%')
--
-- When a row in a table is selected by a rule,
-- a row is inserted into the CHKDB_RESULT table.
--
-- Before any rules are applied to a table, all rows
-- for that table are deleted from the CHKDB_RESULT table.
--
-- The sql generated to apply each rule is stored in
-- table CHKDB_RULE_SQL.
--
-- If an error occurs while attempting to run the sql
-- that is generated for a rule, a row describing
-- the error is written to table CHKDB_RULE_ERR.
--
-- Examples (using SQL*Plus)...
--
-- 1 - to check the EMP table in the HR domain:
-- SQL> CALL CHKDB_CHECK_DB.APPLY_RULES('HR','EMP');
--
-- 2 - to check tables like 'EMP%' in the HR domain:
-- SQL> CALL CHKDB_CHECK_DB.APPLY_RULES('HR','EMP%');
--
-- 3 - to check all active tables in the HR domain:
-- SQL> CALL CHKDB_CHECK_DB.APPLY_RULES('HR','%');
--
--
-- =========================================================
-- Using Procedure: DROP_DOMAIN_TABLES (dom_spec)
--
-- What the procedure does:
-- Drop tables listed in CHKDB_TABLE
-- where DROP_ON_REMOVE_YN = 'Y'
-- and the table is in a domain like 'dom_spec'
--
-- Examples:
--
-- 1 - to drop tables in the HR domain:
-- SQL> CALL CHKDB_CHECK_DB.DROP_DOMAIN_TABLES('HR');
--
-- 2 - to drop tables in domains like 'HR_EMP%':
-- SQL> CALL CHKDB_CHECK_DB.DROP_DOMAIN_TABLES('HR_EMP%');
--
-- #########################################################
-- Create or replace the Package Spec (public interface)
CREATE OR REPLACE PACKAGE CHKDB_CHECK_DB AS
  -- ---------------------------------------------------------
  -- Public constants
  -- ---------------------------------------------------------
  -- Table that receives one row per data row selected by a rule.
  chk_out_table CONSTANT VARCHAR2(30) := 'CHKDB_RESULT';
  -- Table that receives the sql text generated for each rule.
  rule_sql_table CONSTANT VARCHAR2(30) := 'CHKDB_RULE_SQL';
  -- Table that receives a row for each rule that could not be run.
  rule_err_table CONSTANT VARCHAR2(30) := 'CHKDB_RULE_ERR';
  -- Parenthesised column list used when inserting into CHKDB_RESULT.
  chk_out_cols CONSTANT VARCHAR2(500) :=
    '(DATA_DOMAIN, RULESET_ID, TABLE_NAME, RULE_ID,' ||
    ' SELECTED_ROWID, ROW_PRIMARY_KEY, RULE_CLASS,' ||
    ' ROW_OUT_DATA, ROW_OUT_DESC, CHECK_DATE)';

  -- ---------------------------------------------------------
  -- Public procedures
  -- ---------------------------------------------------------
  -- Run the active rules against tables whose names are like
  -- 'tab_spec' in the domain 'dom_name'.
  PROCEDURE APPLY_RULES (dom_name VARCHAR2, tab_spec VARCHAR2);
  -- Drop the tables/views flagged for removal in domains whose
  -- names are like 'dom_spec'.
  PROCEDURE DROP_DOMAIN_TABLES (dom_spec VARCHAR2);
END CHKDB_CHECK_DB;
/
-- #########################################################
-- Create or replace the Package Body
CREATE OR REPLACE PACKAGE BODY CHKDB_CHECK_DB AS
/* Forward declaration of private subprograms */
-- These declarations let the public procedures call the private
-- subprograms before their bodies appear further down the package body.

-- Deletes the rows for table 'tab_name' from the check-result,
-- rule-error and rule-sql tables.
PROCEDURE del_old_output (tab_name VARCHAR2);
-- Returns the 'col_num'th entry of the comma-separated 'key_list',
-- or null when the list has fewer than 'col_num' entries.
FUNCTION key_col (key_list VARCHAR2, col_num NUMBER)
RETURN VARCHAR2;
-- Returns an SQL expression that concatenates the values of the
-- columns named in 'col_list', separated by '|'.
FUNCTION mk_col_list_expr (tab_name VARCHAR2,
col_list VARCHAR2) RETURN VARCHAR2;
-- Returns 'col_list' with blanks removed and commas turned into '|'.
FUNCTION mk_col_list_desc (col_list VARCHAR2) RETURN VARCHAR2;
-- Builds the where-clause for a predefined column check.
-- APPLY_RULES passes the like-named columns of the CHKDB_RULE row,
-- except COLUMN_TYPE_CNDO, for which it passes the resolved column type.
FUNCTION mk_predef_where_cls (COLUMN_NAME VARCHAR2,
COLUMN_TYPE_CNDO VARCHAR2,
NOT_NULL_YN CHAR,
MIN_VALUE VARCHAR2,
MAX_VALUE VARCHAR2,
MAX_LENGTH NUMBER,
IS_A_NUMBER_YN CHAR,
NOT_BEFORE_TODAY_YN VARCHAR2,
NOT_AFTER_TODAY_YN VARCHAR2,
IN_LIST VARCHAR2,
NO_ILLEGAL_CHARS VARCHAR2,
COLUMN_2_NAME VARCHAR2,
COL_NOT_EQ_COL2_YN CHAR,
COL_NOT_NE_COL2_YN CHAR,
COL_NOT_LT_COL2_YN CHAR,
COL_NOT_GT_COL2_YN CHAR,
COL_MINUS_COL2_MIN NUMBER,
COL_MINUS_COL2_MAX NUMBER)
RETURN VARCHAR2;
-- Sets OUT_SPEC and OUT_DESC for a predefined check; APPLY_RULES
-- uses them as the rule's row out-spec and out-description.
PROCEDURE mk_predef_out_spec (COLUMN_NAME IN VARCHAR2,
COLUMN_TYPE_CNDO IN VARCHAR2,
COLUMN_2_NAME IN VARCHAR2,
OUT_SPEC OUT VARCHAR2,
OUT_DESC OUT VARCHAR2);
-- Builds the where-clause for a foreign-key check.
-- NOTE(review): the three FK_NOT_*_YN flags presumably select which
-- foreign-key conditions are tested -- confirm against the implementation.
FUNCTION mk_fk_where_cls (THIS_DATA_DOMAIN VARCHAR2,
THIS_TABLE_NAME VARCHAR2,
THIS_TABLE_ALIAS CHAR,
FOREIGN_KEY_LIST VARCHAR2,
REF_DATA_DOMAIN VARCHAR2,
REF_TABLE_NAME VARCHAR2,
REF_KEY_LIST VARCHAR2,
FK_NOT_ORPHAN_YN CHAR,
FK_NOT_NULL_YN CHAR,
FK_NOT_PARTIAL_YN CHAR)
RETURN VARCHAR2;
-- Returns the sql text for one rule; APPLY_RULES hands the result
-- to apply_a_rule.
FUNCTION mk_rule_sql (data_domain VARCHAR2,
ruleset_id VARCHAR2,
table_name VARCHAR2,
rule_id VARCHAR2,
rule_class VARCHAR2,
primary_key_spec VARCHAR2,
row_out_spec VARCHAR2,
row_out_desc VARCHAR2,
table_alias CHAR,
where_cls VARCHAR2)
RETURN VARCHAR2;
-- Receives the sql generated for one rule.
-- NOTE(review): presumably executes rule_sql and records failures in
-- CHKDB_RULE_ERR -- confirm against the implementation.
PROCEDURE apply_a_rule (data_domain VARCHAR2,
ruleset_id VARCHAR2,
table_name VARCHAR2,
rule_id VARCHAR2,
rule_sql VARCHAR2);
-- Returns a type code for a column; callers in this package test the
-- result against 'C' and 'D'.
-- NOTE(review): the name suggests the codes are C/N/D/O -- confirm.
FUNCTION col_type_cndo (tab_name VARCHAR2,
col_name VARCHAR2)
RETURN CHAR;
-- =========================================================
/* Implementation of public procedures */
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Apply active rules to one or more tables in a domain
-- and write the results to the output table.
-- Note: an 'out-spec' is an SQL expression that is selected
-- from the table by the rule-sql that results in a string
-- showing the value(s) that caused the rule to select the row.
PROCEDURE APPLY_RULES (dom_name VARCHAR2, tab_spec VARCHAR2) IS
  -- Purpose:
  --   Walk the active rules for tables like 'tab_spec' in domain
  --   'dom_name', build the sql for each rule and hand it to
  --   apply_a_rule. A rule that yields no where-clause is reported
  --   in CHKDB_RULE_ERR instead of being applied.
  -- Parameters:
  --   dom_name - domain name (compared after upper-casing).
  --   tab_spec - table-name pattern for LIKE (upper-cased; may use '%').
  -- Where-clause precedence for a rule:
  --   1. the rule's own CHK_WHERE_CLS;
  --   2. a predefined column check (when COLUMN_NAME is given);
  --   3. a foreign-key check (when FOREIGN_KEY_LIST is given).
  rule_tab_alias  CHAR(1);
  prev_table_name VARCHAR2(30) := '(no prev table)';
  row_out_spec    VARCHAR2(1000);
  row_out_desc    VARCHAR2(1000);
  pri_key_spec    VARCHAR2(1000);
  where_cls       VARCHAR2(2000);
  rule_sql        VARCHAR2(3000);
  col_type        VARCHAR2(30);
  -- Active rules joined to their active rule-set, table and domain,
  -- ordered so that all rules for one table arrive together.
  cursor c_rule (dom_name VARCHAR2, tab_spec VARCHAR2) IS
    select d.DATA_DOMAIN, s.RULESET_ID,
           t.TABLE_NAME, t.PRIMARY_KEY_LIST,
           r.RULE_ID, r.RULE_CLASS,
           r.CHK_WHERE_CLS,
           r.ROW_OUT_SPEC, r.ROW_OUT_DESC,
           r.COLUMN_NAME, r.COLUMN_TYPE_CNDO,
           r.NOT_NULL_YN,
           r.MIN_VALUE, r.MAX_VALUE,
           r.MAX_LENGTH, r.IS_A_NUMBER_YN,
           r.NOT_BEFORE_TODAY_YN, r.NOT_AFTER_TODAY_YN,
           r.IN_LIST, r.NO_ILLEGAL_CHARS,
           r.COLUMN_2_NAME,
           r.COL_NOT_EQ_COL2_YN, r.COL_NOT_NE_COL2_YN,
           r.COL_NOT_LT_COL2_YN, r.COL_NOT_GT_COL2_YN,
           r.COL_MINUS_COL2_MIN, r.COL_MINUS_COL2_MAX,
           r.FOREIGN_KEY_LIST,
           r.REF_DATA_DOMAIN, r.REF_TABLE_NAME,
           r.REF_KEY_LIST, r.FK_NOT_ORPHAN_YN,
           r.FK_NOT_NULL_YN, r.FK_NOT_PARTIAL_YN
      from CHKDB_DOMAIN d, CHKDB_TABLE t,
           CHKDB_RULESET s, CHKDB_RULE r
     where t.DATA_DOMAIN = d.DATA_DOMAIN
       and s.DATA_DOMAIN = d.DATA_DOMAIN
       and r.DATA_DOMAIN = s.DATA_DOMAIN
       and r.RULESET_ID = s.RULESET_ID
       and r.TABLE_NAME = t.TABLE_NAME
       and d.ACTIVE_YN = 'Y'
       and t.ACTIVE_YN = 'Y'
       and s.ACTIVE_YN = 'Y'
       and r.ACTIVE_YN = 'Y'
       and t.CHECKABLE_YN = 'Y'
       and d.DATA_DOMAIN = upper(dom_name)
       and r.TABLE_NAME like upper(tab_spec)
     order by r.TABLE_NAME, r.RULESET_ID, r.RULE_ID;
BEGIN
  -- in dynamic sql, this table will have an alias...
  rule_tab_alias := 't';
  for rule_rec in c_rule(dom_name,tab_spec) loop -- rules
    -- If this is the first row for a table...
    if rule_rec.TABLE_NAME <> prev_table_name then
      del_old_output (rule_rec.TABLE_NAME);
      pri_key_spec := mk_col_list_expr (rule_rec.TABLE_NAME,
                                        rule_rec.PRIMARY_KEY_LIST);
      -- Remember the table as soon as its old output is cleared.
      -- Otherwise a first rule that has no effect would leave the
      -- table "unseen", and the next rule would clear the output
      -- again, deleting the error row written for the first rule.
      prev_table_name := rule_rec.TABLE_NAME;
    end if;
    -- Specified where-clause...
    where_cls := rule_rec.CHK_WHERE_CLS;
    row_out_spec := rule_rec.ROW_OUT_SPEC;
    row_out_desc := rule_rec.ROW_OUT_DESC;
    -- Pre-defined checks...
    if where_cls is null and
       rule_rec.COLUMN_NAME is not null then
      -- Use the column type given on the rule, else look it up.
      if rule_rec.COLUMN_TYPE_CNDO is null then
        col_type := col_type_cndo (rule_rec.TABLE_NAME,
                                   rule_rec.COLUMN_NAME);
      else
        col_type := rule_rec.COLUMN_TYPE_CNDO;
      end if;
      where_cls := mk_predef_where_cls (
                     rule_rec.COLUMN_NAME,
                     col_type,
                     rule_rec.NOT_NULL_YN,
                     rule_rec.MIN_VALUE,
                     rule_rec.MAX_VALUE,
                     rule_rec.MAX_LENGTH,
                     rule_rec.IS_A_NUMBER_YN,
                     rule_rec.NOT_BEFORE_TODAY_YN,
                     rule_rec.NOT_AFTER_TODAY_YN,
                     rule_rec.IN_LIST,
                     rule_rec.NO_ILLEGAL_CHARS,
                     rule_rec.COLUMN_2_NAME,
                     rule_rec.COL_NOT_EQ_COL2_YN,
                     rule_rec.COL_NOT_NE_COL2_YN,
                     rule_rec.COL_NOT_LT_COL2_YN,
                     rule_rec.COL_NOT_GT_COL2_YN,
                     rule_rec.COL_MINUS_COL2_MIN,
                     rule_rec.COL_MINUS_COL2_MAX);
      mk_predef_out_spec (rule_rec.COLUMN_NAME,
                          col_type,
                          rule_rec.COLUMN_2_NAME,
                          row_out_spec, row_out_desc);
    end if;
    -- Foreign-key checks
    if where_cls is null and
       rule_rec.FOREIGN_KEY_LIST is not null then
      where_cls := mk_fk_where_cls (
                     rule_rec.DATA_DOMAIN,
                     rule_rec.TABLE_NAME,
                     rule_tab_alias,
                     rule_rec.FOREIGN_KEY_LIST,
                     rule_rec.REF_DATA_DOMAIN,
                     rule_rec.REF_TABLE_NAME,
                     rule_rec.REF_KEY_LIST,
                     rule_rec.FK_NOT_ORPHAN_YN,
                     rule_rec.FK_NOT_NULL_YN,
                     rule_rec.FK_NOT_PARTIAL_YN);
      row_out_spec := mk_col_list_expr(rule_rec.TABLE_NAME,
                                       rule_rec.FOREIGN_KEY_LIST);
      row_out_desc := mk_col_list_desc(rule_rec.FOREIGN_KEY_LIST);
    end if;
    if where_cls is null then
      -- Nothing to test: record the rule as ineffective.
      insert into CHKDB_RULE_ERR
        (DATA_DOMAIN, RULESET_ID,
         TABLE_NAME, RULE_ID,
         RULE_ERR_DATE,
         ERR_MESSAGE)
      values (rule_rec.DATA_DOMAIN, rule_rec.RULESET_ID,
              rule_rec.TABLE_NAME, rule_rec.RULE_ID,
              sysdate,
              'Rule has no effect and was not applied');
    else
      rule_sql := mk_rule_sql (rule_rec.DATA_DOMAIN,
                               rule_rec.RULESET_ID,
                               rule_rec.TABLE_NAME,
                               rule_rec.RULE_ID,
                               rule_rec.RULE_CLASS,
                               pri_key_spec,
                               row_out_spec,
                               row_out_desc,
                               rule_tab_alias,
                               where_cls);
      apply_a_rule (rule_rec.DATA_DOMAIN,
                    rule_rec.RULESET_ID,
                    rule_rec.TABLE_NAME,
                    rule_rec.RULE_ID,
                    rule_sql);
    end if;
  end loop; -- through applicable rules
END APPLY_RULES;
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Drop the tables/views in one or more domains
-- where the tables/views are listed in CHKDB_TABLE
-- and DROP_ON_REMOVE_YN = 'Y'.
-- The argument may be a domain name and it may include
-- the wildcard character '%'. To drop tables/views
-- in all domains, pass '%'.
PROCEDURE DROP_DOMAIN_TABLES (dom_spec VARCHAR2) IS
  -- Issues a 'drop TABLE' or 'drop VIEW' statement for every row of
  -- CHKDB_TABLE that is flagged DROP_ON_REMOVE_YN = 'Y' and whose
  -- DATA_DOMAIN is like upper(dom_spec).
  -- An error raised by a drop statement propagates to the caller.
  drop_stmt VARCHAR2(60);
  obj_kind  VARCHAR2(6);
BEGIN
  -- The parameter is qualified with the procedure name so that it
  -- cannot be captured by a column of CHKDB_TABLE.
  for doomed in (select upper(TABLE_NAME) as OBJ_NAME,
                        TABLE_VIEW_TV
                   from CHKDB_TABLE
                  where DROP_ON_REMOVE_YN = 'Y'
                    and DATA_DOMAIN like upper(DROP_DOMAIN_TABLES.dom_spec))
  loop
    -- Only 'V' marks a view; anything else is dropped as a table.
    if doomed.TABLE_VIEW_TV = 'V' then
      obj_kind := 'VIEW';
    else
      obj_kind := 'TABLE';
    end if;
    drop_stmt := 'drop ' || obj_kind || ' ' || doomed.OBJ_NAME;
    execute immediate drop_stmt;
  end loop;
END DROP_DOMAIN_TABLES;
-- =========================================================
/* Implementation of private subprograms */
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- delete check-results, rule-sql and rule-errors for a table
PROCEDURE del_old_output (tab_name VARCHAR2) IS
  -- Removes every row recorded for table 'tab_name' from the
  -- check-result, rule-error and rule-sql tables.
  -- The target table names come from package constants, so they are
  -- concatenated into the statement. The table-name value is passed
  -- as a bind variable, so a name containing a quote cannot break or
  -- alter the statement, and the statement text is the same for
  -- every table.
  cmnd VARCHAR2(200);
BEGIN
  cmnd := 'delete from ' || chk_out_table ||
          ' where TABLE_NAME = :tab_name';
  execute immediate cmnd using tab_name;
  cmnd := 'delete from ' || rule_err_table ||
          ' where TABLE_NAME = :tab_name';
  execute immediate cmnd using tab_name;
  cmnd := 'delete from ' || rule_sql_table ||
          ' where TABLE_NAME = :tab_name';
  execute immediate cmnd using tab_name;
END del_old_output;
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Return the 'col_num'th column name from 'key_list',
-- a list like 'COL1,COL2,COL3'. The list may contain
-- spaces. If col_num is too high (ie. the key does
-- not contain that many columns), return null.
FUNCTION key_col (key_list VARCHAR2, col_num NUMBER)
RETURN VARCHAR2 is
  -- Extracts entry number 'col_num' (1-based) from the
  -- comma-separated 'key_list'.
  -- Returns null when the list holds fewer than 'col_num' entries.
  -- Blanks inside the list are returned as part of the entry, and
  -- an entry longer than 30 characters is cut to its first 30.
  sep_pos   NUMBER(4);  -- position of the comma being examined
  first_pos NUMBER(4);  -- first character of the wanted entry
  last_pos  NUMBER(4);  -- last character of the wanted entry
  piece_len NUMBER(4);
BEGIN
  -- Find where the entry starts: right after the preceding comma,
  -- or at position 1 for the first entry.
  if col_num = 1 then
    first_pos := 1;
  else
    sep_pos := instr (key_list, ',', 1, col_num - 1);
    if sep_pos = 0 then
      return null; -- the list is too short
    end if;
    first_pos := sep_pos + 1;
  end if;
  -- Find where it ends: just before the next comma, or at the end
  -- of the list when no comma follows.
  sep_pos := instr (key_list, ',', 1, col_num);
  if sep_pos = 0 then
    last_pos := length(key_list);
  else
    last_pos := sep_pos - 1;
  end if;
  piece_len := least (last_pos - first_pos + 1, 30);
  return substr (key_list, first_pos, piece_len);
END key_col;
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Given a table name and a list of columns like 'COL1,COL2',
-- return an SQL expression that, when selected from the
-- table, results in a string containing the column values
-- concatenated and separated by '|' like Val1|Val2
-- The returned expression could look like this:
-- 'to_char(COL1)||'|'||COL2||'|'||to_char(COL3)'
FUNCTION mk_col_list_expr (tab_name VARCHAR2,
                           col_list VARCHAR2) RETURN VARCHAR2 IS
  -- Turns a column list such as 'COL1,COL2' into an SQL expression
  -- that yields the column values joined by '|'.
  -- A column whose type code (from col_type_cndo) is 'C' is used
  -- as is; any other column is wrapped in to_char().
  -- Returns the text 'null' when 'col_list' is null.
  -- Blanks and empty entries (stray commas) in the list are ignored.
  pending   VARCHAR2(1000); -- part of the list not yet consumed
  expr_out  VARCHAR2(1000); -- expression assembled so far
  one_col   VARCHAR2(100);
  type_code CHAR(1);
  comma_at  NUMBER;
BEGIN
  if col_list is null then
    return 'null';
  end if;
  pending := replace (col_list, ' ', '');
  loop
    -- discard any commas in front of the next name
    pending := ltrim (pending, ',');
    exit when pending is null;
    -- split off the next column name
    comma_at := instr (pending, ',');
    if comma_at = 0 then
      one_col := pending;
      pending := null;
    else
      one_col := substr (pending, 1, comma_at - 1);
      pending := substr (pending, comma_at + 1);
    end if;
    type_code := col_type_cndo (tab_name, one_col);
    -- every term after the first is preceded by ||'|'||
    if expr_out is not null then
      expr_out := expr_out || '||''|''||';
    end if;
    if type_code = 'C' then
      expr_out := expr_out || one_col;
    else
      expr_out := expr_out || 'to_char(' || one_col || ')';
    end if;
  end loop;
  return expr_out;
END mk_col_list_expr;
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Given a column-list like 'COL1,COL2', return a
-- column-description that looks like 'COL1|COL2'.
-- Which is to say, convert commas to vertical bars.
-- Spaces will also be removed if they exist.
FUNCTION mk_col_list_desc (col_list VARCHAR2) RETURN VARCHAR2 IS
  -- Produces the description form of a column list:
  -- blanks are dropped and each comma becomes '|'.
  -- A null list gives null.
  list_copy VARCHAR2(1000) := col_list;
BEGIN
  -- translate maps ',' to '|' and, having no counterpart for ' ',
  -- removes the blanks in the same pass.
  return translate (list_copy, ', ', '|');
END mk_col_list_desc;
-- = = = = = = = = = = = = = = = = = = = = = = = = = =
-- Make a where-clause for predefined checks.
FUNCTION mk_predef_where_cls (COLUMN_NAME VARCHAR2,
COLUMN_TYPE_CNDO VARCHAR2,
NOT_NULL_YN CHAR,
MIN_VALUE VARCHAR2,
MAX_VALUE VARCHAR2,
MAX_LENGTH NUMBER,
IS_A_NUMBER_YN CHAR,
NOT_BEFORE_TODAY_YN VARCHAR2,
NOT_AFTER_TODAY_YN VARCHAR2,
IN_LIST VARCHAR2,
NO_ILLEGAL_CHARS VARCHAR2,
COLUMN_2_NAME VARCHAR2,
COL_NOT_EQ_COL2_YN CHAR,
COL_NOT_NE_COL2_YN CHAR,
COL_NOT_LT_COL2_YN CHAR,
COL_NOT_GT_COL2_YN CHAR,
COL_MINUS_COL2_MIN NUMBER,
COL_MINUS_COL2_MAX NUMBER)
RETURN VARCHAR2 IS
wcls VARCHAR2(1000);
new_bit VARCHAR2(200);
bit1 VARCHAR2(200);
bit2 VARCHAR2(200);
len1 NUMBER(4);
BEGIN
if NOT_NULL_YN = 'Y' then
new_bit := COLUMN_NAME || ' is NULL';
if wcls is null then
wcls := new_bit;
else
wcls := wcls || ' or ' || new_bit;
end if;
end if;
if MIN_VALUE is not null then
if COLUMN_TYPE_CNDO = 'C' then
bit1 := '''' || MIN_VALUE || '''';
elsif COLUMN_TYPE_CNDO = 'D' then
bit1 := 'to_date(''' || MIN_VALUE || ''',''YYYY-MM-DD'')';
else
bit1 := MIN_VALUE;
end if;
new_bit := COLUMN_NAME || '<' || bit1;
if wcls is null then
wcls := new_bit;
else
wcls := wcls || ' or ' || new_bit;
end if;
end if;
if MAX_VALUE is not null then
if COLUMN_TYPE_CNDO = 'C' then
bit1 := '''' || MAX_VALUE || '''';
elsif COLUMN_TYPE_CNDO = 'D' then
bit1 := 'to_date(''' || MAX_VALUE || ''',''YYYY-MM-DD'')';
else
bit1 := MAX_VALUE;
end if;
new_bit := COLUMN_NAME || '>' || bit1;
if wcls is null then
wcls := new_bit;
else
wcls := wcls || ' or ' || new_bit;
end if;
end if;
if MAX_LENGTH > 0 then
new_bit := 'length(' || COLUMN_NAME || ')>' ||
MAX_LENGTH;
if wcls is null then
wcls := new_bit;
else
wcls := wcls || ' or ' || new_bit;
end if;
end if;
if IS_A_NUMBER_YN = 'Y' then
new_bit := 'length(rtrim(ltrim(translate(' ||
COLUMN_NAME || ',''' ||
'-123456789.0eE' || ''',''' ||
' '')))) > 0';
if wcls is null then
wcls := new_bit;
else
wcls := wcls || ' or ' || new_bit;
end if;
end if;
if NOT_BEFORE_TODAY_YN = 'Y' then
new_bit := COLUMN_NAME || '
|