Qu’est-ce que tu attends?
La création, la mise à jour ou la suppression de ressources AWS peut prendre un certain temps et, lorsque cela est fait dans un script, peut entraîner des erreurs si le code suivant tente d’accéder auxdites ressources. Dans cet article de blog, nous expliquerons ce que sont les serveurs, quand et comment les utiliser et comment éviter toute erreur lorsqu’il s’agit de modifier l’état d’une ressource.
Noter: Le code suivant sera écrit en Python et à l’aide de la bibliothèque Boto3, mais Waiters peut également être utilisé avec l’un des autres SDK AWS.
- Compte AWS
- Python3/Pip
- Boto3
Dans cette procédure pas à pas, nous examinons un script python qui extrait les données d’un fichier csv et les écrit dans une instance DynamoDB nouvellement créée sur AWS.
Je vais essayer de développer de manière organique un cas d’utilisation pour les serveurs, mais si vous êtes déjà à l’aise avec le SDK Python sur votre environnement local, n’hésitez pas à faire défiler directement la section Serveurs.
Tout d’abord, nous allons jeter un œil à l’ensemble de données.
MMM,127.83,1669231715.438569
AOS,61.8,1669231717.501728
ABT,106.14,1669231718.50959
ABBV,159.42,1669231719.519121
ABMD,377.81,1669231720.609117
ACN,295.34,1669231721.636138
ATVI,76.44,1669231722.648533
ADP,262.38,1669231725.858109
A,157.01,1669231730.847122
GOOGL,98.21,1669231741.430469
MO,44.81,1669231743.767232
AMZN,94.11,1669231744.166251
AIG,61.12,1669231750.5217159
ABC,165.99,1669231756.527267
AAPL,150.95,1669231765.310137
Le nom du fichier de données est stock_prices.csv
et se trouve dans le même répertoire que notre example_waiters.py
scénario. La première colonne représente le symbole boursier, la deuxième colonne sera son prix correspondant et la dernière colonne est l’horodatage unix représentant l’heure à laquelle le cours de l’action a été extrait.
## Helper Functions
def get_file_contents(file_path):
f = open(file_path, "r")
contents = f.readlines()
f.close()return [ line.strip() for line in contents ]
def create_item_obj_for_dynamo(line_data):
symbol, price, extracted_time = line_data.split(",")
Item = {
"symbol": { "S": symbol },
"price": { "S": price },
"extracted_time": { "S": extracted_time }
}
return Item
La get_file_contents
La fonction lit notre fichier et renvoie une liste de son contenu, où l’index de la liste correspond à la ligne de données du fichier csv.
La create_item_obj_for_dynamo
convertit chaque ligne (de cette liste) en un objet qui peut être passé dans la Article paramètre de dynamodb put_item()
Appel API.
Il n’y a que deux paramètres requis dans le put_item
api, c’est-à-dire TableName
et Item
.
Pour Item
nous voyons qu’il attend un objet (ou un dictionnaire python) dont les clés correspondent aux noms des colonnes de la table et dont la valeur est un dictionnaire (la clé de ce dictionnaire correspondant au type de données que nous voulons que les valeurs de cette colonne aient et la valeur est ce que nous voulons insérer dans cette colonne).
Étant donné que DynamoDB est une base de données NoSQL, seule la Primary Key
doit être transmis pour chaque objet d’élément de la Item
paramètre pourput_item
. Dans notre exemple, la clé primaire/de partition sera un symbol
pour le symbole boursier.
Item = {
"stock": { "S": symbol },
"price": { "S": price },
"extracted_time": { "S": extracted_time }
}
Ainsi, si nous remplaçons symbol
avec stock
on rencontrerait alors un ValidationException
puisqu’il n’y a pas de clé correspondant à la Primary Key
de notre table, qui est symbol
Tant que nous fournissons la clé primaire dans l’objet, nous pouvons ensuite ajouter d’autres paires clé-valeur comme nous le souhaitons.
import boto3# initialize client
dynamo = boto3.client("dynamodb")
def create_dynamo_table(table_name):
try:
dynamo.create_table(
TableName=table_name,
AttributeDefinitions=[{
"AttributeName": "symbol",
"AttributeType": "S"
}],
KeySchema=[{
"AttributeName": "symbol",
"KeyType": "HASH"
}],
ProvisionedThroughput={
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
)
except Exception as err:
print(err)
exit(1)
def add_item_to_table(table_name, item_obj):
try:
dynamo.put_item(
TableName=table_name,
Item=item_obj
)
print(f"Successfully added {item_obj['symbol']['S']} to {table_name} table")
except Exception as err:
print(f"Failed to add {item_obj['symbol']['S']} to {table_name} table")
print(err)
print("")
exit(1)
La create_dynamo_table
la fonction appelle lacreate_table
api, tous les arguments affichés sont obligatoires.
La KeySchema
dispute c’est là que nous disons explicitement que symbol
sera notre clé de partition (via le HACHER mot-clé) et AttributeDefinitions
est l’endroit où nous définissons son type de données.
Nous définissons arbitrairement les unités de capacité de lecture/écriture sur 1, car nous ne travaillons qu’avec un très petit ensemble de données. La add_item_to_table
fonction est l’endroit où nous appelons le put_item
API.
## example_waiters.py
import boto3# initialize client
dynamo = boto3.client("dynamodb")
""" --- Helper Functions --- """
def get_file_contents(file_path):
f = open(file_path, "r")
contents = f.readlines()
f.close()
return [ line.strip() for line in contents ]
def create_item_obj_for_dynamo(line_data):
symbol, price, extracted_time = line_data.split(",")
Item = {
"symbol": { "S": symbol },
"price": { "S": price },
"extracted_time": { "S": extracted_time }
}
return Item
""" --- API CALLS --- """
def create_dynamo_table(table_name):
try:
dynamo.create_table(
TableName=table_name,
AttributeDefinitions=[{
"AttributeName": "symbol",
"AttributeType": "S"
}],
KeySchema=[{
"AttributeName": "symbol",
"KeyType": "HASH"
}],
ProvisionedThroughput={
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
)
except Exception as err:
print(err)
exit(1)
def add_item_to_table(table_name, item_obj):
try:
dynamo.put_item(
TableName=table_name,
Item=item_obj
)
print(f"Successfully added {item_obj['symbol']['S']} to {table_name} table")
except Exception as err:
print(f"Failed to add {item_obj['symbol']['S']} to {table_name} table")
print(err)
print("")
exit(1)
def main():
#initialize variables
table_name = "stock-prices"
file_path = "./stock_prices.csv"
#execute tasks
create_dynamo_table(table_name)
stock_data_objs = get_file_contents(file_path)
for stock_data in stock_data_objs:
item_obj = create_item_obj_for_dynamo(stock_data)
add_item_to_table(table_name, item_obj)
if __name__ == "__main__":
main()
Voici à quoi notre script entier devrait ressembler actuellement. Dans main
nous définissons d’abord notre table_name
et file_path
et sur la ligne suivante, nous créons notre table dynamodb.
La stock_data_objs
variable est un tableau qui correspond à chaque ligne de données dans stock_symbols.csv
.
Nous parcourons chaque ligne de données de ce tableau, l’analysons dans un dictionnaire python, puis transmettons ce dictionnaire auput_item
méthode à ajouter à notre stock-prices
table.
Avant de continuer, nous devons résoudre un problème potentiel
Bien que le code s’exécute localement, nous devons toujours nous assurer que nous disposons des autorisations IAM appropriées associées à notre utilisateur IAM.
Un moyen rapide de vérifier et de voir s’il manque des autorisations serait d’exécuter le script ci-dessus.
S’il y a unAccessDeniedException
lors de l’exécution du code, cela signifie que nous n’avons pas les autorisations appropriées attachées à notre utilisateur IAM pour exécuter le code. Cette section explique comment ajouter ces autorisations pour corriger cette erreur. Si vous n’avez aucune erreur, cette section peut être ignorée.
AWS Security Token Service (AWS STS) est un service utilisé pour récupérer les informations d’identification de sécurité nécessaires pour accéder aux ressources AWS, mais peut également être utilisé pour obtenir des informations concernant ces informations d’identification. Afin de savoir où ajouter les autorisations, nous devrons connaître l’utilisateur IAM que nous utilisons pour effectuer nos appels API. Nous utiliserons le get_caller_identity
méthode de STS pour aider à cela.
## get_user_arn.py
import boto3# initialize client
sts = boto3.client("sts")
# get ARN for user
response = sts.get_caller_identity()["Arn"]
print(response)
L’exécution du script ci-dessus génère l’ARN de l’utilisateur IAM dont les informations d’identification sont utilisées pour effectuer le boto3
Appels d’API à AWS, nous voyons dans mon cas que cet utilisateur est George. Maintenant, ajoutons des autorisations à l’utilisateur.
Accédez à la console de service IAM et dans le menu latéral, cliquez sur Utilisateurs. De là, nous sélectionnons l’utilisateur qui nous intéresse (qui dans mon cas est George). Avec le Autorisations onglet en surbrillance, sélectionnez maintenant Ajouter une politique en ligne pour créer une nouvelle politique d’autorisations pour votre utilisateur.
Pour le service, sélectionnez DynamoDB
et les actions nécessaires correspondent à
dynamodb:CreateTable
dynamodb:DescribeTable
dynamodb:PutItem
Nous pouvons définir Ressources pointer uniquement vers cette table DynamoDB particulière si nous souhaitons être plus restrictifs avec les autorisations. Étant donné que le nom de la table que nous allons créer s’appelle stock-prices
l’ARN prendra la forme
arn:aws:dynamodb:{your-region}:{your-account-id}:table/stock-prices
Cliquez maintenant sur Politique de révision
Donnez un nom à la stratégie, puis cliquez sur Créer une stratégie. Cela va maintenant attacher les autorisations nécessaires pour exécuter le code localement.
Maintenant que nous nous sommes assurés que nous avons les bonnes autorisations, après avoir exécuté le script, nous voyons que nous obtenons un ResourceNotFoundException
. L’affirmation donnée ici est que notre cours de la bourse la table n’a pas été trouvée et n’a donc pas été créée.
Les docs Boto3 mentionnent que CreateTable
est asynchrone, ce qui expliquerait la raison pour laquelle nous recevons ResourceNotFoundException
.
Comme le tableau est initialement dans le CRÉER état, nous aurions besoin de trouver un moyen d’attendre que son état soit ACTIF avant d’appeler put_api
.
Noter: Si nous devions commenter le code qui crée une table et exécuter à nouveau le script, quelques secondes après la première exécution, nous serions en mesure d’ajouter les données à la table et cela résoudrait le problème. Mais nous voulons pouvoir le faire de manière synchrone et donc exécuter le script une seule fois. Une approche très courante pour résoudre ce problème serait d’utiliser le describe_table
API.
Du Syntaxe de la réponsece qui nous intéresse est le TableStatus
évaluer
def get_table_status(table_name):
try:
response = dynamo.describe_table(
TableName=table_name
)["Table"]print(f"The status of the table is currently: {response['TableStatus']}")
return response["TableStatus"]
except Exception as err:
print(err)
Nous ajoutons donc une nouvelle fonction à notre script pour obtenir et enregistrer l’état de la table
def main():
#initialize variables
table_name = "stock-prices"
file_path = "./stock_prices.csv"#execute tasks
create_dynamo_table(table_name)
count = 0
status_of_table = get_table_status(table_name)
while status_of_table != "ACTIVE":
status_of_table = get_table_status(table_name)
count += 1
print(count)
stock_data_objs = get_file_contents(file_path)
for stock_data in stock_data_objs:
item_obj = create_item_obj_for_dynamo(stock_data)
add_item_to_table(table_name, item_obj)
Après create_dynamo_table
nous avons un while
boucle qui continuera à boucler jusqu’à ce que le statut de la table soit égal à ACTIFpuis il sortira de la boucle et poursuivra avec le reste du script.
J’ai ajouté une variable count pour voir combien de fois cela bouclerait.
Dans ce cas, le describe_table
l’opération a été exécutée 145 fois ! Cela fait beaucoup d’appels d’API, qui peuvent s’additionner et coûter cher en temps supplémentaire.
Une solution de contournement serait d’importer le time
module et utilisation time.sleep
count = 0
status_of_table = get_table_status(table_name)
while status_of_table != "ACTIVE":
status_of_table = get_table_status(table_name)
time.sleep(1)
count += 1
print(count)
Mise à jour de notre while
boucle pour dormir pendant une seconde, nous voyons que nous avons apporté notre describe_table
invocations en baisse de 145 à 7 exécutions. Chose intéressante, cela nous indique également qu’il faut environ 7 secondes pour créer la table, nous pourrions donc faire time.sleep(7)
pour réduire nos invocations à 1.
Mais que se passe-t-il si d’autres tables DynamoDB que nous essayons de créer et d’ajouter des éléments prennent beaucoup plus de temps pour arriver à un ACTIF Etat? Choisir un certain nombre de secondes pour time.sleep
car semble tout à fait arbitraire…
À ce stade, vous pensez peut-être qu’il doit y avoir une meilleure façon de faire cela ?
Eh bien, vous avez raison !
Et nous en sommes enfin arrivés au sujet principal de ce blog…
Les serveurs sont utilisés sur un client
objet pour interroger une ressource AWS pour un état spécifié, en suspendant l’exécution jusqu’à ce que la ressource atteigne cet état.
Pour voir quels statuts sont disponibles pour être attendus pour un objet client donné, nous utilisons le waiter_names
attribut sur le client
objet:
import boto3dynamo = boto3.client("dynamodb")
ec2 = boto3.client("ec2")
sqs = boto3.client("sqs")
print("dynamo waiters:", dynamo.waiter_names)
print("")
print("ec2 waiters:", ec2.waiter_names)
print("")
print("sqs waiters:", sqs.waiter_names)
Nous voyons qu’il n’y a que deux statuts pour DynamoDB où nous pouvons utiliser un waiter
pour interroger et arrêter notre exécution. EC2 en a beaucoup plus, alors que SQS n’en a pas. Il n’est pas toujours garanti qu’un client aura des serveurs.
Lors de l’utilisation d’objets de ressource, nous pouvons utiliser .meta.client
pour accéder à notre objet client de niveau inférieur et peut donc obtenir waiter_names
de cette façon.
def main():
#initialize variables
table_name = "stock-prices"
file_path = "./stock_prices.csv"#execute tasks
create_dynamo_table(table_name)
#waiter for table to become ACTIVE
waiter = dynamo.get_waiter("table_exists")
waiter.wait(TableName=table_name)
stock_data_objs = get_file_contents(file_path)
for stock_data in stock_data_objs:
item_obj = create_item_obj_for_dynamo(stock_data)
add_item_to_table(table_name, item_obj)
Après avoir mis à jour notre fonction principale pour utiliser le client get_waiter
méthode avec le fourni waiter_name
(dans ce cas étant table_exists
) nous avons pu obtenir notre Waiter
objet. Pour exécuter le serveur spécifié, nous utilisons wait
sur cet objet et nous voyons que notre programme crée avec succès la table et ajoute tous les éléments. Tout cela a été fait sans le préalable while
logique de boucle et utilisation explicite de ladescribe_table()
API. Cela a rendu notre code plus lisible et plus facile à gérer.
Certaines choses à noter sont, par défaut, chaque objet Waiter a des délais différents pour interroger la ressource et un nombre différent de tentatives de relance. Ci-dessus, on voit que le DynamoDB.Waiter.TableExists
attend 4 fois plus longtemps pour interroger nos ressources qu’un EC2.Waiter.InstanceExists
ce qui peut ralentir notre programme.
De plus, compte tenu de ce que nous avons remarqué précédemment, il a fallu environ 7 secondes pour que notre instance devienne ACTIFdonc attendre 20 secondes peut être trop long ici.
Heureusement, nous pouvons utiliser le WaiterConfig
paramètre pour personnaliser le Delay
et MaxAttempts
et donc avoir plus de contrôle sur nos tentatives de sondage.
Notre script final, utilisant des serveurs
## example_waiters.py
import boto3
from typing import List# initialize client
dynamo = boto3.client("dynamodb")
""" --- Helper Functions --- """
def get_file_contents(file_path: str) -> List[str]:
f = open(file_path, "r")
contents = f.readlines()
f.close()
return [ line.strip() for line in contents ]
def create_item_obj_for_dynamo(line_data: str) -> dict:
symbol, price, extracted_time = line_data.split(",")
Item = {
"symbol": { "S": symbol },
"price": { "S": price },
"extracted_time": { "S": extracted_time }
}
return Item
""" --- API CALLS --- """
def create_dynamo_table(table_name: str) -> None:
try:
dynamo.create_table(
TableName=table_name,
AttributeDefinitions=[{
"AttributeName": "symbol",
"AttributeType": "S"
}],
KeySchema=[{
"AttributeName": "symbol",
"KeyType": "HASH"
}],
ProvisionedThroughput={
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
)
print(f"Successfully created the {table_name} table")
except Exception as err:
print(err)
exit(1)
def add_item_to_table(table_name: str, item_obj: dict) -> None:
try:
dynamo.put_item(
TableName=table_name,
Item=item_obj
)
print(f"Successfully added {item_obj['symbol']['S']} to {table_name} table")
except Exception as err:
print(f"Failed to add {item_obj['symbol']['S']} to {table_name} table")
print(err)
print("")
exit(1)
def main() -> None:
#initialize variables
table_name = "stock-prices"
file_path = "./stock_prices.csv"
create_dynamo_table(table_name)
#wait for table to be in ACTIVE state
waiter = dynamo.get_waiter("table_exists")
waiter_config = {"Delay": 2, "MaxAttempts": 10}
waiter.wait(TableName=table_name, WaiterConfig=waiter_config)
#parse data and add to dynamo table
stock_data_objs = get_file_contents(file_path)
for stock_data in stock_data_objs:
item_obj = create_item_obj_for_dynamo(stock_data)
add_item_to_table(table_name, item_obj)
if __name__ == "__main__":
main()
Les serveurs sont un excellent moyen d’arrêter temporairement l’exécution et d’interroger une ressource jusqu’à ce qu’un état spécifique soit atteint. Cela permet d’éviter de rencontrer des erreurs lors de l’accès à cette ressource. L’utilisation de serveurs rend le code plus lisible en supprimant la logique passe-partout et peut être configuré pour interroger à différents taux et réessayer les tentatives, ce qui aide à optimiser votre code.
Merci pour la lecture!
N’hésitez pas à me contacter également sur LinkedIn!