OPDATERET EDIT AT END:Viser arbejdskode. Hovedmodul uændret undtagen fejlretningskode. Bemærk:Jeg oplevede det problem, jeg allerede har bemærket med hensyn til behovet for at afmelde før opsigelse.
Koden ser korrekt ud. Jeg vil gerne se, hvordan du instansierer det.
I config/application.rb har du sandsynligvis mindst noget som:
require 'ws_communication'
config.middleware.use WsCommunication
Så, i din JavaScript-klient, skulle du have noget som dette:
var ws = new WebSocket(uri);
Instantierer du en anden forekomst af WsCommunication? Det ville sætte @clients til et tomt array og kunne udvise dine symptomer. Noget som dette ville være forkert:
var ws = new WsCommunication;
Det ville hjælpe os, hvis du ville vise klienten og måske config/application.rb, hvis dette indlæg ikke hjælper.
I øvrigt er jeg enig i kommentaren om, at @klienter skal beskyttes af en mutex på enhver opdatering, hvis den ikke også læser. Det er en dynamisk struktur, der kan ændre sig til enhver tid i et hændelsesdrevet system. redis-mutex er en god mulighed. (Håber det link er korrekt, da Github ser ud til at smide 500 fejl på alt i øjeblikket.)
Du kan også bemærke, at $redis.publish returnerer en heltalsværdi af antallet af klienter, der modtog beskeden.
Endelig vil du måske opdage, at du skal sikre dig, at din kanal er afmeldt, før du opsiger den. Jeg har haft situationer, hvor jeg er endt med at sende hver besked flere, endda mange, gange på grund af tidligere abonnementer på den samme kanal, der ikke blev ryddet op i. Da du abonnerer på kanalen i en tråd, bliver du nødt til at afmelde dig inden for den samme tråd, ellers vil processen bare "hænge" og vente på, at den rigtige tråd på magisk vis dukker op. Jeg håndterer den situation ved at sætte et "afmeld"-flag og derefter sende en besked. Derefter, inden for on.message-blokken, tester jeg for afmeldingsflaget og udsteder afmeldelsen der.
Det modul, du har leveret, med kun mindre fejlretningsændringer:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Testabonnentkoden, jeg angav:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Testudgiverkoden, jeg har leveret. Publisher og Subscriber kunne nemt kombineres, da disse blot er tests:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Et eksempel på config.ru, som kører alt dette på rack-middleware-laget:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Dette er Main. Jeg fjernede den fra min kørende version, så den skal muligvis justeres, hvis du bruger den:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end