항목 목록이 포함 된 큰 파일이 있습니다.
항목의 일괄 처리를 만들고이 일괄 처리로 HTTP 요청을 만들고 싶습니다 (모든 항목은 HTTP 요청의 매개 변수로 필요함). for루프를 사용 하면 매우 쉽게 할 수 있지만, Java 8 애호가로서 Java 8의 Stream 프레임 워크로 이것을 작성해보고 싶습니다 (그리고 지연 처리의 이점을 누리고 있습니다).
예:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
나는 긴 줄을하고 싶다.
lazyFileStream.group(500).map(processBatch).collect(toList())
이를 수행하는 가장 좋은 방법은 무엇입니까?
답변
노트! 이 솔루션은 forEach를 실행하기 전에 전체 파일을 읽습니다.
단일 스레드, 순차 스트림 사용 사례를 위해 Java 8 스트림을 확장하는 라이브러리 인 jOOλ 를 사용하여 수행 할 수 있습니다.
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
이면 zipWithIndex()은 다음과 같습니다.
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
…하지만 groupBy()API 편의는 다음과 같습니다.
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(면책 조항 : 나는 jOOλ 뒤에있는 회사에서 일합니다)
답변
완전성을 위해 여기에 Guava 솔루션이 있습니다.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
질문에서 컬렉션을 사용할 수 있으므로 스트림이 필요하지 않으며 다음과 같이 쓸 수 있습니다.
Iterables.partition(data, batchSize).forEach(this::process);
답변
순수한 Java-8 구현도 가능합니다.
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
JOOl과 달리 병렬로 잘 작동 할 수 있습니다 ( data임의 액세스 목록 인 경우).
답변
순수 Java 8 솔루션 :
이를 우아하게 수행하기 위해 사용자 지정 수집기를 만들 수 있으며, 각 배치를 처리 batch size하기 Consumer위해 a 와 a 를 사용할 수 있습니다.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
선택적으로 도우미 유틸리티 클래스를 만듭니다.
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
사용 예 :
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
누군가가 살펴보고 싶다면 GitHub에도 내 코드를 게시했습니다.
답변
이와 같은 시나리오를 위해 사용자 지정 Spliterator를 작성했습니다. 입력 스트림에서 주어진 크기의 목록을 채 웁니다. 이 접근 방식의 장점은 지연 처리를 수행하고 다른 스트림 기능과 함께 작동한다는 것입니다.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
답변
우리는 해결해야 할 비슷한 문제가있었습니다. 우리는 시스템 메모리보다 큰 스트림 (데이터베이스의 모든 개체를 반복)을 가져 와서 가능한 한 최선의 순서를 무작위로 지정하려고했습니다. 10,000 개의 항목을 버퍼링하고 무작위로 지정하는 것이 좋습니다.
대상은 스트림을받는 함수였습니다.
여기에 제안 된 솔루션 중 다양한 옵션이있는 것 같습니다.
- 다양한 비 Java 8 추가 라이브러리 사용
- 스트림이 아닌 것으로 시작-예 : 임의 액세스 목록
- 분할기에서 쉽게 분할 할 수있는 스트림이 있습니다.
우리의 본능은 원래 커스텀 컬렉터를 사용하는 것이었지만 이것은 스트리밍을 중단하는 것을 의미했습니다. 위의 사용자 지정 수집기 솔루션은 매우 훌륭하며 거의 사용했습니다.
다음은 Streams가 탈출구Iterator 로 사용할 수 있다는 사실을 사용하여 속임수를 쓰는 솔루션입니다 . 는 자바 8의 또 다른 비트를 사용하여 스트림으로 변환 돌아 마법을.IteratorStreamSupport
/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
이것을 사용하는 간단한 예는 다음과 같습니다.
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
위의 인쇄
[A, B, C]
[D, E, F]
사용 사례에서는 배치를 섞은 다음 스트림으로 유지하려고했습니다. 다음과 같이 보입니다.
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
이것은 다음과 같은 것을 출력합니다 (무작위 화되어 매번 다릅니다)
A
C
B
E
D
F
여기서 비밀 소스는 항상 스트림이 있기 때문에 배치 스트림에서 작업하거나 각 배치에 대해 작업을 수행 한 다음 flatMap스트림으로 다시 돌아갈 수 있다는 것입니다. 더 나은, 위의 유일한 모두 최종으로 실행 forEach하거나 collect또는 다른 종단 표현 PULL 스트림을 통해 데이터를.
그것은 스트림에 iterator대한 특수한 유형의 종료 작업 이며 전체 스트림이 실행되고 메모리에 들어오지 않게하는 것으로 밝혀졌습니다 ! 멋진 디자인에 대한 Java 8 녀석들에게 감사합니다!
답변
RxJava를 사용할 수도 있습니다 .
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
또는
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
또는
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
