背景
项目中遇到一个场景,在运行时动态的创建 RabbitMQ 的 VHost,同时在这个VHost下创建对应的交换机、路由、队列。
正常我们使用 SpringBoot 封装的 RabbitTemplate时,在配置中仅能配置一个VHost,通过查看源码,发现其 VHost的配置其实是与ConnectionFactory绑定的,因此如果需要让 RabbitTemplate 在不同的VHost上进行消息发送,或者通过 RabbitAdmin进行Exchange,Queue,RoutingKey的配置的情况下,需要创建多个ConnectionFactory,也就是意味着需要创建多个 RabbitTemplate 实例。
接着通过直接挪用RabbitAutoconfiguration源码,实现了根据配置创建的不同的ConnectionFactory以及对应的 RabbitTemplate和 RabbitAdmin,这里不多赘述,但是项目启动之后,发现VHost 并不会自动创建,需要进行手动创建,如果通过控制台页面,那么很好解决这个问题,但是需要通过代码处理时,发现没有相关的API进行此操作。
管理API
经过查找,发现在 RabbitMQ的控制台页面中,有一个 HTTP API
的按钮,这里面说明了RabbitMQ管理使用的所有 API,具体如下表,同时指出了,访问时要使用HTTP basic authentication
进行认证,Content-Type 需要为application/json
,如果是/
VHost需要转义为%2F
,下表中X
表示支持该Rest操作
GET | PUT | DELETE | POST | Path | Description |
---|---|---|---|---|---|
X | /api/overview | Various random bits of information that describe the whole system. | |||
X | X | /api/cluster-name | Name identifying this RabbitMQ cluster. | ||
X | /api/nodes | A list of nodes in the RabbitMQ cluster. | |||
X | /api/nodes/name | An individual node in the RabbitMQ cluster. Add "?memory=true" to get memory statistics, and "?binary=true" to get a breakdown of binary memory use (may be expensive if there are many small binaries in the system). | |||
X | /api/extensions | A list of extensions to the management plugin. | |||
X | X | /api/definitions /api/all-configuration (deprecated) |
The server definitions - exchanges, queues, bindings, users,
virtual hosts, permissions, topic permissions, and parameters. Everything apart from
messages. POST to upload an existing set of definitions. Note
that:
multipart/form-data as
well as application/json ) in which case the
definitions should be uploaded as a form field named
"file".
|
||
X | X | /api/definitions/vhost |
The server definitions for a given virtual host -
exchanges, queues, bindings and policies.
POST to upload an existing set of definitions. Note that:
multipart/form-data as
well as application/json ) in which case the
definitions should be uploaded as a form field named
"file".
|
||
X | /api/connections | A list of all open connections. Use pagination parameters to filter connections. | |||
X | /api/vhosts/vhost/connections | A list of all open connections in a specific virtual host. Use pagination parameters to filter connections. | |||
X | X | /api/connections/name | An individual connection. DELETEing it will close the connection. Optionally set the "X-Reason" header when DELETEing to provide a reason. | ||
X | X | /api/connections/username/username | A list of all open connections for a specific username. Use pagination parameters to filter connections. DELETEing a resource will close all the connections for a username. Optionally set the "X-Reason" header when DELETEing to provide a reason. | ||
X | /api/connections/name/channels | List of all channels for a given connection. | |||
X | /api/channels | A list of all open channels. Use pagination parameters to filter channels. | |||
X | /api/vhosts/vhost/channels | A list of all open channels in a specific virtual host. Use pagination parameters to filter channels. | |||
X | /api/channels/channel | Details about an individual channel. | |||
X | /api/consumers | A list of all consumers. | |||
X | /api/consumers/vhost | A list of all consumers in a given virtual host. | |||
X | /api/exchanges | A list of all exchanges. Use pagination parameters to filter exchanges. | |||
X | /api/exchanges/vhost | A list of all exchanges in a given virtual host. Use pagination parameters to filter exchanges. | |||
X | X | X | /api/exchanges/vhost/name |
An individual exchange. To PUT an exchange, you will need a body looking something like this:
{"type":"direct","auto_delete":false,"durable":true,"internal":false,"arguments":{}}The type key is mandatory; other keys are optional.
When DELETEing an exchange you can add the query string
parameter |
|
X | /api/exchanges/vhost/name/bindings/source | A list of all bindings in which a given exchange is the source. | |||
X | /api/exchanges/vhost/name/bindings/destination | A list of all bindings in which a given exchange is the destination. | |||
X | /api/exchanges/vhost/name/publish |
Publish a message to a given exchange. You will need a body
looking something like:
{"properties":{},"routing_key":"my key","payload":"my body","payload_encoding":"string"}All keys are mandatory. The payload_encoding
key should be either "string" (in which case the payload
will be taken to be the UTF-8 encoding of the payload field)
or "base64" (in which case the payload field is taken to be
base64 encoded).If the message is published successfully, the response will look like: {"routed": true} routed will be true if the message was sent to
at least one queue.
Please note that the HTTP API is not ideal for high performance publishing; the need to create a new TCP connection for each message published can limit message throughput compared to AMQP or other protocols using long-lived connections. |
|||
X | /api/queues | A list of all queues. Use pagination parameters to filter queues. | |||
X | /api/queues/vhost | A list of all queues in a given virtual host. Use pagination parameters to filter queues. | |||
X | X | X | /api/queues/vhost/name |
An individual queue. To PUT a queue, you will need a body looking something like this:
{"auto_delete":false,"durable":true,"arguments":{},"node":"rabbit@smacmullen"}All keys are optional.
When DELETEing a queue you can add the query string
parameters |
|
X | /api/queues/vhost/name/bindings | A list of all bindings on a given queue. | |||
X | /api/queues/vhost/name/contents | Contents of a queue. DELETE to purge. Note you can't GET this. | |||
X | /api/queues/vhost/name/actions |
Actions that can be taken on a queue. POST a body like:
{"action":"sync"}Currently the actions which are supported are sync and cancel_sync .
|
|||
X | /api/queues/vhost/name/get |
Get messages from a queue. (This is not an HTTP GET as it
will alter the state of the queue.) You should post a body looking like:
{"count":5,"ackmode":"ack_requeue_true","encoding":"auto","truncate":50000}
Please note that the get path in the HTTP API is intended for diagnostics etc - it does not implement reliable delivery and so should be treated as a sysadmin's tool rather than a general API for messaging. |
|||
X | /api/bindings | A list of all bindings. | |||
X | /api/bindings/vhost | A list of all bindings in a given virtual host. | |||
X | X | /api/bindings/vhost/e/exchange/q/queue |
A list of all bindings between an exchange and a queue. Remember, an exchange and a queue can be bound together many times!
To create a new binding, POST to this
URI. Request body should be a JSON object optionally containing
two fields, {"routing_key":"my_routing_key", "arguments":{"x-arg": "value"}}All keys are optional. The response will contain a Location header
telling you the URI of your new binding.
|
||
X | X | /api/bindings/vhost/e/exchange/q/queue/props | An individual binding between an exchange and a queue. The props part of the URI is a "name" for the binding composed of its routing key and a hash of its arguments. props is the field named "properties_key" from a bindings listing response. | ||
X | X | /api/bindings/vhost/e/source/e/destination |
A list of all bindings between two exchanges, similar to the list of all bindings between an exchange and a queue, above.
To create a new binding, POST to this
URI. Request body should be a JSON object optionally containing
two fields, {"routing_key":"my_routing_key", "arguments":{"x-arg": "value"}}All keys are optional. The response will contain a Location header
telling you the URI of your new binding.
|
||
X | X | /api/bindings/vhost/e/source/e/destination/props | An individual binding between two exchanges. Similar to the individual binding between an exchange and a queue, above. | ||
X | /api/vhosts | A list of all vhosts. | |||
X | X | X | /api/vhosts/name | An individual virtual host. As a virtual host usually only
has a name, you do not need an HTTP body when PUTing one of
these. To set metadata on creation, provide a body like the following:
{"description":"virtual host description", "tags":"accounts,production"} tags is a comma-separated list of tags.
These metadata fields are optional.
To enable / disable tracing, provide a body looking like:
{"tracing":true} |
|
X | /api/vhosts/name/permissions | A list of all permissions for a given virtual host. | |||
X | /api/vhosts/name/topic-permissions | A list of all topic permissions for a given virtual host. | |||
X | /api/vhosts/name/start/node | Starts virtual host name on node node. | |||
X | /api/users/ | A list of all users. | |||
X | /api/users/without-permissions | A list of users that do not have access to any virtual host. | |||
X | /api/users/bulk-delete | Bulk deletes a list of users. Request body must contain the list:
{"users" : ["user1", "user2", "user3"]} |
|||
X | X | X | /api/users/name | An individual user. To PUT a user, you will need a body looking something like this:{"password":"secret","tags":"administrator"}or: {"password_hash":"2lmoth8l4H0DViLaK9Fxi6l9ds8=", "tags":"administrator"}The tags key is mandatory. Either
password or password_hash
can be set. If neither are set the user will not be able to log in with a password,
but other mechanisms like client certificates may be used.
Setting password_hash to "" will ensure the
user cannot use a password to log in. tags is a
comma-separated list of tags for the user. Currently recognised tags
are administrator , monitoring and management .
password_hash must be generated using the algorithm described
here.
You may also specify the hash function being used by adding the hashing_algorithm
key to the body. Currently recognised algorithms are rabbit_password_hashing_sha256 ,
rabbit_password_hashing_sha512 , and rabbit_password_hashing_md5 .
|
|
X | /api/users/user/permissions | A list of all permissions for a given user. | |||
X | /api/users/user/topic-permissions | A list of all topic permissions for a given user. | |||
X | /api/user-limits | Lists per-user limits for all users. | |||
X | /api/user-limits/user | Lists per-user limits for a specific user. | |||
X | X | /api/user-limits/user/name |
Set or delete per-user limit for user . The name URL path element
refers to the name of the limit (max-connections , max-channels ).
Limits are set using a JSON document in the body: {"value": 100}. Example request: curl -4u 'guest:guest' -H 'content-type:application/json' -X PUT localhost:15672/api/user-limits/guest/max-connections -d '{"value": 50}' |
||
X | /api/whoami | Details of the currently authenticated user. | |||
X | /api/permissions | A list of all permissions for all users. | |||
X | X | X | /api/permissions/vhost/user | An individual permission of a user and virtual host. To PUT a permission, you will need a body looking something like this:{"configure":".*","write":".*","read":".*"}All keys are mandatory. |
|
X | /api/topic-permissions | A list of all topic permissions for all users. | |||
X | X | X | /api/topic-permissions/vhost/user | Topic permissions for a user and virtual host. To PUT a topic permission, you will need a body looking something like this:
{"exchange":"amq.topic","write":"^a","read":".*"}All keys are mandatory. |
|
X | /api/parameters | A list of all vhost-scoped parameters. | |||
X | /api/parameters/component | A list of all vhost-scoped parameters for a given component. | |||
X | /api/parameters/component/vhost | A list of all vhost-scoped parameters for a given component and virtual host. | |||
X | X | X | /api/parameters/component/vhost/name | An individual vhost-scoped parameter. To PUT a parameter, you will need a body looking something like this:{"vhost": "/","component":"federation","name":"local_username","value":"guest"} |
|
X | /api/global-parameters | A list of all global parameters. | |||
X | X | X | /api/global-parameters/name | An individual global parameter. To PUT a parameter, you will need a body looking something like this:{"name":"user_vhost_mapping","value":{"guest":"/","rabbit":"warren"}} |
|
X | /api/policies | A list of all policies. | |||
X | /api/policies/vhost | A list of all policies in a given virtual host. | |||
X | X | X | /api/policies/vhost/name |
An individual policy. To PUT a policy, you will need a body looking something like this:{"pattern":"^amq.", "definition": {"federation-upstream-set":"all"}, "priority":0, "apply-to": "all"} pattern and definition are mandatory, priority and apply-to are optional.
|
|
X | /api/operator-policies | A list of all operator policy overrides. | |||
X | /api/operator-policies/vhost | A list of all operator policy overrides in a given virtual host. | |||
X | X | X | /api/operator-policies/vhost/name |
An individual operator policy. To PUT a policy, you will need a body looking something like this:{"pattern":"^amq.", "definition": {"expires":100}, "priority":0, "apply-to": "queues"} pattern and definition are mandatory, priority and apply-to are optional.
|
|
X | /api/aliveness-test/vhost | Declares a test queue on the target node, then publishes and consumes a message. Intended to be used as a very basic health check. Responds a 200 OK if the check succeeded, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/alarms | Responds a 200 OK if there are no alarms in effect in the cluster, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/local-alarms | Responds a 200 OK if there are no local alarms in effect on the target node, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/certificate-expiration/within/unit |
Checks the expiration date on the certificates for every listener configured to use TLS. Responds a 200 OK if all certificates are valid (have not expired), otherwise responds with a 503 Service Unavailable. Valid units: days, weeks, months, years. The value of the within argument is the number of units. So, when within is 2 and unit is "months", the expiration period used by the check will be the next two months. |
|||
X | /api/health/checks/port-listener/port | Responds a 200 OK if there is an active listener on the give port, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/protocol-listener/protocol | Responds a 200 OK if there is an active listener for the given protocol, otherwise responds with a 503 Service Unavailable. Valid protocol names are: amqp091, amqp10, mqtt, stomp, web-mqtt, web-stomp. | |||
X | /api/health/checks/virtual-hosts | Responds a 200 OK if all virtual hosts and running on the target node, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/node-is-mirror-sync-critical | Checks if there are classic mirrored queues without synchronised mirrors online (queues that would potentially lose data if the target node is shut down). Responds a 200 OK if there are no such classic mirrored queues, otherwise responds with a 503 Service Unavailable. | |||
X | /api/health/checks/node-is-quorum-critical | Checks if there are quorum queues with minimum online quorum (queues that would lose their quorum and availability if the target node is shut down). Responds a 200 OK if there are no such quorum queues, otherwise responds with a 503 Service Unavailable. | |||
X | /api/vhost-limits | Lists per-vhost limits for all vhosts. | |||
X | /api/vhost-limits/vhost | Lists per-vhost limits for specific vhost. | |||
X | X | /api/vhost-limits/vhost/name |
Set or delete per-vhost limit for vhost . The name URL path element
refers to the name of the limit (max-connections , max-queues ).
Limits are set using a JSON document in the body: {"value": 100}. Example request: curl -4u 'guest:guest' -H 'content-type:application/json' -X PUT localhost:15672/api/vhost-limits/my-vhost/max-connections -d '{"value": 50}' |
||
X | /api/auth |
Details about the OAuth2 configuration. It will return HTTP
status 200 with body: {"oauth_enabled":"boolean", "oauth_client_id":"string", "oauth_provider_url":"string"} |
|||
X | /api/rebalance/queues |
Rebalances all queues in all vhosts. This operation is asynchronous therefore please check
the RabbitMQ log file for messages regarding the success or failure of the operation.
curl -4u 'guest:guest' -XPOST localhost:15672/api/rebalance/queues/ |
|||
X | /api/federation-links /api/federation-links/vhost |
Provides status for all federation links. Requires the rabbitmq_federation_management plugin to be enabled.
|
|||
X | X | /api/auth/attempts/node | A list of authentication attempts. | ||
X | X | /api/auth/attempts/node/source | A list of authentication attempts by remote address and username. |
实现
通过对 RestTemplate 封装过的HttpClient 调用RabbitMQ的客户端,实现创建 VHost 的需求
public void createVHost(String host,int managerPort,String userName,String password,String vhost) throws Exception {
String url = String.format("http://%s:%d/api/vhosts/%s", host, managerPort,vhost);
Map<String,String> header = new HashMap<>();
header.put("Content-Type","application/json");
header.put("Authorization","Basic "+ Base64.getEncoder().encodeToString((userName + ":" + password).getBytes(StandardCharsets.UTF_8)));
getHttpClient().put(url, header,"{}");
}