Skip to content

Commit 85698b4

Browse files
committed
add support for feeding gstreamer pipelines into folk image frames
1 parent ff165ca commit 85698b4

File tree

2 files changed

+234
-0
lines changed

2 files changed

+234
-0
lines changed

test/gstreamer.tcl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
loadVirtualPrograms [list "virtual-programs/gstreamer.folk" "virtual-programs/images.folk"]
2+
Step
3+
4+
# namespace eval Pipeline $::makePipeline
5+
# set pl [Pipeline::create "videotestsrc"]
6+
# set img [Pipeline::frame $pl]
7+
# Pipeline::freeImage $img
8+
# Pipeline::destroy $pl
9+
10+
Step
11+
12+
When the gstreamer pipeline "videotestsrc" frame is /frame/ at /ts/ {
13+
Wish the web server handles route "/gst-image/$" with handler [list apply {{im} {
14+
# set width [dict get $im width]
15+
# set height [dict get $im height]
16+
set filename "/tmp/web-image-frame.png"
17+
image saveAsPng $im $filename
18+
set fsize [file size $filename]
19+
set fd [open $filename r]
20+
fconfigure $fd -encoding binary -translation binary
21+
set body [read $fd $fsize]
22+
close $fd
23+
dict create statusAndHeaders "HTTP/1.1 200 OK\nConnection: close\nContent-Type: image/png\nContent-Length: $fsize\n\n" body $body
24+
}} $frame]
25+
}
26+
27+
Step
28+
29+
forever { Step }

virtual-programs/gstreamer.folk

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
set makePipeline {
2+
rename [c create] cc
3+
4+
cc cflags {*}[exec pkg-config --cflags --libs gstreamer-1.0]
5+
cc include <gst/gst.h>
6+
cc include <assert.h>
7+
8+
proc defineGObjectType {cc type cast} {
9+
set cc [uplevel {namespace current}]::$cc
10+
$cc argtype $type* [format {
11+
%s* $argname;
12+
GObject* _$argname;
13+
sscanf(Tcl_GetString($obj), "(%s) 0x%%p", &_$argname);
14+
$argname = %s(_$argname);
15+
} $type $type $cast]
16+
17+
# Tcl_ObjPrintf doesn't work with %lld/%llx for some reason,
18+
# so we do it by hand.
19+
$cc rtype $type* [format {
20+
$robj = Tcl_ObjPrintf("(%s) 0x%%" PRIxPTR, (uintptr_t) G_OBJECT($rvalue));
21+
} $type]
22+
}
23+
24+
defineImageType cc
25+
defineGObjectType cc GstElement GST_ELEMENT
26+
defineGObjectType cc GstBus GST_BUS
27+
28+
cc struct pipeline_t {
29+
GstElement* pipeline;
30+
GstElement* sink;
31+
GstBus* bus;
32+
}
33+
34+
cc struct frame_t {
35+
bool valid;
36+
uint64_t timestamp;
37+
image_t image;
38+
}
39+
40+
cc code {
41+
void quit(const char* msg) {
42+
fprintf(stderr, "[%s] %d: %s\n", msg, errno, strerror(errno));
43+
exit(1);
44+
}
45+
46+
void log_messages(GstBus* bus) {
47+
GstMessage* msg;
48+
GError *err = NULL;
49+
gchar *dbg_info = NULL;
50+
while ((msg = gst_bus_pop_filtered(bus, GST_MESSAGE_ERROR | GST_MESSAGE_WARNING))) {
51+
switch (GST_MESSAGE_TYPE (msg)) {
52+
case GST_MESSAGE_ERROR: {
53+
gst_message_parse_error(msg, &err, &dbg_info);
54+
g_printerr("ERROR from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
55+
g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
56+
g_error_free(err);
57+
g_free(dbg_info);
58+
break;
59+
}
60+
case GST_MESSAGE_WARNING: {
61+
gst_message_parse_warning(msg, &err, &dbg_info);
62+
g_printerr("WARNING from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
63+
g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
64+
g_error_free(err);
65+
g_free(dbg_info);
66+
break;
67+
}
68+
default:
69+
break;
70+
}
71+
}
72+
}
73+
}
74+
75+
cc proc destroy {pipeline_t* p} void {
76+
gst_object_unref(p->bus);
77+
gst_object_unref(p->sink);
78+
gst_element_set_state(p->pipeline, GST_STATE_NULL);
79+
gst_object_unref(p->pipeline);
80+
ckfree(p);
81+
}
82+
83+
cc proc create {char* srcdec} pipeline_t* {
84+
GError* err = NULL;
85+
gst_init(NULL, NULL);
86+
87+
char buf[512];
88+
snprintf(buf, sizeof(buf), "%s ! videoconvert ! appsink caps=video/x-raw,format=RGBA name=output", srcdec);
89+
GstElement* pipeline = gst_parse_launch(buf, &err);
90+
if (err) {
91+
g_printerr("ERROR launching gst pipeline: %s\n", err->message);
92+
return NULL;
93+
}
94+
95+
pipeline_t* p = ckalloc(sizeof (pipeline_t));
96+
p->pipeline = pipeline;
97+
p->bus = gst_element_get_bus(p->pipeline);
98+
p->sink = gst_bin_get_by_name(GST_BIN(p->pipeline), "output");
99+
100+
GstState state;
101+
gst_element_set_state(p->pipeline, GST_STATE_PLAYING);
102+
gst_element_get_state(p->pipeline, &state, NULL, GST_CLOCK_TIME_NONE);
103+
log_messages(p->bus);
104+
105+
if (state != GST_STATE_PLAYING) {
106+
g_printerr("ERROR launching gst pipeline: pipeline failed to start\n");
107+
destroy(p);
108+
return NULL;
109+
}
110+
111+
return p;
112+
}
113+
114+
if {[namespace exists ::Heap]} {
115+
cc import ::Heap::cc folkHeapAlloc as folkHeapAlloc
116+
cc import ::Heap::cc folkHeapFree as folkHeapFree
117+
} else {
118+
cc code {
119+
#define folkHeapAlloc malloc
120+
#define folkHeapFree free
121+
}
122+
}
123+
cc proc frame {pipeline_t* p} frame_t {
124+
frame_t frame;
125+
frame.valid = TRUE;
126+
127+
GstSample* sample;
128+
g_signal_emit_by_name(p->sink, "pull-sample", &sample);
129+
if (!sample) {
130+
frame.valid = FALSE;
131+
return frame;
132+
}
133+
134+
GstCaps* caps = gst_sample_get_caps(sample);
135+
// gst_println("caps are %" GST_PTR_FORMAT, caps);
136+
137+
GstStructure* s = gst_caps_get_structure(caps, 0);
138+
assert(gst_structure_get_int(s, "width", (gint*)&frame.image.width));
139+
assert(gst_structure_get_int(s, "height", (gint*)&frame.image.height));
140+
const gchar* format = gst_structure_get_string(s, "format");
141+
if (g_str_equal(format, "RGB")) {
142+
frame.image.components = 3;
143+
} else if (g_str_equal(format, "RGBA")) {
144+
frame.image.components = 4;
145+
} else {
146+
fprintf(stderr, "frame: invalid cap format '%s'\n", format);
147+
assert(0);
148+
}
149+
frame.image.bytesPerRow = frame.image.width * frame.image.components;
150+
151+
GstMapInfo map;
152+
GstBuffer* buffer = gst_sample_get_buffer(sample);
153+
gst_buffer_map(buffer, &map, GST_MAP_READ);
154+
155+
frame.image.data = folkHeapAlloc(map.size);
156+
memmove(frame.image.data, map.data, map.size);
157+
frame.timestamp = (uint64_t) GST_BUFFER_DTS(buffer);
158+
159+
gst_buffer_unmap(buffer, &map);
160+
gst_sample_unref(sample);
161+
162+
return frame;
163+
}
164+
165+
cc proc freeImage {image_t image} void {
166+
folkHeapFree(image.data);
167+
}
168+
169+
cc compile
170+
}
171+
172+
set ::pipelineIndex 0
173+
When when the gstreamer pipeline /pl/ frame is /frame/ at /ts/ /lambda/ with environment /e/ {
174+
Start process "gstreamer-[incr ::pipelineIndex]" {
175+
puts "starting gst-$pl"
176+
Wish $::thisProcess shares statements like \
177+
[list /someone/ claims the gstreamer pipeline /...anything/]
178+
179+
namespace eval Pipeline $makePipeline
180+
181+
set pipe [Pipeline::create $pl]
182+
183+
set ::oldFrames [list]
184+
When $::thisProcess has step count /c/ {
185+
set frame [Pipeline::frame $pipe]
186+
dict with frame {
187+
if {!$valid} {
188+
Commit {}
189+
return
190+
}
191+
192+
Commit {
193+
Claim the gstreamer pipeline $pl time is $timestamp
194+
Claim the gstreamer pipeline $pl frame is $image at [clock milliseconds]
195+
}
196+
197+
lappend ::oldFrames $image
198+
if {[llength $::oldFrames] >= 10} {
199+
set ::oldFrames [lassign $::oldFrames oldestFrame]
200+
Pipeline::freeImage $oldestFrame
201+
}
202+
}
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)