Skip to content

Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs. #2372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def iter_resp_lines(resp):
buffer = buffer[next_newline+1:]
if line:
yield line
else:
yield '' # Only print one empty line
next_newline = buffer.find(b'\n')


Expand Down Expand Up @@ -107,24 +109,29 @@ def get_watch_argument_name(self, func):
return 'watch'

def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
if not data or data.isspace():
return None
try:
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
except json.JSONDecodeError:
return None

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down Expand Up @@ -198,7 +205,10 @@ def stream(self, func, *args, **kwargs):
retry_after_410 = False
yield event
else:
yield line
if line:
yield line # Normal non-empty line
else:
yield '' # Only yield one empty line
if self._stop:
break
finally:
Expand Down
90 changes: 88 additions & 2 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import unittest

import os

import time

from unittest.mock import Mock, call

from kubernetes import client
from kubernetes import client,config

from .watch import Watch

from kubernetes.client import ApiException


class WatchTests(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self):
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
# Here added a statement for exception for empty lines.
if e is None:
continue
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
Expand Down Expand Up @@ -488,7 +497,84 @@ def test_watch_with_error_event_and_timeout_param(self):
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()


@classmethod
def setUpClass(cls):
cls.api = Mock()
cls.namespace = "default"

def test_pod_log_empty_lines(self):
pod_name = "demo-bug"

try:
self.api.create_namespaced_pod = Mock()
self.api.read_namespaced_pod = Mock()
self.api.delete_namespaced_pod = Mock()
self.api.read_namespaced_pod_log = Mock()

#pod creating step
self.api.create_namespaced_pod.return_value = None

#Checking pod status
mock_pod = Mock()
mock_pod.status.phase = "Running"
self.api.read_namespaced_pod.return_value = mock_pod

# Printing at pod output
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])

# Wait for the pod to reach 'Running'
timeout = 60
start_time = time.time()
while time.time() - start_time < timeout:
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
if pod.status.phase == "Running":
break
time.sleep(2)
else:
self.fail("Pod did not reach 'Running' state within timeout")

# Reading and streaming logs using Watch (mocked)
w = Watch()
log_output = []
#Mock logs used for this test
w.stream = Mock(return_value=[
"Hello from Docker",
"",
"",
"\n\n",
"Another log line",
"",
"\n",
"Final log"
])
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
log_output.append(event)
print(event)

# Print outputs
print(f"Captured logs: {log_output}")
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
# self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
expected_log = [
"Hello from Docker",
"",
"",
"\n\n",
"Another log line",
"",
"\n",
"Final log"
]

self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs")

except ApiException as e:
self.fail(f"Kubernetes API exception: {e}")
finally:
#checking pod is calling for delete
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)

if __name__ == '__main__':
unittest.main()