항목 목록이 포함 된 큰 파일이 있습니다.
항목의 일괄 처리를 만들고이 일괄 처리로 HTTP 요청을 만들고 싶습니다 (모든 항목은 HTTP 요청의 매개 변수로 필요함). for
루프를 사용 하면 매우 쉽게 할 수 있지만, Java 8 애호가로서 Java 8의 Stream 프레임 워크로 이것을 작성해보고 싶습니다 (그리고 지연 처리의 이점을 누리고 있습니다).
예:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
나는 긴 줄을하고 싶다.
lazyFileStream.group(500).map(processBatch).collect(toList())
이를 수행하는 가장 좋은 방법은 무엇입니까?
답변
노트! 이 솔루션은 forEach를 실행하기 전에 전체 파일을 읽습니다.
단일 스레드, 순차 스트림 사용 사례를 위해 Java 8 스트림을 확장하는 라이브러리 인 jOOλ 를 사용하여 수행 할 수 있습니다.
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
이면 zipWithIndex()
은 다음과 같습니다.
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
…하지만 groupBy()
API 편의는 다음과 같습니다.
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(면책 조항 : 나는 jOOλ 뒤에있는 회사에서 일합니다)
답변
완전성을 위해 여기에 Guava 솔루션이 있습니다.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
질문에서 컬렉션을 사용할 수 있으므로 스트림이 필요하지 않으며 다음과 같이 쓸 수 있습니다.
Iterables.partition(data, batchSize).forEach(this::process);
답변
순수한 Java-8 구현도 가능합니다.
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
JOOl과 달리 병렬로 잘 작동 할 수 있습니다 ( data
임의 액세스 목록 인 경우).
답변
순수 Java 8 솔루션 :
이를 우아하게 수행하기 위해 사용자 지정 수집기를 만들 수 있으며, 각 배치를 처리 batch size
하기 Consumer
위해 a 와 a 를 사용할 수 있습니다.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
선택적으로 도우미 유틸리티 클래스를 만듭니다.
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
사용 예 :
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
누군가가 살펴보고 싶다면 GitHub에도 내 코드를 게시했습니다.
답변
이와 같은 시나리오를 위해 사용자 지정 Spliterator를 작성했습니다. 입력 스트림에서 주어진 크기의 목록을 채 웁니다. 이 접근 방식의 장점은 지연 처리를 수행하고 다른 스트림 기능과 함께 작동한다는 것입니다.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
답변
우리는 해결해야 할 비슷한 문제가있었습니다. 우리는 시스템 메모리보다 큰 스트림 (데이터베이스의 모든 개체를 반복)을 가져 와서 가능한 한 최선의 순서를 무작위로 지정하려고했습니다. 10,000 개의 항목을 버퍼링하고 무작위로 지정하는 것이 좋습니다.
대상은 스트림을받는 함수였습니다.
여기에 제안 된 솔루션 중 다양한 옵션이있는 것 같습니다.
- 다양한 비 Java 8 추가 라이브러리 사용
- 스트림이 아닌 것으로 시작-예 : 임의 액세스 목록
- 분할기에서 쉽게 분할 할 수있는 스트림이 있습니다.
우리의 본능은 원래 커스텀 컬렉터를 사용하는 것이었지만 이것은 스트리밍을 중단하는 것을 의미했습니다. 위의 사용자 지정 수집기 솔루션은 매우 훌륭하며 거의 사용했습니다.
다음은 Stream
s가 탈출구Iterator
로 사용할 수 있다는 사실을 사용하여 속임수를 쓰는 솔루션입니다 . 는 자바 8의 또 다른 비트를 사용하여 스트림으로 변환 돌아 마법을.Iterator
StreamSupport
/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
이것을 사용하는 간단한 예는 다음과 같습니다.
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
위의 인쇄
[A, B, C]
[D, E, F]
사용 사례에서는 배치를 섞은 다음 스트림으로 유지하려고했습니다. 다음과 같이 보입니다.
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
이것은 다음과 같은 것을 출력합니다 (무작위 화되어 매번 다릅니다)
A
C
B
E
D
F
여기서 비밀 소스는 항상 스트림이 있기 때문에 배치 스트림에서 작업하거나 각 배치에 대해 작업을 수행 한 다음 flatMap
스트림으로 다시 돌아갈 수 있다는 것입니다. 더 나은, 위의 유일한 모두 최종으로 실행 forEach
하거나 collect
또는 다른 종단 표현 PULL 스트림을 통해 데이터를.
그것은 스트림에 iterator
대한 특수한 유형의 종료 작업 이며 전체 스트림이 실행되고 메모리에 들어오지 않게하는 것으로 밝혀졌습니다 ! 멋진 디자인에 대한 Java 8 녀석들에게 감사합니다!
답변
RxJava를 사용할 수도 있습니다 .
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
또는
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
또는
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();