package org.jpos.metrics.iso;

import io.micrometer.core.instrument.*;
import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.iso.ISOMsg;
import org.jpos.metrics.MeterFactory;
import org.jpos.metrics.MeterInfo;
import org.jpos.util.LogEvent;
import org.jpos.util.LogSource;
import org.jpos.util.Logger;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Counts ISO messages using Micrometer meters and tags resolved from message fields.
 * <p>
 * Lifecycle, as implemented below:
 * <ol>
 *   <li>The constructor loads the default tags and fieldset from the environment.</li>
 *   <li>{@link #setConfiguration(Configuration)}, {@link #setMetricName(String)} and
 *       {@link #addTags(Tags)} may customize the counter while it is not registered.</li>
 *   <li>{@link #register(MeterRegistry)} freezes the counter; from then on
 *       {@link #recordMessage(ISOMsg)} creates/updates one counter per distinct tag combination.</li>
 *   <li>{@link #removeMeters()} / {@link #unregister()} remove every meter created by this instance.</li>
 * </ol>
 * <p>
 * Concurrency note: the set of created meters is a concurrent set, and {@code registry} / {@code frozen}
 * are {@code volatile} so that the state published by {@code register()} is visible to the threads
 * calling {@code recordMessage}. Meter removal is <b>not</b> atomic with respect to concurrent
 * {@code recordMessage} calls.
 */
public class ISOMsgCounter implements ISOMsgMetrics, LogSource, Configurable {
    /** Built-in aliases: each maps an alias name to a function extracting a tag value from a message. */
    private static final Map<String, Function<ISOMsg,String>> aliases = Map.of(
        "mti",    (m) -> m.getString(0),
        "rc",     (m) -> m.getString(39),
        "scheme", (m) -> m.getString("113.66"),              // jPOS-CMF field
        "isemv",  (m) -> Boolean.toString(m.hasField(55)),
        "ttype",  ISOMsgCounter::getTtype,
        "itc",    ISOMsgCounter::getITC
    );

    // Written by register()/unregister(), read by recordMessage() on other threads.
    private volatile MeterRegistry registry;

    // Store my meters for safe removal later on. Meters are uniquely identified by their Meter.Id.
    private final Set<Meter> meters = ConcurrentHashMap.newKeySet();

    // true between register() and removeMeters(); guards configuration vs. usage (see throwIfFrozen).
    private volatile boolean frozen = false;

    // custom properties
    private String metricName;
    private String metricDescription;
    private Tags tags = Tags.empty();
    private final Map<String,String> fieldSet = new HashMap<>();

    private Logger logger;
    private String realm;

    /**
     * Default constructor.
     * <p>
     * Loads the <b>default global</b> tags and fieldset from the environment
     * ({@code ENV_CHANNEL_TAGS} and {@code ENV_CHANNEL_FIELDS}, falling back to the declared defaults).
     * <p>
     * Note: this constructor invokes the protected {@link #parseTagPairs(String, boolean)} and
     * {@link #validateFieldSetMap(Map)}; subclasses overriding them must not rely on subclass state there.
     *
     * @throws ConfigurationException if the fieldset read from the environment is invalid
     */
    public ISOMsgCounter() throws ConfigurationException {
        // Custom tags are added to the Meter.
        // Syntax: comma/space separated entries of the form "tag:value" or just "tag" .
        var envTags = Environment.get("${"+ENV_CHANNEL_TAGS+"}", DEFAULT_TAGS);
        var tagMap = parseTagPairs(envTags, false);
        tagMap.forEach((k,v) -> tags = tags.and(k,v));

        // Fieldsets are tags and values taken from the ISOMsg
        // Syntax: comma/space separated entries of the form "alias, tag:alias, tag:isofield".
        var envFields = Environment.get("${"+ENV_CHANNEL_FIELDS+"}", DEFAULT_CHANNEL_FIELDS);
        var fieldsMap = parseTagPairs(envFields, true);
        validateFieldSetMap(fieldsMap);
        fieldSet.putAll(fieldsMap);
    }


    /**
     * Returns the metric signature: the metric name, a {@code '|'}, and the sorted, comma-separated
     * list of tag keys.
     * <p>
     * This overrides the default implementation, also including the keys from the internal field set.
     *
     * @return the signature string, e.g. {@code name|key1,key2}
     */
    @Override
    public String getMetricSignature() {
        List<String> keys = new ArrayList<>(fieldSet.size()*2);
        for (Tag t : tags)
            keys.add(t.getKey());
        keys.addAll(fieldSet.keySet());
        return getMetricName()+"|"+
            keys.stream().sorted().collect(Collectors.joining(","));
    }


    /**
     * Returns the metric name.
     * @return the configured name, or {@code DEFAULT_CHANNEL_METRIC_NAME} when none was set
     */
    @Override
    public String getMetricName() {
        return metricName != null ? metricName : DEFAULT_CHANNEL_METRIC_NAME;
    }

    /**
     * Sets a custom metric name.
     * @param metricName the new name, must not be null
     * @throws IllegalStateException if this counter is already registered
     * @throws NullPointerException if {@code metricName} is null
     */
    @Override
    public void setMetricName(String metricName) {
        throwIfFrozen(true);
        Objects.requireNonNull(metricName, "Metric name can't be null");
        this.metricName = metricName;
    }


    /**
     * Returns the metric description.
     * @return the configured description, or an empty string when none was set
     */
    public String getMetricDescription() {
        return metricDescription != null ? metricDescription : "";
    }

    /**
     * Sets the metric description.
     * @param metricDescription the description; may be null (reported as an empty string)
     */
    public void setMetricDescription(String metricDescription) {
        this.metricDescription = metricDescription;
    }

    /**
     * Adds (or overrides) static tags on this counter.
     * @param tags the tags to merge into the current ones
     * @return the resulting tag set
     * @throws IllegalStateException if this counter is registered, or if it already created meters
     */
    @Override
    public Tags addTags(Tags tags) {
        throwIfFrozen(true);
        if (!meters.isEmpty()) {
            // Prefer the "name" tag of the caller (if any) to identify the offender in the message.
            String name = tags.stream()
                    .filter(t->"name".equals(t.getKey()))
                    .map(Tag::getValue).findAny()
                    .orElse(getMetricName());
            throw new IllegalStateException("ISOMsgCounter "+name+" can't add tags after started");
        }
        return (this.tags= this.tags.and(tags));
    }

    /**
     * Returns the static tags of this counter.
     * @return a {@link Tags} built from the current tags
     */
    public Tags getTags() {
        return Tags.of(tags);
    }

    /**
     * Increments the counter matching the static tags plus the tags resolved from the message.
     * Does nothing when this counter has no registry or the message is null.
     *
     * @param m the message to count
     * @throws IllegalStateException if a registry is set but the counter is not in registered state
     */
    @Override
    public void recordMessage(ISOMsg m) {
        MeterRegistry reg = registry;       // single read; unregister() may null the field concurrently
        if (reg == null || m == null)
            return;

        throwIfFrozen(false);
        Tags ft = resolveFieldTags(m, fieldSet);
        Counter c = MeterFactory.updateCounter(reg,
                getMetricName(),
                tags.and(ft),
                getMetricDescription());
        meters.add(c);
        c.increment();
    }

    /**
     * Same as {@link #recordMessage(ISOMsg)}, but taking name/description/base tags from a {@link MeterInfo}
     * when this counter uses the default metric name. With a custom metric name, only the tags of
     * {@code meterInfo} are used, merged with (and overridable by) this counter's own tags.
     *
     * @param m the message to count
     * @param meterInfo the meter definition to use
     * @throws IllegalStateException if a registry is set but the counter is not in registered state
     */
    @Override
    public void recordMessage(ISOMsg m, MeterInfo meterInfo) {
        MeterRegistry reg = registry;       // single read; unregister() may null the field concurrently
        if (reg == null || m == null)
            return;

        throwIfFrozen(false);
        Tags ft = resolveFieldTags(m, fieldSet);
        String myName = getMetricName();

        Counter c;
        if (!DEFAULT_CHANNEL_METRIC_NAME.equals(myName))
            c = MeterFactory.updateCounter(reg,
                    myName,
                    meterInfo.add(tags).and(ft),            // allow our tags to override meterInfo's
                    getMetricDescription());
        else
            c = MeterFactory.updateCounter(reg, meterInfo, tags.and(ft));
        meters.add(c);
        c.increment();
    }


    /**
     * Binds this counter to a registry and freezes its configuration.
     * @param registry the registry, must not be null
     * @return always {@code true}
     * @throws NullPointerException if {@code registry} is null
     */
    @Override
    public boolean register(MeterRegistry registry) {
        Objects.requireNonNull(registry, "Null registry passed to register() method.");
        this.registry = registry;
        frozen = true;
        return true;
    }

    /** Removes all meters created by this counter and detaches it from its registry. */
    @Override
    public void unregister() {
        removeMeters();
        registry = null;
    }


    /**
     * Returns the current registry.
     * @return the registry, or null when not registered
     */
    @Override
    public MeterRegistry getRegistry() {
        return registry;
    }


    /**
     * Removes from the registry every meter created by this counter and unfreezes it.
     * Does nothing when there is no registry. When a logger is set, the removed meter ids are logged.
     */
    @Override
    public void removeMeters() {
        MeterRegistry reg = registry;
        if (reg == null)
            return;

        LogEvent evt = logger != null ? new LogEvent(this, "info", "Removing meters: ") : null;

        // flag will make new recordMessage calls fail, in a NON-thread safe way
        // but this is normally called after the channel is being stopped anyway
        frozen = false;
        for (Meter meter : meters) {
            if (evt != null)
                evt.addMessage(meter.getId());
            reg.remove(meter);
        }
        meters.clear();
        if (evt != null)
            Logger.log(evt);
    }

    // ============= configuration =============

    /**
     * Applies the custom configuration on top of the environment defaults loaded by the constructor.
     * <p>
     * Recognized properties: {@code name}, {@code description}, {@code tags} and {@code fields}.
     *
     * @param cfg the configuration
     * @throws ConfigurationException if an unknown tag/field tag is added to a default-named metric,
     *         or a field expression is neither a known alias nor an ISO field path
     * @throws IllegalStateException if {@code name} is given and this counter is already registered
     */
    @Override
    public void setConfiguration(Configuration cfg) throws ConfigurationException {
        String name = cfg.get("name", null);
        if (name != null)
            setMetricName(name);
        boolean customName = !DEFAULT_CHANNEL_METRIC_NAME.equals(getMetricName());

        setMetricDescription(cfg.get("description", null));

        // Process custom tag overrides (global defaults were handled in constructor).
        // Custom config overrides can only override the values of the pre-existing global env tags.
        // New tags can't be added, global tags can't be removed.
        //
        // Exception: If this class has a custom metric name, then it can define its own tag set.
        boolean hasTags = cfg.get("tags", null) != null;
        if (customName && hasTags)
            tags = Tags.empty();                            // start afresh if custom metric has tags

        var currTags = getTagsAsMap();
        var ovrMap = parseTagPairs(cfg.get("tags", ""), false);
        for (var ent : ovrMap.entrySet()) {
            if (currTags.containsKey(ent.getKey()) || customName)   // if known tag, or custom name
                currTags.put(ent.getKey(), ent.getValue());         // then allow override!
            else
                throw new ConfigurationException("Attempt to add unknown metric tag: '"+ent.getKey()+"'");
        }
        currTags.forEach((k,v) -> tags = tags.and(k,v));    // add/override all custom tags to our tags


        // Process custom isofield overrides (global defaults were handled in constructor).
        // Custom overrides can only override pre-existing env tags, unless this class has a custom metric name.
        boolean hasFields = cfg.get("fields", null) != null;
        if (customName && hasFields)
            fieldSet.clear();                               // start afresh if custom metric has fields

        var fieldsOvrMap = parseTagPairs(cfg.get("fields", ""), true);
        validateFieldSetMap(fieldsOvrMap);
        for (var ent : fieldsOvrMap.entrySet()) {
            if (fieldSet.containsKey(ent.getKey()) || customName)   // known tag, or custom metric
                fieldSet.put(ent.getKey(), ent.getValue());         // allow, override!
            else
                throw new ConfigurationException(
                    "Attempt to add unknown metric isofield tag: '"+ent.getKey()+"'");
        }
    }



    /**
     * Parses tag-value pairs from a string.
     * <p>
     * Entries are separated by commas and/or spaces, each of the form {@code tag:value} or just {@code tag}.
     * Empty entries and entries without a tag name (e.g. {@code ":"} or {@code ":value"}) are skipped.
     *
     * @param tp tag-pair string to parse; null is treated as empty
     * @param copySingleTag for an entry without a value: if true, the tag name is also used as the value;
     *                      if false, the value is the empty string
     * @return the parsed tag pairs, as a new mutable map
     */
    protected Map<String,String> parseTagPairs(String tp, boolean copySingleTag) {
        Map<String,String> ret = new HashMap<>();
        if (tp == null)
            return ret;

        String[] tagPairs = tp.trim().split("[, ]+");
        for (String pair : tagPairs) {
            if (pair.isEmpty()) continue;                   // avoids possible commas at beginning of tp

            // NOTE: String.split drops trailing empty strings, so ":" yields a zero-length array.
            String[] tv = pair.trim().split(":");
            if (tv.length == 0 || tv[0].isEmpty()) continue;    // no tag name: nothing usable

            ret.put(tv[0],
                    tv.length >= 2 ? tv[1] :
                    copySingleTag ? tv[0] : "");
        }
        return ret;
    }

    /**
     * Validates the field set map: every value must be either an ISO field path in dot syntax
     * (e.g. {@code 39} or {@code 113.66}) or one of the built-in aliases.
     *
     * @param fieldsMap the field set map to validate
     * @throws ConfigurationException if a value is neither a field path nor a known alias
     */
    protected void validateFieldSetMap(Map<String,String> fieldsMap) throws ConfigurationException {
        for (var valexpr : fieldsMap.values()) {
            boolean isField = valexpr.matches("^[0-9]+(\\.[0-9]+)*$");  // is isomsg field path dot-syntax?
            if (!isField && aliases.get(valexpr) == null)
                throw new ConfigurationException("Unknown metric tag alias for fieldset: '"+valexpr+"'");
        }
    }


    // ============= some helper methods =============

    /**
     * Throws when the frozen flag equals the given (forbidden) state.
     * @param frozenCondition the state in which the calling operation is not allowed
     */
    private void throwIfFrozen(boolean frozenCondition) {
        if (frozen == frozenCondition)
            // the message is chosen from the state we just matched, not from a second read of the flag
            throw new IllegalStateException(frozenCondition ?
                "Can't modify this ISOMsgCounter after registration ("+getMetricSignature()+")" :
                "Can't use this ISOMsgCounter before registration ("+getMetricSignature()+")"
            );
    }


    /**
     * Returns a clone set, which may not be up to date next time you use it.
     *
     * @return a snapshot of the live meter set
     */
    protected Set<Meter> getMeters() {
        return new HashSet<>(meters);
    }

    // useful for tests and debugging
    /**
     * Returns the configured field set map.
     * @return an unmodifiable copy of the field set map
     */
    protected Map<String,String> getFieldSet() {
        return Map.copyOf(fieldSet);
    }


    // Make Tags easy to navigate; do not abuse of this one unless you need repeated querying
    /**
     * Returns the metric tags as a map.
     * @return a new mutable map with the current tag keys and values
     */
    public Map<String,String> getTagsAsMap() {
        Map<String,String> tm= new HashMap<>();
        getTags().forEach(t->tm.put(t.getKey(),t.getValue()));
        return tm;
    }

    /**
     * Alias {@code ttype}: the first two characters of field 3.
     * @return the two-character prefix; the whole value when it is shorter than two characters;
     *         an empty string when field 3 is absent or has no string value
     */
    private static String getTtype(ISOMsg m) {
        if (!m.hasField(3))
            return "";
        String pcode = m.getString(3);
        if (pcode == null)
            return "";
        // guard against short values: substring(0,2) would throw and break recordMessage
        return pcode.length() > 2 ? pcode.substring(0, 2) : pcode;
    }

    /**
     * Alias {@code itc}: joins with dots the non-empty values of the MTI, the {@code ttype} alias
     * and fields 24, 25 and 70.
     * @return the joined value, or an empty string when the MTI is missing or blank
     */
    private static String getITC(ISOMsg m) {
        String mti = m.getString(0);
        if (mti == null || mti.trim().isEmpty()) return "";
        // some common fields to make an ITC from
        return Stream.of(mti, getTtype(m), m.getString(24), m.getString(25), m.getString(70))
                .filter(s -> s != null && !s.isEmpty())
                .collect(Collectors.joining("."));
    }


    /**
     * Hook for subclasses to resolve, against an ISOMsg, the valexpr part of a tag:valexpr in a fieldset.
     * <br>
     * A subclass may add or override its own aliases, or have a special way to convert "valexpr"
     * to a String taken from the given ISOMsg.
     * <br>
     * If the subclass can't resolve the alias/valexpr, it may call super (i.e. this method)
     * as a fallback.
     *
     * @param m the ISOMsg whose fields are inspected
     * @param val the valexpr (alias name or field path) to resolve
     * @return the resolved value, or an empty string when no value is available
     */
    protected String resolveValExpr(ISOMsg m, String val) {
        var fun = aliases.get(val);                         // check if valexpr is a registered alias
        String resolved = fun != null ? fun.apply(m) : m.getString(val);
        return resolved != null ? resolved : "";
    }

    /**
     * Returns a micrometer {@link Tags}, with keys and values resolved from a given fieldset against
     * a given ISOMsg.<br>
     * Some of the valexprs in the fieldset may be aliases that need to be resolved to an ISOMsg field,
     * and the field path is used to get the value from the ISOMsg.<br>
     * This method relies on the protected {@link #resolveValExpr(ISOMsg, String)} to resolve each valexpr.
     *
     * @param m the ISOMsg
     * @param fieldset field-to-tag mapping to resolve against the message
     * @return a micrometer {@link Tags} with all the tags from fieldset and the resolved values from the message
     */
    private Tags resolveFieldTags(ISOMsg m, Map<String,String> fieldset) {
        Tags tt = Tags.empty();
        // each entry is {tag,valexpr}, where valexpr may be an alias or an isofield path
        for (var ent : fieldset.entrySet()) {
            String val = resolveValExpr(m, ent.getValue());
            tt = tt.and(ent.getKey(), val);
        }
        return tt;
    }


    @Override
    public void setLogger(Logger logger, String realm) {
        this.logger = logger;
        this.realm = realm;
    }
    @Override
    public String getRealm() {
        return realm;
    }
    @Override
    public Logger getLogger() {
        return logger;
    }
}