需求大概是:需要實時地推送日誌系統的日誌訊息,提供給其他同事查詢使用。每次以當前時間為基準,查詢前一分鐘的資料;因為資料量大,考慮使用 ES 的 scroll 搭配 search_type=scan(scroll/scan)方式來分批取回結果。
程式碼如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Demo: query data from Elasticsearch.

Data reaches ES with some delay, so statistics are computed over data
from one minute ago rather than the current minute.
"""
import datetime
import json
import logging

import requests

logger = logging.getLogger(__name__)

# NOTE(review): es_url_pre (the ES base URL concatenated in get_type_data) is
# not defined anywhere in the visible source; it must be defined at module
# level before this script can run.


def request_get(url, params=None, timeout=2):
    """Unified wrapper around requests.get with exception handling.

    Note: intended for CGI-style endpoints whose response body is a JSON
    string.

    Args:
        url: Target URL.
        params: Query-string parameters passed through to requests.get.
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON body on a successful (r.ok) response; an empty
        dict on a non-OK status or on any handled error, so callers can
        simply test the result for truthiness.
    """
    try:
        r = requests.get(url=url, params=params, timeout=timeout)
        if r.ok:
            return r.json()
        # Only the first 200 characters of the body are logged to keep
        # log lines bounded.
        logger.error('{} faild, code: {}, cause: {}'
                     .format(url, r.status_code, r.text[:200]))
    except requests.exceptions.ConnectionError:
        logger.exception('connection error: %s' % (url, ))
    except requests.exceptions.RequestException:
        logger.exception('request error: {}'.format(url))
    except ValueError:
        # r.json() raises ValueError when the body is not valid JSON.
        logger.exception('invalid json response: {}'.format(url))
    return {}
def requests_post(url, data, timeout=2):
    """Unified wrapper around requests.post with exception handling.

    Note: intended for CGI-style endpoints whose response body is a JSON
    string.

    Args:
        url: Target URL.
        data: Request body passed through to requests.post.
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON body on a successful (r.ok) response; an empty
        dict on a non-OK status or on any error.
    """
    try:
        r = requests.post(url=url, data=data, timeout=timeout)
        if r.ok:
            return r.json()
        # Truncate the logged body to 200 characters so an oversized error
        # page cannot flood the log.
        logger.error('{} faild, code: {}, cause: {}'
                     .format(url, r.status_code, r.text[:200]))
    except requests.exceptions.ConnectionError:
        logger.exception('connection error: %s' % (url, ))
    except Exception:
        # Deliberately broad: this helper is best-effort and must never
        # raise; the traceback is still recorded by logger.exception.
        logger.exception('request error: {}'.format(url))
    return {}
def gen_index(date, name="logstash"):
    """Return the daily index name for *date*.

    Args:
        date: A date or datetime; only its calendar day is used.
        name: Index name prefix.

    Returns:
        A string of the form ``<name>-YYYY.MM.DD``, e.g.
        ``logstash-2019.05.18``.
    """
    # %Y (four-digit year) is required; %y would yield a two-digit year.
    return '{}-{}'.format(name, date.strftime('%Y.%m.%d'))
def get_exact_index_name(from_time, to_time, name="logstash"):
    """Return the exact index name(s) covering a time window.

    Args:
        from_time: Start of the window (datetime).
        to_time: End of the window (datetime).
        name: Index name prefix passed to gen_index.

    Returns:
        A single index name when both shifted bounds fall on the same day,
        otherwise the comma-joined names of every day from the start day
        through the end day inclusive.
    """
    # Both bounds are shifted back 8 hours before the index day is chosen.
    # NOTE(review): presumably converts UTC+8 local time to the UTC day used
    # in index names -- confirm against the cluster's index naming.
    shift = datetime.timedelta(hours=8)
    first_day = (from_time - shift).date()
    last_day = (to_time - shift).date()

    # Compare whole dates rather than day-of-month numbers so that windows
    # crossing a month or year boundary are counted correctly.
    span = (last_day - first_day).days
    if span < 1:
        return gen_index(last_day, name)

    return ",".join(
        gen_index(first_day + datetime.timedelta(days=offset), name)
        for offset in range(span + 1)
    )
# Build the request body for the ES search from a time range and a list of
# "should" terms, and return it serialized with json.dumps.
# NOTE(review): this block is syntactically incomplete as it stands -- the
# initial value of `should`, the body of the `for` loop and most of the
# `query_template` literal are missing. Their content cannot be determined
# from this file; restore them from the original script before use.
def get_query_data(from_time, to_time, should_terms):
# NOTE(review): right-hand side missing; presumably an empty list -- confirm.
should =
# NOTE(review): loop body missing; presumably adds one clause per item to
# `should` -- confirm.
for item in should_terms:
# NOTE(review): only fragments of the query literal remain below (a "query"
# key and closing braces); how from_time / to_time are used is not visible.
query_template = ,
},"query": }}
}}
}return json.dumps(query_template)
# Fetch every hit of one document type in a time window via the ES scroll
# API: post an initial search, then keep requesting the scroll URL until a
# page comes back with no hits. Returns the tuple (messages, counts).
# NOTE(review): several literals in this block are missing or case-folded
# (see the notes below); it does not parse as it stands.
def get_type_data(from_time, to_time, type_name, size=500):
index_name = get_exact_index_name(from_time, to_time)
# NOTE(review): three values are passed to .format() but the string contains
# no placeholders; presumably index, type and size were interpolated into the
# path and the `size=` parameter -- confirm.
initial_url = es_url_pre + "//_search/?scroll=2m&size=&search_type=scan".format(index_name, type_name, size)
# NOTE(review): first value missing; presumably an empty list -- confirm.
messages, counts = , 0
# NOTE(review): both list elements are missing; the term filters cannot be
# determined from this file.
should_terms = [, ]
# NOTE(review): the strftime directives look case-folded (`%h`, `%s` and a
# literal `t`, with `%m` used for both month and minute) -- confirm the
# intended timestamp format.
data = get_query_data(from_time.strftime("%y-%m-%dt%h:%m:%s"),
to_time.strftime("%y-%m-%dt%h:%m:%s"),
should_terms)
rets = requests_post(initial_url, data, timeout=2)
# An empty result from requests_post ends the query early.
if not rets:
return messages, counts
# NOTE(review): the default for the "hits" lookup is missing; presumably an
# empty dict, as in the later res.get("hits", {}) call -- confirm.
scroll_id, counts = rets.get("_scroll_id", ""), rets.get("hits", ).get("total", 0)
# Nothing matched: skip scrolling entirely.
if not counts:
return messages, counts
scroll_url = es_url_pre + "_search/scroll?"
# NOTE(review): `true` is not a Python name; the boolean literal is `True`.
while true:
# NOTE(review): right-hand side missing; presumably the scroll parameters
# including the current scroll_id -- confirm.
params =
res = request_get(scroll_url, params=params, timeout=1)
# NOTE(review): the default for the inner "hits" lookup is missing;
# presumably an empty list -- confirm.
hits = res.get("hits", {}).get("hits", )
# An empty page (or an empty dict from a failed request) stops the loop.
if not hits:
break
# NOTE(review): loop body missing; presumably collects each hit into
# `messages` -- confirm which part of the hit is kept.
for hit in hits:
# Carry the scroll id from this response into the next request.
scroll_id = res.get("_scroll_id", "")
return messages, counts
def main(from_time, to_time):
    """Fetch all "bilog" documents between from_time and to_time.

    Args:
        from_time: Start of the query window (datetime).
        to_time: End of the query window (datetime).

    Returns:
        The (messages, counts) tuple produced by get_type_data, queried
        with a page size of 1000.
    """
    return get_type_data(from_time, to_time, "bilog", size=1000)
if __name__ == "__main__":
    start_time = datetime.datetime.now()
    # Query window: the one-minute span that ends at the start of the
    # previous minute, i.e. data that is between one and two minutes old.
    window = datetime.timedelta(minutes=1)
    to_time = start_time.replace(second=0, microsecond=0) - window
    from_time = to_time - window
    messages, counts = main(from_time, to_time)
    end_time = datetime.datetime.now()
    # Report elapsed wall-clock time; the parenthesized form is valid on
    # both Python 2 and Python 3.
    print(end_time - start_time)
es 父子查詢 es父子文件建立查詢
一 準備 1,elasticsearch 5.6.9 2,kibana 5.6.9 3,jdk1.8 二 建立索引,文件 1建立資料庫put database?pretty station stationname 三 填充資料 插入父文件一條記錄 插入id 1的6路post database lin...
ES查詢語句
1.萬用字元查詢keyword欄位 不會建分詞索引,會建索引 2.刪除並釋放磁碟空間 post monitor delete by query 1.查詢你要刪除的doc資料 以2019 5 18 00 00 00時間節點和time欄位為例 具體 如下 monitor search post 2.手動...
ES 常用查詢
1.term精確查詢,實際上是包含的意思 用法一 與bool,filter使用 get zf en search 用法二 直接term查詢 get zf en search 2.bulk 批量寫入,注意,必須指定 id,須換行 如果 id存在,執行的是update操作 3.組合查詢,bool 布林 ...