动态创建RabbitMQ的VirtualHost

young 21 2024-12-31

背景

项目中遇到一个场景,在运行时动态的创建 RabbitMQ 的 VHost,同时在这个VHost下创建对应的交换机、路由、队列。

正常我们使用 SpringBoot 封装的 RabbitTemplate时,在配置中仅能配置一个VHost,通过查看源码,发现其 VHost的配置其实是与ConnectionFactory绑定的,因此如果需要让 RabbitTemplate 在不同的VHost上进行消息发送,或者通过 RabbitAdmin进行Exchange,Queue,RoutingKey的配置的情况下,需要创建多个ConnectionFactory,也就是意味着需要创建多个 RabbitTemplate 实例。

接着通过直接挪用RabbitAutoconfiguration源码,实现了根据配置创建的不同的ConnectionFactory以及对应的 RabbitTemplate和 RabbitAdmin,这里不多赘述,但是项目启动之后,发现VHost 并不会自动创建,需要进行手动创建,如果通过控制台页面,那么很好解决这个问题,但是需要通过代码处理时,发现没有相关的API进行此操作。

管理API

经过查找,发现在 RabbitMQ的控制台页面中,有一个 HTTP API的按钮,这里面说明了RabbitMQ管理使用的所有 API,具体如下表,同时指出了,访问时要使用HTTP basic authentication进行认证,Content-Type 需要为application/json,如果是/VHost需要转义为%2F,下表中X表示支持该Rest操作

GET PUT DELETE POST Path Description
X /api/overview Various random bits of information that describe the whole system.
X X /api/cluster-name Name identifying this RabbitMQ cluster.
X /api/nodes A list of nodes in the RabbitMQ cluster.
X /api/nodes/name An individual node in the RabbitMQ cluster. Add "?memory=true" to get memory statistics, and "?binary=true" to get a breakdown of binary memory use (may be expensive if there are many small binaries in the system).
X /api/extensions A list of extensions to the management plugin.
X X /api/definitions
/api/all-configuration (deprecated)
The server definitions - exchanges, queues, bindings, users, virtual hosts, permissions, topic permissions, and parameters. Everything apart from messages. POST to upload an existing set of definitions. Note that:
  • The definitions are merged. Anything already existing on the server but not in the uploaded definitions is untouched.
  • Conflicting definitions on immutable objects (exchanges, queues and bindings) will be ignored. The existing definition will be preserved.
  • Conflicting definitions on mutable objects will cause the object in the server to be overwritten with the object from the definitions.
  • In the event of an error you will be left with a part-applied set of definitions.
For convenience you may upload a file from a browser to this URI (i.e. you can use multipart/form-data as well as application/json) in which case the definitions should be uploaded as a form field named "file".
X X /api/definitions/vhost
The server definitions for a given virtual host - exchanges, queues, bindings and policies. POST to upload an existing set of definitions. Note that:
  • The definitions are merged. Anything already existing on the server but not in the uploaded definitions is untouched.
  • Conflicting definitions on immutable objects (exchanges, queues and bindings) will be ignored. The existing definition will be preserved.
  • Conflicting definitions on mutable objects will cause the object in the server to be overwritten with the object from the definitions.
  • In the event of an error you will be left with a part-applied set of definitions.
For convenience you may upload a file from a browser to this URI (i.e. you can use multipart/form-data as well as application/json) in which case the definitions should be uploaded as a form field named "file".
X /api/connections A list of all open connections. Use pagination parameters to filter connections.
X /api/vhosts/vhost/connections A list of all open connections in a specific virtual host. Use pagination parameters to filter connections.
X X /api/connections/name An individual connection. DELETEing it will close the connection. Optionally set the "X-Reason" header when DELETEing to provide a reason.
X X /api/connections/username/username A list of all open connections for a specific username. Use pagination parameters to filter connections. DELETEing a resource will close all the connections for a username. Optionally set the "X-Reason" header when DELETEing to provide a reason.
X /api/connections/name/channels List of all channels for a given connection.
X /api/channels A list of all open channels. Use pagination parameters to filter channels.
X /api/vhosts/vhost/channels A list of all open channels in a specific virtual host. Use pagination parameters to filter channels.
X /api/channels/channel Details about an individual channel.
X /api/consumers A list of all consumers.
X /api/consumers/vhost A list of all consumers in a given virtual host.
X /api/exchanges A list of all exchanges. Use pagination parameters to filter exchanges.
X /api/exchanges/vhost A list of all exchanges in a given virtual host. Use pagination parameters to filter exchanges.
X X X /api/exchanges/vhost/name An individual exchange. To PUT an exchange, you will need a body looking something like this:
{"type":"direct","auto_delete":false,"durable":true,"internal":false,"arguments":{}}
The type key is mandatory; other keys are optional.

When DELETEing an exchange you can add the query string parameter if-unused=true. This prevents the delete from succeeding if the exchange is bound to a queue or as a source to another exchange.

X /api/exchanges/vhost/name/bindings/source A list of all bindings in which a given exchange is the source.
X /api/exchanges/vhost/name/bindings/destination A list of all bindings in which a given exchange is the destination.
X /api/exchanges/vhost/name/publish Publish a message to a given exchange. You will need a body looking something like:
{"properties":{},"routing_key":"my key","payload":"my body","payload_encoding":"string"}
All keys are mandatory. The payload_encoding key should be either "string" (in which case the payload will be taken to be the UTF-8 encoding of the payload field) or "base64" (in which case the payload field is taken to be base64 encoded).
If the message is published successfully, the response will look like:
{"routed": true}
routed will be true if the message was sent to at least one queue.

Please note that the HTTP API is not ideal for high performance publishing; the need to create a new TCP connection for each message published can limit message throughput compared to AMQP or other protocols using long-lived connections.

X /api/queues A list of all queues. Use pagination parameters to filter queues.
X /api/queues/vhost A list of all queues in a given virtual host. Use pagination parameters to filter queues.
X X X /api/queues/vhost/name An individual queue. To PUT a queue, you will need a body looking something like this:
{"auto_delete":false,"durable":true,"arguments":{},"node":"rabbit@smacmullen"}
All keys are optional.

When DELETEing a queue you can add the query string parameters if-empty=true and / or if-unused=true. These prevent the delete from succeeding if the queue contains messages, or has consumers, respectively.

X /api/queues/vhost/name/bindings A list of all bindings on a given queue.
X /api/queues/vhost/name/contents Contents of a queue. DELETE to purge. Note you can't GET this.
X /api/queues/vhost/name/actions Actions that can be taken on a queue. POST a body like:
{"action":"sync"}
Currently the actions which are supported are sync and cancel_sync.
X /api/queues/vhost/name/get Get messages from a queue. (This is not an HTTP GET as it will alter the state of the queue.) You should post a body looking like:
{"count":5,"ackmode":"ack_requeue_true","encoding":"auto","truncate":50000}
  • count controls the maximum number of messages to get. You may get fewer messages than this if the queue cannot immediately provide them.
  • ackmode determines whether the messages will be removed from the queue. If ackmode is ack_requeue_true or reject_requeue_true they will be requeued - if ackmode is ack_requeue_false or reject_requeue_false they will be removed.
  • encoding must be either "auto" (in which case the payload will be returned as a string if it is valid UTF-8, and base64 encoded otherwise), or "base64" (in which case the payload will always be base64 encoded).
  • If truncate is present it will truncate the message payload if it is larger than the size given (in bytes).

truncate is optional; all other keys are mandatory.

Please note that the get path in the HTTP API is intended for diagnostics etc - it does not implement reliable delivery and so should be treated as a sysadmin's tool rather than a general API for messaging.

X /api/bindings A list of all bindings.
X /api/bindings/vhost A list of all bindings in a given virtual host.
X X /api/bindings/vhost/e/exchange/q/queue

A list of all bindings between an exchange and a queue. Remember, an exchange and a queue can be bound together many times!

To create a new binding, POST to this URI. Request body should be a JSON object optionally containing two fields, routing_key (a string) and arguments (a map of optional arguments):

{"routing_key":"my_routing_key", "arguments":{"x-arg": "value"}}
All keys are optional. The response will contain a Location header telling you the URI of your new binding.

X X /api/bindings/vhost/e/exchange/q/queue/props An individual binding between an exchange and a queue. The props part of the URI is a "name" for the binding composed of its routing key and a hash of its arguments. props is the field named "properties_key" from a bindings listing response.
X X /api/bindings/vhost/e/source/e/destination

A list of all bindings between two exchanges, similar to the list of all bindings between an exchange and a queue, above.

To create a new binding, POST to this URI. Request body should be a JSON object optionally containing two fields, routing_key (a string) and arguments (a map of optional arguments):

{"routing_key":"my_routing_key", "arguments":{"x-arg": "value"}}
All keys are optional. The response will contain a Location header telling you the URI of your new binding.

X X /api/bindings/vhost/e/source/e/destination/props An individual binding between two exchanges. Similar to the individual binding between an exchange and a queue, above.
X /api/vhosts A list of all vhosts.
X X X /api/vhosts/name An individual virtual host. As a virtual host usually only has a name, you do not need an HTTP body when PUTing one of these. To set metadata on creation, provide a body like the following:
{"description":"virtual host description", "tags":"accounts,production"}
tags is a comma-separated list of tags. These metadata fields are optional. To enable / disable tracing, provide a body looking like:
{"tracing":true}
X /api/vhosts/name/permissions A list of all permissions for a given virtual host.
X /api/vhosts/name/topic-permissions A list of all topic permissions for a given virtual host.
X /api/vhosts/name/start/node Starts virtual host name on node node.
X /api/users/ A list of all users.
X /api/users/without-permissions A list of users that do not have access to any virtual host.
X /api/users/bulk-delete Bulk deletes a list of users. Request body must contain the list:
{"users" : ["user1", "user2", "user3"]}
X X X /api/users/name An individual user. To PUT a user, you will need a body looking something like this:
{"password":"secret","tags":"administrator"}
or:
{"password_hash":"2lmoth8l4H0DViLaK9Fxi6l9ds8=", "tags":"administrator"}
The tags key is mandatory. Either password or password_hash can be set. If neither are set the user will not be able to log in with a password, but other mechanisms like client certificates may be used. Setting password_hash to "" will ensure the user cannot use a password to log in. tags is a comma-separated list of tags for the user. Currently recognised tags are administrator, monitoring and management. password_hash must be generated using the algorithm described here. You may also specify the hash function being used by adding the hashing_algorithm key to the body. Currently recognised algorithms are rabbit_password_hashing_sha256, rabbit_password_hashing_sha512, and rabbit_password_hashing_md5.
X /api/users/user/permissions A list of all permissions for a given user.
X /api/users/user/topic-permissions A list of all topic permissions for a given user.
X /api/user-limits Lists per-user limits for all users.
X /api/user-limits/user Lists per-user limits for a specific user.
X X /api/user-limits/user/name Set or delete per-user limit for user. The name URL path element refers to the name of the limit (max-connections, max-channels). Limits are set using a JSON document in the body:
{"value": 100}
. Example request:
curl -4u 'guest:guest' -H 'content-type:application/json' -X PUT localhost:15672/api/user-limits/guest/max-connections -d '{"value": 50}'
X /api/whoami Details of the currently authenticated user.
X /api/permissions A list of all permissions for all users.
X X X /api/permissions/vhost/user An individual permission of a user and virtual host. To PUT a permission, you will need a body looking something like this:
{"configure":".*","write":".*","read":".*"}
All keys are mandatory.
X /api/topic-permissions A list of all topic permissions for all users.
X X X /api/topic-permissions/vhost/user Topic permissions for a user and virtual host. To PUT a topic permission, you will need a body looking something like this:
{"exchange":"amq.topic","write":"^a","read":".*"}
All keys are mandatory.
X /api/parameters A list of all vhost-scoped parameters.
X /api/parameters/component A list of all vhost-scoped parameters for a given component.
X /api/parameters/component/vhost A list of all vhost-scoped parameters for a given component and virtual host.
X X X /api/parameters/component/vhost/name An individual vhost-scoped parameter. To PUT a parameter, you will need a body looking something like this:
{"vhost": "/","component":"federation","name":"local_username","value":"guest"}
X /api/global-parameters A list of all global parameters.
X X X /api/global-parameters/name An individual global parameter. To PUT a parameter, you will need a body looking something like this:
{"name":"user_vhost_mapping","value":{"guest":"/","rabbit":"warren"}}
X /api/policies A list of all policies.
X /api/policies/vhost A list of all policies in a given virtual host.
X X X /api/policies/vhost/name An individual policy. To PUT a policy, you will need a body looking something like this:
{"pattern":"^amq.", "definition": {"federation-upstream-set":"all"}, "priority":0, "apply-to": "all"}
pattern and definition are mandatory, priority and apply-to are optional.
X /api/operator-policies A list of all operator policy overrides.
X /api/operator-policies/vhost A list of all operator policy overrides in a given virtual host.
X X X /api/operator-policies/vhost/name An individual operator policy. To PUT a policy, you will need a body looking something like this:
{"pattern":"^amq.", "definition": {"expires":100}, "priority":0, "apply-to": "queues"}
pattern and definition are mandatory, priority and apply-to are optional.
X /api/aliveness-test/vhost Declares a test queue on the target node, then publishes and consumes a message. Intended to be used as a very basic health check. Responds a 200 OK if the check succeeded, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/alarms Responds a 200 OK if there are no alarms in effect in the cluster, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/local-alarms Responds a 200 OK if there are no local alarms in effect on the target node, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/certificate-expiration/within/unit

Checks the expiration date on the certificates for every listener configured to use TLS. Responds a 200 OK if all certificates are valid (have not expired), otherwise responds with a 503 Service Unavailable.

Valid units: days, weeks, months, years. The value of the within argument is the number of units. So, when within is 2 and unit is "months", the expiration period used by the check will be the next two months.

X /api/health/checks/port-listener/port Responds a 200 OK if there is an active listener on the give port, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/protocol-listener/protocol Responds a 200 OK if there is an active listener for the given protocol, otherwise responds with a 503 Service Unavailable. Valid protocol names are: amqp091, amqp10, mqtt, stomp, web-mqtt, web-stomp.
X /api/health/checks/virtual-hosts Responds a 200 OK if all virtual hosts and running on the target node, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/node-is-mirror-sync-critical Checks if there are classic mirrored queues without synchronised mirrors online (queues that would potentially lose data if the target node is shut down). Responds a 200 OK if there are no such classic mirrored queues, otherwise responds with a 503 Service Unavailable.
X /api/health/checks/node-is-quorum-critical Checks if there are quorum queues with minimum online quorum (queues that would lose their quorum and availability if the target node is shut down). Responds a 200 OK if there are no such quorum queues, otherwise responds with a 503 Service Unavailable.
X /api/vhost-limits Lists per-vhost limits for all vhosts.
X /api/vhost-limits/vhost Lists per-vhost limits for specific vhost.
X X /api/vhost-limits/vhost/name Set or delete per-vhost limit for vhost. The name URL path element refers to the name of the limit (max-connections, max-queues). Limits are set using a JSON document in the body:
{"value": 100}
. Example request:
curl -4u 'guest:guest' -H 'content-type:application/json' -X PUT localhost:15672/api/vhost-limits/my-vhost/max-connections -d '{"value": 50}'
X /api/auth Details about the OAuth2 configuration. It will return HTTP status 200 with body:
{"oauth_enabled":"boolean", "oauth_client_id":"string", "oauth_provider_url":"string"}
X /api/rebalance/queues Rebalances all queues in all vhosts. This operation is asynchronous therefore please check the RabbitMQ log file for messages regarding the success or failure of the operation.
curl -4u 'guest:guest' -XPOST localhost:15672/api/rebalance/queues/
X /api/federation-links
/api/federation-links/vhost
Provides status for all federation links. Requires the rabbitmq_federation_management plugin to be enabled.
X X /api/auth/attempts/node A list of authentication attempts.
X X /api/auth/attempts/node/source A list of authentication attempts by remote address and username.

实现

通过对 RestTemplate 封装过的HttpClient 调用RabbitMQ的客户端,实现创建 VHost 的需求

public void createVHost(String host,int managerPort,String userName,String password,String vhost) throws Exception {
      String url = String.format("http://%s:%d/api/vhosts/%s", host, managerPort,vhost);
      Map<String,String> header = new HashMap<>();
      header.put("Content-Type","application/json");
      header.put("Authorization","Basic "+ Base64.getEncoder().encodeToString((userName + ":" + password).getBytes(StandardCharsets.UTF_8)));
      getHttpClient().put(url, header,"{}");
  }