Elasticsearch-pa: erreur à l'aide de async_bulk, index manquant d'actions

0

La question

Dans son docstring, elasticsearch.helpers.async_bulk décrit lui-même comme un

Aide pour le :meth:~elasticsearch.AsyncElasticsearch.bulk api qui fournit plus humaine interface conviviale - il consomme un itérateur d'actions et de envoie à elasticsearch en morceaux. source

Contexte

J'ai été en utilisant AsyncElasticsearch.bulk() avec succès pour envoyer des pandas dataframes à certains ES de l'instance

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
        yield (json.dumps(record, default=int))

async def send_to_elasticsearch(self, df: DataFrame):
    logger.info(f"{self.stage_name} sending batch to elastic")
    await self.elastic_client.bulk(self._rec_to_actions(df))

Question

Cependant, quand il s'agit de async_bulkJe suis arriver index is missing erreurs.

async def send_to_elasticsearch(self, df: DataFrame):
    await async_bulk(self.elastic_client, self._rec_to_actions(df))

Essayé de tune _rec_to_actions() de plusieurs façons, sans beaucoup d'effet.

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        record["index"] = self.index
        yield (json.dumps(record, default=int))

Je pense que le principal problème est que je ne suis pas tout à fait sûr de savoir ce qu'est une action, dans le contexte d'elasticsearch. Cette notion est partout dans la documentation, mais n'est pas clair pour la structure de données homologue dans cette bibliothèque de code source (aucun que j'ai pu trouver, de toute façon)

Ce qui est exactement une action et comment dois-je accorder mon générateur d'envoyer df données à self.index?

environnement

  • python = "3.9.5"
  • elasticsearch = "7.14.1"
elasticsearch python
2021-11-18 16:22:30
1

La meilleure réponse

0

Cette documentation il est plus facile:

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
       yield {"_index": self.index, "_source": json.dumps(record, default=int)}
2021-11-19 15:34:19

Dans d'autres langues

Cette page est dans d'autres langues

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................