对于简单的操作(不复杂的SQL或业务场景),通过Mybatis的拦截器,实现变化记录。
首先拦截Mybatis的Executor的update方法
@Intercepts(
@Signature(
type = Executor.class,
method = "update",
args = {MappedStatement.class, Object.class}
)
)
通过MappedStatement判断是否为写操作,如果是写操作,则进行审计处理,否则直接执行即可
MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
boolean isWriteType = isIsWriteType(mappedStatement);
if (isWriteType) {
// 处理写操作审计
return processWriteAudit(invocation, mappedStatement);
} else {
return invocation.proceed();
}
private static boolean isIsWriteType(MappedStatement mappedStatement) {
SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType();
return SqlCommandType.INSERT == sqlCommandType
|| SqlCommandType.UPDATE == sqlCommandType
|| SqlCommandType.DELETE == sqlCommandType;
}
写操作分为3种,分别是增删改,其中可能会涉及到逻辑删除,逻辑删除对应的是修改类型的SQL,但是在审计日志中,需要定义为删除类型,因此需要区分修改类型的实际操作,此时需要对SQL进行解析,这里采用的是jsqlparser。
通过MappedStatement.getBoundSql方法可以获取到sql,但是此时获取到的sql中,参数是带占位符?的,jsqlparser不能直接解析这种sql,所以需要将sql拼接完整,之后在进行解析。
// 获取参数
Object parameter = invocation.getArgs()[1];
// 获取配置
Configuration configuration = mappedStatement.getConfiguration();
// 获取执行的完整SQL
String completeSql = processCompleteSql(mappedStatement, configuration, parameter);
/**
* 获取完整SQL
*/
private String processCompleteSql(MappedStatement mappedStatement, Configuration configuration, Object parameter) {
// 获取 BoundSql 对象
BoundSql boundSql = mappedStatement.getBoundSql(parameter);
// 获取完整 SQL 语句(带占位符)
String sql = boundSql.getSql();
// 获取参数映射列表
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
// 获取参数对象
Object parameterObject = boundSql.getParameterObject();
// 构建完整 SQL(替换占位符为实际值)
String completeSql = getCompleteSql(sql, parameterMappings, parameterObject, configuration);
// 打印完整 SQL
log.debug("Complete SQL: {}", completeSql);
return completeSql;
}
/**
* 构建完整 SQL,替换占位符为实际值
*/
private String getCompleteSql(String sql, List<ParameterMapping> parameterMappings,
Object parameterObject, Configuration configuration) {
if (parameterMappings == null || parameterMappings.isEmpty()) {
return sql;
}
StringBuilder completeSql = new StringBuilder();
// 不使用简单的空格分割,而是逐字符处理
char[] sqlChars = sql.toCharArray();
int parameterIndex = 0;
for (int i = 0; i < sqlChars.length; i++) {
char ch = sqlChars[i];
if (ch == '?') {
// 遇到问号,替换为实际参数值
if (parameterIndex < parameterMappings.size()) {
ParameterMapping parameterMapping = parameterMappings.get(parameterIndex);
Object value = getParameterValue(parameterObject, parameterMapping, configuration);
completeSql.append(formatParameterValue(value));
parameterIndex++;
} else {
completeSql.append(ch);
}
} else {
completeSql.append(ch);
}
}
return completeSql.toString();
}
/**
* 获取参数值
*/
private Object getParameterValue(Object parameterObject, ParameterMapping parameterMapping, Configuration configuration) {
String propertyName = parameterMapping.getProperty();
// 如果参数是单个参数,并且propertyName是参数本身的属性,则直接从参数对象中获取
// 如果参数是Map,则从Map中通过propertyName获取
// 这里使用MyBatis的MetaObject来获取参数值
if (parameterObject == null) {
return null;
}
TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
return parameterObject;
}
// 使用MetaObject获取参数值
org.apache.ibatis.reflection.MetaObject metaObject = configuration.newMetaObject(parameterObject);
return metaObject.getValue(propertyName);
}
/**
* 格式化参数值
*/
private String formatParameterValue(Object value) {
if (value == null) {
return "null";
} else if (value instanceof String) {
return "'" + value.toString() + "'";
} else if (value instanceof Date) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return "'" + sdf.format((Date) value) + "'";
} else {
return value.toString();
}
}
获取到完整参数之后,就可以对SQL进行解析了
if (SqlCommandType.UPDATE == sqlCommandType) {
// 通过JParser解析SQL
Statement statement = CCJSqlParserUtil.parse(completeSql);
Update update = (Update) statement;
// 判断是否为逻辑删除
boolean isLogicDelete = isIsLogicDelete(update);
if (isLogicDelete) {
// 是逻辑删除,调用处理删除的审计操作
return processDeleteAudit(invocation, update, executor, configuration, true);
} else {
// 不是逻辑删除,调用修改审计操作
return processUpdateAudit(invocation, update, executor, configuration);
}
} else if (SqlCommandType.INSERT == sqlCommandType) {
// 处理新增审计操作
return processInsertAudit(invocation, completeSql);
} else if (SqlCommandType.DELETE == sqlCommandType) {
Statement statement = CCJSqlParserUtil.parse(completeSql);
// 处理物理删除审计操作
return processDeleteAudit(invocation, statement, executor, configuration, false);
} else {
return invocation.proceed();
}
判断逻辑删除时,定义了逻辑删除字段
/**
* 判断是否为逻辑删除
*/
private boolean isIsLogicDelete(Update update) {
// 获取update的set字段集合
List<UpdateSet> updateSets = update.getUpdateSets();
boolean isLogicDelete = false;
// 遍历set字段集合,查找字段为is_deleted的set字段
logicDeleteCheck:
for (UpdateSet updateSet : updateSets) {
ExpressionList<Column> columns = updateSet.getColumns();
for (Column column : columns) {
if ("is_deleted".equals(column.getColumnName())) {
isLogicDelete = true;
break logicDeleteCheck;
}
}
}
return isLogicDelete;
}
对于新增操作,SQL中含有数据信息,因此,只需要记录insert sql,然后发送给解析程序去解析即可
在解析过程中,需要对jsqlparser的值进行处理
private String getExpressionValue(Expression expression) {
if (expression instanceof StringValue) {
return ((StringValue) expression).getValue();
} else if (expression instanceof LongValue) {
return String.valueOf(((LongValue) expression).getValue());
} else if (expression instanceof DoubleValue) {
return String.valueOf(((DoubleValue) expression).getValue());
} else if (expression instanceof NullValue) {
return "NULL";
} else if (expression instanceof TimestampValue) {
return ((TimestampValue) expression).getValue().toString();
} else {
// 其他类型表达式
return expression.toString();
}
}
对于修改操作,需要记录的是修改前后的字段变化,一般情况下,我们可能会用Id作为筛选条件,但是有一部分情况下,筛选条件不会是id,同时set的操作可能是一个计算公式,所以我这边打算采用两次查询的方式,获取实际写入数据库中的值。
那么首先需要获取到查询的SQL,通过jsqlparser,我们可以获取到待执行的sql的表名,set的字段,以及where条件,那么将其进行拼接即可
// 获取修改的表名
Table table = statement.getTable();
// 获取修改的where条件
Expression where = statement.getWhere();
// 获取修改的字段
List<UpdateSet> updateSets = statement.getUpdateSets();
// 获取修改的字段名集合
List<String> columnNames = new ArrayList<>();
for (UpdateSet updateSet : updateSets) {
for (Column column : updateSet.getColumns()) {
columnNames.add(column.getColumnName());
}
}
// 如果不存在id字段,增加id字段进行查询,以便以比较定位
if (!columnNames.contains("id")) {
columnNames.add("id");
}
// 拼接查询字段
String colNames = String.join(",", columnNames);
// 拼接查询sql
String querySql = "select " + colNames + " from " + table.getFullyQualifiedName() + " where " + where;
执行查询有两种方式,一种是通过executor.getTransaction().getConnection()获取数据库连接(注意,此方式获取的connection不能关闭),然后直接执行sql,另一种方式是采用mybatis的Executor执行。
由于mybatis的缓存及事务问题,使用Connection查询会出现修改后的数据查询不变化的情况,因此,采用mybatis的Executor执行,考虑到此操作比较频繁,通过代码不断创建临时statement可能会对内存造成压力,因此我定义了一个固定的mapper来执行sql。
<select id="auditQuery" resultType="java.util.Map">
${sql}
</select>
/**
* 执行董涛SQL查询
*/
private List<Map<String, Object>> executeQuery(Executor executor, Configuration configuration, String querySql) throws SQLException {
Map<String, Object> parameters = new HashMap<>();
parameters.put("sql", querySql); // 将动态 SQL 作为参数传入
// 调用预定义的 Mapper 方法
String statementId = "cn.yhsblog.testmybatisinterceptor.service.mapper.AuditQueryMapper.auditQuery";
// 执行查询
return executor.query(configuration.getMappedStatement(statementId),
parameters,
RowBounds.DEFAULT,
SimpleExecutor.NO_RESULT_HANDLER);
}
在执行操作前后进行查询
// 执行查询,获取执行修改前的值
List<Map<String, Object>> oldValues = executeQuery(executor, configuration, querySql);
log.debug("old values:{}", oldValues);
// 执行修改操作
Object proceed = invocation.proceed();
// 执行查询,获取执行修改后的值
List<Map<String, Object>> newValues = executeQuery(executor, configuration, querySql);
log.debug("new values:{}:", newValues);
在实际处理中,考虑到执行前后可能会存在数据行数上的差异,我这边采用旧数据的id匹配新数据的id的方式进行对比操作
// 执行前查询出来的结果,应该和执行后查询出来的结果一致
// 根据Id进行处理,以便于可以找到匹配的行数据
Map<Object, Map<String, Object>> oldMapById = oldValues.stream().collect(Collectors.toMap(e -> e.get("id"), e -> e));
Map<Object, Map<String, Object>> newMapById = newValues.stream().collect(Collectors.toMap(e -> e.get("id"), e -> e));
// 每行数据的变化
List<RowValueChange> rowValueChanges = updateValueChange.getRowValueChanges();
Set<Map.Entry<Object, Map<String, Object>>> oldEntrySet = oldMapById.entrySet();
for (Map.Entry<Object, Map<String, Object>> oldEntry : oldEntrySet) {
Object id = oldEntry.getKey();
// 如果这个旧数据中的Id在新数据中不存在,说明这个Id不是当前事务加入的
Map<String, Object> newRowValue = newMapById.get(id);
if (newRowValue == null) {
continue;
}
Map<String, Object> oldRowValue = oldEntry.getValue();
// 行数据变化
RowValueChange rowValueChange = new RowValueChange();
rowValueChanges.add(rowValueChange);
List<ValueChange> valueChanges = rowValueChange.getValueChanges();
for (String columnName : columnNames) {
// 列数据变化
ValueChange valueChange = new ValueChange();
valueChanges.add(valueChange);
valueChange.setColName(columnName);
valueChange.setOldValue(oldRowValue.get(columnName));
valueChange.setNewValue(newRowValue.get(columnName));
}
}
对于删除操作,其实就是在执行前,获取其数据完整信息进行记录,因此和修改一样,在操作前,对其完整数据进行查询
private Object processDeleteAudit(Invocation invocation, Statement statement, Executor executor, Configuration configuration, boolean isLogicDelete) throws Exception {
Expression where;
Table table;
if (isLogicDelete) {
Update update = (Update) statement;
table = update.getTable();
where = update.getWhere();
} else {
Delete delete = (Delete) statement;
table = delete.getTable();
where = delete.getWhere();
}
// 根据where条件,查询表的旧数据
String querySql = "SELECT * FROM " + table.getFullyQualifiedName() + " WHERE " + where;
List<Map<String, Object>> oldValue = executeQuery(executor, configuration, querySql);
Audit audit = new Audit();
audit.setOperationType("DELETE");
audit.setDeleteValueChange(new DeleteValueChange(table.getName(), oldValue));
return invocation.proceed();
}
这里采用了ThreadLocal存储审计数据,通过AOP的方式,在拦截方法执行完之后,通过mq发送到处理系统进行处理。
这里还需要注意的是,需要判断哪些表需要进行审计操作,哪些表不需要,进行更精确的控制避免不必要的资源消耗。对付十分复杂的场景,还是需要认为的审计日志。