Storm流方式的统计系统怎么实现

75次阅读
没有评论

共计 11201 个字符,预计需要花费 29 分钟才能阅读完成。

本篇内容主要讲解“Storm 流方式的统计系统怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Storm 流方式的统计系统怎么实现”吧!

1:初期硬件准备:

 1 如果条件具备:请保证您安装好了 redis 集群

 2 配置好您的 Storm 开发环境

     3 保证好您的开发环境的畅通:主机与主机之间,Storm 与 redis 之间

2:业务背景的介绍:

 1   在这里我们将模拟一个   流方式的数据处理过程

      2 数据的源头保存在我们的 redis 集群之中

  3   发射的数据格式为:ip,url,client_key

数据发射器

package storm.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;
import java.util.Map;
import org.apache.log4j.Logger;
 * click Spout  从 redis 中间读取所需要的数据
 */
public class ClickSpout extends BaseRichSpout {
 private static final long serialVersionUID = -6200450568987812474L;
 public static Logger LOG = Logger.getLogger(ClickSpout.class);
 //  对于 redis,我们使用的是 jedis 客户端
 private Jedis jedis;
 //  主机
 private String host;
 //  端口
 private int port;
 // Spout  收集器
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
 // IP,URL,CLIENT_KEY outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP, storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY)); @Override public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {host = conf.get(Conf.REDIS_HOST_KEY).toString(); port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString()); this.collector = spoutOutputCollector; connectToRedis(); private void connectToRedis() {jedis = new Jedis(host, port); @Override public void nextTuple() { String content = jedis.rpop( count if (content == null ||  nil .equals(content)) { try {Thread.sleep(300); } catch (InterruptedException e) { } else { //  将 jedis 对象  rpop 出来的字符串解析为  json 对象 JSONObject obj = (JSONObject) JSONValue.parse(content); String ip = obj.get(storm.cookbook.Fields.IP).toString(); String url = obj.get(storm.cookbook.Fields.URL).toString(); String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY) .toString(); System.out.println( this is a clientKey // List Object  tuple 对象 collector.emit(new Values(ip, url, clientKey)); }

在这个过程之中,请注意:

1   我们在 OPEN 方法之中初始化   host,port,collector,以及 Redis 的连接,调用 Connect 方法并连接到 redis 数据库

2 我们在 nextTupe 取出数据,并且将他转换为一个 JSON 对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个

Values 对象

让我们来看看数据的流向图:

在我们的数据从 clickSpout 读取以后,接下来,我们将采用 2 个 bolt

 1  :repeatVisitBolt 

 2   :  geographyBolt 

共同来读取同一个数据源的数据:clickSpout

3 细细察看 repeatVisitBolt

package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;
import java.util.Map;
public class RepeatVisitBolt extends BaseRichBolt {
 private OutputCollector collector;
 private Jedis jedis;
 private String host;
 private int port;
 @Override
 public void prepare(Map conf, TopologyContext topologyContext,
 OutputCollector outputCollector) {
 this.collector = outputCollector;
 host = conf.get(Conf.REDIS_HOST_KEY).toString();
 port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
 connectToRedis();
 private void connectToRedis() {jedis = new Jedis(host, port);
 jedis.connect();
 public boolean isConnected() {if (jedis == null)
 return false;
 return jedis.isConnected();
 @Override
 public void execute(Tuple tuple) {String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
 String clientKey = tuple
 .getStringByField(storm.cookbook.Fields.CLIENT_KEY);
 String url = tuple.getStringByField(storm.cookbook.Fields.URL);
 String key = url +  :  + clientKey;
 String value = jedis.get(key);
 // redis 中取,如果 redis 中没有,就插入新的一条访问记录。if (value == null) {
 jedis.set(key,  visited 
 collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
 } else {
 collector
 .emit(new Values(clientKey, url, Boolean.FALSE.toString()));
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
 outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
 storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
 storm.cookbook.Fields.UNIQUE));
}

  在这里,我们把 url 和 clientKey 组合成为【url:clientKey】的格式组合,并依据这个对象,在 redis 中去查找,如果没有,那那 Set 到 redis 中间去,并且判定它为【unique】

4:

package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class VisitStatsBolt extends BaseRichBolt {
 private OutputCollector collector;
 private int total = 0;
 private int uniqueCount = 0;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 }
 @Override
 public void execute(Tuple tuple) {
  
  // 在这里,我们在上游来判断这个 Fields  是否是独特和唯一的
 boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
 
 total++;
 if(unique)uniqueCount++;
 collector.emit(new Values(total,uniqueCount));
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
 outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
  storm.cookbook.Fields.TOTAL_UNIQUE));
 }
}

第一次出现,uv ++ 

5   接下来,看看流水线 2:

package storm.bolt;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;
import storm.cookbook.IPResolver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
 * File | Settings | File Templates.
 */
public class GeographyBolt extends BaseRichBolt {
 // ip 解析器
 private IPResolver resolver;
 private OutputCollector collector;
 public GeographyBolt(IPResolver resolver) {
 this.resolver = resolver;
 @Override
 public void prepare(Map map, TopologyContext topologyContext,
 OutputCollector outputCollector) {
 this.collector = outputCollector;
 @Override
 public void execute(Tuple tuple) {
 // 1  从上级的目录之中拿到我们所要使用的 ip
 String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
 //  将 ip  转换为 json
 JSONObject json = resolver.resolveIP(ip);
 //  将  city 和 country  组织成为一个新的元祖,在这里也就是我们的 Values 对象
 String city = (String) json.get(storm.cookbook.Fields.CITY);
 String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
 collector.emit(new Values(country, city));
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
 //  确定了我们这次输出元祖的格式
 outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
 storm.cookbook.Fields.CITY));
}

以上 Bolt,完成了一个 Ip 到 CITY,COUNTRY 的转换

package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class GeoStatsBolt extends BaseRichBolt {
 private class CountryStats {
 private int countryTotal = 0;
 private static final int COUNT_INDEX = 0;
 private static final int PERCENTAGE_INDEX = 1;
 private String countryName;
 public CountryStats(String countryName) {
 this.countryName = countryName;
 private Map String, List Integer  cityStats = new HashMap String, List Integer ();
  * @param cityName
  */
 public void cityFound(String cityName) {
 countryTotal++;
 //  已经有了值,一个加 1 的操作
 if (cityStats.containsKey(cityName)) {cityStats.get(cityName)
 .set(COUNT_INDEX,
 cityStats.get(cityName).get(COUNT_INDEX)
 .intValue() + 1);
 //  没有值的时候
 } else {
 List Integer  list = new LinkedList Integer 
 list.add(1);
 list.add(0);
 cityStats.put(cityName, list);
 double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
 / (double) countryTotal;
 cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);
  * @return  拿到的国家总数
  */
 public int getCountryTotal() {
 return countryTotal;
  * @param cityName  依据传入的城市名称,拿到城市总数
  * @return
  */
 public int getCityTotal(String cityName) {return cityStats.get(cityName).get(COUNT_INDEX).intValue();

public String toString() { return  Total Count for   + countryName +   is  + Integer.toString(countryTotal) +  \n  +  Cities:  + cityStats.toString(); private OutputCollector collector; // CountryStats  是一个内部类的对象 private Map String, CountryStats  stats = new HashMap String, CountryStats @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; @Override public void execute(Tuple tuple) {String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY); String city = tuple.getStringByField(storm.cookbook.Fields.CITY); //  如果国家不存在的时候,新增加一个国家,国家的统计 if (!stats.containsKey(country)) {stats.put(country, new CountryStats(country)); //  这里拿到新的统计,cityFound  是拿到某个城市的值 stats.get(country).cityFound(city); collector.emit(new Values(country, stats.get(country).getCountryTotal(), city, stats.get(country) .getCityTotal(city))); @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields( storm.cookbook.Fields.COUNTRY, storm.cookbook.Fields.COUNTRY_TOTAL, storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL)); }

有关地理位置的统计,附带上程序其他的使用类

package storm.cookbook;
 */
public class Fields {
 public static final String IP =  ip 
 public static final String URL =  url 
 public static final String CLIENT_KEY =  clientKey 
 public static final String COUNTRY =  country 
 public static final String COUNTRY_NAME =  country_name 
 public static final String CITY =  city 
 // 唯一的,独一无二的
 public static final String UNIQUE =  unique 
 // 城镇整数
 public static final String COUNTRY_TOTAL =  countryTotal 
 // 城市整数
 public static final String CITY_TOTAL =  cityTotal 
 // 总共计数
 public static final String TOTAL_COUNT =  totalCount 
 // 总共独一无二的
 public static final String TOTAL_UNIQUE =  totalUnique 

public class HttpIPResolver implements IPResolver, Serializable { static String url =  http://api.hostip.info/get_json.php @Override public JSONObject resolveIP(String ip) { URL geoUrl = null; BufferedReader in = null; try {geoUrl = new URL(url +  ?ip=  + ip); URLConnection connection = geoUrl.openConnection(); in = new BufferedReader(new InputStreamReader(connection.getInputStream())); String inputLine; JSONObject json = (JSONObject) JSONValue.parse(in); in.close(); return json; } catch (IOException e) {e.printStackTrace(); } finally { //  每当 in 为空的时候我们不进行如下的 close 操作,只有在 in 不为空的时候进行 close 操作 if (in != null) { try {in.close(); } catch (IOException e) {return null;}
package storm.cookbook;
import org.json.simple.JSONObject;
 * Created with IntelliJ IDEA.
 * User: admin
 * Date: 2012/12/07
 * Time: 5:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface IPResolver {public JSONObject resolveIP(String ip);
}

到此,相信大家对“Storm 流方式的统计系统怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计11201字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)