ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""Chrome OS Parnter Concole remote actions."""
 
from __future__ import print_function
 
import base64
import logging
 
import common
 
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.server.hosts import moblab_host
from autotest_lib.site_utils import pubsub_utils
from autotest_lib.site_utils import cloud_console_pb2 as cpcon
 
 
_PUBSUB_TOPIC = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_topic', default=None)
 
# Current notification version.
CURRENT_MESSAGE_VERSION = '1'
 
# Test upload pubsub notification attributes
LEGACY_ATTR_VERSION = 'version'
LEGACY_ATTR_GCS_URI = 'gcs_uri'
LEGACY_ATTR_MOBLAB_MAC = 'moblab_mac_address'
LEGACY_ATTR_MOBLAB_ID = 'moblab_id'
# the message data for new test result notification.
LEGACY_TEST_OFFLOAD_MESSAGE = 'NEW_TEST_RESULT'
 
 
def is_cloud_notification_enabled():
    """Checks if cloud pubsub notification is enabled.
 
    @returns: True if cloud pubsub notification is enabled. Otherwise, False.
    """
    return  global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_enabled', type=bool, default=False)
 
 
def _get_message_type_name(message_type_enum):
    """Gets the message type name from message type enum.
 
    @param message_type_enum: The message type enum.
 
    @return The corresponding message type name as string, or 'MSG_UNKNOWN'.
    """
    return cpcon.MessageType.Name(message_type_enum)
 
 
def _get_attribute_name(attribute_enum):
    """Gets the message attribute name from attribte enum.
 
    @param attribute_enum: The attribute enum.
 
    @return The corresponding attribute name as string, or 'ATTR_INVALID'.
    """
    return cpcon.MessageAttribute.Name(attribute_enum)
 
 
class CloudConsoleClient(object):
    """The remote interface to the Cloud Console."""
    def send_heartbeat(self):
        """Sends a heartbeat.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        pass
 
    def send_event(self, event_type=None, event_data=None):
        """Sends an event notification to the remote console.
 
        @param event_type: The event type that is defined in the protobuffer
            file 'cloud_console.proto'.
        @param event_data: The event data.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        pass
 
    def send_log(self, msg, level=None, session_id=None):
        """Sends a log message to the remote console.
 
        @param msg: The log message.
        @param level: The logging level.
        @param session_id: The current session id.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        pass
 
    def send_alert(self, msg, level=None, session_id=None):
        """Sends an alert to the remote console.
 
        @param msg: The alert message.
        @param level: The logging level.
        @param session_id: The current session id.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        pass
 
    def send_test_job_offloaded_message(self, gcs_uri):
        """Sends a test job offloaded message to the remote console.
 
        @param gcs_uri: The test result Google Cloud Storage URI.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        pass
 
 
# Make it easy to mock out
def _create_pubsub_client(credential):
    return pubsub_utils.PubSubClient(credential)
 
 
class PubSubBasedClient(CloudConsoleClient):
    """A Cloud PubSub based implementation of the CloudConsoleClient interface.
    """
    def __init__(
            self,
            credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION,
            pubsub_topic=_PUBSUB_TOPIC):
        """Constructor.
 
        @param credential: The service account credential filename. Default to
            '/home/moblab/.service_account.json'.
        @param pubsub_topic: The cloud pubsub topic name to use.
        """
        super(PubSubBasedClient, self).__init__()
        self._pubsub_client = _create_pubsub_client(credential)
        self._pubsub_topic = pubsub_topic
 
 
    def _create_message(self, data, msg_attributes):
        """Creates a cloud pubsub notification object.
 
        @param data: The message data as a string.
        @param msg_attributes: The message attribute map.
 
        @returns: A pubsub message object with data and attributes.
        """
        message = {}
        if data:
            message['data'] = data
        if msg_attributes:
            message['attributes'] = msg_attributes
        return message
 
    def _create_message_attributes(self, message_type_enum):
        """Creates a cloud pubsub notification message attribute map.
 
        Fills in the version, moblab mac address, and moblab id information
        as attributes.
 
        @param message_type_enum The message type enum.
 
        @returns: A pubsub messsage attribute map.
        """
        msg_attributes = {}
        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (
                _get_message_type_name(message_type_enum))
        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (
                CURRENT_MESSAGE_VERSION)
        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (
                utils.get_moblab_serial_number())
        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (
                utils.get_moblab_id())
        return msg_attributes
 
    def _create_test_job_offloaded_message(self, gcs_uri):
        """Construct a test result notification.
 
        TODO(ntang): switch LEGACY to new message format.
        @param gcs_uri: The test result Google Cloud Storage URI.
 
        @returns The notification message.
        """
        data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)
        msg_attributes = {}
        msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION
        msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (
                utils.get_moblab_serial_number())
        msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()
        msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri
 
        return self._create_message(data, msg_attributes)
 
 
    def send_test_job_offloaded_message(self, gcs_uri):
        """Notify the cloud console a test job is offloaded.
 
        @param gcs_uri: The test result Google Cloud Storage URI.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        logging.info('Notification on gcs_uri %s', gcs_uri)
        message = self._create_test_job_offloaded_message(gcs_uri)
        return self._publish_notification(message)
 
 
    def _publish_notification(self, message):
        msg_ids = self._pubsub_client.publish_notifications(
                self._pubsub_topic, [message])
 
        if msg_ids:
            logging.debug('Successfully sent out a notification')
            return True
        logging.warning('Failed to send notification %s', str(message))
        return False
 
    def send_heartbeat(self):
        """Sends a heartbeat.
 
        @returns True if the heartbeat notification is successfully sent.
            Otherwise, False.
        """
        logging.info('Sending a heartbeat')
 
        event = cpcon.Heartbeat()
        # Don't sent local timestamp for now.
        data = event.SerializeToString()
        try:
            attributes = self._create_message_attributes(
                    cpcon.MSG_MOBLAB_HEARTBEAT)
            message = self._create_message(data, attributes)
        except ValueError:
            logging.exception('Failed to create message.')
            return False
        return self._publish_notification(message)
 
    def send_event(self, event_type=None, event_data=None):
        """Sends an event notification to the remote console.
 
        @param event_type: The event type that is defined in the protobuffer
            file 'cloud_console.proto'.
        @param event_data: The event data.
 
        @returns True if the notification is successfully sent.
            Otherwise, False.
        """
        logging.info('Send an event.')
        if not event_type:
            logging.info('Failed to send event without a type.')
            return False
 
        event = cpcon.RemoteEventMessage()
        if event_data:
            event.data = event_data
        else:
            event.data = ''
        event.type = event_type
        data = event.SerializeToString()
        try:
            attributes = self._create_message_attributes(
                    cpcon.MSG_MOBLAB_REMOTE_EVENT)
            message = self._create_message(data, attributes)
        except ValueError:
            logging.exception('Failed to create message.')
            return False
        return self._publish_notification(message)