我一直在努力为我正在开发的应用程序实现多线程方法。
我想在并行线程中运行的部分最初是用一个for循环来构造一个列表的。
@Service
public ApplicationServiceImpl implements ApplicationService {
@Override
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
for (MyObject myObject : myObjectList) {
AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
runMethodD(yetAnotherTypeOfObject);
}
}
}
private AnotherTypeOfObject runMethodA(MyObject MyObject){…} private YetAnotherTypeOfObject (AnotherTypeOfObject AnotherTypeOfObject){…} private void runMethodC(YetAnotherTypeOfObject YetAnotherTypeOfObject String stringvalue){…}和private void runMethodD(MyObject MyObject){…}只使用局部变量。
我已经寻找了相当多的解决方案,以允许触发一个包含100个MyObject的列表的线程,而不是一个接一个。
我所做的是创建一个:
@Configuration
@EnableAsync
public class AsyncConfiguration() {
@Bean(name = "threadPoolTaskExecutor")
public Executor aSyncExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(4);
threadPoolTaskExecutor.setMaxPoolSize(4);
threadPoolTaskExecutor.setQueueCapacity(50);
threadPoolTaskExecutor.setThreadNamePrefix("threadNamePrefix");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
我确实有大量的log。info(“一些可识别的文本”)通过方法A, B, C和D,所以我可以确保什么是发生的,我把这些方法聚合成一个
private void runThreads(MyObject myObject, String aStringValue, String anotherStringValue) {
AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
runMethodD(yetAnotherTypeOfObject);
}
我已经尝试运行的主要方法为:
@Override
@Async("threadPoolTaskExecutor")
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
String aStringValue = myObject.getAStringValue();
String anotherStringValue = myObject.getAnotherStringValue();
myObjectList.forEach(myObject -> runThreads(myObject, aStringValue, anotherStringValue));
}
我仍然没有得到为runThreads(…){}方法触发几个线程的预期结果,因此处理是并行完成的。
我遗漏了什么?
###实际上你不是在并行化for循环,而是执行for循环的方法。在这种情况下,单个线程将执行所有循环。
你需要把@Async放在runThreads()的顶部
尽管不建议使用静态配置创建执行器。尝试使用可完成的未来API:
https://www。baeldung。com/java-completablefuture
###如果它只用于并行运行集合中的所有元素,那么你可以使用Stream。parallel()。它使用一个默认的ForkJoinPool,每个CPU内核有一个线程。这是Java 8中引入的最简单的方法。
myObjectList.stream()
.parallel()
.forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue()));
为此,你不需要任何@Async或spring提供的Executor。
您可以使用自定义ForkJoinPool来自定义线程数,但默认也可以。
ForkJoinPool customThreadPool = new ForkJoinPool(4);
customThreadPool.invoke(
() -> myObjectList.stream()
.parallel()
.forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue())));