博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ElasticSearch的基本用法与集群搭建
阅读量:5789 次
发布时间:2019-06-18

本文共 10904 字,大约阅读时间需要 36 分钟。

一、简介

ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。

这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/

二、基本用法

Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。

ES比传统关系型数据库,对一些概念上的理解:

Relational DB -> Databases -> Tables -> Rows -> ColumnsElasticsearch -> Indices   -> Types  -> Documents -> Fields

从创建一个Client到添加、删除、查询等基本用法:

1、创建Client

public ElasticSearchService(String ipAddress, int port) {        client = new TransportClient()                .addTransportAddress(new InetSocketTransportAddress(ipAddress,                        port));    }

这里是一个TransportClient。

ES下两种客户端对比:

TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。

Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。

2、创建/删除Index和Type信息

// 创建索引    public void createIndex() {        client.admin().indices().create(new CreateIndexRequest(IndexName))                .actionGet();    }    // 清除所有索引    public void deleteIndex() {        IndicesExistsResponse indicesExistsResponse = client.admin().indices()                .exists(new IndicesExistsRequest(new String[] { IndexName }))                .actionGet();        if (indicesExistsResponse.isExists()) {            client.admin().indices().delete(new DeleteIndexRequest(IndexName))                    .actionGet();        }    }        // 删除Index下的某个Type    public void deleteType(){        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();    }    // 定义索引的映射类型    public void defineIndexTypeMapping() {        try {            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();            mapBuilder.startObject()            .startObject(TypeName)                .startObject("properties")                    .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()                    .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()                    .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()                    .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()                    .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()                    .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()                    .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()                    .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()                .endObject()            .endObject()            .endObject();            PutMappingRequest putMappingRequest = Requests                    .putMappingRequest(IndexName).type(TypeName)                    .source(mapBuilder);            client.admin().indices().putMapping(putMappingRequest).actionGet();        } catch (IOException e) {            log.error(e.toString());        }    }

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。

注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

3、索引数据

// 批量索引数据    public void indexHotSpotDataList(List
dataList) { if (dataList != null) { int size = dataList.size(); if (size > 0) { BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < size; ++i) { Hotspotdata data = dataList.get(i); String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { bulkRequest.add(client .prepareIndex(IndexName, TypeName, data.getId().toString()) .setRefresh(true).setSource(jsonSource)); } } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { Iterator
iter = bulkResponse.iterator(); while (iter.hasNext()) { BulkItemResponse itemResponse = iter.next(); if (itemResponse.isFailed()) { log.error(itemResponse.getFailureMessage()); } } } } } } // 索引数据 public boolean indexHotspotData(Hotspotdata data) { String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName, TypeName).setRefresh(true); requestBuilder.setSource(jsonSource) .execute().actionGet(); return true; } return false; } // 得到索引字符串 public String getIndexDataFromHotspotData(Hotspotdata data) { String jsonString = null; if (data != null) { try { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); jsonBuilder.startObject().field(IDFieldName, data.getId()) .field(SeqNumFieldName, data.getSeqNum()) .field(IMSIFieldName, data.getImsi()) .field(IMEIFieldName, data.getImei()) .field(DeviceIDFieldName, data.getDeviceID()) .field(OwnAreaFieldName, data.getOwnArea()) .field(TeleOperFieldName, data.getTeleOper()) .field(TimeFieldName, data.getCollectTime()) .endObject(); jsonString = jsonBuilder.string(); } catch (IOException e) { log.equals(e); } } return jsonString; }

ES支持批量和单个数据索引。

4、查询获取数据

// 获取少量数据100个    private List
getSearchData(QueryBuilder queryBuilder) { List
ids = new ArrayList<>(); SearchResponse searchResponse = client.prepareSearch(IndexName) .setTypes(TypeName).setQuery(queryBuilder).setSize(100) .execute().actionGet(); SearchHits searchHits = searchResponse.getHits(); for (SearchHit searchHit : searchHits) { Integer id = (Integer) searchHit.getSource().get("id"); ids.add(id); } return ids; } // 获取大量数据 private List
getSearchDataByScrolls(QueryBuilder queryBuilder) { List
ids = new ArrayList<>(); // 一次获取100000数据 SearchResponse scrollResp = client.prepareSearch(IndexName) .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000)) .setQuery(queryBuilder).setSize(100000).execute().actionGet(); while (true) { for (SearchHit searchHit : scrollResp.getHits().getHits()) { Integer id = (Integer) searchHit.getSource().get(IDFieldName); ids.add(id); } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()) .setScroll(new TimeValue(600000)).execute().actionGet(); if (scrollResp.getHits().getHits().length == 0) { break; } } return ids; }

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。

5、聚合(Aggregation Facet)查询 

// 得到某段时间内设备列表上每个设备的数据分布情况
<设备id,数量>
public Map
getDeviceDistributedInfo(String startTime, String endTime, List
deviceList) { Map
resultsMap = new HashMap<>(); QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList); QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime); QueryBuilder queryBuilder = QueryBuilders.boolQuery() .must(deviceQueryBuilder).must(rangeBuilder); TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE) .field(DeviceIDFieldName); SearchResponse searchResponse = client.prepareSearch(IndexName) .setQuery(queryBuilder).addAggregation(termsBuilder) .execute().actionGet(); Terms terms = searchResponse.getAggregations().get("DeviceIDAgg"); if (terms != null) { for (Terms.Bucket entry : terms.getBuckets()) { resultsMap.put(entry.getKey(), String.valueOf(entry.getDocCount())); } } return resultsMap; }

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

三、集群配置

配置文件elasticsearch.yml

集群名和节点名:

#cluster.name: elasticsearch

#node.name: "Franz Kafka"

是否参与master选举和是否存储数据

#node.master: true

#node.data: true

分片数和副本数

#index.number_of_shards: 5

#index.number_of_replicas: 1

master选举最少的节点数,这个一定要设置为整个集群节点个数的一半加1,即N/2+1

#discovery.zen.minimum_master_nodes: 1

discovery ping的超时时间,拥塞网络,网络状态不佳的情况下设置高一点

#discovery.zen.ping.timeout: 3s

注意,分布式系统整个集群节点个数N要为奇数个!!

如何避免ElasticSearch发生脑裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/

即使集群节点个数为奇数,minimum_master_nodes为整个集群节点个数一半加1,也难以避免脑裂的发生,详情看讨论:https://github.com/elastic/elasticsearch/issues/2488

四、Elasticsearch插件

1、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head

2、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql

github地址:

3、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,可以通过它来查看ES集群的各种状态。

安装:./bin/plugin -install lukas-vlcek/bigdesk

访问:,

4、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件,

在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩,将service目录拷贝到elasticsearch目录的bin目录下。

而后,可以通过执行以下语句安装、启动、停止ElasticSearch:

sh elasticsearch install

sh elasticsearch start

sh elasticsearch stop

 

参考:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch

 

作者:
出处:
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
http://www.cnblogs.com/luxiaoxun/p/4869509.html
你可能感兴趣的文章
6倍性能差100TB容量,阿里云POLARDB咋实现?
查看>>
Sublime Text 2 技巧
查看>>
使用fscanf()函数从磁盘文件读取格式化数据
查看>>
参加婚礼
查看>>
h5 audio相关手册
查看>>
刚毕业从事java开发需要掌握的技术
查看>>
CSS Custom Properties 自定义属性
查看>>
vim
查看>>
MVVM计算器(下)
查看>>
C++中指针和引用的区别
查看>>
簡單分稀 iptables 記錄 udp 微軟 138 端口
查看>>
Java重写equals方法和hashCode方法
查看>>
Spark API编程动手实战-07-join操作深入实战
查看>>
H3C-路由策略
查看>>
centos 修改字符界面分辨率
查看>>
LNMP之Mysql主从复制(四)
查看>>
阅读Spring源代码(1)
查看>>
grep 命令
查看>>
JS二维数组的声明和使用
查看>>
v$archive_gap dg dataguard 断档处理 scn恢复
查看>>