Implementation of RJack::JMS::JMSContext for Qpid, using the Qpid JNDI Properties syntax. Provides scripted setup and a factory for JMS Connection, Session, and Destinations (including full AMQP queue and exchange creation) via Qpid Addresses created from ruby.
Array of [ host ] arrays to brokers (default: [[‘localhost’]] ) Default port is 5672.
Connection ‘clientid’ (default: ‘default-client’) See JNDI Properties, 3.2.2 Connection URLs
Hash of destination JNDI name to an ‘address; options’ Hash. The option hash may use ruby Symbol or String keys, and true, false, Symbol, String, Hash, or Array values. This will be serialized into the Qpid Addresses Syntax. The special keys :address (default: same as JNDI name) and :subject (optional address/subject) are also supported when serializing. (Default: empty)
Connection factory, JNDI name (default: ‘local’)
Password to connect with (required, often unused by broker.)
Acknowledge Mode specified on #create_session (default: javax.jms.Session::AUTO_ACKNOWLEDGE).
User to connect with (required, often unused by broker, default: ‘qpid’)
Connection ‘virtualhost’ (default: ‘default-vhost’) See JNDI Properties, 3.2.2 Connection URLs
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 74 def initialize @username = 'qpid' @password = 'pswd' @brokers = [ [ 'localhost' ] ] @virtual_host = 'default-vhost' @client_id = 'default-client' @factory_jndi_name = 'local' @session_acknowledge_mode = Session::AUTO_ACKNOWLEDGE @destinations = {} @log = RJack::SLF4J[ self.class ] end
Appends Socket.gethostname and the current Process.pid to the specified prefix to create a (transient) unique address name
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 242 def address_per_process( prefix ) [ prefix, Socket.gethostname.to_s.split( '.' ).first, Process.pid.to_s ].compact.join( '-' ) end
Serialize destination Addresses (new format). Reference: section 2.4.3.5
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 169 def address_serialize( name, opts = {} ) opts = opts.dup out = ( opts.delete( :address ) || name ).to_s subject = opts.delete( :subject ) out += '/' + subject.to_s if subject out += '; ' + option_serialize( opts ) if opts out end
Broker list Connection URL parameter value from input brokers.
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 232 def broker_list l = brokers.map do | host, port | 'tcp://%s:%s' % [ host, port || 5672 ] end l.join( ';' ) end
Close the JNDI context (no more lookups may be made.)
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 141 def close @con_factory = nil @context.close if @context @context = nil end
The javax.jms.ConnectionFactory, created on first call, by lookup of #factory_jndi_name from context.
Throws javax.naming.NamingException
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 98 def connection_factory @con_factory ||= context.lookup( factory_jndi_name ) end
The AMQP specific Connection URL as defined in Qpid JNDI Properities.
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 213 def connection_url url = "amqp://" if username url += username url += ':' + password if password url += '@' end url += [ client_id, virtual_host ].join( '/' ) url += '?' url += "brokerlist='#{ broker_list }'" url end
The JNDI InitialContext, created on first call, from properties.
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 90 def context @context ||= InitialContext.new( properties ) end
Creates a new javax.jms.Connection from the connection_factory. Caller should close this connection when done with it.
Throws javax.jms.JMSException, javax.naming.NamingException
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 107 def create_connection connection_factory.create_connection rescue NativeException => x raise x.cause end
Create a javax.jms.Session from the connection previously obtained via create_connection.
Throws javax.jms.JMSException
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 118 def create_session( connection ) connection.create_session( false, session_acknowledge_mode ) rescue NativeException => x raise x.cause end
Lookup (and thus create) a javax.jms.Destination by JNDI name as key into destination. The name and full address specification is logged at this point.
Throws javax.naming.NamingException
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 130 def lookup_destination( name ) dest = context.lookup( name ) @log.info( "Lookup of destinations[ '%s' ] =\n %s" % [ name, address_serialize( name, @destinations[ name ] ) ] ) dest rescue NativeException => x raise x.cause end
Serialize addresses options Hash
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 179 def option_serialize( val ) case( val ) when TrueClass, FalseClass, Symbol, Integer val.to_s when String val.to_s.inspect #quote/escape when Hash pairs = val.map do | key, value | [ wrap_key( key ), option_serialize( value ) ].join( ": " ) end '{ ' + pairs.join( ', ' ) + ' }' when Array values = val.map do | value | option_serialize( value ) end '[ ' + values.join( ', ' ) + ' ]' else raise "Unknown option value class: " + val.class.to_s end end
Qpid JNDI Properties including #connection_url and destinations.
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 149 def properties props = Java::java.util.Hashtable.new props[ Context::INITIAL_CONTEXT_FACTORY ] = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory" props[ [ "connectionfactory", factory_jndi_name ].join( '.' ) ] = connection_url destinations.each_pair do |name,opts| props[ [ "destination", name ].join( '.' ) ] = address_serialize( name, opts ) end props end
Quote keys that require it based on Qpid address scheme (most commonly, those with ‘.’ in them).
# File lib/rjack-qpid-client/qpid_jms_context.rb, line 202 def wrap_key( key ) k = key.to_s if ( k =~ /^[a-zA-Z_][a-zA-Z0-9_-]*[a-zA-Z0-9_]$/ ) k else "'" + k + "'" end end