본문 바로가기

알고리즘/기타

Rate Limiting Algorithm (Leaky Bucket)

실제 많은 서비스에서 인프라의 availability를 위해 단위 시간안에 수많은 요청이 들어오는 걸 방지하는 로직은 필수적이다.

 

예를 들면, 카카오톡으로 메시지를 보낼 때, 순간적으로 많은 메시지를 보내면 나오는 경고문이나, 하루에 1000건 이상 request를 받지 않는 서비스 등 다양한 방법에서 사용한다.

 

이는 인프라의 availability에 직접적인 관여를 하는 클라우드 환경에서도 필수적인 요소인데, 대게는 네트워크 트래픽을 컨트롤 하는 데 있어서 많은 연관을 가지고 있다.

 

이러한 Rate Limiting Algorithm에는 여러가지 기법들이 존재하지만 이번에는 leaky bucket 알고리즘을 다룰 것이다.

 

Leaky Bucket Algorithm

leaky bucket 알고리즘은 물 새는 바가지를 연상시키는 이름의 알고리즘 답게 bucket이 차면 설령 그것이 흘러넘쳐 overflow가 발생하더라도 너무 많은 물을 떨어지지 않게 하는 것이 중요하다. 구멍의 갯수는 traffic speed라고 생각하면 되고 bucket size는 overflow로 drop이 될 때까지 traffic을 queueing 할 수 있는 크기라고 생각하면 된다.

 

  •  구멍을 통해 물이 나가는 것은 request가 성립이 되는 것
  •  흘러넘치는 물은 request가 버려지는 것

이 leaky bucket에서는 두가지 속성을 정의해야 한다.

  • Rate: time unit당 처리할 수 있는 이상적인 request의 양
    • 정의한 rate bound보다 많은 request가 처리되면 안된다.
  • Capacity: 동시에 가지고 있을 수 있는 request의 양
    • 정의한 capacity이상의 request는 drop된다.

이 leaky bucket 알고리즘을 이용하여 grpc stream interceptor에서 일정 수준이상 request가 들어왔을 때, 연결이 끊기는 예제는 다음과 같다.

 

package ratelimit

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

const (
	bucketRate = time.Second
	bucketCap = 5
)

var bucketInstance *bucket
var once sync.Once

type bucket struct {
	capacity uint64
	status   uint64
}

func getBucketInstance() *bucket{
	once.Do(
		func() {
			bucketInstance = &bucket{capacity: bucketCap, status: bucketCap}
			go func() {
				t := time.NewTicker(bucketRate)
				for {
					select {
					case <-t.C:
						atomic.StoreUint64(&bucketInstance.status, bucketInstance.capacity)
					}
				}
			}()
		})
	return bucketInstance
}

func (b *bucket) Limit() bool {
	for {
		r := atomic.LoadUint64(&b.status)
		if r == 0 {
			return false
		}
		if !atomic.CompareAndSwapUint64(&b.status, r, r-1) {
			continue
		}
		return true
	}
}



type wrappedStream struct {
	grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	b := getBucketInstance()
	if b.Limit() {
		return w.ClientStream.SendMsg(m)
	}
	return status.Errorf(codes.ResourceExhausted, "Too many request")
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func RateLimitClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc,
	cc *grpc.ClientConn, method string,
	streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		s, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			return nil, err
		}
		return newWrappedStream(s), nil

}

'알고리즘 > 기타' 카테고리의 다른 글

Strong Connected Component - Gabow 알고리즘  (0) 2020.04.23
Level Compressed Trie  (0) 2020.04.08
Sorting Algorithm for Hardware Acceleration  (0) 2020.03.29