Storm Elasticsearch 集成
Storm Elasticsearch Bolt & Trident State
EdIndexBolt,EsPercolateBolt和Estate允许用户将storm中的数据直接传输到Elasticsearch。 详细说明请参考以下内容。
EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
EsIndexBolt将tuples直接流入Elasticsearch索。 Tuples以指定的索引和类型组合进行索引。 用户应确保EsTupleMapper
可以从输入元组中提取“source”,“index”,“type”和“id”,“index”和“type”用于识别目标索引和类型。“source” 一个JSON格式的文档,将在Elasticsearch中编入索引。
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
EsPercolateBolt将tuples直接流入Elasticsearch。 tuples用于发送渗透请求到指定的索引和类型组合。 用户应该确保EsTupleMapper
可以从输入元组中提取“source”,“index”,“type”,“index”和“type”用于识别目标索引和类型,“source”是一个文档 在JSON格式的字符串将发送到渗透请求到弹性搜索。
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
如果存在非空的渗漏响应,EsPercolateBolt将会为PercolateResponse中每个Percolate.Match发出具有原始源和Percolate.Match的tuple。
EsState (org.apache.storm.elasticsearch.trident.EsState)
Elasticsearch Trident state也与EsBolts类似。 它将EsConfig和EsTupleMapper作为参数。
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
EsLookupBolt对Elasticsearch执行获取请求。 为了做到这一点,需要满足三个依赖。 除了通常的EsConfig,还必须提供其他两个依赖关系: ElasticsearchGetRequest用于将传入的元组转换为将针对Elasticsearch执行的GetRequest。 EsLookupResultOutput用于声明输出字段,并将GetResponse转换为由bolt发出的值。
传入的tuple被传递给提供的GetRequest创建者,该执行的结果被传递给Elasticsearch客户端。 然后,bolt使用提供程序输出适配器(EsLookupResultOutput)将GetResponse转换为值以发送。 输出字段也由bolt的用户通过输出适配器(EsLookupResultOutput)指定。
EsConfig esConfig = createEsConfig();
ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
=提供的组件(Bolt,State)以EsConfig作为构造函数arg。
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
or
Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put("client.transport.sniff", "true");
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
EsConfig params
Arg | Description | Type |
---|---|---|
clusterName | Elasticsearch cluster name | String (required) |
nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map <string, string="">(optional)</string,> |
EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
对于存储在Elasticsearch中的tuple或者从Elasticsearch搜索到的tuple,我们需要定义使用哪些字段。 用户需要通过实现EsTupleMapper
定义你自己的。 Storm-elasticsearch提供了默认的mapperorg.apache.storm.elasticsearch.common.DefaultEsTupleMapper
,它从相同的字段中提取其源,索引,类型,id值。 您可以参考DefaultEsTupleMapper的实现来看看如何实现自己的。
Committer Sponsors
- Sriharsha Chintalapani (@harshach)
- Jungtaek Lim (@HeartSaVioR)