醋醋百科网

Good Luck To You!

性能调优实战:Spring Boot 多线程处理SQL IN语句大量值的优化方案

环境:SpringBoot3.4.0


1. 简介

当我们编写的SQL语句包含有IN语句并且包含大量值时,往往会遇到性能瓶颈,甚至可能导致数据库报错。特别是在处理大数据集时,这种问题尤为突出。大量值的IN语句不仅会增加数据库的查询负担,还可能导致内存消耗过高、查询速度下降,甚至在某些数据库中会因为值过多而直接报错。

MySQL:没有固定的限制值,更多受限于 max_allowed_packet 参数所影响的整体SQL语句大小。

SHOW VARIABLES LIKE '%max_allowed_packet%';

输出结果

当我们执行超大SQL时,将看到如下的错误:

这与你整个执行的sql大小有关

Oracle:理论上支持的 IN 子句值的数量上限为1000项,超出此数目会导致错误。

Oracle好像是不能修改此限制的?

通常我们遇到次情况时可以采取如下的方式解决:

  • 使用临时表
  • 将IN语句中的值进行分批执行

在本篇文章中,我们通过AOP结合多线程技术,自动优化因SQL IN语句包含过多值引起的错误或是导致的性能低下问题。

2. 实战案例

2.1 自定义注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SplitQuery {
  /**线程池bean名称;类型必须是Executor*/
  String executorName() default "" ;
  
  /**批处理大小*/
  int batchSize() default 100 ;
  
  /**返回值结果处理器beanName;类型必须是ResultHandler*/
  String handlerName() default "" ;
}

该注解标注了需要被处理的方法。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface BatchParam {
}

该注解标注方法参数中哪个参数需要被处理。

2.2 返回值处理器定义

在切面中通过多线程处理完数据后,可以将结果传递给一个具体的返回值处理器来进一步处理。通过将数据处理和结果处理分离到不同的组件中(即多线程处理逻辑和返回值处理器),系统变得更加模块化。这种设计有助于降低组件之间的耦合度。当需要更改数据处理逻辑或结果处理方式时,只需修改相应的组件即可,无需对整个切面或业务逻辑进行大规模调整。这大大提高了系统的可扩展性和灵活性。

接口定义

public interface ResultHandler {
  T process(List<Object> result) ;
}

默认实现

public class DefaultResultHandler implements ResultHandler<Object> {
  @Override
  public Object process(List<Object> result) {
    return result ;
  }
}

默认处理器,不进行任何的处理直接返回结果;我们应该根据自己的业务来实现具体的逻辑处理。

2.3 切面定义

切面中我们会根据具体IN参数(List集合)的个数与注解中配置的批次大小进行拆分成多个线程进行并发处理数据(List.size / batchSize)。

@Aspect
@Component
public class SplitQueryAspect implements ApplicationContextAware {
  private static final Logger logger = LoggerFactory.getLogger(SplitQueryAspect.class) ;
  
  /**默认使用虚拟线程*/
  private static final Executor defaultExecutor = Executors.newVirtualThreadPerTaskExecutor() ;
  
  private ApplicationContext context ;
  
  @Pointcut("@annotation(sq)")
  private void splitPc(SplitQuery sq) {}
  
  @Around("splitPc(sq)")
  public Object splitQueryAround(ProceedingJoinPoint pjp, SplitQuery sq) throws Throwable {
    int batchSize = sq.batchSize() ;
    Executor executor = getExecutor(sq.executorName()) ;
    Object[] args = pjp.getArgs() ;
    MethodSignature ms = (MethodSignature) pjp.getSignature() ;
    
    Parameter[] parameters = ms.getMethod().getParameters() ;
    int index = -1 ;
    for (int i = 0, len = parameters.length; i < len; i++) {
      Parameter param = parameters[i] ;
      BatchParam batchParam = param.getAnnotation(BatchParam.class) ;
      if (batchParam != null) {
        index = i ;
        break ;
      }
    }
    Object arg = args[index] ;
    // 这里只考虑了参数集合是List情况
    if (index == -1 
        || !List.class.isAssignableFrom(arg.getClass()) 
        || ((List) arg).size() <= batchSize) {
      logger.info("直接调用目标方法...") ;
      return pjp.proceed() ;
    }
    ResultHandler resultHandler = getResultHandler(sq.handlerName()) ;
    final int paramIndex = index ;
    List data = (List) arg ;
    // 这里我们使用的guava进行拆分集合
    List partitions = Lists.partition(data, batchSize) ;
    List<Object> result = partitions.stream().map(chunk -> {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Object[] newArgs = new Object[args.length] ;
          System.arraycopy(args, 0, newArgs, 0, args.length) ;
          newArgs[paramIndex] = chunk ;
          logger.info("处理批次数据: {}", newArgs[paramIndex]) ;
          return pjp.proceed(newArgs) ;
        } catch (Throwable e) {
          return null ;
        }
      }, executor) ; // 设置线程池
    }).collect(Collectors.toList())
        .stream()
        .map(CompletableFuture::join)
        // 过滤数据为null或空的情况
        .filter(obj -> obj != null && !((List)obj).isEmpty())
        .collect(Collectors.toList()) ;
    return resultHandler.process(result) ;
  }
  
  private Executor getExecutor(String executorName) {
    if (StringUtils.hasLength(executorName)) {
      try {
        return this.context.getBean(executorName, Executor.class) ;
      } catch (Exception e) {
        logger.warn("不存beanName为: {} 的线程池,将使用默认的虚拟线程池对象", executorName);
        return defaultExecutor ;
      }
    }
    return defaultExecutor ;
  }
  private ResultHandler getResultHandler(String handlerName) {
    if (StringUtils.hasLength(handlerName)) {
      try {
        return this.context.getBean(handlerName, ResultHandler.class) ;
      } catch (Exception e) {
        logger.warn("不存beanName为: {} 的结果处理器,将使用DefaultResultHandler", handlerName);
        return new DefaultResultHandler() ;
      }
    }
    return new DefaultResultHandler() ;
  }
  @Override
  public void setApplicationContext(ApplicationContext context) throws BeansException {
    this.context = context ;
  }
}

以上我们就完成了切面的编写,接下来我们就可以进行测试了。

2.4 业务代码编写

Repository接口定义

public interface PersonRepository extends JpaRepository {
  List findByAgeAndNameContainingAndIdIn(Integer age, String name, List ids) ;
}

自定义了一个根据age,name和id进行查询的方法。

Service业务方法

@Service
public class PersonService {
  private final PersonRepository personRepository ;
  public PersonService(PersonRepository personRepository) {
    this.personRepository = personRepository;
  }
  @SplitQuery(batchSize = 2, handlerName = "personResultHandler")
  public List query(Integer age, @BatchParam List ids, String name) {
    return this.personRepository.findByAgeAndNameContainingAndIdIn(age, name, ids) ;
  }
}

这里的query方法将通过切面多线程进行处理,其中设置了返回值处理器,该处理器定义如下:

@Component("personResultHandler")
public class PersonInResultHandler implements ResultHandler<List> {
  @Override
  public List process(List<Object> result) {
    if (result == null) {
      return null ;
    }
    return result.stream()
    // 这里我们知道返回的类型,所有可以直接进行类型的转换
    .flatMap(obj -> ((List)obj).stream())
    .collect(Collectors.toList()) ;
  }
}

2.5 测试

@RestController
@RequestMapping("/persons")
public class PersonController {
  private final PersonService personService ;
  public PersonController(PersonService personService) {
    this.personService = personService;
  }
  @GetMapping("/query")
  public ResponseEntity<List> query() {
    return ResponseEntity.ok(this.personService.query(11, 
      List.of(1L, 2L, 3L, 4L, 5L), "a")) ;
  }
}

调用上面的接口最终控制台SQL输出如下:

通过3个线程执行

我们将batchSize修改为6后再进行测试:

直接调用了目标方法,因为我们的List中的值小于batchSize的个数。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言