Onun docstring içinde, elasticsearch.helpers.async_bulk
bir olarak tanımlıyor
Yardımcı için: metamfetamin:
~elasticsearch.AsyncElasticsearch.bulk
sağlayan apı daha insan dostu bir arayüz-bir eylem yineleyicisi tüketir ve onları parçalar halinde elasticsearch'e gönderir. kaynak
Bağlam
Ben kullanıyorum AsyncElasticsearch.bulk()
pandalar veri çerçevelerini bazı ES örneklerine başarıyla göndermek için
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Sorun
Ancak, ne zaman gelir async_bulk
Ben alıyorum index is missing
hatasızlar.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Ayarlamaya çalıştım _rec_to_actions()
çok fazla etkisi olmayan çeşitli şekillerde.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Sanırım asıl sorun, elasticsearch bağlamında bir eylemin ne olduğunu bildiğimden emin değilim. Bu kavram dokümantasyonda her yerdedir, ancak bu kütüphane kaynak kodunda net bir veri yapısı karşılığı yoktur (yine de bulamadım)
Tam olarak bir eylem nedir ve jeneratörümü df'nin verilerini göndermek için nasıl ayarlamalıyım self.index
?
çevre
- python = "3.9.5"
- elasticsearch = "7.14.1"