Storm Trident Filters and Functions
Published: 2019-06-26


Goal: take the messages read from Kafka, filter them, add a specified field, and print the result.
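The example consists of four classes: SentenceSpout reads messages from a Kafka topic and emits three-field tuples, FunctionBolt is a Trident function that appends a gender field, MyFilter drops certain tuples, and PrintFilter keeps every tuple while printing all of its fields. TopologyTrident wires them together and runs the topology on a local cluster.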

SentenceSpout:

package Trident;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Reads data from Kafka and emits it as tuples.
 * @author BFD-593
 */
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;

    @Override
    public void nextTuple() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("helloworld", new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("helloworld").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // The iterator blocks inside nextTuple(): every message is padded with " 1 2",
        // split on spaces, and emitted as a three-field tuple.
        while (it.hasNext()) {
            String string = it.next().message().toString() + " 1" + " 2";
            String[] parts = string.split(" ");
            String name = parts[0];
            String value = parts[1];
            String value2 = parts[2];
            this.collector.emit(new Values(name, value, value2));
        }
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "192.168.170.185:2181");
        // consumer group
        props.put("group.id", "testgroup");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = Consumer.createJavaConsumerConnector(config);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields field = new Fields("name", "sentence", "sentence2");
        declarer.declare(field);
    }
}
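For example, a Kafka message whose body is a single token such as hello is padded to "hello 1 2" and emitted as the tuple (name="hello", sentence="1", sentence2="2"). A message that already contains spaces, such as a b, becomes (name="a", sentence="b", sentence2="1"), which is exactly the kind of tuple MyFilter below is designed to drop.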

FunctionBolt:

package Trident;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

/**
 * Trident function: appends a "gender" field to the data emitted by the spout.
 * The emitted value is added to the tuple; the original fields are not replaced.
 * @author BFD-593
 */
public class FunctionBolt extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String str = tuple.getStringByField("name");
        if ("a".equals(str)) {
            collector.emit(new Values("男"));   // "male"
        } else {
            collector.emit(new Values("女"));   // "female"
        }
    }
}
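When this function is attached with each(new Fields("name"), new FunctionBolt(), new Fields("gender")), Trident passes only the name field into execute and appends the emitted value to the tuple as the new gender field, so downstream operations see (name, sentence, sentence2, gender).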

MyFilter:


package Trident;

import java.util.Map;

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Trident filter: drops the tuples whose "name" field is "a" AND whose "sentence" field is "b".
 * @author BFD-593
 */
public class MyFilter extends BaseFilter {
    private TridentOperationContext context;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        super.prepare(conf, context);
        this.context = context;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        String name = tuple.getStringByField("name");
        String value = tuple.getStringByField("sentence");
        // Keep the tuple unless both fields match, i.e. drop only (name="a", sentence="b").
        return (!"a".equals(name)) || (!"b".equals(value));
    }
}
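For example, a tuple with name="a" and sentence="b" is dropped, while (name="a", sentence="1") and (name="c", sentence="b") are both kept, because isKeep only returns false when both conditions match.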

PrintFilter:

package Trident;

import java.util.Iterator;
import java.util.Map;

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;

/**
 * A filter that keeps every tuple and prints all of its fields and values.
 * @author BFD-593
 */
public class PrintFilter extends BaseFilter {
    private TridentOperationContext context = null;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        super.prepare(conf, context);
        this.context = context;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        Fields fields = tuple.getFields();
        Iterator<String> iterator = fields.iterator();
        String str = "";
        while (iterator.hasNext()) {
            String next = iterator.next();
            Object value = tuple.getValueByField(next);
            str = str + next + ":" + value + ",";
        }
        System.out.println(str);
        return true;
    }
}
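Because isKeep always returns true, this filter is effectively a debugging sink: for a tuple such as (name="a", sentence="1", sentence2="2", gender="男") it prints a line like name:a,sentence:1,sentence2:2,gender:男, and passes the tuple on unchanged.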

TopologyTrident:

package Trident;

import org.apache.kafka.common.utils.Utils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

/**
 * Wires the spout, the Trident function and the filters into a topology
 * and runs it on a local cluster.
 * @author BFD-593
 */
public class TopologyTrident {
    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", spout)
                .each(new Fields("name"), new FunctionBolt(), new Fields("gender"))
                .each(new Fields("name", "sentence"), new MyFilter())
                .each(new Fields("name", "sentence", "sentence2", "gender"), new PrintFilter());

        Config conf = new Config();

        LocalCluster clu = new LocalCluster();
        clu.submitTopology("mytopology", conf, topology.build());

        Utils.sleep(100000000);
        clu.killTopology("mytopology");
        clu.shutdown();
    }
}
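TopologyTrident runs everything in a LocalCluster, which is convenient for testing from the IDE. As a minimal sketch that is not part of the original post, the same wiring could instead be submitted to a running Storm cluster with StormSubmitter; the class name and worker count below are illustrative:

package Trident;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

// Minimal sketch (not from the original post): submit the same topology to a real
// cluster instead of a LocalCluster. Package the classes as a jar and launch it
// with the `storm jar` command; the worker count is illustrative.
public class TopologySubmit {
    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", new SentenceSpout())
                .each(new Fields("name"), new FunctionBolt(), new Fields("gender"))
                .each(new Fields("name", "sentence"), new MyFilter())
                .each(new Fields("name", "sentence", "sentence2", "gender"), new PrintFilter());

        Config conf = new Config();
        conf.setNumWorkers(2); // illustrative worker count
        StormSubmitter.submitTopology("mytopology", conf, topology.build());
    }
}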

Reposted from: https://www.cnblogs.com/wangjing666/p/6913047.html
