Résolvons les problèmes d’utilisation qui se sont produits dans nos implémentations précédentes
Dans nos articles précédents, nous avons appris comment construire un limiteur de débit simple et un limiteur de débit qui nous aide à répartir uniformément les demandes. Vous pouvez consulter les articles liés. Je vais essayer de rendre cet article le plus complet possible afin que vous ne soyez pas dépendant des précédents, mais il est toujours bon d’avoir le contexte historique.
Les limites des implémentations précédentes sont devenues les exigences de cette implémentation.
- Si nos limiteurs de débit n’étaient pas utilisés pendant un certain temps, ils ne pourraient pas compenser la sous-utilisation des ressources.
Cela peut se produire des deux manières suivantes :
un. Si le limiteur de débit a une transaction/seconde de 5
, et dans la seconde précédente, seuls deux ont été utilisés, nous devrions pouvoir utiliser les anciens permis inutilisés. Les politiques exactes peuvent dépendre des implémentations.
b. Supposons que notre limiteur de débit ait un TPS (transactions par seconde) de 1
. Une demande a été faite et le limiteur est resté inactif pendant cinq secondes. Si la demande arrive à 6,3 secondes, notre implémentation précédente l’a fait attendre 0,7 seconde. Mais idéalement, nous devrions exécuter immédiatement.
2. Nous ne pouvions pas acquérir plusieurs permis à la fois ; ils ont été construits pour n’acquérir qu’un seul permis. C’est une limite. Que se passe-t-il si un client souhaite effectuer cinq appels n/w sur cinq threads différents via un pool de threads ? Ne pas pouvoir acquérir n
permis conduirait d’autres appelants non liés à acquérir le permis, ce qui pourrait ne pas être ce que nous voulons.
Pour résoudre la sous-utilisation, nous devons utiliser des jetons produits pendant la période d’inactivité, mais que se passe-t-il si la période d’inactivité est énorme ? Cela peut conduire à un flot de jetons inutilisés, et notre limiteur n’aurait aucun rôle à jouer car les ressources peuvent être bombardées. Nous introduisons un concept appelé « permis stockés max », qui est la limite maximale des permis que nous pouvons stocker. Le client doit le fournir.
Résoudre l’acquisition de plusieurs permis est simple. Il y a deux étapes : introduire une nouvelle API dans notre contrat et comprendre comment fonctionne le throttling. Le code ci-dessous montre le nouveau contrat pour nos limiteurs de débit, et nous parlerons de la limitation dans les sections suivantes.
public interface RateLimiter {
/**
* Rate limits the code passed inside as an argument.
*
* @param code representation of the piece of code that needs to be rate limited.
* @return true if executed, false otherwise.
*/
boolean throttle(Code code);/**
* When the piece of code that needs to be rate limited cannot be represented as a contiguous
* code, then entry() should be used before we start executing the code. This brings the code inside the rate
* limiting boundaries.
*
* @return true if the code will execute and false if rate limited.
* <p
*/
boolean acquire();
/**
* Allows multiple permits at the same time. If an expensive task takes n permits, the further calls should take the
* toll on the rate.
* @param permits Permits required.
* @return true, if successful, false otherwise.
*/
boolean acquire(int permits);
/**
* Interface to represent a contiguous piece of code that needs to be rate limited.
*/
interface Code {
/**
* Calling this function should execute the code that is delegated to this interface.
*/
void invoke();
}
}
- TPS — Le nombre de transactions que nous pouvons effectuer en une seconde est appelé « transactions par seconde ».
- Permis – Chaque transaction peut se faire par l’acquisition d’un permis.
- Permis stockés — Nombre de permis que nous avons en stock à tout moment. Il s’agit de la somme des « permis frais » d’une seconde et de ceux stockés de la sous-utilisation précédente.
- Permis max — Nous ne pouvons pas stocker de permis illimités ; c’est le plafond maximum à ce sujet.
- MST (
SmoothTickTimeDuration
) — Ceci est crucial pour le timing. Nous divisons une seconde par TPS et obtenons STTD. Mathématiquement, il s’agit de la durée pendant laquelle chaque demande doit être espacée pour maintenir le débit souhaité.
Dans des situations simples où nous acquérons un permis, notre STTD maintiendra l’écart entre les demandes lorsque nous ajouterons STTD au temps d’exécution actuel. Lors de la prochaine demande, nous obtenons la différence entre la prochaine heure disponible et l’heure actuelle. Cela fera attendre le thread appelant pendant cette durée, ce qui maintiendra notre débit intact.
Mais considérons une situation où nous voulons acquérir cinq permis avec un TPS de 1
. Comment acquérir le permis ? Doit-on acquérir ou refuser la demande ? Dans des implémentations plus simples, nous pouvons refuser la demande, mais cela pourrait être un scénario courant dans des situations réelles.
Pour résoudre ce problème, nous fournissons cinq permis. L’un serait de la seconde actuelle (comme TPS est 1
), et quatre sont extraites des secondes futures. Cela fera bloquer la demande suivante pendant quatre secondes. De cette façon, nous pouvons maintenir le taux souhaité. En bref, nous n’étranglons pas la demande coûteuse, mais l’amende est prélevée sur les demandes ultérieures.
Notre implémentation deviendra plus simple si nous divisons le problème en plusieurs sous-problèmes. Ne vous inquiétez pas; nous ne ferons pas de « programmation dynamique » ici. Mauvaise blague! Je sais.
Nous devons demander deux choses au client : le TPS et le nombre maximal de permis pouvant être stockés.
public BurstySmoothRateLimiter(int tps, int maxStoredPermits) {
this.mMaxStoredPermits = maxStoredPermits;
this.mRequiredSmoothTickDuration = (double) NANO_PER_SEC / tps;
}
Problème 1 : Compte tenu de la situation, 0 — — — 1 — — — 2 — — — 3 — — — 4 — — –b-5, l’exécution précédente a eu lieu à 2
et une demande est arrivée à b
. Nous avons attendu jusqu’à 5
pour exécuter la demande.
Pour résoudre ce problème, nous introduirons une synchronisation de circuit qui modifierait la mNextFreeAvailableTime
jusqu’à maintenant. Il calculera également le nombre de permis que nous n’avons pas utilisés pendant la durée d’inactivité.
// This method does the sync and returns the next available time of execution
private double syncLocked(long now) {
// If current time is after the next free time, override to now.
if (now > mNextFreeAvailableTime) {
// Sync potentially available permits
int freshPermits = (int) ((now - mNextFreeAvailableTime) / mRequiredSmoothTickDuration);
mStoredPermits = Math.min(mStoredPermits + freshPermits, mMaxStoredPermits);
mNextFreeAvailableTime = now;
}
return mNextFreeAvailableTime;
}
Nous avons le prochain temps disponible avec nous. Nous exécuterons la demande en cours à ce moment.
Problème 2 : Comment acquérons-nous les permis ? Par exemple, si le nombre de permis est supérieur à notre TPS.
Tout d’abord, nous verrons si nous pouvons répondre à la demande en utilisant les permis stockés dans le limiteur. Si oui, nous le faisons. Sinon, nous acquérons de nouveaux permis et retardons les demandes à venir pour maintenir le taux.
int storePermitsUsed = Math.max(0, Math.min(permitsAskedFor, mStoredPermits));
// If we use all stored permits, then we have demanded >= storedPermits.
if (storePermitsUsed == mStoredPermits) {
freshPermitsUsed = permitsAskedFor - storePermitsUsed;
}// Wait if we cannot satisfy the request from stored permits.
// In case we can satisfy the request from stored permits, this wait will be 0
double wait = freshPermitsUsed * mRequiredSmoothTickDuration;
Maintenant, nous ajoutons l’attente pour calculer le prochain temps disponible et réduisons les permis utilisés à partir des permis stockés.
// Reduce the number of available permits
mStoredPermits -= storePermitsUsed;
mNextFreeAvailableTime += wait; // Calculated in the previous snippet.
Voici à quoi ressemble la méthode complète :
/**
* Reserves us 'n' permits and returns the earliest when these permits could be used.
* If a caller asks for more permits than max transactions per second, we do not stop per se but, the upcoming
* transactions get delayed for the required amount of time.
* @param permits permits asked for
* @param now current time in nanos
* @return Earliest time when 'n' permits could be used.
*/
private double reservePermitsLocked(int permits, long now) {
int freshPermitsUsed = 0;
double nextFreeAvailable = syncLocked(now);
int storePermitsUsed = Math.max(0, Math.min(permits, mStoredPermits));
// If we use all stored permits, then we have demanded >= storedPermits.
if (storePermitsUsed == mStoredPermits) {
freshPermitsUsed = permits - storePermitsUsed;
}double wait = freshPermitsUsed * mRequiredSmoothTickDuration;
// Reduce the number of available permits
mStoredPermits -= storePermitsUsed;
mNextFreeAvailableTime += wait;
// Fresh permits will encounter throttling and stored permits will fire in burst to cover under utilization.
return nextFreeAvailable;
}
Maintenant que nous avons acquis les permis requis et calculé le prochain temps disponible et d’exécution de la demande de permis en cours, nous pouvons retourner le temps libre disponible au acquire()
appel.
Lors de l’acquisition, nous vérifions si nous devons attendre avant de faire la demande ou si nous pouvons la faire immédiatement. Nous laissons le thread dormir sans interruption s’il y a un temps d’attente.
@Override
public boolean acquire(int permits) {
synchronized (mLock) {
if (permits <= 0) {
return false;
}
long now = System.nanoTime();
double nextAvailableTimeNanos = reservePermitsLocked(permits, now);
double wait = Math.max(nextAvailableTimeNanos - now, 0);
sleepWithoutInterruptions((long) wait);
return true;
}
}
Et c’est tout. Nous avons créé une nouvelle implémentation de notre limiteur de débit capable d’acquérir plusieurs jetons et de prendre en charge la sous-utilisation jusqu’à un certain niveau de permis.
Nous pouvons l’utiliser comme le limiteur de débit que nous avons implémenté dans les articles précédents mais sans la capacité de stocker des jetons. Dans cet exemple, nous allons faire le maxStoredPermits
égal .
public static void main(String[] args) {
RateLimiter limiter = new BurstySmoothRateLimiter(3,0);
Thread[] group = new Thread[16];
Runnable r = () -> {
for (int i = 0; i < 50; i++) {
if (limiter.acquire(1)) {
System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
}}
};
for (int i = 0; i < 16; i++) {
group[i] = new Thread(r);
group[i].start();
}
}
Cela devrait permettre à chaque transaction de se produire avec un intervalle de 0,3 seconde. Voici le résultat :
Maintenant, nous allons essayer d’acquérir dix permis à partir de la capacité de 2. Cela devrait nous fournir dix permis, mais faites en sorte que la prochaine demande paie le coût supplémentaire en le limitant pour huit jetons supplémentaires.
public static void main(String[] args) {
RateLimiter limiter = new BurstySmoothRateLimiter(2,0);
Thread[] group = new Thread[16];
Runnable r = () -> {
for (int i = 0; i < 50; i++) {
// Mimicks the expensive operation that requests for 10 permits at a time.
if (limiter.acquire(10)) {
System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
}}
};
for (int i = 0; i < 16; i++) {
group[i] = new Thread(r);
group[i].start();
}
}
Voici les résultats attendus :
De même, vous pouvez utiliser le maxStoredPermit
paramètre pour stocker les permis précédemment sous-utilisés.
Pour voir l’implémentation complète de cette classe, veuillez consulter le code ici.
Merci d’avoir lu. Restez à l’écoute pour plus.