Dans son docstring, elasticsearch.helpers.async_bulk
décrit lui-même comme un
Aide pour le :meth:
~elasticsearch.AsyncElasticsearch.bulk
api qui fournit plus humaine interface conviviale - il consomme un itérateur d'actions et de envoie à elasticsearch en morceaux. source
Contexte
J'ai été en utilisant AsyncElasticsearch.bulk()
avec succès pour envoyer des pandas dataframes à certains ES de l'instance
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Question
Cependant, quand il s'agit de async_bulk
Je suis arriver index is missing
erreurs.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Essayé de tune _rec_to_actions()
de plusieurs façons, sans beaucoup d'effet.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Je pense que le principal problème est que je ne suis pas tout à fait sûr de savoir ce qu'est une action, dans le contexte d'elasticsearch. Cette notion est partout dans la documentation, mais n'est pas clair pour la structure de données homologue dans cette bibliothèque de code source (aucun que j'ai pu trouver, de toute façon)
Ce qui est exactement une action et comment dois-je accorder mon générateur d'envoyer df données à self.index
?
environnement
- python = "3.9.5"
- elasticsearch = "7.14.1"