webentwicklung-frage-antwort-db.com.de

Verwendung der Bulk-API zum Speichern der Schlüsselwörter in ES mithilfe von Python

Ich muss einige Nachrichten in ElasticSearch integrieren, die in mein Python-Programm integriert sind. Nun versuche ich die Nachricht zu speichern:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

Das heißt, wenn ich 10 Nachrichten habe, muss ich meinen Code zehnmal wiederholen. Ich möchte also versuchen, eine Skriptdatei oder eine Batchdatei zu erstellen. Ich habe das ElasticSearch Guide geprüft, die BULK-API kann verwendet werden. Das Format sollte wie folgt aussehen:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

was ich getan habe ist: 

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

Ich verwende auch das Curl-Tool, um das Dokument zu speichern. 

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

Jetzt möchte ich my Python-Code verwenden, um die Datei in der elastischen Suche zu speichern.

43
chengji18
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)
90
Justina Chen

Obwohl der Code von @justinachen mir geholfen hat, mit py-elasticsearch zu beginnen, lassen Sie mich nach einem Blick in den Quellcode eine einfache Verbesserung vornehmen:

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk() übernimmt bereits die Segmentierung für Sie. Und mit Segmentierung meine ich die Chucks, die jedes Mal an den Server gesendet werden. Wenn Sie die Anzahl der gesendeten Dokumente reduzieren möchten, gehen Sie wie folgt vor: helpers.bulk(es, actions, chunk_size=100)

Einige nützliche Informationen zum Einstieg: 

helpers.bulk() ist nur ein Wrapper von helpers.streaming_bulk, aber der erste akzeptiert eine Liste, die ihn praktisch macht.

helpers.streaming_bulk basiert auf Elasticsearch.bulk(), so dass Sie sich keine Gedanken darüber machen müssen, was Sie wählen sollen.

In den meisten Fällen sollte also helpers.bulk () alles sein, was Sie brauchen.

41
Diolor

(Die anderen in diesem Thread erwähnten Ansätze verwenden die Python-Liste für das ES-Update, was heute keine gute Lösung ist, insbesondere wenn Sie Millionen von Daten zu ES hinzufügen müssen.)

Besserer Ansatz verwendet Python-Generatoren - Gigs von Daten verarbeiten, ohne dass der Speicher zu Ende geht oder die Geschwindigkeit stark beeinträchtigt wird.

Im Folgenden finden Sie ein Beispiel aus einem praktischen Anwendungsfall - Hinzufügen von Daten aus der nginx-Protokolldatei zu ES zur Analyse.

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(Zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'Host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

Dieses Skelett zeigt die Verwendung von Generatoren. Sie können dies sogar auf einem nackten Computer verwenden, wenn Sie dies benötigen. Sie können dies auch erweitern, um schnell auf Ihre Bedürfnisse zugeschnitten zu sein.

Python Elasticsearch Referenz hier .

30
Ethan

Definieren Sie den Indexnamen und den Dokumenttyp mit jeder Entität:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)

Geben Sie den Standardindex und den Dokumenttyp mit der Methode an:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)

Arbeitet mit:

ES-Version: 6.4.0

ES python lib: 6.3.1

1
Rafal Enden