Apache Pulsar Educational Videos & Tutorials
The videos and tutorials that follow will help you learn Apache Pulsar. This Apache Pulsar Learning Hub is split into three parts:
- Part I: Learn how Pulsar works and the theory behind its structure.
- Part II: Work through the installation and configuration of Pulsar for a simple use case.
- Part III: Copy and paste the code samples used in the installation tutorial and the real-world example project.
These Pulsar learning resources are brought to you by Dattell, a full service managed Pulsar provider. Learn more about 24x7x365 managed Pulsar services on your environment.
Part I: Apache Pulsar Basics & Benefits
Learn about the benefits of Apache Pulsar and the basics of its structure. Topics covered include: Pulsar’s Two-Layer Structure, Subscription Types, Message Guarantees, Schema Registry, Pulsar Functions, BookKeeper for Pulsar, Benefits of the Two-Layer System, Bookie Failures, Tiered Storage, ZooKeeper for Pulsar, Geo-Replication, and Security.
Apache Pulsar Education Videos
Part II: Apache Pulsar Installation & Configuration
The videos below are a complete tutorial for setting up Apache Pulsar on a virtual machine for a simple use case. We cover the configuration needed for the operating system, ZooKeeper, BookKeeper, and Pulsar. The last video walks through a real-world example project.
Playlist
Part III: Apache Pulsar Installation Reference Code
Below are code samples that are used in the above Pulsar installation tutorial and example project. The code can be copied and pasted into your implementation.
Code for Pulsar Installation Tutorial
I. Pulsar Installation Notes
# setup environment
download/install virtualbox
https://www.virtualbox.org/wiki/Downloads
download/install centos
https://www.centos.org/download/
create VM
#update OS
yum update -y
install guest tools
# modify OS params
share vim tutorial
https://opensource.com/article/19/3/getting-started-vim
add user to sudoers
/etc/sysctl.conf
vm.swappiness=0
fs.file-max=100000
/etc/pam.d/login
session required pam_limits.so
/etc/security/limits.conf
pulsar soft nofile unlimited
pulsar hard nofile unlimited
create clones
change hostnames
add to /etc/hosts file
install clusterssh
verify connectivity
systemctl stop firewalld
systemctl disable firewalld
install java
yum install java-11-openjdk
# create install dir in opt
mkdir -p /opt/training/apps
mkdir -p /opt/training/src
download pulsar
cd /opt/training/src
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
# untar src files
cd /opt/training/apps
tar -xf ../src/apache-pulsar-2.7.0-bin.tar.gz
# create symlink in apps dir
ln -s apache-pulsar-2.7.0/ pulsar
# edit zookeeper config
mkdir -p /opt/training/apps/data/zookeeper
vim /opt/training/apps/data/zookeeper/myid
vim /opt/training/apps/pulsar/conf/zookeeper.conf
dataDir=/opt/training/apps/data/zookeeper
server.1=192.168.1.145:2888:3888
server.2=192.168.1.140:2888:3888
server.3=192.168.1.119:2888:3888
# add users
useradd -M pulsar
usermod -L pulsar
chown -R pulsar:pulsar /opt/training
vim /etc/systemd/system/zookeeper.service
systemctl daemon-reload
systemctl start zookeeper
# Register the cluster's metadata in ZooKeeper (run from the Pulsar install
# directory, /opt/training/apps/pulsar, with ZooKeeper already started).
# Every host in a URL list uses the same port for that protocol, matching the
# broker settings: web 8080, web TLS 8443, broker 6650, broker TLS 6651.
bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-training-01 \
  --zookeeper pulsartraining01:2181 \
  --configuration-store pulsartraining01:2181 \
  --web-service-url http://pulsartraining01:8080,pulsartraining02:8080,pulsartraining03:8080 \
  --web-service-url-tls https://pulsartraining01:8443,pulsartraining02:8443,pulsartraining03:8443 \
  --broker-service-url pulsar://pulsartraining01:6650,pulsartraining02:6650,pulsartraining03:6650 \
  --broker-service-url-tls pulsar+ssl://pulsartraining01:6651,pulsartraining02:6651,pulsartraining03:6651
# start zookeeper if restarted
systemctl start zookeeper
# edit bookkeeper config files
zkServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181
vim /etc/systemd/system/bookkeeper.service
systemctl daemon-reload
systemctl start bookkeeper
# test bookkeeper
bin/bookkeeper shell bookiesanity
bin/bookkeeper shell simpletest --ensemble 3 --writeQuorum 2 --ackQuorum 2 --numEntries 2
# edit broker config file (/opt/training/apps/pulsar/conf/broker.conf)
zookeeperServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181
configurationStoreServers=pulsartraining01:2181,pulsartraining02:2181,pulsartraining03:2181
clusterName=pulsar-training-01
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=8443
#edit client.conf
# The broker URL key is spelled brokerServiceUrl (capital "U" in "Url"),
# in the same camelCase form as webServiceUrl.
webServiceUrl=http://pulsartraining01:8080
brokerServiceUrl=pulsar://pulsartraining01:6650
# add service files
vim /etc/systemd/system/pulsar.service
# reload daemon
systemctl daemon-reload
systemctl start pulsar
# test pulsar
bin/pulsar-client produce persistent://public/default/test -n 1 -m "Hello world Pulsar edition"
bin/pulsar-client consume persistent://public/default/test -n 100 -s "consumer-subscription-name" -t "Exclusive"
## editing heap sizes, turning off swap
swapoff -a
vim /etc/fstab
#zookeeper
mkdir /opt/training/apps/conf
vim /opt/training/apps/conf/java.env
export SERVER_JVMFLAGS="-Xms512m -Xmx512m -XX:+AlwaysPreTouch"
#bookkeeper
vim /opt/training/apps/pulsar/conf/bkenv.sh
#BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g"}}
BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m"}}
#pulsar
vim /opt/training/apps/pulsar/conf/pulsar_env.sh
#PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
PULSAR_MEM=${PULSAR_MEM:-"-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"}
## Example use case
# message deduplication
vim /opt/training/apps/pulsar/conf/broker.conf
brokerDeduplicationEnabled=true
#### create partitioned topic
# create tenant
./pulsar-admin tenants create pulsartraining-tenant1
# create namespace
./pulsar-admin namespaces create pulsartraining-tenant1/pulsartraining-namespace1
# create partitioned topic
./pulsar-admin topics create-partitioned-topic persistent://pulsartraining-tenant1/pulsartraining-namespace1/stock-topic-partitioned1 --partitions 12
# message deduplication
vim /opt/training/apps/pulsar/conf/broker.conf
brokerDeduplicationEnabled=true
# do not configure transactions
II. ZooKeeper Installation Code
# systemd unit that runs the ZooKeeper server bundled with Pulsar as the
# "pulsar" user. Save as /etc/systemd/system/zookeeper.service and run
# `systemctl daemon-reload` before starting it.
[Unit]
Description=Zookeeper
Documentation=http://zookeeper.apache.org
# Order after network-online.target, as the BookKeeper unit does, so the
# service starts once the network is configured rather than merely after
# the network stack has been brought up (network.target).
Wants=network-online.target
After=network-online.target
After=syslog.target

[Service]
Type=simple
WorkingDirectory=/opt/training/apps
User=pulsar
Group=pulsar
Environment=OPTS=-Dstats_server_port=8001
ExecStart=/opt/training/apps/pulsar/bin/pulsar zookeeper
TimeoutSec=20
# Restart automatically only when the process exits with an error.
Restart=on-failure

[Install]
# Same install target as the BookKeeper and Pulsar broker units.
WantedBy=multi-user.target
III. BookKeeper Installation Code
# systemd unit that runs a Pulsar bookie (BookKeeper node) as the "pulsar" user.
[Unit]
Description=BookKeeper
Documentation=https://bookkeeper.apache.org
Wants=network-online.target
After=network-online.target syslog.target

[Service]
Type=simple
User=pulsar
Group=pulsar
WorkingDirectory=/opt/training/apps
ExecStart=/opt/training/apps/pulsar/bin/pulsar bookie
Restart=on-failure
PrivateTmp=true
# Resource ceilings for the bookie process, in order:
# open file descriptors, number of processes, virtual memory, file size.
LimitNOFILE=655350
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity

[Install]
WantedBy=multi-user.target
IV. Pulsar Installation Code
# systemd unit that runs the Pulsar broker as the "pulsar" user.
# Save as /etc/systemd/system/pulsar.service and run
# `systemctl daemon-reload` before starting it.
[Unit]
Description=Pulsar Broker
After=network.target
After=syslog.target

[Service]
Type=simple
ExecStart=/opt/training/apps/pulsar/bin/pulsar broker
WorkingDirectory=/opt/training/apps
User=pulsar
Group=pulsar
# After a failure, wait 20 seconds before restarting the broker.
RestartSec=20s
Restart=on-failure

[Install]
WantedBy=multi-user.target
Code for Pulsar Example Project
I. Consumer 1
"""Consumer 1 of the example project.

Subscribes to 'stock-topic-partitioned1' with a Key_Shared subscription,
prints every message it receives and acknowledges it. Runs until interrupted.
"""
import pulsar

client = pulsar.Client(
    'pulsar://pulsartraining01:6650',
    authentication=None,
    operation_timeout_seconds=30,
    io_threads=1,
    message_listener_threads=1,
    concurrent_lookup_requests=50000,
    log_conf_file_path=None,
    use_tls=False,
    tls_trust_certs_file_path=None,
    tls_allow_insecure_connection=False,
    tls_validate_hostname=False,
)

consumer_type = pulsar.ConsumerType.KeyShared
initial_position = pulsar.InitialPosition.Latest

# NOTE(review): a short topic name presumably resolves to
# persistent://public/default/<name>, while the installation notes create the
# partitioned topic under pulsartraining-tenant1/pulsartraining-namespace1.
# Confirm which topic is intended; producer and consumers must use the same one.
consumer = client.subscribe(
    'stock-topic-partitioned1',
    'training-subscription-partitioned1',
    consumer_type=consumer_type,
    message_listener=None,
    receiver_queue_size=1000,
    max_total_receiver_queue_size_across_partitions=50000,
    consumer_name="consumer1",
    unacked_messages_timeout_ms=None,
    broker_consumer_stats_cache_time_ms=30000,
    negative_ack_redelivery_delay_ms=60000,
    is_read_compacted=False,
    properties=None,
    pattern_auto_discovery_period=60,
    initial_position=initial_position,
)

try:
    while True:
        # No timeout is given, so this waits for the next message.
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Handling failed: negatively acknowledge so the message is
            # redelivered instead of being silently dropped.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C ends the otherwise endless loop; fall through to cleanup.
    pass
finally:
    # Always release the connection, even when the loop exits on an error.
    client.close()
II. Consumer 2
"""Consumer 2 of the example project.

Subscribes to 'stock-topic-partitioned1' with a Key_Shared subscription,
prints every message it receives and acknowledges it. Runs until interrupted.
"""
import pulsar

client = pulsar.Client(
    'pulsar://pulsartraining01:6650',
    authentication=None,
    operation_timeout_seconds=30,
    io_threads=1,
    message_listener_threads=1,
    concurrent_lookup_requests=50000,
    log_conf_file_path=None,
    use_tls=False,
    tls_trust_certs_file_path=None,
    tls_allow_insecure_connection=False,
    tls_validate_hostname=False,
)

consumer_type = pulsar.ConsumerType.KeyShared
initial_position = pulsar.InitialPosition.Latest

# NOTE(review): a short topic name presumably resolves to
# persistent://public/default/<name>, while the installation notes create the
# partitioned topic under pulsartraining-tenant1/pulsartraining-namespace1.
# Confirm which topic is intended; producer and consumers must use the same one.
consumer = client.subscribe(
    'stock-topic-partitioned1',
    'training-subscription-partitioned1',
    consumer_type=consumer_type,
    message_listener=None,
    receiver_queue_size=1000,
    max_total_receiver_queue_size_across_partitions=50000,
    consumer_name="consumer2",
    unacked_messages_timeout_ms=None,
    broker_consumer_stats_cache_time_ms=30000,
    negative_ack_redelivery_delay_ms=60000,
    is_read_compacted=False,
    properties=None,
    pattern_auto_discovery_period=60,
    initial_position=initial_position,
)

try:
    while True:
        # No timeout is given, so this waits for the next message.
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Handling failed: negatively acknowledge so the message is
            # redelivered instead of being silently dropped.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C ends the otherwise endless loop; fall through to cleanup.
    pass
finally:
    # Always release the connection, even when the loop exits on an error.
    client.close()
III. Consumer 3
"""Consumer 3 of the example project.

Subscribes to 'stock-topic-partitioned1' with a Key_Shared subscription,
prints every message it receives and acknowledges it. Runs until interrupted.
"""
import pulsar

client = pulsar.Client(
    'pulsar://pulsartraining01:6650',
    authentication=None,
    operation_timeout_seconds=30,
    io_threads=1,
    message_listener_threads=1,
    concurrent_lookup_requests=50000,
    log_conf_file_path=None,
    use_tls=False,
    tls_trust_certs_file_path=None,
    tls_allow_insecure_connection=False,
    tls_validate_hostname=False,
)

consumer_type = pulsar.ConsumerType.KeyShared
initial_position = pulsar.InitialPosition.Latest

# NOTE(review): a short topic name presumably resolves to
# persistent://public/default/<name>, while the installation notes create the
# partitioned topic under pulsartraining-tenant1/pulsartraining-namespace1.
# Confirm which topic is intended; producer and consumers must use the same one.
consumer = client.subscribe(
    'stock-topic-partitioned1',
    'training-subscription-partitioned1',
    consumer_type=consumer_type,
    message_listener=None,
    receiver_queue_size=1000,
    max_total_receiver_queue_size_across_partitions=50000,
    consumer_name="consumer3",
    unacked_messages_timeout_ms=None,
    broker_consumer_stats_cache_time_ms=30000,
    negative_ack_redelivery_delay_ms=60000,
    is_read_compacted=False,
    properties=None,
    pattern_auto_discovery_period=60,
    initial_position=initial_position,
)

try:
    while True:
        # No timeout is given, so this waits for the next message.
        msg = consumer.receive(timeout_millis=None)
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            # Handling failed: negatively acknowledge so the message is
            # redelivered instead of being silently dropped.
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C ends the otherwise endless loop; fall through to cleanup.
    pass
finally:
    # Always release the connection, even when the loop exits on an error.
    client.close()
IV. Producer
"""Producer of the example project.

For each ticker symbol, downloads the latest daily quote with yfinance,
tags the rows with the ticker, and publishes them as a UTF-8 encoded JSON
array to 'stock-topic-partitioned1', keyed by the ticker symbol.
"""
import yfinance as yf
import pulsar
import json

tickerList = [
    "ABT", "ABBV", "ABMD", "ACN", "ATVI", "ADBE", "AMD",
    "AAP", "AES", "AFL", "A", "APD",
]

client = pulsar.Client('pulsar://pulsartraining01:6650')
try:
    # NOTE(review): a short topic name presumably resolves to
    # persistent://public/default/<name>, while the installation notes create
    # the partitioned topic under
    # pulsartraining-tenant1/pulsartraining-namespace1. Confirm which topic is
    # intended; producer and consumers must use the same one.
    #
    # NOTE(review): the fixed producer_name and send_timeout_millis=0 look
    # chosen to go with brokerDeduplicationEnabled=true in broker.conf -
    # confirm against the Pulsar message-deduplication documentation.
    producer = client.create_producer(
        'stock-topic-partitioned1',
        producer_name="producer1",
        initial_sequence_id=None,
        send_timeout_millis=0,
        compression_type=pulsar.CompressionType.NONE,
        max_pending_messages=1000,
        max_pending_messages_across_partitions=50000,
        block_if_queue_full=False,
        batching_enabled=False,
        batching_max_messages=1000,
        batching_max_allowed_size_in_bytes=131072,
        batching_max_publish_delay_ms=10,
        message_routing_mode=pulsar.PartitionsRoutingMode.RoundRobinDistribution,
        properties=None,
        batching_type=pulsar.BatchingType.Default,
    )

    for ticker in tickerList:
        # One-day history at one-day resolution.
        historicalData = yf.Ticker(ticker).history(period="1d", interval="1d")
        historicalData['ticker'] = ticker
        histDataJsonRaw = historicalData.to_json(orient="records")
        # The ticker symbol is the message key.
        producer.send(
            histDataJsonRaw.encode('utf-8'),
            partition_key=ticker,
            sequence_id=None,
            replication_clusters=None,
            disable_replication=False,
            event_timestamp=None,
            deliver_at=None,
            deliver_after=None,
        )
finally:
    # Close the client even if a download or send fails part-way through.
    client.close()
Contact Form for Pulsar support
Dattell offers comprehensive managed Pulsar on your environment. Contact us to learn more about our support packages. Or visit our Apache Pulsar Support Page for more information.