The token bucket algorithm can be implemented in many programming languages. Here are a few common implementations:
1. Java implementation
```java
public class TokenBucket {
    private final int capacity;  // maximum number of tokens the bucket can hold
    private final int rate;      // tokens added per second
    private int tokens;          // tokens currently available
    private long lastRefillTime; // timestamp (ms) up to which tokens have been credited

    public TokenBucket(int capacity, int rate) {
        if (capacity <= 0 || rate <= 0) {
            throw new IllegalArgumentException("capacity and rate must be positive");
        }
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillTime = System.currentTimeMillis();
    }

    // Takes one token if available; returns false when the bucket is empty.
    public synchronized boolean acquire() {
        refill();
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    // Lazily credits the tokens accrued since the last refill.
    // Only called from acquire(), so the lock is already held and no scheduler thread is needed.
    private void refill() {
        long now = System.currentTimeMillis();
        long elapsed = now - lastRefillTime;
        long newTokens = elapsed * rate / 1000;
        if (newTokens > 0) {
            tokens = (int) Math.min(capacity, tokens + newTokens);
            // Advance only by the time that produced whole tokens, so leftover time carries over.
            lastRefillTime += newTokens * 1000 / rate;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TokenBucket tokenBucket = new TokenBucket(10, 1); // capacity 10, 1 token per second
        for (int i = 0; i < 15; i++) {
            System.out.println(tokenBucket.acquire() ? "Request accepted" : "Request rejected");
            Thread.sleep(500);
        }
    }
}
```
2. Python implementation
```python
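import time


class TokenBucket:
    def __init__(self, rate, capacity):
        """
        Initialize the token bucket.
        :param rate: token refill rate, in tokens per second
        :param capacity: maximum number of tokens the bucket can hold
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = 0  # the bucket starts empty; use capacity here to allow an initial burst
        self.last_checked = time.monotonic()  # monotonic clock ignores system clock changes

    def consume(self, tokens=1):
        """
        Consume tokens.
        :param tokens: number of tokens to consume, defaults to 1
        :return: True if the tokens were consumed, False if not enough are available
        """
        self._add_tokens()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def _add_tokens(self):
        """Add the tokens accrued since the last check, capped at capacity."""
        now = time.monotonic()
        elapsed = now - self.last_checked
        # Accrue fractional tokens so that time between calls is never discarded.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_checked = now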
```
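To see how the refill logic behaves without waiting on real time, the sketch below drives a bucket with a manually advanced clock. The `FakeClock` helper, the `ClockedTokenBucket` name, and the `clock` parameter are assumptions introduced for illustration and are not part of the implementation above; this variant also starts with a full bucket, which is just one possible choice.

```python
class FakeClock:
    """Hypothetical manual clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


class ClockedTokenBucket:
    """Token bucket with an injectable clock (illustrative sketch only)."""

    def __init__(self, rate, capacity, clock):
        self.rate = rate                # tokens added per second
        self.capacity = capacity        # maximum tokens held
        self.clock = clock              # callable returning the current time in seconds
        self.tokens = float(capacity)   # assumption: start full to allow an initial burst
        self.last_checked = clock()

    def consume(self, tokens=1):
        # Credit the tokens accrued since the last call, capped at capacity.
        now = self.clock()
        elapsed = now - self.last_checked
        self.last_checked = now
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


clock = FakeClock()
bucket = ClockedTokenBucket(rate=2, capacity=3, clock=clock)

burst = [bucket.consume() for _ in range(5)]       # 3 tokens available, then empty
clock.advance(1.0)                                 # 1 s at 2 tokens/s refills 2 tokens
after_wait = [bucket.consume() for _ in range(3)]
clock.advance(10.0)                                # long idle period: refill is capped at capacity
after_idle = [bucket.consume() for _ in range(4)]

print(burst)       # [True, True, True, False, False]
print(after_wait)  # [True, True, False]
print(after_idle)  # [True, True, True, False]
```

The three lists show the behaviours that define a token bucket: a burst up to the capacity, a steady rate once the bucket is drained, and no accumulation beyond the capacity during idle periods.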
3. C implementation