[unix] 표준 프로세스를 병렬 프로세스로 확산
stdin에서 파일 목록을 처리하는 작업이 있습니다. 프로그램의 시작 시간은 상당하며 각 파일에 걸리는 시간은 매우 다양합니다. 나는 많은 수의 프로세스를 생성하고 바쁘지 않은 프로세스를 파견하고 싶다. 내가 원하는 것을 거의 수행하는 몇 가지 명령 줄 도구가 있습니다. 거의 두 가지 작업 옵션으로 좁혔습니다.
find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob
문제는 split
순수한 라운드 로빈을 수행하므로 프로세스 중 하나가 뒤쳐지고 남아있어 전체 작업의 완료가 지연 된다는 것입니다. 반면 parallel
욕구는 N 라인 당 하나 개의 프로세스를 생성 또는 바이트의 입력을 내가 시작 오버 헤드 방법에게 너무 많은 시간을 보내는 바람합니다.
프로세스와 피드 라인을 재사용하여 차단되지 않은 stdin을 가진 프로세스에 재사용 할 수있는 이와 같은 것이 있습니까?
답변
그런 일반적인 경우에는 불가능합니다. 각 프로세스에 대한 버퍼가 있음을 의미하며 외부에서 버퍼를보고 다음 항목을 넣을 위치를 결정할 수 있습니다 (스케줄링) … 물론 무언가를 쓸 수 있습니다 (또는 slurm과 같은 배치 시스템을 사용하십시오)
그러나 프로세스가 무엇인지에 따라 입력을 사전 처리 할 수 있습니다. 예를 들어 파일을 다운로드하거나 DB에서 유사한 항목을 업데이트하려고하지만 그중 50 %가 건너 뛴 경우 (입력에 따라 처리 차이가 커짐) 전처리기를 설정하면됩니다. 어떤 항목이 오래 걸릴지 (파일 존재, 데이터 변경 등) 확인하므로 다른 쪽에서 오는 것은 상당히 동일한 시간이 소요됩니다. 휴리스틱이 완벽하지 않더라도 상당히 개선 될 수 있습니다. 다른 파일을 파일로 덤프 한 후 동일한 방식으로 처리 할 수 있습니다.
그러나 사용 사례에 따라 다릅니다.
답변
아니요, 일반적인 해결책은 없습니다. 귀하의 디스패처는 각 프로그램이 다른 줄을 읽을 준비가되었는지 알아야하며,이를 인식하는 표준이 없습니다. 당신이 할 수있는 것은 STDOUT에 줄을두고 그것을 소비하기를 기다리는 것입니다. 파이프 라인에서 생산자가 다음 소비자가 준비되었는지 여부를 알 수있는 좋은 방법은 없습니다.
답변
나는 그렇게 생각하지 않습니다. 내가 좋아하는 잡지에는 한 번 배쉬 프로그래밍에 관한 기사가있었습니다. 할 수있는 도구가 있다면 그것들을 언급했을 것입니다. 따라서 다음 라인을 따라 무언가를 원합니다.
set -m # enable job control
max_processes=8
concurrent_processes=0
child_has_ended() { concurrent_processes=$((concurrent_processes - 1)) }
trap child_has_ended SIGCHLD # that's magic calling our bash function when a child processes ends
for i in $(find . -type f)
do
# don't do anything while there are max_processes running
while [ ${concurrent_processes} -ge ${max_processes}]; do sleep 0.5; done
# increase the counter
concurrent_processes=$((concurrent_processes + 1))
# start a child process to actually deal with one file
/path/to/script/to/handle/one/file $i &
done
분명히 실제 작업 스크립트에 대한 호출을 원하는대로 변경할 수 있습니다. 내가 언급 한 잡지는 처음에는 파이프 설정 및 실제로 작업자 스레드 시작과 같은 작업을 수행합니다. 확인하십시오 mkfifo
. 그러나 작업자 프로세스가 마스터 프로세스에 더 많은 데이터를 수신 할 준비가되었음을 신호로 알려야하므로 경로는 훨씬 더 복잡합니다. 따라서 각 작업자 프로세스마다 데이터를 보내려면 하나의 fifo가 필요하고 마스터 프로세스가 작업자로부터 물건을 받으려면 하나의 fifo가 필요합니다.
기권
나는 내 머리 꼭대기에서 그 대본을 썼습니다. 구문 문제가있을 수 있습니다.
답변
GNU Parallel의 경우 –block을 사용하여 블록 크기를 설정할 수 있습니다. 그러나 실행중인 각 프로세스에 대해 1 블록의 메모리를 유지하기에 충분한 메모리가 필요합니다.
나는 이것이 당신이 찾고있는 것이 아니라는 것을 이해하지만, 지금은 수용 가능한 해결책 일 수 있습니다.
평균적인 작업 시간이 같은 경우 mbuffer를 사용할 수 있습니다.
find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
답변
이 시도:
mkfifo
각 프로세스마다.
그런 다음 tail -f | myjob
각 fifo에 매 달리십시오.
예를 들어 작업자 설정 (myjob 프로세스)
mkdir /tmp/jobs
for X in 1 2 3 4
do
mkfifo pipe$X
tail -f pipe$X | myjob &
jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done
응용 프로그램 (myjob)에 따라 작업 -s를 사용하여 중지 된 작업을 찾을 수 있습니다. 그렇지 않으면 CPU별로 정렬 된 프로세스를 나열하고 가장 적은 리소스를 소비하는 프로세스를 선택하십시오. 더 많은 작업을 원할 때 파일 시스템에 플래그를 설정하여 작업보고 자체를 수행해야합니다.
입력을 기다릴 때 작업이 중지되었다고 가정하면
jobs -sl
예를 들어 중지 된 작업의 pid를 찾아서 작업을 지정하기 위해
grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
cat workset > $PIPE
done
나는 이것을 테스트했다.
garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes
인정해야 할 것은 바로 ymmv입니다.
답변
이것을 해결하기 위해 실제로 필요한 것은 어떤 유형의 큐 메커니즘입니다.
작업이 SYSV 메시지 대기열과 같은 대기열에서 입력을 읽은 다음 병렬로 프로그램을 실행하여 값을 대기열로 푸시하는 것이 가능합니까?
또 다른 가능성은 다음과 같이 큐에 디렉토리를 사용하는 것입니다.
- find 출력은 각 파일에 대한 심볼릭 링크를 만들어 디렉토리에서 처리합니다.
pending
- 각 작업 프로세스는 수행
mv
이의 형제 디렉토리에 디렉토리에 보는 첫 번째 파일의pending
이름을inprogress
. - 작업이 파일을 성공적으로 이동하면 처리가 수행됩니다. 그렇지 않으면 다시 다른 파일 이름을 찾아 이동합니다.
pending
답변
@ash의 답변에 설명하면 SYSV 메시지 대기열을 사용하여 작업을 배포 할 수 있습니다. C로 자신의 프로그램을 작성하지 않으려면 ipcmd
도움이 될 수 있는 유틸리티 가 있습니다. 여기에 내가의 출력을 통과하기 위해 함께 넣어 무엇 find $DIRECTORY -type f
에 대한 $PARALLEL
프로세스의 수를 :
set -o errexit
set -o nounset
export IPCMD_MSQID=$(ipcmd msgget)
DIRECTORY=$1
PARALLEL=$2
# clean up message queue on exit
trap 'ipcrm -q $IPCMD_MSQID' EXIT
for i in $(seq $PARALLEL); do
{
while true
do
message=$(ipcmd msgrcv) || exit
[ -f $message ] || break
sleep $((RANDOM/3000))
done
} &
done
find "$DIRECTORY" -type f | xargs ipcmd msgsnd
for i in $(seq $PARALLEL); do
ipcmd msgsnd "/dev/null/bar"
done
wait
테스트 실행은 다음과 같습니다.
$ for i in $(seq 20 10 100) ; do time parallel.sh /usr/lib/ $i ; done
parallel.sh /usr/lib/ $i 0.30s user 0.67s system 0% cpu 1:57.23 total
parallel.sh /usr/lib/ $i 0.28s user 0.69s system 1% cpu 1:09.58 total
parallel.sh /usr/lib/ $i 0.19s user 0.80s system 1% cpu 1:05.29 total
parallel.sh /usr/lib/ $i 0.29s user 0.73s system 2% cpu 44.417 total
parallel.sh /usr/lib/ $i 0.25s user 0.80s system 2% cpu 37.353 total
parallel.sh /usr/lib/ $i 0.21s user 0.85s system 3% cpu 32.354 total
parallel.sh /usr/lib/ $i 0.30s user 0.82s system 3% cpu 28.542 total
parallel.sh /usr/lib/ $i 0.27s user 0.88s system 3% cpu 30.219 total
parallel.sh /usr/lib/ $i 0.34s user 0.84s system 4% cpu 26.535 total