ActiveMQ 中的消息持久化(二)_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > ActiveMQ 中的消息持久化(二)

ActiveMQ 中的消息持久化(二)

 2010/11/17 22:49:49  berdy  http://berdy.javaeye.com  我要评论(0)
  • 摘要:为了长时间的存储和管理消息,一般会使用数据库。在Activemq中默认使用的是DerbyDB。当然也可更改配置来使用其他的DB。Activemq支持以下这些DB:ApacheDerbyAxionDB2HSQLInformixMaxDBMySQLOraclePostgresqlSQLServerSybase如果要使用不在上面列表中的DB,可以通过配置SQL语句和JDBC驱动来支持自己的DB。Broker在启动的时候读取配置文件,若在配置文件中指定了特定的JDBC驱动
  • 标签:ActiveMQ消息持久化
    为了长时间的存储和管理消息,一般会使用数据库。在Activemq中默认使用的是Derby DB。当然也可更改配置来使用其他的DB。Activemq支持以下这些DB:
  • Apache Derby
  • Axion
  • DB2
  • HSQL
  • Informix
  • MaxDB
  • MySQL
  • Oracle
  • Postgresql
  • SQLServer
  • Sybase

     如果要使用不在上面列表中的DB,可以通过配置SQL语句和JDBC驱动来支持自己的DB。Broker在启动的时候读取配置文件,若在配置文件中指定了特定的JDBC驱动,则会在classpath路径下自动检测配置的JDBC驱动。下面是关于oracle的一个配置:
<beans ...>
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="localhost">
    ...
    <persistenceAdapter>
       <jdbcPersistenceAdapter
            dataDirectory="${activemq.base}/data"
            dataSource="#oracle-ds"/>
    </persistenceAdapter>
    ...
  </broker>
 
  <!-- Oracle DataSource Sample Setup -->
  <bean id="oracle-ds"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
    <property name="username" value="scott"/>
    <property name="password" value="tiger"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

</beans>

在InstallDir/conf/activemq-jdbc.xml有一个jdbc配置样例。

在使用JDBC持久化的时候根据是否支持Activemq提供的高效的journal文件分为两种。
1、支持高效日志
在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。另外,journal文件支持JMS事务的一致性。
下面是一段支持高效日志的JDBC持久化的配置:
<beans ...>
  <broker ...>
    ...
  <persistenceFactory>
    <journaledJDBC journalLogFiles="5" dataSource="#mysql-ds" />
    </persistenceFactory>
    ...
  <broker>
  ...
<bean id="mysql-ds"
      class="org.apache.commons.dbcp.BasicDataSource"
      destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>

journaledJDBC 节点的有以下可配置属性:
属性默认值描述adapter 指定持久存储的DBcreateTablesOnStartuptrue是否在启动的时候创建相应的表dataDirectoryactivemq-data指定Derby存储数据文件的路径dataSource#derby指定要引用的数据源journalArchiveDirectory 指定归档日志文件存储的路径journalLogFiles2指定日志文件的数量journalLogFileSize20MB指定每个日志文件的大小journalThreadPriority10指定写入日志的线程的优先级useDatabaseLocktrue在主从配置的时候是否使用排他锁useJournaltrue指定是否使用日志
2、不使用高效日志
下面是不使用高效日志的配置:
<beans ...>
  <broker ...>
  ...
  <persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#derby-ds" />
    </persistenceAdapter>
    ...
  <broker>
  ...
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
    <property name="databaseName" value="derbydb"/>
    <property name="createDatabase" value="create"/>
  </bean>
</beans>

可以看出,使用高效日志的配置使在persistenceFactory节点内的。而不使用高效日志的使配置在persistenceAdapter节点内的。
下面是jdbcPersistenceAdapter 节点可配置的属性:
属性默认值描述adapter 指定持久存储的DBcleanupPeriod300000指定删除DB中已收到确认信息的消息的时间周期(ms)createTablesOnStartuptrue是否在启动的时候创建相应的表databaseLockerDefaultDatabaseLocker instance防止多个broker同时访问DB的锁实例dataDirectoryactivemq-data指定Derby存储数据文件的路径dataSource#derby指定要引用的数据源lockAcquireSleepInterval1000等待获取锁的时间长度(ms)lockKeepAlivePeriod30000定期向lock表中写入当前时间,指定了锁的存活时间useDatabaseLocktrue在主从配置的时候是否使用排他锁transactionIsolationConnection.TRANSACTION_READ_UNCOMMITTED指定事务隔离级别
如果需要配置的DB不再Activemq默认支持的DB列表中,可以通过配置SQL语句和JDBC持久层来实现。在journaledJDBC节点下配置statements 节点来指定JDBC持久层对各类数据的处理方式。下面是一个样例:
<persistenceFactory>
  <journaledJDBC ...>
    <statements>
      <statements stringIdDataType ="VARCHAR(128)"/>
    </statements>
  </journaledJDBC>
</persistenceFactory>

statements 节点的各个属性:
属性默认值描述tablePrefix 指定创建的表名的前缀messageTableNameACTIVEMQ_MSGS存储消息内容的表名durableSubAcksTableNameACTIVEMQ_ACKS存储持久订阅者的确认消息的表名lockTableNameACTIVEMQ_LOCKlock表名binaryDataTypeBLOB指定存放消息的数据类型containerNameDataTypeVARCHAR(250)指定存放Destination名称的数据类型msgIdDataTypeVARCHAR(250)存放消息ID的数据类型sequenceDataTypeINTEGER存放消息序列ID的数据类型longDataTypeBIGINTDB中存放JAVA中long型的数据类型stringIdDataTypeVARCHAR(250)存放长字符串的DB数据类型

定制自己插入和读取数据的SQL语句,可以设置下面的这些属性
  • addMessageStatement
  • updateMessageStatement
  • removeMessageStatement
  • findMessageSequenceIdStatement
  • findMessageStatement
  • findAllMessagesStatement
  • findLastSequenceIdInMsgsStatement
  • findLastSequenceIdInAcksStatement
  • createDurableSubStatement
  • findDurableSubStatement
  • findAllDurableSubsStatement
  • updateLastAckOfDurableSubStatement
  • deleteSubscriptionStatement
  • findAllDurableSubMessagesStatement
  • findDurableSubMessagesStatement
  • findAllDestinationsStatement
  • removeAllMessagesStatement
  • removeAllSubscriptionsStatement
  • deleteOldMessagesStatement
  • lockCreateStatement
  • lockUpdateStatement
  • nextDurableSubscriberMessageStatement
  • durableSubscriberMessageCountStatement
  • lastAckedDurableSubscriberMessageStatement
  • destinationMessageCountStatement
  • findNextMessageStatement
  • createSchemaStatements
  • dropSchemaStatements

设置adapter属性可以设定JDBC适配器存储和访问BLOB字段的方式,可设置的方式有:
  • org.activemq.store.jdbc.adapter.BlobJDBCAdapter
  • org.activemq.store.jdbc.adapter.BytesJDBCAdapter
  • org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
  • org.activemq.store.jdbc.adapter.ImageJDBCAdapter

具体需要查看JDBC的驱动数据库的文档。
<broker persistent="true" ...>
  ...
  <persistenceFactory>
    <journaledJDBC adapter="org.activemq.store.jdbc.adapter.BlobJDBCAdapter" ... />
  </persistenceFactory>
  ...
</broker>

发表评论
用户名: 匿名