抽取oracle数据到mysql数据库的实现过程
在将oracle数据库迁移至mysql数据库时,除了要把oracle的数据库模型迁移到mysql之外,还有一个重要环节就是把oracle数据库中的数据迁移到mysql数据库。本人尝试过多款数据迁移程序,性能都不是很好,于是自己动手写了一个将oracle数据库数据迁移到mysql的程序,其具体过程如下:
1、准备配置文件ETL_TABS.CNF,写明要抽取的表、字段及过滤条件(每行一条"select 字段列表 from 表名 [where 过滤条件]"形式的语句)
2、建立一个oracle目录对象ETL_DIR(其名称通过参数P_ORA_DIR传入),并将配置文件ETL_TABS.CNF放在该目录下
3、运行oracle数据库存储过程P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件load_data.sql
4、在mysql中执行脚本文件load_data.sql导入数据,该文件内容如下
load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";
附:数据库脚本P_ETL_ORA_DATA
-- Exports Oracle table data to CSV files and generates a MySQL "load data"
-- script for them.
--
-- Parameters:
--   P_ORA_DIR   - name of the Oracle directory object passed to UTL_FILE; the
--                 configuration file ETL_TABS.CNF is read from it and the CSV
--                 files plus load_data.sql are written to it.
--   P_DATA_PATH - path prefix placed in front of each CSV file name inside the
--                 generated "load data infile" statements.
CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
  P_ORA_DIR   VARCHAR2,
  P_DATA_PATH VARCHAR2
) IS
  -- One configured export: the table name and its optional WHERE clause.
  TYPE T_REC IS RECORD(
    TBN VARCHAR2(40),
    WHR VARCHAR2(4000));
  TYPE T_TABS IS TABLE OF T_REC;

  -- Export list, filled from ETL_TABS.CNF by the main body.
  V_TABS      T_TABS := T_TABS();
  -- Sized generously: this initialisation runs in the declaration section, so
  -- an overflow here could not be caught by any handler of this procedure.
  V_ETL_DIR   VARCHAR2(128) := P_ORA_DIR;
  -- Handle of the generated load script; shared with ETL_DATA, which appends
  -- one statement per CSV file it finishes.
  V_LOAD_FILE UTL_FILE.FILE_TYPE;
-- Runs P_SQL_STMT and writes its result set to one or more CSV files named
-- <P_TB_NAME>_<n>.csv in the directory V_ETL_DIR. For every finished file a
-- matching "load data infile" statement is appended to V_LOAD_FILE.
--
-- Parameters:
--   P_SQL_STMT  - SELECT statement whose rows are exported.
--   P_DATA_PATH - path prefix written in front of the file name in the
--                 generated load statement; a trailing '\' is added if missing.
--   P_TB_NAME   - target table name, also used as the CSV file name prefix.
--
-- Output format: fields separated by ',', text/date/timestamp values enclosed
-- in '^', NULL written as \N, numbers written unenclosed.
--
-- Errors are not propagated: the cursor and the export file are closed and
-- SQLERRM plus the statement are printed through DBMS_OUTPUT, so one failing
-- table does not stop the caller's loop.
PROCEDURE ETL_DATA
(
  P_SQL_STMT  VARCHAR2,
  P_DATA_PATH VARCHAR2,
  P_TB_NAME   VARCHAR2
) IS
  -- A new CSV file is started after this many rows.
  C_ROWS_PER_FILE  CONSTANT PLS_INTEGER := 200000;
  -- The export file buffer is flushed after this many rows.
  C_ROWS_PER_FLUSH CONSTANT PLS_INTEGER := 2000;

  V_VAR_COL        VARCHAR2(32767);
  V_NUM_COL        NUMBER;
  V_DATE_COL       DATE;
  V_TMZ            TIMESTAMP;
  V_COLS           NUMBER;
  V_COLS_DESC      DBMS_SQL.DESC_TAB;
  V_ROW_STR        VARCHAR2(32767);
  V_COL_STR        VARCHAR2(32767);
  V_SQL_ID         NUMBER;
  V_SQL_REF        SYS_REFCURSOR;
  V_EXP_FILE       UTL_FILE.FILE_TYPE;
  -- Wide enough to hold the path after every backslash has been doubled.
  V_DATA_PATH      VARCHAR2(4000);
  V_FILE_NAME      VARCHAR2(200);
  V_LINE           VARCHAR2(32767);
  V_FILE_NO        PLS_INTEGER := 1;
  V_ROWS_IN_FILE   PLS_INTEGER := 0;
  V_ROWS_UNFLUSHED PLS_INTEGER := 0;

  -- Closes the current CSV file and appends its load statement to the script.
  PROCEDURE FINISH_FILE IS
  BEGIN
    UTL_FILE.FCLOSE(V_EXP_FILE);
    V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME
           || '" into table ' || P_TB_NAME
           || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
    UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
    UTL_FILE.FFLUSH(V_LOAD_FILE);
  END FINISH_FILE;
BEGIN
  -- Make sure the path ends with a backslash, then double every backslash
  -- because the path is written inside a quoted string of the load script.
  V_DATA_PATH := P_DATA_PATH;
  IF V_DATA_PATH IS NULL OR SUBSTR(V_DATA_PATH, -1) <> '\' THEN
    V_DATA_PATH := V_DATA_PATH || '\';
  END IF;
  V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');

  -- Open the statement as a ref cursor and hand it to DBMS_SQL so that the
  -- column list can be described at run time.
  OPEN V_SQL_REF FOR P_SQL_STMT;
  V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
  DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);

  -- Bind a fetch buffer to every column. The type codes handled here must
  -- match the ones handled when the values are read back below.
  FOR I IN 1 .. V_COLS LOOP
    CASE
      WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
        DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
      WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
        DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
      WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
        DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
      WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
        -- NOTE(review): 181/231 are fetched into a plain TIMESTAMP buffer, so
        -- any time zone part is not exported - confirm this is acceptable.
        DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
      ELSE
        -- Fail with a readable message instead of an anonymous CASE_NOT_FOUND.
        RAISE_APPLICATION_ERROR(-20001,
          'Unsupported column type ' || V_COLS_DESC(I).COL_TYPE
          || ' for column ' || V_COLS_DESC(I).COL_NAME);
    END CASE;
  END LOOP;

  WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0 LOOP
    -- Start a new CSV file for the first row and after every rollover.
    IF V_ROWS_IN_FILE = 0 THEN
      V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
      V_EXP_FILE  := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME,
                                    OPEN_MODE => 'w', MAX_LINESIZE => 32767);
    END IF;

    V_ROW_STR := '';
    FOR I IN 1 .. V_COLS LOOP
      -- \N is the NULL marker; it stays in place when the value is NULL.
      V_COL_STR := '\N';
      IF V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
        DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
        IF V_VAR_COL IS NOT NULL THEN
          -- Escape the escape character first, then the enclosing character,
          -- so that values containing '\' or '^' survive the import.
          V_COL_STR := '^'
                    || REPLACE(REPLACE(V_VAR_COL, '\', '\\'), '^', '\^')
                    || '^';
        END IF;
      ELSIF V_COLS_DESC(I).COL_TYPE = 2 THEN
        DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
        IF V_NUM_COL IS NOT NULL THEN
          -- Force '.' as decimal separator regardless of the session NLS
          -- settings; a ',' would collide with the field separator.
          V_COL_STR := TO_CHAR(V_NUM_COL, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
        END IF;
      ELSIF V_COLS_DESC(I).COL_TYPE = 12 THEN
        DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
        IF V_DATE_COL IS NOT NULL THEN
          V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
        END IF;
      ELSIF V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
        DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
        IF V_TMZ IS NOT NULL THEN
          V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
        END IF;
      END IF;

      IF I = 1 THEN
        V_ROW_STR := V_COL_STR;
      ELSE
        V_ROW_STR := V_ROW_STR || ',' || V_COL_STR;
      END IF;
    END LOOP;

    UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
    V_ROWS_IN_FILE   := V_ROWS_IN_FILE + 1;
    V_ROWS_UNFLUSHED := V_ROWS_UNFLUSHED + 1;

    IF V_ROWS_IN_FILE >= C_ROWS_PER_FILE THEN
      -- File is full: close it, register it in the load script and make the
      -- next row open the next numbered file.
      FINISH_FILE;
      V_FILE_NO        := V_FILE_NO + 1;
      V_ROWS_IN_FILE   := 0;
      V_ROWS_UNFLUSHED := 0;
    ELSIF V_ROWS_UNFLUSHED >= C_ROWS_PER_FLUSH THEN
      -- Periodically push buffered rows to disk.
      UTL_FILE.FFLUSH(V_EXP_FILE);
      V_ROWS_UNFLUSHED := 0;
    END IF;
  END LOOP;

  DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);

  -- A partially filled last file is still open here; finish it as well.
  IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
    FINISH_FILE;
  END IF;
EXCEPTION
  WHEN OTHERS THEN
    -- Best effort per table: release resources, report, and return normally.
    IF V_SQL_ID IS NOT NULL AND DBMS_SQL.IS_OPEN(V_SQL_ID) THEN
      DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
    END IF;
    IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
      UTL_FILE.FCLOSE(V_EXP_FILE);
    END IF;
    DBMS_OUTPUT.PUT_LINE(SQLERRM);
    DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
END ETL_DATA;
BEGIN
  -- Step 1: make sure the staging table exists and is empty. It holds one row
  -- per (table, requested column, position) parsed from the configuration.
  BEGIN
    EXECUTE IMMEDIATE
      'create table mysql_etl_tbs(tn varchar2(40), cn varchar2(40), ci number)';
  EXCEPTION
    WHEN OTHERS THEN
      -- ORA-00955 (name already in use) means the table is already there;
      -- anything else is a real failure and must not be hidden.
      IF SQLCODE <> -955 THEN
        RAISE;
      END IF;
  END;
  EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';

  -- Step 2: read ETL_TABS.CNF. Each line is expected to look like
  --   select <columns> from <table> [where <condition>]
  DECLARE
    V_CI       PLS_INTEGER;
    V_CN       VARCHAR2(40);
    V_ETL_COLS VARCHAR2(32767);
    V_TBN      VARCHAR2(40);
    V_ETL_CFG  VARCHAR2(32767);
    V_CNF_FILE UTL_FILE.FILE_TYPE;
    V_FROM_POS PLS_INTEGER;
  BEGIN
    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767);
    LOOP
      -- GET_LINE signals end of file with NO_DATA_FOUND.
      BEGIN
        UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          EXIT;
      END;

      -- Locate the FROM keyword as a whole word so that a column name that
      -- merely contains "from" does not cut the column list short.
      V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, '\s+from\s+', 1, 1, 0, 'i');
      V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
      V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);
      V_TBN      := UPPER(REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2));

      -- Skip blank or malformed lines that name no table.
      CONTINUE WHEN V_TBN IS NULL;

      V_TABS.EXTEND();
      V_TABS(V_TABS.LAST).TBN := V_TBN;
      V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where.+', 1, 1, 'i');

      -- Store every requested column with its position. Column names may be
      -- separated by whitespace and/or commas.
      V_CI := 1;
      LOOP
        V_CN := REGEXP_SUBSTR(V_ETL_COLS, '[^,[:space:]]+', 1, V_CI);
        EXIT WHEN V_CN IS NULL;
        V_CN := UPPER(V_CN);
        EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn, cn, ci) values(:1, :2, :3)'
          USING V_TBN, V_CN, V_CI;
        V_CI := V_CI + 1;
      END LOOP;
      -- One commit per configuration line instead of one per column.
      COMMIT;
    END LOOP;
    UTL_FILE.FCLOSE(V_CNF_FILE);
  EXCEPTION
    WHEN UTL_FILE.INVALID_PATH THEN
      DBMS_OUTPUT.PUT_LINE('指定的目录:"' || V_ETL_DIR || '"无效!');
      RETURN;
    WHEN UTL_FILE.INVALID_FILENAME THEN
      DBMS_OUTPUT.PUT_LINE('指定的文件:"ETL_TABS.CNF' || '"无效!');
      RETURN;
    WHEN OTHERS THEN
      -- Do not leak the configuration file handle on unexpected errors.
      IF UTL_FILE.IS_OPEN(V_CNF_FILE) THEN
        UTL_FILE.FCLOSE(V_CNF_FILE);
      END IF;
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      RETURN;
  END;

  -- Step 3: for every configured table build the SELECT statement and export
  -- it; ETL_DATA appends the load statements to load_data.sql.
  DECLARE
    V_CUR_MATCH   SYS_REFCURSOR;
    V_SQL_SMT     VARCHAR2(32767);
    V_TN          VARCHAR2(40);
    V_CN          VARCHAR2(40);
    V_CI          PLS_INTEGER;
    V_COLUMN_NAME VARCHAR2(40);
    V_ETL_COLS    VARCHAR2(32767);
    V_TBN         VARCHAR2(40);
  BEGIN
    V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql',
                                  OPEN_MODE => 'w', MAX_LINESIZE => 32767);

    -- 1..COUNT is safe for an empty list; FIRST..LAST would be NULL..NULL.
    FOR T_IX IN 1 .. V_TABS.COUNT LOOP
      V_TBN      := V_TABS(T_IX).TBN;
      V_ETL_COLS := NULL;

      -- Match the requested columns against the columns the table really has.
      -- COLUMN_NAME comes back NULL for a requested column that is missing.
      OPEN V_CUR_MATCH FOR
        'select l.tn, l.cn, r.column_name, l.ci'
        || ' from (select tn, cn, ci from mysql_etl_tbs where tn = :tbn) l'
        || ' left join user_tab_columns r'
        || ' on l.tn = r.table_name and l.cn = r.column_name'
        || ' order by l.ci'
        USING V_TBN;
      LOOP
        FETCH V_CUR_MATCH
          INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;
        EXIT WHEN V_CUR_MATCH%NOTFOUND;
        IF V_CI > 1 THEN
          V_ETL_COLS := V_ETL_COLS || ',';
        END IF;
        IF V_COLUMN_NAME IS NULL THEN
          -- Missing column: export a NULL under the requested name so the
          -- CSV keeps the configured column count and order.
          V_ETL_COLS := V_ETL_COLS || 'cast(null as number) ' || V_CN;
        ELSE
          V_ETL_COLS := V_ETL_COLS || V_CN;
        END IF;
      END LOOP;
      CLOSE V_CUR_MATCH;

      V_TBN     := LOWER(V_TBN);
      -- WHR already starts with whitespace (or is NULL when no filter is set).
      V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR;
      ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);
    END LOOP;

    IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
      UTL_FILE.FCLOSE(V_LOAD_FILE);
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      -- Release the cursor and the load script before reporting the error.
      IF V_CUR_MATCH%ISOPEN THEN
        CLOSE V_CUR_MATCH;
      END IF;
      IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
        UTL_FILE.FCLOSE(V_LOAD_FILE);
      END IF;
      RAISE;
  END;
END P_ETL_ORA_DATA;
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。如果你想了解更多相关内容请查看下面相关链接