[java] 일괄 처리 기능이있는 Java 8 Stream

항목 목록이 포함 된 큰 파일이 있습니다.

항목의 일괄 처리를 만들고이 일괄 처리로 HTTP 요청을 만들고 싶습니다 (모든 항목은 HTTP 요청의 매개 변수로 필요함). for루프를 사용 하면 매우 쉽게 할 수 있지만, Java 8 애호가로서 Java 8의 Stream 프레임 워크로 이것을 작성해보고 싶습니다 (그리고 지연 처리의 이점을 누리고 있습니다).

예:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

나는 긴 줄을하고 싶다.
lazyFileStream.group(500).map(processBatch).collect(toList())

이를 수행하는 가장 좋은 방법은 무엇입니까?



답변

노트! 이 솔루션은 forEach를 실행하기 전에 전체 파일을 읽습니다.

단일 스레드, 순차 스트림 사용 사례를 위해 Java 8 스트림을 확장하는 라이브러리 인 jOOλ 를 사용하여 수행 할 수 있습니다.

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

이면 zipWithIndex()은 다음과 같습니다.

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

…하지만 groupBy()API 편의는 다음과 같습니다.

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(면책 조항 : 나는 jOOλ 뒤에있는 회사에서 일합니다)


답변

완전성을 위해 여기에 Guava 솔루션이 있습니다.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

질문에서 컬렉션을 사용할 수 있으므로 스트림이 필요하지 않으며 다음과 같이 쓸 수 있습니다.

Iterables.partition(data, batchSize).forEach(this::process);


답변

순수한 Java-8 구현도 가능합니다.

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

JOOl과 달리 병렬로 잘 작동 할 수 있습니다 ( data임의 액세스 목록 인 경우).


답변

순수 Java 8 솔루션 :

이를 우아하게 수행하기 위해 사용자 지정 수집기를 만들 수 있으며, 각 배치를 처리 batch size하기 Consumer위해 a 와 a 를 사용할 수 있습니다.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

선택적으로 도우미 유틸리티 클래스를 만듭니다.

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

사용 예 :

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

누군가가 살펴보고 싶다면 GitHub에도 내 코드를 게시했습니다.

Github에 연결


답변

이와 같은 시나리오를 위해 사용자 지정 Spliterator를 작성했습니다. 입력 스트림에서 주어진 크기의 목록을 채 웁니다. 이 접근 방식의 장점은 지연 처리를 수행하고 다른 스트림 기능과 함께 작동한다는 것입니다.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}


답변

우리는 해결해야 할 비슷한 문제가있었습니다. 우리는 시스템 메모리보다 큰 스트림 (데이터베이스의 모든 개체를 반복)을 가져 와서 가능한 한 최선의 순서를 무작위로 지정하려고했습니다. 10,000 개의 항목을 버퍼링하고 무작위로 지정하는 것이 좋습니다.

대상은 스트림을받는 함수였습니다.

여기에 제안 된 솔루션 중 다양한 옵션이있는 것 같습니다.

  • 다양한 비 Java 8 추가 라이브러리 사용
  • 스트림이 아닌 것으로 시작-예 : 임의 액세스 목록
  • 분할기에서 쉽게 분할 할 수있는 스트림이 있습니다.

우리의 본능은 원래 커스텀 컬렉터를 사용하는 것이었지만 이것은 스트리밍을 중단하는 것을 의미했습니다. 위의 사용자 지정 수집기 솔루션은 매우 훌륭하며 거의 사용했습니다.

다음은 Streams가 탈출구Iterator 로 사용할 수 있다는 사실을 사용하여 속임수를 쓰는 솔루션입니다 . 는 자바 8의 또 다른 비트를 사용하여 스트림으로 변환 돌아 마법을.IteratorStreamSupport

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

이것을 사용하는 간단한 예는 다음과 같습니다.

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

위의 인쇄

[A, B, C]
[D, E, F]

사용 사례에서는 배치를 섞은 다음 스트림으로 유지하려고했습니다. 다음과 같이 보입니다.

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

이것은 다음과 같은 것을 출력합니다 (무작위 화되어 매번 다릅니다)

A
C
B
E
D
F

여기서 비밀 소스는 항상 스트림이 있기 때문에 배치 스트림에서 작업하거나 각 배치에 대해 작업을 수행 한 다음 flatMap스트림으로 다시 돌아갈 수 있다는 것입니다. 더 나은, 위의 유일한 모두 최종으로 실행 forEach하거나 collect또는 다른 종단 표현 PULL 스트림을 통해 데이터를.

그것은 스트림에 iterator대한 특수한 유형의 종료 작업 이며 전체 스트림이 실행되고 메모리에 들어오지 않게하는 것으로 밝혀졌습니다 ! 멋진 디자인에 대한 Java 8 녀석들에게 감사합니다!


답변

RxJava를 사용할 수도 있습니다 .

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

또는

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

또는

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();