Meilleure pratique pour interroger un grand nombre d’entités ndb à partir d’un magasin de données

Je me suis heurté à une limite intéressante avec le magasin de données App Engine. Je crée un gestionnaire pour nous aider à parsingr certaines données d’utilisation sur l’un de nos serveurs de production. Pour effectuer l’parsing, je dois interroger et résumer plus de 10 000 entités extraites du magasin de données. Le calcul n’est pas difficile, c’est juste un histogramme d’éléments qui passent un filtre spécifique des échantillons d’utilisation. Le problème que j’ai rencontré est que je ne parviens pas à récupérer les données de la banque de données assez rapidement pour effectuer un traitement avant de toucher la date limite de la requête.

J’ai essayé tout ce que je peux imaginer pour fragmenter la requête en appels RPC parallèles pour améliorer les performances, mais, selon appstats, je n’arrive pas à exécuter les requêtes en parallèle. Quelle que soit la méthode que j’essaie (voir ci-dessous), il semble toujours que le RPC retombe dans une cascade de requêtes successives séquentielles.

Remarque: le code de requête et d’parsing fonctionne, il se déroule lentement, car je ne peux pas obtenir les données assez rapidement à partir du magasin de données.

Contexte

Je n’ai pas de version live que je puisse partager, mais voici le modèle de base pour la partie du système dont je parle:

class Session(ndb.Model): """ A tracked user session. (customer account (company), version, OS, etc) """ data = ndb.JsonProperty(required = False, indexed = False) class Sample(ndb.Model): name = ndb.SsortingngProperty (required = True, indexed = True) session = ndb.KeyProperty (required = True, kind = Session) timestamp = ndb.DateTimeProperty(required = True, indexed = True) tags = ndb.SsortingngProperty (repeated = True, indexed = True) 

Vous pouvez considérer les exemples comme des moments où un utilisateur utilise une fonctionnalité d’un nom donné. (ex: ‘systemA.feature_x’). Les balises sont basées sur les détails du client, les informations système et la fonctionnalité. ex: [‘winxp’, ‘2.5.1’, ​​’systemA’, ‘feature_x’, ‘premium_account’]). Les balises forment donc un ensemble de jetons dénormalisés qui pourraient être utilisés pour trouver des échantillons d’intérêt.

L’parsing que j’essaie de faire consiste à prendre une plage de dates et à demander combien de fois un ensemble de fonctionnalités (peut-être toutes les fonctionnalités) était utilisé par jour (ou par heure) par compte client (entreprise et non par utilisateur).

Donc, l’entrée du gestionnaire est quelque chose comme:

  • Date de début
  • Date de fin
  • Mots clés)

La sortie serait:

 [{ 'company_account': , 'counts': [ {'timeperiod': , 'count': }, ... ] }, ... ] 

Code commun pour les requêtes

Voici un code commun à toutes les requêtes. La structure générale du gestionnaire est un gestionnaire get simple utilisant webapp2 qui configure les parameters de requête, exécute la requête, traite les résultats, crée les données à renvoyer.

 # -- Build Query Object --- # query_opts = {} query_opts['batch_size'] = 500 # Bring in large groups of entities q = Sample.query() q = q.order(Sample.timestamp) # Tags tag_args = [(Sample.tags == t) for t in tags] q = q.filter(ndb.query.AND(*tag_args)) def handle_sample(sample): session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb count_key = session_obj.data['customer'] addCountForPeriod(count_key, sample.timestamp) 

Méthodes essayées

J’ai essayé une variété de méthodes pour tenter d’extraire des données du magasin de données aussi rapidement que possible et en parallèle. Les méthodes que j’ai essayées jusqu’à présent incluent:

A. Itération unique

Ceci est plus un cas de base simple à comparer aux autres méthodes. Je construis juste la requête et itère sur tous les éléments permettant à ndb de faire ce qu’il fait pour les tirer les uns après les autres.

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) q_iter = q.iter(**query_opts) for sample in q_iter: handle_sample(sample) 

B. Grand Fetch

L’idée ici était de voir si je pouvais faire un très gros fetch.

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) samples = q.fetch(20000, **query_opts) for sample in samples: handle_sample(sample) 

C. Async va à travers l’intervalle de temps

L’idée ici est de reconnaître que les échantillons sont assez bien espacés dans le temps, je peux donc créer un ensemble de requêtes indépendantes qui divisent la région temporelle en morceaux et tentent d’exécuter chacun d’eux en parallèle en utilisant async:

 # split up timestamp space into 20 equal parts and async query each of them ts_delta = (end_time - start_time) / 20 cur_start_time = start_time q_futures = [] for x in range(ts_intervals): cur_end_time = (cur_start_time + ts_delta) if x == (ts_intervals-1): # Last one has to cover full range cur_end_time = end_time f = q.filter(Sample.timestamp >= cur_start_time, Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts) q_futures.append(f) cur_start_time = cur_end_time # Now loop through and collect results for f in q_futures: samples = f.get_result() for sample in samples: handle_sample(sample) 

D. Cartographie asynchrone

J’ai essayé cette méthode parce que la documentation donnait l’impression que ndb pouvait exploiter un certain parallélisme automatiquement en utilisant la méthode Query.map_async.

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) @ndb.tasklet def process_sample(sample): period_ts = getPeriodTimestamp(sample.timestamp) session_obj = yield sample.session.get_async() # Lookup the session object from cache count_key = session_obj.data['customer'] addCountForPeriod(count_key, sample.timestamp) raise ndb.Return(None) q_future = q.map_async(process_sample, **query_opts) res = q_future.get_result() 

Résultat

J’ai testé un exemple de requête pour collecter le temps de réponse global et les traces d’appstats. Les résultats sont les suivants:

A. Itération unique

réel: 15.645s

Celui-ci va séquentiellement en allant chercher les lots les uns après les autres et récupère ensuite chaque session de memcache.

Méthode A appstats

B. Grand Fetch

réel: 12.12s

Effectivement le même que l’option A mais un peu plus rapide pour une raison quelconque.

Méthode B appstats

C. Async va à travers l’intervalle de temps

réel: 15.251s

Semble fournir plus de parallélisme au début mais semble être ralenti par une séquence d’appels à la prochaine lors de l’itération des résultats. En outre, il ne semble pas être en mesure de chevaucher les recherches de session memcache avec les requêtes en attente.

Méthode C appstats

D. Cartographie asynchrone

réel: 13.752s

Celui-ci est le plus difficile à comprendre pour moi. Il semble y avoir beaucoup de chevauchement, mais tout semble s’étendre dans une cascade au lieu d’être parallèle.

Méthode D appstats

Recommandations

Sur la base de tout cela, qu’est-ce qui me manque? Est-ce que je viens de limiter l’App Engine ou existe-t-il un meilleur moyen de réduire un grand nombre d’entités en parallèle?

Je ne sais pas quoi essayer ensuite. J’ai pensé à réécrire le client pour que plusieurs requêtes soient lancées sur le moteur de l’application en parallèle, mais cela semble assez brutal. Je m’attendrais vraiment à ce que le moteur d’application soit capable de gérer ce cas d’utilisation, alors je suppose qu’il me manque quelque chose.

Mettre à jour

Au final, j’ai trouvé que l’option C était la meilleure pour mon cas. J’ai pu l’optimiser pour terminer en 6,1 secondes. Toujours pas parfait, mais beaucoup mieux.

Après avoir reçu des conseils de plusieurs personnes, j’ai constaté que les éléments suivants étaient essentiels pour comprendre et garder à l’esprit:

  • Plusieurs requêtes peuvent s’exécuter en parallèle
  • Seulement 10 RPC peuvent être en vol à la fois
  • Essayez de dénormaliser au point qu’il n’y a pas de questions secondaires
  • Ce type de tâche est préférable pour mapper les files d’attente de tâches et réduire les requêtes en temps réel.

Donc ce que j’ai fait pour le rendre plus rapide:

  • J’ai partitionné l’espace de requête depuis le début en fonction de l’heure. (note: plus les partitions sont égales en termes d’entités retournées, mieux c’est)
  • J’ai dénormalisé les données pour éliminer la nécessité d’une requête de session secondaire
  • J’ai utilisé les opérations asynchrones ndb et wait_any () pour chevaucher les requêtes avec le traitement

Je n’obtiens toujours pas les performances que j’attendrais ou aimerais, mais c’est réalisable pour le moment. Je souhaite juste que ce soit un meilleur moyen de mettre en mémoire rapidement un grand nombre d’entités séquentielles dans les gestionnaires.

Un traitement de grande ampleur comme celui-ci ne devrait pas être effectué dans une requête utilisateur, qui a un délai de 60 secondes. Au lieu de cela, cela devrait être fait dans un contexte qui prend en charge les requêtes de longue durée. La queue des tâches prend en charge les demandes jusqu’à 10 minutes et (je crois) les contraintes de mémoire normales (les instances F1, par défaut, ont 128 Mo de mémoire ). Pour des limites encore plus élevées (pas de délai d’attente de requête, 1 Go + de mémoire), utilisez des backends .

Voici quelque chose à essayer: configurez une URL qui, lorsqu’elle est utilisée, déclenche une tâche de queue de tâches. Il renvoie une page Web qui interroge tous les ~ 5s vers une autre URL qui répond par true / false si la tâche de la queue des tâches est déjà terminée. La queue de tâches traite les données, ce qui peut prendre 10 secondes, et enregistre le résultat dans la banque de données sous la forme de données calculées ou d’une page Web rendue. Une fois que la page initiale détecte qu’elle est terminée, l’utilisateur est redirigé vers la page, qui récupère les résultats désormais calculés à partir du magasin de données.

La nouvelle fonctionnalité expérimentale de traitement de données (une API AppEngine pour MapReduce) semble très appropriée pour résoudre ce problème. Il effectue le partitionnement automatique pour exécuter plusieurs processus de travail en parallèle.

Les opérations de données volumineuses sur App Engine sont mieux implémentées en utilisant une opération de type mapreduce.

Voici une vidéo décrivant le processus, mais incluant BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

Il ne semble pas que vous ayez besoin de BigQuery, mais vous souhaiterez probablement utiliser les parties Map et Reduce du pipeline.

La principale différence entre ce que vous faites et la situation mapreduce est que vous lancez une instance et que vous parcourez les requêtes, où sur mapreduce, vous aurez une instance distincte exécutée en parallèle pour chaque requête. Vous aurez besoin d’une opération de réduction pour “résumer” toutes les données et écrire le résultat quelque part.

L’autre problème que vous avez est que vous devriez utiliser des curseurs pour itérer. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Si l’iterator utilise un décalage de requête, il sera inefficace, car un décalage émet la même requête, passe un certain nombre de résultats et vous donne le prochain ensemble, tandis que le curseur passe directement à l’ensemble suivant.

J’ai un problème similaire et après avoir travaillé avec Google pendant quelques semaines, je peux confirmer qu’il n’y a pas de solution magique au moins à partir de décembre 2017.

tl; dr: On peut s’attendre à un débit de 220 entités / seconde pour un SDK standard fonctionnant sur une instance B1 jusqu’à 900 entités / seconde pour un SDK corrigé s’exécutant sur une instance B8.

La limitation est liée au processeur et la modification du type instancé affecte directement les performances. Ceci est confirmé par des résultats similaires obtenus sur les instances B4 et B4_1G

Le meilleur débit que j’ai eu pour une entité Expando avec environ 30 champs est:

Standard SDK GAE

  • Instance B1: ~ 220 entités / seconde
  • Instance B2: ~ 250 entités / seconde
  • Instance B4: ~ 560 entités / seconde
  • Instance B4_1G: ~ 560 entités / seconde
  • Instance B8: ~ 650 entités / seconde

Patched GAE SDK

  • Instance B1: ~ 420 entités / seconde
  • Instance B8: ~ 900 entités / seconde

Pour le standard GAE SDK, j’ai essayé différentes approches, y compris le multi-threading, mais le meilleur a été fetch_async avec wait_any . La bibliothèque NDB actuelle fait déjà un excellent travail en utilisant les asynchrones et les contrats à terme sous le capot.

J’ai trouvé deux approches intéressantes pour optimiser ceci:

  • Matt Faus – Accélérer la lecture du magasin de données GAE avec la projection Protobuf
  • Evan Jones – Traçage d’un bogue de performance Python sur App Engine

Matt Faus explique très bien le problème:

GAE SDK fournit une API pour lire et écrire des objects dérivés de vos classes dans le magasin de données. Cela vous évite de devoir valider les données brutes renvoyées par la banque de données et de les reconditionner dans un object facile à utiliser. En particulier, GAE utilise des tampons de protocole pour transmettre les données brutes du magasin à la machine frontale qui en a besoin. Le SDK est alors chargé de décoder ce format et de renvoyer un object propre à votre code. Cet utilitaire est génial, mais parfois il fait un peu plus de travail que vous le souhaitez. […] En utilisant notre outil de profilage, j’ai découvert que 50% du temps passé à récupérer ces entités était pendant la phase de décodage protobuf-à-python-object. Cela signifie que le processeur sur le serveur frontal était un goulot d’étranglement dans ces lectures de banque de données!

GAE-data-access-web-request

Les deux approches tentent de réduire le temps passé à faire le protobuf en décodage Python en réduisant le nombre de champs décodés.

J’ai essayé les deux approches mais je ne réussis qu’avec Matt. Les internes du SDK ont changé depuis qu’Evan a publié sa solution. J’ai dû changer un peu le code publié par Matt ici , mais c’était assez facile – s’il ya un intérêt, je peux publier le code final.

Pour une entité Expando régulière avec environ 30 champs, j’ai utilisé la solution de Matt pour décoder uniquement quelques champs et obtenu une amélioration significative.

En conclusion, il faut planifier en conséquence et ne pas espérer pouvoir traiter beaucoup plus que quelques centaines d’entités dans une requête GAE “en temps réel”.