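Question
How do I write a proper background loader in Java 8? The requirements:
- data should be loaded in the background
- after loading, the data should be displayed
- while data is being loaded, no further requests should be accepted
- if there were requests while the data was being loaded, another load should be scheduled after a certain timeout (e.g. 5 seconds)
The purpose is, for example, to accept reload requests without flooding the database with requests.
MCVE
Here is an MCVE. It consists of a background task that simulates loading by calling Thread.sleep for 2 seconds. The task is scheduled every second, so the background loading tasks naturally overlap, which is what should be avoided.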
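import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class LoadInBackgroundExample {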
/**
* A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
*/
public static class BackgroundTask implements Runnable {
private int id;
public BackgroundTask(int id) {
this.id = id;
}
/**
* Sleep for a given amount of time to simulate loading.
*/
@Override
public void run() {
try {
System.out.println("Start #" + id + ": " + Thread.currentThread());
long sleepTime = 2000;
Thread.sleep( sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Finish #" + id + ": " + Thread.currentThread());
}
}
}
/**
* CompletableFuture which simulates loading and showing data.
* @param taskId Identifier of the current task
*/
public static void loadInBackground( int taskId) {
// create the loading task
BackgroundTask backgroundTask = new BackgroundTask( taskId);
// "load" the data asynchronously
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "task " + backgroundTask.id;
}
});
// display the data after they are loaded
CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
System.out.println( "Background task finished:" + x);
});
}
public static void main(String[] args) {
// runnable which invokes the background loader every second
Runnable trigger = new Runnable() {
int taskId = 0;
public void run() {
loadInBackground( taskId++);
}
};
// create scheduler
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);
// cancel the scheduler and the application after 10 seconds
scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);
try {
beeperHandle.get();
} catch (Throwable th) {
}
System.out.println( "Cancelled");
System.exit(0);
}
}
The output is as follows:
Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1
Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4
Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 5
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 7
Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 8
Cancelled
The goal is to skip e.g. #1 and #2 because #0 is still running.
Problem
Where do I properly set up the blocking mechanism? Should I use synchronization? Or an AtomicBoolean? If so, should it be inside the get() method or somewhere else?
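Answer
You already have a thread pool to execute the task. There is no need to complicate things by running the task in yet another asynchronous executor (the ForkJoinPool, which is what you get when you use CompletableFuture).
Keep it simple: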
public static void loadInBackground(int taskId) {
// create the loading task
BackgroundTask backgroundTask = new BackgroundTask(taskId);
// No need to run in async, as it already in executor
backgroundTask.run();
}
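Because you invoke it through scheduleAtFixedRate, the ScheduledExecutorService guarantees that only one run of the task executes at a time. From the Javadoc of scheduleAtFixedRate:
"Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute."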
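The "will not concurrently execute" guarantee quoted above can be checked with a small experiment. The following is only a sketch: the class name FixedRateNoOverlapDemo and the shortened timings (a 200 ms task scheduled every 100 ms) are assumptions made for illustration and are not part of the original code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
public class FixedRateNoOverlapDemo {

    /** Runs a slow periodic task for about one second and returns the highest number of simultaneous runs observed. */
    public static int maxConcurrentRuns() {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // The task takes 200 ms but is scheduled every 100 ms.
        Runnable slowTask = () -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };
        scheduler.scheduleAtFixedRate(slowTask, 0, 100, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns());
    }
}
```

Late runs are started as soon as the previous run has finished, so this approach serializes the loads but does not skip any of them.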
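Answer
Taking the following as requirements:
- data should be loaded in the background
- after loading, the data should be displayed
- while data is being loaded, no further requests should be accepted
- if there were requests while the data was being loaded, another load should be scheduled after a certain timeout (e.g. 5 seconds)
The solution can be built on top of Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue: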
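import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class SingleThreadedLoader {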
private static class BackgroundTask extends CompletableFuture<String> {
private final String query;
private BackgroundTask(String query) {
this.query = query;
}
public String getQuery() {
return query;
}
}
private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();
// while data are loaded no further requests should be accepted
private final Executor executor = Executors.newSingleThreadExecutor();
private final int delaySeconds;
private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);
public SingleThreadedLoader(int delaySeconds) {
this.delaySeconds = delaySeconds;
setupLoading();
}
public BackgroundTask loadInBackground(String query) {
log("Enqueued query " + query);
BackgroundTask task = new BackgroundTask(query);
tasks.add(task);
return task;
}
private void setupLoading() {
// data should be loaded in background
executor.execute(() -> {
while (true) {
try {
// if there were requests while the data were loaded
// another loading should be scheduled after a certain timeout (e. g. 5 seconds)
Instant prev = lastExecution.get();
// Duration.toSeconds() only exists since Java 9; getSeconds() also works on Java 8
long delay = Duration.between(prev, Instant.now()).getSeconds();
if (delay < delaySeconds) {
log("Waiting for 5 seconds before next data loading");
TimeUnit.SECONDS.sleep(delaySeconds - delay);
}
BackgroundTask task = tasks.take();
try {
String query = task.getQuery();
String data = loadData(query);
task.complete(data);
} catch (Exception e) {
task.completeExceptionally(e);
}
lastExecution.set(Instant.now());
} catch (InterruptedException e) {
log(e.getMessage());
return;
}
}
});
}
private String loadData(String query) {
try {
log("Loading data for " + query);
TimeUnit.SECONDS.sleep(2);
log("Loaded data for " + query);
return "Result " + query;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void log(String str) {
String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);
String thread = Thread.currentThread().getName();
System.out.println(time + ' ' + thread + ": " + str);
}
public static void main(String[] args) throws Exception {
SingleThreadedLoader loader = new SingleThreadedLoader(5);
// after the loading the data should be displayed
loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);
loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);
loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);
log("Do another work in the main thread");
TimeUnit.SECONDS.sleep(30);
}
}
After execution, stdout contains the following output:
10:29:26 main: Enqueued query 1
10:29:26 pool-1-thread-1: Loading data for 1
10:29:26 main: Enqueued query 2
10:29:26 main: Enqueued query 3
10:29:26 main: Do another work in the main thread
10:29:28 pool-1-thread-1: Loaded data for 1
10:29:28 pool-1-thread-1: Result 1
10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:33 pool-1-thread-1: Loading data for 2
10:29:36 pool-1-thread-1: Loaded data for 2
10:29:36 pool-1-thread-1: Result 2
10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:41 pool-1-thread-1: Loading data for 3
10:29:43 pool-1-thread-1: Loaded data for 3
10:29:43 pool-1-thread-1: Result 3
Answer
I made a small change to your original code: I added an AtomicInteger that acts as a counter of running tasks, together with simple lock() and unlock() methods, and got this output:
Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled 1
background task cancelled 2
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled 4
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled 5
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled 7
Finish #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled 8
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled 10
Cancelled
Here is my solution to your task:
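import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class LoadInBackgroundExample {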
//Added new exception
public static class AlreadyIsRunningException extends RuntimeException {
long taskId;
public AlreadyIsRunningException(String message, long taskId) {
super(message);
this.taskId = taskId;
}
public long getTaskId() {
return taskId;
}
public void setTaskId(long taskId) {
this.taskId = taskId;
}
}
/**
* A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
*/
public static class BackgroundTask implements Runnable {
//this atomicInteger acts as a global lock counter for BackgroundTask objects
private static AtomicInteger counter = new AtomicInteger(0);
private int id;
public BackgroundTask(int id) {
this.id = id;
}
private void unlock() {
counter.decrementAndGet();
}
private void lock() {
//we need to check this way to avoid some unlucky timing between threads
int lockValue = counter.incrementAndGet();
//if we got counter different than 1 that means that some other task is already running (it has already acquired the lock)
if (lockValue != 1) {
//rollback our check
counter.decrementAndGet();
//throw an exception
throw new AlreadyIsRunningException("Some other task already is running", id);
}
}
/**
* Sleep for a given amount of time to simulate loading.
*/
@Override
public void run() {
//Check if we can acquire lock
lock();
//we hold the lock from here on
try {
System.out.println("Start #" + id + ": " + Thread.currentThread());
long sleepTime = 2000;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Finish #" + id + ": " + Thread.currentThread());
unlock();
}
}
}
/**
* CompletableFuture which simulates loading and showing data.
*
* @param taskId Identifier of the current task
*/
public static void loadInBackground(int taskId) {
// create the loading task
BackgroundTask backgroundTask = new BackgroundTask(taskId);
// "load" the data asynchronously
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);
try {
future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof AlreadyIsRunningException) {
System.out.println("background task cancelled " + ((AlreadyIsRunningException) e.getCause()).getTaskId());
throw (AlreadyIsRunningException) e.getCause();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task " + backgroundTask.id;
}
});
// display the data after they are loaded
CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
System.out.println("Background task finished:" + x);
});
}
ArrayList<BackgroundTask> backgroundTasks = new ArrayList<>();
public static void main(String[] args) {
// runnable which invokes the background loader every second
Runnable trigger = new Runnable() {
int taskId = 0;
public void run() {
loadInBackground(taskId++);
}
};
// create scheduler
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);
// cancel the scheduler and the application after 10 seconds
scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);
try {
beeperHandle.get();
} catch (Throwable th) {
}
System.out.println("Cancelled");
System.exit(0);
}
}
Update
I changed the lock() and unlock() methods to a simpler form:
private static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
private void unlock() {
atomicBoolean.set(false);
}
private void lock() {
//if 'changed' is false that means some other task is already running
boolean changed = atomicBoolean.compareAndSet(false,true);
if (!changed) {
throw new AlreadyIsRunningException("Some other task is already running", id);
}
}
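The compareAndSet guard above can also be packaged so that a rejected request is reported through a return value instead of an exception. This is only a sketch of that variation: the class name SingleFlightGuard and the method tryRun are hypothetical and do not appear in the original answer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of the original answer.
public class SingleFlightGuard {
    private final AtomicBoolean busy = new AtomicBoolean(false);

    /** Runs the task unless another one is already in progress; returns false when the task was skipped. */
    public boolean tryRun(Runnable task) {
        // Only the caller that flips false -> true may proceed.
        if (!busy.compareAndSet(false, true)) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            // Always release the flag, even if the task throws.
            busy.set(false);
        }
    }

    public static void main(String[] args) {
        SingleFlightGuard guard = new SingleFlightGuard();
        // A second attempt made while the first one is still running is rejected.
        boolean outer = guard.tryRun(() -> {
            boolean inner = guard.tryRun(() -> System.out.println("never printed"));
            System.out.println("nested attempt accepted: " + inner);
        });
        System.out.println("outer attempt accepted: " + outer);
        System.out.println("attempt after release accepted: " + guard.tryRun(() -> { }));
    }
}
```

Releasing the flag in a finally block matters: without it, a task that throws would leave the guard locked forever.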
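Answer
If I understand correctly, you may have several background tasks at the same time. Since these tasks do exactly the same job, you do not want to run them in parallel; you need one task to do the work and share its result with the others. So if you get 10 CompletableFuture instances concurrently, you want one of them to call 'reload' against the db and share the result with the others, so that every CompletableFuture completes normally with that result. I assume this from
"The goal is to skip e.g. #1 and #2 because #0 is still running."
and
"After loading, the data should be displayed"
If my guess is correct, you can try my solution.
I use a kind of parent-child relationship between tasks. The parent task is the one that actually does the work and shares the result with its children. A child task is a task that was added while the parent task was still running; it waits until the parent task has finished. Since the parent's result is still 'fresh', it is copied to each child, and all of them complete their futures.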
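import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class BackgroundService {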
public static class BackgroundJob implements Callable<String> {
private static BackgroundJob ROOT_JOB = null;
private synchronized static void addBackgroundJob(BackgroundJob backgroundJob) {
if (ROOT_JOB != null) {
ROOT_JOB.addChild(backgroundJob);
} else {
System.out.println();
System.out.println(Thread.currentThread().getName() + " RUNNING ROOT TASK-" + backgroundJob.jobId);
ROOT_JOB = backgroundJob;
}
}
private synchronized static void unlock() {
ROOT_JOB = null;
}
private final int jobId;
private List<BackgroundJob> children = new ArrayList<>();
private BackgroundJob parent;
private String providedResultFromParent = null;
public BackgroundJob(int jobId) {
this.jobId = jobId;
}
private void addChild(BackgroundJob backgroundJob) {
backgroundJob.parent = this;
this.children.add(backgroundJob);
}
@Override
public String call() throws Exception {
addBackgroundJob(this);
if (parent == null) {
String result = logic();
synchronized (ROOT_JOB) {
for (final BackgroundJob backgroundJob : children) {
backgroundJob.providedResultFromParent = result;
synchronized (backgroundJob) {
backgroundJob.notify();
}
}
unlock();
}
return "\t\tROOT task" + jobId + "'s " + result;
} else {
synchronized (this) {
System.out.println(Thread.currentThread().getName() + "\t\tskipping task-" + jobId + " and waiting running task-" + parent.jobId + " to finish");
this.wait();
}
return "\t\t\t\ttask-" + jobId + "'s " + providedResultFromParent;
}
}
private String logic() throws InterruptedException {
Thread.sleep(2000);
return (int) (Math.random() * 1000) + " ";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
AtomicInteger atomicInteger = new AtomicInteger();
ExecutorService pool = Executors.newCachedThreadPool();
Supplier<String> job = () -> {
int taskId = atomicInteger.incrementAndGet();
BackgroundJob backgroundJob = new BackgroundJob(taskId);
try {
return backgroundJob.call();
} catch (Exception e) {
e.printStackTrace();
}
return "finished " + taskId;
};
for (int i = 100; i > 0; i--) {
CompletableFuture.supplyAsync(job, pool).thenAccept(s -> System.out.println(Thread.currentThread().getName()+" "+ s + " result is readable"));
Thread.sleep((long) (Math.random() * 500));
}
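// request shutdown first: awaitTermination only returns early once a shutdown has been requested and all tasks are done
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
}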
And the output is:
pool-1-thread-1 RUNNING ROOT TASK-1
pool-1-thread-2 skipping task-2 and waiting running task-1 to finish
pool-1-thread-3 skipping task-3 and waiting running task-1 to finish
pool-1-thread-4 skipping task-4 and waiting running task-1 to finish
pool-1-thread-5 skipping task-5 and waiting running task-1 to finish
pool-1-thread-6 skipping task-6 and waiting running task-1 to finish
pool-1-thread-7 skipping task-7 and waiting running task-1 to finish
pool-1-thread-8 skipping task-8 and waiting running task-1 to finish
pool-1-thread-3 task-3's 165 result is readable
pool-1-thread-8 task-8's 165 result is readable
pool-1-thread-6 task-6's 165 result is readable
pool-1-thread-5 task-5's 165 result is readable
pool-1-thread-7 task-7's 165 result is readable
pool-1-thread-2 task-2's 165 result is readable
pool-1-thread-1 ROOT task1's 165 result is readable
pool-1-thread-4 task-4's 165 result is readable
pool-1-thread-4 RUNNING ROOT TASK-9
pool-1-thread-1 skipping task-10 and waiting running task-9 to finish
pool-1-thread-2 skipping task-11 and waiting running task-9 to finish
pool-1-thread-7 skipping task-12 and waiting running task-9 to finish
pool-1-thread-5 skipping task-13 and waiting running task-9 to finish
pool-1-thread-8 skipping task-14 and waiting running task-9 to finish
pool-1-thread-6 skipping task-15 and waiting running task-9 to finish
pool-1-thread-3 skipping task-16 and waiting running task-9 to finish
pool-1-thread-9 skipping task-17 and waiting running task-9 to finish
pool-1-thread-10 skipping task-18 and waiting running task-9 to finish
pool-1-thread-1 task-10's 370 result is readable
pool-1-thread-10 task-18's 370 result is readable
pool-1-thread-4 ROOT task9's 370 result is readable
pool-1-thread-9 task-17's 370 result is readable
pool-1-thread-7 task-12's 370 result is readable
pool-1-thread-6 task-15's 370 result is readable
pool-1-thread-8 task-14's 370 result is readable
pool-1-thread-2 task-11's 370 result is readable
pool-1-thread-3 task-16's 370 result is readable
pool-1-thread-5 task-13's 370 result is readable
pool-1-thread-5 RUNNING ROOT TASK-19
pool-1-thread-3 skipping task-20 and waiting running task-19 to finish
pool-1-thread-2 skipping task-21 and waiting running task-19 to finish
pool-1-thread-8 skipping task-22 and waiting running task-19 to finish
pool-1-thread-6 skipping task-23 and waiting running task-19 to finish
pool-1-thread-7 skipping task-24 and waiting running task-19 to finish
pool-1-thread-9 skipping task-25 and waiting running task-19 to finish
pool-1-thread-4 skipping task-26 and waiting running task-19 to finish
pool-1-thread-10 skipping task-27 and waiting running task-19 to finish
pool-1-thread-1 skipping task-28 and waiting running task-19 to finish
pool-1-thread-5 ROOT task19's 574 result is readable
pool-1-thread-8 task-22's 574 result is readable
pool-1-thread-4 task-26's 574 result is readable
pool-1-thread-7 task-24's 574 result is readable
pool-1-thread-6 task-23's 574 result is readable
pool-1-thread-3 task-20's 574 result is readable
pool-1-thread-9 task-25's 574 result is readable
pool-1-thread-2 task-21's 574 result is readable
pool-1-thread-1 task-28's 574 result is readable
pool-1-thread-10 task-27's 574 result is readable
pool-1-thread-10 RUNNING ROOT TASK-29
pool-1-thread-1 skipping task-30 and waiting running task-29 to finish
pool-1-thread-2 skipping task-31 and waiting running task-29 to finish
pool-1-thread-9 skipping task-32 and waiting running task-29 to finish
pool-1-thread-3 skipping task-33 and waiting running task-29 to finish
pool-1-thread-6 skipping task-34 and waiting running task-29 to finish
pool-1-thread-7 skipping task-35 and waiting running task-29 to finish
pool-1-thread-4 skipping task-36 and waiting running task-29 to finish
pool-1-thread-8 skipping task-37 and waiting running task-29 to finish
pool-1-thread-5 skipping task-38 and waiting running task-29 to finish
pool-1-thread-11 skipping task-39 and waiting running task-29 to finish
pool-1-thread-1 task-30's 230 result is readable
pool-1-thread-11 task-39's 230 result is readable
pool-1-thread-8 task-37's 230 result is readable
pool-1-thread-5 task-38's 230 result is readable
pool-1-thread-4 task-36's 230 result is readable
pool-1-thread-7 task-35's 230 result is readable
pool-1-thread-12 RUNNING ROOT TASK-40
pool-1-thread-6 task-34's 230 result is readable
pool-1-thread-10 ROOT task29's 230 result is readable
pool-1-thread-3 task-33's 230 result is readable
pool-1-thread-9 task-32's 230 result is readable
pool-1-thread-2 task-31's 230 result is readable
pool-1-thread-2 skipping task-41 and waiting running task-40 to finish
pool-1-thread-9 skipping task-42 and waiting running task-40 to finish
pool-1-thread-3 skipping task-43 and waiting running task-40 to finish
pool-1-thread-10 skipping task-44 and waiting running task-40 to finish
pool-1-thread-6 skipping task-45 and waiting running task-40 to finish
pool-1-thread-7 skipping task-46 and waiting running task-40 to finish
pool-1-thread-2 task-41's 282 result is readable
pool-1-thread-10 task-44's 282 result is readable
pool-1-thread-6 task-45's 282 result is readable
pool-1-thread-7 task-46's 282 result is readable
pool-1-thread-3 task-43's 282 result is readable
pool-1-thread-9 task-42's 282 result is readable
pool-1-thread-12 ROOT task40's 282 result is readable
pool-1-thread-12 RUNNING ROOT TASK-47
pool-1-thread-9 skipping task-48 and waiting running task-47 to finish
pool-1-thread-3 skipping task-49 and waiting running task-47 to finish
pool-1-thread-7 skipping task-50 and waiting running task-47 to finish
pool-1-thread-6 skipping task-51 and waiting running task-47 to finish
pool-1-thread-10 skipping task-52 and waiting running task-47 to finish
pool-1-thread-2 skipping task-53 and waiting running task-47 to finish
pool-1-thread-12 ROOT task47's 871 result is readable
pool-1-thread-10 task-52's 871 result is readable
pool-1-thread-2 task-53's 871 result is readable
pool-1-thread-3 task-49's 871 result is readable
pool-1-thread-6 task-51's 871 result is readable
pool-1-thread-7 task-50's 871 result is readable
pool-1-thread-9 task-48's 871 result is readable
pool-1-thread-9 RUNNING ROOT TASK-54
pool-1-thread-7 skipping task-55 and waiting running task-54 to finish
pool-1-thread-6 skipping task-56 and waiting running task-54 to finish
pool-1-thread-3 skipping task-57 and waiting running task-54 to finish
pool-1-thread-2 skipping task-58 and waiting running task-54 to finish
pool-1-thread-10 skipping task-59 and waiting running task-54 to finish
pool-1-thread-12 skipping task-60 and waiting running task-54 to finish
pool-1-thread-4 skipping task-61 and waiting running task-54 to finish
pool-1-thread-5 skipping task-62 and waiting running task-54 to finish
pool-1-thread-9 ROOT task54's 345 result is readable
pool-1-thread-2 task-58's 345 result is readable
pool-1-thread-5 task-62's 345 result is readable
pool-1-thread-7 task-55's 345 result is readable
pool-1-thread-10 task-59's 345 result is readable
pool-1-thread-6 task-56's 345 result is readable
pool-1-thread-3 task-57's 345 result is readable
pool-1-thread-4 task-61's 345 result is readable
pool-1-thread-12 task-60's 345 result is readable
pool-1-thread-12 RUNNING ROOT TASK-63
pool-1-thread-4 skipping task-64 and waiting running task-63 to finish
pool-1-thread-3 skipping task-65 and waiting running task-63 to finish
pool-1-thread-6 skipping task-66 and waiting running task-63 to finish
pool-1-thread-10 skipping task-67 and waiting running task-63 to finish
pool-1-thread-7 skipping task-68 and waiting running task-63 to finish
pool-1-thread-5 skipping task-69 and waiting running task-63 to finish
pool-1-thread-2 skipping task-70 and waiting running task-63 to finish
pool-1-thread-12 ROOT task63's 670 result is readable
pool-1-thread-2 task-70's 670 result is readable
pool-1-thread-5 task-69's 670 result is readable
pool-1-thread-7 task-68's 670 result is readable
pool-1-thread-10 task-67's 670 result is readable
pool-1-thread-6 task-66's 670 result is readable
pool-1-thread-3 task-65's 670 result is readable
pool-1-thread-4 task-64's 670 result is readable
pool-1-thread-4 RUNNING ROOT TASK-71
pool-1-thread-3 skipping task-72 and waiting running task-71 to finish
pool-1-thread-6 skipping task-73 and waiting running task-71 to finish
pool-1-thread-10 skipping task-74 and waiting running task-71 to finish
pool-1-thread-7 skipping task-75 and waiting running task-71 to finish
pool-1-thread-5 skipping task-76 and waiting running task-71 to finish
pool-1-thread-2 skipping task-77 and waiting running task-71 to finish
pool-1-thread-12 skipping task-78 and waiting running task-71 to finish
pool-1-thread-9 skipping task-79 and waiting running task-71 to finish
pool-1-thread-8 skipping task-80 and waiting running task-71 to finish
pool-1-thread-4 ROOT task71's 445 result is readable
pool-1-thread-6 task-73's 445 result is readable
pool-1-thread-9 task-79's 445 result is readable
pool-1-thread-3 task-72's 445 result is readable
pool-1-thread-8 task-80's 445 result is readable
pool-1-thread-12 task-78's 445 result is readable
pool-1-thread-5 task-76's 445 result is readable
pool-1-thread-10 task-74's 445 result is readable
pool-1-thread-2 task-77's 445 result is readable
pool-1-thread-7 task-75's 445 result is readable
pool-1-thread-7 RUNNING ROOT TASK-81
pool-1-thread-2 skipping task-82 and waiting running task-81 to finish
pool-1-thread-10 skipping task-83 and waiting running task-81 to finish
pool-1-thread-5 skipping task-84 and waiting running task-81 to finish
pool-1-thread-12 skipping task-85 and waiting running task-81 to finish
pool-1-thread-8 skipping task-86 and waiting running task-81 to finish
pool-1-thread-3 skipping task-87 and waiting running task-81 to finish
pool-1-thread-9 skipping task-88 and waiting running task-81 to finish
pool-1-thread-6 skipping task-89 and waiting running task-81 to finish
pool-1-thread-7 ROOT task81's 141 result is readable
pool-1-thread-6 task-89's 141 result is readable
pool-1-thread-9 task-88's 141 result is readable
pool-1-thread-3 task-87's 141 result is readable
pool-1-thread-10 task-83's 141 result is readable
pool-1-thread-5 task-84's 141 result is readable
pool-1-thread-12 task-85's 141 result is readable
pool-1-thread-8 task-86's 141 result is readable
pool-1-thread-2 task-82's 141 result is readable
pool-1-thread-2 RUNNING ROOT TASK-90
pool-1-thread-8 skipping task-91 and waiting running task-90 to finish
pool-1-thread-12 skipping task-92 and waiting running task-90 to finish
pool-1-thread-5 skipping task-93 and waiting running task-90 to finish
pool-1-thread-10 skipping task-94 and waiting running task-90 to finish
pool-1-thread-3 skipping task-95 and waiting running task-90 to finish
pool-1-thread-9 skipping task-96 and waiting running task-90 to finish
pool-1-thread-6 skipping task-97 and waiting running task-90 to finish
pool-1-thread-7 skipping task-98 and waiting running task-90 to finish
pool-1-thread-4 skipping task-99 and waiting running task-90 to finish
pool-1-thread-11 skipping task-100 and waiting running task-90 to finish
pool-1-thread-2 ROOT task90's 321 result is readable
pool-1-thread-3 task-95's 321 result is readable
pool-1-thread-7 task-98's 321 result is readable
pool-1-thread-8 task-91's 321 result is readable
pool-1-thread-11 task-100's 321 result is readable
pool-1-thread-4 task-99's 321 result is readable
pool-1-thread-5 task-93's 321 result is readable
pool-1-thread-9 task-96's 321 result is readable
pool-1-thread-12 task-92's 321 result is readable
pool-1-thread-10 task-94's 321 result is readable
pool-1-thread-6 task-97's 321 result is readable
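The result-sharing idea described in this answer can also be sketched without wait/notify, by handing every caller the same in-flight CompletableFuture. This is an alternative sketch and not the author's code: the class name SharedLoad, its load() method and the 300 ms simulated load are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical class, not part of the original answer.
public class SharedLoad {
    private final AtomicReference<CompletableFuture<String>> inFlight = new AtomicReference<>();
    private final Supplier<String> loader;

    public SharedLoad(Supplier<String> loader) {
        this.loader = loader;
    }

    /** Starts a load, or returns the future of the load that is already running. */
    public CompletableFuture<String> load() {
        while (true) {
            CompletableFuture<String> current = inFlight.get();
            if (current != null) {
                return current; // join the load that is already running
            }
            CompletableFuture<String> fresh = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, fresh)) {
                CompletableFuture.supplyAsync(loader).whenComplete((value, error) -> {
                    // Clear the slot first so that later requests start a new load.
                    inFlight.compareAndSet(fresh, null);
                    if (error != null) {
                        fresh.completeExceptionally(error);
                    } else {
                        fresh.complete(value);
                    }
                });
                return fresh;
            }
            // Another thread won the race; loop and pick up its future.
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        SharedLoad shared = new SharedLoad(() -> {
            sleep(300); // simulated slow load
            return "result " + loads.incrementAndGet();
        });
        CompletableFuture<String> first = shared.load();
        CompletableFuture<String> second = shared.load(); // arrives while the first load is still running
        System.out.println(first.join() + " / " + second.join());
        System.out.println("loads performed: " + loads.get());
    }
}
```

Both callers in main should receive the same result while only one load is performed. Unlike the wait/notify version, no thread is parked per waiting request.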
Answer
If all you want is a single thread accessing the backend at a time, simple synchronization does the job...
Output:
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0 finished getting data...
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2 finished getting data...
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1 finished getting data...
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4 finished getting data...
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 3 finished getting data...
Cancelled
Code:
package queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class LoadInBackgroundExample {
public static class SyncronizedBackend {
public synchronized String getData() {
long sleepTime = 2000;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "finished getting data...";
}
}
/**
* A simple background task which should perform the data loading operation. In
* this minimal example it simply invokes Thread.sleep
*/
public static class BackgroundTask implements Runnable {
private int id;
private SyncronizedBackend syncronizedBackend;
private String result;
public BackgroundTask(SyncronizedBackend syncronizedBackend, int id) {
this.syncronizedBackend = syncronizedBackend;
this.id = id;
}
/**
* Sleep for a given amount of time to simulate loading.
*/
@Override
public void run() {
System.out.println("Start #" + id + ": " + Thread.currentThread());
result = this.syncronizedBackend.getData();
System.out.println("Finish #" + id + ": " + Thread.currentThread());
}
public String getResult() {
return result;
}
}
/**
* CompletableFuture which simulates loading and showing data.
* @param syncronizedBackend
*
* @param taskId Identifier of the current task
*/
public static void loadInBackground(SyncronizedBackend syncronizedBackend, int taskId) {
// create the loading task
BackgroundTask backgroundTask = new BackgroundTask(syncronizedBackend, taskId);
// "load" the data asynchronously
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "task " + backgroundTask.id + " " + backgroundTask.getResult();
}
});
// display the data after they are loaded
CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
System.out.println("Background task finished:" + x);
});
}
public static void main(String[] args) {
SyncronizedBackend syncronizedBackend = new SyncronizedBackend();
// runnable which invokes the background loader every second
Runnable trigger = new Runnable() {
int taskId = 0;
public void run() {
loadInBackground(syncronizedBackend, taskId++);
}
};
// create scheduler
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);
// cancel the scheduler and the application after 10 seconds
scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);
try {
beeperHandle.get();
} catch (Throwable th) {
}
System.out.println("Cancelled");
System.exit(0);
}
}
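Answer
I tried a solution using a simple dual switch of threads, see class BackgroundTaskDualSwitch; it simulates loading using CompletableFuture. The idea is to let a second task wait until the currently running task has finished (see the change in BackgroundTask). This ensures that at most one task thread is running and at most one task thread is waiting. Further requests are skipped until the running task has finished and a slot becomes 'free' to handle the next request.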
public static class BackgroundTask extends Thread {
    private int id;
    // the task that has to finish before this one may start, if any
    private Thread pendingTask;
    public BackgroundTask(int id) {
        this.id = id;
    }
    public BackgroundTask(int id, Thread pendingTask) {
        this(id);
        this.pendingTask = pendingTask;
    }
    /**
     * Sleep for a given amount of time to simulate loading.
     */
    @Override
    public void run() {
        try {
            // wait for the currently running task before starting
            if (pendingTask != null && pendingTask.isAlive()) {
                pendingTask.join();
            }
            System.out.println("Start #" + id + ": " + Thread.currentThread());
            ...
    }
}
public static class BackgroundTaskDualSwitch {
    // two slots: one for the running task, one for the waiting task
    private static BackgroundTask task1;
    private static BackgroundTask task2;
    public static synchronized boolean runTask(int taskId) {
        if (!isBusy(task1)) {
            if (isBusy(task2)) {
                // task2 is still running, so the new task waits for it
                task1 = new BackgroundTask(taskId, task2);
            } else {
                task1 = new BackgroundTask(taskId);
            }
            runAsync(task1);
            return true;
        } else if (!isBusy(task2)) {
            if (isBusy(task1)) {
                // task1 is still running, so the new task waits for it
                task2 = new BackgroundTask(taskId, task1);
            } else {
                task2 = new BackgroundTask(taskId);
            }
            runAsync(task2);
            return true;
        } else {
            return false; // SKIPPED
        }
    }
    private static void runAsync(BackgroundTask task) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    task.start();
                    task.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "task " + task.id;
            }
        });
        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
            System.out.println("Background task finished:" + x);
        });
    }
    private static boolean isBusy(BackgroundTask task) {
        // note: isAlive() is false until start() has been called,
        // so a task that was created but not yet started counts as free
        return task != null && task.isAlive();
    }
}
/**
 * Simulates loading and showing data.
 * @param taskId Identifier of the current task
 */
public static void loadInBackground(int taskId) {
    // create the loading task
    if (!BackgroundTaskDualSwitch.runTask(taskId)) {
        System.out.println("Background task ignored:task " + taskId); // SKIPPED
    }
}
...
The output is as follows:
Start #0: Thread[Thread-0,5,main]
Background task ignored:task 2
Finish #0: Thread[Thread-0,5,main]
Start #1: Thread[Thread-1,5,main]
Background task finished:task 0
Background task ignored:task 4
Finish #1: Thread[Thread-1,5,main]
Start #3: Thread[Thread-2,5,main]
Background task finished:task 1
Background task ignored:task 6
Finish #3: Thread[Thread-2,5,main]
Start #5: Thread[Thread-3,5,main]
Background task finished:task 3
Background task ignored:task 8
Finish #5: Thread[Thread-3,5,main]
Start #7: Thread[Thread-4,5,main]
Background task finished:task 5
Background task ignored:task 10
Finish #7: Thread[Thread-4,5,main]
Start #9: Thread[Thread-5,5,main]
Background task finished:task 7
Cancelled
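For comparison, the same "at most one running, at most one waiting" rule can probably be expressed without managing threads by hand. The following is only a minimal sketch, not the implementation from this answer: it assumes a ThreadPoolExecutor with a single worker thread and a queue of capacity one, and it treats a RejectedExecutionException as "skipped". The class name BoundedLoader and its submit method are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one load runs, one may wait, further requests are skipped. */
public class BoundedLoader {
    // One worker thread plus a queue of capacity one gives "one running, one pending".
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy());

    /** Returns true if the load was accepted, false if it was skipped. */
    public boolean submit(Runnable load) {
        try {
            executor.execute(load);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // SKIPPED: one load is running and one is already waiting
        }
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        BoundedLoader loader = new BoundedLoader();
        CountDownLatch release = new CountDownLatch(1);
        Runnable load = () -> {
            try {
                release.await(); // simulated loading: blocks until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(loader.submit(load)); // accepted, starts running
        System.out.println(loader.submit(load)); // accepted, waits in the queue
        System.out.println(loader.submit(load)); // skipped
        release.countDown();
        loader.shutdown();
    }
}
```

In this sketch the executor does the bookkeeping that task1 and task2 do in the dual switch. It does not cover the delayed reload after a timeout that the question asks for.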
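Answer
The first thread to start the expensive work reports the result through a callback. Other threads that try to run it are registered in ExpensiveWork.notificables, so when the expensive work finishes, the thread that performed it notifies them.
Meanwhile, the threads check for the result every 5 seconds.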
// JUnit 4 is assumed here; with JUnit 5 the import would be org.junit.jupiter.api.Test
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ExpensiveWorkTest {
    private final static int THREADS = 20;
    private final static long THREAD_TIMEOUT = 5000L;
    @Test
    public void example() throws InterruptedException {
        ExpensiveWork<String> expensiveWork = new ExpensiveWorkImpl();
        ExecutorService service = Executors.newFixedThreadPool(THREADS);
        for (int i = 0; i < THREADS; i++) {
            service.execute(() -> {
                Notificable<String> notificable = new NotificableImpl();
                expensiveWork.execute(notificable);
                // poll for the result every THREAD_TIMEOUT milliseconds
                while (notificable.getExpensiveResult() == null) {
                    try {
                        Thread.sleep(THREAD_TIMEOUT);
                    } catch (InterruptedException e) {
                        // stop polling and preserve the interrupt status
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println(Thread.currentThread().getName() + " has the message: " + notificable.getExpensiveResult());
            });
        }
        // without shutdown(), awaitTermination would always wait the full 60 seconds
        service.shutdown();
        service.awaitTermination(60, TimeUnit.SECONDS);
    }
    public static abstract class ExpensiveWork<T> {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private List<Notificable<T>> notificables = Collections.synchronizedList(new ArrayList<>());
        public void execute(Notificable<T> notificable) {
            String id = Thread.currentThread().getName();
            System.out.println("Loading data for " + id);
            notificables.add(notificable);
            // only the first caller flips the flag and performs the work
            if (!running.getAndSet(true)) {
                System.out.println("Running the expensive work " + id);
                T expensiveResult = expensiveWork();
                // a synchronized list must be locked manually while it is traversed
                synchronized (notificables) {
                    notificables.forEach(n -> n.callback(expensiveResult));
                }
                // note: callers that register after this point are not notified,
                // and the flag is never reset
            } else {
                System.out.println(id + " will receive the response later");
            }
        }
        protected abstract T expensiveWork();
    }
    public static class ExpensiveWorkImpl extends ExpensiveWork<String> {
        @Override
        public String expensiveWork() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "<Expensive result>";
        }
    }
    public static interface Notificable<T> {
        void callback(T expensiveResult);
        T getExpensiveResult();
    }
    public static class NotificableImpl implements Notificable<String> {
        // volatile so that the polling thread sees the value set by the working thread
        private volatile String expensiveResult;
        @Override
        public void callback(String expensiveResult) {
            this.expensiveResult = expensiveResult;
        }
        @Override
        public String getExpensiveResult() {
            return expensiveResult;
        }
    }
}
And this is the output:
Loading data for pool-1-thread-6
Loading data for pool-1-thread-7
Loading data for pool-1-thread-9
pool-1-thread-9 will receive the response later
Loading data for pool-1-thread-8
pool-1-thread-8 will receive the response later
Loading data for pool-1-thread-1
Loading data for pool-1-thread-2
Loading data for pool-1-thread-12
pool-1-thread-12 will receive the response later
Loading data for pool-1-thread-3
pool-1-thread-3 will receive the response later
Loading data for pool-1-thread-4
Loading data for pool-1-thread-5
pool-1-thread-5 will receive the response later
pool-1-thread-4 will receive the response later
Loading data for pool-1-thread-14
pool-1-thread-14 will receive the response later
Loading data for pool-1-thread-13
Loading data for pool-1-thread-15
pool-1-thread-15 will receive the response later
pool-1-thread-2 will receive the response later
pool-1-thread-1 will receive the response later
Loading data for pool-1-thread-11
pool-1-thread-11 will receive the response later
Loading data for pool-1-thread-10
pool-1-thread-10 will receive the response later
pool-1-thread-7 will receive the response later
Running the expensive work pool-1-thread-6
Loading data for pool-1-thread-18
pool-1-thread-18 will receive the response later
Loading data for pool-1-thread-17
Loading data for pool-1-thread-16
pool-1-thread-16 will receive the response later
pool-1-thread-13 will receive the response later
Loading data for pool-1-thread-19
pool-1-thread-19 will receive the response later
pool-1-thread-17 will receive the response later
Loading data for pool-1-thread-20
pool-1-thread-20 will receive the response later
pool-1-thread-6 has the message: <Expensive result>
pool-1-thread-8 has the message: <Expensive result>
pool-1-thread-12 has the message: <Expensive result>
pool-1-thread-9 has the message: <Expensive result>
pool-1-thread-11 has the message: <Expensive result>
pool-1-thread-1 has the message: <Expensive result>
pool-1-thread-2 has the message: <Expensive result>
pool-1-thread-3 has the message: <Expensive result>
pool-1-thread-15 has the message: <Expensive result>
pool-1-thread-4 has the message: <Expensive result>
pool-1-thread-14 has the message: <Expensive result>
pool-1-thread-10 has the message: <Expensive result>
pool-1-thread-5 has the message: <Expensive result>
pool-1-thread-13 has the message: <Expensive result>
pool-1-thread-16 has the message: <Expensive result>
pool-1-thread-19 has the message: <Expensive result>
pool-1-thread-20 has the message: <Expensive result>
pool-1-thread-7 has the message: <Expensive result>
pool-1-thread-18 has the message: <Expensive result>
pool-1-thread-17 has the message: <Expensive result>
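The polling loop could be avoided if callers share the future of the load that is already running. Below is a minimal sketch under that assumption, not the code from this answer: SharedLoad and its load method are hypothetical names, and the work is assumed to run on the common pool via CompletableFuture.supplyAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: concurrent callers share one in-flight load instead of polling. */
public class SharedLoad<T> {
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();
    private final Supplier<T> expensiveWork;

    public SharedLoad(Supplier<T> expensiveWork) {
        this.expensiveWork = expensiveWork;
    }

    /** Starts the work if none is running; otherwise returns the future of the running work. */
    public CompletableFuture<T> load() {
        while (true) {
            CompletableFuture<T> current = inFlight.get();
            if (current != null) {
                return current; // join the load that is already running
            }
            CompletableFuture<T> created = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, created)) {
                CompletableFuture.supplyAsync(expensiveWork).whenComplete((value, error) -> {
                    inFlight.compareAndSet(created, null); // allow a fresh load for later callers
                    if (error != null) {
                        created.completeExceptionally(error);
                    } else {
                        created.complete(value);
                    }
                });
                return created;
            }
            // another caller won the race; loop and pick up its future
        }
    }

    public static void main(String[] args) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger runs = new AtomicInteger();
        SharedLoad<String> loader = new SharedLoad<>(() -> {
            runs.incrementAndGet();
            try {
                release.await(); // simulated loading: blocks until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "<Expensive result>";
        });
        CompletableFuture<String> first = loader.load();
        CompletableFuture<String> second = loader.load(); // same future as first
        release.countDown();
        System.out.println(first.join());
        System.out.println("same future: " + (first == second) + ", runs: " + runs.get());
    }
}
```

Callers can attach thenAccept to the returned future to display the data, so no thread has to sleep and re-check. Because the reference is cleared when the work completes, a later request starts a new load.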