站長資訊網
        最全最豐富的資訊網站

        最系統掌握Flink CDC系列之實時抽取Oracle數據(排雷和調優實踐)

        本篇文章給大家帶來了對 Oracle 的實時數據捕獲以及性能調優,將試用過程中的一些關鍵細節進行分享,希望對大家有幫助。

        最系統掌握Flink CDC系列之實時抽取Oracle數據(排雷和調優實踐)

        Flink CDC 于 2021 年 11 月 15 日發布了最新版本 2.1,該版本通過引入內置 Debezium 組件,增加了對 Oracle 的支持。筆者第一時間下載了該版本進行試用并成功實現了對 Oracle 的實時數據捕獲以及性能調優,現將試用過程中的一些關鍵細節進行分享。

        試用環境:

        Oracle:11.2.0.4.0(RAC 部署)

        Flink:1.13.1

        Hadoop:3.2.1

        通過 Flink on Yarn 方式部署使用

        一、無法連接數據庫

        根據官方文檔說明,在 Flink SQL CLI 中輸入以下語句:

        create table TEST (A string) WITH ('connector'='oracle-cdc',     'hostname'='10.230.179.125',     'port'='1521',     'username'='myname',     'password'='***',     'database-name'='MY_SERVICE_NAME',     'schema-name'='MY_SCHEMA',     'table-name'='TEST' );

        之后嘗試通過 select * from TEST 觀察,發現無法正常連接 Oracle,報錯如下:

        [ERROR] Could not execute SQL statement. Reason: oracle.net.ns.NetException: Listener refused the connection with the following error: ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

        從報錯信息來看,可能是由于 Flink CDC 誤將連接信息中提供的 MY_SERVICE_NAME (Oracle 的服務名) 錯認為 SID。于是嘗試閱讀 Flink CDC 涉及到 Oracle Connector 的源碼,發現在 com.ververica.cdc.connectors.oracle.OracleValidator 中,對于 Oracle 連接的代碼如下:

        public static Connection openConnection(Properties properties) throws SQLException {     DriverManager.registerDriver(new oracle.jdbc.OracleDriver());     String hostname = properties.getProperty("database.hostname");     String port = properties.getProperty("database.port");     String dbname = properties.getProperty("database.dbname");     String userName = properties.getProperty("database.user");     String userpwd = properties.getProperty("database.password");     return DriverManager.getConnection(             "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd); }

        由上可以看出,在當前版本的 Flink CDC 中,對于 SID 和 Service Name 的連接方式并未做區分,而是直接在代碼中寫死了 SID 的連接方式 (即 port 和 dbname 中間使用 “ : ” 分隔開)。

        從 Oracle 8i 開始,Oracle 已經引入了 Service Name 的概念以支持數據庫的集群 (RAC) 部署,一個 Service Name 可作為一個數據庫的邏輯概念,統一對該數據庫不同的 SID 實例的連接。據此,可以考慮以下兩種方式:

        在 Flink CDC 的 create table 語句中,將 database-name 由 Service Name 替換成其中一個 SID。該方式能解決連接問題,但無法適應主流的 Oracle 集群部署的真實場景;

        對該源碼進行修改。具體可在新建工程中,重寫 com.ververica.cdc.connectors.oracle.OracleValidator 方法,修改為 Service Name 的連接方式 (即 port 和 dbname 中間使用 “ / ” 分隔開),即:

        "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname, userName, userpwd);

        筆者采用的就是第二種方法,實現了正常連接數據庫的同時,保留對 Oracle Service Name 特性的使用。

        二、無法找到 Oracle 表

        按照上述步驟,再次通過 select * from TEST 觀察,發現依然無法正常獲取數據,報錯如下:

        [ERROR] Could not execute SQL statement. Reason: io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

        觀察到錯誤日志中提到的表是 MY_SERVICE_NAME.MY_SCHEMA.test,為什么數據庫名、Schema 名都是大寫,而表名是小寫?

        注意到該錯誤由 io.debezium 包報出,通過分析該包的源代碼 (通過 Flink CDC 的 pom.xml 文件可知,目前使用的是 debezium 1.5.4 版本) 可知,在 io.debezium.relational.Tables 中有如下代碼:

        private TableId toLowerCaseIfNeeded(TableId tableId) {     return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; }

        可見,Debezium 的開發者將 “大小寫不敏感” 統一定義為了 “需要將表名轉換為小寫”。對于 Debezium 支持的 PostgreSQL、Mysql 等確實如此。然而對于 Oracle 數據庫,“大小寫不敏感” 卻意味著在內部元信息存儲時,需要將表名轉換為大寫

        因而 Debezium 在讀取到 “大小寫不敏感” 的配置后,按照上述代碼邏輯,只會因為嘗試去讀取小寫的表名而報錯。

        由于 Debezium 直到目前最新的穩定版本 1.7.1,以及最新的開發版本 1.8.0 都未修復該問題,我們可以通過以下兩種方法繞過該問題:

        如需使用 Oracle “大小寫不敏感” 的特性,可直接修改源碼,將上述 toLowercase 修改為 toUppercase (這也是筆者選擇的方法);

        如果不愿意修改源碼,且無需使用 Oracle “大小寫不敏感” 的特性,可以在 create 語句中加上 'debezium.database.tablename.case.insensitive'='false',如下示例:

        create table TEST (A string) WITH ('connector'='oracle-cdc',     'hostname'='10.230.179.125',     'port'='1521',     'username'='myname',     'password'='***',     'database-name'='MY_SERVICE_NAME', 'schema-name'='MY_SCHEMA', 'table-name'='TEST', 'debezium.database.tablename.case.insensitive'='false' );

        該方法的弊端是喪失了 Oracle “大小寫不敏感” 的特性,在 'table-name' 中必須顯式指定大寫的表名。

        需要注明的是,對于 database.tablename.case.insensitive 參數,Debezium 目前僅對 Oracle 11g 默認設置為 true,對其余 Oracle 版本均默認設置為 false。所以讀者如果使用的不是 Oracle 11g 版本,可無需修改該參數,但仍需顯式指定大寫的表名。

        三、數據延遲較大

        數據延遲較大,有時需要 3-5 分鐘才能捕捉到數據變化。對于該問題,在 Flink CDC FAQ 中已給出了明確的解決方案:在 create 語句中加上如下兩個配置項:

        'debezium.log.mining.strategy'='online_catalog', 'debezium.log.mining.continuous.mine'='true'

        那么為什么要這樣做呢?我們依然可以通過分析源碼和日志,結合 Oracle Logminer 的工作原理來加深對工具的理解。

        對 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法進行。為節約篇幅,本文不列出實際的源碼,僅提煉出關鍵過程繪于下面的流程圖,有興趣的讀者可以對照該流程圖,結合實際源碼進行分析:

        最系統掌握Flink CDC系列之實時抽取Oracle數據(排雷和調優實踐)

        采用 redo_log_catalog 的方式,可以監控數據表的 DDL 信息,且由于 archive logs 被永久保存到磁盤上,可以在數據庫宕機后依然正常獲取到宕機前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog

        贊(3)
        分享到: 更多 (0)
        網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
        主站蜘蛛池模板: 国产精品国产三级国产普通话 | 国产精品免费看久久久| 免费欧美精品a在线| 亚洲一区精品无码| 久久久WWW免费人成精品| 国产精品99久久久久久宅男| 国产精品三级在线| 国产精品无码日韩欧| 无码精品黑人一区二区三区| 亚洲国产精品国产自在在线| 88国产精品欧美一区二区三区| 99久久免费国产精品热| 久久亚洲精品人成综合网| 亚洲精品美女久久777777| 亚洲精品和日本精品| 欧美久久久久久午夜精品| 久久精品亚洲福利| 国产伦精品一区二区三区视频金莲| 大胸国产精品视频| 国产成人久久久精品二区三区| 在线欧美v日韩v国产精品v| 国产在线拍揄自揄视精品不卡| 97在线精品视频| 99久久精品费精品国产一区二区 | 最新精品露脸国产在线 | 欧美精品91欧美日韩操| 国产日韩久久久精品影院首页 | 国产AV国片精品| 99久久99这里只有免费费精品| 99久久人妻无码精品系列蜜桃| jiucao在线观看精品| 国产精品一二三区| 8x福利精品第一导航| 国产三级精品三级在线专区1| 凹凸国产熟女精品视频app| 国产cosplay精品视频| 久久这里只精品国产99热| 国产成人无码精品久久久免费| 国产在线精品一区二区高清不卡| 久热这里只有精品99国产6| 亚洲国产小视频精品久久久三级|