AKTUALISIERTE BEARBEITUNG AM ENDE:Zeigt Arbeitscode. Hauptmodul unverändert mit Ausnahme des Debugging-Codes. Hinweis:Ich hatte das bereits erwähnte Problem bezüglich der Notwendigkeit, mich vor der Kündigung abzumelden.
Der Code sieht korrekt aus. Ich würde gerne sehen, wie Sie es instanziieren.
In config/application.rb haben Sie wahrscheinlich mindestens so etwas wie:
require 'ws_communication'
config.middleware.use WsCommunication
Dann sollten Sie in Ihrem JavaScript-Client so etwas haben:
var ws = new WebSocket(uri);
Instanziieren Sie eine weitere Instanz von WsCommunication? Das würde @clients auf ein leeres Array setzen und könnte Ihre Symptome zeigen. Etwas in der Art wäre falsch:
var ws = new WsCommunication;
Es würde uns helfen, wenn Sie den Client und vielleicht config/application.rb zeigen würden, wenn dieser Beitrag nicht hilft.
Übrigens stimme ich dem Kommentar zu, dass @clients bei jedem Update durch einen Mutex geschützt werden sollte, wenn sich das nicht so liest. Es handelt sich um eine dynamische Struktur, die sich in einem ereignisgesteuerten System jederzeit ändern kann. Redis-Mutex ist eine gute Option. (Ich hoffe, dieser Link ist korrekt, da Github im Moment 500 Fehler auf alles zu werfen scheint.)
Beachten Sie außerdem, dass $redis.publish einen ganzzahligen Wert der Anzahl der Clients zurückgibt, die die Nachricht erhalten haben.
Schließlich stellen Sie möglicherweise fest, dass Sie sicherstellen müssen, dass Ihr Kanal vor der Kündigung gekündigt wird. Ich hatte Situationen, in denen ich am Ende jede Nachricht mehrmals oder sogar viele Male gesendet habe, weil frühere Abonnements für denselben Kanal nicht bereinigt wurden. Da Sie den Kanal innerhalb eines Threads abonnieren, müssen Sie sich innerhalb desselben Threads abmelden, oder der Prozess "hängt" einfach und wartet darauf, dass der richtige Thread auf magische Weise erscheint. Ich handhabe diese Situation, indem ich ein "Abmelden"-Flag setze und dann eine Nachricht sende. Dann teste ich innerhalb des on.message-Blocks auf das Unsubscribe-Flag und gebe dort das Unsubscribe aus.
Das Modul, das Sie bereitgestellt haben, mit nur geringfügigen Debugging-Änderungen:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Der Test-Abonnentencode, den ich bereitgestellt habe:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Der von mir bereitgestellte Test-Publisher-Code. Publisher und Subscriber könnten einfach kombiniert werden, da dies nur Tests sind:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Ein Beispiel für config.ru, das all dies auf der Rack-Middleware-Schicht ausführt:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Das ist Main. Ich habe es aus meiner laufenden Version entfernt, sodass es möglicherweise optimiert werden muss, wenn Sie es verwenden:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end