ElasticSearch基于数据库动态更新IK分词器词库

young 530 2021-10-17

参考

环境


系统:CentOS


Elasticsearch:7.6.2


IK:7.6.2


DB:Oracle

步骤

  1. 下载与 Elasticsearch 版本对应的 IK 源码(source code),地址:https://github.com/medcl/elasticsearch-analysis-ik/releases
  2. 在 IK 源码的 config 目录下新增 jdbc-reload.properties(插件运行时会从 IK 的词典配置目录读取该文件),内容如下:
# Oracle connection used by the IK plugin to pull dictionary words.
# NOTE(review): credentials are stored in plain text here; restrict read access to this file.
jdbc.url=jdbc:oracle:thin:@10.10.200.42:1521:orcl
jdbc.user=credit_law
jdbc.password=credit_law
# Extension words newer than the last recorded WORD_VER.
# The trailing "\" continues the value on the next line (leading spaces of that line are dropped).
jdbc.reload.sql=SELECT EXT_WORD FROM ES_IK_EXT_WORD T WHERE T.VERSION > (SELECT nvl(MAX(WORD_VER),0) FROM \
  ES_IK_EXT_WORD_VER)
# Stop words newer than the last recorded STOP_WORD_VER (also continued on the next line).
jdbc.reload.stop_word.sql=SELECT STOP_WORD FROM ES_IK_STOP_WORD T WHERE T.VERSION > (SELECT nvl(MAX(STOP_WORD_VER),0)\
   FROM ES_IK_EXT_WORD_VER)
# Run after a reload whose queries returned rows: records the current MAX(VERSION) of both word tables.
jdbc.reload.update_version.sql = INSERT INTO ES_IK_EXT_WORD_VER(WORD_VER, STOP_WORD_VER)VALUES ((SELECT nvl(MAX(VERSION), 0) FROM ES_IK_EXT_WORD), (SELECT nvl(MAX(VERSION), 0) FROM ES_IK_STOP_WORD))
# Result-set column that holds the word in each of the two SELECTs above.
jdbc.reload.ext_word=EXT_WORD
jdbc.reload.stop_word=STOP_WORD
# Interval between database reloads, in seconds.
period_time_seconds=60
  1. 在 Dictionary 类中添加静态代码块,加载 Oracle 数据库驱动
static {
    try {
        //驱动默认选择oracle
        logger.info("初始化驱动开始..............");
        Class.forName("oracle.jdbc.OracleDriver");
        logger.info("初始化驱动完成..............");
    } catch (Exception e) {
        logger.error("初始化驱动失败..............",e);
    }
}
  1. 在 Dictionary 类中添加成员变量,用于保存数据库配置(后续代码均通过 dbProps 这个名字引用它)
private Properties props;
  1. 在 Dictionary 构造方法的末尾添加读取数据库配置文件的代码

try {
     Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
     dbProps = new Properties();
     dbProps.load(new FileInputStream(file.toFile()));
     logger.info("dbProps:{}",dbProps);
 } catch (IOException e) {
     logger.error("db配置文件读取失败", e);
 }
  1. 在Dictionary类中添加创建数据库链接的方法
/**
 * Opens a new JDBC connection using {@code jdbc.url}, {@code jdbc.user} and
 * {@code jdbc.password} from {@code dbProps}. The caller owns the connection and must close it.
 *
 * <p>Failures are rethrown instead of being turned into a {@code null} return. Every caller
 * immediately calls {@code createStatement()} on the result, so a {@code null} would only surface
 * as a NullPointerException that hides the real cause.
 *
 * @return an open connection, never {@code null}
 * @throws SQLException if the connection cannot be established (the failure is also logged)
 */
private Connection getConn() throws SQLException {
    logger.info("创建数据连接");
    try {
        Connection conn = DriverManager.getConnection(
            dbProps.getProperty("jdbc.url"),
            dbProps.getProperty("jdbc.user"),
            dbProps.getProperty("jdbc.password"));
        logger.info("创建数据连接成功..........");
        return conn;
    } catch (SQLException e) {
        logger.error("创建数据连接失败..............", e);
        throw e;
    }
}
  1. 在 Dictionary 类中增加从数据库获取词库的方法
/**
 * Loads extension ("hot") words from the database into the main dictionary.
 *
 * <p>Runs the query configured as {@code jdbc.reload.sql} and reads each word from the column
 * named by {@code jdbc.reload.ext_word}. If the query property is missing or empty, nothing is
 * loaded and no connection is opened.
 *
 * @return {@code true} if the query returned at least one row
 * @throws Exception wrapping any failure; the failure is also logged
 */
public boolean loadOracleHotDict() throws Exception {
    logger.info("hot words loading");
    try {
        boolean change = loadWordsFromDb("jdbc.reload.sql", "jdbc.reload.ext_word", _MainDict, "hot word");
        logger.info("hot words load end");
        return change;
    } catch (Exception e) {
        logger.error("热词更新异常:", e);
        throw new Exception("热词更新异常:", e);
    }
}

/**
 * Loads stop words from the database into the stop-word dictionary.
 *
 * <p>Runs the query configured as {@code jdbc.reload.stop_word.sql} and reads each word from the
 * column named by {@code jdbc.reload.stop_word}. If the query property is missing or empty,
 * nothing is loaded and no connection is opened.
 *
 * @return {@code true} if the query returned at least one row
 * @throws Exception wrapping any failure; the failure is also logged
 */
public boolean loadMyOracleStopWordDict() throws Exception {
    logger.info("stop words loading");
    try {
        boolean change = loadWordsFromDb("jdbc.reload.stop_word.sql", "jdbc.reload.stop_word", _StopWords, "stop word");
        logger.info("stop words load end");
        return change;
    } catch (Exception e) {
        logger.error("停止词更新异常:", e);
        throw new Exception("停止词更新异常:", e);
    }
}

/**
 * Runs the SELECT stored under {@code sqlKey} and adds every non-blank, trimmed value of the
 * column named by {@code columnKey} to {@code target}.
 *
 * @param sqlKey    property holding the SELECT statement
 * @param columnKey property holding the result column name
 * @param target    dictionary segment that receives the words
 * @param label     word kind used in log messages
 * @return {@code true} if the query returned at least one row, even if every value was blank, so
 *         the caller still advances the stored version past those rows
 * @throws SQLException if connecting, querying or reading the results fails
 */
private boolean loadWordsFromDb(String sqlKey, String columnKey, DictSegment target, String label)
        throws SQLException {
    String sql = dbProps.getProperty(sqlKey);
    logger.info("{}:{}", sqlKey, sql);
    if (sql == null || sql.isEmpty()) {
        return false;
    }
    String column = dbProps.getProperty(columnKey);
    boolean change = false;
    try (Connection conn = getConn()) {
        // Fail with a clear message even if the connection helper reports failure via null.
        if (conn == null) {
            throw new SQLException("创建数据连接失败");
        }
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                change = true;
                String word = rs.getString(column);
                logger.info("{} from oracle: {}", label, word);
                // Skip NULL/blank values instead of aborting the whole reload on one bad row:
                // trim() would throw on null, and an empty entry is not a usable word.
                if (word == null) {
                    continue;
                }
                String trimmed = word.trim();
                if (!trimmed.isEmpty()) {
                    target.fillSegment(trimmed.toCharArray());
                }
            }
        }
    }
    return change;
}

/**
 * Records the current word-table versions by executing {@code jdbc.reload.update_version.sql}
 * in its own transaction, so the next reload only fetches newer rows.
 *
 * <p>Auto-commit is switched off before the INSERT. JDBC connections start in auto-commit mode,
 * and {@link Connection#commit()} is specified to throw in that mode. On failure the INSERT is
 * rolled back explicitly, so the connection is never closed with the change still pending.
 *
 * @throws Exception wrapping any failure, including a missing SQL property (also logged)
 */
private void updateVersion() throws Exception {
    try (Connection conn = getConn()) {
        if (conn == null) {
            throw new SQLException("创建数据连接失败");
        }
        logger.info("update word version");
        String sql = dbProps.getProperty("jdbc.reload.update_version.sql");
        logger.info("upload version sql:{}:", sql);
        if (sql == null || sql.trim().isEmpty()) {
            throw new SQLException("jdbc.reload.update_version.sql is not configured");
        }
        conn.setAutoCommit(false);
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(sql);
            conn.commit();
        } catch (SQLException | RuntimeException e) {
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
        logger.info("update word version end");
    } catch (Exception e) {
        logger.error("更新词库版本异常:", e);
        throw new Exception("更新词库版本异常:", e);
    }
}
  1. 在Dictionary类中增加调用上述方法的方法
/**
 * Performs one database reload cycle.
 *
 * <p>Both the extension-word query and the stop-word query always run. Only if at least one of
 * them returned rows are the new versions recorded, so those rows are not fetched again. If
 * either load throws, the version update is skipped.
 *
 * <p>NOTE(review): with the SQL in jdbc-reload.properties, the version row is built from
 * MAX(VERSION) re-read from the word tables, not from the versions actually loaded. Rows committed
 * between the SELECTs and that INSERT would therefore be skipped permanently. Confirm whether
 * words can be inserted while a reload is running.
 *
 * @throws Exception if loading either word list or recording the version fails
 */
public void reLoadSQLDict() throws Exception {
    final boolean hotWordsChanged = this.loadOracleHotDict();
    final boolean stopWordsChanged = this.loadMyOracleStopWordDict();
    if (!hotWordsChanged && !stopWordsChanged) {
        return;
    }
    this.updateVersion();
}
  1. 在 Dictionary 所在的 dic 包下创建 OracleDictReloadThread 类
/**
 * Scheduled task that runs one database dictionary reload via
 * {@code Dictionary.getSingleton().reLoadSQLDict()}.
 *
 * <p>Despite its name this is a plain {@link Runnable}; it is scheduled on the dictionary's pool
 * with {@code scheduleAtFixedRate}. Every exception is caught and logged here, because an exception
 * escaping {@code run()} would suppress all later executions of a fixed-rate task.
 */
public class OracleDictReloadThread implements Runnable {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(OracleDictReloadThread.class.getName());

    @Override
    public void run() {
        logger.info("reloading hot_word and stop_word dict from oracle");
        try {
            reloadOnce();
        } catch (Exception e) {
            logger.error("调用热词更新方法异常:", e);
        }
    }

    /** Delegates to the dictionary singleton; run() only handles logging and errors. */
    private void reloadOnce() throws Exception {
        Dictionary.getSingleton().reLoadSQLDict();
    }
}
  1. 在 Dictionary 类的 initial 方法中添加定时执行的数据库重载任务
/**
 * Creates and initializes the dictionary singleton on the first call; later calls do nothing.
 *
 * <p>Besides loading the local dictionaries, it schedules two kinds of periodic tasks:
 * remote-dictionary monitors (only when remote dictionaries are enabled), and the database reload
 * task {@link OracleDictReloadThread}. The reload task runs every {@code period_time_seconds}
 * seconds, or every 60 seconds if that value is missing or invalid.
 *
 * @param cfg plugin configuration used to build the dictionary
 */
public static synchronized void initial(Configuration cfg) {
    if (singleton == null) {
        synchronized (Dictionary.class) {
            if (singleton == null) {
                singleton = new Dictionary(cfg);
                singleton.loadMainDict();
                singleton.loadSurnameDict();
                singleton.loadQuantifierDict();
                singleton.loadSuffixDict();
                singleton.loadPrepDict();
                singleton.loadStopWordDict();

                if (cfg.isEnableRemoteDict()) {
                    // Start monitor tasks: 10 s initial delay, then every 60 s (both adjustable).
                    for (String location : singleton.getRemoteExtDictionarys()) {
                        pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                    }
                    for (String location : singleton.getRemoteExtStopWordDictionarys()) {
                        pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                    }
                }
                // dbProps is filled per instance by the constructor, so this static method
                // has to read it through the singleton.
                long reloadPeriod = dbReloadPeriodSeconds(singleton.dbProps);
                logger.info("查数据库更新线程启动--------------------");
                pool.scheduleAtFixedRate(new OracleDictReloadThread(), 10, reloadPeriod, TimeUnit.SECONDS);
            }
        }
    }
}

/** Interval, in seconds, used when {@code period_time_seconds} is missing or invalid. */
private static final long DEFAULT_DB_RELOAD_PERIOD_SECONDS = 60;

/**
 * Reads {@code period_time_seconds} from the database settings.
 *
 * <p>Falls back to {@link #DEFAULT_DB_RELOAD_PERIOD_SECONDS} when the settings or the property
 * are missing, the value is not an integer, or it is not positive. Otherwise the caller would
 * fail: {@code Integer.parseInt(null)} throws, and {@code scheduleAtFixedRate} rejects
 * non-positive periods.
 *
 * @param props database settings, may be {@code null}
 * @return a positive reload interval in seconds
 */
private static long dbReloadPeriodSeconds(Properties props) {
    String raw = props == null ? null : props.getProperty("period_time_seconds");
    if (raw != null) {
        try {
            long seconds = Long.parseLong(raw.trim());
            if (seconds > 0) {
                return seconds;
            }
        } catch (NumberFormatException e) {
            logger.warn("invalid period_time_seconds: {}", raw);
        }
    }
    logger.warn("period_time_seconds missing or invalid, using default {} s", DEFAULT_DB_RELOAD_PERIOD_SECONDS);
    return DEFAULT_DB_RELOAD_PERIOD_SECONDS;
}
  1. 在pom文件中添加依赖并更改es版本
<!-- Oracle JDBC driver used by the database dictionary reload.
     NOTE(review): com.oracle:ojdbc6 may not be resolvable from Maven Central; it may have to be
     installed into the local repository first. If the coordinates are changed, keep the
     <include> in src/main/assemblies/plugin.xml in sync. -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.2.0.1.0</version>
</dependency>

<!-- Set to the Elasticsearch version the plugin will be installed into (7.6.2 in this setup). -->
<elasticsearch.version>7.6.2</elasticsearch.version>
  1. 在src\main\assemblies\plugin.xml中添加配置使得数据库相关依赖一并打包
<!-- Copy the Oracle JDBC driver jar into the root of the plugin zip (empty outputDirectory) so it
     ships with the plugin. The include must match the driver's groupId:artifactId in the pom. -->
<dependencySet>
    <outputDirectory/>
    <useProjectArtifact>true</useProjectArtifact>
    <useTransitiveFiltering>true</useTransitiveFiltering>
    <includes>
        <include>com.oracle:ojdbc6</include>
    </includes>
</dependencySet>
  1. 打包,执行maven命令
mvn clean package -Dmaven.test.skip=true
  1. 将target/releases下的包上传至es服务器
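  2. 修改 Java 安全策略(permission),授予插件连接数据库所需的权限
  3. 如果 ES 使用自带的 JDK 启动,进入 $ES_HOME/jdk/lib/security;否则进入实际使用的 JDK 的 lib/security 目录
  4. 编辑 default.policy
  5. 在文件最后一个 grant 块中添加以下权限(注意:写在 default.policy 中的权限对整个 JVM 中的所有代码生效,范围较大;也可以考虑改为在插件自带的 plugin-security.policy 中声明所需权限):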
// Permissions presumably required by the JDBC driver (socket access to the database, class loader access, system properties).
// NOTE(review): in the JDK's default.policy these apply to all code in the JVM, not only to the IK plugin.
permission java.util.PropertyPermission "*","read,write";
permission java.net.SocketPermission "*","connect,resolve,accept,listen";
permission java.lang.RuntimePermission "setContextClassLoader";
permission java.lang.RuntimePermission "getClassLoader";
  1. 保存并退出

  2. 关闭es,删除原es的ik

# Delete the ik directories under config/ and plugins/ in the Elasticsearch home.
# ${ES_HOME:?...} aborts if ES_HOME is unset instead of expanding to "" (which would target /config/ik and /plugins/ik).
# NOTE(review): config/ik presumably holds IK configuration/dictionaries — back up custom files first.
rm -rf "${ES_HOME:?ES_HOME is not set}/config/ik" "${ES_HOME:?ES_HOME is not set}/plugins/ik"
  1. 安装ik
    无警告即为安装成功
elasticsearch-plugin install file://app/elasticsearch-analysis-ik-7.6.2.zip
  1. 重启es
elasticsearch -d

SQL

-- Stop words maintained in the database. A reload fetches rows whose VERSION is greater than
-- the largest STOP_WORD_VER recorded in ES_IK_EXT_WORD_VER.
-- VARCHAR2 is used instead of VARCHAR, which Oracle advises against (it may be redefined).
create table ES_IK_STOP_WORD
(
    STOP_WORD VARCHAR2(100 char) not null,
    VERSION NUMBER(10) not null
)
/

comment on table ES_IK_STOP_WORD is 'ES IK分词器停用词'
/

comment on column ES_IK_STOP_WORD.STOP_WORD is '停用词'
/

comment on column ES_IK_STOP_WORD.VERSION is '版本号'
/

create index ES_IK_STOP_WORD_VERSION_INDEX
    on ES_IK_STOP_WORD (VERSION desc)
/

-- Extension (hot) words. A reload fetches rows whose VERSION is greater than the largest
-- WORD_VER recorded in ES_IK_EXT_WORD_VER.
create table ES_IK_EXT_WORD
(
    EXT_WORD VARCHAR2(100 char) not null,
    VERSION NUMBER(10) not null
)
/

comment on table ES_IK_EXT_WORD is 'ES IK分词器扩展词'
/

comment on column ES_IK_EXT_WORD.EXT_WORD is '扩展词'
/

comment on column ES_IK_EXT_WORD.VERSION is '版本号'
/

create index ES_IK_EXT_WORD_VERSION_INDEX
    on ES_IK_EXT_WORD (VERSION desc)
/


-- Version history: a row is inserted after each reload that found new words; the reload
-- queries only use MAX(WORD_VER) and MAX(STOP_WORD_VER).
create table ES_IK_EXT_WORD_VER
(
    WORD_VER NUMBER(10) not null,
    STOP_WORD_VER NUMBER(10) not null
)
/

comment on table ES_IK_EXT_WORD_VER is '词库版本'
/

comment on column ES_IK_EXT_WORD_VER.WORD_VER  is '扩展词版本'
/

comment on column ES_IK_EXT_WORD_VER.STOP_WORD_VER  is '停用词版本'
/

create index ES_IK_EXT_WORD_VER_WORD_INDEX
    on ES_IK_EXT_WORD_VER (WORD_VER desc)
/

create index ES_IK_EXT_WORD_VER_STOP_INDEX
    on ES_IK_EXT_WORD_VER (STOP_WORD_VER desc)
/

-- Sequences presumably intended for assigning VERSION when inserting words; the plugin's SQL
-- does not reference them. maxvalue (10 digits) fits NUMBER(10).
create sequence SEQ_ES_IK_STOP_WORD
    maxvalue 9999999999
    order
/

create sequence SEQ_ES_IK_EXT_WORD
    maxvalue 9999999999
    order
/

-- Optional seed row: the reload SQL wraps MAX(...) in nvl(..., 0), so an empty table behaves
-- the same. Commit afterwards if your client does not auto-commit.
insert into ES_IK_EXT_WORD_VER values (0,0)
/