실제 많은 서비스에서 인프라의 availability를 위해 단위 시간안에 수많은 요청이 들어오는 걸 방지하는 로직은 필수적이다.
예를 들면, 카카오톡으로 메시지를 보낼 때, 순간적으로 많은 메시지를 보내면 나오는 경고문이나, 하루에 1000건 이상 request를 받지 않는 서비스 등 다양한 방법에서 사용한다.
이는 인프라의 availability에 직접적인 관여를 하는 클라우드 환경에서도 필수적인 요소인데, 대게는 네트워크 트래픽을 컨트롤 하는 데 있어서 많은 연관을 가지고 있다.
이러한 Rate Limiting Algorithm에는 여러가지 기법들이 존재하지만 이번에는 leaky bucket 알고리즘을 다룰 것이다.
Leaky Bucket Algorithm
leaky bucket 알고리즘은 물 새는 바가지를 연상시키는 이름의 알고리즘 답게 bucket이 차면 설령 그것이 흘러넘쳐 overflow가 발생하더라도 너무 많은 물을 떨어지지 않게 하는 것이 중요하다. 구멍의 갯수는 traffic speed라고 생각하면 되고 bucket size는 overflow로 drop이 될 때까지 traffic을 queueing 할 수 있는 크기라고 생각하면 된다.
- 구멍을 통해 물이 나가는 것은 request가 성립이 되는 것
- 흘러넘치는 물은 request가 버려지는 것
이 leaky bucket에서는 두가지 속성을 정의해야 한다.
- Rate: time unit당 처리할 수 있는 이상적인 request의 양
- 정의한 rate bound보다 많은 request가 처리되면 안된다.
- Capacity: 동시에 가지고 있을 수 있는 request의 양
- 정의한 capacity이상의 request는 drop된다.
이 leaky bucket 알고리즘을 이용하여 grpc stream interceptor에서 일정 수준이상 request가 들어왔을 때, 연결이 끊기는 예제는 다음과 같다.
package ratelimit
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"sync"
"sync/atomic"
"time"
)
const (
bucketRate = time.Second
bucketCap = 5
)
var bucketInstance *bucket
var once sync.Once
type bucket struct {
capacity uint64
status uint64
}
func getBucketInstance() *bucket{
once.Do(
func() {
bucketInstance = &bucket{capacity: bucketCap, status: bucketCap}
go func() {
t := time.NewTicker(bucketRate)
for {
select {
case <-t.C:
atomic.StoreUint64(&bucketInstance.status, bucketInstance.capacity)
}
}
}()
})
return bucketInstance
}
func (b *bucket) Limit() bool {
for {
r := atomic.LoadUint64(&b.status)
if r == 0 {
return false
}
if !atomic.CompareAndSwapUint64(&b.status, r, r-1) {
continue
}
return true
}
}
type wrappedStream struct {
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
b := getBucketInstance()
if b.Limit() {
return w.ClientStream.SendMsg(m)
}
return status.Errorf(codes.ResourceExhausted, "Too many request")
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
return &wrappedStream{s}
}
func RateLimitClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(s), nil
}
'알고리즘 > 기타' 카테고리의 다른 글
Strong Connected Component - Gabow 알고리즘 (0) | 2020.04.23 |
---|---|
Level Compressed Trie (0) | 2020.04.08 |
Sorting Algorithm for Hardware Acceleration (0) | 2020.03.29 |