/*-------------------------------------------------------------------------------------*\
|  PROGRAM NAME:                                                                        |
|     scdm_data_qa_review-partition.sas                                                 |
|                                                                                       |
|---------------------------------------------------------------------------------------|
|  PURPOSE:                                                                             |
|     The purpose of this program is to validate partitioned data prior to performing   |
|     quality assurance on SCDM tables                                                  |
|---------------------------------------------------------------------------------------|
|  PROGRAM INPUT:                                                                       |
|     parition crosswalk                                                                |
|     SCDM tables available at DP site                                                  |
|                                                                                       |
|  PROGRAM OUTPUT:                                                                      |
|     see Workplan PDF                                                                  |
|---------------------------------------------------------------------------------------|
|  CONTACT:                                                                             |
|     Sentinel Coordinating Center                                                      |
|     info@sentinelsystem.org                                                           |
\*-------------------------------------------------------------------------------------*/

*-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-;
*  PLEASE DO NOT EDIT BELOW WITHOUT CONTACTING THE SENTINEL OPERATIONS CENTER           ;
*-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-;


*** ---------------------------------------------------------------------------------------------- ***;
*** Macro to examine crosswalk for format and components                                           ***;
*** ---------------------------------------------------------------------------------------------- ***;
%macro partition_meta_chks / minoperator;

    %* Defensive Pre-Check: abort program if crosswalk does not exist *;    
    %if %sysfunc(exist(qadata.&ParTable.)) = 1
    %then %do;
	    %put Crosswalk existence confirmed;
        %put Package will continue processing;
    %end ;    
    
    %else %do;
	    data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: Crosswalk specified in ParTable parameter does not exist. Program is aborting.';
            put ' ';
            put '==> The crosswalk specified in ParTable needs to exist.';
            put '==> Please check for its existence or change ParTable in the master program.';
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run;
        %abort cancel 99 ;
    %end ;  
    
    %* Defensive Pre-Check: abort program if crosswalk file format invalid *;   
    proc sql noprint ;
        create table _metaPart as 
            select   memname
                   , lowcase(name) as varname
                   , lowcase(type) as vartype
                from dictionary.columns
                where     libname="QADATA"
                      and memname = "%upcase(&ParTable.)"
                      and upcase(name) in ('PARTITIONID', 'PATID'); 
    quit;
    
    %* set parameter to default parameters to abort;
    %let PARTCHK_ABORT     = 1; 
    %let PATID_EXIST       = 0;
    %let PATID_TYPE        = 0; 
    %let PARTITIONID_EXIST = 0;
    %let PARTITIONID_TYPE  = 0;

    data _null_;

        RETAIN PATID_EXIST PATID_TYPE PARTITIONID_EXIST PARTITIONID_TYPE;

        set _metaPart end = eof;                 
       
        if varname='patid'
        then do;
            PATID_EXIST = 1;
            PATID_TYPE = (vartype = 'num');
        end;

        else if varname='partitionid'
        then do;
            PARTITIONID_EXIST = 1;
            PARTITIONID_TYPE = (vartype = 'num');
        end;
        
        if eof then do;
            call symputx ('PATID_EXIST',       PATID_EXIST);
            call symputx ('PATID_TYPE',        PATID_TYPE);
            call symputx ('PARTITIONID_EXIST', PARTITIONID_EXIST);
            call symputx ('PARTITIONID_TYPE',  PARTITIONID_TYPE);
            
            _abort_ = (sum(PATID_EXIST, PATID_TYPE, PARTITIONID_EXIST,  PARTITIONID_TYPE) ne 4);
            
            call symputx ('PARTCHK_ABORT', _abort_);
        end;

    run;    
                                                
    %if &PARTCHK_ABORT.=0
    %then %do;
        %put Crosswalk dataset is in correct format;
        %put Package will continue processing;
    %end;                                    
    
    %else %do;
	    data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: Crosswalk format invalid. Program is aborting';
            put ' ';            
            %if &PATID_EXIST. ne 1 %then %do; 
               put '==> Crosswalk does not include PatID';            
            %end ;
            %if &PATID_EXIST.=1 and &PATID_TYPE. ne 1 %then %do; 
               put '==> Crosswalk PatID is not numeric';            
            %end ;
            %if &PARTITIONID_EXIST. ne 1 %then %do; 
               put '==> Crosswalk does not include PartitionID';            
            %end ;
            %if &PARTITIONID_EXIST. = 1 and &PARTITIONID_TYPE. ne 1 %then %do; 
               put '==> Crosswalk PartitionID is not numeric';           
            %end ;            
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run;
        %abort cancel 99 ;
    %end;    

    proc datasets library = work nolist nowarn nodetails;
        delete _:;
    quit;

%mend partition_meta_chks;



*** ---------------------------------------------------------------------------------------------- ***;
*** Macro to examine available versus expected data                                                ***;
*** ---------------------------------------------------------------------------------------------- ***;
%macro partition_expected_scdmtab_chks;

    %* Pull list of populated global paramaters defining SCDM table names where partitioning applies;
    proc sql NOPRINT;
        select distinct   name
                        , quote(strip(lowcase(value)))
            into   :SCDMPTabList separated by ' '
                 , :SCDMValTabList separated by ','
            from dictionary.macros
                where     scope='GLOBAL'
                      and index(name, 'TABLE') > 0
                      and name not in ('FACTABLE', 'PVDTABLE', 'PRSTABLE', 'PARTABLE')
                      and not missing(value)
                      and lowcase(value) ne 'na';
    quit;
               
    %* create dataset with all expected tables;    
    data _expected ;
        length memname_prefix $32 ;
        %do i=1 %to %sysfunc(countw(&SCDMPTabList.));
            %let SCDMTabParm = %scan(&SCDMPTabList., &i.);
            %let SCMTabPrefix = %upcase(&&&SCDMTabParm.);
            memname_prefix = lowcase("&SCMTabPrefix.");
            output;
	   %end ;
    run;    
   
    %* create dataset all datasets in QADATA matching expected tables;
    proc contents data = QADATA._all_ out = _available ( keep = memname ) noprint ; 
    run;  
    
    proc sort data=_available nodupkey ; 
        by memname ;    
    run; 
          
    data _available_parse ;
    
        length memname_letters $ 32;
        
        set _available ;

        memname_letters = lowcase(PRXCHANGE ('s/\d+$//', 1, trim(memname) )) ;
        memname_numbers = substr(memname, length(memname_letters)+1) ;
        
        if memname_letters in (&SCDMValTabList.);
    run;      
      
    %* define expected scdm tables not identified;
    proc sql NOPRINT;
         create table _missing_scdm_tables as
         select distinct memname_prefix
         from _expected
         where memname_prefix not in 
             (select distinct memname_letters from _available_parse);      
             
         select count(distinct memname_prefix)
         into :nmiss_scdm
         from _missing_scdm_tables;             

    quit;   

    %if &nmiss_scdm. > 0 %then %do;

        data dplocal.missing_scdm_tables;
            set _missing_scdm_tables;
        run;
        
        %* Defensive Pre-Check: abort program if expected data sets are not found *;
	   data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: Data sets expected based on header macro variables were not found';
            put ' ';
            put '==> Please see dplocal.missing_scdm_tables for list of missing SCDM table parameters';
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run;
        %abort cancel 99 ;
    %end ;       

    %else %do;

        %put All expected SCDM tables identified;
        %put Package will continue processing;

        proc datasets library = work nolist nowarn nodetails;
            delete _:;
        quit;

    %end;   

    
%mend partition_expected_scdmtab_chks;   
    
%macro partition_expected_scdmpart_chks;

    %* Pull list of populated global paramaters defining SCDM table names where partitioning applies;
    proc sql NOPRINT;
        select distinct quote(strip(lowcase(value)))
            into :SCDMValTabList separated by ','
            from dictionary.macros
                where     scope='GLOBAL'
                      and index(name, 'TABLE') > 0
                      and name not in ('FACTABLE', 'PVDTABLE', 'PRSTABLE', 'PARTABLE')
                      and not missing(value)
                      and lowcase(value) ne 'na';
    quit;
    
   
    %* create dataset all datasets in QADATA matching expected tables;
    proc contents data = QADATA._all_ out = _available ( keep = memname ) noprint ; 
    run;
    
    proc sort data=_available nodupkey ; 
        by memname ;    
    run;

    data _available_parse ;
    
        length memname_letters $ 32;
        
        set _available ;
        memname_letters = lowcase(PRXCHANGE ('s/\d+$//', 1, trim(memname) )) ;  
        if memname_letters in (&SCDMValTabList.);
    run;
    
    proc sql NOPRINT;
        select distinct memname_letters
            into :MemPChkList separated by ' '
            from _available_parse;
    quit;    
    
    data _memp_chk;
    
        length prefix memname $ 32 ;          
        retain nmissp;
        %do m = 1 %to %sysfunc(countw(&MemPChkList.));
            %let MemChk = %scan(&MemPChkList., &m.);
            prefix = "&MemChk.";
            %do p = 1 %to &NumPartitions.;
                memname = "&MemChk.&p.";
                if exist("qadata.&MemChk.&p.")=0 then do; 
                    nmissp = sum(nmissp,1);
                    output;
                end;
            %end;
        %end;
        call symputx('nmissp', nmissp);
    run;
    
    %if &nmissp. > 0 %then %do;
    
        data dplocal.missing_partition_scdm;
            set _memp_chk (keep = memname
                           rename = (memname = missp_scdmtable));
        run;
        
        %* Defensive Pre-Check: abort program if expected data sets are not found *;
	   data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: Complete set of partitioned data not available per all SCDM tables';
            put ' ';
            put '==> Please see dplocal.missing_partition_scdm for list of missing partitioned datasets';
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run;
        %abort cancel 99 ;
    %end ;              
         
    %else %do;
        %put All expected SCDM table partitions identified;
        %put Package will continue processing;

        proc datasets library = work nolist nowarn nodetails;
            delete _:;
        quit;

    %end;   
    
%mend partition_expected_scdmpart_chks;    

*** ---------------------------------------------------------------------------------------------- ***; 
*** Macro to identify tables with invalid names per partitioning requirements                      ***;
*** ---------------------------------------------------------------------------------------------- ***;
%macro partition_appropriate_table_chks;

    %* define scdm tables that should be partitioned at site;
    %* define scdm tables that should not be partitioned at site;    
    %local SCDMParTabList SCDMNonParTabList;    
    proc sql NOPRINT;
        select distinct quote(strip(lowcase(value)))
            into :SCDMParTabList separated by ','
            from dictionary.macros
                where     scope='GLOBAL'
                      and index(name, 'TABLE') > 0
                      and name not in ('FACTABLE', 'PVDTABLE', 'PRSTABLE', 'PARTABLE')
                      and not missing(value)
                      and lowcase(value) ne 'na';

        select distinct quote(strip(lowcase(value)))
            into :SCDMNonParTabList separated by ','
            from dictionary.macros
                where     scope='GLOBAL'
                      and index(name, 'TABLE') > 0
                      and name in ('FACTABLE', 'PVDTABLE', 'PRSTABLE', 'PARTABLE',)
                      and not missing(value)
                      and lowcase(value) ne 'na';
    quit;      
     
    
    %* identify any scdm tables that should be partitioned and are not or should not be partitioned but are;
    proc contents data = QADATA._all_ out = _available ( keep = memname ) noprint ; 
    run;
    
    proc sort data=_available nodupkey ; 
        by memname ;    
    run;

    data _invalpart;
    
        length memname_prefix memname_suffix $ 32;
            
        set _available ;

        memname_prefix = lowcase(PRXCHANGE ('s/\d+$//', 1, trim(memname) )) ;
        memname_suffix = substr(memname, length(memname_prefix)+1);
             
        if      (memname_prefix in (&SCDMParTabList.)    and not(1<=input(memname_suffix,8.)<=&NumPartitions.))
             or (memname_prefix in (&SCDMNonParTabList.) and not(missing(memname_suffix)))
       then output;
        
    run;    
    
    %local inval_part_table;
    data _NULL_;
        if 0 then set _invalpart nobs=n;
	    call symputx('inval_part_table',n);
	    stop;
    run;

    %* if inval partitioned data found, abort processing;
    %if &inval_part_table. %then %do; 
    
        data dplocal.ErrorPTabNm;
            set _invalpart (keep = memname);
        run;    
        
        data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: SCDM tables not partitioned as expected';
            put ' ';
            put '==> Please see DPLOCAL.ErrorPTabNm for name of tables'; 
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run; 
        %abort cancel 99 ;     
    %end;

    %else %do;
    
        %put SCDM tables meet requirements for partitioning;
        %put Package Will continue processing;

        proc datasets library = work nolist nowarn nodetails;
            delete _:;
        quit;

    %end;

%mend partition_appropriate_table_chks;

*** ---------------------------------------------------------------------------------------------- ***;
*** Macro to identify duplicate entries in partition patID crosswalk file                          ***;
*** ---------------------------------------------------------------------------------------------- ***;
%macro partition_dup_chks;

    proc sql;
        create table _pdwalk_dups as
            select  patid
                  , count(*) as nrecords
                  , count(distinct PartitionID) as nPartitions
            from qadata.&ParTable.
            group by patid
            having nrecords > 1 
                   or nPartitions >1;;
    quit;
 
    data _NULL_;
	    if 0 then set _pdwalk_dups nobs=n;
	    call symputx('pdwalk_dups',n);
	    stop;
    run;

    %if &pdwalk_dups. %then %do;

        data dplocal.pdwalk_dups;
            set _pdwalk_dups;
        run;

	    data _null_;
            put 90*'!';
            put ' ';
            put "ERROR: duplicate records in dataset qadata.&ParTable.";
            put ' ';
            put '==> Please see dplocal.pdwalk_dups for list of PatID values with problematic entries'; 
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
       run;
       %abort cancel 99 ;

    %end;

    %else %do;

        %put No duplicates found in qadata.&ParTable.;
        %put Package Will continue processing;

        proc datasets library = work nolist nowarn nodetails;
            delete _:;
        quit;

    %end;
  

%mend partition_dup_chks;

*** ---------------------------------------------------------------------------------------------- ***;
*** Macro to inconsistency in file format by SCDM table and across all SCDM tables                 ***;
*** ---------------------------------------------------------------------------------------------- ***;
%macro partition_consistent_fmt_chks;

    %* Pull list of populated global paramaters defining SCDM table names where partitioning applies;
    %local SCDMPTabList;
    proc sql noprint;
        select distinct name
            into :SCDMPTabList separated by ' '
            from dictionary.macros
                where     scope='GLOBAL'
                      and index(name, 'TABLE') > 0
                      and name not in ('FACTABLE', 'PVDTABLE', 'PRSTABLE', 'PARTABLE', '_PARTABLE')
                      and not missing(value);
    quit;    

    %* identfy inconsistent file format across partitions of single SCDM table ;
    %do a = 1 %to %sysfunc(countw(&SCDMPTabList.));

         %let SCDMTabParm = %scan(&SCDMPTabList., &a.);
         %let SCMTabPrefix = %upcase(&&&SCDMTabParm.);
         
         proc sql noprint ;
              create table _m&SCMTabPrefix. as 
                select   memname
                       , "&SCMTabPrefix" as prefix length =32 
                       , input(substr(memname, length(calculated prefix)+1), 8.) as suffix
                       , lowcase(name) as varname
                       , type
                       , length
                       , format
                from dictionary.columns
                where     libname="QADATA"
                      and index(memname, "&SCMTabPrefix.")=1
                having calculated suffix>0 ;

              create table _f&SCMTabPrefix. as 
                select   prefix
                       , varname 
                       , type
                       , length
                       , format
                       , count(*) as records
                from _m&SCMTabPrefix. 
                group by prefix, varname, type, length, format
                having records ne &NumPartitions.;
        quit;       

        data _NULL_;
	        if 0 then set _f&SCMTabPrefix. nobs=n;
	        call symputx('invalid_fmt_n',n);
	        stop;
        run;

        %if &invalid_fmt_n. %then %do;
            data DPLOCAL.F&SCMTabPrefix.;
                set _f&SCMTabPrefix.;
            run;

	        data _null_;
                put 90*'!';
                put ' ';
                put "ERROR: variables inconsistently defined across partitions for SCDM Table &SCMTabPrefix.";
                put ' ';
                put "==> Please see DPLOCAL.F&SCMTabPrefix. for further detail on variable attributes"; 
                put ' ';
                put '==> Processing will abort.';
                put ' ';
                put 90*'!';
                put ' ';
            run; 
            %abort cancel 99 ;
        %end;

        %else %do;
            %put File formats valid for all paritions of &SCMTabPrefix.;
            %put Package Will continue processing;
        %end;
    %end;

    %* identfy file format across all SCDM tables ;
    data _allSCDM ;
         set %do a = 1 %to %sysfunc(countw(&SCDMPTabList.));
                %let SCDMTabParm = %scan(&SCDMPTabList., &a.);
                %let SCMTabPrefix = %upcase(&&&SCDMTabParm.);
                _m&SCMTabPrefix.
             %end;;
    run;

    proc sql noprint ;
        create table _allSCDMVar as
            select distinct   varname
                            , type
                            , length
            from _allSCDM ;

        create table _FallSCDM as
            select   varname
                   , type
                   , length
                   , count(*) as records
            from (select distinct varname, type, length
                  from _allSCDM)
            group by varname
            having records >1;
    quit;

    data _NULL_;
        if 0 then set _FallSCDM nobs=n;
	    call symputx('invalid_all_fmt_n',n);
	    stop;
    run;

    %if &invalid_all_fmt_n. %then %do; 
        data dplocal.FallSCDM;
            set _FallSCDM (drop = records);
        run;

        data _null_;
            put 90*'!';
            put ' ';
            put 'ERROR: Variables inconsistently defined across SCDM Database';
            put ' ';
            put '==> Please see DPLOCAL.FallSCDM for further detail on SCDM table attributes'; 
            put ' ';
            put '==> Processing will abort.';
            put ' ';
            put 90*'!';
            put ' ';
        run; 
        %abort cancel 99 ;
    %end;

    %else %do;
        %put File formats valid across SCDM tables;
        %put Package Will continue processing;

        proc datasets library = work nolist nowarn nodetails;
            delete _:;
        quit;

    %end;


%mend partition_consistent_fmt_chks;