Apache Pulsar Educational Videos & Tutorials

The videos and tutorials that follow will help you learn Apache Pulsar.  This Apache Pulsar Learning Hub is split into three parts:

  • Part I:   Learn how Pulsar works and the theory behind its structure.
  • Part II:  Work through the installation and configuration of Pulsar for a simple use case.  
  • Part III: Copy and paste the code samples used for the installation and the real-world example.

These Pulsar learning resources are brought to you by Dattell, a full-service managed Pulsar provider. Learn more about 24x7x365 managed Pulsar services in your environment.

Part I: Apache Pulsar Basics & Benefits

Learn about the benefits of Apache Pulsar and the basics of its structure.  Topics covered include: Pulsar’s Two-Layer Structure, Subscription Types, Message Guarantees, Schema Registry, Pulsar Functions, BookKeeper for Pulsar, Benefits of the Two-Layer System, Bookie Failures, Tiered Storage, ZooKeeper for Pulsar, Geo-Replication, and Security.

Part II: Apache Pulsar Installation & Configuration

The videos below are a complete tutorial for setting up Apache Pulsar on virtual machines for a simple use case. We cover the configuration needed for the operating system, ZooKeeper, BookKeeper, and Pulsar. The last video walks through a real-world example project.

Part III: Apache Pulsar Installation Reference Code

Below are code samples that are used in the above Pulsar installation tutorial and example project.  The code can be copied and pasted into your implementation.

Code for Pulsar Installation Tutorial

I. Pulsar Installation Notes

				
					# setup environment

download/install virtualbox
https://www.virtualbox.org/wiki/Downloads

download/install centos
https://www.centos.org/download/

create VM

#update OS
yum update -y

install guest tools

# modify OS params

share vim tutorial
https://opensource.com/article/19/3/getting-started-vim

add user to sudoers

/etc/sysctl.conf
	vm.swappiness=0
	# fs.file-max is the system-wide total of open files; keep it consistent
	# with the per-process nofile limits set below and in the service units
	fs.file-max=100000

/etc/pam.d/login
	session required pam_limits.so

/etc/security/limits.conf
	# nofile cannot be "unlimited" on Linux: setrlimit rejects values above
	# fs.nr_open, and with "session required pam_limits.so" older PAM versions
	# then refuse the session. Use an explicit number (same value as
	# LimitNOFILE in bookkeeper.service).
	# These PAM limits apply to login sessions only; systemd services get their
	# limits from Limit*= settings in their unit files.
	pulsar soft nofile 655350
	pulsar hard nofile 655350

create clones

change hostnames

add to /etc/hosts file

install clusterssh

verify connectivity

# stop the firewall now and keep it from starting again at boot
systemctl disable --now firewalld

install java
yum install java-11-openjdk

# create install dirs in /opt (apps = unpacked software, src = downloaded archives)
mkdir -p /opt/training/apps /opt/training/src

download pulsar
cd /opt/training/src
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz

# unpack the downloaded archive into the apps dir
cd /opt/training/apps
tar -xf ../src/apache-pulsar-2.7.0-bin.tar.gz

# version-independent path: /opt/training/apps/pulsar -> apache-pulsar-2.7.0/
ln -s apache-pulsar-2.7.0/ pulsar

# edit zookeeper config
mkdir -p /opt/training/apps/data/zookeeper
# myid holds this node's id: 1, 2 or 3, matching its server.N line below
vim /opt/training/apps/data/zookeeper/myid

vim /opt/training/apps/pulsar/conf/zookeeper.conf

	dataDir=/opt/training/apps/data/zookeeper

	server.1=192.168.1.145:2888:3888
	server.2=192.168.1.140:2888:3888
	server.3=192.168.1.119:2888:3888

# add users (no home dir, password locked)
useradd -M pulsar
usermod -L pulsar

# hand the install and data dirs to the service user
chown -R pulsar:pulsar /opt/training

# zookeeper service unit
vim /etc/systemd/system/zookeeper.service
systemctl daemon-reload
# start now and at every boot
systemctl enable --now zookeeper

# initialize cluster metadata: run ONCE, from a single node, after ZooKeeper is
# running on the nodes (the ensemble needs a quorum).
# bin/ paths are relative to the pulsar install dir.
# Every node uses the same ports: 8080 http, 8443 https, 6650 pulsar, 6651 pulsar+ssl.
cd /opt/training/apps/pulsar
bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-training-01 \
  --zookeeper pulsartraining01:2181 \
  --configuration-store pulsartraining01:2181 \
  --web-service-url http://pulsartraining01:8080,pulsartraining02:8080,pulsartraining03:8080 \
  --web-service-url-tls https://pulsartraining01:8443,pulsartraining02:8443,pulsartraining03:8443 \
  --broker-service-url pulsar://pulsartraining01:6650,pulsartraining02:6650,pulsartraining03:6650 \
  --broker-service-url-tls pulsar+ssl://pulsartraining01:6651,pulsartraining02:6651,pulsartraining03:6651

# start zookeeper if restarted (only needed if it was stopped; the enabled unit starts at boot)
systemctl start zookeeper

# edit bookkeeper config files (bin/pulsar bookie reads conf/bookkeeper.conf)
vim /opt/training/apps/pulsar/conf/bookkeeper.conf

	zkServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181

vim /etc/systemd/system/bookkeeper.service

systemctl daemon-reload
# start now and at every boot
systemctl enable --now bookkeeper

# test bookkeeper (bin/ paths are relative to the pulsar install dir)
cd /opt/training/apps/pulsar
bin/bookkeeper shell bookiesanity

# write 2 entries to a 3-bookie ensemble, each written to 2 bookies and acknowledged by 2
bin/bookkeeper shell simpletest --ensemble 3 --writeQuorum 2 --ackQuorum 2 --numEntries 2

# edit broker config (bin/pulsar broker reads conf/broker.conf; there is no pulsar.conf)
vim /opt/training/apps/pulsar/conf/broker.conf

	zookeeperServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181
	configurationStoreServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181

	clusterName=pulsar-training-01

	brokerServicePort=6650
	brokerServicePortTls=6651
	webServicePort=8080
	webServicePortTls=8443
	# NOTE(review): the two *Tls ports presumably also need TLS certificates
	# configured (tlsCertificateFilePath / tlsKeyFilePath) - confirm, or leave them unset

# edit client config (used by the command-line tools such as bin/pulsar-client)
vim /opt/training/apps/pulsar/conf/client.conf

	webServiceUrl=http://pulsartraining01:8080
	# keys are case-sensitive: brokerServiceUrl, not brokerServiceurl
	brokerServiceUrl=pulsar://pulsartraining01:6650

# add service files
vim /etc/systemd/system/pulsar.service

# reload daemon, then start the broker now and at every boot
systemctl daemon-reload
systemctl enable --now pulsar

# test pulsar (bin/ paths are relative to the pulsar install dir)
cd /opt/training/apps/pulsar

# 1) start the consumer FIRST, in a second terminal: a new subscription starts at
#    the latest message, so a message produced before it subscribes is not delivered.
#    -n 100 keeps it waiting for up to 100 messages.
bin/pulsar-client consume persistent://public/default/test -n 100 -s "consumer-subscription-name" -t "Exclusive"

# 2) then produce
bin/pulsar-client produce persistent://public/default/test -n 1 -m "Hello world Pulsar edition"







##  editing heap sizes, turning off swap

# disable swap on the running system
swapoff -a

# comment out the swap entry here so swap stays off after a reboot
vim /etc/fstab

#zookeeper
# NOTE(review): java.env / SERVER_JVMFLAGS is the mechanism of ZooKeeper's own
# start scripts; confirm that "bin/pulsar zookeeper" reads this file at all
# (bin/pulsar's heap settings come from PULSAR_MEM in conf/pulsar_env.sh)
mkdir /opt/training/apps/conf

vim /opt/training/apps/conf/java.env
	export SERVER_JVMFLAGS="-Xms512m -Xmx512m -XX:+AlwaysPreTouch"

#bookkeeper
# bookie heap/direct memory; BOOKIE_MEM falls back to PULSAR_MEM, then to the
# literal default. The commented-out line keeps the previous (2g) setting.
# Restart the bookkeeper service for the change to take effect.
vim /opt/training/apps/pulsar/conf/bkenv.sh
	#BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g"}}
	BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m"}}

#pulsar
# broker heap/direct memory, reduced from the commented-out values;
# restart the pulsar service for the change to take effect
vim /opt/training/apps/pulsar/conf/pulsar_env.sh
	#PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
	PULSAR_MEM=${PULSAR_MEM:-"-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"}







##  Example use case

# message deduplication: set on every broker node, then restart the broker so the
# edited broker.conf is read
vim /opt/training/apps/pulsar/conf/broker.conf
	brokerDeduplicationEnabled=true
systemctl restart pulsar

#### create partitioned topic
# bin/ paths are relative to the pulsar install dir
cd /opt/training/apps/pulsar

# create tenant
bin/pulsar-admin tenants create pulsartraining-tenant1

# create namespace
bin/pulsar-admin namespaces create pulsartraining-tenant1/pulsartraining-namespace1

# create partitioned topic. Clients must use this full name: a short name such as
# "stock-topic-partitioned1" resolves to persistent://public/default/stock-topic-partitioned1
bin/pulsar-admin topics create-partitioned-topic persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1  --partitions 12

# do not configure transactions
				
			

II. ZooKeeper Installation Code (systemd unit file: /etc/systemd/system/zookeeper.service)

				
					[Unit]
Description=Zookeeper
Documentation=http://zookeeper.apache.org
# Order after the network is actually up so the ensemble can reach its peers
# (network.target is a passive unit and should not be pulled in with Requires=).
# Same ordering as bookkeeper.service.
Wants=network-online.target
After=network-online.target
After=syslog.target

[Service]
Type=simple
WorkingDirectory=/opt/training/apps
User=pulsar
Group=pulsar
# extra JVM option; presumably the port of the ZooKeeper stats endpoint - confirm
Environment=OPTS=-Dstats_server_port=8001
ExecStart=/opt/training/apps/pulsar/bin/pulsar zookeeper
TimeoutSec=20
Restart=on-failure
# /etc/security/limits.conf does not apply to systemd services, so set the
# open-file limit here (same value as bookkeeper.service)
LimitNOFILE=655350

[Install]
# same target as the bookkeeper and pulsar units
WantedBy=multi-user.target
				
			

III. BookKeeper Installation Code (systemd unit file: /etc/systemd/system/bookkeeper.service)

				
					[Unit]
Description=BookKeeper
Documentation=https://bookkeeper.apache.org
# wait until the network is up before starting the bookie
Wants=network-online.target
After=network-online.target syslog.target

[Service]
Type=simple
User=pulsar
Group=pulsar
WorkingDirectory=/opt/training/apps
ExecStart=/opt/training/apps/pulsar/bin/pulsar bookie
Restart=on-failure

# private /tmp for this service
PrivateTmp=true

# Resource limits for the bookie process
# (PAM limits from /etc/security/limits.conf are not applied to services).
# open file descriptors:
LimitNOFILE=655350
# number of processes:
LimitNPROC=4096
# virtual memory size and maximum file size: no limit
LimitAS=infinity
LimitFSIZE=infinity

[Install]
WantedBy=multi-user.target
				
			

IV. Pulsar Installation Code (systemd unit file: /etc/systemd/system/pulsar.service)

				
					[Unit]
Description=Pulsar Broker
# Order after the network is actually up, as in bookkeeper.service.
Wants=network-online.target
After=network-online.target
After=syslog.target

[Service]
Type=simple
ExecStart=/opt/training/apps/pulsar/bin/pulsar broker
WorkingDirectory=/opt/training/apps
User=pulsar
Group=pulsar
RestartSec=20s
Restart=on-failure
# /etc/security/limits.conf does not apply to systemd services, so set the
# open-file limit here (same value as bookkeeper.service)
LimitNOFILE=655350

# single [Install] section (it was previously duplicated at the top of the file)
[Install]
WantedBy=multi-user.target
				
			

Code for Pulsar Example Project

I. Consumer 1

				
					import pulsar

# Full name of the partitioned topic created with pulsar-admin. The short name
# 'stock-topic-partitioned1' would resolve to
# persistent://public/default/stock-topic-partitioned1, which is a different topic.
TOPIC = 'persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1'

client = pulsar.Client('pulsar://pulsartraining01:6650', authentication=None, operation_timeout_seconds=30,
                       io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000,
                       log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
                       tls_allow_insecure_connection=False, tls_validate_hostname=False)

# KeyShared: consumers 1-3 share the same subscription, and all messages with the
# same key (the producer uses the ticker as partition_key) go to the same consumer.
consumer_type = pulsar.ConsumerType.KeyShared
# Latest: a new subscription starts after the newest message, so start the
# consumers before running the producer.
initial_position = pulsar.InitialPosition.Latest

consumer = client.subscribe(TOPIC, 'training-subscription-partitioned1',
                            consumer_type = consumer_type, message_listener=None, receiver_queue_size=1000,
                            max_total_receiver_queue_size_across_partitions=50000, consumer_name="consumer1",
                            unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000,
                            negative_ack_redelivery_delay_ms=60000, is_read_compacted=False,
                            properties=None, pattern_auto_discovery_period=60,
                            initial_position=initial_position)

try:
    while True:
        # Block until the next message arrives (no timeout).
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Processing failed: have the broker redeliver the message after
            # negative_ack_redelivery_delay_ms. Catching Exception (not a bare
            # except) lets Ctrl+C through to the handler below.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop this script.
    pass
finally:
    client.close()
				
			

II. Consumer 2

				
					import pulsar

# Full name of the partitioned topic created with pulsar-admin. The short name
# 'stock-topic-partitioned1' would resolve to
# persistent://public/default/stock-topic-partitioned1, which is a different topic.
TOPIC = 'persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1'

client = pulsar.Client('pulsar://pulsartraining01:6650', authentication=None, operation_timeout_seconds=30,
                       io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000,
                       log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
                       tls_allow_insecure_connection=False, tls_validate_hostname=False)

# KeyShared: consumers 1-3 share the same subscription, and all messages with the
# same key (the producer uses the ticker as partition_key) go to the same consumer.
consumer_type = pulsar.ConsumerType.KeyShared
# Latest: a new subscription starts after the newest message, so start the
# consumers before running the producer.
initial_position = pulsar.InitialPosition.Latest

consumer = client.subscribe(TOPIC, 'training-subscription-partitioned1',
                            consumer_type = consumer_type, message_listener=None, receiver_queue_size=1000,
                            max_total_receiver_queue_size_across_partitions=50000, consumer_name="consumer2",
                            unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000,
                            negative_ack_redelivery_delay_ms=60000, is_read_compacted=False,
                            properties=None, pattern_auto_discovery_period=60,
                            initial_position=initial_position)

try:
    while True:
        # Block until the next message arrives (no timeout).
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Processing failed: have the broker redeliver the message after
            # negative_ack_redelivery_delay_ms. Catching Exception (not a bare
            # except) lets Ctrl+C through to the handler below.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop this script.
    pass
finally:
    client.close()
				
			

III. Consumer 3

				
					import pulsar

# Full name of the partitioned topic created with pulsar-admin. The short name
# 'stock-topic-partitioned1' would resolve to
# persistent://public/default/stock-topic-partitioned1, which is a different topic.
TOPIC = 'persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1'

client = pulsar.Client('pulsar://pulsartraining01:6650', authentication=None, operation_timeout_seconds=30,
                       io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000,
                       log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
                       tls_allow_insecure_connection=False, tls_validate_hostname=False)

# KeyShared: consumers 1-3 share the same subscription, and all messages with the
# same key (the producer uses the ticker as partition_key) go to the same consumer.
consumer_type = pulsar.ConsumerType.KeyShared
# Latest: a new subscription starts after the newest message, so start the
# consumers before running the producer.
initial_position = pulsar.InitialPosition.Latest

consumer = client.subscribe(TOPIC, 'training-subscription-partitioned1',
                            consumer_type = consumer_type, message_listener=None, receiver_queue_size=1000,
                            max_total_receiver_queue_size_across_partitions=50000, consumer_name="consumer3",
                            unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000,
                            negative_ack_redelivery_delay_ms=60000, is_read_compacted=False,
                            properties=None, pattern_auto_discovery_period=60,
                            initial_position=initial_position)

try:
    while True:
        # Block until the next message arrives (no timeout).
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Processing failed: have the broker redeliver the message after
            # negative_ack_redelivery_delay_ms. Catching Exception (not a bare
            # except) lets Ctrl+C through to the handler below.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop this script.
    pass
finally:
    client.close()
				
			

IV. Producer

				
					import yfinance as yf
import pulsar
import json

tickerList = [ "ABT", "ABBV", "ABMD", "ACN", "ATVI", "ADBE", "AMD",
              "AAP", "AES", "AFL", "A", "APD" ]

# Full name of the partitioned topic created with pulsar-admin. The short name
# 'stock-topic-partitioned1' would resolve to
# persistent://public/default/stock-topic-partitioned1, which is a different topic.
TOPIC = 'persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1'

client = pulsar.Client('pulsar://pulsartraining01:6650')

try:
    # A fixed producer_name gives broker-side deduplication
    # (brokerDeduplicationEnabled=true) a stable producer identity across runs.
    producer = client.create_producer(TOPIC, producer_name="producer1",
                                      initial_sequence_id=None, send_timeout_millis=0,
                                      compression_type=pulsar.CompressionType.NONE,
                                      max_pending_messages=1000, max_pending_messages_across_partitions=50000,
                                      block_if_queue_full=False, batching_enabled=False,
                                      batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072,
                                      batching_max_publish_delay_ms=10,
                                      message_routing_mode=pulsar.PartitionsRoutingMode.RoundRobinDistribution,
                                      properties=None, batching_type=pulsar.BatchingType.Default)

    for ticker in tickerList:
        # One day of daily history for this ticker, tagged with the ticker symbol.
        historicalData = yf.Ticker(ticker).history(period = "1d", interval = "1d")
        historicalData['ticker'] = ticker
        # to_json(orient="records") omits the DataFrame index, which is where
        # history() keeps the date; reset_index() turns it into a column so the
        # date is included in the message.
        histDataJsonRaw = historicalData.reset_index().to_json(orient = "records")

        # Keyed messages are routed by a hash of the key (round-robin applies only
        # to keyless messages), so each ticker always lands on the same partition.
        producer.send(histDataJsonRaw.encode('utf-8'), partition_key = ticker, sequence_id=None,
                      replication_clusters=None, disable_replication=False, event_timestamp=None,
                      deliver_at=None, deliver_after=None)
finally:
    # Close the client even if a download or a send fails part-way through the list.
    client.close()
				
			

Contact Form for Pulsar support

Dattell offers comprehensive managed Pulsar services in your environment. Contact us to learn more about our support packages, or visit our Apache Pulsar Support Page for more information.