[python] 좋은 속도 제한 알고리즘은 무엇입니까?

의사 코드 또는 더 나은 Python을 사용할 수 있습니다. Python IRC 봇에 대해 속도 제한 대기열을 구현하려고하는데 부분적으로 작동하지만 누군가가 제한보다 적은 메시지를 트리거하면 (예 : 속도 제한은 8 초당 메시지 5 개, 사람은 4 개만 트리거 함), 다음 트리거가 8 초 이상 (예 : 16 초 후)이면 봇이 메시지를 보내지 만 8 초 기간이 경과했기 때문에 필요하지 않더라도 대기열이 가득 차고 봇은 8 초를 기다립니다.



답변

여기에 간단한 알고리즘은 그들이 너무 빨리 도착하면, 당신이 원하는 경우 단지 메시지를 삭제하려면 (대신 큐가 임의의 큰 얻을 수 있기 때문에 의미가있는, 그들 큐잉) :

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

이 솔루션에는 데이터 구조, 타이머 등이 ​​없으며 깔끔하게 작동합니다. :)이를 확인하기 위해 ‘허용’은 최대 초당 5/8 단위, 즉 8 초당 최대 5 단위의 속도로 증가합니다. 전달되는 모든 메시지는 하나의 단위를 차감하므로 8 초마다 5 개 이상의 메시지를 보낼 수 없습니다.

참고 rate비 소수점 아래 부분없이, 즉 정수를해야한다, 또는 알고리즘 (실제 속도는되지 않습니다 제대로 작동하지 않습니다 rate/per). 예를 들어 1.0으로 성장 rate=0.5; per=1.0;하지 않기 때문에 작동하지 않습니다 allowance. 그러나 rate=1.0; per=2.0;잘 작동합니다.


답변

큐에 넣는 함수 전에이 데코레이터 @RateLimited (ratepersec)를 사용하십시오.

기본적으로 마지막 시간 이후 1 / rate 초가 경과했는지 확인하고 그렇지 않은 경우 나머지 시간 동안 대기하고, 그렇지 않으면 대기하지 않습니다. 이렇게하면 속도 / 초로 효과적으로 제한됩니다. 데코레이터는 속도 제한을 원하는 모든 기능에 적용 할 수 있습니다.

귀하의 경우 8 초당 최대 5 개의 메시지를 원하면 sendToQueue 함수 전에 @RateLimited (0.625)를 사용하십시오.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)


답변

토큰 버킷은 구현하기가 매우 간단합니다.

5 개의 토큰이있는 버킷으로 시작합니다.

5/8 초마다 : 버킷에 5 개 미만의 토큰이있는 경우 하나를 추가합니다.

메시지를 보낼 때마다 : 버킷에 1 개 이상의 토큰이있는 경우 하나의 토큰을 꺼내서 메시지를 보냅니다. 그렇지 않으면 메시지를 기다리거나 삭제하십시오.

(분명히 실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하고 타임 스탬프를 저장하여 매 5/8 단계를 최적화 할 수 있습니다)


질문을 다시 읽고 속도 제한이 8 초마다 완전히 재설정되면 다음 수정 사항이 있습니다.

last_send오래 전의 타임 스탬프로 시작합니다 (예 : 에포크). 또한 동일한 5 개 토큰 버킷으로 시작하십시오.

매 5/8 초 규칙을 따르십시오.

메시지를 보낼 때마다 : 먼저 last_send≥ 8 초 전에 확인하십시오 . 그렇다면 버킷을 채우십시오 (토큰 5 개로 설정). 둘째, 버킷에 토큰이 있으면 메시지를 보냅니다 (그렇지 않으면 drop / wait / etc.). 셋째, last_send지금으로 설정 합니다.

해당 시나리오에서 작동합니다.


저는 실제로 이와 같은 전략을 사용하여 IRC 봇을 작성했습니다 (첫 번째 접근 방식). Python이 아닌 Perl에 있지만 설명 할 코드는 다음과 같습니다.

여기서 첫 번째 부분은 버킷에 토큰 추가를 처리합니다. 시간 (마지막 줄에서 두 번째 줄)에 따라 토큰을 추가하는 최적화를 볼 수 있으며 마지막 줄은 버킷 내용을 최대 (MESSAGE_BURST)로 고정합니다.

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn은 전달되는 데이터 구조입니다. 이것은 일상적으로 실행되는 메서드 내부에 있습니다 (다음에 할 일이있을 때 계산하고 그 시간 동안 또는 네트워크 트래픽을 얻을 때까지 잠자기 상태를 유지합니다). 메서드의 다음 부분은 전송을 처리합니다. 메시지에는 관련된 우선 순위가 있기 때문에 다소 복잡합니다.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

그것이 무엇이든 실행되는 첫 번째 대기열입니다. 홍수로 인해 연결이 끊어 지더라도. 서버의 PING에 응답하는 것과 같이 매우 중요한 작업에 사용됩니다. 다음으로 나머지 대기열 :

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

마지막으로 버킷 상태는 $ conn 데이터 구조에 다시 저장됩니다 (실제로는 메서드에서 조금 뒤, 먼저 더 많은 작업을 수행 할 시간을 계산합니다).

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

보시다시피 실제 버킷 처리 코드는 약 4 줄로 매우 작습니다. 나머지 코드는 우선 순위 대기열 처리입니다. 봇에는 우선 순위 대기열이 있으므로, 누군가와 채팅을하여 중요한 킥 / 금지 임무를 수행하는 것을 막을 수 없습니다.


답변

메시지가 전송 될 때까지 처리를 차단하여 추가 메시지를 대기열에 추가하려면 antti의 아름다운 솔루션을 다음과 같이 수정할 수도 있습니다.

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

메시지를 보낼 수있는 충분한 여유가있을 때까지 기다립니다. 비율의 두 배로 시작하지 않으려면 허용량도 0으로 초기화 할 수 있습니다.


답변

마지막 5 개 라인이 전송 된 시간을 유지하십시오. 5 번째 가장 최근 메시지 (존재하는 경우)가 과거에 최소 8 초 (last_five를 시간 배열로 사용)가 될 때까지 대기중인 메시지를 보관합니다.

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()


답변

한 가지 해결책은 각 대기열 항목에 타임 스탬프를 첨부하고 8 초가 지난 후 항목을 삭제하는 것입니다. 대기열이 추가 될 때마다이 검사를 수행 할 수 있습니다.

이것은 대기열 크기를 5로 제한하고 대기열이 가득 찬 동안 추가 항목을 삭제하는 경우에만 작동합니다.


답변

여전히 관심이있는 사람이 있다면이 간단한 호출 가능 클래스를 시간 제한 LRU 키 값 저장소와 함께 사용하여 IP 당 요청 속도를 제한합니다. deque를 사용하지만 대신 목록과 함께 사용하도록 다시 작성할 수 있습니다.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")