Module: CPEE::Message

Defined in:
lib/cpee/message.rb

Constant Summary collapse

@@mutex =
Mutex.new
@@who =
'cpee'
@@type =
'instance'
@@tworkers =
1
@@last =
-1

Class Method Summary collapse

Class Method Details

.send(type, event, cpee, instance, instance_uuid, instance_name, content = {}, backend = nil, tt = nil) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/cpee/message.rb', line 63

# Publishes one notification about an instance through +backend+.
#
# The channel is "<type>:<target>:<event>" and the message is
# "<instance>,<instance_uuid> <JSON payload>".
#
# type          - message kind; first channel segment and payload 'type'.
# event         - path-like event name; its dirname becomes the payload
#                 'topic', its basename the payload 'name'.
# cpee          - value stored under the @@who key; also the prefix of the
#                 "<@@type>-url" entry.
# instance      - instance identifier, stored under the @@type key.
# instance_uuid - added as "<@@type>-uuid" only when truthy.
# instance_name - added as "<@@type>-name" only when truthy.
# content       - stored as payload 'content'.
# backend       - must respond to publish(channel, message); the nil default
#                 cannot be published to.
# tt            - explicit target number; when nil, CPEE::Message.target
#                 supplies the next one.
#
# Returns whatever backend.publish returns.
def self.send(type, event, cpee, instance, instance_uuid, instance_name, content = {}, backend = nil, tt = nil)
  worker = '%02i' % (tt || CPEE::Message.target)
  event_topic = ::File.dirname(event)
  event_name = ::File.basename(event)

  data = {
    @@who => cpee,
    "#{@@type}-url" => File.join(cpee, instance.to_s),
    @@type => instance,
    'topic' => event_topic,
    'type' => type,
    'name' => event_name,
    'timestamp' => Time.now.xmlschema(6),
    'content' => content
  }
  # Optional identifiers are appended after the fixed keys, uuid before name.
  { 'uuid' => instance_uuid, 'name' => instance_name }.each do |suffix, value|
    data["#{@@type}-#{suffix}"] = value if value
  end

  channel = "#{type}:#{worker}:#{event}"
  message = "#{instance},#{instance_uuid} #{JSON.generate(data)}"
  backend.publish(channel, message)
end

.send_url(type, event, cpee, content = {}, backend) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/cpee/message.rb', line 83

# Posts a notification to the URL given as +backend+, from inside an
# EM.defer block.
#
# type    - message kind; sent as the 'type' parameter and payload 'type'.
# event   - path-like event name; its dirname is sent as 'topic', its
#           basename as 'event' (and as payload 'name').
# cpee    - value stored in the payload under the @@who key.
# content - stored as payload 'content'.
# backend - passed to Riddl::Client.new (presumably the receiver's URL).
#
# Returns whatever EM.defer returns.
def self.send_url(type, event, cpee, content = {}, backend)
  EM.defer do
    event_topic = ::File.dirname(event)
    event_name = ::File.basename(event)
    data = {
      @@who => cpee,
      'topic' => event_topic,
      'type' => type,
      'name' => event_name,
      'timestamp' => Time.now.xmlschema(3),
      'content' => content
    }
    receiver = Riddl::Client.new(backend)
    parameters = [
      Riddl::Parameter::Simple.new('type', type),
      Riddl::Parameter::Simple.new('topic', event_topic),
      Riddl::Parameter::Simple.new('event', event_name),
      Riddl::Parameter::Complex.new('notification', 'application/json', JSON.generate(data))
    ]
    receiver.post parameters
  end
end

.set_workers(workers) ⇒ Object

}}}



37
38
39
40
# File 'lib/cpee/message.rb', line 37

# Sets how many targets CPEE::Message.target rotates through and restarts the
# rotation.
#
# workers - requested number of workers. Values outside 1..99 fall back to 1:
#           targets are formatted with '%02i', so only two digits fit, and a
#           count below 1 leaves nothing to rotate over.
#
# Returns -1 (the reset rotation position).
def self::set_workers(workers)
  # The previous guard (`workers < 1 && workers > 99`) could never be true,
  # so out-of-range values were stored unchanged.
  @@tworkers = workers.between?(1, 99) ? workers : 1
  @@last = -1
end

.target ⇒ Object



42
43
44
# File 'lib/cpee/message.rb', line 42

# Returns the next target number, advancing the shared position under
# @@mutex: it increments @@last while it is below @@tworkers - 1, and
# otherwise wraps back to 0.
def self.target
  @@mutex.synchronize do
    if @@last < @@tworkers - 1
      @@last += 1
    else
      @@last = 0
    end
  end
end

.type ⇒ Object

}}}



30
31
32
# File 'lib/cpee/message.rb', line 30

# Returns the current @@type value, which is used as a payload key and as the
# prefix of the "-url", "-uuid" and "-name" payload keys.
def self.type #{{{
  @@type
end

.type=(it) ⇒ Object

}}}



33
34
35
# File 'lib/cpee/message.rb', line 33

# Replaces the @@type value with +it+.
def self.type=(it) #{{{
  @@type = it
end

.wait(backend, sub, tt = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/cpee/message.rb', line 46

# Publishes a "transaction/start" message carrying a random id and listens on
# the matching "transaction/finished" channel until a message starting with
# that id arrives, then unsubscribes. +sub+ is disconnected afterwards.
#
# backend - must respond to publish(channel, message).
# sub     - subscriber connection providing subscribe_with_timeout,
#           unsubscribe and disconnect! (presumably a Redis client — confirm).
# tt      - explicit target number; when nil, CPEE::Message.target supplies
#           the next one.
#
# Any StandardError raised while subscribing is reported as "timeout error"
# on stdout and otherwise ignored.
#
# Returns whatever sub.disconnect! returns.
def self.wait(backend, sub, tt = nil)
  worker = '%02i' % (tt || CPEE::Message.target)
  token = SecureRandom.hex(16)
  finished = "event:#{worker}:transaction/finished"
  begin
    # 2 is the timeout handed to the subscriber (presumably seconds).
    sub.subscribe_with_timeout(2, finished) do |on|
      on.message do |_channel, message|
        # The id is everything before the first space of the message.
        sender = message[0...message.index(' ')]
        sub.unsubscribe(finished) if sender == token
      end
      backend.publish("event:#{worker}:transaction/start", "#{token} {}")
    end
  rescue StandardError
    puts "timeout error"
  end
  sub.disconnect!
end

.who ⇒ Object

{{{



24
25
26
# File 'lib/cpee/message.rb', line 24

# Returns the current @@who value, the payload key under which the +cpee+
# argument of send and send_url is stored.
def self.who #{{{
  @@who
end

.who=(it) ⇒ Object

}}}



27
28
29
# File 'lib/cpee/message.rb', line 27

# Replaces the @@who value with +it+.
def self.who=(it) #{{{
  @@who = it
end