Get started
This guide walks you through the quickest way to get started with the following methods to manage topics.
- pulsar-admin
- REST API
- Java
pulsar-admin CLI is a command-line tool and is available in the bin folder of your Pulsar installation.
REST API belongs to HTTP calls, which are made against the admin APIs provided by brokers. In addition, both the Java admin API and pulsar-admin CLI use the REST API.
Java admin API is a programmable interface written in Java.
Check the detailed steps below.
- pulsar-admin
- REST API
- Java
To manage topics using pulsar-admin CLI, complte the following steps.
Set the service URL.
Create a partitioned topic.
Update the number of a partition.
Produce messages to the topic.
Check the stats of the topic.
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
Step 1: Set the service URLs to point to the broker service in client.conf.
```bash
webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
```
Step 2: Create a persistent topic named test-topic-1 with 6 partitions.
**Input**
```bash
bin/pulsar-admin topics create-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 6
```
**Output**
There is no output. You can check the status of the topic in Step 5.
Step 3: Update the number of the partition to 8.
**Input**
```bash
bin/pulsar-admin topics update-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 8
```
**Output**
There is no output. You can check the number of partitions in Step 5.
Step 4: Produce some messages to the partitioned topic test-topic-1.
**Input**
```bash
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1
```
**Output**
```bash
2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-1" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
...
2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235
```
Step 5: Check the internal stats of the partitioned topic test-topic-1.
**Input**
```bash
bin/pulsar-admin topics partitioned-stats-internal \
persistent://public/default/test-topic-1
```
**Output**
Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.
```bash
{
"metadata" : {
"partitions" : 8
},
"partitions" : {
"persistent://public/default/test-topic-1-partition-1" : {
"entriesAddedCounter" : 4213,
"numberOfEntries" : 4213,
"totalSize" : 8817693,
"currentLedgerEntries" : 4212,
"currentLedgerSize" : 8806289,
"lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "65:4211",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 49,
"entries" : 1,
"size" : 11404,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 65,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"test-subscriptio-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
},
"test-subscription-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
},
...
```
Step 6: Delete the topic test-topic-1.
**Input**
```bash
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1
```
**Output**
There is no output. You can verify whether the _test-topic-1_ exists or not using the following command.
**Input**
List topics in `public/default` namespace.
```bash
bin/pulsar-admin topics list public/default
```
To manage topics using REST API, complete the following steps.
Create a partitioned topic
Update the number of a partition.
Produce messages to the topic.
Check the stats of the topic.
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
Step 1: Create a persistent topic named test-topic-2 with 4 partitions.
**Input**
```bash
curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"
```
**Output**
There is no output. You can check the topic in Step 4.
Step 2: Update the number of the partition to 5.
**Input**
```bash
curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"
```
**Output**
There is no output. You can check the status of the topic in Step 4.
Step 3: Produce some messages to the partitioned topic test-topic-2.
**Input**
```bash
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2
```
**Output**
```bash
2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-2" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
"deprecatedAuthPluginClassName" : null,
"maxOutstanding" : 0,
"maxPendingMessagesAcrossPartitions" : 0,
"partitions" : null,
"numMessages" : 0,
"compression" : "NONE",
"payloadFilename" : null,
"payloadDelimiter" : "\\n",
"batchTimeMillis" : 1.0,
"batchMaxMessages" : 1000,
"batchMaxBytes" : 4194304,
"testTime" : 0,
"warmupTimeSeconds" : 1.0,
"encKeyName" : null,
"encKeyFile" : null,
"delay" : 0,
"exitOnFailure" : false,
"messageKeyGenerationMode" : null,
"producerAccessMode" : "Shared",
"formatPayload" : false,
"formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter",
"transactionTimeout" : 10,
"numMessagesPerTransaction" : 50,
"isEnableTransaction" : false,
"isAbortTransaction" : false,
"histogramFile" : null
}
...
2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717
```
Step 4: Check the internal stats of the topic test-topic-2.
**Input**
```bash
curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStats
```
**Output**
For detailed explanations of topic stats, see Pulsar statistics.
```bash
{"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],...
```
Step 5: Delete the topic test-topic-2.
**Input**
```
curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions
```
**Output**
There is no output. You can verify whether the _test-topic-2_ exists or not using the following command.
**Input**
List topics in `public/default` namespace.
```
curl -X GET http://localhost:8080/admin/v2/persistent/public/default
```
To manage topics using Java admin API, complete following steps.
Initiate a Pulsar Java client.
Create a partitioned topic
Update the number of a partition.
Produce messages to the topic.
Check the stats of the topic.
Delete the topic.
Prerequisites
Prepare a Java project and add the following dependency to your POM file.
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>2.11.0</version>
</dependency>
Steps
Step 1: Initiate a Pulsar Java client in your Java project.
**Input**
```java
String url = "http://localhost:8080";
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(url)
.build();
```
Step 2: Create a partitioned topic test-topic-1 with 4 partitions.
**Input**
```java
admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4);
```
Step 3: Update the number of the partition to 5.
**Input**
```java
admin.topics().updatePartitionedTopic("test-topic-1", 5);
```
Step 4: Produce some messages to the topic test-topic-1.
**Input**
```java
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("test-topic-1")
.blockIfQueueFull(true)
.create();
for (int i = 0; i < 100; ++i) {
producer.newMessage().value("test").send();
}
producer.close();
client.close();
```
Step 5: Check the stats of the topic test-topic-1.
**Input**
```java
PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1",false);
System.out.println(stats.getMsgInCounter());
```
**Output**
```java
100
```
Step 6: Delete the topic test-topic-1.
**Input**
```java
admin.topics().deletePartitionedTopic("test-topic-1");
```
Related topics
To understand basics, see Pulsar admin API - Overview
To learn usage scenarios, see Pulsar admin API - Use cases.
To learn common administrative tasks, see Pulsar admin API - Features.
To perform administrative operations, see Pulsar admin API - Tools.
To check the detailed usage, see the references below.
Pulsar admin APIs