1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.benchmark;
19  
20  import javax.jms.DeliveryMode;
21  import javax.jms.Destination;
22  import javax.jms.JMSException;
23  import javax.jms.Message;
24  import javax.jms.MessageProducer;
25  import javax.jms.Session;
26  import java.io.BufferedReader;
27  import java.io.File;
28  import java.io.FileReader;
29  import java.io.IOException;
30  
31  /***
32   * @author James Strachan
33   * @version $Revision: 1.11 $
34   */
35  public class Producer extends BenchmarkSupport {
36  
37      int loops = -1;
38      int loopSize = 1000;
39      private int messageSize = 1000;
40  
41      public static void main(String[] args) {
42          Producer tool = new Producer();
43          if (args.length > 0) {
44              tool.setUrl(args[0]);
45          }
46          if (args.length > 1) {
47              tool.setTopic(parseBoolean(args[1]));
48          }
49          if (args.length > 2) {
50              tool.setSubject(args[2]);
51          }
52          if (args.length > 3) {
53              tool.setDurable(parseBoolean(args[3]));
54          }
55          if (args.length > 4) {
56              tool.setMessageSize(Integer.parseInt(args[4]));
57          }
58          if (args.length > 5) {
59              tool.setConnectionCount(Integer.parseInt(args[5]));
60          }
61          try {
62              tool.run();
63          }
64          catch (Exception e) {
65              System.out.println("Caught: " + e);
66              e.printStackTrace();
67          }
68      }
69  
70      public Producer() {
71      }
72  
73      public void run() throws Exception {
74          start();
75          publish();
76      }
77  
78      // Properties
79      //-------------------------------------------------------------------------
80      public int getMessageSize() {
81          return messageSize;
82      }
83  
84      public void setMessageSize(int messageSize) {
85          this.messageSize = messageSize;
86      }
87  
88      public int getLoopSize() {
89          return loopSize;
90      }
91  
92      public void setLoopSize(int loopSize) {
93          this.loopSize = loopSize;
94      }
95  
96      // Implementation methods
97      //-------------------------------------------------------------------------
98  
99      protected void publish() throws Exception {
100         final String text = getMessage();
101 
102         System.out.println("Publishing to: " + subjects.length + " subject(s)");
103 
104         for (int i = 0; i < subjects.length; i++) {
105             final String subject = subjects[i];
106             Thread thread = new Thread() {
107                 public void run() {
108                     try {
109                         publish(text, subject);
110                     }
111                     catch (JMSException e) {
112                         System.out.println("Caught: " + e);
113                         e.printStackTrace();
114                     }
115                 }
116             };
117             thread.start();
118         }
119 
120     }
121 
122     protected String getMessage() {
123         StringBuffer buffer = new StringBuffer();
124         for (int i = 0; i < messageSize; i++) {
125             char ch = 'X';
126             buffer.append(ch);
127         }
128         return buffer.toString();
129     }
130 
131     protected void publish(String text, String subject) throws JMSException {
132         Session session = createSession();
133 
134         Destination destination = createDestination(session, subject);
135 
136         MessageProducer publisher = session.createProducer(destination);
137         if (isDurable()) {
138             publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
139         }
140         else {
141             publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
142         }
143 
144         System.out.println("Starting publisher on : " + destination + " of type: " + destination.getClass().getName());
145         System.out.println("Message length: " + text.length());
146 
147         if (loops <= 0) {
148             while (true) {
149                 publishLoop(session, publisher, text);
150             }
151         }
152         else {
153             for (int i = 0; i < loops; i++) {
154                 publishLoop(session, publisher, text);
155             }
156         }
157     }
158 
159     protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
160         for (int i = 0; i < loopSize; i++) {
161             Message message = session.createTextMessage(text);
162 
163             publisher.send(message);
164             count(1);
165         }
166     }
167 
168     protected String loadFile(String file) throws IOException {
169         System.out.println("Loading file: " + file);
170 
171         StringBuffer buffer = new StringBuffer();
172         BufferedReader in = new BufferedReader(new FileReader(file));
173         while (true) {
174             String line = in.readLine();
175             if (line == null) {
176                 break;
177             }
178             buffer.append(line);
179             buffer.append(File.separator);
180         }
181         return buffer.toString();
182     }
183 }