Skip to content

Commit

Permalink
Fix a fairly bad bug in event de-duplication
Browse files Browse the repository at this point in the history
This is fairly edge-case-y but could bite someone. If you'd set a watch
when doing a get that failed because the node didn't exist, any subsequent
attempts to set a watch would fail silently, because the client thought that the
watch had already been set.

We now wrap the operation in the setup_watcher! method, which rolls back the
record-keeping of what watches have already been set for what nodes if an
exception is raised.

This change has the side-effect that certain operations (get, stat, exists?, children)
will block event delivery until completion, because they need to have a consistent
idea about what events are pending, and which have been delivered. This also means
that calling these methods represents a synchronization point between user threads
(these operations can only occur serially, not simultaneously).
  • Loading branch information
slyphon committed Apr 26, 2012
1 parent 8fa292c commit 9ca2f90
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 35 deletions.
33 changes: 17 additions & 16 deletions lib/z_k/client/base.rb
Expand Up @@ -326,9 +326,9 @@ def create(path, data='', opts={})
def get(path, opts={})
h = { :path => path }.merge(opts)

setup_watcher!(:data, h)

rv = check_rc(cnx.get(h), h)
rv = setup_watcher!(:data, h) do
check_rc(cnx.get(h), h)
end

opts[:callback] ? rv : rv.values_at(:data, :stat)
end
Expand Down Expand Up @@ -454,17 +454,17 @@ def stat(path, opts={})

h = { :path => path }.merge(opts)

setup_watcher!(:data, h)
setup_watcher!(:data, h) do
rv = cnx.stat(h)

rv = cnx.stat(h)
return rv if opts[:callback]

return rv if opts[:callback]

case rv[:rc]
when Zookeeper::ZOK, Zookeeper::ZNONODE
rv[:stat]
else
check_rc(rv, h) # throws the appropriate error
case rv[:rc]
when Zookeeper::ZOK, Zookeeper::ZNONODE
rv[:stat]
else
check_rc(rv, h) # throws the appropriate error
end
end
end

Expand Down Expand Up @@ -548,9 +548,10 @@ def children(path, opts={})

h = { :path => path }.merge(opts)

setup_watcher!(:child, h)
rv = setup_watcher!(:child, h) do
check_rc(cnx.get_children(h), h)
end

rv = check_rc(cnx.get_children(h), h)
opts[:callback] ? rv : rv[:children]
end

Expand Down Expand Up @@ -798,8 +799,8 @@ def check_rc(hash, inputs=nil)
end

# @private
def setup_watcher!(watch_type, opts)
event_handler.setup_watcher!(watch_type, opts)
def setup_watcher!(watch_type, opts, &b)
event_handler.setup_watcher!(watch_type, opts, &b)
end

# used in #inspect, doesn't raise an error if we're not connected
Expand Down
36 changes: 31 additions & 5 deletions lib/z_k/event_handler.rb
Expand Up @@ -153,26 +153,52 @@ def get_default_watcher_block
end
end

# Reports whether a watch of +watch_type+ is already recorded as outstanding
# for +path+.
#
# Looks up the set stored under +watch_type+ in @outstanding_watches while
# inside the synchronize block and returns that set's include?(path) result,
# or false when nothing is stored under +watch_type+.
#
# @private
def restricting_new_watches_for?(watch_type, path)
  synchronize do
    watched_paths = @outstanding_watches[watch_type]
    return watched_paths.include?(path) if watched_paths
  end

  false
end

# implements not only setting up the watcher callback, but deduplicating
# event delivery. Keeps track of in-flight watcher-type+path requests and
# doesn't re-register the watcher with the server until a response has been
# fired. This prevents one event delivery to *every* callback per :watch => true
# argument.
#
# due to somewhat poor design, we destructively modify opts before we yield,
# and the client implicitly knows this
#
# @private
def setup_watcher!(watch_type, opts)
return unless opts.delete(:watch)
return yield unless opts.delete(:watch)

synchronize do
set = @outstanding_watches.fetch(watch_type)
path = opts[:path]

if set.add?(path)
# this path has no outstanding watchers, let it do its thing
opts[:watcher] = watcher_callback
# if we added the path to the set, blocking further registration of
# watches and an exception is raised then we rollback
begin
# this path has no outstanding watchers, let it do its thing
opts[:watcher] = watcher_callback

yield opts
rescue Exception
set.delete(path)
raise
end
else
# outstanding watch for path and data pair already exists, so ignore
# logger.debug { "outstanding watch request for path #{path.inspect} and watcher type #{watch_type.inspect}, not re-registering" }
# we did not add the path to the set, which means we are not
# responsible for removing a block on further adds if the operation
# fails, therefore, we just yield
yield opts
end
end
end
Expand Down
61 changes: 47 additions & 14 deletions spec/watch_spec.rb
@@ -1,4 +1,4 @@
require File.join(File.dirname(__FILE__), %w[spec_helper])
require 'spec_helper'

describe ZK do
describe do
Expand All @@ -8,15 +8,18 @@

@path = "/_testWatch"
wait_until { @zk.connected? }

# make sure we start w/ clean state
@zk.rm_rf(@path)
end

after do
if @zk.connected?
@zk.close!
wait_until { !@zk.connected? }
end

mute_logger do
if @zk.connected?
@zk.close!
wait_until { !@zk.connected? }
end

ZK.open(@cnx_str) { |zk| zk.rm_rf(@path) }
end
end
Expand All @@ -25,7 +28,7 @@
locker = Mutex.new
callback_called = false

@zk.watcher.register(@path) do |event|
@zk.register(@path) do |event|
locker.synchronize do
callback_called = true
end
Expand All @@ -52,7 +55,7 @@ def wait_for_events_to_not_be_delivered(events)
it %[should only deliver an event once to each watcher registered for exists?] do
events = []

sub = @zk.watcher.register(@path) do |ev|
sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}"
events << ev
end
Expand All @@ -73,7 +76,7 @@ def wait_for_events_to_not_be_delivered(events)

@zk.create(@path, 'one', :mode => :ephemeral)

sub = @zk.watcher.register(@path) do |ev|
sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}"
events << ev
end
Expand All @@ -96,7 +99,7 @@ def wait_for_events_to_not_be_delivered(events)

@zk.create(@path, '')

sub = @zk.watcher.register(@path) do |ev|
sub = @zk.register(@path) do |ev|
logger.debug "got event #{ev}"
events << ev
end
Expand All @@ -112,6 +115,40 @@ def wait_for_events_to_not_be_delivered(events)

events.length.should == 1
end

it %[should restrict_new_watches_for? if a successul watch has been set] do
@zk.stat(@path, watch: true)
@zk.event_handler.should be_restricting_new_watches_for(:data, @path)
end

it %[should not a block on new watches after an operation fails] do
# this is a situation where we did get('/blah', :watch => true) but
# got an exception, the next watch set should work

events = []

sub = @zk.register(@path) do |ev|
logger.debug { "got event #{ev}" }
events << ev
end

# get a path that doesn't exist with a watch

lambda { @zk.get(@path, watch: true) }.should raise_error(ZK::Exceptions::NoNode)

@zk.event_handler.should_not be_restricting_new_watches_for(:data, @path)

@zk.stat(@path, watch: true)

@zk.event_handler.should be_restricting_new_watches_for(:data, @path)

@zk.create(@path, '')

wait_while { events.empty? }

events.should_not be_empty

end
end

describe 'state watcher' do
Expand Down Expand Up @@ -141,10 +178,6 @@ def wait_for_events_to_not_be_delivered(events)
m.should_receive(:state).and_return(ZookeeperConstants::ZOO_CONNECTED_STATE)
end
end

it %[should only fire the callback once] do
pending "not sure if this is the behavior we want"
end
end
end
end
Expand Down

0 comments on commit 9ca2f90

Please sign in to comment.