共计 5512 个字符,预计需要花费 14 分钟才能阅读完成。
Storm 中如何进行 Librato 的 Metric 度量的实现,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面丸趣 TV 小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
下面介绍一个 Librato 的 Metric 度量的实现:
package com.digitalpebble.storm.crawler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import com.librato.metrics.HttpPoster;
import com.librato.metrics.HttpPoster.Response;
import com.librato.metrics.LibratoBatch;
import com.librato.metrics.NingHttpPoster;
import com.librato.metrics.Sanitizer;
import com.librato.metrics.Versions;
/** Sends the metrics to Librato **/
public class LibratoMetricsConsumer implements IMetricsConsumer {
public static final int DEFAULT_BATCH_SIZE = 500;
private static final Logger LOG = LoggerFactory
.getLogger(LibratoMetricsConsumer.class);
private static final String LIB_VERSION = Versions.getVersion(
META-INF/maven/com.librato.metrics/librato-java/pom.properties ,
LibratoBatch.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Sanitizer sanitizer = new Sanitizer() {public String apply(String name) {return Sanitizer.LAST_PASS.apply(name);
private int postBatchSize = DEFAULT_BATCH_SIZE;
private long timeout = 30;
private final TimeUnit timeoutUnit = TimeUnit.SECONDS;
private String userAgent = null;
private HttpPoster httpPoster;
private Set String metricsToKeep = new HashSet String
public void prepare(Map stormConf, Object registrationArgument,
TopologyContext context, IErrorReporter errorReporter) {
// TODO configure timeouts
// this.timeout = timeout;
// this.timeoutUnit = timeoutUnit;
// this.postBatchSize = postBatchSize;
String agentIdentifier = (String) stormConf.get( librato.agent
if (agentIdentifier == null)
agentIdentifier = storm
String token = (String) stormConf.get( librato.token
String username = (String) stormConf.get( librato.username
String apiUrl = (String) stormConf.get( librato.api.url
if (apiUrl == null)
apiUrl = https://metrics-api.librato.com/v1/metrics
// check that the values are not null
if (StringUtils.isBlank(token))
throw new RuntimeException( librato.token not set
if (StringUtils.isBlank(username))
throw new RuntimeException( librato.username not set
this.userAgent = String.format( %s librato-java/%s , agentIdentifier,
LIB_VERSION);
this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl);
// get the list of metrics names to keep if any
String metrics2keep = (String) stormConf.get( librato.metrics.to.keep
if (metrics2keep != null) {String[] mets = metrics2keep.split( ,
for (String m : mets)
metricsToKeep.add(m.trim().toLowerCase());
// post(String source, long epoch)
public void handleDataPoints(TaskInfo taskInfo,
Collection DataPoint dataPoints) {
final Map String, Object payloadMap = new HashMap String, Object
payloadMap.put( source , taskInfo.srcComponentId + _
+ taskInfo.srcWorkerHost + _ + taskInfo.srcTaskId);
payloadMap.put(measure_time , taskInfo.timestamp);
final List Map String, Object gaugeData = new ArrayList Map String, Object ();
final List Map String, Object counterData = new ArrayList Map String, Object ();
int counter = 0;
final Iterator DataPoint datapointsIterator = dataPoints.iterator();
while (datapointsIterator.hasNext()) {final DataPoint dataPoint = datapointsIterator.next();
// ignore datapoint with a value which is not a map
if (!(dataPoint.value instanceof Map))
continue;
// a counter or a gauge
// convention if its name contains _counter
// then treat it as a counter
boolean isCounter = false;
if (dataPoint.name.contains( _counter)) {
isCounter = true;
dataPoint.name = dataPoint.name.replaceFirst( _counter ,
if (!metricsToKeep.isEmpty()) {if (!metricsToKeep.contains(dataPoint.name.toLowerCase())) {
continue;
try {Map String, Number metric = (Map String, Number) dataPoint.value;
for (Map.Entry String, Number entry : metric.entrySet()) {String metricId = entry.getKey();
Number val = entry.getValue();
final Map String, Object data = new HashMap String, Object
data.put( name ,
sanitizer.apply(dataPoint.name + _ + metricId));
data.put(value , val);
if (isCounter)
counterData.add(data);
else
// use as gauge
gaugeData.add(data);
counter++;
if (counter % postBatchSize == 0
|| (!datapointsIterator.hasNext() (!counterData
.isEmpty() || !gaugeData.isEmpty()))) {
final String countersKey = counters
final String gaugesKey = gauges
payloadMap.put(countersKey, counterData);
payloadMap.put(gaugesKey, gaugeData);
postPortion(payloadMap);
payloadMap.remove(gaugesKey);
payloadMap.remove(countersKey);
gaugeData.clear();
counterData.clear();} catch (RuntimeException e) {LOG.error(e.getMessage());
LOG.debug(Posted {} measurements , counter);
public void cleanup() {private void postPortion(Map String, Object chunk) {
try {final String payload = OBJECT_MAPPER.writeValueAsString(chunk);
final Future Response future = httpPoster.post(userAgent, payload);
final Response response = future.get(timeout, timeoutUnit);
final int statusCode = response.getStatusCode();
if (statusCode 200 || statusCode = 300) {
LOG.error(Received an error from Librato API. Code : {}, Message: {} ,
statusCode, response.getBody());
} catch (Exception e) {LOG.error( Unable to post to Librato API , e);
}
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注丸趣 TV 行业资讯频道,感谢您对丸趣 TV 的支持。
正文完