Pages

Showing posts with label BULK COLLECT. Show all posts
Showing posts with label BULK COLLECT. Show all posts

Thursday, June 8, 2017

Compare Regular Cursor vs Bulk Collect

=====================================
General
=====================================
Overview: 
There is a source table.
The records in this table are processed and inserted into a target table.
Once a record has been processed, it is deleted from the source table.

This package handles the same task in two ways:
convertSdr2CdrCursor
convertSdr2CdrBulk

convertSdr2CdrCursor
The records are selected via regular cursor, and processed one by one.

convertSdr2CdrBulk
The records are selected via BULK COLLECT, and processed one by one.

Since the processing is still done one row at a time, there is no real advantage to using BULK COLLECT.

=====================================
The time to execute in both options
=====================================
convertSdr2CdrCursor
Delete 50000 records using regular cursor
Time to Execute: 999(sec), rows deleted: 50000
50000/999=50 rows/sec

convertSdr2CdrBulk
Delete 50000 records using BULK COLLECT
Time to Execute: 1249(sec), rows deleted: 60762

60762/1249 = 48 rows/sec

Both options were far superior to not using bind variables.
Without bind variables, the throughput was only about 6 rows/sec.

=====================================
Code
=====================================
CREATE OR REPLACE PACKAGE BODY SDR_2_CDR_CONVERT_ALEC AS

-- Writes one message to SGA_W_LOG under the fixed procedure name
-- 'ConvertSdr2Cdr', stamped with the current date.
-- Declared as an autonomous transaction, so the COMMIT below applies to the
-- log row only and leaves the caller's transaction untouched.
--   p_data : text stored in SGA_W_LOG.data
PROCEDURE write_to_log(p_data IN VARCHAR2) IS
  PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
  INSERT INTO SGA_W_LOG
    (procedure_name, data, ts_last_modified)
  VALUES
    ('ConvertSdr2Cdr', p_data, SYSDATE);

  COMMIT;
END write_to_log;

-------------------------------------------------------
-- Process using Regular Cursor
-------------------------------------------------------

-- Converts SDR rows of one product into CDR rows, one row at a time, using a
-- regular FOR UPDATE cursor.
--
-- For every selected row of DEBUG_GEN_W_NEW_SDR_ALEC:
--   1. a leg-1 CDR is inserted into CDR_ALEC;
--   2. if outgoing_call_direction is not null, a leg-2 CDR is inserted too;
--   3. the SDR row is deleted.
-- If any of these steps fails, everything done for that row is rolled back,
-- the error is logged and the row is kept with ROAMING_TYPE = '9'.
-- All work is committed once, after the loop.
--
--   pProductName : product whose SDR rows are processed; for 'MOCO' only rows
--                  with event_name = 'IDP' are taken.
PROCEDURE convertSdr2CdrCursor (pProductName in varchar2) is

  -- Maximum number of SDR rows handled by one call.
  c_max_rows   CONSTANT PLS_INTEGER := 50000;

  -- "<=" so that up to c_max_rows rows are selected ("<" would stop one short).
  -- FOR UPDATE locks the selected rows when the cursor is opened.
  cursor SdrCrs(cpProductName in varchar2) is
   SELECT  cdr_id,
           outgoing_call_direction
     from DEBUG_GEN_W_NEW_SDR_ALEC
     where rownum <= c_max_rows
     and product_name = cpProductName
     and ((cpProductName = 'MOCO' and event_name = 'IDP') OR (cpProductName <> 'MOCO'))
     for update;

  -- Full INSERT statements for leg 1 and leg 2; built once, executed per row
  -- with the cdr_id supplied as a bind variable.
  v_insert_leg1 VARCHAR2(32767);
  v_insert_leg2 VARCHAR2(32767);

  v_error_text  VARCHAR2(4000);

  v_start_date DATE;
  v_end_date   DATE;
  v_seconds    NUMBER;

BEGIN
  v_start_date := SYSDATE;
  write_to_log('start running convertSdr2CdrCursor procedure at '||TO_CHAR(v_start_date,'YYYYMMDD hh24:mi:ss') );

  -- GenerateCdr is defined elsewhere in this package; its result is used here
  -- as the select list of the INSERT ... SELECT (presumably one list per leg).
  -- The statement text does not depend on the row, so it is built outside the loop.
  v_insert_leg1 := 'insert into CDR_ALEC select ' || GenerateCdr(1) || ' from DEBUG_GEN_W_NEW_SDR_ALEC where cdr_id = :b1';
  v_insert_leg2 := 'insert into CDR_ALEC select ' || GenerateCdr(2) || ' from DEBUG_GEN_W_NEW_SDR_ALEC where cdr_id = :b1';

  -- for each SDR create the mapped CDRs
  for SdrRec in SdrCrs(pProductName) loop

    begin
      -- Marked after the cursor was opened, so rolling back to it undoes only
      -- this row's inserts and keeps the row locks taken by the cursor.
      SAVEPOINT sdr_row_start;

      EXECUTE IMMEDIATE v_insert_leg1 USING SdrRec.cdr_id;

      -- Generate CDR for Leg2
      if SdrRec.outgoing_call_direction is not null then
         EXECUTE IMMEDIATE v_insert_leg2 USING SdrRec.cdr_id;
      end if;

      delete DEBUG_GEN_W_NEW_SDR_ALEC WHERE CURRENT OF SdrCrs;

    EXCEPTION
      WHEN OTHERS THEN
            -- Capture the message before issuing further SQL.
            v_error_text := sqlerrm;
            -- Without this, a leg-1 CDR would remain when leg 2 fails.
            ROLLBACK TO SAVEPOINT sdr_row_start;
            write_to_log('Error for CDR_ID: '|| SdrRec.Cdr_Id || ' - ' || v_error_text);
            -- Flag the row as failed; it stays in the source table.
            UPDATE DEBUG_GEN_W_NEW_SDR_ALEC
              SET ROAMING_TYPE ='9'
            WHERE CURRENT OF SdrCrs;
    END;
  END LOOP;

  COMMIT;

  v_end_date := SYSDATE;
  write_to_log('convertSdr2CdrCursor procedure completed at '||TO_CHAR(v_end_date,'YYYYMMDD hh24:mi:ss'));
  -- Date difference is in days; convert to whole seconds.
  v_seconds := ROUND((v_end_date - v_start_date)*1440*60);
  write_to_log('Time to Execute: '||v_seconds||'(sec)');

end convertSdr2CdrCursor;


-------------------------------------------------------
-- Process using BULK COLLECT
-------------------------------------------------------
-- Converts SDR rows of one product into CDR rows. Rows are fetched in batches
-- with BULK COLLECT ... LIMIT and then processed one by one.
--
-- For every fetched row of DEBUG_GEN_W_NEW_SDR_ALEC:
--   1. a leg-1 CDR is inserted into CDR_ALEC;
--   2. if outgoing_call_direction is not null, a leg-2 CDR is inserted too;
--   3. the SDR row is deleted by ROWID.
-- If any of these steps fails, everything done for that row is rolled back,
-- the error is logged and the row is kept with ROAMING_TYPE = '9'.
-- An unexpected error outside the per-row handling is logged and stops the
-- fetch loop. Work done up to that point is committed.
--
--   p_product_name : product whose SDR rows are processed; for 'MOCO' only
--                    rows with event_name = 'IDP' are taken.
PROCEDURE convertSdr2CdrBulk(p_product_name IN VARCHAR2) IS

  TYPE sdr_record IS RECORD (
      sdr_rowid               ROWID,
      cdr_id                  DEBUG_GEN_W_NEW_SDR_ALEC.cdr_id%TYPE,
      outgoing_call_direction DEBUG_GEN_W_NEW_SDR_ALEC.outgoing_call_direction%TYPE
  );

  TYPE sdr_record_tab IS TABLE OF sdr_record;
  v_sdr_records   sdr_record_tab;

  CURSOR sdr_cur IS
  SELECT  ROWID, cdr_id, outgoing_call_direction
    FROM DEBUG_GEN_W_NEW_SDR_ALEC
   WHERE 1=1
     AND product_name = p_product_name
     AND ((p_product_name = 'MOCO' and event_name = 'IDP') OR (p_product_name <> 'MOCO'));

  -- Full INSERT statements for leg 1 and leg 2; built once, executed per row
  -- with the cdr_id supplied as a bind variable.
  v_insert_leg1               VARCHAR2(32767);
  v_insert_leg2               VARCHAR2(32767);

  v_error_text                VARCHAR2(4000);

  v_start_date                DATE;
  v_end_date                  DATE;
  v_seconds                   NUMBER;
  -- Number of rows fetched per BULK COLLECT round trip.
  c_limit                     CONSTANT PLS_INTEGER := 50000;

BEGIN

  v_start_date := SYSDATE;
  write_to_log('Start running ConvertSdr2CdrBulk procedure at '||TO_CHAR(v_start_date,'YYYYMMDD hh24:mi:ss') );

  -- GenerateCdr is defined elsewhere in this package; its result is used here
  -- as the select list of the INSERT ... SELECT (presumably one list per leg).
  -- The statement text does not depend on the row, so it is built outside the loops.
  v_insert_leg1 := 'INSERT INTO CDR_ALEC SELECT ' || GenerateCdr(1) || ' FROM DEBUG_GEN_W_NEW_SDR_ALEC WHERE cdr_id = :b1';
  v_insert_leg2 := 'INSERT INTO CDR_ALEC SELECT ' || GenerateCdr(2) || ' FROM DEBUG_GEN_W_NEW_SDR_ALEC WHERE cdr_id = :b1';

  write_to_log('Before Opening Cursor');

  OPEN sdr_cur;
  --Loop on Cursor
  LOOP
    BEGIN
      FETCH sdr_cur
        BULK COLLECT INTO v_sdr_records LIMIT c_limit;

      -- %NOTFOUND after a LIMIT fetch means fewer than c_limit rows came back;
      -- the last batch may still contain rows, so it is processed before exiting.
      IF sdr_cur%NOTFOUND THEN
        write_to_log('No More Data to Fetch from DB');
      END IF;

      write_to_log('v_sdr_records.COUNT : '||v_sdr_records.COUNT );
      write_to_log('Processing Data');

      --Loop on the Bulk records
      FOR indx IN 1 .. v_sdr_records.COUNT LOOP

        BEGIN
          -- Rolling back to this point undoes only the current row's work.
          SAVEPOINT sdr_row_start;

          EXECUTE IMMEDIATE v_insert_leg1 USING v_sdr_records(indx).cdr_id;

          --check whether to create 2nd CDR for outgoing leg
          IF v_sdr_records(indx).outgoing_call_direction IS NOT NULL THEN
            EXECUTE IMMEDIATE v_insert_leg2 USING v_sdr_records(indx).cdr_id;
          END IF;

          DELETE DEBUG_GEN_W_NEW_SDR_ALEC
           WHERE DEBUG_GEN_W_NEW_SDR_ALEC.rowid = v_sdr_records(indx).sdr_rowid;
        EXCEPTION
          WHEN OTHERS THEN
            -- Capture the message before issuing further SQL.
            v_error_text := SQLERRM;
            -- Without this, a leg-1 CDR would remain when leg 2 fails.
            ROLLBACK TO SAVEPOINT sdr_row_start;
            write_to_log('ConvertSdr2CdrBulk Error for CDR_ID: '|| v_sdr_records(indx).cdr_id || ' Error Details: ' || v_error_text);
            -- Flag exactly the fetched row as failed; it stays in the source table.
            UPDATE DEBUG_GEN_W_NEW_SDR_ALEC
               SET ROAMING_TYPE ='9'
             WHERE DEBUG_GEN_W_NEW_SDR_ALEC.rowid = v_sdr_records(indx).sdr_rowid;
        END;

      END LOOP;

      EXIT WHEN sdr_cur%NOTFOUND;

    EXCEPTION
      WHEN OTHERS THEN
        write_to_log('Unexpected Error in ConvertSdr2CdrBulk: '||SQLERRM);
        -- Stop here: looping again after e.g. a failed FETCH would skip the
        -- EXIT WHEN above every time and never terminate.
        EXIT;
    END;

  END LOOP;

  IF sdr_cur%ISOPEN THEN
    CLOSE sdr_cur;
  END IF;
  COMMIT;

  v_end_date := SYSDATE;
  write_to_log('ConvertSdr2CdrBulk procedure completed at '||TO_CHAR(v_end_date,'YYYYMMDD hh24:mi:ss'));
  -- Date difference is in days; convert to whole seconds.
  v_seconds := ROUND((v_end_date - v_start_date)*1440*60);
  write_to_log('Time to Execute: '||v_seconds||'(sec)');

END ConvertSdr2CdrBulk;

END SDR_2_CDR_CONVERT_ALEC;

Sunday, April 24, 2016

Code Example - Delete rows with BULK COLLECT

=====================
General
=====================

This is a code example of a job that calls a generic procedure, which in turn calls a dedicated procedure that deletes data from a table using BULK COLLECT.

=====================
Code
=====================
Job: 
Generic Procedure:
Dedicated Procedure:  

=====================
Job
=====================
-- Submits a DBMS_JOB that runs ADMIN_UTIL.CALL_PURGE_COUNT_HIST(5,20000).
-- First run: tomorrow at 03:30 (midnight + 3/24 day + 30/1440 day).
-- The INTERVAL expression is the same formula, passed as text to DBMS_JOB.
DECLARE
   l_job_id     NUMBER(10);
   l_first_run  DATE := TRUNC(SYSDATE + 1)+3/24+30/1440;
BEGIN
   DBMS_JOB.SUBMIT(
      job       => l_job_id,
      what      => 'ADMIN_UTIL.CALL_PURGE_COUNT_HIST(5,20000);',
      next_date => l_first_run,
      interval  => 'TRUNC(SYSDATE + 1)+3/24+30/1440'
   );

   -- Commit the submission.
   COMMIT;
END;
/

=====================
Generic Procedure
=====================
CREATE OR REPLACE PACKAGE BODY ADMIN_UTIL  IS
  ------------------------------------
  -- Repeatedly calls GA_PKG.PurgeCountersHistory until a call reports that no
  -- rows were deleted, summing the reported counts, and logs start, parameters,
  -- total and finish through SGA_PKG.write_sga_w_log.
  -- Any error is logged (first 900 characters of SQLERRM) and not re-raised.
  --   p_days   : passed through to PurgeCountersHistory (days to keep)
  --   p_delete : passed through to PurgeCountersHistory (rows per chunk)
  ------------------------------------
  PROCEDURE CALL_PURGE_COUNT_HIST(p_days IN NUMBER, p_delete IN NUMBER) IS
    l_module       SGA_W_LOG.procedure_name%TYPE;
    l_message      SGA_W_LOG.data%TYPE;
    l_chunk_rows   NUMBER;
    l_total_rows   NUMBER;

  BEGIN
    l_module     := 'ADMIN_UTIL.CALL_PURGE_COUNT_HIST';
    l_total_rows := 0;

    l_message := 'Produre is Starting';
    SGA_PKG.write_sga_w_log(l_module, l_message);

    l_message := 'Running Delete with Parameters: Keep Days: '||p_days||' '||'Delete Chunk: '||p_delete;
    SGA_PKG.write_sga_w_log(l_module, l_message);

    -- Purge chunk by chunk; the count of the final (non-positive) call is
    -- still added to the total before leaving the loop.
    LOOP
      GA_PKG.PurgeCountersHistory(l_chunk_rows, p_days, p_delete);
      l_total_rows := l_total_rows + l_chunk_rows;
      EXIT WHEN l_chunk_rows IS NULL OR l_chunk_rows <= 0;
    END LOOP;

    l_message := 'Number of Deleted Rows: '||TO_CHAR(l_total_rows);
    SGA_PKG.write_sga_w_log(l_module, l_message);

    l_message := 'Produre has Finished';
    SGA_PKG.write_sga_w_log(l_module, l_message);

  EXCEPTION
    WHEN OTHERS THEN
      l_message := 'Error in procedure '||l_module||'. Error Details: '|| SUBSTR(SQLERRM, 1, 900);
      SGA_PKG.write_sga_w_log(l_module, l_message);
  END CALL_PURGE_COUNT_HIST;
  ------------------------------------
END ADMIN_UTIL;

=====================
Dedicated Procedure
=====================
CREATE OR REPLACE PACKAGE BODY GA_PKG  IS

  -- Deletes one chunk of old rows from COUNTERS_HISTORY and records the
  -- number of deleted rows in SGA_W_LOG.
  --
  --   p_retvalue : OUT, number of rows deleted by this call (0 if none)
  --   p_days     : rows with ts_last_modified older than SYSDATE - p_days
  --                are eligible for deletion
  --   p_delete   : maximum number of rows to delete in this call
  --
  -- Commits after the delete and again after the log insert.
  -- Raises ORA-20000 on any failure; the message names the step that failed
  -- and carries the original SQLCODE and SQLERRM exactly once.
  PROCEDURE PurgeCountersHistory(p_retvalue OUT NUMBER,p_days IN NUMBER, p_delete IN NUMBER) IS

    TYPE rowid_tab IS TABLE OF ROWID INDEX BY BINARY_INTEGER;
    v_rowids   rowid_tab;

    v_cutoff   DATE;
    -- Name of the step currently running, used in the error message.
    v_stage    VARCHAR2(30) := 'SELECT ROWS';

  BEGIN

    p_retvalue := 0;
    v_cutoff   := SYSDATE - p_days;

    -- Collect the ROWIDs of at most p_delete rows older than the cutoff.
    SELECT ROWID
      BULK COLLECT
      INTO v_rowids
      FROM COUNTERS_HISTORY
     WHERE ts_last_modified < v_cutoff
       AND rownum < (p_delete + 1); -- so that exactly p_delete rows at most are taken

    -- Delete the collected rows in one bulk operation.
    v_stage := 'DELETE ROWS';
    IF v_rowids.COUNT > 0 THEN
      FORALL i IN 1 .. v_rowids.COUNT
        DELETE FROM COUNTERS_HISTORY WHERE ROWID = v_rowids(i);

      -- Read only after a DELETE actually ran; otherwise p_retvalue stays 0.
      p_retvalue := SQL%ROWCOUNT;
    END IF;

    COMMIT;

    -- Insert a row into the log table with the number of rows deleted.
    v_stage := 'INSERT ROW TO THE LOG';
    INSERT INTO SGA_W_LOG (procedure_name, data, ts_last_modified)
    VALUES ('GA_PKG.PURGECOUNTERSHISTORY', TO_CHAR(p_retvalue), SYSDATE);

    COMMIT;

  EXCEPTION
    WHEN OTHERS THEN
      -- Single wrapping point: the caller gets one ORA-20000 with the failing
      -- step. OUT values are not returned when an exception propagates, so
      -- p_retvalue is deliberately not touched here.
      RAISE_APPLICATION_ERROR(-20000,
        'PURGE COUNTERS_HISTORY, ' || v_stage || ': ' || SQLCODE || ' ' || SQLERRM);
  END PurgeCountersHistory;

END GA_PKG ;


Tuesday, December 16, 2014

PL/SQL Procedure to delete N rows at a time with BULK COLLECT

A procedure that does FETCH .. BULK COLLECT .. INTO
and then deletes the fetched rows using FORALL i IN 1 .. N DELETE.
A second procedure writes to a log table.


===========================================
Procedure to delete N rows at a time.
===========================================
-- Deletes rows of MY_TABLE whose upd_date is older than SYSDATE - p_num_of_days.
-- Rows are fetched by ROWID in batches (BULK COLLECT ... LIMIT), deleted with
-- FORALL, and committed per batch, with a 5-second pause between batches.
-- Progress and errors are written through UTILS.WRITE_LOG; errors are logged
-- and not re-raised.
--   p_num_of_days : age threshold in days
CREATE OR REPLACE PROCEDURE PURGE_TABLE(p_num_of_days IN NUMBER) IS

  -- Running total of rows deleted so far.
  v_deleted_rows      NUMBER := 0;
  -- 1-based batch counter used in the progress log.
  v_loop_num          NUMBER := 1;
  -- Number of rows fetched and deleted per batch.
  c_batch_size        CONSTANT PLS_INTEGER := 50000;

  v_error_text        VARCHAR2(4000);

  TYPE rowid_type IS TABLE OF ROWID INDEX BY BINARY_INTEGER;

  rowids              rowid_type;

  CURSOR rowid_cursor(cp_num_of_days IN NUMBER) IS
    SELECT ROWID
      FROM MY_TABLE
     WHERE upd_date < SYSDATE - cp_num_of_days;

BEGIN

  UTILS.WRITE_LOG('PURGE_TABLE', 'Procedure Started, p_num_of_days = ' || p_num_of_days);

  -- NOTE(review): the cursor stays open across the per-batch COMMITs below;
  -- on long runs this pattern can fail with ORA-01555 - confirm undo sizing
  -- or consider re-querying per batch.
  OPEN rowid_cursor(p_num_of_days);
  LOOP
    FETCH rowid_cursor BULK COLLECT
     INTO rowids
    LIMIT c_batch_size;

    IF rowids.COUNT = 0 THEN
      UTILS.WRITE_LOG('PURGE_TABLE','Cursor finished');
      EXIT;
    END IF;

    UTILS.WRITE_LOG('PURGE_TABLE',
         'Cursor opened: ' || rowids.COUNT || ' rows');

    FORALL i IN 1 .. rowids.COUNT
      DELETE MY_TABLE
       WHERE ROWID = rowids(i);

    v_deleted_rows := v_deleted_rows + SQL%ROWCOUNT;

    COMMIT;

    UTILS.WRITE_LOG('PURGE_TABLE',
         'Loop: ' || v_loop_num|| ' - ' ||v_deleted_rows);
    v_loop_num := v_loop_num + 1;

    -- A short batch means the cursor is exhausted; skip the pause and stop.
    EXIT WHEN rowid_cursor%NOTFOUND;
    -- Pause 5 seconds between batches.
    DBMS_LOCK.SLEEP(5);
  END LOOP;

  /* Free cursor used by the query. */
  CLOSE rowid_cursor;

  UTILS.WRITE_LOG('PURGE_TABLE',
       'Procedure Ended, updated rows:' ||v_deleted_rows);

EXCEPTION
  WHEN OTHERS THEN
    -- Capture the message before doing anything else in the handler.
    v_error_text := SQLERRM;
    IF rowid_cursor%ISOPEN THEN
      CLOSE rowid_cursor;
    END IF;
    UTILS.WRITE_LOG('PURGE_TABLE','Error: ' || v_error_text);
END PURGE_TABLE;
/

===========================================
Procedure to write to a log table.
===========================================
-- Inserts one record into the MY_TRACE trace table, stamped with SYSDATE.
-- Declared as an autonomous transaction, so the COMMIT below applies to the
-- trace row only and leaves the caller's transaction untouched.
--   p_module_name : value stored in MY_TRACE.MODULE
--   p_msg_text    : value stored in MY_TRACE.msg_text
PROCEDURE WRITE_LOG(p_module_name IN VARCHAR2, p_msg_text IN VARCHAR2) IS
  PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
  INSERT INTO MY_TRACE
    (MODULE, msg_date, msg_text)
  VALUES
    (p_module_name, SYSDATE, p_msg_text);

  COMMIT;
END WRITE_LOG;