Skip to content

Commit

Permalink
Wildcard support implemented, readme updated
Browse files Browse the repository at this point in the history
Exporter now is using single queue manager connection for all activities

Resolves: #144, #145
  • Loading branch information
echerniak committed Dec 18, 2019
1 parent 648e0c3 commit 5fb38c7
Show file tree
Hide file tree
Showing 18 changed files with 449 additions and 257 deletions.
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Supports [IBM MQ](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/co

Was tested on MQ ver.9.0.x.x and MQ ver. 9.1.x.x.

**Note:** The Publish/Subscribe Mode "[PSMODE](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q017110_.htm)" Queue Manager attribute in IBM MQ should have value "enabled".

#### Dependencies
<sub><sup> [Back to TOC.](#table-of-contents) </sup></sub><br/>
List of dependencies:
Expand All @@ -73,7 +75,7 @@ qmgrConnectionParams:
# Queue manager name.
qmgrName: QM
# Queue manager host.
qmgrHost: hostname
qmgrHost: localhost
# Queue manager connection port.
qmgrPort: 1414
# Queue manager connection channel.
Expand Down Expand Up @@ -128,20 +130,29 @@ PCFParameters:
# only one PCF command will be sent. But response will contain metrics for all 10.000 queues and that will lead to performance problems.
usePCFWildcards: true
# Interval in seconds between sending PCF commands.
scrapeInterval: 10
scrapeInterval: 300

# Further block contains info about monitoring objects. It supports "*" wildcard at the end of the name.
# Firstly, objects from "include" section are retrieved.
# Then objects from "exclude" section are retrieved.
# Finally, objects that are in the first group but not in the second are added to the monitoring list.

# Monitored queues.
queues:
- QUEUE1
- QUEUE2
include:
- '*'
exclude:
- SYSTEM.*

# Monitored listeners.
listeners:
- LISTENER01

include:
exclude:
- SYSTEM.*

# Monitored channels.
channels:
- MANAGEMENT.CHANNEL
include:
```
#### Build
<sub><sup> [Back to TOC.](#table-of-contents) </sup></sub><br/>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ru.cinimex</groupId>
<artifactId>mq-java-exporter</artifactId>
<version>1.0-SNAPSHOT</version>
<version>0.4.0-rc1</version>
<!-- Output to jar format -->
<packaging>jar</packaging>
<properties>
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/ru/cinimex/exporter/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* Class is used for parsing config file.
Expand All @@ -29,9 +29,9 @@ public class Config {
private int connTimeout;
private int endpPort;
private String endpURL;
private ArrayList<String> queues;
private ArrayList<String> channels;
private ArrayList<String> listeners;
private Map<String, List<String>> queues;
private Map<String, List<String>> channels;
private Map<String, List<String>> listeners;
private boolean sendPCFCommands;
private boolean usePCFWildcards;
private int scrapeInterval;
Expand Down Expand Up @@ -59,9 +59,9 @@ public Config(String path) {
this.password = (String) qmgrConnectionParams.get("password");
this.mqscp = (boolean) qmgrConnectionParams.get("mqscp");
this.connTimeout = (Integer) qmgrConnectionParams.get("connTimeout");
queues = (ArrayList<String>) config.get("queues");
listeners = (ArrayList<String>) config.get("listeners");
channels = (ArrayList<String>) config.get("channels");
this.queues = (Map<String, List<String>>) config.get("queues");
this.listeners = (Map<String, List<String>>) config.get("listeners");
this.channels = (Map<String, List<String>>) config.get("channels");
this.endpPort = (Integer) prometheusEndpointParams.get("port");
this.endpURL = (String) (prometheusEndpointParams.get("url"));
this.sendPCFCommands = (boolean) pcfParameters.get("sendPCFCommands");
Expand Down Expand Up @@ -101,11 +101,11 @@ public boolean usePCFWildcards() {
return usePCFWildcards;
}

public List<String> getChannels() {
public Map<String, List<String>> getChannels() {
return channels;
}

public List<String> getListeners() {
public Map<String, List<String>> getListeners() {
return listeners;
}

Expand Down Expand Up @@ -145,7 +145,7 @@ public String getEndpURL() {
return endpURL;
}

public List<String> getQueues() {
public Map<String, List<String>> getQueues() {
return queues;
}

Expand Down
117 changes: 89 additions & 28 deletions src/main/java/ru/cinimex/exporter/ExporterLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.MQConstants;
import com.ibm.mq.pcf.PCFMessage;
import com.ibm.mq.pcf.PCFMessageAgent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import ru.cinimex.exporter.mq.MQConnection;
Expand All @@ -24,46 +25,33 @@
import java.util.ArrayList;
import java.util.List;

import static ru.cinimex.exporter.mq.MQConnection.createMQConnectionParams;

/**
* Main class of mq exporter tool. Parses config, scans topics, starts subscribers.
*/
public class ExporterLauncher {
private static final Logger logger = LogManager.getLogger(ExporterLauncher.class);
private static final String TOPIC_STRING = "$SYS/MQ/INFO/QMGR/%s/Monitor/METADATA/CLASSES";
private static final int GMO = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG | MQConstants.MQGMO_SYNCPOINT;
private static MQSubscriberManager manager;
private static MQSubscriberManager manager;
private static HTTPServer server;

public static void main(String[] args) {
public static void main(String[] args) throws MQException, IOException {
if (args.length == 0) {
logger.error("It seems like you forgot to specify path to the config file.");
System.exit(1);
}
Config config = new Config(args[0]);
MQConnection.establish(config.getQmgrName(), createMQConnectionParams(config));

createShutdownHook();
ArrayList<PCFElement> elements = getAllPublishedMetrics(config);
ArrayList<MQObject.MQType> monitoringTypes = new ArrayList<>();
ArrayList<MQObject> objects = new ArrayList<>();

if (config.getQueues() != null && !config.getQueues().isEmpty()) {
monitoringTypes.add(MQObject.MQType.QUEUE);
for (String queueName : config.getQueues()) {
objects.add(new MQObject(queueName, MQObject.MQType.QUEUE));
}
}
if (config.getChannels() != null && !config.getChannels().isEmpty()) {
monitoringTypes.add(MQObject.MQType.CHANNEL);
for (String channelName : config.getChannels()) {
objects.add(new MQObject(channelName, MQObject.MQType.CHANNEL));
}
}
if (config.getListeners() != null && !config.getListeners().isEmpty()) {
monitoringTypes.add(MQObject.MQType.LISTENER);
for (String listenerName : config.getListeners()) {
objects.add(new MQObject(listenerName, MQObject.MQType.LISTENER));
}
}
List<MQObject> objects = getMonitoringObjects(config);
monitoringTypes.add(MQObject.MQType.QUEUE);
monitoringTypes.add(MQObject.MQType.CHANNEL);
monitoringTypes.add(MQObject.MQType.LISTENER);

MetricsManager.initMetrics(elements, monitoringTypes);
manager = new MQSubscriberManager(config);
Expand All @@ -83,27 +71,26 @@ public static void main(String[] args) {
* @return - array, filled with metrics headers.
*/
private static ArrayList<PCFElement> getAllPublishedMetrics(Config config) {
MQConnection connection = new MQConnection();
MQTopic topic = null;
ArrayList<PCFElement> elements = new ArrayList<>();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = GMO;
gmo.waitInterval = 30000;
try {
connection.establish(config.getQmgrName(), MQConnection.createMQConnectionParams(config));
topic = connection.createTopic(String.format(TOPIC_STRING, config.getQmgrName()));
String qmgrName = config.getQmgrName();
topic = MQConnection.createTopic(String.format(TOPIC_STRING, qmgrName));
MQMessage msg = getEmptyMessage();
topic.get(msg, gmo);
PCFMessage pcfResponse = new PCFMessage(msg);
List<PCFClass> classes = PCFDataParser.getPCFClasses(pcfResponse);
for (PCFClass pcfClass : classes) {
topic = connection.createTopic(pcfClass.getTopicString());
topic = MQConnection.createTopic(pcfClass.getTopicString());
msg = getEmptyMessage();
topic.get(msg, gmo);
pcfResponse = new PCFMessage(msg);
List<PCFType> types = PCFDataParser.getPCFTypes(pcfResponse);
for (PCFType type : types) {
topic = connection.createTopic(type.getTopicString());
topic = MQConnection.createTopic(type.getTopicString());
msg = getEmptyMessage();
topic.get(msg, gmo);
pcfResponse = new PCFMessage(msg);
Expand All @@ -118,14 +105,87 @@ private static ArrayList<PCFElement> getAllPublishedMetrics(Config config) {
if (topic != null && topic.isOpen()) {
topic.close();
}
connection.close();
} catch (MQException e) {
logger.error("Error occurred during disconnecting from topic {}. Error: ", topic.toString(), e);
}
}
return elements;
}

private static List<MQObject> getMonitoringObjects(Config config) throws MQException, IOException {
List<MQObject> objects = new ArrayList<>();

for (MQObject.MQType type : MQObject.MQType.values()) {
switch (type) {
case QUEUE:
objects.addAll(inquireMQObjectsByPatterns(config.getQueues().get("include"), type, MQConstants.MQCACF_Q_NAMES));
objects.removeAll(inquireMQObjectsByPatterns(config.getQueues().get("exclude"), type, MQConstants.MQCACF_Q_NAMES));
break;
case CHANNEL:
objects.addAll(inquireMQObjectsByPatterns(config.getChannels().get("include"), type, MQConstants.MQCACH_CHANNEL_NAMES));
objects.removeAll(inquireMQObjectsByPatterns(config.getChannels().get("exclude"), type, MQConstants.MQCACH_CHANNEL_NAMES));
break;
case LISTENER:
objects.addAll(inquireMQObjectsByPatterns(config.getListeners().get("include"), type, MQConstants.MQCACH_LISTENER_NAME));
objects.removeAll(inquireMQObjectsByPatterns(config.getListeners().get("exclude"), type, MQConstants.MQCACH_LISTENER_NAME));
break;
}
}
return objects;
}

private static List<MQObject> inquireMQObjectsByPatterns(List<String> rules, MQObject.MQType type, int getValueParam) throws MQException, IOException {

List<MQObject> objects = new ArrayList<>();
if (rules != null && !rules.isEmpty()) {
PCFMessageAgent agent = new PCFMessageAgent();
agent.connect(MQConnection.getQueueManager());
for (String rule : rules) {
PCFMessage pcfCommand = preparePCFCommand(type, rule);
PCFMessage[] pcfResponse = agent.send(pcfCommand);
if (!type.equals(MQObject.MQType.LISTENER)) {
String[] names = (String[]) pcfResponse[0].getParameterValue(getValueParam);

for (int index = 0; index < names.length; index++) {
String objName = names[index].trim();
if (!objName.startsWith("AMQ.")) {
objects.add(new MQObject(objName, type));
}
}
} else {
for (int index = 0; index < pcfResponse.length; index++) {
objects.add(new MQObject(pcfResponse[index].getParameterValue(MQConstants.MQCACH_LISTENER_NAME).toString().trim(), type));
}
}
}
agent.disconnect();
}

return objects;
}

private static PCFMessage preparePCFCommand(MQObject.MQType type, String name) {
PCFMessage command;
switch (type) {
case QUEUE:
command = new PCFMessage(MQConstants.MQCMD_INQUIRE_Q_NAMES);
command.addParameter(MQConstants.MQCA_Q_NAME, name);
command.addParameter(MQConstants.MQIA_Q_TYPE, MQConstants.MQQT_LOCAL);
break;
case CHANNEL:
command = new PCFMessage(MQConstants.MQCMD_INQUIRE_CHANNEL_NAMES);
command.addParameter(MQConstants.MQCACH_CHANNEL_NAME, name);
break;
case LISTENER:
command = new PCFMessage(MQConstants.MQCMD_INQUIRE_LISTENER);
command.addParameter(MQConstants.MQCACH_LISTENER_NAME, name);
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
return command;
}

private static void createShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Exporter finishes all activities...");
Expand All @@ -142,6 +202,7 @@ private static void createShutdownHook() {
logger.debug("Stopping HTTP server...");
server.stop();
}
MQConnection.close();
logger.info("Goodbye!");
LogManager.shutdown();
}));
Expand Down
Loading

0 comments on commit 5fb38c7

Please sign in to comment.