Partagez les données de manière transparente entre les instances
Lorsque j’ai été présenté pour la première fois aux services aux apatrides, j’ai eu du mal à passer le « où est le bœuf ? » en pensant. À quoi bon un service qui ne se souvient de rien d’une demande à l’autre ? J’ai progressivement commencé à comprendre que l’astuce des services sans état est que tous les états doivent être conservés après chaque demande. Mais c’était quand même difficile parce que je ne pouvais pas utiliser les structures de données traditionnelles comme les cartes ou les listes ; J’ai dû créer des enregistrements et des relations, ce qui peut devenir encombrant.
Puis j’ai découvert Redissonun client Java pour Redis qui ressemble à des structures de données Java traditionnelles mais est conservé sur le disque. Par exemple, je peux créer un objet qui obéit au contrat de carte, mais la carte est disponible dans plusieurs instances d’un service autrement sans état.
Supposons que vous disposiez d’un service orienté message existant qui récupère les messages d’un bus de messages, effectue un traitement, puis renvoie un message avec des informations sur le traitement. Il s’agit d’un flux asynchrone ; si vous vouliez la présenter comme une requête synchrone, vous deviez attendre la réponse. Attendre une réponse n’est pas un problème, car il existe de nombreuses façons de gérer cela. Mais si vous aviez plusieurs instances du service synchrone, seul le service qui a envoyé le message initial souhaiterait recevoir la réponse pour la retransmettre en tant que réponse.
J’ai créé le système événementiel décrit ci-dessus, qui reçoit un message sur le Kafka message, le stocke dans une base de données et envoie un autre message indiquant que le processus est terminé. C’est un service artificiel, il n’y a aucune raison de le rendre asynchrone, mais parfois nous devons faire face à des services existants qui semblent aussi artificiels. Dans cet article, je vais construire un système qui fait en sorte que le flux obéisse au flux traditionnel de requête-réponse des services RESTful.
Pour le démontrer, je vais devoir développer une configuration assez complexe. J’aurai besoin d’une instance de Kafka, Redis, du service asynchrone et de deux instances du service de synchronisation. Pour que toutes ces pièces fonctionnent ensemble, j’utiliserai Kubernetes avec Helm. J’utiliserai également Gitlab avec leur pipeline CI/CD pour mettre à jour les graphiques helm et ArgoCD pour déployer à partir des graphiques helm.
Je n’entrerai pas dans tous les détails de cette configuration, mais les dépôts Gitlab sont tous publics si vous voulez patauger dans la fange. Je vais peut-être écrire quelques-uns des mécanismes sous-jacents du système CI/CD, mais l’essentiel est que je n’ai qu’à pousser vers Git, et après quelques minutes, les services sont mis à jour.
Pour comprendre le code que je vais présenter, vous devez être capable de suivre Reactive Java. Si vous avez encore du mal avec cela, vous pouvez lire mon article :
Je vais créer un service Web RESTful qui enverra un message dans la file d’attente des messages Kafka, puis attendra la réponse. Lorsque vous envoyez un message, il revient immédiatement, nous devons donc maintenant rechercher la réponse dans la file d’attente consommatrice.
Le problème est que si nous avons deux instances de ce service, nous ne savons pas à quelle instance le message de réponse sera envoyé. Les messages ne sont envoyés qu’à une seule instance dans un groupe de consommateurs. Nous pourrions jouer avec les groupes de consommateurs, mais cela se complique car chaque instance de Kubernetes est configurée de la même manière. La solution consiste à ajouter le message à une carte distribuée dans l’instance consommatrice afin que toutes les instances puissent le voir. Voici comment procéder :
@Slf4j
@RestController
@RequestMapping("/contrived")
public class SyncController {
private final Messaging messaging;
private final ObjectMapper objectMapper;
public SyncController(Messaging messaging,
ObjectMapper objectMapper) {
this.messaging = messaging;
this.objectMapper = objectMapper;
}
@GetMapping(path="/example/{message}",
produces = MediaType.APPLICATION_JSON_VALUE)
Mono<ResponseEntity<JsonNode>> initiateTransaction(
@PathVariable("message") final String message) {
return Mono.just(objectMapper
.createObjectNode()
.put("message", message))
.flatMap(messaging::addToQueueAndWaitForResponse)
.map(json-> ResponseEntity.ok().body(json))
.onErrorResume(e -> Mono
.just(ResponseEntity.badRequest().
body(objectMapper.createObjectNode()
.put("failures: ", e.getMessage()))));}
}
C’est le contrôleur que j’ai trouvé. La magie opère dans le addToQueueAndWaitForResponse
dans le bean Messagerie. Voici le code :
@Configuration
@Slf4j
public class Messaging {
private final BlockingQueue<JsonNode> queue =
new ArrayBlockingQueue<>(4);
private final ObjectMapper objectMapper;
private final RMapCacheReactive<String, JsonNode> responseMap;public Messaging(@Value("${redisson.address}") String address,
@Value("${redisson.password}") String password,
ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
try {
Config redissonConfig = new Config();
redissonConfig
.useSingleServer()
.setAddress(address)
.setPassword(password);
RedissonReactiveClient redisson =
Redisson.create(redissonConfig).reactive();
responseMap = redisson.getMapCache("sync-service");
} catch(Exception ex) {
log.error("exception creating redisson client", ex);
throw ex; // spring doesn't display a good error message
}
}
public Mono<JsonNode> addToQueueAndWaitForResponse(JsonNode json) {
queue.add(json);
return responseMap.containsKey(json.path("message").asText(""))
.filter(b -> b)
.switchIfEmpty(Mono.error(() -> new EmptyException()))
.retryWhen(RetrySpec
.fixedDelay(30, Duration.ofSeconds(1))
.filter(e -> e instanceof EmptyException))
.flatMap(b -> responseMap.get(
json
.path("message")
.asText("")))
.onErrorMap(e -> e
.getClass()
.getName()
.contains("RetryExhastedException")
? new IllegalStateException("Could not complete transaction")
: e);
}
@Bean
public Supplier<String> eventProducer(){
return () -> {
try {
JsonNode item = queue.take();
log.info("pulled {} from queue", item);
return item.toPrettyString();
} catch (InterruptedException e) {
log.warn("process interrupted");
throw new IllegalStateException("process interrupted");
}
};
}
@Bean
public Consumer<String> eventConsumer() {
return value -> Mono.just(value)
.doOnNext(s -> log.debug("consumed {}", s))
.flatMap(v -> {
try {
return Mono.just(objectMapper.readTree(v));
} catch (JsonProcessingException e) {
log.warn("exception deserializing {}", v);
return Mono.error(e);
}
})
.doOnNext(json -> log.info("message {} received", json))
.flatMap(json -> responseMap
.fastPut(json
.path("response")
.asText(""),
json,
10,
TimeUnit.MINUTES))
.subscribe(i -> log.info("consume returned {}", i),
e -> log.warn("consume returned error", e));
}
private class EmptyException extends Throwable {
}
}
Dans le constructeur, nous créons un Redisson MapCache
appelé sync-service
. Puis dans le addToQueueAndWaitForResponse
méthode, nous l’ajoutons à une file d’attente qui l’enverra à Kafka, puis nous verrons si la réponse est dans la MapCache
. Bien sûr, ce n’est pas la première fois, il y a donc une nouvelle tentative qui sera effectuée une fois par seconde pendant 30 secondes. Une fois qu’il trouve la réponse (identifiée par la chaîne dans le message), il la renverra au contrôleur, qui la renverra au demandeur.
Comment le message est-il ajouté au MapCache
? Dans la méthode consommateur, il ajoutera tous les messages au MapCache
avec une durée de vie de dix minutes. Une fois qu’il est ajouté, toutes les instances peuvent le voir, mais seuls ceux qui s’y intéressent le traiteront. La durée de vie signifie que nous n’avons pas à nettoyer la carte des messages indésirables.
Il ne s’agit que de quelques centaines de lignes de code, mais il montre comment créer des structures de données distribuées afin que les services puissent apparaître avec état mais s’exécutent dans plusieurs instances avec un état synchronisé.
Le code pour le async-service
peut être trouvé ici:
https://gitlab.com/kamradtfamily.net/async-service
Le code pour le service de synchronisation peut être trouvé ici :
https://gitlab.com/kamradtfamily.net/sync-service
Les cartes de barre peuvent être trouvées ici:
Merci d’avoir lu! Restez à l’écoute pour plus.